飞道的博客

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

308人阅读  评论(0)

前言

不知道说什么好,直接开始吧。本来想采用最新版本的,一想到生产和测试必须版本保持一致,不能随便升级,就只好去下载指定版本的rabbitmq的rpm。

RabbitMQ概念

Broker :消息中间件的服务节点,RabbitMQ的一个服务实例,也可以看做是RabbitMQ的一台服务器

Queue 队列:用于存储消息。kafka不一样,它的消息存在在topic逻辑层面,而队列存储的只是topic中实际存储文件中的编译标识。多个消费者可以同时订阅一个队列,平均分摊(Round-robin轮询)处理消息

Exchange 交换器:生产者将消息发送到交换器,由交换器路由到一个或者多个队列中

  • direct exchangequeue进行bingding时会设置相应的routingkey。生产者发送消息到交换器时会设定相应的routingkey,如果这两个routingkey相同,消息都会投放到绑定的队列上。

  • topic 和direct一样,但是支持routingkey的通配符模式,可以有通配符:* , #。 其中 * 表示匹配一个单词, #则表示匹配没有或者多个单词

  • fanout 直接将发送到该交换器的消息路由到它绑定的一个或者多个队列

  • header 根据添加的header来判断

    • x-match == all,匹配所有header

    • x-match == any, 只需要匹配其中的一个header的值

Routingkey 路由键: 生产者将消息发给交换器的时候, 一般会指定一个 RoutingKey ,用 来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 合起来使用才能最终生效。在交换器类型和绑定键 (BindingKey) 固定的情况下,生产者可以在发送消息给交换器时, 通过指定 RoutingKey 来决定消息流向哪里

Bindingkey 绑定:通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一绑定键 BindingKey ,这样 RabbitMQ 就知何正确将消息路由到队列了。BindingKey只针对特定交换器才有效。

Producer:消息生产者

Consumer:消息消费者

安装条件

环境

Centos 7.4 3台虚机8c16g

用户权限

需要有sudo权限

安装文件

下载的文件统一在/home/lazasha/download目录下, rabbitmq和erlang对应的版本关系可以参考:

https://www.rabbitmq.com/which-erlang.html

epel: epel-release-7-12.noarch.rpm

下载地址:https://mirrors.tuna.tsinghua.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-12.noarch.rpm

erlang:erlang-22.1.8-1.el7.x86_64.rpm

下载地址:https://github.com/rabbitmq/erlang-rpm/releases

rabbitmq: rabbitmq-server-3.8.2-1.el7.noarch.rpm

下载地址:https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.2/

key: rabbitmq-release-signing-key.asc (我好像后面没有用到)

下载地址:https://github.com/rabbitmq/signing-keys/releases

步骤

epel安装

sudo yum -y install epel-release-7-12.noarch.rpm

erlang安装

sudo yum -y install erlang-22.1.8-1.el7.x86_64.rpm

 

检查是否安装成功:

输入:erl

 

rabbitmq安装

sudo yum -y install rabbitmq-server-3.8.2-1.el7.noarch.rpm 

 

验证是否成功:


  
  1. sudo systemctl  start rabbitmq- server 
  2. sudo systemctl  status rabbitmq- server

 

停止服务:

sudo systemctl stop rabbitmq-server

在他两台机器上同样操作. 服务缺省端口是5672.

集群搭建

在3台机器上/etc/hosts文件中添加IP和节点名称的对应


  
  1. 10 .156 .13 .92  lchod1392
  2. 10 .156 .13 .93  lchod1393
  3. 10 .156 .13 .94  lchod1394

把lchod1392上的 cookie文件,赋值到lchod1393、lchod1394节点上,集群环境下各个节点的cookie必须一致。rpm安装的cookie 文件默认路 径为 /var/lib/rabbitmq/.erlang.cookie

注意:.erlang.cookie可能有权限问题,可以使用下面的操作:

sudo chmod -R 600 /var/lib/rabbitmq/.erlang.cookie

