飞道的博客

RabbitMQ精讲4:深入RabbitMQ高级特性-可靠性投递、幂等性消费、Confirm确认消息、Return返回消息

295人阅读  评论(0)

目录

前言

1 消息如何保障100%的投递成功?

1.1 什么是生产端的可靠性投递?

BAT/TMD 互联网大厂的解决方案:

1.2 生产端-可靠性投递方案1-消息落库,对消息状态进行打标

在高并发的场景下是否合适?

1.3 生产端-可靠性投递方案2-消息的延迟投递,做二次确认,回调检查

2. 幂等性概念2.1 幂等性是什么?

2.2 消费端-幂等性保障

2.2.1 唯一ID+指纹码机制

2.2.2 Redis 原子特性实现

3. Confirm 确认消息

3.1 如何实现Confirm确认消息?

3.2 代码编写:

生产者:

消费者:

4. Return消息机制

4.1 Return消息机制流程

4.2 代码演示

生产者:

消费者:


前言

本章主要为大家讲解RabbitMQ的高级特性和实际场景应用,包括

  • 消息如何保障 100% 的投递成功 ?
  • 幂等性概念详解,
  • 在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
  • Confirm确认消息、Return返回消息等。

1 消息如何保障100%的投递成功?

1.1 什么是生产端的可靠性投递?

生产端的可靠性投递

 

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善的消息进行补偿机制

前三步不一定能保障消息能够100%投递成功。因此要加上第四步

BAT/TMD 互联网大厂的解决方案:

生产端的可靠性投递

1. 消息落库,对消息状态进行打标

  • 在发送消息的时候,需要将消息持久化到数据库中,并给这个消息设置一个状态(未发送、发送中、到达)。当消息状态发生了变化,需要对消息做一个变更。针对没有到达的消息做一个轮训操作,重新发送。对轮训次数也需要做一个限制3-5次。确保消息能够成功的发送.

2. 消息的延迟投递,做二次确认,回调检查
具体采用哪种方案,还需要根据业务与消息的并发量而定。

1.2 生产端-可靠性投递方案1-消息落库,对消息状态进行打标

生产端-可靠性投递方案1-消息落库,对消息状态进行打标

 

  • 蓝色部分表示:生产者负责发送消息发送至Broker端
  • Biz DB:订单数据库 MSG DB: 消息数据
  • 面对小规模的应用可以采用加事务的方式,保证事务的一致性。但在大厂中面对高并发,并没有加事务,事务的性能拼接非常严重,而是做补偿。

比如:如下发一条订单消息。

  • step1:存储订单消息(创建订单),业务数据入库,消息也入库。缺点:需要持久化两次。(status:0)
  • step2:在step1成功的前提下,发送消息
  • step3:Broker收到消息后,confirm给我们的生产端。Confirm Listener异步监听Broker回送的消息。
  • step4:抓取出指定的消息,更新(status=1),表示消息已经投递成功。
  • step5:分布式定时任务获取消息状态,如果等于0则抓取数据出来。
  • step6:重新发送消息
  • step7:重试限制设置3次。如果消息重试了3次还是失败,那么(status=2),认为这个消息就是失败的。

查询这些消息为什么失败,可能需要人工去查询。

  • 假设step2执行成功,step3由于网络闪断。那么confirm将永远收不到消息,那么我们需要设定一个规则:
  • 例如:在消息入库的时候,设置一个临界值 timeout=5min,当超过5min之后,就将这条数据抓取出来。
  • 或者写一个定时任务每隔5分钟就将status=0的消息抓取出来。可能存在小问题:消息发送出去,定时任务又正好刚执行,Confirm还未收到,定时任务就会执行,会导致消息执行两次。
  • 更精细化操作:消息超时容忍限制。confirm在2-3分钟内未收到消息,则重新发送

在高并发的场景下是否合适?

第一种方案对数据有两次入库,一次业务数据入库,一次消息入库。这样对数据的入库是一个瓶颈。
其实我们只需要对业务进行入库。

1.3 生产端-可靠性投递方案2-消息的延迟投递,做二次确认,回调检查

可靠性投递方案2-消息的延迟投递,做二次确认,回调检查
  • 这种方式并不一定能保证100%成功,但是也能保证99.99%的消息成功。如果遇到特别极端的情况,那么就只能需要人工去补偿,或者定时任务去做。
  • 第二种方式主要是为了减少对数据库的操作。
可靠性投递方案2-消息的延迟投递,做二次确认,回调检查
  • Upstream service:生产端
  • DownStream service:消费端
  • Callback service:回调服务

