```scala
def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
  val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
  val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)

  // If there is an existing rebalance running, attempt to change its throttle
  //note: if a reassignment is already in progress, this new reassignment plan cannot be submitted
  if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
    println("There is an existing assignment running.")
    reassignPartitionsCommand.maybeLimit(throttle)
  } else {
    printCurrentAssignment(zkUtils, partitionsToBeReassigned)
    if (throttle >= 0)
      println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
    //note: write the reassignment plan to ZK
    if (reassignPartitionsCommand.reassignPartitions(throttle)) {
      println("Successfully started reassignment of partitions.")
    } else
      println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
  }
}
```
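As a concrete entry point, here is a minimal sketch of driving `executeAssignment` directly. The JSON layout is the same one `kafka-reassign-partitions.sh --reassignment-json-file` accepts; the topic name, broker ids, ZooKeeper address, and throttle value below are made up for illustration.

```scala
import kafka.admin.ReassignPartitionsCommand
import kafka.utils.ZkUtils

// Made-up example: move test-topic partition 0 onto brokers 2 and 3.
val reassignmentJson =
  """{"version":1,
    | "partitions":[{"topic":"test-topic","partition":0,"replicas":[2,3]}]}""".stripMargin

// Assumes a reachable ZooKeeper ensemble; the timeouts are illustrative.
val zkUtils = ZkUtils("localhost:2181", 30000, 30000, isZkSecurityEnabled = false)

// throttle is in bytes/sec; the default of -1 means no replication throttle.
ReassignPartitionsCommand.executeAssignment(zkUtils, reassignmentJson, throttle = 10485760L)
```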
```scala
/**
 * Invoked when some partitions need to move leader to preferred replica
 */
def doHandleDataChange(dataPath: String, data: AnyRef) {
  inLock(controllerContext.controllerLock) {
    debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
    val topicAndPartition = TopicAndPartition(topic, partition)
    try {
      // check if this partition is still being reassigned or not
      //note: check whether this partition is still being reassigned (this method is only used during reassignment)
      controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
        case Some(reassignedPartitionContext) =>
          // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
          //note: fetch the latest leader and isr information from ZK
          val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
          newLeaderAndIsrOpt match {
            case Some(leaderAndIsr) => // check if new replicas have joined ISR
              val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
              if (caughtUpReplicas == reassignedReplicas) {
                //note: all of the newly assigned replicas are in the ISR
                // resume the partition reassignment process
                info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                  .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                  "Resuming partition reassignment")
                //note: trigger onPartitionReassignment again, now that the replica migration has caught up
                controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
              } else {
                //note: otherwise do nothing
                info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                  .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                  "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
              }
            case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
              .format(topicAndPartition, reassignedReplicas.mkString(",")))
          }
        case None =>
      }
    } catch {
      case e: Throwable => error("Error while handling partition reassignment", e)
    }
  }
}
```
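The catch-up check above is plain set arithmetic (`&` is Scala's `Set` intersection): the listener resumes `onPartitionReassignment` only once every target replica appears in the current ISR. A tiny sketch with made-up broker ids:

```scala
val reassignedReplicas = Set(2, 3)  // RAR: the target replicas

val isrStillSyncing = Set(1, 2)     // broker 3 is still fetching
(reassignedReplicas & isrStillSyncing) == reassignedReplicas  // false -> keep waiting

val isrCaughtUp = Set(1, 2, 3)      // every target replica has joined the ISR
(reassignedReplicas & isrCaughtUp) == reassignedReplicas      // true -> resume onPartitionReassignment
```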
```scala
//note: this callback is triggered by the reassigned partitions listener; when a partition reassignment
//note: is requested, a node is created under /admin/reassign_partitions to kick off the operation
//note: RAR: the reassigned replicas, OAR: the partition's original replica list, AR: the currently assigned replicas
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
    //note: the newly assigned replicas are not all in the ISR yet
    info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned not yet caught up with the leader")
    //note: RAR - OAR
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
    //note: RAR + OAR
    val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
    //1. Update AR in ZK with OAR + RAR.
    updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
    //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
    updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
      newAndOldReplicas.toSeq)
    //3. replicas in RAR - OAR -> NewReplica
    //note: move the newly assigned replicas to the NewReplica state (they are created and start
    //note: replicating data once the LeaderAndIsr request from step 2 arrives)
    startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned to catch up with the leader")
  } else {
    //note: all of the new replicas are in the ISR
    //4. Wait until all replicas in RAR are in sync with the leader.
    //note: OAR - RAR
    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
    //5. replicas in RAR -> OnlineReplica
    //note: every replica in RAR is now in the ISR, so move them to the OnlineReplica state
    reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
        replica)), OnlineReplica)
    }
    //6. Set AR to RAR in memory.
    //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
    //   a new AR (using RAR) and same isr to every broker in RAR
    //note: at this point the new replicas have fully caught up; the leader and isr are updated to the final result
    moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
    //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
    //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
    //note: take the old replicas offline
    stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
    //10. Update AR in ZK with RAR.
    updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
    //note: the reassignment of this partition is complete; remove it from the set of pending reassignments
    removePartitionFromReassignedPartitions(topicAndPartition)
    info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
    //note: send a metadata update request to every live broker
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
    // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
    //note: resume topic deletion (in case this topic was marked for deletion but could not be deleted while being reassigned)
    deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}
```
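Before walking through the steps one by one, it helps to isolate the set arithmetic the method relies on. A minimal sketch, assuming OAR = {1, 2} and RAR = {3, 4} (the example ids used in the step list below):

```scala
val OAR = Set(1, 2)  // original assigned replicas
val RAR = Set(3, 4)  // reassigned (target) replicas

val ar          = OAR ++ RAR  // steps 1-2: AR = {1, 2, 3, 4} while the new replicas catch up
val newReplicas = RAR -- OAR  // step 3: {3, 4} -> NewReplica, start fetching from the leader
val oldReplicas = OAR -- RAR  // steps 8-9: {1, 2} -> OfflineReplica, then NonExistentReplica
```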
Overall, this method breaks down into the following 12 steps:
1. Update AR to OAR + RAR ({1, 2, 3, 4}) in ZK and in the Controller's local cache;
2. Send a LeaderAndIsr request to every replica in AR, which also forces an update of the leader epoch in ZK;