小言_互联网的博客

RabbitMq消息防丢失(含springboot代码示例)

395人阅读  评论(0)

目录

1.概述

1.1.数据丢失的原因

1.2.如何防止数据丢失

2.手动应答

3.消息确认机制

3.1.AMQP事务

3.2.confirm


1.概述

1.1.数据丢失的原因

在消息中有三种可能性造成数据丢失:

  1. 消费者消费消息失败
  2. 生产者生产消息失败
  3. MQ数据丢失

消费者消费消息失败:

RabbitMq存在应答机制,默认为自动应答,MQ向消费者推送一条消息,消费者收到这条消息后会返回一个ack(应答)给MQ,MQ收到应答后会删除这条消息。

自动应答存在一个问题,就是消费者收到消息后立马就会给MQ返回ack,如果消费者返回完ack但还没来的及真正处理这条消息时,消费者断电宕机了,那么这条消息就丢失了。

这就是由于消费者消费消息失败造成的数据丢失。

生产者生产数据失败:

生产者向MQ推送了一条消息,但是由于由于诸如网络故障等原因mq并没有收到该条消息,这样就造成了这条消息的丢失。

MQ数据丢失:

MQ的数据是存在内存中的,诸如断电等原因可能会造成数据的丢失。

1.2.如何防止数据丢失

解决以上列举的数据丢失问题的办法有三种:

  1. 手动应答
  2. 消息确认机制
  3. 持久化

手动应答:

RabbitMQ默认是自动应答,消费者收到消息后就会自动返回ack给MQ,可以将应答模式改为手动应答,在消费者一侧消息的消费动作完成后手动来返回ack给MQ,用来解决“消费者消费消息失败”问题。

消息确认机制:

当消息队列收到消息后,告知生产者,让生产者感知到自己生产的消息,消息队列已经接收到,用来解决“生产者生产消息失败”问题。消息确认机制有两种实现方式:

  • AMQP事务
  • confirm

持久化:

消息队列的消息持久化到磁盘上,用来解决“MQ数据丢失”问题。

2.手动应答

手动应答是通过设置channel来实现的,以下为一个完整代码示例。

配置类:


  
  1. @Configuration
  2. public class config {
  3. @Bean
  4. public Queue queue (){
  5. return new Queue( "queue_01", false);
  6. }
  7. }

生产者:


  
  1. @SpringBootTest(classes = Main.class)
  2. public class Producer {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Test
  6. public void producerMsg (){
  7. rabbitTemplate.convertAndSend( "queue_01", "hello_world");
  8. }
  9. }

消费者:


  
  1. @Component
  2. @Slf4j
  3. public class Consumer {
  4. @RabbitListener(queues = {"queue_01"})
  5. public void consumerMsg (String msg, Message message,Channel channel){
  6. try {
  7. log.info( "消费者消费消息: "+msg);
  8. /**
  9. * 没有异常就确认消息
  10. * basicAck(long deliveryTag, boolean multiple)
  11. * deliveryTag:当前消息在队列中的的索引;
  12. * multiple:为true的话就是批量确认
  13. */
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  15. } catch (Exception e) {
  16. /**
  17. * 有异常就拒收消息
  18. * basicNack(long deliveryTag, boolean multiple, boolean requeue)
  19. * requeue:true为将消息重返当前消息队列,重新发送给消费者;
  20. * false将消息丢弃
  21. */
  22. try {
  23. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  24. } catch (Exception ex) {
  25. log.error(ex.getMessage());
  26. }
  27. }
  28. }
  29. }

3.消息确认机制

AQMP事务、confirm其实都是基于channel的。

3.1.AMQP事务

AMQP事务和数据库事务类似,定义一组对MQ的操作,统一提交,成功则全部一起执行,失败则全部回滚。AMQP事务在spring boot中的使用很简单,和数据库的事务一样,一个注解就可以搞定。


  
  1. @GetMapping("/direct/wx/transactional")
  2. @Transactional(rollbackFor = Exception.class)
  3. public String sendDirectMessageTransactional () {
  4. rabbitTemplate.convertAndSend( "direct_exchange", "wx", "hello world!");
  5. log.info( "开启事务消息机制");
  6. try {
  7. Thread.sleep( 5000);
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. }
  11. return "ok";
  12. }

3.2.confirm

confirm是基于channel的,一旦channel进入confirm模式,所有在该channel上发布的消息都会被指派一个唯一的ID(从1开始),消息被投递道匹配队列后broker会发送一个确认消息给生产者。如果消息和队列是可持久化的(durable为true),那么确认消息会在消息被写入磁盘后发出。

confirm最大的好处在于异步,生产者在等待上一条消息的确认消息的时候可以继续往下发送。

confirm在spring boot中的使用很简单,在配置文件中开启即可,并且支持自定义回调函数:

配置文件:

spring.rabbitmq.publisher-confirms: true

spring.rabbitmq.publisher-returns: true

生产者:


  
  1. @Slf4j
  2. @Component
  3. public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. public void sendMessage (String exchange,String routingKey,Object msg) {
  7. // 设置交换机处理失败消息的模式 true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
  8. // 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
  9. rabbitTemplate.setMandatory( true);
  10. //消息消费者确认收到消息后,手动ack回执
  11. rabbitTemplate.setConfirmCallback( this);
  12. // 暂时关闭 return 配置
  13. //rabbitTemplate.setReturnCallback(this);
  14. //发送消息
  15. rabbitTemplate.convertAndSend(exchange,routingKey,msg);
  16. }
  17. /**
  18. * 交换机并未将数据丢入指定的队列中时,触发
  19. * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
  20. * 参数三:true 表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
  21. * @param message 消息对象
  22. * @param replyCode 错误码
  23. * @param replyText 错误信息
  24. * @param exchange 交换机
  25. * @param routingKey 路由键
  26. */
  27. @Override
  28. public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) {
  29. log.info( "---- returnedMessage ----replyCode="+replyCode+ " replyText="+replyText+ " ");
  30. }
  31. /**
  32. * 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
  33. * @param correlationData 相关配置信息
  34. * @param ack exchange 交换机,判断交换机是否成功收到消息 true 表示交换机收到
  35. * @param cause 失败原因
  36. */
  37. @Override
  38. public void confirm (CorrelationData correlationData, boolean ack, String cause) {
  39. log.info( "---- confirm ----ack="+ack+ " cause="+String.valueOf(cause));
  40. log.info( "correlationData -->"+correlationData.toString());
  41. if(ack){
  42. // 交换机接收到
  43. log.info( "---- confirm ----ack==true cause="+cause);
  44. } else{
  45. // 没有接收到
  46. log.info( "---- confirm ----ack==false cause="+cause);
  47. }
  48. }
  49. }


转载:https://blog.csdn.net/Joker_ZJN/article/details/128402725
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场