飞道的博客

Kafka学习笔记: 生产者消息分区机制原理剖析

231人阅读  评论(0)

 

对于那种大批量机器组成的集群环境,每分钟产生的日志量都能以 GB 数,

因此如何将这么大的数据量均匀地分配到 Kafka 的各个 Broker 上,就成为一个非常重要的问题。

分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降。

 

分区的概念

 Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。

主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份

 

 

分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)

不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。

 

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。

Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。

如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class。方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。接口只定义了两个方法:partition()和close().


  
  1. int partition( String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。只要你自己的实现类定义好了 partition 方法,同时设置partitioner.class参数为你自己实现类的 Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。

轮询策略

也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0,就像下面这张图展示的那样。

轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。

如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。

随机策略

也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上


  
  1. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return ThreadLocalRandom.current().nextInt(partitions.size());

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略所以如果追求数据的均匀分布,还是使用轮询策略比较好

事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。按消息键保序策略

按消息键保序策略

Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的。一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

 

实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:


  
  1. List< PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return Math. abs(key.hashCode()) % partitions.size();

前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。

 

其他分区策略

其实还有一种比较常见的,即所谓的基于地理位置的分区策略。当然这种策略一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群。

 

从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。


  
  1. List< PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return partitions.stream(). filter(p -> isSouth(p.leader().host())). map( PartitionInfo:: partition).findAny(). get();

 

 

 

原文引用:

Kafka核心技术与实战 - 胡夕

 

 

 

 

 

 

 

 


转载:https://blog.csdn.net/zhanglong_4444/article/details/103667539
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场