前言
上文 Kafka-Consumer 源码解析 – rebalance过程和partition的确认中已经介绍了各个consumer的partition的确认,在确认partition之后,数据拉取开始正常执行。
1、数据拉取
数据拉取步骤:
- 非自动提交和record模式下,提交对于offset
- 获取最新的partition offset,确认数据拉取位置
- 执行 KafkaConsumer poll 拉取数据
- poll 中调用 pollForFetches 执行数据拉取
- pollForFetches 首先判断是否存在已经拉取的数据,这里是因为在上一次的数据拉取过程中进行了提前的拉取工作,有可能已经存在了消息等待处理。如果没有已经拉取的数据,执行新的拉取请求。
- 拉取到数据之后,并不会将数据立即返回,而是再次执行数据拉取的操作,这里使用非阻塞网络io,消费数据和下一次的数据拉取可以同步进行,提高了数据处理能力。
在 ListenerConsumer
类的 pollAndInvoke
方法执行数据拉取、offset提交和数据消费的过程。
pollAndInvoke
实现:
protected void pollAndInvoke() {
// 如果当前的提交模式不是 自动提交 并且也不是 逐条 提交,执行 commit 操作
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
// 如果当前 seeks 中存储有 TopicPartitionOffset 的实例,则需要获取偏移量信息
if (this.seeks.size() > 0) {
processSeeks();
}
// 检查当前消费是否暂停
checkPaused();
this.lastPoll = System.currentTimeMillis();
// 设置数据拉取状态
this.polling.set(true);
// 拉取数据
ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
// 对 polling 拉取状态执行 CAS 过程
if (!this.polling.compareAndSet(true, false)) {
/*
* There is a small race condition where wakeIfNecessary was called between
* exiting the poll and before we reset the boolean.
*/
if (records.count() > 0) {
this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
}
return;
}
checkResumed();
debugRecords(records);
// 如果拉取到的数据不为 empty ,执行数据消费的过程
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
this.lastReceive = System.currentTimeMillis();
}
invokeListener(records);
}
else {
checkIdle();
}
}
查看 KafkaConsumer
的 poll
方法,实际拉取数据的过程为pollForFetches
:
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// if data is available already, return it immediately
// 这里检查是否数据已经准备完成
// consumer 的数据 poll 是在拉取完成之后,并不会立即返回数据,而是执行一次非阻塞的数据拉取过程
// 这样的处理可以大大提高线程的并行处理能力
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// send any new fetches (won't resend pending fetches)
// 发送数据拉取的请求
fetcher.sendFetches();
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
Timer pollTimer = time.timer(pollTimeout);
// 使用异步io执行下一次的数据拉取
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
timer.update(pollTimer.currentTimeMs());
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator != null && coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
return fetcher.fetchedRecords();
}
2、offset提交
offset的手动提交方式:
- RECORD
每处理一条commit一次 - BATCH(默认)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率 - TIME
每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?) - COUNT
累积达到ackCount次的ack去commit - COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit - MANUAL
listener负责ack,但是背后也是批量上去 - MANUAL_IMMEDIATE
listner负责ack,每调用一次,就立即commit
手动提交的实现在ListenerConsumer
类 invokeOnMessage
方法中调用 ackCurrent
执行。
自动提交可通过配置项auto.commit.interval.ms来设置提交操作的时间间隔,自动提交并非通过定时任务周期性地提交,而是在一些特定事件发生时才检测与上一次提交的时间间隔是否超过了${auto.commit.interval.ms}计算出的下一次提交的截止时间nextAutoCommitDeadline,若时间间隔超过了nextAutoCommitDeadline 则请求提交偏移量,同时更新下一次提交消费偏移量的nextAutoCommitDeadline。
自动提交的触发事件:
- 通过 KafkaConsumer.assign()订阅分区
- ConsumerCoordinator.poll()方法处理时(maybeAutoCommitOffsetsAsync方法)
- .在消费者进行平衡操作前
- ConsumerCoordinator 关闭操作
ConsumerCoordinator.poll()
方法:
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
// poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}
其中 updateAssignmentMetadataIfNeeded
方法:
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (coordinator != null && !coordinator.poll(timer)) {
return false;
}
return updateFetchPositions(timer);
}
// ConsumerCoordinator 类中
public boolean poll(Timer timer) {
maybeUpdateSubscriptionMetadata();
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned()) {
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
pollHeartbeat(timer.currentTimeMs());
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}
if (rejoinNeededOrPending()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription()) {
if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
this.metadata.requestUpdate();
}
if (!client.ensureFreshMetadata(timer)) {
return false;
}
maybeUpdateSubscriptionMetadata();
}
if (!ensureActiveGroup(timer)) {
return false;
}
}
} else {
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
}
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}
maybeAutoCommitOffsetsAsync
为异步提交位移的实现。
转载:https://blog.csdn.net/qq_38245668/article/details/105982955