紧接着上篇文章,这篇文章讲述 Consumer 提供的两种 commit 机制和两种 partition 分配机制,具体如何使用是需要用户结合具体的场景进行选择,本文讲述一下其底层实现。
两种 commit 机制 先看下两种不同的 commit 机制,一种是同步 commit,一种是异步 commit,既然其作用都是 offset commit,应该不难猜到它们底层使用接口都是一样的,其调用流程如下图所示:
同步 commit 1 2 3 4 5 6 public void commitSync () {}public void commitSync (final Map<TopicPartition, OffsetAndMetadata> offsets) {}
其实,从上图中,就已经可以看出,同步 commit 的实现方式,client.poll()
方法会阻塞直到这个request 完成或超时才会返回。
异步 commit 1 2 3 4 public void commitAsync () {}public void commitAsync (OffsetCommitCallback callback) {}public void commitAsync (final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {}
而对于异步的 commit,最后调用的都是 doCommitOffsetsAsync
方法,其具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void doCommitOffsetsAsync (final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) { this .subscriptions.needRefreshCommits(); RequestFuture<Void> future = sendOffsetCommitRequest(offsets); final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess (Void value) { if (interceptors != null ) interceptors.onCommit(offsets); completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null )); } @Override public void onFailure (RuntimeException e) { Exception commitException = e; if (e instanceof RetriableException) commitException = new RetriableCommitFailedException(e); completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); } }); }
在异步 commit 中,可以添加相应的回调函数,如果 request 处理成功或处理失败,ConsumerCoordinator 会通过 invokeCompletedOffsetCommitCallbacks()
方法唤醒相应的回调函数。
关于 offset commit 请求的处理见上一篇文章中的Offset Commit 请求处理 ,对于提交的 offset,GroupCoordinator 会记录在 GroupMetadata 对象中。
两种 partition 分配机制 consumer 提供的两种不同 partition 分配策略,可以通过 partition.assignment.strategy
参数进行配置,默认情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor
,Kafka 中提供另一种 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor
,它们关系如下图所示:
通过上图可以看出,用户可以自定义相应的 partition 分配机制,只需要继承这个 AbstractPartitionAssignor
抽象类即可。
AbstractPartitionAssignor AbstractPartitionAssignor 有一个抽象方法,如下所示:
1 2 3 4 5 6 7 8 9 public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, List<String>> subscriptions);
assign()
这个方法,有两个参数:
partitionsPerTopic
:所订阅的每个 topic 与其 partition 数的对应关系,metadata 没有的 topic 将会被移除;
subscriptions
:每个 consumerId 与其所订阅的 topic 列表的关系。
RangeAssignor
和 RoundRobinAssignor
通过这个方法 assign()
的实现,来进行相应的 partition 分配。
RangeAssignor 分配模式 直接看一下这个方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, List<String>> subscriptions) { Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List<String> consumersForTopic = topicEntry.getValue(); Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null ) continue ; Collections.sort(consumersForTopic); int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0 , n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1 ); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; }
假设 topic 的 partition 数为 numPartitionsForTopic,group 中订阅这个 topic 的 member 数为 consumersForTopic.size()
,首先需要算出两个值:
numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size()
:表示平均每个 consumer 会分配到几个 partition;
consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size()
:表示平均分配后还剩下多少个 partition 未分配。
分配的规则是:对于剩下的那些 partition 分配到前 consumersWithExtraPartition 个 consumer 上,也就是前 consumersWithExtraPartition 个 consumer 获得 topic-partition 列表会比后面多一个。
在上述的程序中,举了一个例子,假设有一个 topic 有 7 个 partition,group 有5个 consumer,这个5个 consumer 都订阅这个 topic,那么 range 的分配方式如下:
consumer 0:start: 0, length: 2, topic-partition: p0,p1;
consumer 1:start: 2, length: 2, topic-partition: p2,p3;
consumer 2:start: 4, length: 1, topic-partition: p4;
consumer 3:start: 5, length: 1, topic-partition: p5;
consumer 4:start: 6, length: 1, topic-partition: p6
而如果 group 中有 consumer 没有订阅这个 topic,那么这个 consumer 将不会参与分配。下面再举个例子,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:
consumer
订阅的 topic1 的列表
订阅的 topic2 的列表
consumer 0
t1p0, t1p1
t2p0, t2p1
consumer 1
t1p2, t1p3
t2p2, t2p3
consumer 2
t1p4
t2p4
consumer 3
t2p5
consumer 4
t2p6
RoundRobinAssignor 这个是 roundrobin 的实现,其实现方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, List<String>> subscriptions) { Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet())); for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { final String topic = partition.topic(); while (!subscriptions.get(assigner.peek()).contains(topic)) assigner.next(); assignment.get(assigner.next()).add(partition); } return assignment; } public List<TopicPartition> allPartitionsSorted (Map<String, Integer> partitionsPerTopic, Map<String, List<String>> subscriptions) { SortedSet<String> topics = new TreeSet<>(); for (List<String> subscription : subscriptions.values()) topics.addAll(subscription); List<TopicPartition> allPartitions = new ArrayList<>(); for (String topic : topics) { Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic != null ) allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic)); } return allPartitions; }
roundrobin 的实现原则,简单来说就是:列出所有 topic-partition 和列出所有的 consumer member,然后开始分配,一轮之后继续下一轮,假设有有一个 topic,它有7个 partition,group 有3个 consumer 都订阅了这个 topic,那么其分配方式为:
consumer
分配列表
consumer 0
tp0, tp3, tp6
consumer 1
tp1, tp4
consumer 2
tp2, tp5
对于多个 topic 的订阅,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:
consumer
订阅的 topic1 的列表
订阅的 topic2 的列表
consumer 0
t1p0, t1p3
t2p0, t2p5
consumer 1
t1p1, t1p4
t2p1, t2p6
consumer 2
t1p2
t2p2
consumer 3
t2p3
consumer 4
t2p4
roundrobin 分配方式与 range 的分配方式还是略有不同。