注意: 拷贝到另外两台机器上后,不管怎么样执行一下下面的命令,改一下.erlang.cookie的owner:

sudo chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie

 

通过Rabbitmqctl来配置集群,集群内部通讯端口是25672

1.首先启动3个节点上的RabbitMQ服务

sudo systemctl start rabbitmq-server

可以使用rabbitmqctl cluster_status 查看各个节点的集群状态

2.以lchod1392为基准,将lchod1393、lchod1394加入到集群中,把3个节点都设置为硬盘节点了。

lchod1393


  
  1.     sudo rabbitmqctl stop_app                     //只关闭rabbitmq服务,不关闭erlang服务
  2.     sudo rabbitmqctl reset                        //这个命令我在加集群时没有执行
  3.     sudo rabbitmqctl join_cluster rabbit @lchod1392    //--ram这个参数是内存节点模式,不是就是硬盘节点
  4.     sudo rabbitmqctl start_app

lchod1394


  
  1.     sudo rabbitmqctl stop_app                     //只关闭rabbitmq服务,不关闭erlang服务
  2.     sudo rabbitmqctl reset                        //这个命令我在加集群时没有执行
  3.     sudo rabbitmqctl join_cluster rabbit @lchod1392    //--ram这个参数是内存节点模式,不是就是硬盘节点
  4.     sudo rabbitmqctl start_app

 

3.检查集群状态

sudo rabbitmqctl cluster_status

 

注意点: 如果关闭了集群中的所有节点,确保启动时最后一个关闭的节点第一个启动,否则会有问题。

创建远程访问用户


  
  1. sudo rabbitmqctl add_user rabbitmq ******
  2. sudo rabbitmqctl set_user_tags rabbitmq administrator
  3. sudo rabbitmqctl set_permissions -p  "/" rabbitmq  ".*"  ".*"  ".*"
  4. //查看新增加的用户
  5. sudo rabbitmqctl list_users

 

注意:不用在启动后台管理插件了,使用systemctl start rabbitmq-server就已经启动了,端口是15672

Mirror Queue 镜像队列搭建

针对每一个镜像队列都包含一个master节点 和 多个slave节点,需求确保队列的master节点均匀分散的落在集群的各个broker中。如果master不工作,那么假如镜像队列最早的salve升级为master.

镜像队列的配置主要是通过添加相应的 Policy 来完成 :


  
  1. rabbitmqctl set_policy [-p vhost) [--priority
  2. priority) [--apply-  to apply-  to) {name) {pattern) {definition)

definition 要包含 个部分 ha-mode、 ha-params、 ha-sync-mode

  • ha-mode 指明镜像队列的模式,有效值为 all/exactly/nodes默认为 all
        all 表示在集群中所有的节点上进行镜像
        exactly 表示在指定个数的节点上进行镜像,节点个数由 ha-params 指定;
        nodes 表示在指定节点上进行镜像,节点名称通ha-params 指定,节点的名称通常类似于 rabbit@hostname ,可以通过rabbitmqctl cluster status 命令查看到

  • ha-params 不同的 hamode 配置中需要用到的参数。

  • ha-sync-mode 队列中消息的同步方式,有效值为 automatic 、manual

 

命令样例

  • 对队列名称以 queue_" 开头的所有队列进行镜像,并在集群的两个节点上完 成镜像

    
        
    1. rabbitmqctl set_policy --priority  0 --apply-to queues mirror_queue  " ^queue_"
    2. ' { "ha-mode ": "exactly""ha-params "2, "ha-sync-mode ""automatic" }'
  • 对队列名称以 queue_" 开头的所有队列进行镜像,并在集群的所有节点上完 成镜像

    
        
    1. rabbitmqctl set_policy --priority  0 --apply-to queues mirror_queue  " ^queue_"
    2. ' { "ha-mode ": "all""ha-sync-mode ""automatic" }'

    rabbitmqctl set_policy ha-all “^” ‘{“ha-mode”:“all”}’ 可以把队列设置为镜像队列

