if(!(throwable instanceof CancellationException)) { handleFailedSlotRequest(slotId, allocationId, throwable); } else { LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable); } } } catch(Exception e) { LOG.error("Error while completing the slot allocation.", e); } }, mainThreadExecutor); }
------------------------------------------------------------------------ @Override public CompletableFuture<Acknowledge> requestSlot(final SlotID slotId, // 注释: SlotID final JobID jobId, // 注释: JobID final AllocationID allocationId, // 注释: AllocationID final ResourceProfile resourceProfile, // 注释: ResourceProfile final String targetAddress, // 注释: ResourceManager ID final ResourceManagerId resourceManagerId, // 注释: ResourceManagerId final Time timeout) { // TODO: Filter invalid requests from the resource manager by using the instance/registration Id
log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, resourceManagerId);
if(!isConnectedToResourceManager(resourceManagerId)) { finalStringmessage= String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId); log.debug(message); return FutureUtils.completedExceptionally(newTaskManagerException(message)); }
/************************************************* * * 注释: 启动 Job, 并不是一个真正的 Job,而是一个代表是否有链接存在 */ final JobTable.Job job; try { job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress)); } catch(Exception e) { // free the allocated slot try { taskSlotTable.freeSlot(allocationId); } catch(SlotNotFoundException slotNotFoundException) { // slot no longer existent, this should actually never happen, because we've // just allocated the slot. So let's fail hard in this case! onFatalError(slotNotFoundException); }
// release local state under the allocation id. localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// 检查是否能够成功预留资源 if (!budgetManager.reserve(resourceProfile)) { // 资源不足,无法分配 LOG.info( "Cannot allocate the requested resources. Trying to allocate {}, while the currently remaining available resources are {}, total is {}.", resourceProfile, budgetManager.getAvailableBudget(), budgetManager.getTotalBudget()); returnfalse; // 预留失败,返回false }
acceptedSlotsFuture.whenCompleteAsync(handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots), getMainThreadExecutor()); } else { log.debug("There are no unassigned slots for the job {}.", jobId); } }
/** * 作用:接收来自TaskExecutor的Slot * */ @Override public CompletableFuture<Collection<SlotOffer>> offerSlots(final ResourceID taskManagerId, final Collection<SlotOffer> slots, final Time timeout) {
if(!pendingTaskManagerSlotOptional.isPresent()) { pendingTaskManagerSlotOptional = allocateResource(resourceProfile); } OptionalConsumer.of(pendingTaskManagerSlotOptional) .ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot)).ifNotPresent(() -> { // request can not be fulfilled by any free slot or pending slot that can be allocated, // check whether it can be fulfilled by allocated slots if(failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) { thrownewUnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile()); } }); }
private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) { finalintnumRegisteredSlots= getNumberRegisteredSlots(); finalintnumPendingSlots= getNumberPendingTaskManagerSlots(); if(isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) { LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.", numSlotsPerWorker, numPendingSlots + numRegisteredSlots, maxSlotNum); return Optional.empty(); }
LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
// 检查TaskManager是否为首次注册 if(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { //非首次注册的情况(一般情况下都是首次注册) reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); returnfalse; } else { if(isMaxSlotNumExceededAfterRegistration(initialSlotReport)) { LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum); resourceActions .releaseResource(taskExecutorConnection.getInstanceID(), newFlinkException("The total number of slots exceeds the max limitation.")); returnfalse; }
//首先检查该Slot是否首次注册,如果非首次注册则从已注册的TaskManagerSlot列表中移除老的Slot if(slots.containsKey(slotId)) { // remove the old slot first removeSlot(slotId, newSlotManagerException(String.format("Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.", slotId))); }
if(taskManagerRegistration != null) { //更新TaskManagerSlot的状态 updateSlotState(slot, taskManagerRegistration, allocationId, jobId); returntrue; } else { thrownewIllegalStateException("Trying to update a slot from a TaskManager " + slot.getInstanceId() + " which has not been registered."); } } else { LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
/** * TaskManagerSlot状态变化: * Slot的申请与分配过程中涉及TaskManagerSlot的状态变化 * 如果分配allocationId为空,表示TaskManagerSlot对应的TaskExecutor的Slot还没被分配占用 * 如果不为空,则表示已经别配占用 * */ privatevoidupdateSlotState(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId, @Nullable JobID jobId) { if(null != allocationId) { switch(slot.getState()) { /** * 1. 当汇报的Slot已分配占有,且TaskMangerSlot的状态为待分配(PENDING)时。 * 先查找TaskMangerSlot绑定的待分配Slot申请,再比较该待分配slot申请的allocationId * 与汇报上来的Slot的allocationId是否相等。 * 如果相等: * 取消待分配的Slot申请并将其中待分配的Slot申请列表中移除,标记本次分配Slot申请成功, * 同时TaskMangerSlot状态从待分配状态(PENDING),转换到已分配状态(ALLOCATED) * * 不相等: * 将TaskMangerSlot绑定的待分配slot申请拒绝掉,根据汇报的slot分配ID匹配待分配的申请, * 将匹配的待分配slot申请取消并从待分配的Slot申请列表中移除,标记本次分配slot申请完成 * 同时TaskMangerSlot状态从待分配变换为已分配。 * */ case PENDING: // we have a pending slot request --> check whether we have to reject it PendingSlotRequestpendingSlotRequest= slot.getAssignedSlotRequest();
if(Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) { // we can cancel the slot request because it has been fulfilled cancelPendingSlotRequest(pendingSlotRequest);
// remove the pending slot request, since it has been completed pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
slot.completeAllocation(allocationId, jobId); } else { // we first have to free the slot in order to set a new allocationId slot.clearPendingSlotRequest(); // set the allocation id such that the slot won't be considered for the pending slot request slot.updateAllocation(allocationId, jobId);
// remove the pending request if any as it has been assigned finalPendingSlotRequestactualPendingSlotRequest= pendingSlotRequests.remove(allocationId);
// this will try to find a new slot for the request rejectPendingSlotRequest(pendingSlotRequest, newException("Task manager reported slot " + slot.getSlotId() + " being already allocated.")); }
taskManagerRegistration.occupySlot(); break; case ALLOCATED: /** * 2. 当汇报的Slot已分配占有,且TaskManager的状态为已分配(ALLOCATED)时,如果TaskManagerSlot的 * 分配ID与汇报的Slot的分配ID不一致,则通过先释放后占用的方式,将TaskManagerSlot的分配ID * 变更为Slot的分配ID,TaskManagerSlot的状态变换为分配状态(ALLOCATED) -> (FREE) -> (ALLOCATED) * * */ if(!Objects.equals(allocationId, slot.getAllocationId())) { slot.freeSlot(); slot.updateAllocation(allocationId, jobId); } break; case FREE: /** * 3. 当汇报的Slot已分配占有,且TaskManagerSlot的状态为空闲,TaskManagerSlot从空闲的TaskManagerSlot列表移除 * 将TaskManagerSlot的分配ID设置为汇报的分配ID,然后TaskManagerSlot由空闲状态转换为已分配(ALLOCATED)状态. * */ // the slot is currently free --> it is stored in freeSlots freeSlots.remove(slot.getSlotId()); slot.updateAllocation(allocationId, jobId); taskManagerRegistration.occupySlot(); break; }
case FREE: /** * 3. 当汇报的Slot已分配占有,且TaskManagerSlot的状态为空闲,TaskManagerSlot从空闲的TaskManagerSlot列表移除 * 将TaskManagerSlot的分配ID设置为汇报的分配ID,然后TaskManagerSlot由空闲状态转换为已分配(ALLOCATED)状态. * */ // the slot is currently free --> it is stored in freeSlots freeSlots.remove(slot.getSlotId()); slot.updateAllocation(allocationId, jobId); taskManagerRegistration.occupySlot(); break;
/** * 1. 当汇报的Slot已分配占有,且TaskMangerSlot的状态为待分配(PENDING)时。 * 先查找TaskMangerSlot绑定的待分配Slot申请,再比较该待分配slot申请的allocationId * 与汇报上来的Slot的allocationId是否相等。 * 如果相等: * 取消待分配的Slot申请并将其中待分配的Slot申请列表中移除,标记本次分配Slot申请成功, * 同时TaskMangerSlot状态从待分配状态(PENDING),转换到已分配状态(ALLOCATED) * * 不相等: * 将TaskMangerSlot绑定的待分配slot申请拒绝掉,根据汇报的slot分配ID匹配待分配的申请, * 将匹配的待分配slot申请取消并从待分配的Slot申请列表中移除,标记本次分配slot申请完成 * 同时TaskMangerSlot状态从待分配变换为已分配。 * */ case PENDING: // we have a pending slot request --> check whether we have to reject it PendingSlotRequestpendingSlotRequest= slot.getAssignedSlotRequest();
if(Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) { // we can cancel the slot request because it has been fulfilled cancelPendingSlotRequest(pendingSlotRequest);
// remove the pending slot request, since it has been completed pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
slot.completeAllocation(allocationId, jobId); } else { // we first have to free the slot in order to set a new allocationId slot.clearPendingSlotRequest(); // set the allocation id such that the slot won't be considered for the pending slot request slot.updateAllocation(allocationId, jobId);
// remove the pending request if any as it has been assigned finalPendingSlotRequestactualPendingSlotRequest= pendingSlotRequests.remove(allocationId);
// this will try to find a new slot for the request rejectPendingSlotRequest(pendingSlotRequest, newException("Task manager reported slot " + slot.getSlotId() + " being already allocated.")); }