我们都知道事务的四大特性,但是那是针对的数据库的事务。但是Rabbitmq的事务到底是表达何种意思?根据一般概念的规律来说,mq的事务和数据库事务是类似的。我们可以将mq看做是数据库。
rabbitmq提供了与三个事务相关的命令:select、commit、rollback
其中select表示将当前模式设置为标准事务模式,commit表示提交当前事务,rollback表示事物回滚。也就是说select开启事务,通过commit操作之后publish的消息一定在消息队列中,当然如果发生rollback回滚,那么消息队列中的消息就会被撤销掉。AMQP事务大概过程如下图所示:
大概得代码如下:
-
@Slf4j
-
@Configuration
-
public
class RabbitConfig {
-
/**
-
* 消息转化
-
* @return
-
*/
-
@Bean
-
public MessageConverter customMessageConvert() {
-
return
new Jackson2JsonMessageConverter();
-
}
-
@Bean
-
public Queue directOneQueue() {
-
Map map=
new HashMap<>();
-
map.put(
"x-max-priority",
10);
-
return
new Queue(
"DDD",
true,
false,
false,map);
-
}
-
@Bean
-
public Queue directTwoQueue() {
-
Map map=
new HashMap<>();
-
return
new Queue(
"EEE",
true,
false,
false,map);
-
}
-
/**
-
* 定义一个rabbitmq消息发送器
-
* @param connectionFactory
-
* @return
-
*/
-
@Bean
-
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
-
//mq事务是通过事务管理器提交的,这块不能设置为手动提交
-
// connectionFactory.setPublisherConfirms(true);
-
connectionFactory.setPublisherReturns(
true);
-
RabbitTemplate rabbitTemplate =
new RabbitTemplate(connectionFactory);
-
rabbitTemplate.setMandatory(
true);
-
rabbitTemplate.setChannelTransacted(
true);
-
//这块也和发送消息确认有关系
-
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
-
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(
"消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
-
return rabbitTemplate;
-
}
-
/**
-
* 配置启用rabbitmq事务
-
* @param connectionFactory
-
* @return
-
*/
-
@Bean("rabbitTransactionManager")
-
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
-
return
new RabbitTransactionManager(connectionFactory);
-
}
-
}
发送消息测试:
-
//通过id是否为0决定是否抛出异常
-
@GetMapping(value = "/test/{id}")
-
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
-
public void test(@PathVariable int id) throws Exception {
-
try {
-
Test t=
new Test();
-
t.setName(
"tianjingle-ceshi");
-
byte[] body = JSON.toJSONBytes(t, SerializeConfig.globalInstance);
-
//设置消息相关属性
-
MessageProperties messageProperties =
new MessageProperties();
-
messageProperties.setMessageId(UUID.randomUUID().toString());
-
messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
-
messageProperties.setPriority(
10);
-
messageProperties.setCorrelationId(
"tianjingle");
-
messageProperties.setReplyTo(
"EEE");
-
Message message1 =
new Message(body, messageProperties);
-
rabbitTemplate.convertAndSend(
"DDD",message1);
-
int z=
1/id;
-
}
catch (Exception e){
-
throw
new Exception(
"12");
-
}
-
}
事务回滚的情况。
事务提交的情况
总结:通过上述实践,我们认为AMQP的事务是完全可靠的,但是事务的加入势必会让消息队列的性能上有所损耗,因为每个步骤都需要broker做出响应。
转载:https://blog.csdn.net/tianjingle_blog/article/details/114697723
查看评论