命令执行


  
  1.    sudo rabbitmqctl set_policy --priority  0 --apply-to queues mirror_queue  " ^queue_"
  2.    ' { "ha-mode ": "all""ha-sync-mode ""automatic" }'

验证

使用新建的rabbitmq用户从本机登录远程的机器

lchod1392: 创建一个队列,以queue开头

lchod1393: 已经有了这个队列

lchod1394: 有了这个队列

队列知识

mandatory、 immediate 参数 channel.basicPublish 方法中的两个参数

  • mandatory参数 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件 的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory 数设置为 false 时,出现上述情形,则消息直接被丢弃。那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监昕器实现。

  • immediate参数 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在 任何消费者,那么这条消息将不会存入队列中。当与路由键匹配所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。

  • 概括来说 mandatory 参数告诉服务器至少将该消息路由到一个队中, 将消息返 回给生产者。 imrnediate 参数告诉服务器 如果该消息关联的队列上有消费者, 立刻投递; 如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

  • RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此RabbitMQ官方解释是 immediate 参数会影响镜像队列的性能,增加代码码复杂性,建议采用 TTL、 DLX 的方法

TTL time to live 过期时间

  • 设置方式:通过队列属性设置,整个队列的消息都有同样的过期时间;也可以对单条消息单独设置,则一个队列中消息有不同的过期时间。如果两种都设置了,以时间小的为准

  • 设置队列消息的TTL代码

    
        
    1. Map< String, Object> argss =  new HashMap< StringObject>();
    2. argss.put( "x-message-ttl " ,  5000);
    3. channel.queueDeclare(queueName , durable , exclusive , autoDelete , argss) ; 

    这种方式, 一旦消息过期,就会从队列中抹去

    针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration 的属性参数,单位为毫秒:

    
        
    1. AMQP.BasicProperties.Builder builder =  new AMQP.BasicProperties.Builder();
    2. builder deliveryMode(2); 持久化消息
    3. builder expiration50000 );/ 设置 TTL= 50000ms
    4. AMQP.BasicProperties properties = builder. build() ;
    5. channel.basicPublish(exchangeName , routingKey, mandatory, properties,
    6. "test ttl".getBytes());

    这种方式, 即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的

  • 如果不设置 TTL.则表示此消息不会过期 ;如果将 TTL 设置为 0,则表示除非此时可以直 接将消息投递到消费者,否则该消息会被立即丢弃

  • 设置队列的TTL

    通过 channel.queueDeclare 方法中的 expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并 且在过期时间段内也未调用过 Basic.Get 命令。

    
        
    1. Map< String ,  Object> args =口ew HashMap< StringObject>{) ;
    2. args . put(  "x-expires" ,  100000); 
    3. channel . queueDeclare( "queuesleb " ,  false ,  false ,  false , args) ; 

死信队列 DLX(Dead Letter Message) 当 消息在一个队列中变成死信 (dea message) 之后,它能被重新被发送到另一个交换器中,这个 交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。

  • 消息被拒绝 (Basic.Reject/Basic .Na ck) ,井且设置 requeue 参数为 false

  • 消息过期

  • 队列达到最大长度

  • 可以创建消费者监听这个队列的消息进行处理

  • 通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这 个队列添加 DLX

     


  
  1.     channel.exchangeDeclare( "dlx_exchange " ,  "direct ");  // 创建 DLX: dlx_exchange
  2.      Map< StringObject> args =  new HashMap< StringObject>();
  3.     args.put( "x-dead-letter-exchange" ,  " dlx-exchange ");
  4.      //为队列 myqueue 添加 DLX
  5.     channel.queueDeclare( "myqueue" ,  false ,  false ,  false , args); 
  6.      //也可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键, 如果指定了,则消费者需要使用
  7.      //的路由键才能消费这个队列的消息:
  8.     args.put( "x-dead-letter-routing-key" ,  "dlx-routing-key"); 

