飞道的博客

Ruby使用RabbitMQ(基础)

411人阅读  评论(0)

Ruby使用RabbitMQ(基础)

RabbitMQ documentation

rabbitmq-tutorials

rabbitmq-configure

bunny

前提

最近刚刚接触到mq, 就在极客时间上买了一门关于mq的课程.
学习了一些基础加上在 RabbitMQ 官网上的例子.

总结了一下.

为什么要使用mq

消息队列(mq)可以帮助我们去处理系统之间的消息传递.
帮助我们去解决消息在传递过程中可能出现的数据丢失问题.

同时,消息队列还可以起到缓存的作用.
消息队列可以暂存一些消息. 平衡消息上下游之间速度不平衡.

mq是必须的吗

mq 不是必须的, 很多系统没有mq 也可以正常运行.
但是, 有mq 可以帮助我们很好地处理系统之间的交互.

mq的应用场景

  • 异步处理
  • 流量控制
  • 日志
  • 服务解耦

不适合mq的场景

  • 数据一致性要求很高
  • 第三方支付
  • 银行转账等

术语 jargon

RabbitMQ 就像一个邮局一样, 有发送信件的人, 也有接受人.
人们不去关心到底通过什么样子的方式, 把信交到收信人那里的.
人们只要把信交给邮局就好.

RabbitMQ 在系统间就起到邮局的作用.
帮助系统之间进行通信. 并且要确保信息不丢失.

Producer(生产者)

Producer 就是消息的生产者.

也就是消息的上游, 一个系统A 产生了一些信息.
可能需要另外一个系统B 去处理.
那么, 系统A 就是一个 Producer

Consumer(消费者)

一个系统B, 需要处理另一个系统A 的消息.
那么 系统B就是一个 Consumer

注意: 一个系统可以是Producer,也可以是Consumer.

Broker

Broker 就是mq, 消息队列中间件.
它去协调在两个系统之间的消息. 尽可能地让消息不丢失,不重复.

安装

  • deepin操作系统
  • 使用docker安装
# 安装docker
sudo apt-get install docker-ce
# 使用docker运行
sudo docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# 关闭
docker stop rabbitmq
# 启动
docker start -a rabbitmq

docker 运行完成后,本地会启动一个server,
也有一个web端的控制台

http://localhost:15672/#/

使用bunny

刚刚安装了RabbitMQ的server.

那么,现在就应该安装RabbitMQ的client了;
RabbitMQ 支持的语言有很多中.
这里因为工作原因, 选择使用了Ruby的bunny

安装 bunny 用于和rabbitmq 进行交互

# 通过gem来安装
gem install bunny
# 直接require 就可以使用了
require 'bunny'

简单使用

Producer

代码中 hostname 是我自己在hosts设置的;
如果 Producer 和 Consumer 在同一个主机上.
就不用设置 hostname

# 引入 bunny
require 'bunny'
# hostname 是我自己在hosts设置的;
connection = Bunny.new(hostname: 'dev.local')
connection.start

channel = connection.create_channel

# 获取queue是幂等的
queue = channel.queue('simple')

100.times do |n|
  message = "第#{n}条消息"
  queue.publish(message)
end
# 关闭连接
connection.close

Consumer

require 'bunny'

connection = Bunny.new(hostname: 'dev.local')
connection.start

channel = connection.create_channel

# 获取queue是幂等的
queue = channel.queue('simple')

# block: true 表明会一直监听mq, 等待消息的传入
# 真实的环境不要使用
queue.subscribe(block: true) do |_delivery_info, _properties, body|
  # 模拟延时任务, 延时1s
  sleep 1
  puts body
end

消息队列可以帮助我们处理消息;
无论是先启动 Consumer 还是先启动 Producer
消息是不会丢失的;

我们分别先执行Consumer,Producer;
我们收到的信息都是一样的;

Consumer 可扩展性

当一个Consumer的处理速度已经满足不了 Producer的生产速度时,
我们可以同时运行多个Consumer.

我们可以执行多个上面 Consumer 的代码;
我们可以看到消息被mq一个个地均分给了一个个的Consumer.

但是, 注意: 这里需要先启动多个 Consumer, 再启动 Producer.
不然, 只能有一个Consumer收到消息.

消息确认

在上面的 Consumer 代码中, 如果一个Consumer 挂了;
或者在 Consumer处理问题发生了问题;
导致消息没有正常处理.

那么这条消息就是丢失了;

RabbitMQ 中如何解决这个问题;
在client上加入 manual_ack: true 就可以了;

queue.subscribe(block: true, manual_ack: true) do |_delivery_info, _properties, body|
  # 模拟延时任务, 延时1s
  sleep 1
  puts body
  # 发送ack, 通知mq 消息已处理完成;
  # 必须发送ack, 不然mq 会一直requeue
  channel.ack(_delivery_info.delivery_tag)
end

修改完了, 我们再看一下结果;
就可以发现, 如果一个Consumer挂掉了;

