Flink Source Code Analysis (9)
2025-01-05 17:35:28 # Flink # Source code analysis # Key flows between ResourceManager, JobMaster and TaskExecutor: requesting slots

Key flows between ResourceManager, JobMaster and TaskExecutor: requesting slots

Slot request flowchart

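The slot request process:

  1. The JobMaster sends slot requests to the ResourceManager on demand, as tasks are scheduled.
  2. The ResourceManager checks the free slots of the TaskManagers registered with it. If there are enough free slots, it directly asks the corresponding TaskManager to reserve a slot; if not, it first requests new TaskManagers from the deployment mode (Standalone/Kubernetes/YARN).
  3. The deployment allocates and starts TaskManagers according to the resource spec requested by the ResourceManager.
  4. A started TaskManager registers with the ResourceManager and, once registration succeeds, reports its slots.
  5. Based on the reported slots, the ResourceManager requests slots from the TaskManager.
  6. Using the job information carried in the slot request, the TaskManager registers with the corresponding JobMaster and offers the slot to it, so the JobMaster can assign tasks to it.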

Summary of the ResourceManager

The previous articles followed the startup process; this one summarizes the component itself.

We first encountered the ResourceManager in the factory class, where it is created:

resourceManager = resourceManagerFactory
.createResourceManager(configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices,
fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), webMonitorEndpoint.getRestBaseUrl(), metricRegistry,
hostname);

The ResourceManager class hierarchy diagram shows that it inherits the same interfaces and base classes as the Dispatcher.

Walking through the slot request

First, look inside ResourceManager for a SlotManager field:

private final SlotManager slotManager;

There it is.

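slotManager is a key field of ResourceManager: it maintains and registers the slots reported by TaskManagers and handles slot requests from JobMasters. Its implementation class is SlotManagerImpl. The SlotManager handles the slot requests of all JobManagers, and its work has two parts:

  1. Requesting slots
  2. Handling the slots registered by TaskManagers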

Let's first look at requestSlot in ResourceManager, the method through which slot resources are requested:

@Override
public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) {

/*************************************************
*
* Note: first get the JobID carried in the SlotRequest
* and check whether this job has been registered.
*/
JobID jobId = slotRequest.getJobId();
JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

if(null != jobManagerRegistration) {

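// Note: check whether the JobMasterId of the requesting JobMaster equals the one registered for the job
// Note: if not, reject the request, so that a JobMaster failover cannot acquire twice the slots and waste resources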
if(Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
log.info("Request slot with profile {} for job {} with allocation id {}.", slotRequest.getResourceProfile(), slotRequest.getJobId(),
slotRequest.getAllocationId());

try {
/*************************************************
*
* Note: let SlotManagerImpl handle the slot request
*/
slotManager.registerSlotRequest(slotRequest);

} catch(ResourceManagerException e) {
return FutureUtils.completedExceptionally(e);
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException(
"The job leader's id " + jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
}
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
}
}

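It first checks whether the JobMaster requesting the slot is the one registered for this job, by comparing JobMasterIds (the JobMaster's leader ID, not its address). A mismatch means the job has switched to another JobMaster, and accepting the request could acquire the resources twice and waste them. The core call is then slotManager.registerSlotRequest(slotRequest):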
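The JobMasterId check can be illustrated with a small, self-contained Java sketch. It is not Flink code: JobID and JobMasterId are modeled as plain strings, and the class JobMasterCheckSketch and its methods are hypothetical. It only shows the three outcomes of ResourceManager.requestSlot: unknown job, mismatched leader ID, and accepted request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Simplified model of the JobMasterId check in ResourceManager.requestSlot.
// JobID/JobMasterId are plain Strings here; the class and its methods are hypothetical.
class JobMasterCheckSketch {
    // jobId -> JobMasterId (leader ID) of the JobMaster registered for that job
    private final Map<String, String> jobManagerRegistrations = new HashMap<>();

    void registerJobManager(String jobId, String jobMasterId) {
        jobManagerRegistrations.put(jobId, jobMasterId);
    }

    CompletableFuture<String> requestSlot(String jobMasterId, String jobId) {
        CompletableFuture<String> result = new CompletableFuture<>();
        String registeredId = jobManagerRegistrations.get(jobId);
        if (registeredId == null) {
            result.completeExceptionally(new IllegalStateException(
                "Could not find registered job manager for job " + jobId + '.'));
        } else if (!Objects.equals(jobMasterId, registeredId)) {
            // A stale or foreign JobMaster: reject instead of allocating slots twice
            result.completeExceptionally(new IllegalStateException(
                "The job leader's id " + registeredId + " does not match the received id " + jobMasterId + '.'));
        } else {
            // The real code would now call slotManager.registerSlotRequest(...)
            result.complete("ack");
        }
        return result;
    }

    public static void main(String[] args) {
        JobMasterCheckSketch rm = new JobMasterCheckSketch();
        rm.registerJobManager("job-1", "leader-A");
        System.out.println(rm.requestSlot("leader-A", "job-1").isCompletedExceptionally()); // false
        System.out.println(rm.requestSlot("leader-B", "job-1").isCompletedExceptionally()); // true
        System.out.println(rm.requestSlot("leader-A", "job-2").isCompletedExceptionally()); // true
    }
}
```

The second call models a request from a JobMaster that is no longer the job's leader: it is rejected even though the job itself is known.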

@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {

// Check that the SlotManager has been started (via the started flag)
checkInit();

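/**
* Duplicate check:
* is this AllocationID already present in the map of pending requests or of fulfilled (active) requests?
* The AllocationID of a slot request is generated by the JobMaster and is unique.
* */

//check whether this slot request has already been submitted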
if(checkDuplicateRequest(slotRequest.getAllocationId())) {
LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
return false;
} else {
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
try {

/**
* After the request has been validated,
* internalRequestSlot matches it against available slots.
* internalRequestSlot: performs the resource matching
*/
internalRequestSlot(pendingSlotRequest);
} catch(ResourceManagerException e) {
// requesting the slot failed --> remove pending slot request
pendingSlotRequests.remove(slotRequest.getAllocationId());
throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
}
return true;
}
}

The first part only checks whether the request is a duplicate. The real work happens in internalRequestSlot(pendingSlotRequest):
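A minimal sketch of the duplicate check and the rollback in registerSlotRequest, assuming requests are keyed by an AllocationID string and that a matching failure surfaces as an unchecked exception (Flink uses the checked ResourceManagerException). As the comment in the quoted code says, a request counts as a duplicate if its AllocationID is already among the pending or the fulfilled requests. SlotRequestRegistrySketch and its fields are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of SlotManagerImpl.registerSlotRequest: requests are keyed by
// AllocationID, duplicates are ignored, and a failed match removes the request again.
class SlotRequestRegistrySketch {
    final Map<String, String> pendingSlotRequests = new HashMap<>();   // allocationId -> profile
    final Map<String, String> fulfilledSlotRequests = new HashMap<>(); // allocationId -> slotId

    boolean checkDuplicateRequest(String allocationId) {
        return pendingSlotRequests.containsKey(allocationId)
            || fulfilledSlotRequests.containsKey(allocationId);
    }

    /** Returns false for duplicates; rethrows (after rolling back) if matching fails. */
    boolean registerSlotRequest(String allocationId, String profile, boolean matchingFails) {
        if (checkDuplicateRequest(allocationId)) {
            return false; // duplicate: ignored
        }
        pendingSlotRequests.put(allocationId, profile);
        try {
            internalRequestSlot(matchingFails);
        } catch (IllegalStateException e) {
            pendingSlotRequests.remove(allocationId); // roll back the registration
            throw new IllegalStateException("Could not fulfill slot request " + allocationId + '.', e);
        }
        return true;
    }

    // Stand-in for the matching logic; matchingFails simulates a ResourceManagerException
    private void internalRequestSlot(boolean matchingFails) {
        if (matchingFails) {
            throw new IllegalStateException("no resources available");
        }
    }
}
```

Registering the request before matching and removing it on failure keeps the pending map consistent: a request that could not even be matched never stays behind as a phantom pending request.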

private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
/**
* First match the requested resource profile against the SlotManager's free slots. If a free slot
* matches, allocate it; otherwise match the request against pending TaskManager slots or request a new TaskManager.
* See findMatchingSlot for the matching strategy.
*
* After a successful match: allocateSlot
* performs the allocation for the matched slot, i.e. binds the TaskManager's slot to the JobMaster
*/
OptionalConsumer.of(findMatchingSlot(resourceProfile)).ifPresent(
//matched: allocate the slot, binding the TaskManager's slot to the JobMaster
taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest)
).ifNotPresent(
//no matching free slot
() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest)
);
}

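This is a branch on whether a matching free slot exists. If one does, the slot is requested from its TaskManager; if not, the request is fulfilled with a pending TaskManager slot. So there are two cases: a match is found, or it is not.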
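The same two-way branch can be written with the JDK's Optional.ifPresentOrElse (Java 9+). The sketch below is hypothetical (MatchBranchSketch, its log strings and the list of free slots are made up) and only shows the control flow. One plausible reason Flink uses its own OptionalConsumer instead is that fulfillPendingSlotRequestWithPendingTaskManagerSlot throws the checked ResourceManagerException, which a plain Runnable cannot throw; treat that as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch of the branch in internalRequestSlot using Optional.ifPresentOrElse
// instead of Flink's OptionalConsumer. All names here are hypothetical.
class MatchBranchSketch {
    final List<String> log = new ArrayList<>();

    // Stand-in for findMatchingSlot: take (and remove) the first free slot, if any
    Optional<String> findMatchingSlot(List<String> freeSlots) {
        return freeSlots.isEmpty() ? Optional.empty() : Optional.of(freeSlots.remove(0));
    }

    void internalRequestSlot(List<String> freeSlots, String allocationId) {
        findMatchingSlot(freeSlots).ifPresentOrElse(
            // matched: the real code calls allocateSlot(taskManagerSlot, pendingSlotRequest)
            slot -> log.add("allocate " + slot + " for " + allocationId),
            // not matched: the real code calls fulfillPendingSlotRequestWithPendingTaskManagerSlot(...)
            () -> log.add("pending TaskManager slot for " + allocationId));
    }
}
```

With one free slot and two requests, the first request takes the free slot and the second falls through to the pending-TaskManager-slot branch.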

Let's look at the check first, findMatchingSlot:

/**
* Slot matching uses the slot matching strategy (slotMatchingStrategy) to pick a suitable TaskManagerSlot from the free TaskManagerSlots registered with the SlotManager.
* A TaskManagerSlot records the slot's TaskManager address, its index in the TaskManager's slot table,
* the slot's occupancy on the TaskManager and its state in the ResourceManager. (TaskManagerSlot state transitions are covered later.)
* slotMatchingStrategy has two implementations:
* AnyMatchingSlotMatchingStrategy and
* LeastUtilizationSlotMatchingStrategy.
* AnyMatchingSlotMatchingStrategy
* picks any matching free TaskManagerSlot; this is the default strategy.
* LeastUtilizationSlotMatchingStrategy picks a matching free slot on the TaskManager with the lowest slot
* utilization; it only takes effect when cluster.evenly-spread-out-slots is set to true.
*/
private Optional<TaskManagerSlot> findMatchingSlot(ResourceProfile requestResourceProfile) {
final Optional<TaskManagerSlot> optionalMatchingSlot = slotMatchingStrategy
.findMatchingSlot(requestResourceProfile, freeSlots.values(), this::getNumberRegisteredSlotsOf);
optionalMatchingSlot.ifPresent(taskManagerSlot -> {
Preconditions.checkState(
//the matched slot must be in state FREE
taskManagerSlot.getState() == TaskManagerSlot.State.FREE,
"TaskManagerSlot %s is not in state FREE but %s.",
taskManagerSlot.getSlotId(),
taskManagerSlot.getState());
//remove the matched (FREE) slot from the free-slot map
freeSlots.remove(taskManagerSlot.getSlotId());
});

return optionalMatchingSlot;
}

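findMatchingSlot returns the matched slot if there is one, and Optional.empty() otherwise.

It picks a matching TaskManagerSlot from the free slots.

A matched slot must be in state FREE, and it is removed from the free-slot map so it cannot be handed out twice.

If nothing matches, the result is empty.

slotMatchingStrategy has two implementations:

AnyMatchingSlotMatchingStrategy and LeastUtilizationSlotMatchingStrategy.

AnyMatchingSlotMatchingStrategy picks any matching free TaskManagerSlot; this is the default strategy.

LeastUtilizationSlotMatchingStrategy picks a matching free slot on the TaskManager with the lowest slot utilization (the smallest share of its registered slots already in use); it only takes effect when cluster.evenly-spread-out-slots is set to true.

These two strategies are the core of slot matching.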
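To make the difference between the two strategies concrete, here is a simplified Java sketch. FreeSlot is a hypothetical stand-in for TaskManagerSlot, a resource profile is reduced to a single int, and utilization is computed per TaskManager as (registered slots - free slots) / registered slots. That formula is my reading of how LeastUtilizationSlotMatchingStrategy ranks TaskManagers; verify it against the Flink version you use.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Simplified model of the two slot matching strategies (hypothetical types).
class SlotMatchingSketch {
    static final class FreeSlot {
        final String instanceId; // TaskManager that owns the slot
        final int index;         // slot index on that TaskManager
        final int resource;      // simplified resource profile

        FreeSlot(String instanceId, int index, int resource) {
            this.instanceId = instanceId;
            this.index = index;
            this.resource = resource;
        }

        boolean matches(int requested) {
            return resource >= requested;
        }
    }

    // "Any matching": the first free slot that satisfies the request
    static Optional<FreeSlot> anyMatching(int requested, Collection<FreeSlot> freeSlots) {
        return freeSlots.stream().filter(s -> s.matches(requested)).findFirst();
    }

    // "Least utilization": a matching slot on the TaskManager whose
    // utilization (registered - free) / registered is lowest
    static Optional<FreeSlot> leastUtilization(int requested, Collection<FreeSlot> freeSlots,
                                               Function<String, Integer> registeredSlotsOf) {
        Map<String, Long> freePerTaskManager = freeSlots.stream()
            .collect(Collectors.groupingBy(s -> s.instanceId, Collectors.counting()));
        return freeSlots.stream()
            .filter(s -> s.matches(requested))
            .min(Comparator.comparingDouble((FreeSlot s) -> {
                int registered = registeredSlotsOf.apply(s.instanceId);
                long free = freePerTaskManager.getOrDefault(s.instanceId, 0L);
                return (double) (registered - free) / registered;
            }));
    }
}
```

With TaskManager tm1 (4 registered slots, 1 free) listed before tm2 (2 registered, 2 free), anyMatching returns tm1's slot simply because it comes first, while leastUtilization prefers tm2 (utilization 0.0 versus 0.75). That preference is what spreads slots evenly across TaskManagers.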

Back in internalRequestSlot: once a slot has been matched, the allocation step runs, binding the TaskManager to the JobMaster:

taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest)

For the matched slot, allocateSlot performs the allocation, binding the TaskManager's slot to this JobMaster.
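The TaskManagerSlot states used in this walkthrough (FREE, PENDING, ALLOCATED) form a small state machine. The following hypothetical Java class models only those transitions. The method name assignPendingSlotRequest mirrors the quoted code, while completeAllocation and clearPendingSlotRequest are names assumed for illustration.

```java
// Hypothetical, simplified model of a TaskManagerSlot's states in the SlotManager:
// FREE -> PENDING when a request is assigned,
// PENDING -> ALLOCATED when the TaskExecutor acknowledges,
// PENDING -> FREE when the binding is removed after a failure.
class SlotStateSketch {
    enum State { FREE, PENDING, ALLOCATED }

    private State state = State.FREE;
    private String assignedRequest; // allocation ID of the pending request bound to this slot
    private String allocationId;    // allocation that actually holds the slot

    State getState() {
        return state;
    }

    String getAllocationId() {
        return allocationId;
    }

    void assignPendingSlotRequest(String requestAllocationId) {
        if (state != State.FREE) {
            throw new IllegalStateException("slot is " + state);
        }
        assignedRequest = requestAllocationId;
        state = State.PENDING;
    }

    void completeAllocation(String allocationId) {
        if (state != State.PENDING) {
            throw new IllegalStateException("slot is " + state);
        }
        this.allocationId = allocationId;
        assignedRequest = null;
        state = State.ALLOCATED;
    }

    void clearPendingSlotRequest() {
        if (state != State.PENDING) {
            throw new IllegalStateException("slot is " + state);
        }
        assignedRequest = null;
        state = State.FREE;
    }
}
```

The guard in each method plays the same role as Preconditions.checkState in the quoted allocateSlot: a slot that is not FREE can never receive a second pending request.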

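/**
* Allocating a slot works as follows:
* 1. Mark the free TaskManagerSlot as PENDING (it was FREE)
* 2. Bind it to the pending slot request (pendingSlotRequest)
* 3. Use the TaskExecutor gateway stored in the TaskManagerSlot to request the slot asynchronously,
* then handle the result.
* The result is handled in one of three ways:
* 1. acknowledge != null
* The TaskManagerSlot is updated from PENDING to ALLOCATED.
* 2. throwable instanceof SlotOccupiedException
* The TaskExecutor's slot is already occupied by another allocation, so this pendingSlotRequest cannot be bound to it.
* The TaskManagerSlot is marked ALLOCATED with the occupying allocationId,
* which keeps the SlotManager's view consistent with the TaskExecutor's slot.
* 3. any other throwable
* removeSlotRequestFromSlot(slotId, allocationId) removes the binding between the TaskManagerSlot and the pendingSlotRequest.
* In cases 2 and 3, handleFailedSlotRequest(slotId, allocationId, throwable) is then called, unless the throwable is a CancellationException, which is only logged.
*/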
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
//the slot to allocate must be free
Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE);
//get the connection to the TaskExecutor
TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
//get the gateway used for RPC calls to the TaskExecutor
TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();

final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
final AllocationID allocationId = pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
final InstanceID instanceID = taskManagerSlot.getInstanceId();


//mark the TaskManagerSlot as PENDING (it was FREE)
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);

pendingSlotRequest.setRequestFuture(completableFuture);

returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);

TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);

if(taskManagerRegistration == null) {
throw new IllegalStateException("Could not find a registered task manager for instance id " + instanceID + '.');
}

/**
* Mark the TaskManager as used, so the SlotManager's check for idle TaskManagers will not release it
* */
taskManagerRegistration.markUsed();

/**
* Ask the TaskManager to allocate the slot, which the JobMaster will use to schedule and deploy the job's tasks
*/
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout
);


requestFuture.whenComplete((Acknowledge acknowledge, Throwable throwable) -> {
if(acknowledge != null) {
completableFuture.complete(acknowledge);
} else {
completableFuture.completeExceptionally(throwable);
}
});

/**
* Handle the TaskManager's response (an Acknowledge on success)
* */
completableFuture.whenCompleteAsync((Acknowledge acknowledge, Throwable throwable) -> {
try {
if(acknowledge != null) {
updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
} else {
if(throwable instanceof SlotOccupiedException) {
SlotOccupiedException exception = (SlotOccupiedException) throwable;
updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
} else {
removeSlotRequestFromSlot(slotId, allocationId);
}

if(!(throwable instanceof CancellationException)) {
handleFailedSlotRequest(slotId, allocationId, throwable);
} else {
LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
}
}
} catch(Exception e) {
LOG.error("Error while completing the slot allocation.", e);
}
}, mainThreadExecutor);
}


------------------------------------------------------------------------
@Override
public CompletableFuture<Acknowledge> requestSlot(final SlotID slotId, // slot ID on this TaskExecutor
final JobID jobId, // job that will use the slot
final AllocationID allocationId, // allocation ID (generated on the JobMaster side)
final ResourceProfile resourceProfile, // requested resource profile
final String targetAddress, // address of the job's JobMaster
final ResourceManagerId resourceManagerId, // leader ID of the requesting ResourceManager
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the instance/registration Id

log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, resourceManagerId);

if(!isConnectedToResourceManager(resourceManagerId)) {
final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
log.debug(message);
return FutureUtils.completedExceptionally(new TaskManagerException(message));
}

try {

/*************************************************
*
* Note: allocate the slot
* In short: the ResourceManager tells the TaskExecutor to give the slot identified by slotId to the job identified by jobId.
* The TaskExecutor first records locally that this slot is now in use.
*/
allocateSlot(slotId, jobId, allocationId, resourceProfile);

} catch(SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}

/*************************************************
*
* Note: "start" the job. This is not a real job but a JobTable entry that represents whether a connection to the job's JobMaster exists.
*/
final JobTable.Job job;
try {
job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
} catch(Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch(SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}

// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

// sanity check
if(!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}

return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
}

if(job.isConnected()) {

/*************************************************
*
* Note: offer the slots to the JobManager (JobMaster)
*/
offerSlotsToJobManager(jobId);
}

return CompletableFuture.completedFuture(Acknowledge.get());
}



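SlotManagerImpl.allocateSlot first obtains the TaskExecutor connection and its gateway, and then requests the slot through the gateway.

On the TaskExecutor side, the request is handled by requestSlot. Let's look at its allocation step: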

try {

/*************************************************
*
* Note: allocate the slot
* In short: the ResourceManager tells the TaskExecutor to give the slot identified by slotId to the job identified by jobId.
* The TaskExecutor first records locally that this slot is now in use.
*/
allocateSlot(slotId, jobId, allocationId, resourceProfile);

} catch(SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}




----------------------------------------------------------


private void allocateSlot(SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) throws SlotAllocationException {

// Note: if the slot with this slot number is free, allocate it
if(taskSlotTable.isSlotFree(slotId.getSlotNumber())) {

// Note: allocate
if(taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
throw new SlotAllocationException("Could not allocate slot.");
}
} else if(!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
final String message = "The slot " + slotId + " has already been allocated for a different job.";

log.info(message);

final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
}


-----------------------------------------------------

@Override
public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
// Check that the table is running; throws an exception otherwise
checkRunning();

// The slot index must be valid, i.e. smaller than the total number of slots
Preconditions.checkArgument(index < numberSlots);

// If this allocation ID has already been allocated, return false
TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
return false; // already allocated
}

// Check whether a task slot already exists at this index
if (taskSlots.containsKey(index)) {
// The existing task slot
TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
index, duplicatedTaskSlot.getResourceProfile(), duplicatedTaskSlot.getJobId(), duplicatedTaskSlot.getAllocationId());
// Success only if the existing slot has the same jobId and allocationId
return duplicatedTaskSlot.getJobId().equals(jobId) && duplicatedTaskSlot.getAllocationId().equals(allocationId);
} else if (allocatedSlots.containsKey(allocationId)) {
// The allocation ID is already allocated: return true
// (not reachable in practice, since allocatedSlots was already checked at the top of the method)
return true;
}

// Index >= 0: use the default slot resource profile; otherwise use the profile passed in
resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;

// Try to reserve the resources from the budget
if (!budgetManager.reserve(resourceProfile)) {
// Not enough resources left, cannot allocate
LOG.info(
"Cannot allocate the requested resources. Trying to allocate {}, while the currently remaining available resources are {}, total is {}.",
resourceProfile, budgetManager.getAvailableBudget(), budgetManager.getTotalBudget());
return false; // reservation failed
}

/*************************************************
* Create a new TaskSlot object describing this slot allocation
* index = slot number, resourceProfile = resource profile
*/
taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId, memoryVerificationExecutor);

// Index >= 0: store the task slot in taskSlots, marking this index as allocated
if (index >= 0) {
taskSlots.put(index, taskSlot); // record the allocation
}

// Map the allocation ID to the task slot
allocatedSlots.put(allocationId, taskSlot); // associate allocationId with taskSlot

// Register a timeout for this allocation, so an allocation that is never used can expire
timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());

/*************************************************
* Record the allocation under its job (the allocations this job holds on this TaskExecutor)
*/
Set<AllocationID> slots = slotsPerJob.get(jobId);
if (slots == null) {
// No set for this job yet: create one
slots = new HashSet<>(4);
slotsPerJob.put(jobId, slots); // associate the job ID with its set of allocations
}

// Add the current allocationId to the job's set, tracking which allocations belong to the job
slots.add(allocationId);

// true means the slot was allocated successfully
return true;
}

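This code performs the allocation. In short, the ResourceManager tells the TaskExecutor to give the slot identified by slotId to the job identified by jobId, and the TaskExecutor first records in its TaskSlotTable that the slot is now in use.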
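A simplified sketch of the bookkeeping that TaskSlotTableImpl.allocateSlot performs, under stated assumptions: the ResourceProfile and the budget manager are reduced to one int, IDs are strings, negative (dynamic) slot indexes and the slot timeout are left out, and TaskSlotTableSketch is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of TaskSlotTableImpl.allocateSlot: indexed slots,
// an int resource budget, and bookkeeping by index, by allocation ID and by job.
class TaskSlotTableSketch {
    private final int numberSlots;
    private int availableBudget;
    private final Map<Integer, String> taskSlots = new HashMap<>();       // index -> allocationId
    private final Map<String, String> allocatedSlots = new HashMap<>();   // allocationId -> jobId
    private final Map<String, Set<String>> slotsPerJob = new HashMap<>(); // jobId -> allocationIds

    TaskSlotTableSketch(int numberSlots, int totalBudget) {
        this.numberSlots = numberSlots;
        this.availableBudget = totalBudget;
    }

    boolean allocateSlot(int index, String jobId, String allocationId, int resource) {
        if (index >= numberSlots) {
            throw new IllegalArgumentException("slot index out of range: " + index);
        }
        if (allocatedSlots.containsKey(allocationId)) {
            return false; // this allocation ID is already in use
        }
        if (taskSlots.containsKey(index)) {
            return false; // another allocation already holds this index
        }
        if (resource > availableBudget) {
            return false; // plays the role of budgetManager.reserve(...) failing
        }
        availableBudget -= resource;
        taskSlots.put(index, allocationId);
        allocatedSlots.put(allocationId, jobId);
        slotsPerJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(allocationId);
        return true;
    }

    boolean isSlotFree(int index) {
        return !taskSlots.containsKey(index);
    }

    Set<String> allocationsOf(String jobId) {
        return slotsPerJob.getOrDefault(jobId, Collections.emptySet());
    }

    int getAvailableBudget() {
        return availableBudget;
    }
}
```

The three maps answer the three questions the TaskExecutor later needs: is slot N free, which slot belongs to allocation X, and which allocations does job J hold (used, for example, when offering slots to the JobMaster).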

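Further down in requestSlot, jobTable.getOrCreateJob obtains (or creates) the job entry, which represents whether a connection to the job's JobMaster exists. Then comes: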

if(job.isConnected()) {

/*************************************************
*
* Note: offer the slots to the JobManager (JobMaster)
*/
offerSlotsToJobManager(jobId);
}

---------------------------------------------------------
private void offerSlotsToJobManager(final JobID jobId) {

/*************************************************
*
* Note: offer the slots to the JobMaster
*/
jobTable.getConnection(jobId).ifPresent(this::internalOfferSlotsToJobManager);
}


--------------------------------------------------------
private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {

// JobID of this connection
final JobID jobId = jobManagerConnection.getJobId();

if(taskSlotTable.hasAllocatedSlots(jobId)) {
log.info("Offer reserved slots to the leader of job {}.", jobId);

/*************************************************
*
* Note: get the JobMaster gateway
*/
final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();

final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);

// JobMasterId of this connection
final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

final Collection<SlotOffer> reservedSlots = new HashSet<>(2);

while(reservedSlotsIterator.hasNext()) {
SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
reservedSlots.add(offer);
}

/*************************************************
*
* Note: offer this TaskExecutor's reserved slots to the JobManager (JobMaster)
*/
CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway
.offerSlots(getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());

acceptedSlotsFuture.whenCompleteAsync(handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots), getMainThreadExecutor());
} else {
log.debug("There are no unassigned slots for the job {}.", jobId);
}
}



-------------------------------------------------------------------



/**
* Purpose: accept the slots offered by a TaskExecutor
*
*/
@Override
public CompletableFuture<Collection<SlotOffer>> offerSlots(final ResourceID taskManagerId, final Collection<SlotOffer> slots,
final Time timeout) {

//look up the offering TaskExecutor's taskManagerId in registeredTaskManagers
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);

//the offering TaskExecutor must be registered
if(taskManager == null) {
//not registered: fail with "Unknown TaskManager"
return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
}

final TaskManagerLocation taskManagerLocation = taskManager.f0;
final TaskExecutorGateway taskExecutorGateway = taskManager.f1;

final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());

//delegate to slotPool.offerSlots to handle the offered slots
return CompletableFuture.completedFuture(slotPool.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots));
}

-----------------------------------------------------------------



/**
* Handles all slots offered by a TaskExecutor:
* 1. Iterate over the offered slots (offers) and call offerSlot for each, which adds the slot to the SlotPool
* 2. If a slot is added to the SlotPool successfully, add its SlotOffer to the result list
* 3. Finally return the result list to the offering TaskExecutor
*
* SlotOffer:
* allocationId: the allocation ID
* slotIndex: the slot's index in the TaskExecutor
* resourceProfile: the slot's resource profile
* How a slot is uniquely identified: taskManagerLocation + SlotOffer
* */
@Override
public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
for(SlotOffer offer : offers) {
if(offerSlot(taskManagerLocation, taskManagerGateway, offer)) {
result.add(offer);
}
}
return result;
}

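The final offerSlots method puts the offered slots into the JobMaster's SlotPool.

This is the logic that hands slots over to the JobManager.

Tracing back from the SlotPool to the caller on the TaskExecutor side, we arrive at: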

CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway
.offerSlots(getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());

This call offers the reserved slots to the JobMaster; the returned future holds the offers that the JobMaster accepted.
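The offer/accept round trip can be sketched as follows. This is a hypothetical simplification with two assumptions. First, the JobMaster side accepts an offer only if it is waiting for that AllocationID; the real SlotPool.offerSlot logic is richer. Second, on the TaskExecutor side, accepted slots are marked active and rejected ones are freed, which is roughly what the TaskExecutor's handleAcceptedSlotOffers callback does.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the TaskExecutor -> JobMaster slot offer round trip.
// SlotOffer carries the three fields named in the SlotPool Javadoc quoted above.
class SlotOfferSketch {
    static final class SlotOffer {
        final String allocationId;
        final int slotIndex;
        final int resource; // simplified resource profile

        SlotOffer(String allocationId, int slotIndex, int resource) {
            this.allocationId = allocationId;
            this.slotIndex = slotIndex;
            this.resource = resource;
        }
    }

    // JobMaster side: accept an offer only if its allocation ID is expected (assumption)
    static List<SlotOffer> offerSlots(Set<String> expectedAllocationIds, Collection<SlotOffer> offers) {
        List<SlotOffer> result = new ArrayList<>(offers.size());
        for (SlotOffer offer : offers) {
            if (expectedAllocationIds.remove(offer.allocationId)) {
                result.add(offer);
            }
        }
        return result;
    }

    // TaskExecutor side: accepted slots become active, rejected ones are freed.
    // Returns the allocation IDs that were freed.
    static Set<String> handleAcceptedSlotOffers(Collection<SlotOffer> offered,
                                                Collection<SlotOffer> accepted,
                                                Set<String> activeAllocationIds) {
        Set<String> acceptedIds = new HashSet<>();
        for (SlotOffer offer : accepted) {
            acceptedIds.add(offer.allocationId);
        }
        Set<String> freed = new HashSet<>();
        for (SlotOffer offer : offered) {
            if (acceptedIds.contains(offer.allocationId)) {
                activeAllocationIds.add(offer.allocationId);
            } else {
                freed.add(offer.allocationId);
            }
        }
        return freed;
    }
}
```

Returning only the accepted subset lets the TaskExecutor tell, per slot, whether it now belongs to the JobMaster or must be given back.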

Now back to the end of SlotManagerImpl.allocateSlot:

/**
* Ask the TaskManager to allocate the slot, which the JobMaster will use to schedule and deploy the job's tasks
*/
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout
);


requestFuture.whenComplete((Acknowledge acknowledge, Throwable throwable) -> {
if(acknowledge != null) {
completableFuture.complete(acknowledge);
} else {
completableFuture.completeExceptionally(throwable);
}
});

/**
* Handle the TaskManager's response (an Acknowledge on success)
* */
completableFuture.whenCompleteAsync((Acknowledge acknowledge, Throwable throwable) -> {
try {
if(acknowledge != null) {
updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
} else {
if(throwable instanceof SlotOccupiedException) {
SlotOccupiedException exception = (SlotOccupiedException) throwable;
updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
} else {
removeSlotRequestFromSlot(slotId, allocationId);
}

if(!(throwable instanceof CancellationException)) {
handleFailedSlotRequest(slotId, allocationId, throwable);
} else {
LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
}
}
} catch(Exception e) {
LOG.error("Error while completing the slot allocation.", e);
}
}, mainThreadExecutor);
}

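After the slot has been requested, the callback uses the returned acknowledge (and throwable) to decide whether the allocation succeeded.

It then handles the success or failure of the slot allocation.

The possible outcomes in detail:

  1. acknowledge != null

The allocation succeeded: updateSlot moves the TaskManagerSlot from PENDING to ALLOCATED.

  2. throwable instanceof SlotOccupiedException

The TaskExecutor's slot is already occupied by another allocation, so it cannot be bound to this pendingSlotRequest.

updateSlot marks the TaskManagerSlot as ALLOCATED (it was PENDING), using the occupying allocationId,

which keeps the SlotManager's view consistent with the TaskExecutor's slot.

  3. any other throwable

removeSlotRequestFromSlot removes the binding between the TaskManagerSlot and the pendingSlotRequest.

In cases 2 and 3, handleFailedSlotRequest(slotId, allocationId, throwable) is then called, unless the throwable is a CancellationException, which is only logged at debug level.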

So where do these outcomes come from? Start from this call:

CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout
);

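Stepping into gateway.requestSlot leads back to TaskExecutor.requestSlot.

The exceptionally completed futures returned there produce the failures described above: a TaskManagerException if the TaskExecutor is not connected to this ResourceManager, a SlotAllocationException (or its subclass SlotOccupiedException) thrown by allocateSlot, and a SlotAllocationException if the job cannot be created. If nothing goes wrong, it returns CompletableFuture.completedFuture(Acknowledge.get()), which is the non-null acknowledge. A CancellationException does not come from the TaskExecutor; it appears when the request future is cancelled on the ResourceManager side.

In short, the slot is updated on success, and each failure is handled according to its exception type:

  • SlotOccupiedException: the slot is recorded as ALLOCATED under the occupying allocationId, and this pendingSlotRequest is not bound to it.
  • CancellationException: the binding is removed and only a debug message is logged.
  • Any other exception (for example a network error or a timeout): the binding between the TaskManagerSlot and the pendingSlotRequest is removed.

Every failure except cancellation is then passed to handleFailedSlotRequest.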
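The decision logic of that callback can be isolated in a small Java sketch that returns the names of the actions the SlotManager takes for a given (acknowledge, throwable) pair. It is hypothetical: SlotOccupied stands in for SlotOccupiedException, and the returned strings only label the real method calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;

// Hypothetical sketch of how the callback in SlotManagerImpl.allocateSlot
// classifies an outcome. The returned strings only name the real actions.
class AllocationOutcomeSketch {
    // Stand-in for Flink's SlotOccupiedException
    static class SlotOccupied extends Exception {
        final String occupyingAllocationId;

        SlotOccupied(String occupyingAllocationId) {
            super("slot already occupied by " + occupyingAllocationId);
            this.occupyingAllocationId = occupyingAllocationId;
        }
    }

    static List<String> classify(Object acknowledge, Throwable throwable) {
        List<String> actions = new ArrayList<>();
        if (acknowledge != null) {
            actions.add("updateSlot(own allocation) -> ALLOCATED");
            return actions;
        }
        if (throwable instanceof SlotOccupied) {
            String other = ((SlotOccupied) throwable).occupyingAllocationId;
            actions.add("updateSlot(" + other + ") -> ALLOCATED");
        } else {
            actions.add("removeSlotRequestFromSlot");
        }
        if (!(throwable instanceof CancellationException)) {
            actions.add("handleFailedSlotRequest");
        } else {
            actions.add("debug log: cancelled");
        }
        return actions;
    }
}
```

Registering classify with future.whenComplete((ack, t) -> ...) and then calling future.cancel(false) yields [removeSlotRequestFromSlot, debug log: cancelled], because a cancelled CompletableFuture completes with a CancellationException.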

Everything above is the ifPresent branch, where the request matched a free slot in the SlotManager. Now let's look at what happens when no match is found.

Back in SlotManagerImpl:

private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

OptionalConsumer.of(findMatchingSlot(resourceProfile)).ifPresent(

/*************************************************
*
* Note: request the slot from the TaskManager
*/
taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest)
).ifNotPresent(

/*************************************************
*
* Note: fulfill the pending slot request with a pending TaskManager slot
*/
() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest)
);
}




-------------------------------------------------------------------
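/**
* 1. Matches the request against pending TaskManager slots (already requested but not yet registered) or requests a new TaskManager.
* 2. First look for a pending TaskManager slot that has no slot request assigned and whose resource
* profile matches this request. If one exists, bind it directly to this slot request (pendingTaskManagerSlot);
* 3. Otherwise:
* request TaskExecutor resources through the deployment mode, which creates one or more pending TaskManager slots.
* They are recorded in the list of pending TaskManager slots, and one of them is bound to this
* slot request.
* */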
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);

if(!pendingTaskManagerSlotOptional.isPresent()) {
pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
}
OptionalConsumer.of(pendingTaskManagerSlotOptional)
.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot)).ifNotPresent(() -> {
// request can not be fulfilled by any free slot or pending slot that can be allocated,
// check whether it can be fulfilled by allocated slots
if(failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) {
throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile());
}
});
}


---------------------------------------------------------------------

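It first looks for a suitable pending TaskManager slot and, if none exists, requests new resources through allocateResource: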
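Before reading allocateResource, the find-or-allocate pattern of fulfillPendingSlotRequestWithPendingTaskManagerSlot can be sketched in Java. Assumptions: resources are one int, "requesting a worker" only increments a counter and creates numSlotsPerWorker pending slots, the max-slot check is omitted, and PendingSlotSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of fulfillPendingSlotRequestWithPendingTaskManagerSlot:
// reuse an unassigned pending TaskManager slot whose resource fits, otherwise
// "request a worker", which creates numSlotsPerWorker new pending slots.
class PendingSlotSketch {
    static final class PendingTaskManagerSlot {
        final int resource;
        String assignedRequest; // allocation ID of the bound request, null if unassigned

        PendingTaskManagerSlot(int resource) {
            this.resource = resource;
        }
    }

    final List<PendingTaskManagerSlot> pendingSlots = new ArrayList<>();
    final int numSlotsPerWorker;
    final int slotResource;   // resource of each slot a new worker provides
    int workersRequested = 0; // stands in for resourceActions.allocateResource(...)

    PendingSlotSketch(int numSlotsPerWorker, int slotResource) {
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.slotResource = slotResource;
    }

    Optional<PendingTaskManagerSlot> findFreeMatchingPendingSlot(int requested) {
        return pendingSlots.stream()
            .filter(s -> s.assignedRequest == null && s.resource >= requested)
            .findFirst();
    }

    Optional<PendingTaskManagerSlot> allocateResource(int requested) {
        if (requested > slotResource) {
            return Optional.empty(); // a new worker could not satisfy this profile
        }
        workersRequested++;
        PendingTaskManagerSlot last = null;
        for (int i = 0; i < numSlotsPerWorker; i++) {
            last = new PendingTaskManagerSlot(slotResource);
            pendingSlots.add(last);
        }
        return Optional.ofNullable(last); // like the quoted code, hand back the last slot created
    }

    boolean fulfill(String allocationId, int requested) {
        Optional<PendingTaskManagerSlot> slot = findFreeMatchingPendingSlot(requested);
        if (!slot.isPresent()) {
            slot = allocateResource(requested);
        }
        slot.ifPresent(s -> s.assignedRequest = allocationId);
        return slot.isPresent();
    }
}
```

With two slots per worker, the first request triggers one worker request and the second request reuses the other pending slot of that same worker, so only every second request actually asks the deployment for a new TaskManager.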

private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
final int numRegisteredSlots = getNumberRegisteredSlots();
final int numPendingSlots = getNumberPendingTaskManagerSlots();
if(isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.", numSlotsPerWorker,
numPendingSlots + numRegisteredSlots, maxSlotNum);
return Optional.empty();
}

if(!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) {
// requested resource profile is unfulfillable
return Optional.empty();
}

/*************************************************
*
* Note: request resources (a new worker) from the deployment
*/
if(!resourceActions.allocateResource(defaultWorkerResourceSpec)) {
// resource cannot be allocated
return Optional.empty();
}

PendingTaskManagerSlot pendingTaskManagerSlot = null;
for(int i = 0; i < numSlotsPerWorker; ++i) {
pendingTaskManagerSlot = new PendingTaskManagerSlot(defaultSlotResourceProfile);
pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
}

return Optional.of(Preconditions.checkNotNull(pendingTaskManagerSlot, "At least one pending slot should be created."));
}

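allocateResource first checks whether resources can be allocated at all: adding numSlotsPerWorker slots must not exceed maxSlotNum, and the default slot profile must be able to satisfy the requested profile.

It then asks the deployment for a new worker (resourceActions.allocateResource(defaultWorkerResourceSpec)) and, on success, creates numSlotsPerWorker PendingTaskManagerSlots, one for each slot the new worker is expected to provide.

The created PendingTaskManagerSlots are put into the pendingSlots map, meaning these slots are waiting for their TaskManager to start and register.

It returns an Optional containing the last PendingTaskManagerSlot created, or Optional.empty() if the allocation fails.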
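Here is a worked example of the first check in allocateResource. The log message in the code compares the number of registered and pending slots with maxSlotNum, so the sketch assumes the condition registered + pending + numSlotsPerWorker > maxSlotNum. The body of isMaxSlotNumExceededAfterAdding is not quoted in this article, and MaxSlotGuardSketch is a hypothetical name.

```java
// Worked example of the max-slot guard at the top of allocateResource
// (the formula is an assumption, see the text above).
class MaxSlotGuardSketch {
    static boolean isMaxSlotNumExceededAfterAdding(int registered, int pending, int numNewSlots, int maxSlotNum) {
        // long arithmetic rules out int overflow in the sum
        return (long) registered + pending + numNewSlots > maxSlotNum;
    }

    public static void main(String[] args) {
        // 4 slots per worker, at most 10 slots in total
        System.out.println(isMaxSlotNumExceededAfterAdding(4, 0, 4, 10)); // false: 8 <= 10
        System.out.println(isMaxSlotNumExceededAfterAdding(4, 4, 4, 10)); // true: 12 > 10
    }
}
```

Because a whole worker is requested at once, the check counts all numSlotsPerWorker slots. With 4 slots per worker and maxSlotNum = 10, only two workers (8 slots) can ever be requested, even though 2 more slots would fit under the limit.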

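That covers how the ResourceManager requests slots through requestSlot. From here on, the topic is slot registration and allocation: slots are requested first, and then the TaskManager that provides them registers them.

The entry point is sendSlotReport in ResourceManager, where slot registration and allocation begin: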

/**
* Slot registration and allocation:
* a started TaskManager registers with the ResourceManager and reports its slots to it;
* the reported slots are managed by the SlotManager
*
* */
@Override
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId,
SlotReport slotReport, Time timeout) {
final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);

//check that the report comes from the TaskManager's current registration (same InstanceID)
if(workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
//let the SlotManager register and allocate the reported slots
if(slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
onTaskManagerRegistration(workerTypeWorkerRegistration);
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
}
}

Next, the slot registration and allocation logic in SlotManagerImpl.registerTaskManager:

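/**
* 1. Check that the SlotManager has been started
* 2. Check whether this is the TaskManager's first registration
* Not the first: just report the slot status, which updates the TaskManagerSlot states
* First: record the TaskManager among the registered TaskManagers, then iterate over the reported slots
* and register each of them
*/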
@Override
public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
//first check that the SlotManager has been started
checkInit();

LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());

// check whether this is the TaskManager's first registration
if(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
//not the first registration (normally it is the first): only report the slot status
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
} else {
if(isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
resourceActions
.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
return false;
}

// first registration: collect the IDs of the reported slots
ArrayList<SlotID> reportedSlots = new ArrayList<>();

for(SlotStatus slotStatus : initialSlotReport) {
reportedSlots.add(slotStatus.getSlotID());
}

TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection, reportedSlots);

taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);

// next register the new slots
for(SlotStatus slotStatus : initialSlotReport) {
// Register every slot the TaskManager reported
registerSlot(slotStatus.getSlotID(),
slotStatus.getAllocationID(),
slotStatus.getJobID(),
slotStatus.getResourceProfile(),
taskExecutorConnection);
}
return true;
}

}

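registerTaskManager first checks whether the TaskManager is registering for the first time, which is the usual case. A repeat registration only reports the slot status and returns false; a first registration that would exceed the maximum number of slots (maxSlotNum) releases the TaskManager and also returns false.

On an accepted first registration, the TaskManager is recorded and registerSlot is called for every slot it reported.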
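The first-vs-repeat registration check can be sketched in plain Java. This is a simplified, hypothetical model: the class TaskManagerRegistrySketch, its fields and the plain String IDs are assumptions for illustration, not Flink classes, and the slot-limit check is cruder than isMaxSlotNumExceededAfterRegistration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of SlotManagerImpl.registerTaskManager (not Flink's API).
class TaskManagerRegistrySketch {
    // instanceId -> slot ids reported when the TaskManager first registered
    final Map<String, List<String>> taskManagerRegistrations = new HashMap<>();
    // all slot ids registered so far (stands in for the `slots` map)
    final List<String> registeredSlots = new ArrayList<>();
    final int maxSlotNum;

    TaskManagerRegistrySketch(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    /** Returns true only when a first-time registration is accepted. */
    boolean registerTaskManager(String instanceId, List<String> reportedSlotIds) {
        if (taskManagerRegistrations.containsKey(instanceId)) {
            // Repeat registration: Flink only calls reportSlotStatus(...) here.
            return false;
        }
        // Simplified limit check; Flink's isMaxSlotNumExceededAfterRegistration
        // also takes pending TaskManager slots into account.
        if (registeredSlots.size() + reportedSlotIds.size() > maxSlotNum) {
            // Flink releases the TaskManager via resourceActions.releaseResource(...).
            return false;
        }
        taskManagerRegistrations.put(instanceId, new ArrayList<>(reportedSlotIds));
        for (String slotId : reportedSlotIds) {
            registeredSlots.add(slotId); // stands in for registerSlot(...)
        }
        return true;
    }
}
```

With maxSlotNum = 4, registering tm-1 with two slots returns true, registering tm-1 again returns false, and a tm-2 reporting three more slots is rejected because 2 + 3 exceeds the limit.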

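/**
* How the SlotManager registers a slot:
* 1. Check whether this slot is registered for the first time; if not, remove the old slot from the registered TaskManagerSlots
* 2. Create a new TaskManagerSlot for the slot reported by the TaskManager and look for a pending TaskManager slot
* whose resource profile matches the reported slot exactly.
* 3. If nothing matches, update the slot state directly; if the matched pending slot has a bound slot request, run allocateSlot
*/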
private void registerSlot(SlotID slotId, AllocationID allocationId, JobID jobId, ResourceProfile resourceProfile,
TaskExecutorConnection taskManagerConnection) {

// First check whether this slot is registered for the first time; if not, remove the old slot from the registered TaskManagerSlots
if(slots.containsKey(slotId)) {
// remove the old slot first
removeSlot(slotId,
new SlotManagerException(String.format("Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.", slotId)));
}

final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);

final PendingTaskManagerSlot pendingTaskManagerSlot;

if(allocationId == null) {
// Look for an exactly matching pending TaskManager slot
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
} else {
pendingTaskManagerSlot = null;
}

if(pendingTaskManagerSlot == null) {
// No matching pending slot: update the TaskManagerSlot state directly
updateSlot(slotId, allocationId, jobId);
} else {
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
// Check whether a pending slot request is bound to the matched pending slot
final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();

if(assignedPendingSlotRequest == null) {
// No request bound: treat the TaskManagerSlot as free
handleFreeSlot(slot);
} else {
// A request is bound: allocate this slot to it
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
allocateSlot(slot, assignedPendingSlotRequest);
}
}
}

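Earlier we checked whether the TaskManager was registering for the first time. Inside registerSlot there is a second check: whether the slot itself is being registered for the first time.

If the slot was already registered (for example because the TaskExecutor reconnected), the old slot is first removed from the registered slots.

Then a new TaskManagerSlot is created for the slot the TaskManager reported. If the reported slot is not allocated yet (allocationId == null), the SlotManager looks for a pending TaskManager slot with exactly the same resource profile, i.e. a slot it requested from the resource provider earlier.

If there is no match, updateSlot updates the TaskManagerSlot according to the report: an unallocated slot is handled as a free slot, while a slot the TaskExecutor reports as allocated becomes ALLOCATED.

If there is a match, the matched pending slot is removed and the SlotManager checks whether a pending slot request is bound to it.

If no request is bound, handleFreeSlot treats the new slot as free. If a request is bound, it is unbound from the pending slot and allocateSlot assigns the newly registered slot to it.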
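A rough model of registerSlot's branching, under stated assumptions: RegisterSlotSketch and its nested types are hypothetical, resource profiles are reduced to strings compared with equals (mirroring the exact match of findExactlyMatchingPendingTaskManagerSlot), and both updateSlot and handleFreeSlot are collapsed into their simplest outcome.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of SlotManagerImpl.registerSlot (not Flink's API).
class RegisterSlotSketch {
    enum State { FREE, PENDING, ALLOCATED }

    static final class Slot {
        State state = State.FREE;  // a newly created TaskManagerSlot starts out FREE
        String allocationId;       // set only when ALLOCATED
        String assignedRequest;    // allocationId of the bound request, set only when PENDING
    }

    /** Stand-in for PendingTaskManagerSlot: requested from Yarn/K8s, not registered yet. */
    static final class PendingTaskManagerSlot {
        final String profile;
        final String assignedRequest; // null if no PendingSlotRequest is bound
        PendingTaskManagerSlot(String profile, String assignedRequest) {
            this.profile = profile;
            this.assignedRequest = assignedRequest;
        }
    }

    final Map<String, Slot> slots = new HashMap<>();
    final Map<String, PendingTaskManagerSlot> pendingSlots = new HashMap<>();
    final Map<String, Slot> freeSlots = new HashMap<>();

    void registerSlot(String slotId, String allocationId, String profile) {
        // 1. Re-registration: drop the old slot first (Flink calls removeSlot)
        slots.remove(slotId);
        freeSlots.remove(slotId);
        Slot slot = new Slot();
        slots.put(slotId, slot);

        // 2. Only an unallocated slot is matched against pending TaskManager slots (exact profile)
        String matchKey = null;
        if (allocationId == null) {
            for (Map.Entry<String, PendingTaskManagerSlot> e : pendingSlots.entrySet()) {
                if (e.getValue().profile.equals(profile)) {
                    matchKey = e.getKey();
                    break;
                }
            }
        }

        if (matchKey == null) {
            // 3a. Flink calls updateSlot(...); this is the simplified outcome of that call
            if (allocationId == null) {
                freeSlots.put(slotId, slot);
            } else {
                slot.state = State.ALLOCATED;
                slot.allocationId = allocationId;
            }
            return;
        }
        PendingTaskManagerSlot pending = pendingSlots.remove(matchKey);
        if (pending.assignedRequest == null) {
            freeSlots.put(slotId, slot);               // simplified handleFreeSlot
        } else {
            slot.state = State.PENDING;                // allocateSlot: FREE -> PENDING
            slot.assignedRequest = pending.assignedRequest;
        }
    }
}
```

For example, if a pending TaskManager slot with profile "1cpu" has request alloc-A bound to it, registering an unallocated "1cpu" slot leaves the new slot PENDING for alloc-A; a second such slot finds no pending slot left and lands in freeSlots.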

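/**
* For a free slot, call findMatchingRequest to check whether the pending slot request list contains
* an unassigned request whose resource profile matches the TaskManagerSlot. If so, allocate the
* TaskManagerSlot to it directly; otherwise add the slot to the free slots to wait
*/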
private void handleFreeSlot(TaskManagerSlot freeSlot) {
Preconditions.checkState(freeSlot.getState() == TaskManagerSlot.State.FREE);

PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());

if(null != pendingSlotRequest) {
allocateSlot(freeSlot, pendingSlotRequest);
} else {
freeSlots.put(freeSlot.getSlotId(), freeSlot);
}
}

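This is where the binding happens: handleFreeSlot first calls findMatchingRequest to find an unassigned pending slot request whose resource profile matches the slot.

If one is found, allocateSlot is called (we covered that method earlier, so refer back to it if needed); otherwise the slot is put into freeSlots.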
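handleFreeSlot and findMatchingRequest boil down to a linear scan over the pending requests. A minimal sketch, assuming profiles are plain strings and that "matching" means equal (a simplification: the real comparison is delegated to ResourceProfile and may be more lenient); the class and field names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of handleFreeSlot / findMatchingRequest (not Flink's API).
class HandleFreeSlotSketch {
    static final class PendingSlotRequest {
        final String allocationId;
        final String profile;   // resource profile reduced to a label, e.g. "1cpu"
        boolean assigned;       // true once the request is bound to a slot
        PendingSlotRequest(String allocationId, String profile) {
            this.allocationId = allocationId;
            this.profile = profile;
        }
    }

    // LinkedHashMap: requests are scanned in insertion order
    final Map<String, PendingSlotRequest> pendingSlotRequests = new LinkedHashMap<>();
    final Map<String, String> freeSlots = new LinkedHashMap<>(); // slotId -> profile

    /** First unassigned request with the same profile, or null. */
    PendingSlotRequest findMatchingRequest(String slotProfile) {
        for (PendingSlotRequest request : pendingSlotRequests.values()) {
            if (!request.assigned && request.profile.equals(slotProfile)) {
                return request;
            }
        }
        return null;
    }

    /** Returns the allocationId the slot was given, or null if it was parked in freeSlots. */
    String handleFreeSlot(String slotId, String slotProfile) {
        PendingSlotRequest request = findMatchingRequest(slotProfile);
        if (request != null) {
            request.assigned = true; // Flink calls allocateSlot(freeSlot, request) here
            return request.allocationId;
        }
        freeSlots.put(slotId, slotProfile);
        return null;
    }
}
```

The first free "1cpu" slot is handed to the waiting request; a second one finds that request already assigned and is parked in freeSlots until a later request can use it.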

If the TaskManager is not registering for the first time, registerTaskManager only reports the slot status and returns false:

// Check whether this is the TaskManager's first registration
if(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
// Repeat registration (normally a registration is a first one)
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
}

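Those are the two methods in the ResourceManager: one requests slots, the other registers and allocates slots.

Back in SlotManagerImpl, registerSlot calls handleFreeSlot for a slot that has no bound request: nobody needs the slot right now, so it is set aside as free.

Let's look at this method: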

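/**
* For a free slot, call findMatchingRequest to check whether the pending slot request list contains
* an unassigned request whose resource profile matches the TaskManagerSlot. If so, allocate the
* TaskManagerSlot to it directly; otherwise add the slot to the free slots to wait
*/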
private void handleFreeSlot(TaskManagerSlot freeSlot) {
Preconditions.checkState(freeSlot.getState() == TaskManagerSlot.State.FREE);

PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());

if(null != pendingSlotRequest) {
allocateSlot(freeSlot, pendingSlotRequest);
} else {
freeSlots.put(freeSlot.getSlotId(), freeSlot);
}
}

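For a free slot, handleFreeSlot calls findMatchingRequest to check whether the pending slot request list has an unassigned request whose resource profile matches the TaskManagerSlot. If there is one, the slot is allocated to it directly; otherwise the slot is added to the free slots.

Back in registerSlot, there is this fragment: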

if(pendingTaskManagerSlot == null) {
// No matching pending slot: update the TaskManagerSlot state directly
updateSlot(slotId, allocationId, jobId);
}

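If there is no matching pending TaskManager slot, i.e. no pending request can use this slot, updateSlot sets the slot's state from the TaskManager's report; for an unallocated slot this ends in handleFreeSlot, which parks the slot as free when no request matches.

updateSlot deserves a closer look, because it is the method that changes a slot's state. A slot has three states (FREE, PENDING, ALLOCATED); combined with whether the reported slot carries an allocationId, this gives six cases, which are explained in detail below.

The six cases of slot state changes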

private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
final TaskManagerSlot slot = slots.get(slotId);

if(slot != null) {
final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());

if(taskManagerRegistration != null) {
// Update the TaskManagerSlot state
updateSlotState(slot, taskManagerRegistration, allocationId, jobId);
return true;
} else {
throw new IllegalStateException("Trying to update a slot from a TaskManager " + slot.getInstanceId() + " which has not been registered.");
}
} else {
LOG.debug("Trying to update unknown slot with slot id {}.", slotId);

return false;
}
}
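/**
* TaskManagerSlot state changes:
* Requesting and allocating a slot involves state changes of the TaskManagerSlot.
* If the reported allocationId is null, the TaskExecutor slot backing this TaskManagerSlot has not been allocated yet;
* if it is not null, the slot is already allocated
* */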
private void updateSlotState(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId,
@Nullable JobID jobId) {
if(null != allocationId) {
switch(slot.getState()) {
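/**
* 1. The reported slot is allocated and the TaskManagerSlot is pending (PENDING).
* First get the pending slot request bound to the TaskManagerSlot, then compare its allocationId
* with the allocationId of the reported slot.
* Equal:
* Cancel the pending slot request and remove it from the pending slot request list, marking the request as fulfilled;
* the TaskManagerSlot moves from PENDING to ALLOCATED
*
* Not equal:
* Reject the pending slot request bound to the TaskManagerSlot; use the reported allocation ID to find the matching pending request,
* cancel it and remove it from the pending slot request list, marking that request as fulfilled;
* the TaskManagerSlot moves from PENDING to ALLOCATED.
* */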
case PENDING:
// we have a pending slot request --> check whether we have to reject it
PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();

if(Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
// we can cancel the slot request because it has been fulfilled
cancelPendingSlotRequest(pendingSlotRequest);

// remove the pending slot request, since it has been completed
pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());

slot.completeAllocation(allocationId, jobId);
} else {
// we first have to free the slot in order to set a new allocationId
slot.clearPendingSlotRequest();
// set the allocation id such that the slot won't be considered for the pending slot request
slot.updateAllocation(allocationId, jobId);

// remove the pending request if any as it has been assigned
final PendingSlotRequest actualPendingSlotRequest = pendingSlotRequests.remove(allocationId);

if(actualPendingSlotRequest != null) {
cancelPendingSlotRequest(actualPendingSlotRequest);
}

// this will try to find a new slot for the request
rejectPendingSlotRequest(pendingSlotRequest,
new Exception("Task manager reported slot " + slot.getSlotId() + " being already allocated."));
}

taskManagerRegistration.occupySlot();
break;
case ALLOCATED:
/**
* 2. The reported slot is allocated and the TaskManagerSlot is ALLOCATED. If the TaskManagerSlot's
* allocation ID differs from the reported one, it is changed to the reported ID by first freeing and then
* re-occupying the slot: ALLOCATED -> FREE -> ALLOCATED
*
* */
if(!Objects.equals(allocationId, slot.getAllocationId())) {
slot.freeSlot();
slot.updateAllocation(allocationId, jobId);
}
break;
case FREE:
/**
* 3. The reported slot is allocated and the TaskManagerSlot is FREE: remove the TaskManagerSlot from the free slots,
* set its allocation ID to the reported one, and move it from FREE to ALLOCATED.
* */
// the slot is currently free --> it is stored in freeSlots
freeSlots.remove(slot.getSlotId());
slot.updateAllocation(allocationId, jobId);
taskManagerRegistration.occupySlot();
break;
}

fulfilledSlotRequests.put(allocationId, slot.getSlotId());
} else {

switch(slot.getState()) {
case FREE:
/**
* 4. The reported slot is unallocated (its allocationId is null) and the TaskManagerSlot is FREE: call handleFreeSlot
* to check whether a matching pending slot request can take it.
* */
handleFreeSlot(slot);
break;
case PENDING:
/**
* 5. The reported slot is unallocated and the TaskManagerSlot is PENDING: nothing needs to be done
* */
// don't do anything because we still have a pending slot request
break;
case ALLOCATED:
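/**
* 6. The reported slot is unallocated but the TaskManagerSlot is ALLOCATED: free the TaskManagerSlot
* and remove its allocation from the fulfilled slot requests.
* The TaskManagerSlot moves from ALLOCATED to FREE,
* then handleFreeSlot checks whether a matching pending slot request can take it.
* (Whenever the reported slot is allocated, i.e. in cases 1-3, its allocation ID is recorded in fulfilledSlotRequests.)
* */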
AllocationID oldAllocation = slot.getAllocationId();
slot.freeSlot();
fulfilledSlotRequests.remove(oldAllocation);
taskManagerRegistration.freeSlot();

handleFreeSlot(slot);
break;
}
}
}

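This code is fairly involved.

First, what is an allocationId?

An AllocationID uniquely identifies one slot allocation request, i.e. a task's demand for a slot. Each time a task requests resources, a corresponding AllocationID is generated to represent that demand.

Next, what does "allocated" mean here?

"Allocated" describes whether a TaskManagerSlot is bound to a valid AllocationID and is actually being used by a task.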

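Unallocated (FREE):

  • The TaskManagerSlot is idle and not occupied by any task.
  • It is not bound to an AllocationID.

Pending (PENDING):

  • The TaskManagerSlot has received an allocation request, but the allocation has not completed yet.
  • While waiting, it is bound to a PendingSlotRequest.

Allocated (ALLOCATED):

  • The TaskManagerSlot has been successfully allocated to and occupied by a task.
  • It is bound to an AllocationID, which identifies the allocation it serves.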
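The three states and the TaskManagerSlot methods that move between them (as called in the code in this article) can be modeled as a small state machine. The method names follow that code, but the bodies are an assumption: jobId handling is omitted, IDs are plain strings, and the state checks are written by hand to show which transitions are legal.

```java
// Hypothetical mirror of TaskManagerSlot's state transitions (not Flink's implementation).
class SlotStateMachineSketch {
    enum State { FREE, PENDING, ALLOCATED }

    State state = State.FREE;
    String allocationId;      // set only in ALLOCATED
    String assignedRequest;   // allocationId of the bound PendingSlotRequest, set only in PENDING

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }

    void assignPendingSlotRequest(String requestAllocationId) { // FREE -> PENDING
        require(State.FREE);
        state = State.PENDING;
        assignedRequest = requestAllocationId;
    }

    void completeAllocation(String reportedAllocationId) {      // PENDING -> ALLOCATED
        require(State.PENDING);
        if (!reportedAllocationId.equals(assignedRequest)) {
            throw new IllegalStateException("allocation id mismatch");
        }
        state = State.ALLOCATED;
        allocationId = reportedAllocationId;
        assignedRequest = null;
    }

    void clearPendingSlotRequest() {                             // PENDING -> FREE
        require(State.PENDING);
        state = State.FREE;
        assignedRequest = null;
    }

    void updateAllocation(String reportedAllocationId) {         // FREE -> ALLOCATED
        require(State.FREE);
        state = State.ALLOCATED;
        allocationId = reportedAllocationId;
    }

    void freeSlot() {                                            // ALLOCATED -> FREE
        require(State.ALLOCATED);
        state = State.FREE;
        allocationId = null;
    }
}
```

In this model, freeSlot followed by updateAllocation is the ALLOCATED -> FREE -> ALLOCATED path of case 2, and clearPendingSlotRequest followed by updateAllocation is the mismatch branch of case 1.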

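Summary of the six cases

(1) Reported slot is allocated (allocationId != null)

  • State PENDING: the TaskManagerSlot has received an allocation request but the allocation has not completed yet; it is waiting for the allocation to finish and is not yet bound to a concrete task. This intermediate state exists because the slot request is sent asynchronously.
    • If the slot is PENDING:
      • Compare the reported AllocationID with the AllocationID of the PendingSlotRequest bound to the slot.
        • Equal: the request is fulfilled; the slot becomes ALLOCATED.
        • Not equal: release the currently bound request and handle the new allocation.
  • State ALLOCATED
    • If the slot is ALLOCATED but the AllocationIDs differ:
      • Update the slot's AllocationID by freeing it and allocating it again.
  • State FREE
    • If the slot is FREE:
      • Remove it from the free slots, update its AllocationID, and set its state to ALLOCATED.

(2) Reported slot is unallocated (allocationId == null)

  • State FREE
    • Call handleFreeSlot to try to match a pending slot request.
  • State PENDING
    • Do nothing; keep the current state.
  • State ALLOCATED
    • If the slot is ALLOCATED:
      • Free the slot and remove its allocation record from the fulfilled requests.
      • Call handleFreeSlot to try to match a new request.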
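The six cases can be condensed into one decision function. This is only a reading aid, not Flink code: SixCasesSketch, its parameters and the returned strings are made up, and the side effects (fulfilledSlotRequests bookkeeping, occupySlot/freeSlot on the TaskManagerRegistration) are left out.

```java
// Hypothetical summary of updateSlotState's six cases as a pure function.
class SixCasesSketch {
    enum State { FREE, PENDING, ALLOCATED }

    /**
     * @param current  state of the TaskManagerSlot kept by the SlotManager
     * @param reported allocationId in the TaskExecutor's report (null = unallocated)
     * @param known    id the SlotManager associates with the slot: the bound request's
     *                 allocationId when PENDING, the allocationId when ALLOCATED, null when FREE
     * @return a short description of what updateSlotState does
     */
    static String decide(State current, String reported, String known) {
        if (reported != null) {
            switch (current) {
                case PENDING:   // case 1
                    return reported.equals(known)
                            ? "complete allocation -> ALLOCATED"
                            : "reject bound request, adopt reported id -> ALLOCATED";
                case ALLOCATED: // case 2
                    return reported.equals(known)
                            ? "nothing to change"
                            : "freeSlot then updateAllocation -> ALLOCATED";
                default:        // case 3: FREE
                    return "remove from freeSlots, updateAllocation -> ALLOCATED";
            }
        }
        switch (current) {
            case FREE:    return "handleFreeSlot";                                   // case 4
            case PENDING: return "keep waiting";                                     // case 5
            default:      return "freeSlot, drop fulfilled record, handleFreeSlot";  // case 6
        }
    }
}
```

The outer if mirrors the allocationId != null split in the source, and each inner switch mirrors one of the two switch statements on slot.getState().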

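Cases 4, 5 and 6 of the TaskManagerSlot: the three cases without an allocationId

We start with cases 4, 5 and 6, in which the reported slot has no allocationId.

Case 4 first:

The reported slot is unallocated (its allocationId is null) and the TaskManagerSlot is FREE.

handleFreeSlot is called directly: if a pending request matches this slot, the slot is allocated to it; otherwise it is added to the free slots.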

switch(slot.getState()) {
case FREE:
/**
* 4. The reported slot is unallocated (its allocationId is null) and the TaskManagerSlot is FREE: call handleFreeSlot
* to check whether a matching pending slot request can take it.
* */
handleFreeSlot(slot);
break;

Next, case 5:

Here the slot is about to be allocated: the TaskManagerSlot is already PENDING while the reported slot is still unallocated, so nothing needs to be done.

case PENDING:
/**
* 5. The reported slot is unallocated and the TaskManagerSlot is PENDING: nothing needs to be done
* */
// don't do anything because we still have a pending slot request
break;

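Now case 6:

Case 6 is an inconsistent situation.

The TaskManagerSlot is already ALLOCATED, but the report carries no allocationId, so the allocation recorded by the SlotManager no longer matches the TaskExecutor.

The recorded allocation is therefore dropped: the slot is freed, its entry is removed from fulfilledSlotRequests, its state becomes FREE, and handleFreeSlot is called to see whether a pending request matches it.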

case ALLOCATED:
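/**
* 6. The reported slot is unallocated but the TaskManagerSlot is ALLOCATED: free the TaskManagerSlot
* and remove its allocation from the fulfilled slot requests.
* The TaskManagerSlot moves from ALLOCATED to FREE,
* then handleFreeSlot checks whether a matching pending slot request can take it.
* (Whenever the reported slot is allocated, i.e. in cases 1-3, its allocation ID is recorded in fulfilledSlotRequests.)
* */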
AllocationID oldAllocation = slot.getAllocationId();
slot.freeSlot();
fulfilledSlotRequests.remove(oldAllocation);
taskManagerRegistration.freeSlot();

handleFreeSlot(slot);
break;




-------------------------------------------------------------------------



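/**
* For a free slot, call findMatchingRequest to check whether the pending slot request list contains
* an unassigned request whose resource profile matches the TaskManagerSlot. If so, allocate the
* TaskManagerSlot to it directly; otherwise add the slot to the free slots to wait
*/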
private void handleFreeSlot(TaskManagerSlot freeSlot) {
Preconditions.checkState(freeSlot.getState() == TaskManagerSlot.State.FREE);

PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());

if(null != pendingSlotRequest) {
allocateSlot(freeSlot, pendingSlotRequest);
} else {
freeSlots.put(freeSlot.getSlotId(), freeSlot);
}
}


---------------------------------------------------------------------------

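/**
* 1. Allocating a slot first marks the free TaskManagerSlot (FREE) as pending (PENDING)
* 2. and binds the corresponding pending slot request (pendingRequest);
* 3. then, using the taskExecutor information in the TaskManagerSlot, it asynchronously asks the TaskExecutor to occupy the slot
* and handles the result.
* The result (acknowledge / throwable) falls into three cases:
* 1. acknowledge != null
* The TaskManagerSlot state is updated from PENDING to ALLOCATED
* 2. throwable instanceof SlotOccupiedException
* The TaskExecutor's slot is already occupied by another slot request: the bound pendingSlotRequest is rejected,
* the TaskManagerSlot moves from PENDING to ALLOCATED
* and is bound to the returned allocationId, so that it mirrors how the TaskExecutor's slot is actually occupied.
* 3. any other throwable
* removeSlotRequestFromSlot(slotId, allocationId) removes the binding between the TaskManagerSlot and the pendingSlotRequest.
* In cases 2 and 3, handleFailedSlotRequest(slotId, allocationId, throwable) is then called,
* unless the throwable is a CancellationException, which is only logged.
*/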
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
// The slot to allocate must be free
Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE);
// Get the connection to the TaskExecutor
TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
// Get the gateway used to call the TaskExecutor remotely
TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();

final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
final AllocationID allocationId = pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
final InstanceID instanceID = taskManagerSlot.getInstanceId();


// Mark the TaskManagerSlot as pending: FREE -> PENDING
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);

pendingSlotRequest.setRequestFuture(completableFuture);

returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);

TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);

if(taskManagerRegistration == null) {
throw new IllegalStateException("Could not find a registered task manager for instance id " + instanceID + '.');
}

/**
* Mark the TaskManager as used, so the SlotManager's check for idle TaskManagers will not reclaim it
* */
taskManagerRegistration.markUsed();

/**
* Ask the TaskManager to allocate the slot, so the JobMaster can schedule and deploy the job's tasks to it
*/
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout
);


requestFuture.whenComplete((Acknowledge acknowledge, Throwable throwable) -> {
if(acknowledge != null) {
completableFuture.complete(acknowledge);
} else {
completableFuture.completeExceptionally(throwable);
}
});

/**
* Handle the result returned by the TaskManager (an acknowledge)
* */
completableFuture.whenCompleteAsync((Acknowledge acknowledge, Throwable throwable) -> {
try {
if(acknowledge != null) {
updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
} else {
if(throwable instanceof SlotOccupiedException) {
SlotOccupiedException exception = (SlotOccupiedException) throwable;
updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
} else {
removeSlotRequestFromSlot(slotId, allocationId);
}

if(!(throwable instanceof CancellationException)) {
handleFailedSlotRequest(slotId, allocationId, throwable);
} else {
LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
}
}
} catch(Exception e) {
LOG.error("Error while completing the slot allocation.", e);
}
}, mainThreadExecutor);
}

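Following handleFreeSlot into allocateSlot, we end up back in updateSlot: once the TaskExecutor acknowledges the request, allocateSlot calls updateSlot again, this time with a non-null allocationId, so updateSlotState reaches this line: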

fulfilledSlotRequests.put(allocationId, slot.getSlotId());

Whenever the reported slot is allocated, its allocation ID is recorded in the list of fulfilled slot requests (fulfilledSlotRequests).
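The way allocateSlot reacts to the TaskExecutor's answer can be sketched with a plain CompletableFuture. Assumptions: AllocateSlotSketch and its SlotOccupiedException stand-in are hypothetical, the acknowledge is modeled as a CompletableFuture&lt;Void&gt; (so success is detected by a null throwable rather than a non-null acknowledge), and the callback runs synchronously via whenComplete instead of Flink's whenCompleteAsync on the main-thread executor.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of how allocateSlot reacts to the TaskExecutor's answer (not Flink's API).
class AllocateSlotSketch {
    enum State { FREE, PENDING, ALLOCATED }

    /** Stand-in for Flink's SlotOccupiedException, which carries the occupying allocation id. */
    static final class SlotOccupiedException extends Exception {
        final String allocationId;
        SlotOccupiedException(String allocationId) {
            super("slot already occupied by " + allocationId);
            this.allocationId = allocationId;
        }
    }

    State state = State.FREE;
    String allocationId;
    boolean retryRequested; // stands in for handleFailedSlotRequest(...)

    void allocateSlot(String requestAllocationId, CompletableFuture<Void> taskExecutorAnswer) {
        state = State.PENDING; // assignPendingSlotRequest: FREE -> PENDING
        taskExecutorAnswer.whenComplete((ack, throwable) -> {
            if (throwable == null) {
                // acknowledged: updateSlot(...) turns PENDING into ALLOCATED
                state = State.ALLOCATED;
                allocationId = requestAllocationId;
                return;
            }
            if (throwable instanceof SlotOccupiedException) {
                // the TaskExecutor slot belongs to another allocation: mirror that one
                state = State.ALLOCATED;
                allocationId = ((SlotOccupiedException) throwable).allocationId;
            } else {
                // removeSlotRequestFromSlot(...): drop the binding, the slot becomes FREE again
                state = State.FREE;
                allocationId = null;
            }
            // a cancelled request is only logged; every other failure is retried
            retryRequested = !(throwable instanceof CancellationException);
        });
    }
}
```

Completing the future normally leaves the slot ALLOCATED with the request's ID; failing it with the SlotOccupiedException stand-in records the other allocation's ID and marks a retry; cancelling it frees the slot without a retry.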

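Cases 1, 2 and 3 of the TaskManagerSlot: the three cases with an allocationId

Let's look at case 2 first; case 1 is more complicated, so we'll come back to it afterwards.

Case 2:

The reported slot is allocated and the TaskManagerSlot is also ALLOCATED. If the slot's current allocation ID differs from the reported one, the slot is first freed and then occupied again with the reported allocation ID, i.e. ALLOCATED -> FREE -> ALLOCATED.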

case ALLOCATED:
/**
* 2. The reported slot is allocated and the TaskManagerSlot is ALLOCATED. If the TaskManagerSlot's
* allocation ID differs from the reported one, it is changed to the reported ID by first freeing and then
* re-occupying the slot: ALLOCATED -> FREE -> ALLOCATED
*
* */
if(!Objects.equals(allocationId, slot.getAllocationId())) {
slot.freeSlot();
slot.updateAllocation(allocationId, jobId);
}
break;

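Case 3:

This is the normal allocation case: the reported slot is allocated and the TaskManagerSlot is FREE, so the slot is removed from the free slots, its state becomes ALLOCATED, and its allocation ID is set to the reported one.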

case FREE:
/**
* 3. The reported slot is allocated and the TaskManagerSlot is FREE: remove the TaskManagerSlot from the free slots,
* set its allocation ID to the reported one, and move it from FREE to ALLOCATED.
* */
// the slot is currently free --> it is stored in freeSlots
freeSlots.remove(slot.getSlotId());
slot.updateAllocation(allocationId, jobId);
taskManagerRegistration.occupySlot();
break;

Case 1

Case 1 is more complicated, so let's look at the code first.

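/**
* 1. The reported slot is allocated and the TaskManagerSlot is pending (PENDING).
* First get the pending slot request bound to the TaskManagerSlot, then compare its allocationId
* with the allocationId of the reported slot.
* Equal:
* Cancel the pending slot request and remove it from the pending slot request list, marking the request as fulfilled;
* the TaskManagerSlot moves from PENDING to ALLOCATED
*
* Not equal:
* Reject the pending slot request bound to the TaskManagerSlot; use the reported allocation ID to find the matching pending request,
* cancel it and remove it from the pending slot request list, marking that request as fulfilled;
* the TaskManagerSlot moves from PENDING to ALLOCATED.
* */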
case PENDING:
// we have a pending slot request --> check whether we have to reject it
PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();

if(Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
// we can cancel the slot request because it has been fulfilled
cancelPendingSlotRequest(pendingSlotRequest);

// remove the pending slot request, since it has been completed
pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());

slot.completeAllocation(allocationId, jobId);
} else {
// we first have to free the slot in order to set a new allocationId
slot.clearPendingSlotRequest();
// set the allocation id such that the slot won't be considered for the pending slot request
slot.updateAllocation(allocationId, jobId);

// remove the pending request if any as it has been assigned
final PendingSlotRequest actualPendingSlotRequest = pendingSlotRequests.remove(allocationId);

if(actualPendingSlotRequest != null) {
cancelPendingSlotRequest(actualPendingSlotRequest);
}

// this will try to find a new slot for the request
rejectPendingSlotRequest(pendingSlotRequest,
new Exception("Task manager reported slot " + slot.getSlotId() + " being already allocated."));
}

taskManagerRegistration.occupySlot();
break;

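First, what case 1 means:

The reported slot is allocated while the TaskManagerSlot is PENDING: the allocation request has been received, but the allocation has not completed yet.

We first get the pending slot request bound to this TaskManagerSlot and compare its allocationId with the reported allocationId.

This gives two sub-cases.

If they are equal, the pending request and the report refer to the same request, so the request is complete: the pending request is cancelled and removed from the pending list, and the slot moves to ALLOCATED so that it is not allocated twice.

If they are not equal, the latest report from the TaskExecutor wins:

  • Clear the slot's old binding (clearPendingSlotRequest).
  • Bind the slot to the reported allocation and set its state to ALLOCATED (updateAllocation).
  • If a pending request with the reported allocationId exists, remove it from pendingSlotRequests and cancel it, since this slot now fulfils it.
  • Reject the originally bound request (rejectPendingSlotRequest); its request future fails, and the SlotManager then tries to find another slot for it.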
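The two sub-cases of case 1 can be read as "the TaskExecutor's report wins". A hypothetical sketch with String IDs: PendingCaseSketch is not a Flink class, and the lists cancelled and rejected only record which Flink call would have been made (cancelPendingSlotRequest and rejectPendingSlotRequest).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of case 1: reported allocationId != null, slot PENDING (not Flink's API).
class PendingCaseSketch {
    final Map<String, String> pendingSlotRequests = new HashMap<>(); // allocationId -> jobId
    final List<String> cancelled = new ArrayList<>(); // cancelPendingSlotRequest(...)
    final List<String> rejected = new ArrayList<>();  // rejectPendingSlotRequest(...), retried later

    String slotAllocationId; // the allocation the slot ends up bound to (state ALLOCATED)

    void onAllocatedReport(String boundRequestId, String reportedId) {
        if (boundRequestId.equals(reportedId)) {
            // the bound request has been fulfilled by this slot
            cancelled.add(boundRequestId);
            pendingSlotRequests.remove(boundRequestId);
        } else {
            // another request owns the slot: it is fulfilled, so drop it from the pending list
            if (pendingSlotRequests.remove(reportedId) != null) {
                cancelled.add(reportedId);
            }
            // the originally bound request stays pending and must find another slot
            rejected.add(boundRequestId);
        }
        slotAllocationId = reportedId; // ALLOCATED in both sub-cases
    }
}
```

If request a1 was bound to the slot but the TaskExecutor reports a2, the slot ends up with a2, a2 leaves the pending list, and a1 stays pending so that it can be retried on another slot.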

That is the state-transition method.

This concludes the slot request part of the ResourceManager.