延迟队列

  • 场景:一个订单在30分钟内支付有效,否则自动取消

  • 利用上面的TTL和DLX来达到延迟队列的功能

优先级队列

通过设置队列的 x-max-priority 参数来实现:


  
  1.      Map< StringObject> args =  new HashMap< StringObject>() ;
  2.     args.put(  "x-max-priority" ,  10) ;
  3.     channel.queueDeclare(  "queue.priority" ,  true , fa1se ,  false , args) ; 

在生产者速度大于消费者速度且broker中有积压的消息的时候,才有效果

持久化

  • 交换器的持久化、队列的持久化和消息的持久化 ,才能真正的持久化

  • 交换器的持久化:设置durable = true

  • 队列的持久化: durable = true

  • 消息的持久化:通过将消息的投递模式 (BasicPropertes 中的 deliveryMode 属性)设置为2( DeliveryMode.PERSISTENT) 即可实现消息的持久化 )

发送方确认机制 publisher confirm

publisher-confirms: true #确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端

ackpublisher-returns: true #确认消息是否正确到达queue,如果没有则触发,如果有则不触发

ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。


  
  1.     rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  2.                  if (ack) {
  3.                      CorrelationDataEx  c = ( CorrelationDataEx)correlationData;
  4.                      System.out. println( "发送消息: " +  c.getMsg());
  5.                      System.out. println( "HelloSender 消息发送成功 :" + correlationData. toString() );
  6.                      /**
  7.                      * 通过设置correlationData.id为业务主键,消息发送成功后去继续做候选业务。
  8.                      */
  9.                 }  else {
  10.                      System.out. println( "HelloSender消息发送失败" + cause);
  11.                 }
  12.             });

ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调


  
  1.      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  2.                   //Users users1 = (Users)message.getBody().toString();
  3.                   //String correlationId = message.getMessageProperties().getCorrelationId();
  4.                  System.out. println( "Message : " +  new String(message.getBody()));
  5.                   //System.out.println("Message : " + new String(message.getBody()));
  6.                  System.out. println( "replyCode : " + replyCode);
  7.                  System.out. println( "replyText : " + replyText);   //错误原因
  8.                  System.out. println( "exchange : " + exchange);
  9.                  System.out. println( "routingKey : " + routingKey); //queue名称
  10.              });

 


  
  1.       /**
  2.               * CorrelationDataEx继承CorrelationData, 把需要发送消息的关键字段加入
  3.               * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
  4.               */
  5.               CorrelationDataEx  c = new  CorrelationDataEx();
  6.               c.setId(users.getId(). toString());
  7.               c.setMsg(users. toString());
  8.               /**
  9.               * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
  10.               * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
  11.               */
  12.              rabbitTemplate.setMessageConverter(new  Jackson2JsonMessageConverter());
  13.              rabbitTemplate.convertAndSend( EXCHANGEQUEUE_TWO_ROUTING, users,  c);

消息消费

1.配置


  
  1.         listener:
  2.               simple:
  3.                 prefetch:  1                #设置一次处理一个消息
  4.                 acknowledge-mode: manual   #设置消费端手动 ack
  5.                 concurrency:  3             #设置同时有3个消费者消费,需要3个消费者实例

2.代码


  
  1.          @RabbitHandler
  2.              @RabbitListener(queues = QUEUE_ONE_ROUTING)  //containerFactory = "rabbitListenerContainerFactory", concurrency = "2")
  3.              public void process(Users users, Channel channel, Message message) throws IOException {
  4.                 System.out.println( "HelloReceiver收到  : " + users.toString() +  "收到时间" +  new Date());
  5.                  try {
  6.                      //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
  7.                      // 否则消息服务器以为这条消息没处理掉 后续还会在发
  8.                     channel.basicAck(message.getMessageProperties().getDeliveryTag(),  false);
  9.                     System.out.println( "receiver success");
  10.                 }  catch (IOException e) {
  11.                     e.printStackTrace();
  12.                      //丢弃这条消息,则不会重新发送了
  13.                      //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  14.                     System.out.println( "receiver fail");
  15.                 }
  16.             }

