飞道的博客

RabbitMq事务实现

388人阅读  评论(0)

我们都知道事务的四大特性,但是那是针对的数据库的事务。但是Rabbitmq的事务到底是表达何种意思?根据一般概念的规律来说,mq的事务和数据库事务是类似的。我们可以将mq看做是数据库。

rabbitmq提供了与三个事务相关的命令:select、commit、rollback

其中select表示将当前模式设置为标准事务模式,commit表示提交当前事务,rollback表示事物回滚。也就是说select开启事务,通过commit操作之后publish的消息一定在消息队列中,当然如果发生rollback回滚,那么消息队列中的消息就会被撤销掉。AMQP事务大概过程如下图所示:

大概得代码如下:


   
  1. @Slf4j
  2. @Configuration
  3. public class RabbitConfig {
  4. /**
  5. * 消息转化
  6. * @return
  7. */
  8. @Bean
  9. public MessageConverter customMessageConvert() {
  10. return new Jackson2JsonMessageConverter();
  11.     }
  12. @Bean
  13. public Queue directOneQueue() {
  14. Map map= new HashMap<>();
  15. map.put( "x-max-priority", 10);
  16. return new Queue( "DDD", true, false, false,map);
  17.     }
  18. @Bean
  19. public Queue directTwoQueue() {
  20. Map map= new HashMap<>();
  21. return new Queue( "EEE", true, false, false,map);
  22.     }
  23. /**
  24. * 定义一个rabbitmq消息发送器
  25. * @param connectionFactory
  26. * @return
  27. */
  28. @Bean
  29. public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
  30. //mq事务是通过事务管理器提交的,这块不能设置为手动提交
  31. // connectionFactory.setPublisherConfirms(true);
  32. connectionFactory.setPublisherReturns( true);
  33. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  34. rabbitTemplate.setMandatory( true);
  35. rabbitTemplate.setChannelTransacted( true);
  36. //这块也和发送消息确认有关系
  37. // rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
  38. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info( "消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
  39. return rabbitTemplate;
  40. }
  41. /**
  42. * 配置启用rabbitmq事务
  43. * @param connectionFactory
  44. * @return
  45. */
  46. @Bean("rabbitTransactionManager")
  47. public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
  48. return new RabbitTransactionManager(connectionFactory);
  49. }
  50. }

发送消息测试:


   
  1. //通过id是否为0决定是否抛出异常
  2. @GetMapping(value = "/test/{id}")
  3. @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
  4. public void test(@PathVariable int id) throws Exception {
  5. try {
  6. Test t= new Test();
  7. t.setName( "tianjingle-ceshi");
  8. byte[] body = JSON.toJSONBytes(t, SerializeConfig.globalInstance);
  9. //设置消息相关属性
  10. MessageProperties messageProperties = new MessageProperties();
  11. messageProperties.setMessageId(UUID.randomUUID().toString());
  12. messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
  13. messageProperties.setPriority( 10);
  14. messageProperties.setCorrelationId( "tianjingle");
  15. messageProperties.setReplyTo( "EEE");
  16. Message message1 = new Message(body, messageProperties);
  17. rabbitTemplate.convertAndSend( "DDD",message1);
  18. int z= 1/id;
  19. } catch (Exception e){
  20. throw new Exception( "12");
  21.         }
  22. }

事务回滚的情况。

事务提交的情况

总结:通过上述实践,我们认为AMQP的事务是完全可靠的,但是事务的加入势必会让消息队列的性能上有所损耗,因为每个步骤都需要broker做出响应。


转载:https://blog.csdn.net/tianjingle_blog/article/details/114697723
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场