Flink源码解析(十一)
2025-01-09 15:54:42 # Flink # 源码解析 # TaaskManager总结&TaskExecutor接收来自ResourceManager的Slot请求处理

Flink源码解析(十一)

TaskManger总结

我们首先来看一下TaskManager做了哪些事情

TaskExecutor组件是TaskManager的核心部分,主要负责多个Task(任务)的执行

  • TaskExecutor组件的基础类为TaskExecutor类

  • TaskExecutor类继承RpcEndpoint抽象类,由实现的AkkaRpcService来支持RpcEndpoint的实现

  • TaskExecutor类实现TaskExecutorGateway接口,提供其他组件(如JobMaster、ResourceManager等)RPC的方法。这里一看到GateWay这个网关就一定是用来通信的

  • TaskManagerRunner是各种部署模式下TaskManager的执行入口,负责构建TaskExecutor的网络、I/O管理、内存管理、RPC服务、HA服务以及启动TaskExecutor。我们看TaskManager的启动流程就是看的这个

  • 其中TaskExecutor与ResourceManager、JobMaster的通信时机的情况如下。

    • 与ResourceManager初次建立通信是在ResourceManager向部署模式申请和启动TaskExecutor,TaskExecutor启动后,通过HA服务监听到ResourceManager的首领信息,主动发送消息建立联系。
    • TaskExecutor与JobMaster建立通信的时机是ResourceManager向TaskExecutor申请Slot时,TaskExecutor会根据申请Slot中的作业信息,获取JobMaster的通信地址,主动发送信息建立通信,并将Slot提供给JobMaster。
  • 重点介绍TaskExecutor接收来自ResourceManager请求的处理和将Slot提供给JobMaster的过程。

  • TaskSlot组织结构与状态

    • Slot是划分TaskExecutor资源的基本逻辑单元。TaskExecutor中所有Slot的情况由TaskSlotTable类来组织管理。TaskSlotTable类由以下属性组成,管理所有Slot的情况。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      public class TaskSlotTable implements TimeoutListener<AllocationID> {

      private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);

      /**
      * 负责将处于已分配状态(ALLOCATED)的Slot加入超时检测服务。TaskExecutor将Slot提供给JobMaster,
      * 一旦Slot提供JobMaster成功,将移除超时检测,否则最终会超时。超时会调用freeSlot方法将该Slot
      * 置为空闲状态(FREE)。
      * */
      private final TimerService<AllocationID> timerService;

      /**
      * 维护所有TaskSlot的列表,供Slot请求通过Slot下标来占有相应的Slot。
      * */
      private final List<TaskSlot> taskSlots;

      /**
      * 记录分配ID与TaskSlot的映射情况,供通过分配ID查询TaskSlot的情况
      * */
      private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;

      /** 记录执行任务的唯一确定ID与任务和TaskSlot的映射,
      * 供通过执行任务的唯一确定ID查询任务绑定TaskSlot的情况。 */
      private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;

      /**
      * 记录作业占有Slot的情况。
      * */
      private final Map<JobID, Set<AllocationID>> slotsPerJob;

      /**
      * 负责Slot的释放与超时的逻辑。
      * */
      private SlotActions slotActions;

      /**
      * 标记TaskSlotTable是否已经启动。
      * */
      private boolean started;
      • timerService:负责将处于已分配状态(ALLOCATED)的Slot加入超时检测服务。TaskExecutor将Slot提供给JobMaster,一旦Slot提供JobMaster成功,将移除超时检测,否则最终会超时。超时会调用freeSlot方法将该Slot置为空闲状态(FREE)。
      • taskSlots:维护所有TaskSlot的列表,供Slot请求通过Slot下标来占有相应的Slot。
      • allocationIDTaskSlotMap:记录分配ID与TaskSlot的映射情况,供通过分配ID查询TaskSlot的情况。
      • taskSlotMappings:记录执行任务的唯一确定ID与任务和TaskSlot的映射,供通过执行任务的唯一确定ID查询任务绑定TaskSlot的情况。
      • slotsPerJob:记录作业占有Slot的情况。
      • slotActions:负责Slot的释放与超时的逻辑。
      • started:标记TaskSlotTable是否已经启动。
      • 这里也就是表明了TaskSlotTable这个类是管理所有TaskExcecutor中的slot情况的,里面的属性就是
  • 我们再看一下TaskExecutor中Slot的分配情况

    • 这里假设job1在taskSlot-0编号是54523sadfsdaf487里面运行了三个任务,job2在taskSlot-1里面也运行了三个任务,这个就是TaskExecutor对于slot的分配i情况