验证

创建消息生产者和消费者

生产者

集群配置:


  
  1.     spring:
  2.       application:
  3.         name: rabbitmq-producer-demo
  4.       rabbitmq:
  5.          # 单点配置
  6.          #host: localhost
  7.          #port: 5672
  8.          # 集群的配置
  9.         addresses:  10.156 .13 .92: 5672, 10.156 .13 .93: 5672, 10.156 .13 .94: 5672
  10.         username: rabbitmq   #guest是缺省,只能localhost网络访问,要访问远程网络,需要创建用户
  11.         password:  123456
  12.          # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。
  13.          # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,
  14.          # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。
  15.          # Virtual Name一般以/开头
  16.          virtual-host: /
  17.          # 确认消息是否正确到达queue,如果没有则触发,如果有则不触发
  18.         publisher-returns:  on
  19.          # 确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,
  20.          # 只要正确的到达exchange中,broker即可确认该消息返回给客户端ack
  21.          # 如果是simple就不会回调
  22.         publisher-confirm-type: correlated
  23.         template:
  24.            #设置为 on 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
  25.           mandatory:  on

队列设置: 设置了queue_sleb_accept队列


  
  1.      @Configuration
  2.      public  class RabbitConfig {
  3.          /**
  4.          * 投保消息交换机的名字
  5.          */
  6.          public static  final String EXCHANGE_SLEB_ACCEPT =  "exchange_sleb_accept";
  7.          /**
  8.          * 投保消息队列
  9.          */
  10.          public static  final String QUEUE_SLEB_ACCEPT =  "queue_sleb_accept";
  11.          /**
  12.          * 投保消息路由键
  13.          */
  14.          public static  final String ROUTING_KEY_ACCEPT =  "routing_key_accept";
  15.          /**
  16.          *  投保消息死信交换机
  17.          */
  18.          public static  final String DLX_EXCHANGE_SLEB_ACCEPT =  "exchange_dlx_sleb_accept";
  19.          /**
  20.          * 投保消息死信队列
  21.          */
  22.          public static  final String DLX_QUEUE_SLEB_ACCEPT =  "queue_dlx_sleb_accept";
  23.          /**
  24.          *  常用交换器类型如下:
  25.          *       Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送".
  26.          *       即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。
  27.          *       Topic(TopicExchange):按规则转发消息(最灵活)。
  28.          *       Headers(HeadersExchange):设置header attribute参数类型的交换机。
  29.          *       Fanout(FanoutExchange):转发消息到所有绑定队列。
  30.          *
  31.          * 下面都是采用direct, 必须严格匹配exchange和queue
  32.          * 投保消息交换机
  33.          */
  34.          @Bean("slebAcceptExchange")
  35.         DirectExchange slebAcceptExchange() {
  36.              return ExchangeBuilder.directExchange(EXCHANGE_SLEB_ACCEPT).durable( true).build();
  37.         }
  38.          /**
  39.          * 第二个参数 durable: 是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上,
  40.          *                 当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
  41.          * 第三个参数 execulusive: 表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  42.          * 第四个参数 autoDelete: 当没有生成者/消费者使用此队列时,此队列会被自动删除。(即当最后一个消费者退订后即被删除)
  43.          *
  44.          * 这儿是(queue)队列持久化(durable=true),exchange也需要持久化
  45.          * ********************死信队列**********************************************************
  46.          *            x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
  47.          *            x-dead-letter-routing-key  这里声明当前队列的死信路由key
  48.          *            采用死信队列,才会用到下面的参数
  49.          *            Map<String, Object> args = new HashMap<>(2);
  50.          *            args.put("x-dead-letter-exchange", DLX_EXCHANGE_SLEB_ACCEPT);
  51.          *            args.put("x-dead-letter-routing-key", ROUTING_KEY_ACCEPT);
  52.          *            return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).withArguments(args).build();
  53.          * ********************死信队列**********************************************************
  54.          * 投保消息队列
  55.          */
  56.          @Bean("slebAcceptQueue")
  57.          public Queue slebAcceptQueue() {
  58.              return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).build();
  59.         }
  60.          /**
  61.          * 交换机、队列、绑定
  62.          */
  63.          @Bean("bindingSlebAcceptExchange")
  64.         Binding bindingSlebAcceptExchange( @Qualifier("slebAcceptQueue") Queue queue,
  65.                                            @Qualifier("slebAcceptExchange") DirectExchange directExchange) {
  66.              return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT);
  67.         }
  68.          /**
  69.          * 投保死信交换机
  70.          */
  71.          @Bean("slebDlxAcceptExchange")
  72.         DirectExchange slebDlxAcceptExchange() {
  73.              return ExchangeBuilder.directExchange(DLX_EXCHANGE_SLEB_ACCEPT).durable( true).build();
  74.         }
  75.          /**
  76.          * 投保死信队列
  77.          */
  78.          @Bean("slebDlxAcceptQueue")
  79.          public Queue slebDlxAcceptQueue() {
  80.              return QueueBuilder.durable(DLX_QUEUE_SLEB_ACCEPT).build();
  81.         }
  82.          /**
  83.          * 死信交换机、队列、绑定
  84.          */
  85.          @Bean("bindingDlxSlebAcceptExchange")
  86.         Binding bindingDlxSlebAcceptExchange( @Qualifier("slebDlxAcceptQueue") Queue     queue,  @Qualifier("slebDlxAcceptExchange") DirectExchange directExchange) {
  87.              return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT);
  88.         }

