目录
1.概述
1.1.数据丢失的原因
在消息中有三种可能性造成数据丢失:
- 消费者消费消息失败
- 生产者生产消息失败
- MQ数据丢失
消费者消费消息失败:
RabbitMq存在应答机制,默认为自动应答,MQ向消费者推送一条消息,消费者收到这条消息后会返回一个ack(应答)给MQ,MQ收到应答后会删除这条消息。
自动应答存在一个问题,就是消费者收到消息后立马就会给MQ返回ack,如果消费者返回完ack但还没来的及真正处理这条消息时,消费者断电宕机了,那么这条消息就丢失了。
这就是由于消费者消费消息失败造成的数据丢失。
生产者生产数据失败:
生产者向MQ推送了一条消息,但是由于由于诸如网络故障等原因mq并没有收到该条消息,这样就造成了这条消息的丢失。
MQ数据丢失:
MQ的数据是存在内存中的,诸如断电等原因可能会造成数据的丢失。
1.2.如何防止数据丢失
解决以上列举的数据丢失问题的办法有三种:
- 手动应答
- 消息确认机制
- 持久化
手动应答:
RabbitMQ默认是自动应答,消费者收到消息后就会自动返回ack给MQ,可以将应答模式改为手动应答,在消费者一侧消息的消费动作完成后手动来返回ack给MQ,用来解决“消费者消费消息失败”问题。
消息确认机制:
当消息队列收到消息后,告知生产者,让生产者感知到自己生产的消息,消息队列已经接收到,用来解决“生产者生产消息失败”问题。消息确认机制有两种实现方式:
- AMQP事务
- confirm
持久化:
消息队列的消息持久化到磁盘上,用来解决“MQ数据丢失”问题。
2.手动应答
手动应答是通过设置channel来实现的,以下为一个完整代码示例。
配置类:
-
@Configuration
-
public
class
config {
-
@Bean
-
public Queue
queue
(){
-
return
new
Queue(
"queue_01",
false);
-
}
-
}
生产者:
-
@SpringBootTest(classes = Main.class)
-
public
class
Producer {
-
-
@Autowired
-
RabbitTemplate rabbitTemplate;
-
-
@Test
-
public
void
producerMsg
(){
-
rabbitTemplate.convertAndSend(
"queue_01",
"hello_world");
-
}
-
}
消费者:
-
@Component
-
@Slf4j
-
public
class
Consumer {
-
@RabbitListener(queues = {"queue_01"})
-
public
void
consumerMsg
(String msg, Message message,Channel channel){
-
try {
-
log.info(
"消费者消费消息: "+msg);
-
/**
-
* 没有异常就确认消息
-
* basicAck(long deliveryTag, boolean multiple)
-
* deliveryTag:当前消息在队列中的的索引;
-
* multiple:为true的话就是批量确认
-
*/
-
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false);
-
}
catch (Exception e) {
-
/**
-
* 有异常就拒收消息
-
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
-
* requeue:true为将消息重返当前消息队列,重新发送给消费者;
-
* false将消息丢弃
-
*/
-
try {
-
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false,
true);
-
}
catch (Exception ex) {
-
log.error(ex.getMessage());
-
}
-
}
-
}
-
}
3.消息确认机制
AQMP事务、confirm其实都是基于channel的。
3.1.AMQP事务
AMQP事务和数据库事务类似,定义一组对MQ的操作,统一提交,成功则全部一起执行,失败则全部回滚。AMQP事务在spring boot中的使用很简单,和数据库的事务一样,一个注解就可以搞定。
-
@GetMapping("/direct/wx/transactional")
-
@Transactional(rollbackFor = Exception.class)
-
public String
sendDirectMessageTransactional
() {
-
rabbitTemplate.convertAndSend(
"direct_exchange",
"wx",
"hello world!");
-
-
log.info(
"开启事务消息机制");
-
try {
-
Thread.sleep(
5000);
-
}
catch (Exception e) {
-
e.printStackTrace();
-
}
-
return
"ok";
-
}
3.2.confirm
confirm是基于channel的,一旦channel进入confirm模式,所有在该channel上发布的消息都会被指派一个唯一的ID(从1开始),消息被投递道匹配队列后broker会发送一个确认消息给生产者。如果消息和队列是可持久化的(durable为true),那么确认消息会在消息被写入磁盘后发出。
confirm最大的好处在于异步,生产者在等待上一条消息的确认消息的时候可以继续往下发送。
confirm在spring boot中的使用很简单,在配置文件中开启即可,并且支持自定义回调函数:
配置文件:
spring.rabbitmq.publisher-confirms: true
spring.rabbitmq.publisher-returns: true
生产者:
-
@Slf4j
-
@Component
-
public
class
RabbitmqService
implements
RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
-
-
@Autowired
-
private RabbitTemplate rabbitTemplate;
-
-
public
void
sendMessage
(String exchange,String routingKey,Object msg) {
-
// 设置交换机处理失败消息的模式 true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
-
// 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
-
rabbitTemplate.setMandatory(
true);
-
//消息消费者确认收到消息后,手动ack回执
-
rabbitTemplate.setConfirmCallback(
this);
-
-
// 暂时关闭 return 配置
-
//rabbitTemplate.setReturnCallback(this);
-
//发送消息
-
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
-
}
-
-
/**
-
* 交换机并未将数据丢入指定的队列中时,触发
-
* channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
-
* 参数三:true 表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
-
* @param message 消息对象
-
* @param replyCode 错误码
-
* @param replyText 错误信息
-
* @param exchange 交换机
-
* @param routingKey 路由键
-
*/
-
@Override
-
public
void
returnedMessage
(Message message, int replyCode, String replyText, String exchange, String routingKey) {
-
log.info(
"---- returnedMessage ----replyCode="+replyCode+
" replyText="+replyText+
" ");
-
}
-
-
/**
-
* 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
-
* @param correlationData 相关配置信息
-
* @param ack exchange 交换机,判断交换机是否成功收到消息 true 表示交换机收到
-
* @param cause 失败原因
-
*/
-
@Override
-
public
void
confirm
(CorrelationData correlationData, boolean ack, String cause) {
-
log.info(
"---- confirm ----ack="+ack+
" cause="+String.valueOf(cause));
-
log.info(
"correlationData -->"+correlationData.toString());
-
if(ack){
-
// 交换机接收到
-
log.info(
"---- confirm ----ack==true cause="+cause);
-
}
else{
-
// 没有接收到
-
log.info(
"---- confirm ----ack==false cause="+cause);
-
}
-
}
-
}
转载:https://blog.csdn.net/Joker_ZJN/article/details/128402725