Kafka 源码解析之 Consumer 两种订阅模式(八)
在前面两篇 Kafka Consumer 的文章中,Consumer Poll 模型这部分基本上已经完整结束,Consumer 这块的文章计划是要写五篇,这篇是 Consumer 这块的第三篇,本来计划是要从其中的三个小块细节内容着手,这三个地方有一个相同之处,那就是在 Kafka Consumer 中都提供了两个不同的解决方案,但具体怎么去使用是需要用户根据自己的业务场景去配置,这里会讲述其底层的具体实现(但为了阅读得更为方便,本来计划的这篇文章将拆分为两篇来,第一篇先讲述第一点,后面两点放在一起讲述)。
本篇文章讲述的这三点内容分别是:
- consumer 的两种订阅模式,
subscribe()
和assign()
模式,一种是 topic 粒度(使用 group 管理),一种是 topic-partition 粒度(用户自己去管理); - consumer 的两种 commit 实现,
commitAsync()
和commitSync()
,即同步 commit 和异步 commit; - consumer 提供的两种不同
partition.assignment.strategy
,这是关于一个 group 订阅一些 topic 后,group 内各个 consumer 实例的 partition 分配策略。
0.9.X 之前 Kafka Consumer 是支持两个不同的订阅模型 —— high level 和 simple level,这两种模型的最大区别是:第一个其 offset 管理是由 Kafka 来做,包括 rebalance 操作,第二个则是由使用者自己去做,自己去管理相关的 offset,以及自己去进行 rebalance。
在新版的 consumer 中对 high level 和 simple level 的接口实现了统一,简化了相应的相应的编程模型。
订阅模式
在新版的 Consumer 中,high level 模型现在叫做订阅模式,KafkaConsumer 提供了三种 API,如下:
1 | // 订阅指定的 topic 列表,并且会自动进行动态 partition 订阅 |
以上三种 API 都是按照 topic 级别去订阅,可以动态地获取其分配的 topic-partition,这是使用 Group 动态管理,它不能与手动 partition 管理一起使用。当监控到发生下面的事件时,Group 将会触发 rebalance 操作:
- 订阅的 topic 列表变化;
- topic 被创建或删除;
- consumer group 的某个 consumer 实例挂掉;
- 一个新的 consumer 实例通过
join
方法加入到一个 group 中。
在这种模式下,当 KafkaConsumer 调用 pollOnce 方法时,第一步会首先加入到一个 group 中,并获取其分配的 topic-partition 列表(见Kafka 源码解析之 Consumer 如何加入一个 Group(六)),前面两篇文章都是以这种情况来讲述的。
这里介绍一下当调用 subscribe()
方法之后,Consumer 所做的事情,分两种情况介绍,一种按 topic 列表订阅,一种是按 pattern 模式订阅:
- topic 列表订阅
- 更新 SubscriptionState 中记录的
subscription
(记录的是订阅的 topic 列表),将 SubscriptionType 类型设置为 AUTO_TOPICS; - 更新 metadata 中的 topic 列表(
topics
变量),并请求更新 metadata;
- 更新 SubscriptionState 中记录的
- pattern 模式订阅
- 更新 SubscriptionState 中记录的
subscribedPattern
,设置为 pattern,将 SubscriptionType 类型设置为 AUTO_PATTERN; - 设置 Metadata 的 needMetadataForAllTopics 为 true,即在请求 metadata 时,需要更新所有 topic 的 metadata 信息,设置后再请求更新 metadata;
- 调用
coordinator.updatePatternSubscription()
方法,遍历所有 topic 的 metadata,找到所有满足 pattern 的 topic 列表,更新到 SubscriptionState 的subscriptions
和 Metadata 的topics
中; - 通过在 ConsumerCoordinator 中调用
addMetadataListener()
方法在 Metadata 中添加 listener 当每次 metadata update 时就调用第三步的方法更新,但是只有当本地缓存的 topic 列表与现在要订阅的 topic 列表不同时,才会触发 rebalance 操作。
- 更新 SubscriptionState 中记录的
其他部分,两者基本一样,只是 pattern 模型在每次更新 topic-metadata 时,获取全局的 topic 列表,如果发现有新加入的符合条件的 topic,就立马去订阅,其他的地方,包括 Group 管理、topic-partition 的分配都是一样的。
分配模式
下面来看一下 Consumer 提供的分配模式,熟悉 0.8.X 版本的人,可能会把这种方法称为 simple consumer 的接口,当调用 assign()
方法手动分配 topic-partition 列表时,是不会使用 consumer 的 Group 管理机制,也即是当 consumer group member 变化或 topic 的 metadata 信息变化时是不会触发 rebalance 操作的。比如:当 topic 的 partition 增加时,这里是无法感知,需要用户进行相应的处理,Apache Flink 就是使用的这种方式,后续我会写篇文章介绍 Flink 是如何实现这种机制的。
1 | //note: 手动向 consumer 分配一些 topic-partition 列表,并且这个接口不允许增加分配的 topic-partition 列表,将会覆盖之前分配的 topic-partition 列表,如果给定的 topic-partition 列表为空,它的作用将会与 unsubscribe() 方法一样。 |
这里来看一下 Kafka 提供的 Group 管理到底是什么?
如果有印象的话,在Kafka 源码解析之 Consumer 如何加入一个 Group(六)中介绍 Poll 模型的第一步中,详细介绍了 ConsumerCoordinator.poll()
方法,我们再来看一下这个方法:
1 | // note: 它确保了这个 group 的 coordinator 是已知的,并且这个 consumer 是已经加入到了 group 中,也用于 offset 周期性的 commit |
如果使用的是 assign 模式,也即是非 AUTO_TOPICS 或 AUTO_PATTERN 模式时,Consumer 实例在调用 poll 方法时,是不会向 GroupCoordinator 发送 join-group/sync-group/heartbeat 请求的,也就是说 GroupCoordinator 是拿不到这个 Consumer 实例的相关信息,也不会去维护这个 member 是否存活,这种情况下就需要用户自己管理自己的处理程序。但是在这种模式是可以进行 offset commit的。
commit offset 请求处理
当 Kafka Serve 端受到来自 client 端的 Offset Commit 请求时,其处理逻辑如下所示,是在 kafka.coordinator.GroupCoordinator
中实现的。
1 | // kafka.coordinator.GroupCoordinator |
处理过程如下:
- 如果这个 group 还不存在(groupManager没有这个 group 信息),并且 generation 为 -1(一般情况下应该都是这样),就新建一个 GroupMetadata, 其 Group 状态为 Empty;
- 现在 group 已经存在,就调用
doCommitOffsets()
提交 offset; - 如果是来自 assign 模式的请求,并且其对应的 group 的状态为 Empty(generationId < 0 && group.is(Empty)),那么就记录这个 offset;
- 如果是来自 assign 模式的请求,但这个 group 的状态不为 Empty(!group.has(memberId)),也就是说,这个 group 已经处在活跃状态,assign 模式下的 group 是不会处于的活跃状态的,可以认为是 assign 模式使用的 group.id 与 subscribe 模式下使用的 group 相同,这种情况下就会拒绝 assign 模式下的这个 offset commit 请求。
小结
根据上面的讲述,这里做一下小节,如下图所示:
简单做一下总结:
模式 | 不同之处 | 相同之处 |
---|---|---|
subscribe() | 使用 Kafka Group 管理,自动进行 rebalance 操作 | 可以在 Kafka 保存 offset |
assign() | 用户自己进行相关的处理 | 也可以进行 offset commit,但是尽量保证 group.id 唯一性,如果使用一个与上面模式一样的 group,offset commit 请求将会被拒绝 |
公众号
个人公众号(柳年思水)已经上线,最新文章会同步在公众号发布,欢迎大家关注~