生产消息


  
  1.      @Service
  2.      public  class AcceptProducerServiceImpl implements AcceptProducerService {
  3.          private  final Logger logger = LoggerFactory.getLogger(AcceptProducerServiceImpl.class);
  4.          private  final RabbitTemplate rabbitTemplate;
  5.          public AcceptProducerServiceImpl(RabbitTemplate rabbitTemplate) {
  6.              this.rabbitTemplate = rabbitTemplate;
  7.         }
  8.          @Override
  9.          public void sendMessage(PolicyModal policyModal) {
  10.             logger.info( "开始发送时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
  11.                     +  ",保单号: " + policyModal.getPolicyNo()
  12.                     +  ",发送内容: " + policyModal.toString());
  13.              /*
  14.              * policyDataEx继承CorrelationData, 把需要发送消息的关键字段加入
  15.              * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
  16.              * policyno为唯一的值
  17.              */
  18.             PolicyDataEx policyDataEx =  new PolicyDataEx();
  19.             policyDataEx.setId(policyModal.getPolicyNo());
  20.             policyDataEx.setMessage(policyModal.toString());
  21.              /*
  22.              * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
  23.              * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
  24.              */
  25.              //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  26.              //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。
  27.             rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SLEB_ACCEPT, RabbitConfig.ROUTING_KEY_ACCEPT, policyModal, policyDataEx);
  28.         }

运行验证

http://localhost:9020/sendsing

 

查看3台服务器控制台:看到已经创建了镜像队列,并且有一个消息在队列里面:

消费者

配置


  
  1.     spring:
  2.       application:
  3.         name: rabbitmq-consumer-demo
  4.       rabbitmq:
  5.          # 单点配置
  6.          #host: localhost
  7.          #port: 5672
  8.          # 集群的配置
  9.         addresses:  10.156 .13 .92: 5672, 10.156 .13 .93: 5672, 10.156 .13 .94: 5672
  10.         username: rabbitmq
  11.         password:  123456
  12.          # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。
  13.          # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,
  14.          # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。
  15.          # Virtual Name一般以/开头
  16.          virtual-host: /
  17.         listener:
  18.           simple:
  19.             prefetch:  1                #设置一次处理一个消息
  20.             acknowledge-mode: manual   #设置消费端手动 ack
  21.             concurrency:  3             #设置同时有3个消费者消费
  22.              #消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)

