Kafka 源码解析之 Controller 发送模型(二十一)
本篇主要讲述 Controller 向各个 Broker 发送请求的模型,算是对 Controller Channel Manager 部分的一个补充,在这篇文章中,将会看到 Controller 在处理 leader 切换、ShutDown 请求时如何向 Broker 发送相应的请求。
Kafka Controller 向 Broker 发送的请求类型主要分为三种:LeaderAndIsr、UpdateMetadata、StopReplica 请求,正如 Controller Channel Manager 这里介绍的,Controller 会为每台 Broker 初始化为一个 ControllerBrokerStateInfo 对象,该对象主要包含以下四个内容:
- NetworkClient:与 Broker 的网络连接对象;
- Node:Broker 的节点信息;
- MessageQueue:每个 Broker 对应的请求队列,Controller 向 Broker 发送的请求会想放在这个队列里;
- RequestSendThread:每台 Broker 对应的请求发送线程。
Controller 的请求发送模型
在讲述 Controller 发送模型之前,先看下 Controller 是如何向 Broker 发送请求的,这里以发送 metadata 更新请求为例,简略的代码如下:
1 | //note: 创建新的批量请求 |
这里有一个比较重要的对象,就是 ControllerBrokerRequestBatch 对象,可以认为它是一个专门用于批量请求发送的对象,在这个对象中有几个重要成员变量:
- leaderAndIsrRequestMap:记录每个 broker 与要发送的 LeaderAndIsr 请求集合的 map;
- stopReplicaRequestMap:记录每个 broker 与要发送的 StopReplica 集合的 map;
- updateMetadataRequestBrokerSet:记录要发送的 update-metadata 请求的 broker 集合;
- updateMetadataRequestPartitionInfoMap:记录 update-metadata 请求要更新的 Topic Partition 集合。
Controller 可以通过下面这三方法向这些集合添加相应的请求:
addLeaderAndIsrRequestForBrokers()
:向给定的 Broker 发送某个 Topic Partition 的 LeaderAndIsr 请求;addStopReplicaRequestForBrokers()
:向给定的 Broker 发送某个 Topic Partition 的 StopReplica 请求;addUpdateMetadataRequestForBrokers()
:向给定的 Broker 发送某一批 Partitions 的 UpdateMetadata 请求。
Controller 整体的请求模型概况如下图所示:
上述三个方法将相应的请求添加到对应的集合中后,然后通过 sendRequestsToBrokers()
方法将该请求添加到该 Broker 对应的请求队列中,接着再由该 Broker 对应的 RequestSendThread 去发送相应的请求。
ControllerBrokerRequestBatch
这节详细讲述一下关于 ControllerBrokerRequestBatch 的一些方法实现。
newBatch 方法
Controller 在添加请求前,都会先调用 newBatch()
方法,该方法的实现如下:
1 | //note: 创建新的请求前,确保前一批请求全部发送完毕 |
这个方法的主要作用是检查上一波的 LeaderAndIsr、UpdateMetadata、StopReplica 请求是否已经发送,正常情况下,Controller 在调用 sendRequestsToBrokers()
方法之后,这些集合中的请求都会被发送,发送之后,会将相应的请求集合清空,当然在异常情况可能会导致部分集合没有被清空,导致无法 newBatch()
,这种情况下,通常策略是重启 controller,因为现在 Controller 的设计还是有些复杂,在某些情况下还是可能会导致异常发生,并且有些异常还是无法恢复的。
添加 LeaderAndIsr 请求
Controller 可以通过 addLeaderAndIsrRequestForBrokers()
向指定 Broker 列表添加某个 Topic Partition 的 LeaderAndIsr 请求,其具体实现如下:
1 | //note: 将 LeaderAndIsr 添加到对应的 broker 中,还未开始发送数据 |
这个方法的处理流程如下:
- 向对应的 Broker 添加 LeaderAndIsr 请求,请求会被添加到 leaderAndIsrRequestMap 集合中;
- 并通过
addUpdateMetadataRequestForBrokers()
方法向所有的 Broker 添加这个 Topic-Partition 的 UpdateMatedata 请求,leader 或 isr 变动时,会向所有 broker 同步这个 Partition 的 metadata 信息,这样可以保证每台 Broker 上都有最新的 metadata 信息。
添加 UpdateMetadata 请求
Controller 可以通过 addUpdateMetadataRequestForBrokers()
向指定 Broker 列表添加某批 Partitions 的 UpdateMetadata 请求,其具体实现如下:
1 | //note: 向给行的 Broker 发送 UpdateMetadataRequest 请求 |
这个方法的实现逻辑如下:
- 首先过滤出要发送的 Partition 列表,如果没有指定要发送 partitions 列表,那么默认就是发送全局的 metadata 信息;
- 接着将已经标记为删除的 Partition 从上面的列表中移除;
- 将要发送的 Broker 列表添加到 updateMetadataRequestBrokerSet 集合中;
- 将前面过滤的 Partition 列表对应的 metadata 信息添加到对应的 updateMetadataRequestPartitionInfoMap 集合中;
- 将当前设置为删除的所有 Partition 的 metadata 信息也添加到 updateMetadataRequestPartitionInfoMap 集合中,添加前会把其 leader 设置为-2,这样 Broker 收到这个 Partition 的 metadata 信息之后就会知道这个 Partition 是设置删除标志。
添加 StopReplica 请求
Controller 可以通过 addStopReplicaRequestForBrokers()
向指定 Broker 列表添加某个 Topic Partition 的 StopReplica 请求,其具体实现如下:
1 | //note: 将 StopReplica 添加到对应的 Broker 中,还未开始发送数据 |
这个方法的实现逻辑比较简单,直接将 StopReplica 添加到 stopReplicaRequestMap 中。
向 Broker 发送请求
Controller 在添加完相应的请求后,最后一步都会去调用 sendRequestsToBrokers()
方法构造相应的请求,并把请求添加到 Broker 对应的 RequestQueue 中。
1 | //note: 发送请求给 broker(只是将对应处理后放入到对应的 queue 中) |
上面这个方法看着很复杂,其实做的事情很明确,就是将三个集合中的请求发送对应 Broker 的请求队列中,这里简单作一个总结:
- 从 leaderAndIsrRequestMap 集合中构造相应的 LeaderAndIsr 请求,通过 Controller 的
sendRequest()
方法将请求添加到 Broker 对应的 MessageQueue 中,最后清空 leaderAndIsrRequestMap 集合; - 从 updateMetadataRequestPartitionInfoMap 集合中构造相应的 UpdateMetadata 请求,,通过 Controller 的
sendRequest()
方法将请求添加到 Broker 对应的 MessageQueue 中,最后清空 updateMetadataRequestBrokerSet 和 updateMetadataRequestPartitionInfoMap 集合; - 从 stopReplicaRequestMap 集合中构造相应的 StopReplica 请求,在构造时会根据是否设置删除标志将要涉及的 Partition 分成两类,构造对应的请求,对于要删除数据的 StopReplica 会设置相应的回调函数,然后通过 Controller 的
sendRequest()
方法将请求添加到 Broker 对应的 MessageQueue 中,最后清空 stopReplicaRequestMap 集合。
走到这一步,Controller 要发送的请求算是都添加到对应 Broker 的 MessageQueue 中,后台的 RequestSendThread 线程会从这个请求队列中遍历相应的请求,发送给对应的 Broker。
公众号
个人公众号(柳年思水)已经上线,最新文章会同步在公众号发布,欢迎大家关注~