飞道的博客

RabbitMQ系列(二)确认模式

391人阅读  评论(0)

简介

RabbitMQ的消息确认有两种。

第一种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

第二种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

消费者确认

消费者确认或者说消费者应答指的是RabbitMQ需要确认消息到底有没有被收到

自动应答

boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

在订阅消息的时候可以指定应答模式,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。

手动应答

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

手动应答和自动应答不一样,需要将autoAck设置为false,当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了,可以通过显示调用channel.basicAck(envelope.getDeliveryTag(), false);来告诉消息服务器来删除消息

生产者确认

当生产者发布消息到RabbitMQ中,生产者需要知道是否真的已经发送到RabbitMQ中,需要RabbitMQ告诉生产者。

事务机制

channel.txSelect();
channel.txCommit();
channel.txRollback();

        //获取链接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		try {
			//声明启动事务模式
		    channel.txSelect();
		    for (int i = 0; i < 5; i++) {
		         	String message = " 生产消息_" + i;
		            //消息发布方法
		            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		    }
		    //提交事务
		    channel.txCommit();
		} catch (Exception e) {
			//回滚事务
		    channel.txRollback();
		}

注意:事务机制是非常非常非常消耗性能的,最好使用Confirm机制,Confirm机制相比事务机制性能上要好很多。

Confirm机制的三种实现方式:

方式一:channel.waitForConfirms()普通发送方确认模式;

方式二:channel.waitForConfirmsOrDie()批量确认模式;

方式三:channel.addConfirmListener()异步监听发送方确认模式;

方式一:普通Confirm模式
        //获取链接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//开启发送方确认模式
		channel.confirmSelect();
		for (int i = 0; i < 5; i++) {
		    		String message = " 生产消息_" + i;
		            //消息发布方法
		            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		}
		//普通发送方确认模式
		if (channel.waitForConfirms()){
			System.out.println("消息发送成功" );
		}

看代码可以知道,我们只需要在推送消息之前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认即可。

方式二:channel.waitForConfirmsOrDie()批量确认模式
        //获取链接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//开启发送方确认模式
		channel.confirmSelect();
		for (int i = 0; i < 5; i++) {
		    		String message = " 生产消息_" + i;
		            //消息发布方法
		            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		}
		channel.waitForConfirmsOrDie()//直到所有信息都发布,只要有一个未确认就会IOException
		System.out.println("消息发送成功" );

以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。

方式三:channel.addConfirmListener()异步监听发送方确认模式
        //获取链接
        Connection connection = ConnectionUtil.getConnection();
        //从连接中创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//开启发送方确认模式
		channel.confirmSelect();
		for (int i = 0; i < 5; i++) {
		    		String message = " 生产消息_" + i;
		            //消息发布方法
		            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		}
		//异步监听确认和未确认的消息
		channel.addConfirmListener(new ConfirmListener() {
		                                       @Override
		                                       public void handleNack(long deliveryTag, boolean multiple) throws IOException {
		                                           System.out.println("未确认消息,标识:" + deliveryTag);
		                                       }
		                                       @Override
		                                       public void handleAck(long deliveryTag, boolean multiple) throws IOException {
		                                           System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
		                                       }
		                                   });

可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。

总结

RabbitMQ一般情况很少丢失,但是不能排除意外,为了保证我们自己系统高可用,我们必须作出更好完善措施,保证系统的稳定性。除了确认机制可以防止数据丢失外,还有消息持久化、设置集群镜像模式、消息补偿机制,也能做到防止数据丢失。


转载:https://blog.csdn.net/weixin_45498465/article/details/105660924
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场