配置队列名称,主要名称和生产者里面的名称一样


  
  1.      public  class RabbitMQConfigInfo {
  2.          /**
  3.          * 投保消息队列
  4.          */
  5.          public  static  final  String  QUEUE_SLEB_ACCEPT =  "queue_sleb_accept";
  6.          /**
  7.          * 投保消息交换机的名字
  8.          */
  9.          public  static  final  String  EXCHANGE_SLEB_ACCEPT =  "exchange_sleb_accept";
  10.          /**
  11.          * 投保消息路由键
  12.          */
  13.          public  static  final  String  ROUTING_KEY_ACCEPT =  "routing_key_accept";
  14.     }

消费


  
  1.      @Service
  2.      public  class RabbitConsumerServiceImpl implements RabbitConsumerService {
  3.          private  final Logger logger = LoggerFactory.getLogger(RabbitConsumerServiceImpl.class);
  4.          @RabbitHandler
  5.          @RabbitListener(bindings =  @QueueBinding(
  6.                 value =  @Queue(name = QUEUE_SLEB_ACCEPT, durable =  "true"),
  7.                 exchange =  @Exchange(name = EXCHANGE_SLEB_ACCEPT,
  8.                         ignoreDeclarationExceptions =  "true"),
  9.                 key = {ROUTING_KEY_ACCEPT}
  10.         ))
  11.          @Override
  12.          public void process(Channel channel, Message message) throws IOException {
  13.             String jsonStr =  new String(message.getBody());
  14.             logger.info( "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
  15.                     +  "n,消息:" + jsonStr);
  16.              //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。
  17.             PolicyModal policyModal = JsonUtils.JSON2Object(jsonStr, PolicyModal.class);
  18.              assert policyModal !=  null;
  19.              try {
  20.                  //将message中的body获取出来, 转换为PolicyModal,再获取policyno
  21.                  //更根据policyno新数据库里面的标志,
  22.                  // todo
  23.                  //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
  24.                  // 否则消息服务器以为这条消息没处理掉 后续还会在发
  25.                  //throw new IOException("myself");
  26.                 channel.basicAck(message.getMessageProperties().getDeliveryTag(),  false);
  27.                  /*logger.info("接收处理成功:n"
  28.                         + "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
  29.                         + ",保单号: " + policyModal.getPolicyNo()
  30.                         + "n,消息:" + new String(message.getBody()));
  31.     */
  32.             }  catch (IOException e) {
  33.                 e.printStackTrace();
  34.                  //丢弃这条消息,则不会重新发送了
  35.                  //一般不丢弃,超时后mq自动会转到死信队列(如果设置了超时时间和死信交换机和队列后)
  36.                  //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  37.                 logger.info( "接收处理失败:n"
  38.                         +  "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
  39.                         +  ",保单号: " + policyModal.getPolicyNo()
  40.                         +  "n,消息:" +  new String(message.getBody()));
  41.             }
  42.         }
  43.     }

启动验证

在看各个服务器控制台:消息已经被消费,队列里面消息为0

结束

技术文章难写,这个花了前后一个礼拜的时间,希望对大家有帮助。有要验证代码的,可以发邮件:lazasha@163.com联系我,我给你发。懒,没空上github,回来再说。

END

Java面试题专栏

【30期】说一下HashMap的实现原理?

【29期】Java集合框架 10 连问,你有被问过吗?

【28期】ZooKeeper面试那些事儿

【27期】Dubbo面试八连问,这些你都能答上来吗?

【26期】如何判断一个对象是否存活?(或者GC对象的判定方法)?

【25期】这三道常见的面试题,你有被问过吗?

【24期】请你谈谈单例模式的优缺点,注意事项,使用场景

【23期】请你谈谈关于IO同步、异步、阻塞、非阻塞的区别

【22期】为什么需要消息队列?使用消息队列有什么好处?

【21期】你能说说Java中Comparable和Comparator的区别吗

 


转载:https://blog.csdn.net/weixin_38405253/article/details/103903623
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场