我们有时候说的slot有时候说的taskSlot都是什么意思

TaskExecutor的里面的单个Slot都是以TaskSlot类来组织的,在TaskSlot类里面,记录了slot的信息包括:

- Slot在TaskSlotTable位置(下标)
- 分配情况(包括分配占有ID,分配占有Slot的作业ID以及分配在Slot上的运行任务列表)
- TaskSlot的状态。

大概是这个关系,首先有一个TaskExecutor,然后这个里面有TaskSlotTable用来做管理的,然后TaskSlotTable里面记录了维护的TaskSlot列表,然后这个每一个TaskSlot列表也就对应着每一个Slot,因为这每一个TaskSlot就相当于slot的元数据,记录的是每一个slot的信息

我们再看一下TaskSlot的状态分为几种:

1
2
3
4
5
6
enum TaskSlotState {
ACTIVE, // 活跃状态,表示TaskSlot已经提供给某个作业对用的JobMaster
ALLOCATED, //已分配状态,但是还未成功提供给JobMaster。对于来自ResourceManager申请,会先将对应的TaskSlot标记为ALLOCATED状态。
RELEASING, //Slot已经被调用释放,但其上还存在运作中的任务,等待所有的任务被移除变成FREE的状态
FREE //TaskSlot的初始状态为空闲.
}

TaskExecutor接收来自ResourceManager的Slot请求

TaskExecutor接收来自ResourceManager的slot请求的入口在TaskExecutor的requestSlot()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the instance/registration Id

log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId, jobId, resourceManagerId);

try {
/**
* 检查TaskExecutor是否于ResourceManager建立了连接,如果没有,则返回TaskManager is not connected to the resource manager
* */
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
log.debug(message);
throw new TaskManagerException(message);
}
/**
* 分配占有Slot,
* 分配占有之前判断申请slot的请求种的slotId对应的Slot是否可以分配占有
* 判断依据是Slot是否为空闲(处于FREE状态)
*
* */
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
//如果是空闲的,则调用allocateSlot方法占用该slot
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
//占有失败了,返回Could not allocate slot
throw new SlotAllocationException("Could not allocate slot.");
}
/**
* 对于对应的Slot不是空闲的情况,判断slotId对应的slot是否已经被本次的
* slot申请占有(重复申请请求的情况)
* 如果不是 则返回Slot已经被占用的异常。
* */
} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
final String message = "The slot " + slotId + " has already been allocated for a different job.";

log.info(message);

final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
}

/**
* 将Slot提供给JobMaster
* 断TaskExecutor是否已于本次Slot申请对应的JobMaster建立连接
* jobManagerTable中存在本次Slot申请对应的jobId
* */
if (jobManagerTable.contains(jobId)) {
//如果已经建立,直接调用offerSlotsToJobManager将slot提供给JobMaster的逻辑
offerSlotsToJobManager(jobId);
} else {
try {
/**
*否则通过jobLeaderService监听对应的JobMaster的Leader信息后建立连接
* */
jobLeaderService.addJob(jobId, targetAddress);
} catch (Exception e) {
// 监听JobMaster的Leader信息失败的情况
try {
//先释放slotId对应的slot
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
/**
* 如果释放slot异常,则TaskExecutor的对应进程会异常退出
* onFatalError的最终逻辑就是 System.exit方法
* */
onFatalError(slotNotFoundException);
}

/**
* 如果释放Slot未产生异常,就会接着释放本次申请Slot分配allocationId对应的本地State(针对开启Local Recovery的情况)
* */
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

// 最后对slotId对应的Slot进行死否空闲的检查
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
/**
* 如果不空闲,执行TaskExecutor的对应进程会异常退出
* */
onFatalError(new Exception("Could not free slot " + slotId));
}
//返回作业无法添加到job leader service的异常
throw new SlotAllocationException("Could not add job to job leader service.", e);
}
}
} catch (TaskManagerException taskManagerException) {
return FutureUtils.completedExceptionally(taskManagerException);
}

