在了解了RocketMQ的发送与接收后,也好奇RocketMQ内部是如何处理好生产端、消费端的负载均衡的,下面通过分析源码、查阅相关文档资料以及结合自己的理解,做了下归纳总结。
RocketMQ的消息负载均衡都是下放到Client端来实现的,具体可细分为2块:发送负载(Producer端)、消费负载(Consumer端)。
1、发送负载
1.1 路由信息
消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
1.2 选择队列
1.2.1 默认方式(sendLatencyFaultEnable 开关关闭)
生产者端发送消息时,会根据Topic信息(每条消息都必须指定有Topic信息),从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。随机递增式的轮询,每个生产者都通过ThreadLocal维护自己的一套下标index,初始化时产生随机数生成下标,后续每次都递增加1后对队列个数取模,从而获取对应下标的messageQueue。
1.2.2 Broker故障延迟方式(sendLatencyFaultEnable 开关打开)
在随机递增取模的基础上,结合消息失败延迟策略,过滤掉暂时认为不可用的Broker的消息队列。
消息失败延迟策略的算法在MQFaultStrategy上实现(MQFaultStrategy也被称为失败延迟策略实现的门面类),其中2个重要的参数 latencyMax、notAvailableDuration(单位都是毫秒)。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
这2个参数如何结合实现延迟的呢?
latencyMax,在发送消息后,根据本次消息的发送耗时 currentLatency,从latencyMax数组最后一个值往前找,直到第一个比currentLatency小的值,其对应的下标为currentIdx,则可设置Broker的不可用时长为notAvailableDuration[currentIdx],调用门面类updateFaultItem方法进行更新,以此达到退避的效果。
举个例子,如果请求的latency为3300L,则currentLatency=5,对应的不可用时长为notAvailableDuration[5]=180000L,也即本次记录broker需要退避的时长180秒。
该延迟机制(latencyFaultTolerance)也是消费者端实现高可用的核心所在。
2、消费负载
这里主要讲消费端的集群消费模式下的处理(另一种模式是广播模式)。
2.1 消息获取模型概述
目前客户端与服务端(Broker)之间有两种模式:推模式、拉模式。
这里的推模式是基于拉模式进行了封装,也即通过长轮询的方式来达到兼具Pull与Push的优点。在服务端收到客户端的请求后,会进行查询,如果队列里没有数据,此时服务端先挂起,不着急返回,等待一定时间(默认5s)后,会再进一步继续查询,当一直未查询到结果并超过重试次数后返回空结果(比较适合在客户端连接数量可控的场景中)。
PS,RocketMQ的前身,第一代的Notify主要使用了推模型,解决了事务消息。第二代的MetaQ则主要使用了拉模型,解决了顺序消息和海量堆积的问题。所以一个优秀的项目其实都是在不断进化演变中的。
2.2 消费者队列如何负载
消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。
在RocketMQ中,消息队列的负载均衡是由客户端启动MQClientInstance实例部分时,触发负载均衡服务线程(具体由RebalanceService线程实现),默认每20s执行一次。
底层实现均衡的逻辑是在RebalanceImpl类的rebalanceByTopic()方法中。代码如下:
/**
* 消费负载均衡核心方法
*
* @param topic 待重均衡主题
* @param isOrder
*/
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
/** 集群模式 */
case CLUSTERING: {
/** 1、获取该topic下的所有mq消费队列 */
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
/** 2、获取该topic、消费者分组下的所有消费者id */
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
/** 3、获取消息队列分配策略 */
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
/** 4、开始给当前消费者分配消费队列 */
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
/** 5、重均衡后,更新快照队列信息 */
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
具体过程解释(针对集群模式):
1、获取该topic下的所有mq消息队列;
2、获取该topic、消费者分组下的所有消费者id;
3、校验步骤1/2中任意一个结果,如果结果为空则跳过不做处理;否则进入步骤4;
4、获取消息队列分配策略;
目前RocketMQ提供了6种分配算法,默认使用消息队列的平均分配算法(AllocateMessageQueueAveragely),也推荐使用这种。
平均算法举例说明:假设有8个队列,q1,q2,……,q8,有3个消费者c1,c2,c3,则在平均分配算法下,各消费者的分配队列如下:
c1:q1,q2,q3
c2:q4,q5,q6
c3:q7,q8
(也因此可以看出,当消费者数量大于队列数量时,则会存在消费者无法分配到队列的情况)
5、重均衡后,更新快照队列信息(ProcessQueueTable)
此时调用RebalanceImpl#updateProcessQueueTableInRebalance()进行处理
假设本次通过上面几个步骤分配后得到的队列集合(mqSet)为mq1,mq2,mq3,mq4,在更新ProcessQueueTable中,会拿已分配到的队列与当前的消费队列快照(Queue consumption snapshot)比对。
变量解释说明:
processQueueTable:当前消费者负载的消息队列缓存表,结构是 ConcurrentMap<MessageQueue, ProcessQueue>
队列的比对情况(3种)以及对应执行的操作如下:
1)当前快照队列集合存在,新分配队列集合不存在(假设为上图processQueueTable标注的红色部分,e1,e2)
执行剔除e1,e2的操作,将状态标识字段 droped 置为 true,这样,该 ProcessQueue 中的消息将不会再被消费。
2)当前快照队列集合存在,新分配队列集合也存在(假设为上图processQueueTable标注的绿色部分,e3,e4)
Pull模式直接忽略不做调整;Push模式下判断processQueueTable中的该2个ProcessQueue是否已过期,已过期则移除。
3)当前快照队列集合不存在,新分配队列集合存在(假设为上图processQueueTable标注的白色部分,e5,e6);
本次新增的消息队列,添加入processQueueTable中。
至此,完成了消费端的负载均衡。
转载:https://blog.csdn.net/BuDongOrz/article/details/101036805