The GroupCoordinator is created and started in KafkaServer.startup() when the broker starts:

/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
groupCoordinator.startup() // note: start the GroupCoordinator
When GroupCoordinator.startup() is called it does very little: it sets the flag isActive to true and enables a background task that removes expired group metadata.
/**
 * Startup logic executed at the same time when the server starts up.
 */
def startup(enableMetadataExpiration: Boolean = true) {
  info("Starting up.")
  if (enableMetadataExpiration)
    groupManager.enableMetadataExpiration()
  isActive.set(true)
  info("Startup complete.")
}
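The expiration task mentioned above is scheduled by GroupMetadataManager. The snippet below is a simplified sketch of what enableMetadataExpiration() roughly does in the 0.10.x code; the task name and the config field are quoted from memory and may differ slightly from the source.

import java.util.concurrent.TimeUnit

// Simplified sketch (not verbatim): periodically delete expired offsets / group metadata,
// using the interval configured by offsets.retention.check.interval.ms.
def enableMetadataExpiration() {
  scheduler.startup()
  scheduler.schedule(name = "delete-expired-group-metadata",
    fun = cleanupGroupMetadata,
    period = config.offsetsRetentionCheckIntervalMs,
    unit = TimeUnit.MILLISECONDS)
}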
/**
 * Group contains the following metadata:
 *
 * Membership metadata:
 * 1. Members registered in this group
 * 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
 * 3. Protocol metadata associated with group members
 *
 * State metadata:
 * 1. group state
 * 2. generation id
 * 3. leader id
 */
@nonthreadsafe
// note: metadata kept per group; every group managed by the coordinator has one instance of this class
private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {

  private var state: GroupState = initialState                                              // current group state
  private val members = new mutable.HashMap[String, MemberMetadata]                         // members of the group
  private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]              // committed offsets
  private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] // in-flight commits, moved into `offsets` once the commit succeeds

  var protocolType: Option[String] = None
  var generationId = 0          // generation id
  var leaderId: String = null   // leader consumer id
  var protocol: String = null
}
/**
 * Member metadata contains the following metadata:
 *
 * Heartbeat metadata:
 * 1. negotiated heartbeat session timeout
 * 2. timestamp of the latest heartbeat
 *
 * Protocol metadata:
 * 1. the list of supported protocols (ordered by preference)
 * 2. the metadata associated with each protocol
 *
 * In addition, it also contains the following state information:
 *
 * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
 *                                 its rebalance callback will be kept in the metadata if the
 *                                 member has sent the join group request
 * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
 *                            is kept in metadata until the leader provides the group assignment
 *                            and the group transitions to stable
 */
@nonthreadsafe
// note: per-member state kept for every member of a group
private[coordinator] class MemberMetadata(val memberId: String,
                                          val groupId: String,
                                          val clientId: String,
                                          val clientHost: String,
                                          val rebalanceTimeoutMs: Int,
                                          val sessionTimeoutMs: Int,
                                          val protocolType: String,
                                          var supportedProtocols: List[(String, Array[Byte])]) {
}
GroupCoordinator request handling
As mentioned earlier, of the 21 request types the Kafka server can handle, 8 are processed by the GroupCoordinator. This section walks through how the GroupCoordinator handles these requests.
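For orientation, the sketch below paraphrases the coordinator-related branches of the KafkaApis.handle() dispatch in the 0.10.x code; it is trimmed and not verbatim, so the exact set of cases and handler names should be checked against the source.

// Paraphrased sketch of the coordinator-related branches in KafkaApis.handle(); not verbatim.
request.requestId match {
  case ApiKeys.OFFSET_COMMIT.id     => handleOffsetCommitRequest(request)
  case ApiKeys.OFFSET_FETCH.id      => handleOffsetFetchRequest(request)
  case ApiKeys.GROUP_COORDINATOR.id => handleGroupCoordinatorRequest(request)
  case ApiKeys.JOIN_GROUP.id        => handleJoinGroupRequest(request)
  case ApiKeys.HEARTBEAT.id         => handleHeartbeatRequest(request)
  case ApiKeys.LEAVE_GROUP.id       => handleLeaveGroupRequest(request)
  case ApiKeys.SYNC_GROUP.id        => handleSyncGroupRequest(request)
  case ApiKeys.DESCRIBE_GROUPS.id   => handleDescribeGroupRequest(request)
  case ApiKeys.LIST_GROUPS.id       => handleListGroupsRequest(request)
  // ... non-coordinator APIs omitted
}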
The first example is the OFFSET_FETCH request, which is handled in KafkaApis.handleOffsetFetchRequest(); the core of the handler is shown below.

val offsetFetchResponse =
  // reject the request if not authorized to the group
  if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
    offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
  else {
    if (header.apiVersion == 0) {
      val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
        .partition(authorizeTopicDescribe)

      // version 0 reads offsets from ZK
      val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
        val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
        try {
          if (!metadataCache.contains(topicPartition.topic))
            (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
          else {
            val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
            payloadOpt match {
              case Some(payload) =>
                (topicPartition, new OffsetFetchResponse.PartitionData(
                  payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
              case None =>
                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
            }
          }
        } catch {
          case e: Throwable =>
            (topicPartition, new OffsetFetchResponse.PartitionData(
              OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
        }
      }.toMap

      val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
      new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
    } else {
      // versions 1 and above read offsets from Kafka
      if (offsetFetchRequest.isAllPartitions) { // note: fetch the offsets of all topic-partitions consumed by this group
        val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId)
        if (error != Errors.NONE)
          offsetFetchRequest.getErrorResponse(error)
        else {
          // clients are not allowed to see offsets for topics that are not authorized for Describe
          // note: offsets of topics for which the client has no Describe permission are filtered out
          val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
          new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava, header.apiVersion)
        }
      } else { // note: fetch the offsets of the requested topic-partitions only
        val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
          .partition(authorizeTopicDescribe)
        val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
          Some(authorizedPartitions))
        if (error != Errors.NONE)
          offsetFetchRequest.getErrorResponse(error)
        else {
          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
          new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
        }
      }
    }
  }

trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
requestChannel.sendResponse(new Response(request, offsetFetchResponse))
When an OFFSET_COMMIT request arrives, the GroupCoordinator eventually calls GroupMetadataManager.prepareStoreOffsets(), which appends the offsets to the internal offsets topic and updates the in-memory cache once the append has completed. The excerpt below is abridged.

/**
 * Store offsets by appending it to the replicated log and then inserting to cache
 */
// note: store the offsets carried by an OFFSET_COMMIT request
def prepareStoreOffsets(group: GroupMetadata,
                        consumerId: String,
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = {
  // first filter out partitions with offset metadata size exceeding limit
  // note: drop entries whose offset metadata exceeds the configured size limit
  val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
    validateOffsetMetadataLength(offsetAndMetadata.metadata)
  }

  // construct the message set to append
  getMagicAndTimestamp(partitionFor(group.groupId)) match {
    case Some((magicValue, timestampType, timestamp)) =>
      val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
        Record.create(magicValue, timestampType, timestamp,
          GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition), // note: the key is the triple (group, topic, partition)
          GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
      }.toSeq

      val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))

      // set the callback function to insert offsets into cache after log append completed
      def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
        // the append response should only contain the topics partition
        if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
          throw new IllegalStateException("Append status %s should only have one partition %s"
            .format(responseStatus, offsetTopicPartition))

        // construct the commit response status and insert
        // the offset and metadata to cache if the append status has no error
        val status = responseStatus(offsetTopicPartition)

        val responseCode = group synchronized {
          if (status.error == Errors.NONE) {
            // note: the append to the replicated log succeeded, so update the group's cached offsets
            if (!group.is(Dead)) {
              filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
                group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
              }
            }
            Errors.NONE.code
          } else {
            if (!group.is(Dead)) {
              filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
                group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
              }
            }

            debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
              s"with generation $generationId failed when appending to log due to ${status.error.exceptionName}")

            // transform the log append error code to the corresponding the commit status error code
            val responseError = status.error match {
              case Errors.UNKNOWN_TOPIC_OR_PARTITION
                   | Errors.NOT_ENOUGH_REPLICAS
                   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                Errors.GROUP_COORDINATOR_NOT_AVAILABLE

              // ... (the remaining error mappings are elided in this excerpt)
            }
            responseError.code
          }
        }

        // compute the final error codes for the commit response
        val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
          if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
            (topicPartition, responseCode)
          else
            (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
        }

        // finally trigger the callback logic passed from the API layer
        responseCallback(commitStatus)
      }

      group synchronized {
        group.prepareOffsetCommit(offsetMetadata) // note: record the offsets in the group's pendingOffsetCommits
      }

      // ... (the records and putCacheCallback are then wrapped in a DelayedStore and returned; elided here)
  }
}
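On the client side, this path is exercised whenever a consumer commits offsets. A minimal, self-contained sketch is shown below; the broker address, topic name and group id are placeholders.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

// Minimal sketch: committing offsets from a consumer sends an OFFSET_COMMIT request,
// which the coordinator then persists via prepareStoreOffsets() as shown above.
object OffsetCommitExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("group.id", "demo-group")              // placeholder group id
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("demo-topic")) // placeholder topic

    val records = consumer.poll(1000) // old poll(long) API, as used by the 0.10.x clients
    // ... process `records` ...
    consumer.commitSync()             // synchronous OFFSET_COMMIT for the offsets returned by poll()
    consumer.close()
  }
}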
A LEAVE_GROUP request is handled by GroupCoordinator.handleLeaveGroup():

def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) {
  if (!isActive.get) {
    responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
  } else if (!isCoordinatorForGroup(groupId)) {
    responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
  } else if (isCoordinatorLoadingInProgress(groupId)) {
    responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
  } else {
    groupManager.getGroup(groupId) match {
      case None =>
        // if the group is marked as dead, it means some other thread has just removed the group
        // from the coordinator metadata; this is likely that the group has migrated to some other
        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
        // joining without specified consumer id,
        responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

      case Some(group) =>
        group synchronized {
          if (group.is(Dead) || !group.has(memberId)) {
            responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
          } else {
            val member = group.get(memberId)
            removeHeartbeatForLeavingMember(group, member) // note: treat the member's heartbeat as completed
            onMemberFailure(group, member)                 // note: remove the member from the group and rebalance if necessary
            responseCallback(Errors.NONE.code)
          }
        }
    }
  }
}

For a member that is still known to the group, the actual work is done by onMemberFailure():

private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
  trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
  group.remove(member.memberId) // note: remove the member from the group
  group.currentState match {
    case Dead | Empty =>
    case Stable | AwaitingSync => maybePrepareRebalance(group)                         // note: trigger a rebalance
    case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) // note: check whether the pending join-group round can now complete
  }
}
As the code above shows, when handling a LEAVE_GROUP request the GroupCoordinator essentially just calls onMemberFailure(), which removes the failed member from the group and then performs the corresponding state transition:

if the group is currently Dead or Empty, it has no members to rebalance, so nothing further is done;
if the group is Stable or AwaitingSync, maybePrepareRebalance() is called to start a new rebalance;
if the group is already in PreparingRebalance, the coordinator only checks whether the pending join-group round can now be completed (the remaining members may all have re-joined already).
A HEARTBEAT request is handled by GroupCoordinator.handleHeartbeat():

// note: server-side handling of a heartbeat request
def handleHeartbeat(groupId: String,
                    memberId: String,
                    generationId: Int,
                    responseCallback: Short => Unit) {
  if (!isActive.get) { // note: the GroupCoordinator is not active
    responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
  } else if (!isCoordinatorForGroup(groupId)) { // note: this broker is not the coordinator for the group
    responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
  } else if (isCoordinatorLoadingInProgress(groupId)) { // note: the group's state is still being loaded, blindly return success
    // the group is still loading, so respond just blindly
    responseCallback(Errors.NONE.code)
  } else {
    groupManager.getGroup(groupId) match {
      case None => // note: this coordinator does not know the group
        responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

      case Some(group) => // note: the coordinator does manage this group
        group synchronized {
          group.currentState match {
            case Dead =>
              // note: the group has been marked Dead, i.e. its metadata has already been removed; return UNKNOWN_MEMBER_ID
              // if the group is marked as dead, it means some other thread has just removed the group
              // from the coordinator metadata; this is likely that the group has migrated to some other
              // coordinator OR the group is in a transient unstable phase. Let the member retry
              // joining without the specified member id,
              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

            case Empty =>
              // note: the group has no members; return UNKNOWN_MEMBER_ID
              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

            case AwaitingSync =>
              // note: the group has just finished the join phase of a rebalance
              if (!group.has(memberId)) // note: the group does not contain this member
                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
              else // note: tell the client a rebalance is in progress so that it rejoins the group
                responseCallback(Errors.REBALANCE_IN_PROGRESS.code)

            case PreparingRebalance =>
              if (!group.has(memberId)) { // note: the group does not contain this member
                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
              } else if (generationId != group.generationId) {
                responseCallback(Errors.ILLEGAL_GENERATION.code)
              } else {
                // note: process the heartbeat normally, but still return REBALANCE_IN_PROGRESS
                val member = group.get(memberId)
                // note: record the heartbeat, complete the current expiration task and schedule the next one
                // (if the next heartbeat does not arrive in time, the member is removed from the group)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
              }

            case Stable =>
              if (!group.has(memberId)) {
                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
              } else if (generationId != group.generationId) {
                responseCallback(Errors.ILLEGAL_GENERATION.code)
              } else {
                // note: process the heartbeat normally
                val member = group.get(memberId)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.NONE.code)
              }
          }
        }
    }
  }
}
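completeAndScheduleNextHeartbeatExpiration() is what actually keeps a member's session alive. The snippet below is a paraphrased sketch of that method in the 0.10.x GroupCoordinator (not a verbatim copy; the exact field and class names may differ slightly):

// Paraphrased sketch: record the heartbeat, complete the DelayedHeartbeat currently being watched,
// and register a new one that expires if the next heartbeat does not arrive within the session timeout.
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
  // complete the current heartbeat expectation
  member.latestHeartbeat = time.milliseconds()
  val memberKey = MemberKey(member.groupId, member.memberId)
  heartbeatPurgatory.checkAndComplete(memberKey)

  // reschedule the next heartbeat expiration deadline
  val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
  val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
  heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}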
Group state machine
An important part of the GroupCoordinator's group and offset management is driving and maintaining the group's state transitions. The group state machine is summarized below.
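The five states and the transitions the state machine allows can be sketched as follows; this is paraphrased and condensed from GroupMetadata.scala in the 0.10.x source (the map lists, for each target state, the states it may be entered from), so check the source for the exact definitions.

// Paraphrased sketch of the group states and allowed transitions (target -> valid previous states); not verbatim.
private[coordinator] sealed trait GroupState

private[coordinator] case object PreparingRebalance extends GroupState // collecting join-group requests
private[coordinator] case object AwaitingSync extends GroupState       // waiting for the leader's assignment
private[coordinator] case object Stable extends GroupState             // assignment propagated, group is usable
private[coordinator] case object Dead extends GroupState               // group metadata is being removed
private[coordinator] case object Empty extends GroupState              // no members, but offsets may still be kept

private val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(
  Dead               -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
  AwaitingSync       -> Set(PreparingRebalance),
  Stable             -> Set(AwaitingSync),
  PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
  Empty              -> Set(PreparingRebalance))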
The core of this state machine is the rebalance operation. Briefly, a rebalance proceeds as follows (a simplified sketch of the coordinator-side code for the first step follows the list):

1. when certain events occur (for example a member joining, leaving or timing out), the group moves from Stable to PreparingRebalance;
2. the coordinator then waits for all consumer members of the group to send join-group requests; once they have all re-joined, the GroupCoordinator sends a join-group response to every member and the group's state changes to AwaitingSync;
3. the leader consumer receives the subscription details of all members; after it has computed the partition assignment, it sends the result to the GroupCoordinator in a sync-group request (the sync-group requests sent by non-leader consumers carry empty data);
4. once the GroupCoordinator receives the leader consumer's sync-group request, and with it the topic-partition assignment for every member of the group, the group's state becomes Stable.
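As a sketch of the first transition, the coordinator-side logic that moves a group into PreparingRebalance looks roughly like the following. It paraphrases maybePrepareRebalance()/prepareRebalance() from the 0.10.x GroupCoordinator and is simplified, so names and details are approximate rather than verbatim.

// Paraphrased, simplified sketch of how the coordinator starts a rebalance; not verbatim.
private def maybePrepareRebalance(group: GroupMetadata) {
  group synchronized {
    if (group.canRebalance) // only groups in a state that allows it (e.g. Stable, AwaitingSync, Empty) start a rebalance
      prepareRebalance(group)
  }
}

private def prepareRebalance(group: GroupMetadata) {
  // if members are currently waiting for the sync response, tell them to rejoin instead
  if (group.is(AwaitingSync))
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

  group.transitionTo(PreparingRebalance) // Stable / AwaitingSync / Empty -> PreparingRebalance

  // wait (via the join purgatory) until every member has sent its join-group request,
  // or until the rebalance timeout expires
  val delayedRebalance = new DelayedJoin(this, group, group.rebalanceTimeoutMs)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(GroupKey(group.groupId)))
}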