return CompletableFuture.completedFuture(Acknowledge.get());
}

这里首先是判断了TaskExecutor和resourceManager是否建立了连接

通过taskSlotTable也就是这个放slot信息的看一下这个slot是不是空闲的

如果是空闲的话,就调用allocateSlot占用这个空闲的slot

如果不是空闲的话就判断一下现在这个slotId对应的slot是不是已经被本次的这个申请占用了

如果是被本次的申请申请占用了就不管,这是因为重复占用了

如果不是被本次的申请占用了就抛出异常

上面就是占用了这个空闲的slot

然后下面是把这个提供给jobMaster

判断是否和jobMaster连接上,

如果连接上直接调offerSlotsToJobManager将slot提供给JobMaster

如果没有连接上就通过jobLeaderService来监听JobMaster的leader信息后连接

如果监听也失败了那就释放调这个slotId对应的slot

如果释放slot也失败了….额,如果啥啥都失败了那就调用onFatalError(slotNotFoundException);直接退出这个进程吧

如果释放slotId对应的slot没有问题,就继续释放本次申请slot分配的一些状态,就是这个slot分配的这些资源都释放掉

再做一个对slotId对应的slot是不是空闲的判断,如果还不是空闲,说明前面关的一大堆没关掉,那就直接退出吧


以上就是TaskExecutor接收来自ResourceManager请求的过程

下面我们仔细的看一下各个内容之间的细节的介绍

我们先来看一下这个allocateSlot方法,这个方法是占用空闲的slot的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* allocateSlot处理逻辑为:
* 判断本次slot申请对应的分配ID是否已经分配过slot,如果已经分配过,则返回分配失败的标记(false)
*/
public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
//TaskSlotTable是否已经启动
checkInit();
//取出taskSlot
TaskSlot taskSlot = taskSlots.get(index);

//如果未分配过,调用taskSlot.allocate(jobId, allocationId),为本次slot申请分配slot
boolean result = taskSlot.allocate(jobId, allocationId);

if (result) {
/**
* 分配占用slot成功时:
* 记录本次Slot申请的allocationId与TaskSlot,JobId与TaskSlot的映射关系
* allocationIDTaskSlotMap.put(allocationId, taskSlot);
* Set<AllocationID> slots = slotsPerJob.get(jobId);
*
*
* */
allocationIDTaskSlotMap.put(allocationId, taskSlot);

// register a timeout for this slot since it's in state allocated
/**
* 并通过timerService注册本次分配对应的Slot从已分配状态(Allocated)转换到活跃状态的超时逻辑。
* 在超时的情况下会将本次分配的taskSlot转换为空闲状态(FREE)。
* */
timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());


Set<AllocationID> slots = slotsPerJob.get(jobId);

if (slots == null) {
slots = new HashSet<>(4);
slotsPerJob.put(jobId, slots);
}

slots.add(allocationId);
}
return result;
}

这里首先是取出taskSlot

