classSocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extendsLoggingwithKafkaMetricsGroup{
defstartup() { this.synchronized { //note: 一台 broker 一般只设置一个端口,当然这里也可以设置两个 config.listeners.foreach { endpoint => val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val processorEndIndex = processorBeginIndex + numProcessorThreads
//note: N 个 processor for (i <- processorBeginIndex until processorEndIndex) processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
/**
 * Acceptor event loop.
 *
 * Registers `serverChannel` with `nioSelector` for OP_ACCEPT, then, while `isRunning`, selects with a
 * 500 ms timeout and hands each acceptable key to `accept`, choosing the target processor from
 * `processors` in round-robin order.
 *
 * A failure while handling one key is logged and the loop moves on to the next key. A failure in the
 * select step is logged and the loop continues. `ControlThrowable`s are always rethrown, which ends the
 * loop. On exit (normal or exceptional), the server socket and selector are closed and
 * `shutdownComplete()` is called.
 */
def run(): Unit = {
  // Listen for new incoming connections.
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                // Accept the socket and hand it to the processor currently selected by round robin.
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              // Control-flow throwables must never be swallowed; same rule as the outer handler below.
              case e: ControlThrowable => throw e
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      } catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}
debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id, socketChannel.socket.getSendBufferSize, sendBufferSize, socketChannel.socket.getReceiveBufferSize, recvBufferSize))
//note: 轮询选择不同的 processor 进行处理 processor.accept(socketChannel) } catch { case e: TooManyConnectionsException => info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count)) close(socketChannel) } }
overridedefrun() { startupComplete() while (isRunning) { try { // setup any new connections that have been queued up configureNewConnections()//note: 对新的 socket 连接,并注册 READ 事件 // register any new responses for writing processNewResponses()//note: 处理 response 队列中 response poll() //note: 监听所有的 socket channel,是否有新的请求发送过来 processCompletedReceives() //note: 处理接收到请求,将其放入到 request queue 中 processCompletedSends() //note: 处理已经完成的发送 processDisconnected() //note: 处理断开的连接 } catch { // We catch all the throwables here to prevent the processor thread from exiting. We do this because // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop. case e: ControlThrowable => throw e case e: Throwable => error("Processor got uncaught exception.", e) } }
/**
 * Registers every connection queued in `newConnections` with this processor's `selector`.
 *
 * Each channel is registered under a connection id built from its local and remote host/port (per the
 * original author's note, this registers the channel's OP_READ interest with the selector).
 *
 * If registration fails with a non-fatal exception, the channel is closed so the socket is not leaked,
 * and the error is logged. Fatal throwables propagate to the caller.
 */
private def configureNewConnections(): Unit = {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      val socket = channel.socket
      val localHost = socket.getLocalAddress.getHostAddress
      val localPort = socket.getLocalPort
      val remoteHost = socket.getInetAddress.getHostAddress
      val remotePort = socket.getPort
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      selector.register(connectionId, channel)
    } catch {
      // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
      // throwables will be caught in processor and logged as uncaught exceptions.
      case NonFatal(e) =>
        // Socket.getRemoteSocketAddress returns null rather than throwing when the socket is not connected.
        // SocketChannel.getRemoteAddress can throw (e.g. ClosedChannelException), which would skip
        // close(channel) below and leak exactly the socket this handler is meant to release.
        val remoteAddress = channel.socket.getRemoteSocketAddress
        // need to close the channel here to avoid a socket leak.
        close(channel)
        error(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}
/**
 * Drains this processor's response queue (`requestChannel.receiveResponse(id)`) until it returns null.
 * Each response is handled according to its `responseAction`:
 *  - `NoOpAction`: there is nothing to send. Request metrics are updated and, if the selector still
 *    knows the connection (open or closing), it is unmuted so pipelined requests can be read again.
 *  - `SendAction`: the response is handed to `sendResponse`.
 *  - `CloseConnectionAction`: request metrics are updated and the connection is closed.
 *
 * The next response is dequeued only after the current one has been handled. If handling throws, the
 * exception propagates without having removed another response from the queue. The original
 * `finally`-based dequeue dropped that response.
 */
private def processNewResponses(): Unit = {
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    curr.responseAction match {
      case RequestChannel.NoOpAction =>
        // There is no response to send to the client, we need to read more pipelined requests
        // that are sitting in the server's socket buffer
        curr.request.updateRequestMetrics()
        trace("Socket server received empty response to send, registering for read: " + curr)
        val channelId = curr.request.connectionId
        if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
          selector.unmute(channelId)
      case RequestChannel.SendAction =>
        sendResponse(curr)
      case RequestChannel.CloseConnectionAction =>
        curr.request.updateRequestMetrics()
        trace("Closing socket connection actively according to the response code.")
        close(selector, curr.request.connectionId)
    }
    curr = requestChannel.receiveResponse(id)
  }
}
/* `protected` for test usage */
/**
 * Starts sending `response` on its destination connection and records it in `inflightResponses`, keyed by
 * the request's connection id, until the send completes.
 *
 * If the selector has no open channel for the destination, the response is not sent. A warning naming
 * the connection id is logged and only the request metrics are updated.
 */
protected[network] def sendResponse(response: RequestChannel.Response): Unit = {
  trace(s"Socket server received response to send, registering for write and sending data: $response")
  val connectionId = response.responseSend.destination
  val channel = selector.channel(connectionId)
  // `channel` can be null if the selector closed the connection because it was idle for too long
  if (channel == null) {
    // Report the connection id; the processor `id` is not what this message describes.
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
    response.request.updateRequestMetrics()
  } else {
    selector.send(response.responseSend)
    inflightResponses += (response.request.connectionId -> response)
  }
}
/**
 * Turns every receive completed by the selector into a `RequestChannel.Request`.
 *
 * The session is built from the open channel for the receive's source or, if that is gone, the closing
 * channel. The request is passed to `requestChannel.sendRequest` (per the original author's note, this
 * blocks when the request queue is full). The source connection is then muted so no further requests
 * are read from it until it is unmuted.
 *
 * InvalidRequestException / SchemaException close the offending connection. If neither an open nor a
 * closing channel exists, an IllegalStateException naming the connection is thrown (previously a bare
 * NullPointerException). Any other exception propagates to the caller.
 */
private def processCompletedReceives(): Unit = {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
      val channel = Option(selector.channel(receive.source))
        .orElse(Option(selector.closingChannel(receive.source)))
        .getOrElse(throw new IllegalStateException(
          s"Channel ${receive.source} removed from selector before processing completed receive"))
      val session = RequestChannel.Session(
        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
      val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
        buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
        securityProtocol = securityProtocol)
      requestChannel.sendRequest(req)
      // Stop reading from this connection until its response has been handled.
      selector.mute(receive.source)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}
processCompletedSends
processCompletedSends() 方法用于处理已经发送完成的 response，其实现如下：
/**
 * Handles every send the selector reports as completed.
 *
 * For each one, the matching response is removed from `inflightResponses` (an IllegalStateException is
 * thrown if none is recorded), its request metrics are updated, and the connection is unmuted. Per the
 * original author's note, this resumes OP_READ interest for the next request.
 */
private def processCompletedSends(): Unit = {
  selector.completedSends.asScala.foreach { send =>
    // The response has been fully sent, so it is no longer in flight.
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    resp.request.updateRequestMetrics()
    selector.unmute(send.destination)
  }
}
classKafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, numThreads: Int) extendsLoggingwithKafkaMetricsGroup{
/* a meter to track the average free capacity of the request handlers */
// Created through `newMeter` and shared by every KafkaRequestHandler this pool constructs;
// each handler marks its idle time on it.
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
this.logIdent = s"[Kafka Request Handler on Broker $brokerId], "
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
// Create numThreads KafkaRequestHandlers, each run on its own thread built by Utils.daemonThread.
// requestChannel is where Processors enqueue requests and where handlers place the responses they produce.
for (i <- 0 until numThreads) {
  runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
  threads(i) = Utils.daemonThread(s"kafka-request-handler-$i", runnables(i))
  threads(i).start()
}
/**
 * Request-handler loop.
 *
 * Repeatedly calls `requestChannel.receiveRequest(300)` until it returns a non-null request, recording
 * the time spent waiting on `aggregateIdleMeter`. The request is then passed to `apis.handle`; per the
 * original author's note, the result goes onto the response queue via `sendResponse`.
 *
 * Returns when the `RequestChannel.AllDone` shutdown marker is received. Any other throwable is logged
 * and the loop continues, except Scala `ControlThrowable`s, which are rethrown.
 */
def run(): Unit = {
  while (true) {
    try {
      var req: RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = time.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if (req eq RequestChannel.AllDone) {
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        return
      }
      req.requestDequeueTimeMs = time.milliseconds
      trace(s"Kafka request handler $id on broker $brokerId handling request $req")
      apis.handle(req)
    } catch {
      // Control-flow throwables must not be swallowed, matching the Acceptor/Processor loops.
      case e: scala.util.control.ControlThrowable => throw e
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}