/**
 * Do one round of polling. In addition to checking for new data, this does any needed offset commits
 * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
 * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
 * @return The fetched records (may be empty)
 */
// note: one round of polling: check for new data and perform any needed offset commits and offset resets
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // note: 1. find and connect to the GroupCoordinator, join the group and sync the group;
    //          the group rebalances along the way and the consumer obtains its assignment
    coordinator.poll(time.milliseconds());

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    // note: 2. update the offsets of the partitions to fetch (if needed)
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // if data is available already, return it immediately
    // note: 3. get the data the fetcher has already fetched
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
    // note: reaching this point means all previously fetched data has been returned,
    //       so fetch requests must be sent again to pull more data from the brokers

    // send any new fetches (won't resend pending fetches)
    // note: 4. send fetch requests for all subscribed partitions; data is pulled from multiple partitions
    fetcher.sendFetches();

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    // note: 5. call poll() to actually send the requests
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    // note: 6. if the group needs to rebalance, return empty data right away so the group reaches a stable state sooner
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}
Here we split the pollOnce model into six parts, each briefly described below:
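1. Connect to the GroupCoordinator and send the JoinGroup and SyncGroup requests; once the consumer has joined the group, it receives its assigned tp list.
2. Update the last committed offset for each assigned tp (if there is none, obtain an offset according to the configured offset reset policy).
3. Ask the Fetcher for data it has already fetched; if there is any, return it immediately, otherwise continue with the steps below.
4. Have the Fetcher send fetch requests (at this point they are only queued, not actually sent).
5. Call poll() to actually send the requests.
6. If the group needs to rebalance, return an empty collection right away so the group can reach a stable state as soon as possible.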
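To make the six steps concrete, here is a toy model in Java. It is a sketch under heavy assumptions, not Kafka code: `PollOnceSketch`, its `broker` deque and its `needRejoin` flag are hypothetical, steps 1 and 2 are assumed to be done already, and the "network poll" simply moves waiting broker data into the completed-fetch buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of pollOnce(): strings stand in for records.
final class PollOnceSketch {
    final Deque<String> broker = new ArrayDeque<>();           // hypothetical data waiting on the brokers
    final Deque<String> completedFetches = new ArrayDeque<>(); // data already fetched (step 3)
    final List<String> queuedFetches = new ArrayList<>();      // requests queued but not yet sent (step 4)
    boolean needRejoin = false;                                // input to step 6

    List<String> pollOnce() {
        // Steps 1-2 (group join/sync, fetch positions) are assumed done in this toy.
        // 3. if data has already been fetched, return it immediately
        List<String> records = drainCompleted();
        if (!records.isEmpty())
            return records;
        // 4. queue a fetch request; nothing goes over the "network" yet
        queuedFetches.add("fetch");
        // 5. network poll: send the queued requests and collect the responses
        queuedFetches.clear();
        while (!broker.isEmpty())
            completedFetches.add(broker.poll());
        // 6. if the group must rebalance, return nothing so it can stabilize sooner
        if (needRejoin)
            return Collections.emptyList();
        return drainCompleted();
    }

    private List<String> drainCompleted() {
        List<String> out = new ArrayList<>(completedFetches);
        completedFetches.clear();
        return out;
    }
}
```

Note that data fetched during a round that ends in a rejoin is not lost in this toy: it stays buffered and is returned by step 3 of the next call.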
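A consumer instance can consume data only after it has successfully joined a group and obtained the list of tps (topic-partitions) assigned to it; all of this happens in step 1. If the group is new, its state goes through Empty → PreparingRebalance → AwaitSync → Stable, as described in detail below.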
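The state sequence above can be modeled as a small transition table. This is a hypothetical sketch: `GroupStateSketch` is not a Kafka class, and the table of allowed transitions is an assumption about how the broker-side group state machine behaves for these four states, shown only to illustrate the path a new group takes.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the group states named in the text (assumed transition table).
enum GroupStateSketch {
    EMPTY, PREPARING_REBALANCE, AWAIT_SYNC, STABLE;

    // For each target state, the states it may be entered from (assumption).
    private static final Map<GroupStateSketch, Set<GroupStateSketch>> VALID_PREVIOUS = Map.of(
            EMPTY, EnumSet.of(PREPARING_REBALANCE),
            PREPARING_REBALANCE, EnumSet.of(EMPTY, STABLE, AWAIT_SYNC),
            AWAIT_SYNC, EnumSet.of(PREPARING_REBALANCE),
            STABLE, EnumSet.of(AWAIT_SYNC));

    boolean canTransitionTo(GroupStateSketch next) {
        return VALID_PREVIOUS.get(next).contains(this);
    }

    // True if every consecutive pair in the path is an allowed transition.
    static boolean isValidPath(List<GroupStateSketch> path) {
        for (int i = 1; i < path.size(); i++)
            if (!path.get(i - 1).canTransitionTo(path.get(i)))
                return false;
        return true;
    }
}
```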
// note: Step 2: check whether the consumer needs to rejoin the group; a rejoin is required when the subscribed partitions or the assigned partitions change
if (needRejoin()) {
    // due to a race condition between the initial metadata fetch and the initial rebalance,
    // we need to ensure that the metadata is fresh before joining initially. This ensures
    // that we have matched the pattern against the cluster's topics at least once before joining.
    // note: refresh the metadata before rejoining the group (for AUTO_PATTERN subscriptions)
    if (subscriptions.hasPatternSubscription())
        client.ensureFreshMetadata();

    // note: make sure the group is active: join the group and get the subscribed partitions assigned
    ensureActiveGroup();
    now = time.milliseconds();
}
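The rejoin condition described in the comment can be sketched as a pure function. This is an illustration under assumptions: `RejoinCheckSketch` and its parameters are hypothetical, topic-to-partition-count maps stand in for metadata snapshots, and `rejoinRequested` stands in for any other reason to rejoin (for example an earlier error).

```java
import java.util.Map;
import java.util.Set;

// Hypothetical rejoin check modeled on the comment above.
final class RejoinCheckSketch {
    static boolean needRejoin(Set<String> joinedSubscription,          // topics at the last join
                              Set<String> currentSubscription,         // topics subscribed now
                              Map<String, Integer> partitionsAtAssign, // leader's snapshot, null for followers
                              Map<String, Integer> partitionsNow,      // latest topic -> partition count
                              boolean rejoinRequested) {
        // the leader assigned from metadata that has since changed (e.g. partitions were added)
        if (partitionsAtAssign != null && !partitionsAtAssign.equals(partitionsNow))
            return true;
        // the subscription changed since the last join
        if (joinedSubscription != null && !joinedSubscription.equals(currentSubscription))
            return true;
        return rejoinRequested;
    }
}
```

Only the leader keeps a metadata snapshot in this sketch, matching the later comment in onJoinComplete that only the leader monitors partition changes.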
if (future.failed()) { // note: the coordinator lookup failed
    if (future.isRetriable()) {
        remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
        if (remainingMs <= 0)
            break;

        log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId);
        client.awaitMetadataUpdate(remainingMs);
    } else
        throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
    // we found the coordinator, but the connection has failed, so mark
    // it dead and backoff before retrying discovery
    coordinatorDead();
    time.sleep(retryBackoffMs);
}
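The retry logic above spends a fixed time budget: each failed lookup recomputes the remaining time and gives up once it is exhausted. A minimal sketch, assuming every failure is retriable; `CoordinatorDiscoverySketch`, `lookup`, `clockMs` and `refreshMetadata` are hypothetical names, and the injected clock makes the loop testable without sleeping.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical deadline-bounded retry loop for coordinator discovery.
final class CoordinatorDiscoverySketch {
    static boolean ensureCoordinatorReady(BooleanSupplier lookup,   // true when the coordinator was found
                                          LongSupplier clockMs,     // injected clock
                                          long timeoutMs,
                                          Runnable refreshMetadata) {
        long startMs = clockMs.getAsLong();
        while (true) {
            if (lookup.getAsBoolean())
                return true;
            long remainingMs = timeoutMs - (clockMs.getAsLong() - startMs);
            if (remainingMs <= 0)
                return false;          // budget exhausted: give up, like the break above
            refreshMetadata.run();     // stands in for client.awaitMetadataUpdate(remainingMs)
        }
    }
}
```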
// note: pick the least-loaded node and send it a GroupCoordinator request
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode(); // NOTE: find a node to send the GroupCoordinator request to
        if (node == null) {
            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
            // from configuration?
            log.debug("No broker available to send GroupCoordinator request for group {}", groupId);
            return RequestFuture.noBrokersAvailable();
        } else
            findCoordinatorFuture = sendGroupCoordinatorRequest(node); // NOTE: send the request and handle the response
    }
    return findCoordinatorFuture;
}
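Two ideas in lookupCoordinator() are worth isolating: the request goes to the least-loaded node, and an in-flight lookup is cached so repeated calls do not send duplicate requests. The sketch below uses `CompletableFuture` in place of Kafka's `RequestFuture`; `CoordinatorLookupSketch`, the in-flight-count map, and `onResponse` are hypothetical, and "least loaded" is assumed to mean fewest in-flight requests.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: least-loaded node selection plus an in-flight lookup cache.
final class CoordinatorLookupSketch {
    private CompletableFuture<String> findCoordinatorFuture; // in-flight lookup, if any
    int requestsSent = 0;

    // Node with the fewest in-flight requests (assumed meaning of "least loaded").
    static Optional<String> leastLoadedNode(Map<String, Integer> inFlightByNode) {
        return inFlightByNode.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    synchronized CompletableFuture<String> lookupCoordinator(Map<String, Integer> inFlightByNode) {
        if (findCoordinatorFuture == null) {
            Optional<String> node = leastLoadedNode(inFlightByNode);
            if (node.isEmpty())
                return CompletableFuture.failedFuture(new IllegalStateException("no broker available"));
            requestsSent++;                                    // pretend a request went to node.get()
            findCoordinatorFuture = new CompletableFuture<>(); // completed later by onResponse()
        }
        return findCoordinatorFuture;
    }

    // Response handler: clear the cache so a later lookup sends a fresh request.
    synchronized void onResponse(String coordinator) {
        CompletableFuture<String> pending = findCoordinatorFuture;
        findCoordinatorFuture = null;
        if (pending != null)
            pending.complete(coordinator);
    }
}
```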
// NOTE: send the GroupCoordinator request
private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending GroupCoordinator request for group {} to broker {}", groupId, node);
    GroupCoordinatorRequest.Builder requestBuilder =
            new GroupCoordinatorRequest.Builder(this.groupId);
    return client.send(node, requestBuilder)
            .compose(new GroupCoordinatorResponseHandler());
    // NOTE: compose() adapts the raw response future into a new RequestFuture via GroupCoordinatorResponseHandler;
    // NOTE: in effect, the handler's onSuccess() and onFailure() decide how the returned future completes
}
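The compose() step can be illustrated with `CompletableFuture`. This is an analogue, not Kafka's `RequestFuture` API: `FutureAdapter` and `ComposeSketch` are hypothetical names. The adapter receives the raw result and decides how a new future of a different type completes, which is the role GroupCoordinatorResponseHandler plays above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical adapter: turns a raw result of type F into completion of a CompletableFuture<T>.
interface FutureAdapter<F, T> {
    void onSuccess(F value, CompletableFuture<T> future);

    default void onFailure(Throwable error, CompletableFuture<T> future) {
        future.completeExceptionally(error); // by default, propagate the failure
    }
}

final class ComposeSketch {
    // Analogue of RequestFuture.compose(adapter): a new future driven by the adapter.
    static <F, T> CompletableFuture<T> compose(CompletableFuture<F> source, FutureAdapter<F, T> adapter) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error != null)
                adapter.onFailure(error, result);
            else
                adapter.onSuccess(value, result);
        });
        return result;
    }
}
```

For example, an adapter that treats the string "NONE" as success and anything else as an error code turns a `CompletableFuture<String>` response into a `CompletableFuture<Void>`, much like a response handler turning a broker response into a done/failed signal.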
// NOTE: ensure the group is active and join it
public void ensureActiveGroup() {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    ensureCoordinatorReady();       // NOTE: ensure the GroupCoordinator is known and connected
    startHeartbeatThreadIfNeeded(); // NOTE: start the heartbeat thread (it sends heartbeats only once the required conditions are met)
    joinGroupIfNeeded();            // NOTE: send the JoinGroup request and handle the response
}
// note: join the group
public void joinGroupIfNeeded() {
    while (needRejoin() || rejoinIncomplete()) {
        ensureCoordinatorReady();

        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        // note: trigger onJoinPrepare, which commits offsets and invokes the rebalance listener
        if (needsJoinPrepare) {
            onJoinPrepare(generation.generationId, generation.memberId);
            needsJoinPrepare = false;
        }
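The `needsJoinPrepare` flag guarantees that onJoinPrepare runs once per rebalance even if the join loop is interrupted and re-entered. A minimal sketch under assumptions: `JoinPrepareGuardSketch` is hypothetical, `maxAttempts` simulates a wakeup cutting the loop short, and re-arming the flag after a successful join is assumed to happen once the rebalance completes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the needsJoinPrepare guard in a join loop.
final class JoinPrepareGuardSketch {
    final List<String> events = new ArrayList<>();
    boolean needsJoinPrepare = true;
    private int attemptsUntilJoin; // hypothetical: how many join attempts until one succeeds

    JoinPrepareGuardSketch(int attemptsUntilJoin) {
        this.attemptsUntilJoin = attemptsUntilJoin;
    }

    // Returns true once joined; returns false if "woken up" after maxAttempts.
    boolean joinGroupIfNeeded(int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (needsJoinPrepare) {          // runs at most once per rebalance
                events.add("onJoinPrepare");
                needsJoinPrepare = false;
            }
            events.add("join attempt");
            if (--attemptsUntilJoin <= 0) {
                events.add("onJoinComplete");
                needsJoinPrepare = true;     // re-arm for the next rebalance
                return true;
            }
        }
        return false;
    }
}
```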
// NOTE: send the JoinGroup request and attach a listener
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
        // Note that this must come after the call to onJoinPrepare since we must be able to continue
        // sending heartbeats if that callback takes some time.
        // note: the heartbeat thread is disabled during the rebalance
        disableHeartbeatThread();

        state = MemberState.REBALANCING;     // NOTE: mark the member as rebalancing
        joinFuture = sendJoinGroupRequest(); // NOTE: send the JoinGroup request
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                    state = MemberState.STABLE; // NOTE: mark the consumer as stable

                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED; // NOTE: mark the consumer as unjoined
                }
            }
        });
    }
    return joinFuture;
}
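The member-state handling in initiateJoinGroup() can be reduced to a few lines with `CompletableFuture`. This hypothetical sketch (`JoinStateSketch`, `heartbeatEnabled`, `joinRequestsSent` are not Kafka names) shows the cached join future, the heartbeat fence during the rebalance, and the UNJOINED / REBALANCING / STABLE transitions driven by the future's outcome.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of initiateJoinGroup()'s state handling.
final class JoinStateSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    MemberState state = MemberState.UNJOINED;
    boolean heartbeatEnabled = true;
    int joinRequestsSent = 0;
    private CompletableFuture<String> joinFuture; // cached so a woken-up caller does not send a second join

    synchronized CompletableFuture<String> initiateJoinGroup() {
        if (joinFuture == null) {
            heartbeatEnabled = false;               // fence off heartbeats during the rebalance
            state = MemberState.REBALANCING;
            joinRequestsSent++;
            joinFuture = new CompletableFuture<>(); // stands in for sendJoinGroupRequest()
            joinFuture.whenComplete((assignment, error) -> {
                synchronized (JoinStateSketch.this) {
                    if (error == null) {
                        state = MemberState.STABLE;
                        heartbeatEnabled = true;    // resume heartbeats once joined
                    } else {
                        state = MemberState.UNJOINED;
                    }
                }
            });
        }
        return joinFuture;
    }
}
```

Clearing the cached future after its result has been handled is left out of this sketch.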
/**
 * Join the group and return the assignment for the next generation. This function handles both
 * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
 * elected leader by the coordinator.
 * @return A request future which wraps the assignment returned from the group leader
 */
// NOTE: send the JoinGroup request and return the assignment for the next generation (the latter is done in JoinGroupResponseHandler)
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.info("(Re-)joining group {}", groupId);
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
// NOTE: handler for the JoinGroup response (on success, it moves on to SyncGroup)
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = Errors.forCode(joinResponse.errorCode());
        if (error == Errors.NONE) {
            log.debug("Received successful JoinGroup response for group {}: {}", groupId, joinResponse);
            sensors.joinLatency.record(response.requestLatencyMs());

            synchronized (AbstractCoordinator.this) {
                if (state != MemberState.REBALANCING) { // NOTE: if the consumer is no longer REBALANCING, raise an exception
                    // if the consumer was woken up before a rebalance completes, we may have already left
                    // the group. In this case, we do not want to continue with the sync group.
                    future.raise(new UnjoinedGroupException());
                } else {
                    AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                            joinResponse.memberId(), joinResponse.groupProtocol());
                    AbstractCoordinator.this.rejoinNeeded = false;
                    // NOTE: JoinGroup succeeded; next, send SyncGroup to obtain the assigned tp list
                    if (joinResponse.isLeader()) {
                        onJoinLeader(joinResponse).chain(future);
                    } else {
                        onJoinFollower().chain(future);
                    }
                }
            }
        } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
            log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
                    coordinator());
            // backoff and retry
            future.raise(error);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            // reset the member id and retry immediately
            resetGeneration();
            log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
            // re-discover the coordinator and retry with backoff
            coordinatorDead();
            log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
            future.raise(error);
        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                || error == Errors.INVALID_SESSION_TIMEOUT
                || error == Errors.INVALID_GROUP_ID) {
            // log the error and re-throw the exception
            log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
            future.raise(error);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            // unexpected error, throw the exception
            future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
        }
    }
}
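The error branches above boil down to a small decision table. The sketch below restates the comments in the handler as code; `JoinErrorSketch` and its `Action` values are hypothetical labels, and Kafka itself does not classify errors this way (the handler just raises the error and the caller decides whether to retry).

```java
// Hypothetical summary of the JoinGroup error branches as a decision table.
final class JoinErrorSketch {
    enum Action { SYNC_GROUP, RETRY_WITH_BACKOFF, RESET_MEMBER_ID_AND_RETRY, REDISCOVER_COORDINATOR, FATAL }

    static Action classify(String error) {
        switch (error) {
            case "NONE":
                return Action.SYNC_GROUP;                 // joined: continue with SyncGroup
            case "GROUP_LOAD_IN_PROGRESS":
                return Action.RETRY_WITH_BACKOFF;         // coordinator still loading the group
            case "UNKNOWN_MEMBER_ID":
                return Action.RESET_MEMBER_ID_AND_RETRY;  // retry immediately with a reset member id
            case "GROUP_COORDINATOR_NOT_AVAILABLE":
            case "NOT_COORDINATOR_FOR_GROUP":
                return Action.REDISCOVER_COORDINATOR;     // find the coordinator again, then retry
            default:
                return Action.FATAL;                      // protocol, timeout, group id, auth, unexpected
        }
    }
}
```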
// note: when the consumer is a follower, fetch its assignment from the GroupCoordinator
private RequestFuture<ByteBuffer> onJoinFollower() {
    // send follower's sync group with an empty assignment
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap());
    log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator,
            requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}
// note: when the consumer is the leader, compute the assignment for all members of the group and send it to the GroupCoordinator
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                joinResponse.members()); // NOTE: perform the assignment

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}",
                groupId, this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder); // NOTE: send the SyncGroup request
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
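The asymmetry between leader and follower is visible in the SyncGroup payloads: the leader sends an assignment for every member, while a follower sends an empty map and receives its share from the coordinator. In this sketch, round-robin is a simple stand-in chosen for illustration, not necessarily the assignor that performAssignment() delegates to; `SyncGroupSketch` and its methods are hypothetical, and partitions are plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical SyncGroup payloads: round-robin for the leader, empty map for followers.
final class SyncGroupSketch {
    static Map<String, List<String>> leaderAssignment(List<String> memberIds, List<String> partitions) {
        Map<String, List<String>> result = new TreeMap<>();
        if (memberIds.isEmpty())
            return result;                 // nothing to assign to
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);         // deterministic member order
        for (String m : members)
            result.put(m, new ArrayList<>());
        for (int i = 0; i < partitions.size(); i++)
            result.get(members.get(i % members.size())).add(partitions.get(i));
        return result;
    }

    static Map<String, List<String>> followerAssignment() {
        return Collections.emptyMap();     // followers send no assignment
    }
}
```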
// note: called once the consumer has joined the group successfully
@Override
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
    if (!isLeader)
        assignmentSnapshot = null;

    // set the flag to refresh last committed offsets
    // note: mark that the last committed offsets need to be fetched
    subscriptions.needRefreshCommits();

    // update partition assignment
    // note: update the assigned tp list
    subscriptions.assignFromSubscribed(assignment.partitions());

    // check if the assignment contains some topics that were not in the original
    // subscription, if yes we will obey what leader has decided and add these topics
    // into the subscriptions as long as they still match the subscribed pattern
    //
    // TODO this part of the logic should be removed once we allow regex on leader assign
    Set<String> addedTopics = new HashSet<>();
    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        if (!joinedSubscription.contains(tp.topic()))
            addedTopics.add(tp.topic());
    }

    if (!addedTopics.isEmpty()) {
        Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
        Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
        newSubscription.addAll(addedTopics);
        newJoinedSubscription.addAll(addedTopics);

    // update the metadata and enforce a refresh to make sure the fetcher can start
    // fetching data in the next iteration
    // note: update the metadata so that data can be fetched in the next iteration
    this.metadata.setTopics(subscriptions.groupSubscription());
    client.ensureFreshMetadata();

    // give the assignor a chance to update internal state based on the received assignment
    assignor.onAssignment(assignment);

    // reschedule the auto commit starting from now
    this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;

    // execute the user's callback after rebalance
    // note: invoke the rebalance listener
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} for group {} failed on partition assignment",
                listener.getClass().getName(), groupId, e);
    }
}
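The "added topics" check in onJoinComplete() is a set difference between the topics of the assigned partitions and the subscription used when joining, followed by widening the subscription. A minimal sketch: `AddedTopicsSketch` and its nested `Tp` class are hypothetical stand-ins (for the real code and for `TopicPartition`), and the pattern-matching check mentioned in the comment is not modeled.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of detecting topics the leader assigned beyond the joined subscription.
final class AddedTopicsSketch {
    // Minimal stand-in for a topic-partition pair.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Topics that appear in the assignment but were not in the subscription used when joining.
    static Set<String> addedTopics(Set<String> joinedSubscription, List<Tp> assigned) {
        Set<String> added = new HashSet<>();
        for (Tp tp : assigned)
            if (!joinedSubscription.contains(tp.topic))
                added.add(tp.topic);
        return added;
    }

    // The widened subscription: the old topics plus any added ones.
    static Set<String> widenSubscription(Set<String> subscription, Set<String> added) {
        Set<String> result = new HashSet<>(subscription);
        result.addAll(added);
        return result;
    }
}
```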