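Flink Source Code Analysis (10)
This part connects the JobManager with the ResourceManager covered in the previous parts.
Earlier we saw that the ResourceManager is mainly responsible for managing resources and handling slot requests.
Now we look at how the JobManager requests slots from the ResourceManager, which closes the loop.
Let's start with a summary of the JobMaster.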
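JobMaster overview
The JobMaster class extends FencedRpcEndpoint, which makes it an RpcEndpoint that checks a fencing token on incoming calls.
JobMaster implements the JobMasterGateway interface, which provides the RPC methods that other components call.
JobMaster implements the JobMasterService interface, which is used by JobManagerRunner.
JobManagerRunner is responsible for creating and starting the JobMaster, and for handling the JobMaster's leader election.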
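The two core components inside the JobMaster are the Scheduler and the CheckpointCoordinator. Here we focus on requesting slots for the job's tasks during scheduling. Checkpoints will be covered separately later.
The Scheduler is responsible for scheduling the ExecutionGraph.
A core part of the Scheduler's logic is requesting slots for the job's tasks.
The CheckpointCoordinator coordinates the job's checkpoints.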
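The JobMaster initiates the slot request flow. Its SlotPool provides slots to the execution graph during scheduling and manages the slots' lifecycle. There is exactly one SlotPool per job (each job has one SlotPool instance), and the implementation class is SlotPoolImpl: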
```java
private final SlotPool slotPool;
```
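From the summary above, the entry point is SlotPoolImpl, so that is where we start.
In the slot request flow, the SlotPool's logic has two parts:
Issuing slot requests
Receiving slots from TaskExecutors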
Issuing slot requests
Let's first summarize the overall flow. Reading the code with the flow in mind makes it much clearer.
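When the Scheduler schedules the job's tasks and needs to allocate a slot, and the SlotPool has no matching free slot, a slot request is issued. Inside the JobMaster, a slot request to the SlotPool is handled as follows:
1) Check the SlotPool's free list for a slot that matches the resource profile the task needs. If one exists, return it. Otherwise, the SlotPool may trigger the logic that requests a new slot.
2) Request a new slot. Check whether the JobMaster is connected to the ResourceManager. If it is not, record the request and send it to the ResourceManager once the connection is established. If it is, send the request to the ResourceManager directly.
3) Before the request is sent to the ResourceManager, it is recorded in the list of pending slot requests, so that a slot later offered by a TaskExecutor can be bound to it. If the request to the ResourceManager fails with an exception, that failure is handled too.
4) The Scheduler's entry point for requesting a slot from the SlotPool is SlotPoolImpl's requestNewAllocatedSlot method.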
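Steps 1) and 2) amount to "serve from the free list if possible, otherwise ask for a new slot". The sketch below illustrates only that decision. `FreeSlotFirstSketch`, `Profile` and `allocate` are hypothetical names, not Flink APIs. The matching rule ("at least as much CPU and memory") is also an assumption made for illustration; it is not Flink's `ResourceProfile.isMatching`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical model of steps 1) and 2): try a free slot first, otherwise record a new-slot request.
class FreeSlotFirstSketch {

    /** Hypothetical resource profile: CPU cores and memory in MB. */
    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** Assumption: a slot matches if it is at least as large in every dimension. */
        boolean isMatching(Profile required) {
            return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
        }
    }

    /** Free slots currently held by the pool. */
    final List<Profile> freeSlots = new ArrayList<>();

    /** Requests that could not be served from the free list and need a new slot. */
    final List<Profile> newSlotRequests = new ArrayList<>();

    /** Returns a matching free slot if there is one; otherwise records a request for a new slot. */
    Optional<Profile> allocate(Profile required) {
        Iterator<Profile> it = freeSlots.iterator();
        while (it.hasNext()) {
            Profile free = it.next();
            if (free.isMatching(required)) {
                it.remove(); // the slot is no longer free
                return Optional.of(free);
            }
        }
        // in SlotPoolImpl this is where requestNewAllocatedSlot would be triggered
        newSlotRequests.add(required);
        return Optional.empty();
    }
}
```

The second request in the test below finds the free list empty and falls through to a new-slot request. That fallthrough is the path the rest of this section follows.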
Since the entry point for issuing a slot request is SlotPoolImpl's requestNewAllocatedSlot method, we start with that method.
```java
@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
        @Nonnull SlotRequestId slotRequestId,
        @Nonnull ResourceProfile resourceProfile,
        Time timeout) {

    componentMainThreadExecutor.assertRunningInMainThread();

    // create a streaming pending request
    final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);

    // register the request timeout: if the slot future times out, drop the pending request
    FutureUtils
        .orTimeout(
            pendingRequest.getAllocatedSlotFuture(),
            timeout.toMilliseconds(),
            TimeUnit.MILLISECONDS,
            componentMainThreadExecutor)
        .whenComplete(
            (AllocatedSlot ignored, Throwable throwable) -> {
                if (throwable instanceof TimeoutException) {
                    timeoutPendingSlotRequest(slotRequestId);
                }
            });

    return requestNewAllocatedSlotInternal(pendingRequest)
        .thenApply((Function.identity()));
}
```
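The method first creates a pending slot request of the streaming kind (PendingRequest.createStreamingRequest).
Let's look at the structure of this request: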
```java
protected static class PendingRequest {

    private final SlotRequestId slotRequestId;

    private final ResourceProfile resourceProfile;

    // distinguishes batch requests from streaming requests
    private final boolean isBatchRequest;

    // completed once a slot has been allocated to this request
    private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;

    private long unfillableSince;

    private PendingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile, boolean isBatchRequest) {
        this(slotRequestId, resourceProfile, isBatchRequest, new CompletableFuture<>());
    }

    // ... (remaining constructors, factory methods and getters omitted)
}
```
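Next, a timeout is registered for the request. If the slot future is not completed within the timeout, timeoutPendingSlotRequest is called and this pending slot request is terminated.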
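The timeout registration follows a common pattern: arm a timeout on the request's future and, if it fires, remove the stale request. Flink uses its own `FutureUtils.orTimeout` together with the JobMaster's main-thread executor. The sketch below swaps in the JDK's `CompletableFuture.orTimeout` (Java 9+) instead. That method fires on a JDK-internal scheduler thread, which is why the map here is a `ConcurrentHashMap`. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of "register a timeout on the pending request's future" using the JDK's orTimeout.
class PendingRequestTimeoutSketch {

    /** Stand-in for the pool's pending requests: slot request id -> slot future. */
    final Map<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();

    /**
     * Registers a pending request and arms its timeout. Returns the whenComplete stage,
     * which completes only after the timeout cleanup (if any) has run.
     */
    CompletableFuture<String> request(String slotRequestId, long timeoutMillis) {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        pendingRequests.put(slotRequestId, slotFuture);

        return slotFuture
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .whenComplete((slot, throwable) -> {
                if (throwable instanceof TimeoutException) {
                    // plays the role of timeoutPendingSlotRequest(slotRequestId)
                    pendingRequests.remove(slotRequestId);
                }
            });
    }
}
```

If the future is completed with a slot before the deadline, the timeout never fires and the callback does nothing.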
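Finally, requestNewAllocatedSlotInternal is called. It carries out the rest of the work of requesting the slot from the ResourceManager.
This is the main method for sending the slot request: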
```java
@Nonnull
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {

    if (resourceManagerGateway == null) {
        // not connected yet: stash the request until a ResourceManager is available
        stashRequestWaitingForResourceManager(pendingRequest);
    } else {
        // connected: send the request to the ResourceManager right away
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }

    return pendingRequest.getAllocatedSlotFuture();
}
```
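There are two cases. In the first, the JobMaster is already connected to the ResourceManager when the slot request is issued (resourceManagerGateway is not null). In the second, it is not connected yet.
Not connected to the ResourceManager
If there is no connection yet, stashRequestWaitingForResourceManager is called.
This records the slot request in the list of requests waiting for the ResourceManager (waitingForResourceManager). The connection will be established sooner or later, and the request is kept until then.
Once the connection to the ResourceManager is established, connectToResourceManager is called: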
```java
@Override
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

    // send every request that was stashed while waiting for the connection
    for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }

    // everything has been sent, so the waiting list can be cleared
    waitingForResourceManager.clear();
}
```
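When the connection to the ResourceManager is finally established, the method iterates over the requests stashed while waiting. For each one it calls requestSlotFromResourceManager to send it to the ResourceManager. Once all of them have been sent, it clears the waiting list.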
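The two branches of requestNewAllocatedSlotInternal and the flush in connectToResourceManager form a "stash until connected, then flush" pattern. The sketch below reproduces that pattern with strings in place of Flink's types. `RmGateway` is a hypothetical stand-in for `ResourceManagerGateway.requestSlot`, and it omits the allocationId and pending-list bookkeeping that the real requestSlotFromResourceManager does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the stash/flush pattern; all names here are hypothetical.
class StashUntilConnectedSketch {

    /** Hypothetical stand-in for ResourceManagerGateway.requestSlot. */
    interface RmGateway {
        void requestSlot(String slotRequestId, String resourceProfile);
    }

    /** null while the JobMaster is not connected to a ResourceManager. */
    private RmGateway resourceManagerGateway;

    /** Requests issued before a connection exists: slot request id -> resource profile. */
    final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();

    void requestNewSlot(String slotRequestId, String resourceProfile) {
        if (resourceManagerGateway == null) {
            waitingForResourceManager.put(slotRequestId, resourceProfile); // stash for later
        } else {
            resourceManagerGateway.requestSlot(slotRequestId, resourceProfile); // send right away
        }
    }

    void connectToResourceManager(RmGateway gateway) {
        this.resourceManagerGateway = gateway;
        // send every stashed request in arrival order, then clear the waiting list
        for (Map.Entry<String, String> e : waitingForResourceManager.entrySet()) {
            gateway.requestSlot(e.getKey(), e.getValue());
        }
        waitingForResourceManager.clear();
    }
}
```

A `LinkedHashMap` keeps stashed requests in arrival order while still allowing removal by id. Removal by id is what pollMatchingPendingRequest needs later, when it takes a request out of waitingForResourceManager.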
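Next, let's look at requestSlotFromResourceManager, the method that actually sends the request once a ResourceManager connection exists: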
```java
private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {

    checkNotNull(resourceManagerGateway);
    checkNotNull(pendingRequest);

    log.info("Requesting new slot [{}] and profile {} from resource manager.",
        pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());

    // the allocation id that travels to the ResourceManager and the TaskExecutor
    final AllocationID allocationId = new AllocationID();

    // record the request under both its SlotRequestId and its AllocationID
    pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);

    pendingRequest.getAllocatedSlotFuture().whenComplete(
        (AllocatedSlot allocatedSlot, Throwable throwable) -> {
            // failed, or fulfilled by a slot with a different allocation id: cancel at the RM
            if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {
                resourceManagerGateway.cancelSlotRequest(allocationId);
            }
        });

    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
        jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
        rpcTimeout);

    FutureUtils.whenCompleteAsyncIfNotDone(
        rmResponse,
        componentMainThreadExecutor,
        (Acknowledge ignored, Throwable failure) -> {
            // on failure, fail the pending request
            if (failure != null) {
                slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
            }
        });
}
```
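Here we finally see where the allocationId comes from. The id we kept seeing in the ResourceManager is created here.
The allocationId is generated when the JobMaster requests a slot from the ResourceManager.
The same id is then carried through the allocation steps in both the ResourceManager and the TaskExecutor.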
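First, the slot request is added to the pending request list (pendingRequests), keyed by both its SlotRequestId and the new allocationId.
This is the SlotPool's own, JobMaster-side pending list. Once the JobMaster issues a slot request, the request waits here until a slot is allocated to it. The ResourceManager keeps its own pending slot requests on its side, and the two are linked by the same allocationId.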
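pendingRequests must be reachable by two keys. The scheduler side knows the SlotRequestId, which is used by getKeyA and removeKeyA in the failure path. A slot offered by a TaskExecutor carries the AllocationID, which is used by removeKeyB in offerSlot. The sketch below is a simplified two-key map written only to illustrate this; it is not Flink's own class. A `UUID` in the test stands in for `new AllocationID()`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified two-key map: lookup/removal by key A (request id) or key B (allocation id).
class DualKeyMapSketch<A, B, V> {

    private static final class Entry<K, T> {
        final K keyB;
        final T value;

        Entry(K keyB, T value) {
            this.keyB = keyB;
            this.value = value;
        }
    }

    private final Map<A, Entry<B, V>> byKeyA = new LinkedHashMap<>(); // keeps insertion order
    private final Map<B, A> keyBToKeyA = new HashMap<>();

    void put(A keyA, B keyB, V value) {
        removeKeyA(keyA); // drop stale mappings so both indexes stay consistent
        removeKeyB(keyB);
        byKeyA.put(keyA, new Entry<>(keyB, value));
        keyBToKeyA.put(keyB, keyA);
    }

    V getKeyA(A keyA) {
        Entry<B, V> e = byKeyA.get(keyA);
        return e == null ? null : e.value;
    }

    V removeKeyA(A keyA) {
        Entry<B, V> e = byKeyA.remove(keyA);
        if (e == null) {
            return null;
        }
        keyBToKeyA.remove(e.keyB);
        return e.value;
    }

    V removeKeyB(B keyB) {
        A keyA = keyBToKeyA.get(keyB);
        return keyA == null ? null : removeKeyA(keyA);
    }

    int size() {
        return byKeyA.size();
    }
}
```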
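Next comes the failure handling. A callback is registered on the slot future (getAllocatedSlotFuture). The JobMaster asks the ResourceManager to cancel this slot request (cancelSlotRequest) in two cases. The first is when the future completes exceptionally. The second is when the future completes with a slot whose allocationId differs from the one generated here. That happens when the request was fulfilled by some other slot, so the slot requested under this allocationId is no longer needed.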
```java
pendingRequest.getAllocatedSlotFuture().whenComplete(
    (AllocatedSlot allocatedSlot, Throwable throwable) -> {
        if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {
            resourceManagerGateway.cancelSlotRequest(allocationId);
        }
    });
```
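The cancel condition can be exercised in isolation. In the sketch below, a list of cancelled allocation ids stands in for `cancelSlotRequest` calls, and `Slot` is a hypothetical type that carries only its allocation id. The `throwable != null` check comes first, so the null slot of a failed future is never dereferenced; the original code relies on the same short-circuit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the cancel-on-failure-or-mismatch callback; names are hypothetical.
class CancelOnMismatchSketch {

    /** Hypothetical allocated slot that only carries its allocation id. */
    static final class Slot {
        final String allocationId;

        Slot(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    /** Allocation ids that would be cancelled at the ResourceManager. */
    final List<String> cancelledAllocations = new ArrayList<>();

    /** Creates the request future for a request sent under allocationId and registers the callback. */
    CompletableFuture<Slot> track(String allocationId) {
        CompletableFuture<Slot> slotFuture = new CompletableFuture<>();
        slotFuture.whenComplete((slot, throwable) -> {
            // failed, or fulfilled by a slot with a different allocation id:
            // the slot requested under allocationId is no longer needed
            if (throwable != null || !allocationId.equals(slot.allocationId)) {
                cancelledAllocations.add(allocationId);
            }
        });
        return slotFuture;
    }
}
```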
Next is the key step: sending the slot request to the ResourceManager.
```java
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
    jobMasterId,
    new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
    rpcTimeout);
```
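Here the requestSlot method is called.
We covered this method in the earlier ResourceManager part. This call is where the JobMaster's slot request meets the ResourceManager's resource allocation.
After that comes the handling of a failed response: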
```java
    FutureUtils.whenCompleteAsyncIfNotDone(
        rmResponse,
        componentMainThreadExecutor,
        (Acknowledge ignored, Throwable failure) -> {
            if (failure != null) {
                slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
            }
        });
}

// -------------------------------------------------------------

private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) {
    final PendingRequest request = pendingRequests.getKeyA(slotRequestID);
    if (request != null) {
        if (isBatchRequestAndFailureCanBeIgnored(request, failure)) {
            log.debug("Ignoring failed request to the resource manager for a batch slot request.");
        } else {
            pendingRequests.removeKeyA(slotRequestID);
            request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
                "No pooled slot available and request to ResourceManager for new slot failed", failure));
        }
    } else {
        if (log.isDebugEnabled()) {
            log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure);
        }
    }
}
```
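The failure logic works like this. If no pending request is registered under this slot request id (for example, because it has already been fulfilled or removed), nothing is done beyond a debug log.
If a pending request is found, it is removed from the pending request list and its future is completed exceptionally. This happens for every streaming request. A batch request is treated the same way unless isBatchRequestAndFailureCanBeIgnored decides that the failure can be ignored, in which case it stays pending.
The exception is a NoResourceAvailableException with the message "No pooled slot available and request to ResourceManager for new slot failed".
This is the error that shows up in the logs when a job fails for this reason.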
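The three outcomes of slotRequestToResourceManagerFailed are: unknown request, ignorable batch failure, and failed request. They can be seen side by side in the sketch below. Two simplifications are assumptions. First, the "failure can be ignored" decision is reduced to a boolean parameter, whereas Flink's isBatchRequestAndFailureCanBeIgnored decides from the request and the failure itself. Second, `IllegalStateException` stands in for Flink's `NoResourceAvailableException`. All names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of slotRequestToResourceManagerFailed with a simplified "can be ignored" flag.
class RmFailureSketch {

    static final class Request {
        final boolean isBatchRequest;
        final CompletableFuture<String> slotFuture = new CompletableFuture<>();

        Request(boolean isBatchRequest) {
            this.isBatchRequest = isBatchRequest;
        }
    }

    final Map<String, Request> pendingRequests = new LinkedHashMap<>();

    void onRequestFailed(String slotRequestId, Throwable failure, boolean failureIgnorableForBatch) {
        Request request = pendingRequests.get(slotRequestId);
        if (request == null) {
            return; // unknown or already handled request: Flink only logs at debug level here
        }
        if (request.isBatchRequest && failureIgnorableForBatch) {
            return; // batch request stays pending and may still be fulfilled later
        }
        pendingRequests.remove(slotRequestId);
        request.slotFuture.completeExceptionally(new IllegalStateException(
            "No pooled slot available and request to ResourceManager for new slot failed", failure));
    }
}
```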
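Connected to the ResourceManager
If the JobMaster is already connected to the ResourceManager, requestSlotFromResourceManager is called directly.
That is the logic for requesting slots from the ResourceManager.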
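Receiving slots from the TaskExecutor
This is the JobMaster's second important function: receiving slots from TaskExecutors. These slots are used to fulfill the SlotPool's pending slot requests or later slot requests.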
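A TaskExecutor offers its slots after it has successfully registered with the JobMaster, by calling the JobMaster's offerSlots method.
We focus on offerSlots, where slots from a TaskExecutor enter the SlotPool. JobMaster.offerSlots hands the offers over to the SlotPool, and the code below is SlotPoolImpl's offerSlots: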
```java
@Override
public Collection<SlotOffer> offerSlots(
        TaskManagerLocation taskManagerLocation,
        TaskManagerGateway taskManagerGateway,
        Collection<SlotOffer> offers) {

    ArrayList<SlotOffer> result = new ArrayList<>(offers.size());

    // offer each slot to the pool and collect the accepted ones
    for (SlotOffer offer : offers) {
        if (offerSlot(taskManagerLocation, taskManagerGateway, offer)) {
            result.add(offer);
        }
    }

    return result;
}
```
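The method does three things:
It iterates over the slot offers reported by the TaskExecutor (offers) and calls offerSlot on each one. offerSlot contains the logic that adds the slot to the SlotPool.
If a slot is successfully added to the SlotPool, its slot information (the SlotOffer) is added to the result list.
Finally, the list of slots accepted into the SlotPool is returned to the TaskExecutor that made the offer.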
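Stripped of Flink types, offerSlots is a filter. Each offer goes through offerSlot, and only the accepted offers are reported back. The sketch below is a generic illustration in which a `Predicate` stands in for offerSlot; `OfferSlotsSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

// Sketch of offerSlots: keep exactly the offers that offerSlot accepts.
class OfferSlotsSketch {

    static <T> List<T> offerSlots(Collection<T> offers, Predicate<T> offerSlot) {
        List<T> accepted = new ArrayList<>(offers.size());
        for (T offer : offers) {
            if (offerSlot.test(offer)) {
                accepted.add(offer);
            }
        }
        return accepted; // offers missing from this list were rejected by the pool
    }
}
```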
Let's look at what a SlotOffer contains:
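```java
public class SlotOffer implements Serializable {

    private static final long serialVersionUID = -7067814231108250971L;

    // allocation id of the offered slot, matching the id generated by the JobMaster
    private AllocationID allocationId;

    // index of the slot on its TaskExecutor
    private final int slotIndex;

    // resources of the offered slot
    private final ResourceProfile resourceProfile;

    // ... (constructor, getters, equals/hashCode omitted)
}
```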
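These are the fields of a SlotOffer. How is each slot identified? In offerSlot, an offer is identified by its allocationId, and the physical slot by a SlotID. The SlotID is built from the TaskManager's ResourceID (taken from taskManagerLocation) plus the slotIndex from the SlotOffer. In other words, a slot is identified by which TaskManager it belongs to and its index on that TaskManager.
Now let's look at how each individual slot is processed: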
```java
boolean offerSlot(
        final TaskManagerLocation taskManagerLocation,
        final TaskManagerGateway taskManagerGateway,
        final SlotOffer slotOffer) {

    componentMainThreadExecutor.assertRunningInMainThread();

    // only accept offers from registered TaskManagers
    final ResourceID resourceID = taskManagerLocation.getResourceID();
    final AllocationID allocationID = slotOffer.getAllocationId();

    if (!registeredTaskManagers.contains(resourceID)) {
        log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
            slotOffer.getAllocationId(), taskManagerLocation);
        return false;
    }

    // has this allocation already been offered (allocated or available)?
    AllocatedSlot existingSlot;
    if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
            (existingSlot = availableSlots.get(allocationID)) != null) {

        // compare physical slot identities: TaskManager ResourceID + slot index
        final SlotID existingSlotId = existingSlot.getSlotId();
        final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());

        if (existingSlotId.equals(newSlotId)) {
            log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);
            // repeated offer of the same slot: acknowledge it again
            return true;
        } else {
            // the allocation is already bound to another slot: reject this offer
            return false;
        }
    }

    final AllocatedSlot allocatedSlot = new AllocatedSlot(
        allocationID,
        taskManagerLocation,
        slotOffer.getSlotIndex(),
        slotOffer.getResourceProfile(),
        taskManagerGateway);

    // is a pending request waiting for exactly this allocation?
    PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
    if (pendingRequest != null) {
        allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

        if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
            // the request future was already completed: undo and reuse the slot elsewhere
            allocatedSlots.remove(pendingRequest.getSlotRequestId());
            tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
        } else {
            log.debug("Fulfilled slot request [{}] with allocated slot [{}].",
                pendingRequest.getSlotRequestId(), allocationID);
        }
    } else {
        // no request waits for this allocation: serve another request or keep it available
        tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    }

    // the slot has been accepted by the pool
    return true;
}
```
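First, it checks whether the offering TaskExecutor is in the list of registered TaskManagers. If it is not, the offer is treated as outdated and rejected.
Second, it checks whether this allocationId has already been offered, to avoid processing the same offer twice. It does this by looking the allocationId up in both the allocated slots and the available slots.
If the allocationId has been offered before, it compares the SlotID of the existing slot with the SlotID of the new offer. If they are equal, this is a repeated offer of the same slot. Offering is idempotent, so this is harmless, and the method returns true. If they differ, the allocationId is already bound to a different slot, so the new offer is rejected with false.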
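The registration check and the duplicate check can be modeled with plain strings, as in the sketch below. `SlotId` follows the idea of Flink's SlotID (TaskManager ResourceID plus slot index), but it is a hypothetical class. For brevity, the allocated and available slot collections are merged into a single `knownSlots` map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Sketch of offerSlot's first two checks; all names are hypothetical.
class DuplicateOfferSketch {

    static final class SlotId {
        final String resourceId;
        final int slotIndex;

        SlotId(String resourceId, int slotIndex) {
            this.resourceId = resourceId;
            this.slotIndex = slotIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SlotId)) {
                return false;
            }
            SlotId other = (SlotId) o;
            return slotIndex == other.slotIndex && resourceId.equals(other.resourceId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(resourceId, slotIndex);
        }
    }

    final Set<String> registeredTaskManagers = new HashSet<>();

    /** allocationId -> slot already known to the pool (allocated or available). */
    final Map<String, SlotId> knownSlots = new HashMap<>();

    boolean offerSlot(String resourceId, int slotIndex, String allocationId) {
        if (!registeredTaskManagers.contains(resourceId)) {
            return false; // outdated offer from an unregistered TaskManager
        }
        SlotId newSlotId = new SlotId(resourceId, slotIndex);
        SlotId existing = knownSlots.get(allocationId);
        if (existing != null) {
            // same physical slot again: idempotent, accept; a different slot: reject
            return existing.equals(newSlotId);
        }
        knownSlots.put(allocationId, newSlotId); // the real code goes on to match pending requests
        return true;
    }
}
```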
```java
final AllocatedSlot allocatedSlot = new AllocatedSlot(
    allocationID,
    taskManagerLocation,
    slotOffer.getSlotIndex(),
    slotOffer.getResourceProfile(),
    taskManagerGateway);

PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
if (pendingRequest != null) {
    allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

    if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
        allocatedSlots.remove(pendingRequest.getSlotRequestId());
        tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    } else {
        log.debug("Fulfilled slot request [{}] with allocated slot [{}].",
            pendingRequest.getSlotRequestId(), allocationID);
    }
} else {
    tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
}

return true;
```
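This part first creates an AllocatedSlot from the offer.
Then it checks whether a pending slot request is waiting for exactly this allocationId. pendingRequests.removeKeyB(allocationID) looks the request up and removes it.
If there is one, the slot is first added to the allocated slots list.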
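Next, it tries to complete the request's future with this slot. If complete returns false, the future had already been completed, for example because the request timed out, failed, or was fulfilled by another slot. In that case the slot is not actually used by this request, so it is removed from the allocated slots list. Then tryFulfillSlotRequestOrMakeAvailable is called, which either uses the slot for another pending slot request or adds it to the available slots (availableSlots).
If complete succeeds, the request is now fulfilled with this slot, and nothing else is needed apart from a debug log.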
If no pending request is waiting for this allocationId, tryFulfillSlotRequestOrMakeAvailable is called as well.
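The subtle case is `complete` returning false. `CompletableFuture.complete` only succeeds on a future that is not yet completed, so a request that already timed out or failed cannot take the slot. The sketch below shows that branch. In the shown code, allocatedSlots is added to and removed from by SlotRequestId; the sketch simplifies it to a map keyed by allocationId. tryFulfillSlotRequestOrMakeAvailable is reduced to "add to available slots". All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of the second half of offerSlot; pending requests are keyed by allocation id.
class OfferToPendingSketch {

    final Map<String, CompletableFuture<String>> pendingByAllocationId = new HashMap<>();
    final Map<String, String> allocatedSlots = new HashMap<>(); // allocationId -> slot
    final List<String> availableSlots = new ArrayList<>();

    void acceptOffer(String allocationId, String slot) {
        CompletableFuture<String> pending = pendingByAllocationId.remove(allocationId); // like removeKeyB
        if (pending != null) {
            allocatedSlots.put(allocationId, slot);
            if (!pending.complete(slot)) {
                // the future was already completed (timed out, failed, or fulfilled elsewhere):
                // the request does not use this slot, so undo and hand the slot on
                allocatedSlots.remove(allocationId);
                tryFulfillOrMakeAvailable(slot);
            }
        } else {
            tryFulfillOrMakeAvailable(slot);
        }
    }

    private void tryFulfillOrMakeAvailable(String slot) {
        availableSlots.add(slot); // simplified: the real method first looks for another matching request
    }
}
```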
Let's look at what tryFulfillSlotRequestOrMakeAvailable does:
```java
private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
    Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

    final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

    if (pendingRequest != null) {
        // a waiting request can use this slot: assign it right away
        log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
            pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

        allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
        pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
    } else {
        // nobody needs it right now: keep it as an available slot
        log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
        availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
    }
}
```
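The method first checks (Preconditions.checkState) that the slot is not still in use.
It then calls pollMatchingPendingRequest to get a PendingRequest and checks whether the result is null.
If it is not null, the slot is added to the allocated slots list and the request's future is completed with it.
If it is null, the slot is added to the available slots list.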
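The same two outcomes appear in the sketch below, where a slot is reduced to its memory size. The matching rule (slot memory at least the required memory) is an assumption for illustration. The bookkeeping in allocatedSlots is omitted, and the in-use check throws `IllegalStateException`, the same exception type that Flink's `Preconditions.checkState` throws. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of tryFulfillSlotRequestOrMakeAvailable with slots reduced to memory sizes (MB).
class TryFulfillSketch {

    static final class Request {
        final int requiredMemoryMb;
        final CompletableFuture<Integer> slotFuture = new CompletableFuture<>();

        Request(int requiredMemoryMb) {
            this.requiredMemoryMb = requiredMemoryMb;
        }
    }

    final Map<String, Request> pendingRequests = new LinkedHashMap<>();
    final List<Integer> availableSlots = new ArrayList<>();

    void tryFulfillOrMakeAvailable(int slotMemoryMb, boolean slotInUse) {
        if (slotInUse) {
            throw new IllegalStateException("Provided slot is still in use.");
        }
        Request matched = pollMatching(slotMemoryMb);
        if (matched != null) {
            matched.slotFuture.complete(slotMemoryMb); // hand the slot to the waiting request
        } else {
            availableSlots.add(slotMemoryMb); // nobody needs it right now: keep it free
        }
    }

    /** Removes and returns the first request the slot can serve, or null. */
    private Request pollMatching(int slotMemoryMb) {
        Iterator<Request> it = pendingRequests.values().iterator();
        while (it.hasNext()) {
            Request r = it.next();
            if (slotMemoryMb >= r.requiredMemoryMb) {
                it.remove();
                return r;
            }
        }
        return null;
    }
}
```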
Now let's see how pollMatchingPendingRequest decides, and what it checks:
```java
private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
    final ResourceProfile slotResources = slot.getResourceProfile();

    // first, requests that have already been sent to the ResourceManager
    for (PendingRequest request : pendingRequests.values()) {
        if (slotResources.isMatching(request.getResourceProfile())) {
            pendingRequests.removeKeyA(request.getSlotRequestId());
            return request;
        }
    }

    // then, requests still waiting for a ResourceManager connection
    for (PendingRequest request : waitingForResourceManager.values()) {
        if (slotResources.isMatching(request.getResourceProfile())) {
            waitingForResourceManager.remove(request.getSlotRequestId());
            return request;
        }
    }

    // no pending request matches this slot
    return null;
}
```
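Matching order: the method first checks the requests in pendingRequests. If it finds a matching request, it removes that request and returns it. If none matches, it then checks the requests in waitingForResourceManager.
Resource matching: slotResources.isMatching(request.getResourceProfile()) decides whether the slot's resources satisfy the request. The exact comparison rules are defined in ResourceProfile.isMatching.
Removing the handled request: as soon as a matching request is found, whether in pendingRequests or in waitingForResourceManager, it is removed from that collection so that it cannot be served twice.
Returning the request: the matched request is returned directly, meaning a suitable slot has been found for it.
No match: if no matching request is found, the method returns null, meaning no current request fits this slot's resources.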
In short, this checks whether any request that is still waiting for resources can use the current slot.
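The search order is easiest to see with two small maps. In the sketch below, requests are reduced to their required memory (MB), and matching is assumed to mean "slot memory at least the required memory". That rule is an assumption for illustration; Flink's ResourceProfile.isMatching defines its own rules, so check them in the version you are reading. All names are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of pollMatchingPendingRequest's order: pendingRequests first, then waitingForResourceManager.
class PollMatchingSketch {

    final Map<String, Integer> pendingRequests = new LinkedHashMap<>();           // id -> required MB
    final Map<String, Integer> waitingForResourceManager = new LinkedHashMap<>(); // id -> required MB

    /** Removes and returns the id of the first matching request, or null if none matches. */
    String pollMatching(int slotMemoryMb) {
        String id = pollFrom(pendingRequests, slotMemoryMb);
        return id != null ? id : pollFrom(waitingForResourceManager, slotMemoryMb);
    }

    private static String pollFrom(Map<String, Integer> requests, int slotMemoryMb) {
        Iterator<Map.Entry<String, Integer>> it = requests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> e = it.next();
            if (slotMemoryMb >= e.getValue()) {
                it.remove(); // remove so the same request is never served twice
                return e.getKey();
            }
        }
        return null;
    }
}
```

In the test below, "sent-small" wins over "waiting-small" even though both fit the slot. This is because requests already sent to the ResourceManager are searched first.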
That covers how the SlotPool handles slots received from a TaskExecutor.