比如:

  • step1:业务消息入库成功后,第一次消息发送。
  • step2:同样在消息入库成功后,发送第二次消息,这两条消息是同时发送的。第二条消息是延迟检查,可以设置2min、5min 延迟发送。
  • step3:消费端监听指定队列。
  • step4:消费端处理完消息后,内部生成新的消息send confirm。投递到MQ Broker。
  • step5: Callback Service 回调服务监听MQ Broker,如果收到Downstream service发送的消息,则可以确定消息发送成功,执行消息存储到MSG DB。
  • step6:Check Detail检查监听step2延迟投递的消息。此时两个监听的队列不是同一个,5分钟后,Callback service收到消息,检查MSG DB。如果发现之前的消息已经投递成功,则不需要做其他事情。如果检查发现失败,则Callback 进行补偿,主动发送RPC 通信。通知上游生产端重新发送消息。

这样做的目的:少做了一次DB存储。关注点并不是百分百的投递成功,而是性能。

2. 幂等性概念
2.1 幂等性是什么?

幂等性
  • 幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中,即f(f(x)) = f(x)。简单的来说就是一个操作多次执行产生的结果与一次执行产生的结果一致。
  • 我们可以借鉴数据库的乐观锁机制:
  • 比如我们执行一条更新库存的SQL语句:UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1
  • 利用加版本号Version的方式来保证幂等性。

2.2 消费端-幂等性保障

消费端-幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

在高并发的情况下,会有大量的消息到达MQ,消费端需要监听大量的消息。这样的情况下,难免会出现消息的重复投递,网络闪断等等。如果不去做幂等,则会出现消息的重复消费。

-消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即使我们收到了多条一样的消息,也只会执行一次。

看下互联网大厂主流的幂等性操作:

幂等性
  1. -唯一ID+指纹码机制,利用数据库主键去重。
  2. -利用Redis的原子性实现
  3. -其他的技术实现幂等性

2.2.1 唯一ID+指纹码机制

唯一ID+指纹码机制


唯一ID + 指纹吗机制,利用数据库主键去重。

  • 保证唯一性
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码
  • 如果查询没有,则添加。有则不需要做任何操作,消费端不需要消费消息。
  • 好处:实现简单
  • 坏处:高并发下有数据库写入的性能瓶颈
  • 解决方案:跟进ID进行分库分表进行算法路由分摊流量压力。

2.2.2 Redis 原子特性实现

Redis 原子特性


使用Redis的自增。

使用Redis进行幂等,需要考虑的问题。

  • 第一:我们是否需要数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
    • 加事务不行,Redis和数据库的事务不是同一个,无法保证同时成功同时失败。大家有什么更好的方案呢?
  • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
    • 怎么做到缓存数据的稳定性?

3. Confirm 确认消息

Confirm 确认消息

理解Confirm 消息确认机制:

消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!

Confirm 确认消息

 

  • 蓝色:producer 生产者   红色:MQ Broker 服务器
  • 生产者把消息发送到Broker端,Broker收到消息之后回送给producer。Confirm Listener 监听应答。
  • 操作是异步操作,当生产者发送完消息之后,就不需要管了。Confirm Listener 监听MQ Broker的应答。

3.1 如何实现Confirm确认消息?

实现Confirm确认消息


第一步:在channel上开启确认模式:channel.confirmSelect()
第二步;在chanel上 添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

3.2 代码编写:

生产者:


  
  1. import java.io.IOException;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.ConfirmListener;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. public class Producer {
  7. public static void main(String[] args) throws Exception {
  8. //1 创建ConnectionFactory
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost( "192.168.11.76");
  11. connectionFactory.setPort( 5672);
  12. connectionFactory.setVirtualHost( "/");
  13. //2 获取Connection
  14. Connection connection = connectionFactory.newConnection();
  15. //3 通过Connection创建一个新的Channel
  16. Channel channel = connection.createChannel();
  17. //4 指定我们的消息投递模式: 消息的确认模式
  18. channel.confirmSelect();
  19. String exchangeName = "test_confirm_exchange";
  20. String routingKey = "confirm.save";
  21. //5 发送一条消息
  22. String msg = "Hello RabbitMQ Send confirm message!";
  23. channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
  24. //6 添加一个确认监听
  25. channel.addConfirmListener( new ConfirmListener() {
  26. @Override
  27. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  28. System.err.println( "-------no ack!-----------");
  29. }
  30. @Override
  31. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  32. System.err.println( "-------ack!-----------");
  33. }
  34. });
  35. }
  36. }