然后调用allocate方法,为这次申请分配slot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean allocate(JobID newJobId, AllocationID newAllocationId) {
/**
* 对于slot为空闲状态的情况,将状态装换为已分配状态,
* 记录jobId和allocationId,并且返回true
* */
if (TaskSlotState.FREE == state) {
// sanity checks
Preconditions.checkState(allocationId == null);
Preconditions.checkState(jobId == null);

this.jobId = Preconditions.checkNotNull(newJobId);
this.allocationId = Preconditions.checkNotNull(newAllocationId);

state = TaskSlotState.ALLOCATED;

return true;
} else if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) {
Preconditions.checkNotNull(newJobId);
Preconditions.checkNotNull(newAllocationId);
/**
* 对于slot为ALLOCATED和ACTIVE的情况,则检测本次申请中的作业id和分配id与
* Slot记录的作业id和分配id是否一致,将结果返回。
* */
return newJobId.equals(jobId) && newAllocationId.equals(allocationId);
} else {
//当Slot处于其他状态时,返回分配失败失败,false
return false;
}
}

当现在的slot是空闲状态,将状态改为已分配,然后返回true

如果现在的slot是allocated或者active状态,就监测一下本次申请的id和现在已经分配的id是否一致

我们转换为slot的状态后就记录一下映射关系,然后记录了一个超时逻辑,当slot在一定时间内一直没有从已分配转换为活跃的话,就会将状态转为Free


以上的内容是关于占用空闲slot的逻辑

下面我们再看一下将slot提供给jobMaster的逻辑offerSlotsToJobManager方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 将Slot提供给JobMaster的过程
* */
private void offerSlotsToJobManager(final JobID jobId) {
final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
//先检查TaskExecutor是否与作业对应的JobMater建立连接
if (jobManagerConnection == null) {
log.debug("There is no job manager connection to the leader of job {}.", jobId);
} else {
//建立连接
if (taskSlotTable.hasAllocatedSlots(jobId)) {
log.info("Offer reserved slots to the leader of job {}.", jobId);

final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
//则从taskSlotTable中筛选出已经被该作业占有但不处于活跃状态的TaskSlot
final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
//并从reservedSlotsIterator中提取一些信息来组装成Slot提供信息列表(allocationId,slotIndex,resourceProfile)
while (reservedSlotsIterator.hasNext()) {
SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
reservedSlots.add(offer);
}

/**
* 然后通过向JobMaster发送请求,将reservedSlots提供给JobMaster
* 请求的返回提供Slot情况的消息由handleAcceptedSlotOffers来处理。
* */
CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
getResourceID(),
reservedSlots,
taskManagerConfiguration.getTimeout());

acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
getMainThreadExecutor());
} else {
log.debug("There are no unassigned slots for the job {}.", jobId);
}
}
}

这里的内容是判断TaskExecutor和作业对应的JobMaster是否建立了连接

如果没有建立连接,直接打印错误日志,如果没有建立连接的话,就会走下面的释放资源的逻辑,我们再粘贴一下这里的代码,详细的这里释放资源的讲解已经在上面写过了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 else {
try {
/**
*否则通过jobLeaderService监听对应的JobMaster的Leader信息后建立连接
* */
jobLeaderService.addJob(jobId, targetAddress);
} catch (Exception e) {
// 监听JobMaster的Leader信息失败的情况
try {
//先释放slotId对应的slot
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
/**
* 如果释放slot异常,则TaskExecutor的对应进程会异常退出
* onFatalError的最终逻辑就是 System.exit方法
* */
onFatalError(slotNotFoundException);
}

/**
* 如果释放Slot未产生异常,就会接着释放本次申请Slot分配allocationId对应的本地State(针对开启Local Recovery的情况)
* */
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

// 最后对slotId对应的Slot进行是否空闲的检查
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
/**
* 如果不空闲,执行TaskExecutor的对应进程会异常退出
* */
onFatalError(new Exception("Could not free slot " + slotId));
}
//返回作业无法添加到job leader service的异常
throw new SlotAllocationException("Could not add job to job leader service.", e);
}
}

我们仔细看一下这个释放资源的freeSlot方法先

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 处理逻辑:
* 检测分配ID是否占用Slot,如果不占有,则不需要释放,返回throw new SlotNotFoundException(allocationId);
* 如果占有,则进行下一步操作
*/
public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
checkInit();

