//note: 初始化 TaskManagerServices(TM 相关服务的初始化都在这里) TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, taskManagerMetricGroup.f1, rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io.
//note: 创建 taskEventDispatcher final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
// start the I/O manager, it will create some temp directories. //note: 创建 IO 管理器 final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
//note: 创建 KvStateService 实例并启动 final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); kvStateService.start();
//note: 初始化 taskManagerLocation,记录 connection 信息 final TaskManagerLocation taskManagerLocation = new TaskManagerLocation( taskManagerServicesConfiguration.getResourceID(), taskManagerServicesConfiguration.getTaskManagerAddress(), dataPort);
// this call has to happen strictly after the network stack has been initialized //note: 初始化 MemoryManager final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration); finallong managedMemorySize = memoryManager.getMemorySize();
//note: 初始化 BroadcastVariableManager 对象 final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
//note: 注册一个超时(AKKA 超时设置)服务(在 TaskSlotTable 用于监控 slot 分配是否超时) final TimerService<AllocationID> timerService = new TimerService<>( new ScheduledThreadPoolExecutor(1), taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
//note: 这里会维护 slot 相关列表 final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
//note: 维护 jobId 与 JobManager connection 之间的关系 final JobManagerTable jobManagerTable = new JobManagerTable();
//note: 监控注册的 job 的 JobManger leader 信息 final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
// Resolve the configured local-recovery root directories into File handles, each pointing
// at the LOCAL_STATE_SUB_DIRECTORY_ROOT child of the configured directory.
final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
    stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
}

// Create the TaskExecutorLocalStateStoresManager, which maintains the local state information.
final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
    taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
    stateRootDirectoryFiles,
    taskIOExecutor);
//note: 与集群的 ResourceManager 建立连接(并创建一个 listener) // start by connecting to the ResourceManager resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions //note: taskSlotTable 启动 taskSlotTable.start(new SlotActionsImpl());
// start the job leader service //note: 启动 job leader 服务 jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
@Override publicvoidjobManagerGainedLeadership( final JobID jobId, final JobMasterGateway jobManagerGateway, final JMTMRegistrationSuccess registrationMessage){ //note: 建立与 JobManager 的连接 runAsync( () -> establishJobManagerConnection( jobId, jobManagerGateway, registrationMessage)); }
@Override publicvoidjobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId){ log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
runAsync(() -> closeJobManagerConnection( jobId, new Exception("Job leader for job id " + jobId + " lost leadership."))); }
//note: slot 请求 @Override public CompletableFuture<Acknowledge> requestSlot( final SlotID slotId, final JobID jobId, final AllocationID allocationId, final String targetAddress, final ResourceManagerId resourceManagerId, final Time timeout){ // TODO: Filter invalid requests from the resource manager by using the instance/registration Id
log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, resourceManagerId);
try { if (!isConnectedToResourceManager(resourceManagerId)) { //note: 如果 TM 并没有跟这个 RM 通信,就抛出异常 final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId); log.debug(message); thrownew TaskManagerException(message); }
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) { //note: Slot 状态是 free,还未分配出去 if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) { log.info("Allocated slot for {}.", allocationId); //note: allcate 成功 } else { log.info("Could not allocate slot for {}.", allocationId); thrownew SlotAllocationException("Could not allocate slot."); } } elseif (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) { //note: slot 已经分配出去,但分配的并不是当前这个作业 final String message = "The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()); thrownew SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID)); }
if (jobManagerTable.contains(jobId)) { //note: 如果 TM 已经有这个 JobManager 的 meta,这里会将这个 job 的 slot 分配再汇报给 JobManager 一次 offerSlotsToJobManager(jobId); } else { try { //note: 监控这个作业 JobManager 的 leader 变化 jobLeaderService.addJob(jobId, targetAddress); } catch (Exception e) { // free the allocated slot try { taskSlotTable.freeSlot(allocationId); } catch (SlotNotFoundException slotNotFoundException) { // slot no longer existent, this should actually never happen, because we've // just allocated the slot. So let's fail hard in this case! onFatalError(slotNotFoundException); }
// release local state under the allocation id. localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) { onFatalError(new Exception("Could not free slot " + slotId)); }
thrownew SlotAllocationException("Could not add job to job leader service.", e); } } } catch (TaskManagerException taskManagerException) { return FutureUtils.completedExceptionally(taskManagerException); }
//note: 分配这个 TaskSlot boolean result = taskSlot.allocate(jobId, allocationId);
if (result) { //note: 分配成功,记录到缓存中 // update the allocation id to task slot map allocationIDTaskSlotMap.put(allocationId, taskSlot);
// register a timeout for this slot since it's in state allocated timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
// add this slot to the set of job slots Set<AllocationID> slots = slotsPerJob.get(jobId);
if (slots == null) { slots = new HashSet<>(4); slotsPerJob.put(jobId, slots); }
if (isConnectedToResourceManager()) { //note: 通知 ResourceManager 这个 slot 因为被释放了,所以可以变可用了 // the slot was freed. Tell the RM about it ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
resourceManagerGateway.notifySlotAvailable( establishedResourceManagerConnection.getTaskExecutorRegistrationId(), new SlotID(getResourceID(), slotIndex), allocationId); }
if (jobId != null) { closeJobManagerConnectionIfNoAllocatedResources(jobId); } } } catch (SlotNotFoundException e) { log.debug("Could not free slot for allocation id {}.", allocationId, e); }