消费者:


  
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import com.rabbitmq.client.QueueingConsumer.Delivery;
  6. public class Consumer {
  7. public static void main(String[] args) throws Exception {
  8. //1 创建ConnectionFactory
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost( "192.168.11.76");
  11. connectionFactory.setPort( 5672);
  12. connectionFactory.setVirtualHost( "/");
  13. //2 获取Connection
  14. Connection connection = connectionFactory.newConnection();
  15. //3 通过Connection创建一个新的Channel
  16. Channel channel = connection.createChannel();
  17. String exchangeName = "test_confirm_exchange";
  18. String routingKey = "confirm.#";
  19. String queueName = "test_confirm_queue";
  20. //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
  21. channel.exchangeDeclare(exchangeName, "topic", true);
  22. channel.queueDeclare(queueName, true, false, false, null);
  23. channel.queueBind(queueName, exchangeName, routingKey);
  24. //5 创建消费者
  25. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  26. channel.basicConsume(queueName, true, queueingConsumer);
  27. while( true){
  28. Delivery delivery = queueingConsumer.nextDelivery();
  29. String msg = new String(delivery.getBody());
  30. System.err.println( "消费端: " + msg);
  31. }
  32. }
  33. }
  • 可以观察到消费端先接收到消息,之后生产端再接收到回调信息。如果出现磁盘已满、RabbitMQ出现异常、queue容量到达上限都可能接收到no ack
  • 如果ack和no ack消息都未接收到,这就是之前所说的。RabbitMQ出现网络闪断,可以采用上面所说的消息补偿。

4. Return消息机制

Return消息机制

 

  • Return Listener用于处理一些不可路由的消息!
  • 我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
  • 但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener!
Return消息机制
  • 在基础API中有一个关键的配置项:
    • Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!

4.1 Return消息机制流程

Return消息机制流程

 

  • Producer生产端将消息发送到MQ Broker端,但是出现NotFind Exchange,发送的消息的Exchange,在Broker端未能找到。或者找到了,但是路由key路由不到指定的队列。因此是一个错误的消息。
  • 这个时候,生产端应该知道发送的这条消息,并不会被处理。因此MQ Broker提供了这种Return机制,将这些不可达的消息发送给生产端,这时候生产端就需要设置Return Listener去接收这些不可达的消息。然后及时记录日志,去处理这些消息。

4.2 代码演示

生产者:


  
  1. import java.io.IOException;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.ReturnListener;
  7. import com.rabbitmq.client.AMQP.BasicProperties;
  8. public class Producer {
  9. public static void main(String[] args) throws Exception {
  10. ConnectionFactory connectionFactory = new ConnectionFactory();
  11. connectionFactory.setHost( "192.168.11.76");
  12. connectionFactory.setPort( 5672);
  13. connectionFactory.setVirtualHost( "/");
  14. Connection connection = connectionFactory.newConnection();
  15. Channel channel = connection.createChannel();
  16. String exchange = "test_return_exchange";
  17. String routingKey = "return.save";
  18. String routingKeyError = "abc.save";
  19. String msg = "Hello RabbitMQ Return Message";
  20. channel.addReturnListener( new ReturnListener() {
  21. @Override
  22. public void handleReturn(int replyCode, String replyText, String exchange,
  23. String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  24. System.err.println( "---------handle return----------");
  25. System.err.println( "replyCode: " + replyCode);
  26. System.err.println( "replyText: " + replyText);
  27. System.err.println( "exchange: " + exchange);
  28. System.err.println( "routingKey: " + routingKey);
  29. System.err.println( "properties: " + properties);
  30. System.err.println( "body: " + new String(body));
  31. }
  32. });
  33. channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
  34. //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
  35. }
  36. }

消费者:


  
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import com.rabbitmq.client.QueueingConsumer.Delivery;
  6. public class Consumer {
  7. public static void main(String[] args) throws Exception {
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. connectionFactory.setHost( "192.168.11.76");
  10. connectionFactory.setPort( 5672);
  11. connectionFactory.setVirtualHost( "/");
  12. Connection connection = connectionFactory.newConnection();
  13. Channel channel = connection.createChannel();
  14. String exchangeName = "test_return_exchange";
  15. String routingKey = "return.#";
  16. String queueName = "test_return_queue";
  17. channel.exchangeDeclare(exchangeName, "topic", true, false, null);
  18. channel.queueDeclare(queueName, true, false, false, null);
  19. channel.queueBind(queueName, exchangeName, routingKey);
  20. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  21. channel.basicConsume(queueName, true, queueingConsumer);
  22. while( true){
  23. Delivery delivery = queueingConsumer.nextDelivery();
  24. String msg = new String(delivery.getBody());
  25. System.err.println( "消费者: " + msg);
  26. }
  27. }
  28. }

 


转载:https://blog.csdn.net/fly910905/article/details/105464139
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场