TaskSlot taskSlot = getTaskSlot(allocationId);

if (taskSlot != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Free slot {}.", taskSlot, cause);
} else {
LOG.info("Free slot {}.", taskSlot);
}

final JobID jobId = taskSlot.getJobId();
//判断该Slot上有没有运行的任务,如果没有,则将Slot标记为空闲状态并返回
if (taskSlot.markFree()) {
// remove the allocation id to task slot mapping
allocationIDTaskSlotMap.remove(allocationId);

// unregister a potential timeout
timerService.unregisterTimeout(allocationId);

Set<AllocationID> slots = slotsPerJob.get(jobId);

if (slots == null) {
throw new IllegalStateException("There are no more slots allocated for the job " + jobId +
". This indicates a programming bug.");
}

slots.remove(allocationId);

if (slots.isEmpty()) {
slotsPerJob.remove(jobId);
}

return taskSlot.getIndex();
} else {
/**
* 如果有运行任务,则将Slot设置为Releasing状态
* 同时将运行的任务以其所在的slot资源被释放为转换失败状态(failed)
* 等待任务失败后将Slot设置为空闲状态
*
* 释放Slot的时机不单有Slot从分配状态转换到活跃状态超时,还有Slot的所有任务进入终态(Finished,Canceled,Failed)
* */
taskSlot.markReleasing();

Iterator<Task> taskIterator = taskSlot.getTasks();

while (taskIterator.hasNext()) {
taskIterator.next().failExternally(cause);
}

return -1;
}
} else {
throw new SlotNotFoundException(allocationId);
}
}

这里释放资源首先是首先判断这个slot上有没有运行的任务,如果没有就把这个slot标记为free,删除映射关系,取消前面注册的超时事件,如果这个槽里面没有对应的任务就抛出异常,从slots删除这个allocationId,如果这个slot为空,说明这个任务槽已经没有任务分配了,就移除这个任务槽的分配记录

这里的如果这个slot上面没有运行的任务做的事情其实也就是将之前保存的一些对应关系和内容删除掉

如果这个槽上有任务在执行

那么就将这个槽标记为正在释放markReleasing()

然后把这个槽里面的任务逐个标记失败


上面的这个就是释放slotId的逻辑,我们再回到offerSlotsToJobManager这个方法里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
else {
//建立连接
if (taskSlotTable.hasAllocatedSlots(jobId)) {
log.info("Offer reserved slots to the leader of job {}.", jobId);

final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
//则从taskSlotTable中筛选出已经被该作业占有但不处于活跃状态的TaskSlot
final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
//并从reservedSlotsIterator中提取一些信息来组装成Slot提供信息列表(allocationId,slotIndex,resourceProfile)
while (reservedSlotsIterator.hasNext()) {
SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
reservedSlots.add(offer);
}

/**
* 然后通过向JobMaster发送请求,将reservedSlots提供给JobMaster
* 请求的返回提供Slot情况的消息由handleAcceptedSlotOffers来处理。
* */
CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
getResourceID(),
reservedSlots,
taskManagerConfiguration.getTimeout());

acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
getMainThreadExecutor());
} else {
log.debug("There are no unassigned slots for the job {}.", jobId);
}
}

这里将slot提供给jobManager如果检查taskExecutor和jobMaster能建立起连接,就会走上面的内容

首先这里会先查看taskSlotTable中是否有这个jobId分配的taskSlottaskSlotTable.hasAllocatedSlots(jobId)

如果没有直接报错,如果有就走到下面的步骤

下面的步骤是,首先获取网关

然后获取已经被现在的这个job占有了,但是现在是不活跃状态的TaskSlot

然后循环这些不活跃的TaskSlot,然后每一个都调用generateSlotOffer转为一个SlotOffer,然后放在一个HashSet中

generateSlotOffer这个方法里面就是判断一下当前状态到底是不是不活跃的状态

