long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; long elapsed;
// 发送 metadata 请求,直到获取了这个 topic 的 metadata 或者请求超时 do { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate();// 返回当前版本号,初始值为0,每次更新时会自增,并将 needUpdate 设置为 true sender.wakeup();// 唤起 sender,发送 metadata 请求 try { metadata.awaitUpdate(version, remainingWaitMs);// 等待 metadata 的更新 } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs thrownew TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } cluster = metadata.fetch(); elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) thrownew TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 超时 if (cluster.unauthorizedTopics().contains(topic))// 认证失败,对当前 topic 没有 Write 权限 thrownew TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null);// 不停循环,直到 partitionsCount 不为 null(即直到 metadata 中已经包含了这个 topic 的相关信息)
if (partition != null && partition >= partitionsCount) { thrownew KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount)); }
returnnew ClusterAndWaitTime(cluster, elapsed); }
如果 metadata 中不存在这个 topic 的 metadata,那么就请求更新 metadata,如果 metadata 没有更新的话,方法就一直处在 do ... while 的循环之中,在循环之中,主要做以下操作:
// 更新 metadata 信息(根据当前 version 值来判断) publicsynchronizedvoidawaitUpdate(finalint lastVersion, finallong maxWaitMs)throws InterruptedException { if (maxWaitMs < 0) { thrownew IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) {// 不断循环,直到 metadata 更新成功,version 自增 if (remainingWaitMs != 0) wait(remainingWaitMs);// 阻塞线程,等待 metadata 的更新 long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs)// timeout thrownew TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); remainingWaitMs = maxWaitMs - elapsed; } }
在 Metadata.awaitUpdate() 方法中,线程会阻塞在 while 循环中,直到 metadata 更新成功或者 timeout。
从前面可以看出,此时 Producer 线程会阻塞在两个 while 循环中,直到 metadata 信息更新,那么 metadata 是如何更新的呢?如果有印象的话,前面应该已经介绍过了,主要是通过 sender.wakeup() 来唤醒 sender 线程,间接唤醒 NetworkClient 线程,NetworkClient 线程来负责发送 Metadata 请求,并处理 Server 端的响应。
publiclongmaybeUpdate(long now){ // should we update our metadata? // metadata 是否应该更新 long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);// metadata 下次更新的时间(需要判断是强制更新还是 metadata 过期更新,前者是立马更新,后者是计算 metadata 的过期时间) // 如果一条 metadata 的 fetch 请求还未从 server 收到恢复,那么时间设置为 waitForMetadataFetch(默认30s) long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch); if (metadataTimeout > 0) {// 时间未到时,直接返回下次应该更新的时间 return metadataTimeout; }
Node node = leastLoadedNode(now);// 选择一个连接数最小的节点 if (node == null) { log.debug("Give up sending metadata request since no node is available"); return reconnectBackoffMs; }
/** * Add a metadata request to the list of sends if we can make one */ // 判断是否可以发送请求,可以的话将 metadata 请求加入到发送列表中 privatelongmaybeUpdate(long now, Node node){ String nodeConnectionId = node.idString();
// If there's any connection establishment underway, wait until it completes. This prevents // the client from unnecessarily connecting to additional nodes while a previous connection // attempt has not been completed. if (isAnyNodeConnecting()) {// 如果 client 正在与任何一个 node 的连接状态是 connecting,那么就进行等待 // Strictly the timeout we should return here is "connect timeout", but as we don't // have such application level configuration, using reconnect backoff instead. return reconnectBackoffMs; }
if (connectionStates.canConnect(nodeConnectionId, now)) {// 如果没有连接这个 node,那就初始化连接 // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); initiateConnect(node, now);// 初始化连接 return reconnectBackoffMs; } return Long.MAX_VALUE; }
// 处理任何已经完成的接收响应 privatevoidhandleCompletedReceives(List<ClientResponse> responses, long now){ for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); AbstractResponse body = parseResponse(receive.payload(), req.header); log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body); if (req.isInternalRequest && body instanceof MetadataResponse)// 如果是 meta 响应 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); elseif (req.isInternalRequest && body instanceof ApiVersionsResponse) handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); // 如果是其他响应 else responses.add(req.completed(body, now)); } }
// 处理 Server 端对 Metadata 请求处理后的 response publicvoidhandleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response){ this.metadataFetchInProgress = false; Cluster cluster = response.cluster(); // check if any topics metadata failed to get updated Map<String, Errors> errors = response.errors(); if (!errors.isEmpty()) log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) { this.metadata.update(cluster, now);// 更新 meta 信息 } else {// 如果 metadata 中 node 信息无效,则不更新 metadata 信息 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId()); this.metadata.failedUpdate(now); } }