介绍:
消息队列(Message Queue,简称MQ)。消息中间件作为实现分布式消息系统可拓展、可伸缩性的关键组件,具有高吞吐量、高可用等等优点。
优势:
- 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
- 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
- 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
- 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)
目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:
- 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
- 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持18个级别的延迟消息(rabbitmq和kafka不支持)
- 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
- 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
- 支持重复消费(rabbitmq不支持,kafka支持)
Rocketmq、kafka、Rabbitmq的详细对比,请参照下表格:
基础:
RocketMQ主要组成:NameServer、Broker(代理)、Producer(消
息生产者)、Cosumer(消息消费者)
集群部署结构:
NameServer
NameServer: rocketmq名称服务器,更新和发现 broker服务。一个几乎无状态节点,可集群部署,节点之间无任何信息同步
Broker:消息中转角色,负责存储和转发消息。
Broker分为Master和Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master和Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义。BrokerId为0表示Master,BrokerId非0表示Slave。然后所有的Broker和Name Server上的节点建立长连接,定时注册Topic信息到所有Name Server。
Broker-Master:broker 消息主机服务器 Broker-Slave: broker 消息从机服务器
每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。
Producer: 消息生产者。
Producer与Name Server其中一个节点建立连接。定期从Name Server取Topic信息。并与提供该Topic信息的Master建立长连接。Producer也可以集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。
Consumer: 消息消费者。
Consumer 与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。
当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。
消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如 xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk )。任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的所有数据。
消费模式(**在默认情况下,就是集群消费 **CLUSTERING)
-
广播消费 BROADCASTING
类似于ActiveMQ中的发布订阅模式,一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。
-
集群消费 CLUSTERING (默认)
一个ConsumerGroup中的Consumer实例分摊消费消息,实现了天然的消息负载均衡。例如某个Topic有9条消息,其中一个ConsumerGroup有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。默认的分配算法是AllocateMessageQueueAveragely
消息原语
幂等 :RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,此时务必做好幂等
安装
本文介绍源码编译安装
downlaod source
下载rocketMq source 地址 点我
下载源码后配置编译环境 编译需要 jdk 和 maven 3.2.x
好环境后,开始解压编译源码,编译用时较长,因为需要下载一些依赖
> unzip rocketmq-all-4.x.0-source-release.zip
> cd rocketmq-all-4.x..0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq
启动
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
停止
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
常用命令
rocketMq添加主题命令 该命令执行会在broker所在机器创建一个新的topic,若topic已存在,则会更新topic的属性
sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t orderTopic
deleteTopic
从Broker和Name Server删除Topic
sh mqadmin deleteTopic -t TopicTest -c DefaultCluster -n 192.168.0.1:9876
查询name server 的topic list
sh mqadmin topicList -n localhost:9876
更多命令查看
https://blog.csdn.net/gwd1154978352/article/details/80829534
code
代码地址如下:点我
目前只有链接原生rocketMq 的代码,日后会添加连接 阿里rocketmq 的代码毕竟现在大家都用阿里云
参考:
https://www.jianshu.com/p/2838890f3284
https://www.jianshu.com/p/84ab3a7e4b91
https://blog.csdn.net/u014427391/article/details/78343163
转载:https://blog.csdn.net/u012373815/article/details/103654406