然后我们通过网关,向JobMaster发送请求,把前面的这些不活跃的taskslot转完的这个hashset发给jobMaster,我们会获得一个acceptedSlotsFuture这个是返回的情况

然后把这个返回情况交给handleAcceptedSlotOffers来处理


下面我们重点再看一下这个handleAcceptedSlotOffers方法中TaskExecutor是怎么向JobMaster提供Slot的返回信息处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* TaskExecutor向JobMaster提供Slot的返回信息处理
* */
@Nonnull
private BiConsumer<Iterable<SlotOffer>, Throwable> handleAcceptedSlotOffers(JobID jobId, JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, Collection<SlotOffer> offeredSlots) {
return (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
if (throwable != null) {
//向obMaster提供Slot请求超时,会重新调用offerSlotsToJobManager
if (throwable instanceof TimeoutException) {
log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
// We ran into a timeout. Try again.
offerSlotsToJobManager(jobId);
} else {
log.warn("Slot offering to JobManager failed. Freeing the slots " +
"and returning them to the ResourceManager.", throwable);

/**
* 对于返回的异常非超时的异常,即JobMaster返回的是未识别的TaskManager异常
* 将SlotOffer列表对应的TaskSlot释放掉
* */
for (SlotOffer reservedSlot: offeredSlots) {
freeSlotInternal(reservedSlot.getAllocationId(), throwable);
}
}
} else {
// 返回的结果为JobMaster接收的SlotOffer列表时,检测TaskExecutor是否与JobMaster连接
if (isJobManagerConnectionValid(jobId, jobMasterId)) {
// mark accepted slots active
for (SlotOffer acceptedSlot : acceptedSlots) {
try {
/**
* 会将返回的SlotOffer列表(提供成功的Slot列表)中对应的TaskSlot由原来的已分配状态(allocated)标记为活跃状态(Active)
* 并移除在timerService中主注册的超时检测逻辑,调用markSlotActive方法.
* */
if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
// the slot is either free or releasing at the moment
final String message = "Could not mark slot " + jobId + " active.";
log.debug(message);
jobMasterGateway.failSlot(
getResourceID(),
acceptedSlot.getAllocationId(),
new FlinkException(message));
}
} catch (SlotNotFoundException e) {
//对于转换失败的到活跃状态的Slot,通过JobMaster调用failSlot方法来以失败的方式释放。
final String message = "Could not mark slot " + jobId + " active.";
jobMasterGateway.failSlot(
getResourceID(),
acceptedSlot.getAllocationId(),
new FlinkException(message));
}

offeredSlots.remove(acceptedSlot);
}

final Exception e = new Exception("The slot was rejected by the JobManager.");

//对于未成功提供给JobMaster的Slot,通过freeSlotInternal方法释放
for (SlotOffer rejectedSlot : offeredSlots) {
freeSlotInternal(rejectedSlot.getAllocationId(), e);
}
} else {
// discard the response since there is a new leader for the job
log.debug("Discard offer slot response since there is a new leader " +
"for the job {}.", jobId);
}
}
};
}

这里的内容比较多,我们慢慢看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (throwable != null) {
//向obMaster提供Slot请求超时,会重新调用offerSlotsToJobManager
if (throwable instanceof TimeoutException) {
log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
// We ran into a timeout. Try again.
offerSlotsToJobManager(jobId);
} else {
log.warn("Slot offering to JobManager failed. Freeing the slots " +
"and returning them to the ResourceManager.", throwable);

/**
* 对于返回的异常非超时的异常,即JobMaster返回的是未识别的TaskManager异常
* 将SlotOffer列表对应的TaskSlot释放掉
* */
for (SlotOffer reservedSlot: offeredSlots) {
freeSlotInternal(reservedSlot.getAllocationId(), throwable);
}
}
}

这里的内容首先是判断一下刚才把不活跃的哪些slot列表发给了jobMaster后有没有异常

如果是超时异常,我们调用offerSlotsToJobManager方法

