简介
RabbitMQ的消息确认有两种。
第一种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
第二种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
消费者确认
消费者确认或者说消费者应答指的是RabbitMQ需要确认消息到底有没有被收到
自动应答
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
在订阅消息的时候可以指定应答模式,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。
手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
手动应答和自动应答不一样,需要将autoAck设置为false,当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了,可以通过显示调用channel.basicAck(envelope.getDeliveryTag(), false);来告诉消息服务器来删除消息
生产者确认
当生产者发布消息到RabbitMQ中,生产者需要知道是否真的已经发送到RabbitMQ中,需要RabbitMQ告诉生产者。
事务机制
channel.txSelect();
channel.txCommit();
channel.txRollback();
//获取链接
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
//声明启动事务模式
channel.txSelect();
for (int i = 0; i < 5; i++) {
String message = " 生产消息_" + i;
//消息发布方法
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//提交事务
channel.txCommit();
} catch (Exception e) {
//回滚事务
channel.txRollback();
}
注意:事务机制是非常非常非常消耗性能的,最好使用Confirm机制,Confirm机制相比事务机制性能上要好很多。
Confirm机制的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
方式一:普通Confirm模式
//获取链接
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 5; i++) {
String message = " 生产消息_" + i;
//消息发布方法
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//普通发送方确认模式
if (channel.waitForConfirms()){
System.out.println("消息发送成功" );
}
看代码可以知道,我们只需要在推送消息之前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认即可。
方式二:channel.waitForConfirmsOrDie()批量确认模式
//获取链接
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 5; i++) {
String message = " 生产消息_" + i;
//消息发布方法
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
channel.waitForConfirmsOrDie();//直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("消息发送成功" );
以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。
方式三:channel.addConfirmListener()异步监听发送方确认模式
//获取链接
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 5; i++) {
String message = " 生产消息_" + i;
//消息发布方法
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
}
});
可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。
总结
RabbitMQ一般情况很少丢失,但是不能排除意外,为了保证我们自己系统高可用,我们必须作出更好完善措施,保证系统的稳定性。除了确认机制可以防止数据丢失外,还有消息持久化、设置集群镜像模式、消息补偿机制,也能做到防止数据丢失。
转载:https://blog.csdn.net/weixin_45498465/article/details/105660924