@Override public CompletableFuture<Acknowledge> requestSlot( final SlotID slotId, final JobID jobId, final AllocationID allocationId, final String targetAddress, final ResourceManagerId resourceManagerId, final Time timeout) { // TODO: Filter invalid requests from the resource manager by using the instance/registration Id
log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, resourceManagerId);
try { /** * 检查TaskExecutor是否于ResourceManager建立了连接,如果没有,则返回TaskManager is not connected to the resource manager * */ if (!isConnectedToResourceManager(resourceManagerId)) { finalStringmessage= String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId); log.debug(message); thrownewTaskManagerException(message); } /** * 分配占有Slot, * 分配占有之前判断申请slot的请求种的slotId对应的Slot是否可以分配占有 * 判断依据是Slot是否为空闲(处于FREE状态) * * */ if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) { //如果是空闲的,则调用allocateSlot方法占用该slot if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) { log.info("Allocated slot for {}.", allocationId); } else { log.info("Could not allocate slot for {}.", allocationId); //占有失败了,返回Could not allocate slot thrownewSlotAllocationException("Could not allocate slot."); } /** * 对于对应的Slot不是空闲的情况,判断slotId对应的slot是否已经被本次的 * slot申请占有(重复申请请求的情况) * 如果不是 则返回Slot已经被占用的异常。 * */ } elseif (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) { finalStringmessage="The slot " + slotId + " has already been allocated for a different job.";
// register a timeout for this slot since it's in state allocated /** * 并通过timerService注册本次分配对应的Slot从已分配状态(Allocated)转换到活跃状态的超时逻辑。 * 在超时的情况下会将本次分配的taskSlot转换为空闲状态(FREE)。 * */ timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
/** * 将Slot提供给JobMaster的过程 * */ privatevoidofferSlotsToJobManager(final JobID jobId) { finalJobManagerConnectionjobManagerConnection= jobManagerTable.get(jobId); //先检查TaskExecutor是否与作业对应的JobMater建立连接 if (jobManagerConnection == null) { log.debug("There is no job manager connection to the leader of job {}.", jobId); } else { //建立连接 if (taskSlotTable.hasAllocatedSlots(jobId)) { log.info("Offer reserved slots to the leader of job {}.", jobId);
finalJobMasterGatewayjobMasterGateway= jobManagerConnection.getJobManagerGateway(); //则从taskSlotTable中筛选出已经被该作业占有但不处于活跃状态的TaskSlot final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); finalJobMasterIdjobMasterId= jobManagerConnection.getJobMasterId();
final Collection<SlotOffer> reservedSlots = newHashSet<>(2); //并从reservedSlotsIterator中提取一些信息来组装成Slot提供信息列表(allocationId,slotIndex,resourceProfile) while (reservedSlotsIterator.hasNext()) { SlotOfferoffer= reservedSlotsIterator.next().generateSlotOffer(); reservedSlots.add(offer); }
finalJobIDjobId= taskSlot.getJobId(); //判断该Slot上有没有运行的任务,如果没有,则将Slot标记为空闲状态并返回 if (taskSlot.markFree()) { // remove the allocation id to task slot mapping allocationIDTaskSlotMap.remove(allocationId);
// unregister a potential timeout timerService.unregisterTimeout(allocationId);
Set<AllocationID> slots = slotsPerJob.get(jobId);
if (slots == null) { thrownewIllegalStateException("There are no more slots allocated for the job " + jobId + ". This indicates a programming bug."); }
slots.remove(allocationId);
if (slots.isEmpty()) { slotsPerJob.remove(jobId); }
/** * TaskExecutor向JobMaster提供Slot的返回信息处理 * */ @Nonnull private BiConsumer<Iterable<SlotOffer>, Throwable> handleAcceptedSlotOffers(JobID jobId, JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, Collection<SlotOffer> offeredSlots) { return (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> { if (throwable != null) { //向obMaster提供Slot请求超时,会重新调用offerSlotsToJobManager if (throwable instanceof TimeoutException) { log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering."); // We ran into a timeout. Try again. offerSlotsToJobManager(jobId); } else { log.warn("Slot offering to JobManager failed. Freeing the slots " + "and returning them to the ResourceManager.", throwable);
/** * 对于返回的异常非超时的异常,即JobMaster返回的是未识别的TaskManager异常 * 将SlotOffer列表对应的TaskSlot释放掉 * */ for (SlotOffer reservedSlot: offeredSlots) { freeSlotInternal(reservedSlot.getAllocationId(), throwable); } } } else { // 返回的结果为JobMaster接收的SlotOffer列表时,检测TaskExecutor是否与JobMaster连接 if (isJobManagerConnectionValid(jobId, jobMasterId)) { // mark accepted slots active for (SlotOffer acceptedSlot : acceptedSlots) { try { /** * 会将返回的SlotOffer列表(提供成功的Slot列表)中对应的TaskSlot由原来的已分配状态(allocated)标记为活跃状态(Active) * 并移除在timerService中主注册的超时检测逻辑,调用markSlotActive方法. * */ if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) { // the slot is either free or releasing at the moment finalStringmessage="Could not mark slot " + jobId + " active."; log.debug(message); jobMasterGateway.failSlot( getResourceID(), acceptedSlot.getAllocationId(), newFlinkException(message)); } } catch (SlotNotFoundException e) { //对于转换失败的到活跃状态的Slot,通过JobMaster调用failSlot方法来以失败的方式释放。 finalStringmessage="Could not mark slot " + jobId + " active."; jobMasterGateway.failSlot( getResourceID(), acceptedSlot.getAllocationId(), newFlinkException(message)); }
offeredSlots.remove(acceptedSlot); }
finalExceptione=newException("The slot was rejected by the JobManager.");
//对于未成功提供给JobMaster的Slot,通过freeSlotInternal方法释放 for (SlotOffer rejectedSlot : offeredSlots) { freeSlotInternal(rejectedSlot.getAllocationId(), e); } } else { // discard the response since there is a new leader for the job log.debug("Discard offer slot response since there is a new leader " + "for the job {}.", jobId); } } }; }
这里的内容比较多,我们慢慢看一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
if (throwable != null) { //向obMaster提供Slot请求超时,会重新调用offerSlotsToJobManager if (throwable instanceof TimeoutException) { log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering."); // We ran into a timeout. Try again. offerSlotsToJobManager(jobId); } else { log.warn("Slot offering to JobManager failed. Freeing the slots " + "and returning them to the ResourceManager.", throwable);
// 返回的结果为JobMaster接收的SlotOffer列表时,检测TaskExecutor是否与JobMaster连接 if (isJobManagerConnectionValid(jobId, jobMasterId)) { // mark accepted slots active for (SlotOffer acceptedSlot : acceptedSlots) { try { /** * 会将返回的SlotOffer列表(提供成功的Slot列表)中对应的TaskSlot由原来的已分配状态(allocated)标记为活跃状态(Active) * 并移除在timerService中主注册的超时检测逻辑,调用markSlotActive方法. * */ if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) { // the slot is either free or releasing at the moment finalStringmessage="Could not mark slot " + jobId + " active."; log.debug(message); jobMasterGateway.failSlot( getResourceID(), acceptedSlot.getAllocationId(), newFlinkException(message)); } } catch (SlotNotFoundException e) { //对于转换失败的到活跃状态的Slot,通过JobMaster调用failSlot方法来以失败的方式释放。 finalStringmessage="Could not mark slot " + jobId + " active."; jobMasterGateway.failSlot( getResourceID(), acceptedSlot.getAllocationId(), newFlinkException(message)); }
offeredSlots.remove(acceptedSlot); } final Exception e = new Exception("The slot was rejected by the JobManager.");
//对于未成功提供给JobMaster的Slot,通过freeSlotInternal方法释放 for (SlotOffer rejectedSlot : offeredSlots) { freeSlotInternal(rejectedSlot.getAllocationId(), e); } } else { // discard the response since there is a new leader for the job log.debug("Discard offer slot response since there is a new leader " + "for the job {}.", jobId); }
-------------------------------------------------------------------------------- publicbooleanmarkActive() { if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) { state = TaskSlotState.ACTIVE;