如果是其他异常,我们执行

1
2
3
for (SlotOffer reservedSlot: offeredSlots) {
freeSlotInternal(reservedSlot.getAllocationId(), throwable);
}

把这个slot从不活跃的slot这个列表中释放掉

我们先看一下这个如果是超时异常是怎么处理,这里调用的是offerSlotsToJobManager方法,也就是如果TaskExecutor向JobMaster提供slot请求超时了,那么就重新走一下将Slot提供给JobMaster的过程的这个方法,这个方法的详情可以看一下上面

如果没有异常的话我们走到下面的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 返回的结果为JobMaster接收的SlotOffer列表时,检测TaskExecutor是否与JobMaster连接
if (isJobManagerConnectionValid(jobId, jobMasterId)) {
// mark accepted slots active
for (SlotOffer acceptedSlot : acceptedSlots) {
try {
/**
* 会将返回的SlotOffer列表(提供成功的Slot列表)中对应的TaskSlot由原来的已分配状态(allocated)标记为活跃状态(Active)
* 并移除在timerService中主注册的超时检测逻辑,调用markSlotActive方法.
* */
if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
// the slot is either free or releasing at the moment
final String message = "Could not mark slot " + jobId + " active.";
log.debug(message);
jobMasterGateway.failSlot(
getResourceID(),
acceptedSlot.getAllocationId(),
new FlinkException(message));
}
} catch (SlotNotFoundException e) {
//对于转换失败的到活跃状态的Slot,通过JobMaster调用failSlot方法来以失败的方式释放。
final String message = "Could not mark slot " + jobId + " active.";
jobMasterGateway.failSlot(
getResourceID(),
acceptedSlot.getAllocationId(),
new FlinkException(message));
}

offeredSlots.remove(acceptedSlot);
}000000000000000-ption e = new Exception("The slot was rejected by the JobManager.");

//对于未成功提供给JobMaster的Slot,通过freeSlotInternal方法释放
for (SlotOffer rejectedSlot : offeredSlots) {
freeSlotInternal(rejectedSlot.getAllocationId(), e);
}
} else {
// discard the response since there is a new leader for the job
log.debug("Discard offer slot response since there is a new leader " +
"for the job {}.", jobId);
}

这里首先会判断一下taskExecutor和JobMaster的连接还连着吗,如果没有连着就直接打印错误,如果在连接就往下执行

循环返回的slotOffer列表,调用markSlotActive方法,将TaskSlot从原来的已分配状态改为活跃状态,如果转换失败的就把转换失败的消息使用网关发送给jobMaster

这里我们看一下这个转换的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
checkInit();

TaskSlot taskSlot = getTaskSlot(allocationId);

if (taskSlot != null) {
if (taskSlot.markActive()) {
// unregister a potential timeout
LOG.info("Activate slot {}.", allocationId);

timerService.unregisterTimeout(allocationId);

return true;
} else {
return false;
}
} else {
throw new SlotNotFoundException(allocationId);
}
}


--------------------------------------------------------------------------------

public boolean markActive() {
if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == state) {
state = TaskSlotState.ACTIVE;

return true;
} else {
return false;
}
}


-------------------------------------------------------------------------------

public void unregisterTimeout(K key) {
Timeout<K> timeout = timeouts.remove(key);

if (timeout != null) {
timeout.cancel();
}
}

这里的代码是首先将taskSlot的状态转换成active活跃状态,然后移除注册时的超时监测

再然后从 offeredSlots 中移除已成功分配的 slot。

1
offeredSlots.remove(acceptedSlot);

最后对于那些列表中还剩余的slot就是没有成功提供给JobMaster的slot了,会调用freeSlotInternal方法释放掉

我们这里发送给了jobMaster后,jobMaster具体做了哪些操作,需要看一下==Flink源码解析(十)==里面jobMaster怎么处理接收的TaskExecutor中的内容的

上面就是我们对于TaskManager的接收到来自ResourceManager的Slot请求的内容了