目录
一、原理简述
【1】Producer 将消息进行分组分别发送到对应 leader节点;
【2】Leader 将消息写入本地 log;
【3】Followers 从 Leader pull 最新消息,写入 log后向 Leader 发送 ack确认;
【4】Leader 收到所有 ISR中的 Follower 节点的 ACK后,增加 HW,标记消息已确认全部备份完成,最后返回给 Producer 消息已提交成功;
【5】消费端从对应 Leader 节点 poll最新消息并消费,消费完成后将最新的 offset位置提交至 Topic为 _consumer_offsets的主节点中保存。
二、Producer 原理
初始化 KafkaProducer,会创建一个后台线程 KafkaThread,会循环的判断缓存中的数据是否需要提交。同时会发送消息,主要指定 Topic和 Value,不建议指定 partition和 key防止分区分配的不均匀,扩容不方便等。然后拉取 metadata 它是在 zk中维护的,里面存储了 topic 可用的分区和正在同步的备份。分区器由两种分区策略,第一种是根据 key的 hash值,将相同的 key放在同一个分区,保证相同 key的顺序性。第二种是轮询,key不存在的时候就会轮询的向1、2、3依次存储。当确定了要发送的分区后,会发送到 RecordAccmulator缓存种,key是要发送到指定的分区,value是一个双端队列。然后会判断双端队列是否已满(batch.size)会唤醒 send 发送到 kafka broker中。同时,后端线程也会循环的判断这个双端队列是否已满。还有一种就是根据固定的时间判断是否该发送消息,从而提高 kafka的性能。当发送失败的时候,会将发送失败的消息放到双端队列的最前面。Retry默认是3,一般发送失败是网络等原因或者broker有问题,因此这个值不建议设置比较大。
三、Producer 端参数详解
bootstrap.servers | broker 的地址,集群的地址用逗号分隔,也可以指写集群的一个地址,但防止单点故障,最好都写。 |
key.serializer | 关键字的序列化方式 |
value.serializer | 消息值的序列化方式 |
acks | 指定必须要有多少个分区的副本接收到该消息 |
max.block.ms | 表示 send() 方法在抛出异常之前可以阻塞多久的时间,默认是60s |
compression.type | 消息在发往 kafka 之前可以进行压缩处理,不建议压缩,因为会消耗CPU资源,建议设置大一点 |
retries | 生产者向 kafka 发送消息发生错误时重试 |
retry.backoff.ms | 每次重试之间间隔的时间 |
batch.size | 生产者在发送消息时,可以将同一个分区的消息放在一个批次里,一次性提交。如果数据体比较大时,该值应该设置大一些 |
linger.ms | 生产者在发送一个批次之前,延迟指定时间,让更多的消息加入到该批次 |
buffer.memory | 生产者的内存缓冲区大小,对应的 RecordAccmulator |
四、Kafka Server 基本原理
【1】Zookeeper 集群作用:管理 Kafka 集群,同步元数据信息;
【2】Metadata 信息用来存放什么数据的:broker中主从分区信息(是否可用等),以及从分区进行数据同步的信息;
【3】Producer 发送过来的数据怎么落盘的:链接 数据进来会先存储在页缓存中,提高性能。
五、KafkaServer 主分区与副本数据同步原理
HW高水位:HW之前的数据是已经备份的数据;
LEO末端位移:HW至LEO之间的是未备份数据;
如上5-9之间的数据未备份,Follower 节点会从 Leader节点拉取数据,Leader 节点存储了一个 Follower 副本,该副本也具有HW和LEO ,Follower节点会拉去Leader节点上的 Follower副本,并将数据写入到 log文件中,同时更新 LEO为拉取到的值9。Follower 更新完后,会通知 Leader节点的 Follower 副本,并将 HW 写到该副本中。最后 Leader 副本的 HW会更新为所有 Follower 副本 LEO的最小值。然后判断 HW 是否等于 LEO,如果相等则说明备份好了。
六、KafkaServer 零拷贝原理
Kafka 的数据并不是实时的写入硬盘,利用操作系统分页存储来提高 I/O效率:
【1】数据直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。可能会存在系统挂掉时,数据丢失的风险。零拷贝链接
【2】Kafka 数据写入磁盘前,数据先写到进程的内核空间;
【3】副本备份数据以及消息者读取数据直接从内核空间的网卡读取;
【4】只能缓存有限大小的数据,即如果消费时间久远的数据需要从磁盘读取;
生产者将数据写入页缓存,然后再同步到磁盘,消费者也会先读取页缓存,只有部分数据才会从磁盘中获取。从而提高性能。
七、KafkaServer Leader 选举
Kafka Broker 启动的时候,会启动一个 controller 节点,该节点就是选举的时候使用,controller 会与zk进行通信。如果 broker挂掉了,就会取通知 ctroller 进行统计,看那些节点存活者,然后进行选举,看那个分区成为主分区。这个计算为什么要分在 broker上,因为 zk不适合做大量的计算和读写操作。计算的规则是 LSR中同步的副本,如果那个分区先备份完成就会放在最前面,因此就选择 ISR表中的第一个备份分区为主分区。
八、KafkaConsumer 原理
【1】Consumer 是如何协调消费分区数据的:Consumer 初始化时都会初始化一个 Consumer 协调器,发送心跳检测;Consumer 协调器与 Kafka 对应 Broker 的 group协调器进行通讯,并获取正在消费的所有 Consumer 信息,并将信息发送给主 Consumer协调器,Consumer Leader计算当前所有 Consumer应该对应消费哪个的分区数据(减少broker的压力)。
【2】Consumer 的 offset是如何管理的:Kafka 内部维护了一个_consumer_offsets用于存储 offset;consumer 消费完数据时会将当前消费分区最新的 offset ack到 _consumer_offsets(分区有50个)中。
【3】Consumer group 如何 rebalance的:group 协调器检测有 consumer节点挂掉,会将当前存在的 consumer 发送给 consumer 协调器,主协调器计算 consumer 应对应消费哪个分区数据。计算完成将结果发回至 group协调器。group 协调器与 consumer 协调器通信,通知对应 consumer节点是否需要消费,以及接着消费 offset位置。
九、KafkaConsumer 参数详解
【1】group.id:组ID,唯一标识一个消费组,需要确定一个不重复的组ID;
【2】session.timeout.ms:用于检测 consumer失效(failure)的时间,如果要缩短错误检测时间,可适当减少该值;
【3】auto.offset.reset:指定 consumer启动时消费策略,earliest(设定位移为当前最小位移);latest(设定位移为当前最新位移);
【4】enable.auto.commit:是否让Kafka 在后台自动提交位移,推荐 false,手动提交;
【5】heartbeat.interval.ms:心跳检测频率,如果 consumer group 中成员很多的话推荐将此值设置大一点;
【6】max.poll.interval.ms:用于调节消费主线程的消费速度,需比 poll消息消耗时长更大,否则可能导致 consumer再平衡;
【7】max.poll.records:控制单次 poll回来的消息数,建议越少越好;
十、性能优化方案
【1】acks 按需设置;
【2】尽量使用随机分区,防止数据分配不均匀;
【3】为各个 Producer 配置 retries,建议设置为整形的最大大小,感觉没什么用;
【4】为高吞吐量的 Producer,调优缓冲区的大小,分别是 buffer.memory 和 batch.size(以字节为单位)
KafkaServer 端的建议:
【1】评估当前项吞吐量,测定主分区的写入和消费速率,合理申请分区个数;使用 kafka 测试工具,根据自己实际吞吐量进行计算;分区多消耗资源,同时进行Leader选举也会比较耗时,备份的次数也就多。不断的调整测试,因为分区和吞掉量是一个驼峰式的曲线,存在一个最佳值,并不是越大越好。分区建议是机器的整数倍。
./kafka-producer-perf-test.sh --topic test --num-records 5000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.32.12:9092 acks=1
【2】建议每个 partition 至少两个副本,建议有几个 broker就设置几个副本;
【3】log.dirs优化:数据是存在磁盘中的,可以将数据存在不同的磁盘中。进行分盘保存,就可以减少磁盘的压力;
【4】在各个 Brokers上,请压缩 Topics所需的内存和 CPU资源;
【5】不要忽略监控 Brokers的 in-sync replica(ISR);
【6】禁止 Topic 自动创建,默认创建是一份分区一份副本;
【7】对于那些具有持续高吞吐量的 Brokers,请提供足够的内存,以避免它们从磁盘子系统中进行读操作;
【8】Topic 隔离,减少影响半径;
Consumer 消费端的建议:
【1】客户端版本升级到最新,因为老版本很多计算都是放在zk上,新版本都是放在broker上,offset新版本存储在topic中,旧版本存储在zk中;
【2】调优 consumer 的套接字缓冲区,例如poll(60) 超过60s获取不下来就会报错;
【3】设计具有高吞吐量的 consumers;
【4】JVM 上运行各种 consumers时,请警惕垃圾回收对它们可能产生的影响;
转载:https://blog.csdn.net/zhengzhaoyang122/article/details/108287254