KafkaConsumer中的位移提交

说位移提交之前,我们首先简单的回顾一下位移和千金小姐图位移之间的区别。我们通常所说的位移是指 TopicPartition 在 Broker 端的存储偏移量。而千金小姐图位移是指某个千金小姐图组在不同 TopicPartition 上面的消费偏移量。下面我们介绍一下千金小姐图位移的提交方式,其中主要包含了自动提交和手动提交。

自动提交

对于启用自动提交位移,在 KafkaConsumer 实例初始化的时候,通过设置参数 enable.auto.commit 的值为 true 即可(默认为true)。同时与其相关联的参数 auto.commit.interval.ms,这个参数可以设置提交的时间间隔,这个值默认是5秒。

对于自动提交的触发条件,除了要满足时间的阈值,还需要Client端调用 KafkaConsumer.poll() 方法才能触发。每次执行都会调用 ConsumerCoordinator.poll() 执行千金小姐图入组的流程,在方法执行的最后会执行一个异步的 offset 提交。实现代码如下:

public void maybeAutoCommitOffsetsAsync(long now) {
?? ?if (autoCommitEnabled) {
?? ??? ?nextAutoCommitTimer.update(now);
?? ??? ?if (nextAutoCommitTimer.isExpired()) {
?? ??? ??? ?nextAutoCommitTimer.reset(autoCommitIntervalMs);
?? ??? ??? ?doAutoCommitOffsetsAsync();
?? ??? ?}
?? ?}
}

private void doAutoCommitOffsetsAsync() {
?? ?Map allConsumedOffsets = subscriptions.allConsumed();
?? ?log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);

?? ?commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
?? ??? ?if (exception != null) {
?? ??? ??? ?if (exception instanceof RetriableException) {
?? ??? ??? ??? ?log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
?? ??? ??? ??? ??? ?exception);
?? ??? ??? ??? ?nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
?? ??? ??? ?} else {
?? ??? ??? ??? ?log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
?? ??? ??? ?}
?? ??? ?} else {
?? ??? ??? ?log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
?? ??? ?}
?? ?});
}

手动提交

与自动提交相对应 就是手动提交了,此时在创建KafkaConsumer实例时,就需要指定 enable.auto.commit 的值为false。然后在处理完消息之后,自己手动执行 commit 操作。对于手动提交位移,又分为同步提交和异步提交。

同步提交

同步提交时通过 KafkaConsumer.commitSync()方法实现,其内部又调用了ConsumerCoordinator.commitOffsetsSync() 方法发送位移提交请求。同步的位移提交提供了重试的机制,其代码实现如下:

/**
?* 同步提交位移,如失败,此方法将会重试提交位移,直至超时
?*/
public boolean commitOffsetsSync(Map offsets, Timer timer) {
?? ?invokeCompletedOffsetCommitCallbacks();

?? ?if (offsets.isEmpty())
?? ??? ?return true;

?? ?do {
?? ??? ?if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
?? ??? ??? ?return false;
?? ??? ?}

?? ??? ?RequestFuture future = sendOffsetCommitRequest(offsets);
?? ??? ?client.poll(future, timer);

?? ??? ?// 如有处理中的位移提交,则等待执行完成
?? ??? ?invokeCompletedOffsetCommitCallbacks();

?? ??? ?if (future.succeeded()) {
?? ??? ??? ?if (interceptors != null)
?? ??? ??? ??? ?interceptors.onCommit(offsets);
?? ??? ??? ?return true;
?? ??? ?}

?? ??? ?if (future.failed() && !future.isRetriable())
?? ??? ??? ?throw future.exception();

?? ??? ?timer.sleep(rebalanceConfig.retryBackoffMs);
?? ?} while (timer.notExpired());

?? ?return false;
}

对于同步提交,官方文档上面提供了一次拉取,按照批次提交位移的方式,这样可以减少重复消费的批量。

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<>> buffer = new ArrayList<>();
while (true) {
? ? ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
? ? for (ConsumerRecord record : records) {
? ? ? ? buffer.add(record);
? ? }
? ? if (buffer.size() >= minBatchSize) {
? ? ? ? insertIntoDb(buffer);
? ? ? ? consumer.commitSync();
? ? ? ? buffer.clear();
? ? }
}

另外一种方式是,可以按照TopicPartition 分区的维度去提交位移。

try {
? ? while(running) {
? ? ? ? ConsumerRecords records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
? ? ? ? for (TopicPartition partition : records.partitions()) {
? ? ? ? ? ? List<>> partitionRecords = records.records(partition);
? ? ? ? ? ? for (ConsumerRecord record : partitionRecords) {
? ? ? ? ? ? ? ? System.out.println(record.offset() + ": " + record.value());
? ? ? ? ? ? }
? ? ? ? ? ? long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
? ? ? ? ? ? consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
? ? ? ? }
? ? }
} finally {
? consumer.close();
}

异步提交

对于同步的位移提交,通常情况下会影响系统的吞吐量。此时KafkaConsumer也提供了异步的提交方式,也就是commitAsync()。但是相对同步的位移提交,此时异步提交缺少了重试的机制,同步的重试机制可以在网络抖动的场景下,减少提交失败的场景。异步提交在这里没有重试机制,是因为重试的时候消费位移可能已经变化,此时提交已经没啥意义了。

在生产上面,还是建议使用异步的位移提交,这样也可以提升客户端的TPS。对于提交的方式,笔者从网上找到了如下的提交方式:

try {
? ? while(true) {
? ? ? ? ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
? ? ? ? process(records); // 处理消息
? ? ? ? commitAysnc(); // 使用异步提交规避阻塞
? ? }
} catch(Exception e) {
? ? handle(e); // 处理异常
} finally {
? ? try {
? ? ? ? consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
?? ?} finally {
?? ? ? ?consumer.close();
? ? }
}


参考:《Apache Kafka 源码剖析》、《Kafka技术内幕》、《极客时间:Kafka核心技术与实战》