Flink Source Code Analysis (X)
2025-01-07 11:26:50 # Flink # Source Code Analysis # JobMaster Issuing Slot Requests


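This post ties the JobManager together with the ResourceManager covered in the previous posts.

Earlier we saw that the ResourceManager is mainly responsible for managing resources and allocating slots in response to slot requests.

Now let's look at how the JobManager requests slots from the ResourceManager, which closes the loop.

First, a quick summary of the JobMaster.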

JobMaster Summary

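  • JobMaster extends FencedRpcEndpoint, which makes it an RpcEndpoint whose calls are checked against a fencing token.
  • JobMaster implements the JobMasterGateway interface, which exposes the RPC methods that other components call.
  • JobMaster implements the JobMasterService interface, which is used by the JobManagerRunner.
  • The JobManagerRunner creates and starts the JobMaster and handles everything related to the JobMaster's leader election.
  • The two core components inside the JobMaster are the Scheduler and the CheckpointCoordinator. This post covers how slots are requested when the job's tasks are scheduled; checkpoints will get a separate post later.
    • The Scheduler is responsible for scheduling the ExecutionGraph
      • A core part of the Scheduler's logic is requesting slots for the job's tasks
    • The CheckpointCoordinator coordinates the job's checkpoints
  • The JobMaster is the initiator of the slot request flow. Its SlotPool provides slots to the execution graph during scheduling and manages the slots' lifecycle. There is exactly one SlotPool per job, and the implementation class is SlotPoolImpl:
    • private final SlotPool slotPool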

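From the summary above, the entry point is SlotPoolImpl, so that is where we start.

In the slot request flow, the SlotPool handles two things:

  • issuing slot requests
  • receiving slots from the TaskExecutor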

Issuing slot requests

Let's outline the whole flow first; reading the code with the flow in mind makes it much clearer.

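When the Scheduler schedules the job's tasks and needs a slot, but the SlotPool has no matching free slot, a slot request is issued. Inside the JobMaster, a slot request sent to the SlotPool is handled as follows:

1) Check whether the SlotPool's list of free slots contains a slot matching the resource profile the task needs. If it does, that slot is returned; if not, the SlotPool may start requesting a new slot.
2) Request a new slot. Check whether the JobMaster is connected to the ResourceManager: if not, the request is recorded and sent to the ResourceManager once the connection is established; if it is, the request is sent to the ResourceManager directly.
3) Before the request is sent to the ResourceManager, it is recorded in the list of pending slot requests, so that a slot offered by a TaskExecutor can later be bound to it. A request to the ResourceManager that fails with an exception is handled as well.
4) The entry point for a slot request from the Scheduler to the SlotPool is SlotPoolImpl's requestNewAllocatedSlot method.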
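To make the decision in steps 1) and 2) concrete, here is a minimal Java sketch. It is not Flink code: MiniSlotPool and FreeSlot are hypothetical, and a slot's resource profile is reduced to a single CPU number, so "matching" simply means "has at least the requested cores".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified slot: an allocation id plus a single resource dimension.
record FreeSlot(String allocationId, double cpuCores) {}

class MiniSlotPool {
    private final List<FreeSlot> availableSlots = new ArrayList<>();
    // Stand-in for new slot requests that would go through requestNewAllocatedSlot
    private final List<Double> newRequests = new ArrayList<>();

    void addAvailable(FreeSlot slot) {
        availableSlots.add(slot);
    }

    /** Step 1: reuse a free slot that fits; step 2: otherwise record a request for a new slot. */
    Optional<FreeSlot> allocate(double requiredCores) {
        for (FreeSlot slot : availableSlots) {
            if (slot.cpuCores() >= requiredCores) {
                availableSlots.remove(slot); // safe: we return right away, no further iteration
                return Optional.of(slot);
            }
        }
        newRequests.add(requiredCores);
        return Optional.empty();
    }

    int pendingNewRequests() {
        return newRequests.size();
    }
}
```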

So the entry point for issuing a slot request is SlotPoolImpl.requestNewAllocatedSlot, and that is the method we start from.

/**
 * The Scheduler requests a slot from the SlotPool
 */
@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile,
        Time timeout) {

    /**
     * First assert that we are running in the main thread that processes messages, to prevent concurrent access
     */
    componentMainThreadExecutor.assertRunningInMainThread();

    /**
     * Create a streaming pending slot request.
     * Inside createStreamingRequest:
     * private PendingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile, boolean isBatchRequest,
     *         CompletableFuture<AllocatedSlot> allocatedSlotFuture) {
     *     this.slotRequestId = unique identifier of this slot request
     *     this.resourceProfile = resource profile required by the slot request
     *     this.isBatchRequest = distinguishes batch from streaming pending slot requests
     *     this.allocatedSlotFuture = represents the completion state of the slot allocation
     *     this.unfillableSince = used by batch pending requests to detect batch slot allocation timeouts
     */
    final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);

    /**
     * Register the request timeout.
     * If the request takes longer than the timeout, this pending slot request is aborted with a timeout cause.
     * The timeout is the value of slot.request.timeout, 300 seconds by default.
     */
    FutureUtils.orTimeout(pendingRequest.getAllocatedSlotFuture(), timeout.toMilliseconds(), TimeUnit.MILLISECONDS, componentMainThreadExecutor)
        .whenComplete((AllocatedSlot ignored, Throwable throwable) -> {
            if (throwable instanceof TimeoutException) {
                timeoutPendingSlotRequest(slotRequestId);
            }
        });

    /**
     * Call requestNewAllocatedSlotInternal, which goes on to send the slot request to the ResourceManager
     */
    return requestNewAllocatedSlotInternal(pendingRequest).thenApply(Function.identity());
}

The method first creates a streaming pending slot request.

Let's step into it and look at what this request contains:


/**
 * Create a streaming pending slot request.
 * Inside createStreamingRequest:
 * private PendingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile, boolean isBatchRequest,
 *         CompletableFuture<AllocatedSlot> allocatedSlotFuture) {
 *     this.slotRequestId = unique identifier of this slot request
 *     this.resourceProfile = resource profile required by the slot request
 *     this.isBatchRequest = distinguishes batch from streaming pending slot requests
 *     this.allocatedSlotFuture = represents the completion state of the slot allocation
 *     this.unfillableSince = used by batch pending requests to detect batch slot allocation timeouts
 */
protected static class PendingRequest {

    private final SlotRequestId slotRequestId;

    private final ResourceProfile resourceProfile;

    private final boolean isBatchRequest;

    private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;

    private long unfillableSince;

    private PendingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile, boolean isBatchRequest) {
        this(slotRequestId, resourceProfile, isBatchRequest, new CompletableFuture<>());
    }

    // ... remaining constructors, factory methods and getters omitted
}

After that, the request timeout is registered: if the request takes longer than the timeout, this pending slot request is aborted.
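The timeout registration can be mimicked with the JDK's own CompletableFuture.orTimeout (Java 9+), used here as a stand-in for Flink's FutureUtils.orTimeout, which additionally takes the main-thread executor. TimeoutDemo and the onTimeout callback are hypothetical; in SlotPoolImpl the callback is timeoutPendingSlotRequest(slotRequestId).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {
    /**
     * Attaches a timeout to the allocation future and runs onTimeout if the timeout fires.
     * Returns the whenComplete stage so that a caller can wait until the callback has run.
     */
    static <T> CompletableFuture<T> registerTimeout(CompletableFuture<T> allocatedSlotFuture,
                                                    long timeoutMillis,
                                                    Runnable onTimeout) {
        return allocatedSlotFuture
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .whenComplete((ignored, throwable) -> {
                    // Unwrap defensively in case the exception arrives wrapped
                    Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                            ? throwable.getCause()
                            : throwable;
                    if (cause instanceof TimeoutException) {
                        onTimeout.run(); // SlotPoolImpl calls timeoutPendingSlotRequest(slotRequestId) here
                    }
                });
    }
}
```

Returning the whenComplete stage instead of the original future is only a convenience of this sketch; the orTimeout call itself completes the original future exceptionally, just as the pending request's allocatedSlotFuture is failed in SlotPoolImpl.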

Finally, requestNewAllocatedSlotInternal is called, which goes on to send the slot request to the ResourceManager.

This is the main method for sending a slot request:

/**
 * Send the slot request to the ResourceManager
 */
@Nonnull
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {

    /**
     * Before sending the slot request to the ResourceManager, check whether we are connected to it
     */
    if (resourceManagerGateway == null) {
        /**
         * Not connected to the ResourceManager:
         * record the slot request in the list of requests waiting for a ResourceManager connection (waitingForResourceManager),
         * so that the request is not lost while there is no connection.
         */
        stashRequestWaitingForResourceManager(pendingRequest);
    } else {
        /**
         * Connected to the ResourceManager:
         * send the slot request via requestSlotFromResourceManager
         */
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }
    return pendingRequest.getAllocatedSlotFuture();
}

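There are two cases here: when the slot request is about to be sent, the SlotPool is either already connected to the ResourceManager or not connected yet.

Not connected to the ResourceManager

Without a ResourceManager connection, stashRequestWaitingForResourceManager is called.

It records the slot request in the list of requests waiting for a ResourceManager connection, so the request is kept until the connection is established sooner or later.

Once the connection to the ResourceManager is established, connectToResourceManager is called: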

/**
 * Handles the slot requests that could not be sent earlier because there was no ResourceManager connection.
 * When the JobMaster connects to the ResourceManager, SlotPoolImpl's connectToResourceManager is called.
 *
 * Processing logic:
 * store the gateway, send every stashed request via requestSlotFromResourceManager,
 * then clear waitingForResourceManager.
 */
@Override
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
    // Check the gateway and store the connection
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

    /**
     * Iterate over the slot requests stashed while waiting for the ResourceManager connection
     * and send each of them via requestSlotFromResourceManager
     */
    for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }
    // Finally clear the list of requests waiting for the ResourceManager connection (waitingForResourceManager)
    waitingForResourceManager.clear();
}

So once the ResourceManager connection is finally established, the method iterates over the requests stashed while waiting, hands each one to requestSlotFromResourceManager, and then clears the waiting list.
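The stash-then-replay pattern of requestNewAllocatedSlotInternal and connectToResourceManager fits in a few lines. This is a simplification, not Flink's code: the ResourceManager gateway is modeled as a hypothetical Consumer<String> that receives request ids, and StashingRequester is an illustrative name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

class StashingRequester {
    // slotRequestId -> request description, kept until a ResourceManager connection exists
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private Consumer<String> resourceManagerGateway; // null until connected

    void request(String slotRequestId) {
        if (resourceManagerGateway == null) {
            waitingForResourceManager.put(slotRequestId, slotRequestId); // stash, don't lose it
        } else {
            resourceManagerGateway.accept(slotRequestId); // connected: send right away
        }
    }

    void connect(Consumer<String> gateway) {
        this.resourceManagerGateway = gateway;
        for (String id : waitingForResourceManager.values()) {
            gateway.accept(id); // replay the stashed requests
        }
        waitingForResourceManager.clear();
    }

    int stashed() {
        return waitingForResourceManager.size();
    }
}
```

The sketch uses a LinkedHashMap so that stashed requests are replayed in the order they arrived.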

Now let's look at requestSlotFromResourceManager, the method that actually sends the request once the connection exists:

/**
 * Connected to the ResourceManager:
 * send the slot request via requestSlotFromResourceManager
 */
private void requestSlotFromResourceManager(final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) {

    // Sanity checks
    checkNotNull(resourceManagerGateway);
    checkNotNull(pendingRequest);

    log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());

    // 1. Create the allocationId; this allocation id follows the request through the ResourceManager and the TaskExecutor
    final AllocationID allocationId = new AllocationID();

    // 2. Add this slot request to the pending request list (pendingRequests)
    pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);

    /**
     * 3. Watch how this slot request completes.
     * The pendingRequest's allocatedSlotFuture is used to handle failed allocations: if it completes exceptionally,
     * or completes with a slot whose allocation id differs from the one created here,
     * ask the ResourceManager to cancel this slot request.
     */
    pendingRequest.getAllocatedSlotFuture().whenComplete((AllocatedSlot allocatedSlot, Throwable throwable) -> {
        if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {
            // The request failed, or it was fulfilled by a slot with a different allocation id: cancel it
            resourceManagerGateway.cancelSlotRequest(allocationId);
        }
    });

    /**
     * 4. Send the slot request to the ResourceManager
     */
    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), rpcTimeout
    );

    FutureUtils.whenCompleteAsyncIfNotDone(rmResponse, componentMainThreadExecutor, (Acknowledge ignored, Throwable failure) -> {
        // on failure, fail the request future
        if (failure != null) {
            // The request returned an exception: run the failure handling for slot requests to the ResourceManager
            slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
        }
    });
}

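Here we finally see the allocationId: the id that kept showing up in the ResourceManager posts is created right here.

The allocationId is the id assigned when the JobMaster requests a slot from the ResourceManager.

This allocation id follows the request all the way through the ResourceManager and the TaskExecutor.

First, the slot request is added to the list of pending requests (pendingRequests).

This is the pending request list we kept mentioning in the ResourceManager posts: once the JobMaster issues a slot request, the request goes into this list and waits to be fulfilled.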
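pendingRequests is looked up in two ways in this flow: by SlotRequestId (getKeyA/removeKeyA when a request fails) and by AllocationID (removeKeyB when a TaskExecutor offers a slot). The class below is a simplified, hypothetical stand-in that only illustrates the idea of one value reachable through two keys; it is not Flink's implementation.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class DualKeyMap<A, B, V> {
    private final Map<A, V> byA = new LinkedHashMap<>(); // key A -> value, insertion-ordered
    private final Map<A, B> aToB = new HashMap<>();
    private final Map<B, A> bToA = new HashMap<>();

    void put(A a, B b, V value) {
        removeKeyA(a);                      // drop an old mapping for this key A
        A previousA = bToA.get(b);
        if (previousA != null) {
            removeKeyA(previousA);          // drop an old mapping for this key B
        }
        byA.put(a, value);
        aToB.put(a, b);
        bToA.put(b, a);
    }

    V getKeyA(A a) {
        return byA.get(a);
    }

    V removeKeyA(A a) {
        V value = byA.remove(a);
        B b = aToB.remove(a);
        if (b != null) {
            bToA.remove(b);
        }
        return value;
    }

    V removeKeyB(B b) {
        A a = bToA.get(b);
        return a == null ? null : removeKeyA(a);
    }

    int size() {
        return byA.size();
    }
}
```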

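Next comes the failure handling. The method takes the allocation future (getAllocatedSlotFuture); if that future completes exceptionally, or completes with a slot whose allocation id differs from the one created for this request, it asks the ResourceManager to cancel this slot request.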

/**
 * 3. Watch how this slot request completes.
 * The pendingRequest's allocatedSlotFuture is used to handle failed allocations: if it completes exceptionally,
 * or completes with a slot whose allocation id differs from the one created here,
 * ask the ResourceManager to cancel this slot request.
 */
pendingRequest.getAllocatedSlotFuture().whenComplete((AllocatedSlot allocatedSlot, Throwable throwable) -> {
    if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {
        // The request failed, or it was fulfilled by a slot with a different allocation id: cancel it
        resourceManagerGateway.cancelSlotRequest(allocationId);
    }
});
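The cancel condition relies on short-circuit evaluation: when the future fails, allocatedSlot is null and the allocation id comparison is never evaluated. A minimal sketch of the same hook, where AllocatedSlotStub is hypothetical and a list stands in for resourceManagerGateway.cancelSlotRequest(allocationId):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for AllocatedSlot: only the allocation id matters here
record AllocatedSlotStub(String allocationId) {}

class CancelHook {
    // Allocation ids for which a cancel request would be sent to the ResourceManager
    final List<String> cancelled = new ArrayList<>();

    void watch(CompletableFuture<AllocatedSlotStub> allocatedSlotFuture, String allocationId) {
        allocatedSlotFuture.whenComplete((slot, throwable) -> {
            // If throwable != null, slot is null, but || stops before slot.allocationId() is called
            if (throwable != null || !allocationId.equals(slot.allocationId())) {
                cancelled.add(allocationId);
            }
        });
    }
}
```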

Next comes the key step: sending the slot request to the ResourceManager.

/**
 * 4. Send the slot request to the ResourceManager
 */
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(jobMasterId,
    new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), rpcTimeout
);

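Here we see that the ResourceManager's requestSlot method is called.

We covered this method in the ResourceManager post; this call is exactly where the JobMaster's slot request connects to the ResourceManager's resource allocation.

After that comes the handling of a failed response; let's look at it: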

FutureUtils.whenCompleteAsyncIfNotDone(rmResponse, componentMainThreadExecutor, (Acknowledge ignored, Throwable failure) -> {
    // on failure, fail the request future
    if (failure != null) {
        // The request returned an exception: run the failure handling for slot requests to the ResourceManager
        slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
    }
});

-------------------------------------------------------------

/**
 * Failure handling for a slot request to the ResourceManager:
 * look up the pending slot request by its slotRequestId. If there is none, only a debug message is logged;
 * a batch request whose failure can be ignored stays pending;
 * otherwise the request is removed and its allocation future is failed.
 */
private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) {
    final PendingRequest request = pendingRequests.getKeyA(slotRequestID);
    if (request != null) {
        if (isBatchRequestAndFailureCanBeIgnored(request, failure)) {
            // Batch request whose failure can be ignored: leave it pending
            log.debug("Ignoring failed request to the resource manager for a batch slot request.");
        } else {
            // Otherwise (e.g. a streaming request): remove the request from pendingRequests
            // and complete its allocation future exceptionally.
            // This is the error seen during job scheduling:
            // No pooled slot available and request to ResourceManager for new slot failed
            pendingRequests.removeKeyA(slotRequestID);
            request.getAllocatedSlotFuture().completeExceptionally(
                new NoResourceAvailableException("No pooled slot available and request to ResourceManager for new slot failed", failure));
        }
    } else {
        // No matching pending request: only log at debug level
        if (log.isDebugEnabled()) {
            log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure);
        }
    }
}

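The failure logic works like this: if no pending request is registered under the request id, nothing happens apart from a debug log.

If there is one and it is a batch request whose failure can be ignored, it is left pending as well.

Otherwise (for example for a streaming request), the request is removed from the pending request list and its allocation future is completed exceptionally.

The exception is "No pooled slot available and request to ResourceManager for new slot failed".

This is the error we see in the logs when a job fails for this reason.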
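The branching in slotRequestToResourceManagerFailed can be sketched as follows. Assumptions: the batch "failure can be ignored" check is reduced to the isBatchRequest flag (Flink's isBatchRequestAndFailureCanBeIgnored also looks at the failure itself), IllegalStateException stands in for NoResourceAvailableException, and FailureHandling is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class FailureHandling {
    record Pending(boolean isBatchRequest, CompletableFuture<String> allocatedSlotFuture) {}

    // slotRequestId -> pending request
    final Map<String, Pending> pendingRequests = new HashMap<>();

    void onResourceManagerFailure(String slotRequestId, Throwable failure) {
        Pending request = pendingRequests.get(slotRequestId);
        if (request == null) {
            return; // unknown or already handled request: Flink only logs at debug level
        }
        if (request.isBatchRequest()) {
            return; // simplified "failure can be ignored": the batch request stays pending
        }
        pendingRequests.remove(slotRequestId);
        request.allocatedSlotFuture().completeExceptionally(new IllegalStateException(
                "No pooled slot available and request to ResourceManager for new slot failed", failure));
    }
}
```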

Connected to the ResourceManager

If the ResourceManager is already connected, requestSlotFromResourceManager is called directly.

That is the whole logic of requesting a slot from the ResourceManager.

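Receiving slots from the TaskExecutor

This is the second important job of the JobMaster's SlotPool: receiving slots from the TaskExecutor.

  1. The received slots are used to fulfill pending slot requests in the SlotPool, or later slot requests.

  2. Slots are received from a TaskExecutor after it has successfully registered with the JobMaster, through calls to the JobMaster's offerSlots method.

Let's focus on the offerSlots method; this is where slots coming from the TaskExecutor are received.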

/**
 * Handles all slots offered by a TaskExecutor:
 * 1. Iterate over the slot offers reported by the TaskExecutor (offers) and call offerSlot for each, which adds the slot to the SlotPool
 * 2. If an offered slot is added to the SlotPool successfully, add its SlotOffer to the result list
 * 3. Finally return the result list to the reporting TaskExecutor
 *
 * SlotOffer:
 * allocationId: the allocation id
 * slotIndex: index of the slot within the TaskExecutor
 * resourceProfile: resource profile of the slot
 * A slot is identified uniquely by taskManagerLocation + SlotOffer
 */
@Override
public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
    ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
    for (SlotOffer offer : offers) {
        if (offerSlot(taskManagerLocation, taskManagerGateway, offer)) {
            result.add(offer);
        }
    }
    return result;
}

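It does three things:

  1. Iterates over the slot offers reported by the TaskExecutor (offers) and calls offerSlot for each, which adds the slot to the SlotPool
  2. If a slot is added to the SlotPool successfully, adds its SlotOffer to the result list
  3. Finally returns the list of slots accepted into the SlotPool to the reporting TaskExecutor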
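The loop is a plain filter: each offer is accepted or rejected on its own, and only the accepted offers go back to the TaskExecutor. A generic sketch, where a Predicate stands in for offerSlot(taskManagerLocation, taskManagerGateway, offer) and OfferLoop is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

class OfferLoop {
    /** Returns the offers accepted by offerSlot, in the order they were offered. */
    static <T> List<T> acceptedOffers(Collection<T> offers, Predicate<T> offerSlot) {
        List<T> result = new ArrayList<>(offers.size());
        for (T offer : offers) {
            if (offerSlot.test(offer)) {
                result.add(offer);
            }
        }
        return result;
    }
}
```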

Let's see what the slot information, SlotOffer, contains:

/*
 * SlotOffer:
 * allocationId: the allocation id
 * slotIndex: index of the slot within the TaskExecutor
 * resourceProfile: resource profile of the slot
 * A slot is identified uniquely by taskManagerLocation + SlotOffer
 */
public class SlotOffer implements Serializable {

    private static final long serialVersionUID = -7067814231108250971L;

    /** Allocation id of this slot, this would be the only identifier for this slot offer */
    private AllocationID allocationId;

    /** Index of the offered slot */
    private final int slotIndex;

    /** The resource profile of the offered slot */
    private final ResourceProfile resourceProfile;

    // ... constructor and getters omitted
}

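These are SlotOffer's fields. How is each slot identified uniquely? By taskManagerLocation plus the SlotOffer, that is, by which TaskManager the slot belongs to together with the slot's own attributes. In offerSlot this becomes a SlotID built from the TaskManager's ResourceID and the slot index.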
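Because the identity is "which TaskManager plus which slot index", a value type whose equals/hashCode cover exactly those two fields is enough to deduplicate slots. SlotIdSketch is a hypothetical stand-in for Flink's SlotID, which offerSlot builds as new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex()).

```java
import java.util.Collection;
import java.util.HashSet;

// Record equality compares resourceId and slotIndex, so equal pairs denote the same physical slot
record SlotIdSketch(String resourceId, int slotIndex) {
    /** Counts distinct physical slots; duplicates collapse through equals/hashCode. */
    static int distinct(Collection<SlotIdSketch> ids) {
        return new HashSet<>(ids).size();
    }
}
```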

Next, let's look at how each individual slot offer is processed:

/**
 * Handles each slot offered by a TaskExecutor
 */
boolean offerSlot(final TaskManagerLocation taskManagerLocation, final TaskManagerGateway taskManagerGateway, final SlotOffer slotOffer) {

    componentMainThreadExecutor.assertRunningInMainThread();

    // check if this TaskManager is valid
    final ResourceID resourceID = taskManagerLocation.getResourceID();
    final AllocationID allocationID = slotOffer.getAllocationId();

    // 1. Check whether the offering TaskExecutor is among the registered TaskExecutors; if not, the offer fails
    if (!registeredTaskManagers.contains(resourceID)) {
        log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", slotOffer.getAllocationId(), taskManagerLocation);
        return false;
    }

    // check whether we have already using this slot
    AllocatedSlot existingSlot;
    /**
     * 2. Check whether this allocation id has already been offered:
     * look for the allocation id among the slots already added to the SlotPool,
     * i.e. allocatedSlots (allocated slots) and availableSlots (available slots)
     */
    if ((existingSlot = allocatedSlots.get(allocationID)) != null || (existingSlot = availableSlots.get(allocationID)) != null) {

        // we need to figure out if this is a repeated offer for the exact same slot,
        // or another offer that comes from a different TaskManager after the ResourceManager
        // re-tried the request

        // we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
        // the actual slots on the TaskManagers
        // Note: The slotOffer should have the SlotID
        final SlotID existingSlotId = existingSlot.getSlotId();
        final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());

        // If it exists, compare the SlotID already in the SlotPool with the offered SlotID.
        // If they are equal, this is a repeated offer and it succeeds (offering a slot is idempotent)
        if (existingSlotId.equals(newSlotId)) {
            log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);

            // return true here so that the sender will get a positive acknowledgement to the retry
            // and mark the offering as a success
            return true;
        } else {
            // Otherwise the offer fails because the allocation id is already taken by another slot
            // the allocation has been fulfilled by another slot, reject the offer so the task executor
            // will offer the slot to the resource manager
            return false;
        }
    }

    // Create the allocated slot
    final AllocatedSlot allocatedSlot = new AllocatedSlot(allocationID, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(),
        taskManagerGateway);

    // Find (and remove) the pending slot request with the same allocation id
    PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
    if (pendingRequest != null) {
        // we were waiting for this!
        // Add the slot to the allocated list, then try to complete the request with it
        allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
        if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
            // complete() returned false: the request's future was already completed (e.g. timed out or failed),
            // so remove the slot from allocatedSlots again
            allocatedSlots.remove(pendingRequest.getSlotRequestId());
            // Use the slot for another pending request, or add it to the available slots (availableSlots)
            tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
        } else {
            // The request is now fulfilled by this slot; only log
            log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
        }
    } else {

        // we were actually not waiting for this:
        // - could be that this request had been fulfilled
        // - we are receiving the slots from TaskManagers after becoming leaders
        // Use the slot for another pending request, or add it to the available slots (availableSlots)
        tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    }

    // we accepted the request in any case. slot will be released after it idled for
    // too long and timed out
    return true;
}

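First, it checks whether the offering TaskExecutor is in the list of registered TaskExecutors.

Second, it checks whether this allocationId has already been offered, to handle duplicate offers: it looks for the allocation id among the allocated slots (allocatedSlots) and the available slots (availableSlots).

If the allocationId was offered before, it compares the SlotID stored back then with the SlotID of the current offer. If they are equal, this is simply a repeated offer and it is accepted, because offering a slot is idempotent. If they differ, the allocation has already been fulfilled by another slot, so the offer is rejected and false is returned.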
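The duplicate-offer check boils down to three outcomes. A sketch with hypothetical names (DuplicateOfferCheck, PhysicalSlot, Result); a single map keyed by allocation id stands in for looking in both allocatedSlots and availableSlots:

```java
import java.util.HashMap;
import java.util.Map;

class DuplicateOfferCheck {
    // Hypothetical stand-in for SlotID: TaskExecutor resource id + slot index
    record PhysicalSlot(String resourceId, int slotIndex) {}

    enum Result { NEW, REPEATED, CONFLICT }

    // allocationId -> physical slot already known to the pool (allocated or available)
    final Map<String, PhysicalSlot> knownByAllocationId = new HashMap<>();

    Result check(String allocationId, PhysicalSlot offered) {
        PhysicalSlot existing = knownByAllocationId.get(allocationId);
        if (existing == null) {
            return Result.NEW;           // continue with the normal offer handling
        }
        return existing.equals(offered)
                ? Result.REPEATED        // same physical slot: idempotent retry, offerSlot returns true
                : Result.CONFLICT;       // allocation fulfilled by another slot: offerSlot returns false
    }
}
```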

// Create the allocated slot
final AllocatedSlot allocatedSlot = new AllocatedSlot(allocationID, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(),
    taskManagerGateway);

// Find (and remove) the pending slot request with the same allocation id
PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
if (pendingRequest != null) {
    // we were waiting for this!
    // Add the slot to the allocated list, then try to complete the request with it
    allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
    if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
        // complete() returned false: the request's future was already completed (e.g. timed out or failed),
        // so remove the slot from allocatedSlots again
        allocatedSlots.remove(pendingRequest.getSlotRequestId());
        // Use the slot for another pending request, or add it to the available slots (availableSlots)
        tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    } else {
        // The request is now fulfilled by this slot; only log
        log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
    }
} else {

    // we were actually not waiting for this:
    // - could be that this request had been fulfilled
    // - we are receiving the slots from TaskManagers after becoming leaders
    // Use the slot for another pending request, or add it to the available slots (availableSlots)
    tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
}

// we accepted the request in any case. slot will be released after it idled for
// too long and timed out
return true;

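This part first creates an AllocatedSlot for the offered slot.

It then looks in the pending request list for a request waiting on this allocation id (pendingRequests.removeKeyB(allocationID)).

If there is one, the slot is first added to the allocated slot list.

Then it tries to complete the request's allocation future with this slot. If complete(...) returns false, the future had already been completed earlier (for example, the request timed out or failed), so the slot cannot be bound to it and is removed from the allocated list again.

In that case tryFulfillSlotRequestOrMakeAvailable is called, which either uses the slot to fulfill another pending slot request or adds it to the available slot list (availableSlots).

If complete(...) succeeds, the request is now fulfilled by this slot and only a debug message is logged.

If no pending request is waiting for this allocation id, tryFulfillSlotRequestOrMakeAvailable is called as well.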
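The key detail is that CompletableFuture.complete(value) returns false when the future has already been completed, for example by the request timeout. A minimal sketch of that branch, with lists standing in for allocatedSlots and for the slots handed to tryFulfillSlotRequestOrMakeAvailable (MatchByAllocationId is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MatchByAllocationId {
    final List<String> allocatedSlots = new ArrayList<>();
    // Stand-in for slots passed on to tryFulfillSlotRequestOrMakeAvailable
    final List<String> recycled = new ArrayList<>();

    void onMatchingPendingRequest(CompletableFuture<String> allocatedSlotFuture, String slot) {
        allocatedSlots.add(slot);
        if (!allocatedSlotFuture.complete(slot)) {
            // The future was already completed (e.g. timed out): undo and recycle the slot
            allocatedSlots.remove(slot);
            recycled.add(slot);
        }
    }
}
```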

Let's look at what tryFulfillSlotRequestOrMakeAvailable does:

private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
    Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

    final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

    if (pendingRequest != null) {
        log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]", pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

        allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
        pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
    } else {
        log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
        availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
    }
}

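The method first checks that the slot is not in use (Preconditions.checkState).

It then calls pollMatchingPendingRequest to get a PendingRequest and checks whether the result is null.

If it is not null, the slot is added to the allocated slot list and the request's future is completed with it.

If it is null, the slot is added to the available slot list.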
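Reduced to its essence, the method either hands the free slot to a waiting request or parks it together with a timestamp. Flink passes clock.relativeTimeMillis() here, and the offerSlot source comment notes that a slot is released after it idled for too long. FulfillOrPark is hypothetical, and the Supplier stands in for pollMatchingPendingRequest:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class FulfillOrPark {
    final Map<String, String> allocatedSlots = new LinkedHashMap<>(); // slotRequestId -> slot
    final Map<String, Long> availableSlots = new LinkedHashMap<>();   // slot -> time it became free

    record Pending(String slotRequestId, CompletableFuture<String> future) {}

    void tryFulfillOrMakeAvailable(String slot, Supplier<Pending> pollMatching, long nowMillis) {
        Pending pending = pollMatching.get();
        if (pending != null) {
            allocatedSlots.put(pending.slotRequestId(), slot); // bind the slot to the request
            pending.future().complete(slot);
        } else {
            availableSlots.put(slot, nowMillis);               // park it as a free slot
        }
    }
}
```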

Now let's look at how pollMatchingPendingRequest decides, and what exactly it checks:

private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
final ResourceProfile slotResources = slot.getResourceProfile();

// try the requests sent to the resource manager first
for (PendingRequest request : pendingRequests.values()) {
if (slotResources.isMatching(request.getResourceProfile())) {
pendingRequests.removeKeyA(request.getSlotRequestId());
return request;
}
}

// try the requests waiting for a resource manager connection next
for (PendingRequest request : waitingForResourceManager.values()) {
if (slotResources.isMatching(request.getResourceProfile())) {
waitingForResourceManager.remove(request.getSlotRequestId());
return request;
}
}

// no request pending, or no request matches
return null;
}

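Matching order: the requests in pendingRequests are checked first; if a matching one is found, it is removed and returned. If none matches, the requests in waitingForResourceManager are checked next.

Resource matching: slotResources.isMatching(request.getResourceProfile()) decides whether the slot's resources satisfy the request's resource profile. It presumably compares resource amounts such as CPU and memory; the exact rules are defined in ResourceProfile.isMatching.

Removing the handled request: once a match is found, whether in pendingRequests or in waitingForResourceManager, the request is removed from that collection so it cannot be handled twice.

Returning the request: the matched request is returned right away, meaning a suitable slot has been found for it.

No match: if no request matches, null is returned, meaning no current request fits this slot's resources.

In short, this checks whether any request still waiting for resources can use the current slot.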
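The search order can be reproduced with two insertion-ordered maps. Assumption: isMatching is modeled as "the slot offers at least the requested CPU and memory", which is a simplification; Flink's ResourceProfile.isMatching has its own rules. PollOrder and Profile are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class PollOrder {
    record Profile(double cpu, int memoryMb) {
        // Simplified matching rule (assumption): the slot covers the required resources
        boolean isMatching(Profile required) {
            return cpu >= required.cpu() && memoryMb >= required.memoryMb();
        }
    }

    final Map<String, Profile> pendingRequests = new LinkedHashMap<>();
    final Map<String, Profile> waitingForResourceManager = new LinkedHashMap<>();

    /** Returns the id of the first matching request, preferring requests already sent to the RM. */
    String pollMatching(Profile slot) {
        String hit = pollFrom(pendingRequests, slot);
        return hit != null ? hit : pollFrom(waitingForResourceManager, slot);
    }

    private static String pollFrom(Map<String, Profile> requests, Profile slot) {
        Iterator<Map.Entry<String, Profile>> it = requests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Profile> entry = it.next();
            if (slot.isMatching(entry.getValue())) {
                it.remove(); // never serve the same request twice
                return entry.getKey();
            }
        }
        return null; // no request pending, or no request matches
    }
}
```

Removing through the iterator avoids a ConcurrentModificationException while scanning; the Flink method gets the same safety by returning right after removeKeyA/remove.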

That covers how slots received from the TaskExecutor are handled.