消息会转移到其它存活的Consumer上.

注意: 如果使用了manual_ack, 一定要发送 ack. 不然RabbitMQ会不停占用内存, 最后导致系统崩了;

消息持久化(Message durability)

通过上面的代码, 我们解决了当Consumer挂掉的时候, mq 会帮助我们把消息发送给别的Consumer;
确保了, 在Consumer端处理得当;

但是, 我们也要考虑到当 RabbitMQ server 挂了的时候.
我们如何处理呢?

  1. 确保 queue 是可持久化的
  2. 确保 messages 是可持久化的

我们在代码中可以添加

同时在Consumer和Producer的代码中修改
注意: 我们使用了不同的queue

# 加入 durable: true
queue = channel.queue('simple-durable', durable: true)

在Producer 中加入

queue.publish(message, persistent: true)

执行docker kill 可以在试试看效果

不过要注意:

这样并不会完全保证消息不丢失; 尽管RabbitMQ知道了要把数据保存到硬盘中.
但是仍然有一个短暂的窗口时间: 当mq 接受到消息, 但是还未将消息保存到硬盘.

同样, MQ也不会收到一条消息就保存到内存中去;
这个持久化并不strong. 当然,一般情况下够用.

如果需要一个更strong的保证. 可以使用 publisher confirms

公平分配(Fair dispatch)

我们运行上面的Consumer 可以发现.
消息其实是公平分配给每个Consumer的.

那么,这可能会出现问题;
假如某个节点处理消息的速度快, 但是因为公平分配.
这个节点的利用率就比较低了;

为了解决这个问题:
我们可以使用 channel.prefetch(n)

n = 1
channel.prefetch(n)

这将告诉RabbitMQ 不要一次发布超过 n 条消息给Consumer;
直到Consumer处理完消息, 并返回ack.

注意: 如果所有的Consumer都处于繁忙状态, 消息可能会堆积.

总结消息丢失

确认消息丢失

首先, 我们需要确定消息丢失.

我们可以利用消息队列的有序性来验证是否有消息丢失

一般MQ会有 拦截器机制, 让我们在Producer和Consumer代码之前,
进行消息丢失的验证;

图中的三个系统 Produce, Broker, Consumer

三个阶段 生产阶段, 存储阶段, 消费阶段

四个流程 send, pull, ack_to_broker, ack_to_producer

都有可能因为系统,网络等原因导致消息的丢失;

Producer

如何在生产者这一阶段确保消息不丢失呢?

生产者发送给MQ消息后,
MQ会给生产者一个ack. 如果没有接受到ack.
说明MQ 挂掉了,或者网络没能达到等等.

这时候, 需要生产者这一端进行数据重发.

只要 Producer 收到了 Broker 的确认响应,
那说明在生产者这一阶段是确保了消息不丢失;

但是, 这也有可能导致消息重复;

Broker

消息传递到了MQ这里. 一般情况下,消息可以正常发送给Consumer.
但是, 如果MQ挂了; 怎么处理?

我们在之前的代码中提到了消息的持久化;
消息的持久化, 可以确保消息在MQ挂了的情况下, 还能够保存下来;

等到下次重启就可以继续把消息发送给Consumer.

Consumer

在Consumer阶段. 很有可能在处理消息时, 系统重启或者挂掉;
那么, 处理消息的业务逻辑还没有完成.

这条消息就不能算是被成功处理了;
所以需要MQ 再次重发消息;

不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认.

消息重复

消息在网络传输过程中发送错误,由于发送方收不到确认,会通过重发来保证消息不丢失。
但是,如果确认响应在网络传输时丢失,也会导致重发消息。

也就是说,无论是 Broker 还是 Consumer 都是有可能收到重复消息的

质量标准

传递消息时能够提供的服务质量标准

  • at more once 消息最多发送一次, 不管不顾. 发完就完事了;

  • at least once 至少一次. 消息会有确认机制. 保证消息被消费者消费了; 但是肯定会有重复;

  • exactly once 仅仅一次; 这个MQ没有实现; 也很难实现; 因为系统层面上的问题, 不能全让MQ 处理;

exactly once 的实现需要 消息队列 + 系统处理

从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。
但是, 很多代码不是幂等的;

需要我们处理成幂等;

消费幂等

消费者一段的代码, 如果是幂等的;
那么, 遇到多余的消息. 直接执行逻辑也无妨;

例如:

给Tony 性别设置为 男;

这个逻辑执行1次, 和执行 n次效果一样;

所以,如果业务处理逻辑本身就是幂等的,
那么,就不用考虑消息重复的问题;

幂等: 多次执行和执行一次结果相同;

消息查重

更新的数据设置前置条件

消息先去重,根据业务ID(或者其它能够标识消息唯一性的就行),
去查询是否消费过此消息了,
消费了,则抛弃,否则就消费.


转载:https://blog.csdn.net/qq_34120430/article/details/105929724
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场