Flink Source Code Analysis (XII)
2025-01-10 17:46:38 # Flink # Source Code Analysis # TaskManager Startup Process


TaskManagerRunner Startup Process

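We have already covered how the TaskExecutor receives requests from the ResourceManager and how it then offers its slots to the JobMaster.

Now let's look at how TaskManagerRunner starts up.

We begin with TaskManagerRunner's main method, which is the entry point for starting a TaskManager.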

/**
 * TaskManager startup entry point
 */
public static void main(String[] args) throws Exception {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);
    long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

    if (maxOpenFileHandles != -1L) {
        LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    } else {
        LOG.info("Cannot determine the maximum number of open file descriptors");
    }

    /**
     * Step 1: runTaskManagerSecurely
     * args: startup arguments
     * ResourceID.generate(): unique ID
     */
    runTaskManagerSecurely(args, ResourceID.generate());
}

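Everything before that is environment checks and logging, which we skip for now. Next comes runTaskManagerSecurely. As the name suggests, it has to do with security.

It receives args (the startup arguments) and a second argument, ResourceID.generate(). Let's see what that second argument is.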

public static ResourceID generate() {
    return new ResourceID(new AbstractID().toString());
}

-----------------------------------------------

public AbstractID() {
    this.lowerPart = RND.nextLong();
    this.upperPart = RND.nextLong();
}

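The second argument is therefore just a random value. AbstractID fills its two 64-bit halves with RND.nextLong(), and the resulting string serves as the TaskManager's unique ID.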
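Here is a minimal sketch of the "two random longs" idea. The class name RandomIdSketch is hypothetical, and the 32-character hex format is an assumption for illustration. It is not claimed to reproduce Flink's AbstractID.toString() exactly.

```java
import java.util.Random;

// Hypothetical sketch (not Flink's AbstractID): a 128-bit random ID made of
// two random longs, rendered as 32 lowercase hex characters.
final class RandomIdSketch {
    private static final Random RND = new Random();

    private final long upperPart;
    private final long lowerPart;

    RandomIdSketch() {
        this.lowerPart = RND.nextLong();
        this.upperPart = RND.nextLong();
    }

    @Override
    public String toString() {
        // %016x prints each 64-bit half as 16 hex digits (negative values as two's complement)
        return String.format("%016x%016x", upperPart, lowerPart);
    }

    static String generate() {
        return new RandomIdSketch().toString();
    }
}
```

With 128 random bits, the chance of two TaskManagers drawing the same ID is negligible. That is why a random value is good enough as a unique ID here.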

Next, let's look at runTaskManagerSecurely.

public static void runTaskManagerSecurely(String[] args, ResourceID resourceID) {
    try {

        /**
         * Load the configuration:
         * parse the main-method arguments and flink-conf.yaml
         */
        Configuration configuration = loadConfiguration(args);

        /**
         * Start the TaskManager
         * (goes into the overloaded runTaskManagerSecurely)
         */
        runTaskManagerSecurely(configuration, resourceID);

    } catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("TaskManager initialization failed.", strippedThrowable);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }
}

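It first loads the configuration by parsing the main-method arguments together with flink-conf.yaml. It then calls the overloaded runTaskManagerSecurely(Configuration, ResourceID). Let's look at that overload.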
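As I understand it, dynamic properties passed on the command line as -Dkey=value take precedence over values from flink-conf.yaml. The sketch below only illustrates that precedence. ConfigMergeSketch is hypothetical and understands only flat "key: value" lines, while Flink's real loader (GlobalConfiguration together with its command-line parser) handles more cases.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: "config file first, then -D overrides" merging.
// Only flat "key: value" lines are understood; this is not a YAML parser.
final class ConfigMergeSketch {

    static Map<String, String> load(List<String> confFileLines, String[] args) {
        Map<String, String> conf = new HashMap<>();

        // 1) Values from the config file (e.g. flink-conf.yaml)
        for (String raw : confFileLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int colon = line.indexOf(':'); // first colon separates key and value
            if (colon > 0) {
                conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
            }
        }

        // 2) Dynamic properties from the command line override file values
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                int eq = arg.indexOf('=');
                if (eq > 2) {
                    conf.put(arg.substring(2, eq), arg.substring(eq + 1));
                }
            }
        }
        return conf;
    }
}
```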

public static void runTaskManagerSecurely(Configuration configuration, ResourceID resourceID) throws Exception {
    replaceGracefulExitWithHaltIfConfigured(configuration);

    /**
     * Load plugins
     */
    final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

    // Initialize the file systems
    FileSystem.initialize(configuration, pluginManager);

    // Install the security configuration
    SecurityUtils.install(new SecurityConfiguration(configuration));

    SecurityUtils.getInstalledContext().runSecured(

        // Start the TaskManager inside the installed security context
        () -> {
            // Enter runTaskManager
            runTaskManager(configuration, resourceID, pluginManager);
            return null;
        });
}

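First, replaceGracefulExitWithHaltIfConfigured: if the corresponding option enables halting, an exit no longer goes through the normal graceful shutdown. The process is forcibly terminated instead.

Next, it loads the plugins. We already looked at plugin loading when we covered the ResourceManager.

Then it initializes the file systems and installs the security configuration.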

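Finally, runSecured executes the lambda inside the installed security context. It does not start a new thread. The lambda starts the TaskManager by calling runTaskManager.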
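To see that runSecured runs the callback in place rather than on a new thread, consider a minimal sketch. SecuredContextSketch and NoOpContextSketch are hypothetical stand-ins. As far as I know, they follow the shape of Flink's SecurityContext: a runSecured method that takes a Callable. A real Kerberos/Hadoop context would wrap the call in the logged-in user's privileges instead of calling it directly.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a security context.
interface SecuredContextSketch {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// No-op variant: no credentials and no new thread; the callable runs on the caller's thread.
final class NoOpContextSketch implements SecuredContextSketch {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }
}
```

The original code uses it the same way: runSecured(() -> { runTaskManager(...); return null; }). The return null is only there because Callable must return a value.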

public static void runTaskManager(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception {

    /**
     * Build the TaskManagerRunner instance
     * (goes into the TaskManagerRunner constructor)
     */
    final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId, pluginManager);

    /**
     * Start it
     */
    taskManagerRunner.start();
}

Here we construct a TaskManagerRunner and then start it. First, let's look at the TaskManagerRunner constructor.

public TaskManagerRunner(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception {
    this.configuration = checkNotNull(configuration);
    this.resourceId = checkNotNull(resourceId);

    timeout = AkkaUtils.getTimeoutAsTime(configuration);

    /**
     * Thread pool for callback processing, sized by the number of CPU cores
     */
    this.executor = java.util.concurrent.Executors
        .newScheduledThreadPool(Hardware.getNumberCPUCores(), new ExecutorThreadFactory("taskmanager-future"));

    /**
     * Initialize the HA services
     * (ZooKeeperHaServices when ZooKeeper HA is configured)
     * See createHighAvailabilityServices
     */
    highAvailabilityServices = HighAvailabilityServicesUtils
        .createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

    /**
     * Initialize the RpcService
     */
    rpcService = createRpcService(configuration, highAvailabilityServices);

    /**
     * Initialize the HeartbeatServices
     * Slot state between TaskExecutor and JobMaster is synchronized via the AllocatedSlotReport
     * in the heartbeats the JobMaster periodically sends to the TaskExecutor,
     * while slot state between TaskExecutor and ResourceManager is synchronized via the SlotReport
     * in the heartbeats the TaskExecutor periodically sends to the ResourceManager.
     * In other words, slot state is propagated through heartbeats.
     */
    HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

    /**
     * Flink cluster metrics (monitoring)
     */
    metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration),
        ReporterSetup.fromConfiguration(configuration, pluginManager));
    final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);

    /**
     * Initialize the BlobCacheService
     * Periodically checks for and deletes expired job resource files, and handles large files
     * Master node starts: BlobServer
     * Worker nodes start: BlobCacheService
     */
    blobCacheService = new BlobCacheService(configuration, highAvailabilityServices.createBlobStore(), null);

    final ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceUtils
        .createStaticExternalResourceInfoProvider(ExternalResourceUtils.getExternalResourceAmountMap(configuration),
            ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));

    /**
     * Initialize the TaskExecutor
     * (goes into startTaskManager)
     */
    taskManager = startTaskManager(this.configuration,
        this.resourceId,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        metricRegistry,
        blobCacheService,
        false,
        externalResourceInfoProvider,
        this);

    this.terminationFuture = new CompletableFuture<>();
    this.shutdown = false;

    MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}

This code is a bit long, so let's go through it from the top.

It first reads a timeout value from the configuration and stores it in timeout.

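Then it creates a scheduled thread pool sized by the number of CPU cores. The core count comes from Hardware.getNumberCPUCores(). Hardware is a utility class for querying hardware information.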
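Below is a plain-JDK approximation of that pool. Two substitutions are assumptions made only for illustration. Runtime.availableProcessors() stands in for Flink's Hardware.getNumberCPUCores(), and the small NamedThreadFactory class is a hypothetical substitute for ExecutorThreadFactory.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical thread factory that names threads "<prefix>-thread-<n>".
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-thread-" + counter.getAndIncrement());
        t.setDaemon(true); // assumption: daemon threads so this demo never blocks JVM exit
        return t;
    }
}

final class FuturePoolSketch {
    // One scheduler thread per available core, like the pool in the constructor above.
    static ScheduledExecutorService create() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newScheduledThreadPool(cores, new NamedThreadFactory("taskmanager-future"));
    }
}
```

Named threads are worth the extra class: in a thread dump, "taskmanager-future-..." immediately tells you which pool a stuck callback belongs to.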

Next, it initializes the HA services. Let's take a quick look.

public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor,
        AddressResolution addressResolution) throws Exception {

    /**
     * Load the configuration
     * In flink-conf.yaml we configure: high-availability: zookeeper
     * See fromConfig
     */
    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);

    switch (highAvailabilityMode) {

        /**
         * Build the services according to the HA mode
         */
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

            final String resourceManagerRpcUrl = AkkaRpcServiceUtils
                .getRpcUrl(hostnamePort.f0, hostnamePort.f1, AkkaRpcServiceUtils.createWildcardName(ResourceManager.RESOURCE_MANAGER_NAME),
                    addressResolution, configuration);
            final String dispatcherRpcUrl = AkkaRpcServiceUtils
                .getRpcUrl(hostnamePort.f0, hostnamePort.f1, AkkaRpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
                    addressResolution, configuration);
            final String webMonitorAddress = getWebMonitorAddress(configuration, addressResolution);

            // Without HA: StandaloneHaServices
            return new StandaloneHaServices(resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
        case ZOOKEEPER:
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
            /**
             * Create the ZooKeeperHaServices
             */
            return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, blobStoreService);
        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);
        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}

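It loads the configuration and then builds the HA services that match the configured HA mode. In practice, ZooKeeper HA is the mode we usually use.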
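The mode-to-services dispatch can be sketched with a small enum. HaModeSketch and describe are hypothetical. The parsing rule is my understanding of HighAvailabilityMode.fromConfig and should be checked against the Flink version you are reading. The rule is: an unset value maps to NONE, a known name maps to that constant, and anything else is treated as the class name of a custom HA factory.

```java
import java.util.Locale;

// Hypothetical sketch of picking HA services from the "high-availability" setting.
enum HaModeSketch {
    NONE, ZOOKEEPER, FACTORY_CLASS;

    static HaModeSketch fromValue(String value) {
        if (value == null) {
            return NONE; // option not set: no high availability
        }
        try {
            return valueOf(value.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Any other value is assumed to be a custom HA factory class name
            return FACTORY_CLASS;
        }
    }

    // Which services the switch in createHighAvailabilityServices would build
    static String describe(String value) {
        switch (fromValue(value)) {
            case NONE:
                return "StandaloneHaServices";
            case ZOOKEEPER:
                return "ZooKeeperHaServices";
            default:
                return "custom HA services from " + value;
        }
    }
}
```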

case ZOOKEEPER:
    BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
    /**
     * Create the ZooKeeperHaServices
     */
    return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, blobStoreService);

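It first creates a BlobStoreService. Flink stores and handles large files as BLOBs (binary large objects); we have seen this before.

After that, it creates a ZooKeeperHaServices instance. This is where the HA services are actually initialized.

Back in the TaskManagerRunner constructor, let's continue.

Next comes the RpcService, which the components use to communicate with each other across nodes.

Then the heartbeat services are initialized. This step is important, so pay close attention to the two sentences in my comment: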

/**
 * Initialize the HeartbeatServices
 * Slot state between TaskExecutor and JobMaster is synchronized via the AllocatedSlotReport
 * in the heartbeats the JobMaster periodically sends to the TaskExecutor,
 * while slot state between TaskExecutor and ResourceManager is synchronized via the SlotReport
 * in the heartbeats the TaskExecutor periodically sends to the ResourceManager.
 * Slot state is propagated through heartbeats.
 */
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

-------------------------------------------------

public static HeartbeatServices fromConfiguration(Configuration configuration) {

    /**
     * Read the two key heartbeat parameters:
     * 1. heartbeat interval heartbeatInterval (heartbeat.interval, default 10000 ms)
     * 2. heartbeat timeout heartbeatTimeout (heartbeat.timeout, default 50000 ms)
     */
    long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
    long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

    /**
     * Create the heartbeat services
     */
    return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}

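The heartbeats matter here because:

  • the slot state between TaskExecutor and JobMaster is synchronized through the heartbeats the JobMaster periodically sends to the TaskExecutor
  • the slot state between TaskExecutor and ResourceManager is synchronized through the heartbeats the TaskExecutor periodically sends to the ResourceManager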
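The interplay of the two parameters can be shown with a small monitor: a peer counts as alive as long as its last heartbeat arrived within heartbeatTimeout. HeartbeatMonitorSketch is a hypothetical class, and it takes explicit timestamps so that it is easy to test. Flink's own heartbeat managers do considerably more, for example exchanging payloads such as the slot reports above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: tracks the last heartbeat time per peer and detects timeouts.
final class HeartbeatMonitorSketch {
    private final long heartbeatInterval; // how often heartbeats are expected, e.g. 10000 ms
    private final long heartbeatTimeout;  // silence after which a peer counts as lost, e.g. 50000 ms
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitorSketch(long heartbeatInterval, long heartbeatTimeout) {
        if (heartbeatTimeout < heartbeatInterval) {
            throw new IllegalArgumentException("timeout must not be smaller than interval");
        }
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    void receiveHeartbeat(String peerId, long nowMillis) {
        lastHeartbeat.put(peerId, nowMillis);
    }

    // A peer that never sent a heartbeat, or has been silent for too long, is timed out
    boolean isTimedOut(String peerId, long nowMillis) {
        Long last = lastHeartbeat.get(peerId);
        return last == null || nowMillis - last > heartbeatTimeout;
    }

    // With 10 s / 50 s, roughly five consecutive heartbeats can be missed before a timeout
    long toleratedMissedHeartbeats() {
        return heartbeatTimeout / heartbeatInterval;
    }
}
```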

Further down is the cluster metrics (monitoring) setup, which we skip for now.

Next, the BlobCacheService is initialized. It periodically checks for and deletes expired job resources, and it handles large files.
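"Periodically delete expired job resources" comes down to a time-to-live check per job. ExpiringCacheSketch is hypothetical and models only the bookkeeping: when a job's files stopped being used and whether they have expired. The real BlobCacheService runs this kind of cleanup on a timer, actually deletes the files, and takes its cleanup interval from the configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of TTL-based cleanup for per-job cached files.
final class ExpiringCacheSketch {
    private final long ttlMillis;
    // jobId -> time at which the job's files stopped being used
    private final Map<String, Long> releasedAt = new HashMap<>();

    ExpiringCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void release(String jobId, long nowMillis) {
        releasedAt.put(jobId, nowMillis);
    }

    // Called periodically (e.g. by a timer); returns the jobs whose files would be deleted now
    List<String> cleanup(long nowMillis) {
        List<String> expired = new ArrayList<>();
        releasedAt.entrySet().removeIf(e -> {
            boolean isExpired = nowMillis - e.getValue() >= ttlMillis;
            if (isExpired) {
                expired.add(e.getKey());
            }
            return isExpired;
        });
        return expired;
    }
}
```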

Finally, the TaskExecutor is initialized. All the service instances created above are passed to startTaskManager, which wraps them into a single TaskExecutor. Let's look at that method.

/**
 * startTaskManager instantiates the TaskExecutor
 */
public static TaskExecutor startTaskManager(Configuration configuration, ResourceID resourceID, RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry,
        BlobCacheService blobCacheService, boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    checkNotNull(configuration);
    checkNotNull(resourceID);
    checkNotNull(rpcService);
    checkNotNull(highAvailabilityServices);

    LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

    String externalAddress = rpcService.getAddress();

    /**
     * Based on the configuration, initialize the resource spec of the machine
     * the TaskExecutor runs on (CPU cores, memory, network, ...):
     * private final CPUResource cpuCores;
     * private final MemorySize taskHeapSize;
     * private final MemorySize taskOffHeapSize;
     * private final MemorySize networkMemSize;
     * private final MemorySize managedMemorySize;
     */
    final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

    // Wrap the configuration in a TaskManagerServicesConfiguration
    TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration
        .fromConfiguration(configuration, resourceID, externalAddress, localCommunicationOnly,
            taskExecutorResourceSpec
        );

    Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils
        .instantiateTaskManagerMetricGroup(metricRegistry, externalAddress, resourceID,
            taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

    // Initialize ioExecutor, which provides I/O services
    final ExecutorService ioExecutor = Executors
        .newFixedThreadPool(taskManagerServicesConfiguration.getNumIoThreads(), new ExecutorThreadFactory("flink-taskexecutor-io"));

    /**
     * Initialize the service components the TaskManager needs at runtime
     * (goes into fromConfiguration)
     */
    TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
        taskManagerServicesConfiguration,
        blobCacheService.getPermanentBlobService(),
        taskManagerMetricGroup.f1,
        ioExecutor,
        fatalErrorHandler);

    // TaskManagerConfiguration
    TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration
        .fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);

    String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();

    /**
     * 1. Create the TaskExecutor instance: this mainly initializes the basic components
     *    and the TaskExecutor's core service components.
     *
     * 2. Internally it creates two important heartbeat managers:
     *    1) JobManagerHeartbeatManager
     *    2) ResourceManagerHeartbeatManager
     */
    return new TaskExecutor(
        rpcService,
        taskManagerConfiguration,
        highAvailabilityServices,
        taskManagerServices,
        externalResourceInfoProvider,
        heartbeatServices,
        taskManagerMetricGroup.f0,
        metricQueryServiceAddress,
        blobCacheService,
        fatalErrorHandler,
        new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
        createBackPressureSampleService(configuration, rpcService.getScheduledExecutor())
    );
}

The first step initializes the resource spec of the machine the TaskExecutor runs on: final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

Let's look inside it.

public class TaskExecutorResourceSpec {

    private final CPUResource cpuCores;

    private final MemorySize taskHeapSize;

    private final MemorySize taskOffHeapSize;

    private final MemorySize networkMemSize;

    private final MemorySize managedMemorySize;

    // ... constructor and getters omitted
}

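These fields describe the CPU, memory, and network resources that are initialized here.

Next, the configuration is wrapped into a TaskManagerServicesConfiguration.

Then ioExecutor is created. It is a fixed-size thread pool (flink-taskexecutor-io) that serves I/O work.

After that, TaskManagerServices.fromConfiguration initializes the service components the TaskManager uses at runtime. Let's take a look.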

public static TaskManagerServices fromConfiguration(TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        PermanentBlobService permanentBlobService, MetricGroup taskManagerMetricGroup, ExecutorService ioExecutor,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    // Check that the temp directories are usable
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

    /**
     * Initialize the TaskEventDispatcher
     * Flink runs streaming tasks: events between StreamTasks are dispatched by the TaskEventDispatcher
     */
    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    /**
     * Initialize IOManagerAsync (asynchronous I/O management)
     */
    final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    /**
     * ShuffleEnvironment: supports creating the components needed for later shuffles,
     * i.e. it manages the shuffle between upstream and downstream tasks
     */
    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration,
        taskEventDispatcher,
        taskManagerMetricGroup,
        ioExecutor);

    // Starts the Netty server and client, which transfer data between upstream and downstream tasks
    final int listeningDataPort = shuffleEnvironment.start();

    /**
     * Initialize the key-value state service
     */
    final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
        taskManagerServicesConfiguration.getResourceID(), taskManagerServicesConfiguration.getExternalAddress(),
        // we expose the task manager location with the listening port
        // iff the external data port is not explicitly defined
        taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
            taskManagerServicesConfiguration.getExternalDataPort() : listeningDataPort);

    /**
     * Initialize the BroadcastVariableManager
     */
    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

    /**
     * Initialize the TaskSlotTable
     * Stores the various mappings related to task slots
     */
    final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
        taskManagerServicesConfiguration.getNumberOfSlots(),
        taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
        taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
        taskManagerServicesConfiguration.getPageSize(),
        ioExecutor);

    /**
     * Initialize the DefaultJobTable, which manages job-related information
     */
    final JobTable jobTable = DefaultJobTable.create();

    /**
     * The JobLeaderService listens for the leader information of the corresponding JobMaster
     */
    final JobLeaderService jobLeaderService = new DefaultJobLeaderService(
        unresolvedTaskManagerLocation,
        taskManagerServicesConfiguration.getRetryingRegistrationConfiguration()
    );

    /**
     * Initialize the state root directories (as strings and as File objects)
     */
    final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
    final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
    for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
        stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
    }

    /**
     * Initialize the TaskExecutorLocalStateStoresManager
     */
    final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
        taskManagerServicesConfiguration.isLocalRecoveryEnabled(), stateRootDirectoryFiles, ioExecutor);

    final boolean failOnJvmMetaspaceOomError = taskManagerServicesConfiguration.getConfiguration()
        .getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);

    /**
     * Initialize the LibraryCacheManager
     * First argument: PermanentBlobService
     * Second argument: the default class loader factory
     */
    final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
        permanentBlobService,
        BlobLibraryCacheManager.defaultClassLoaderFactory(
            taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
            taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
            failOnJvmMetaspaceOomError ? fatalErrorHandler : null)
    );

    /**
     * Bundle everything into TaskManagerServices
     */
    return new TaskManagerServices(
        unresolvedTaskManagerLocation,
        taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
        ioManager,
        shuffleEnvironment,
        kvStateService,
        broadcastVariableManager,
        taskSlotTable,
        jobTable,
        jobLeaderService,
        taskStateManager,
        taskEventDispatcher,
        ioExecutor,
        libraryCacheManager
    );
}

First, it checks the temporary directories.

Then it initializes the TaskEventDispatcher, which dispatches events between StreamTasks.

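Next comes IOManagerAsync, which manages asynchronous disk I/O on the temp directories. "Asynchronous" means the caller submits a read or write request and carries on with other work. Dedicated I/O threads perform the actual file operation and signal completion later, so the caller only waits if and when it needs the result.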
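The request/complete pattern can be sketched with one writer thread and a request queue. AsyncWriterSketch is hypothetical: it writes whole byte arrays to files and completes a CompletableFuture per request. IOManagerAsync itself works on fixed-size memory blocks through its own channel abstractions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: callers enqueue write requests and get a future back immediately;
// one dedicated I/O thread performs the actual file writes.
final class AsyncWriterSketch implements AutoCloseable {
    private static final class WriteRequest {
        final Path target;
        final byte[] data;
        final CompletableFuture<Path> done = new CompletableFuture<>();

        WriteRequest(Path target, byte[] data) {
            this.target = target;
            this.data = data;
        }
    }

    private final BlockingQueue<WriteRequest> queue = new LinkedBlockingQueue<>();
    private final Thread ioThread = new Thread(this::run, "async-writer-sketch");

    AsyncWriterSketch() {
        ioThread.setDaemon(true);
        ioThread.start();
    }

    CompletableFuture<Path> write(Path target, byte[] data) {
        WriteRequest request = new WriteRequest(target, data);
        queue.add(request); // returns immediately; the caller is not blocked by disk I/O
        return request.done;
    }

    private void run() {
        try {
            while (true) {
                WriteRequest request = queue.take();
                try {
                    Files.write(request.target, request.data);
                    request.done.complete(request.target);
                } catch (IOException e) {
                    request.done.completeExceptionally(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() interrupts us: leave the loop
        }
    }

    @Override
    public void close() {
        ioThread.interrupt();
    }
}
```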

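It also creates the ShuffleEnvironment, which manages shuffles, i.e. data exchange between upstream and downstream tasks. Many tasks run inside a TaskManager (even a single task slot can host several), and data frequently has to be shuffled from upstream tasks to downstream tasks.

Next, it starts a Netty server and client. When an upstream task produces data for a downstream task, the data is transferred through them. This part is mostly Netty code and is worth reading separately, but we won't cover it here.

Below that is KvStateService, the key-value state service.

Then comes BroadcastVariableManager, which handles broadcast variables.

After that, TaskSlotTable is initialized. It stores the various mappings related to task slots: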

/**
 * Initialize the TaskSlotTable
 * Stores the various mappings related to task slots
 */
final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
    taskManagerServicesConfiguration.getNumberOfSlots(),
    taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
    taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
    taskManagerServicesConfiguration.getPageSize(),
    ioExecutor);

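The first argument is the number of slots.

The second is the resource spec.

The third is the shutdown timeout of the timer service that tracks slot timeouts.

The fourth is the memory page size.

The fifth is the I/O executor.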
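Here is a deliberately tiny slot table that pictures what "storing the mappings" means. It has a fixed number of slots, and each slot is either free or allocated to a job under an allocation ID. SlotTableSketch and its methods are hypothetical. As far as I know, Flink's real implementation additionally tracks slot state transitions, per-slot resources, allocation timeouts (hence the timer service argument), and the tasks running in each slot.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of slot bookkeeping: slot index -> allocation, allocation -> job.
final class SlotTableSketch {
    private final String[] allocationBySlot; // null = free slot
    private final Map<String, String> jobByAllocation = new HashMap<>();
    private final Map<String, Set<String>> allocationsByJob = new HashMap<>();

    SlotTableSketch(int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("The number of slots has to be larger than 0.");
        }
        this.allocationBySlot = new String[numberOfSlots];
    }

    // Returns false if the slot index is invalid or the slot is already taken.
    boolean allocate(int slotIndex, String jobId, String allocationId) {
        if (slotIndex < 0 || slotIndex >= allocationBySlot.length || allocationBySlot[slotIndex] != null) {
            return false;
        }
        allocationBySlot[slotIndex] = allocationId;
        jobByAllocation.put(allocationId, jobId);
        allocationsByJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(allocationId);
        return true;
    }

    // Frees a slot and removes all mappings that pointed at it.
    void free(int slotIndex) {
        String allocationId = allocationBySlot[slotIndex];
        if (allocationId == null) {
            return;
        }
        allocationBySlot[slotIndex] = null;
        String jobId = jobByAllocation.remove(allocationId);
        Set<String> allocations = allocationsByJob.get(jobId);
        allocations.remove(allocationId);
        if (allocations.isEmpty()) {
            allocationsByJob.remove(jobId);
        }
    }

    int freeSlots() {
        int free = 0;
        for (String a : allocationBySlot) {
            if (a == null) {
                free++;
            }
        }
        return free;
    }

    Set<String> allocationsOf(String jobId) {
        return allocationsByJob.getOrDefault(jobId, Set.of());
    }
}
```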

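Then it initializes a JobTable that stores per-job information.

After that it creates a JobLeaderService.

We have seen this before: the JobLeaderService listens for the JobMaster's leader information.

It is also worth looking at how TaskExecutor uses jobLeaderService, but we'll skip that here.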

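Further down, stateRootDirectoryFiles is created. These directories are used for local state recovery.

taskStateManager (a TaskExecutorLocalStateStoresManager) manages the local state stores.

Once every component is initialized, the method returns a TaskManagerServices instance that bundles them all.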

Back in startTaskManager in TaskManagerRunner, the call TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(...) has now initialized this whole set of service components.

/**
 * 1. Create the TaskExecutor instance: this mainly initializes the basic components
 *    and the TaskExecutor's core service components.
 *
 * 2. Internally it creates two important heartbeat managers:
 *    1) JobManagerHeartbeatManager
 *    2) ResourceManagerHeartbeatManager
 */
return new TaskExecutor(
    rpcService,
    taskManagerConfiguration,
    highAvailabilityServices,
    taskManagerServices,
    externalResourceInfoProvider,
    heartbeatServices,
    taskManagerMetricGroup.f0,
    metricQueryServiceAddress,
    blobCacheService,
    fatalErrorHandler,
    new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
    createBackPressureSampleService(configuration, rpcService.getScheduledExecutor())
);

Here all the services created above are passed into a new TaskExecutor. Let's step into its constructor.

/**
 * onStart() runs after this constructor has finished, once the endpoint is started
 */
public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices haServices,
        TaskManagerServices taskExecutorServices, ExternalResourceInfoProvider externalResourceInfoProvider, HeartbeatServices heartbeatServices,
        TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String metricQueryServiceAddress, BlobCacheService blobCacheService,
        FatalErrorHandler fatalErrorHandler, TaskExecutorPartitionTracker partitionTracker, BackPressureSampleService backPressureSampleService) {

    super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

    checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");

    this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
    this.taskExecutorServices = checkNotNull(taskExecutorServices);
    this.haServices = checkNotNull(haServices);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.partitionTracker = partitionTracker;
    this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
    this.blobCacheService = checkNotNull(blobCacheService);
    this.metricQueryServiceAddress = metricQueryServiceAddress;
    this.backPressureSampleService = checkNotNull(backPressureSampleService);
    this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);

    this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
    this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
    this.jobTable = taskExecutorServices.getJobTable();
    this.jobLeaderService = taskExecutorServices.getJobLeaderService();
    this.unresolvedTaskManagerLocation = taskExecutorServices.getUnresolvedTaskManagerLocation();
    this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
    this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
    this.kvStateService = taskExecutorServices.getKvStateService();
    this.ioExecutor = taskExecutorServices.getIOExecutor();

    /**
     * LeaderRetrievalService for the ResourceManager leader
     */
    this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();

    /**
     * HardwareDescription: wraps the hardware resources
     */
    this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());

    this.resourceManagerAddress = null;
    this.resourceManagerConnection = null;
    this.currentRegistrationTimeoutId = null;

    final ResourceID resourceId = taskExecutorServices.getUnresolvedTaskManagerLocation().getResourceID();

    /**
     * HeartbeatManagerImpl jobManagerHeartbeatManager
     * Heartbeats with the JobMaster
     */
    this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);

    /**
     * HeartbeatManagerImpl resourceManagerHeartbeatManager
     * Heartbeats with the ResourceManager
     */
    this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}

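Two fields here are especially important: jobManagerHeartbeatManager and resourceManagerHeartbeatManager.

They handle the heartbeats with the JobMaster and with the ResourceManager described earlier.

That completes the constructor, so let's go back up the call stack.

The result of startTaskManager is assigned to taskManager: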

/**
 * Initialize the TaskExecutor
 * (goes into startTaskManager)
 */
taskManager = startTaskManager(this.configuration,
    this.resourceId,
    rpcService,
    highAvailabilityServices,
    heartbeatServices,
    metricRegistry,
    blobCacheService,
    false,
    externalResourceInfoProvider,
    this);

In other words, the TaskExecutor we just constructed is stored in the taskManager field.

Now let's go back to the TaskExecutor constructor called at the end of startTaskManager:

/**
 * onStart() runs after this constructor has finished, once the endpoint is started
 */
public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices haServices,
        TaskManagerServices taskExecutorServices, ExternalResourceInfoProvider externalResourceInfoProvider, HeartbeatServices heartbeatServices,
        TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String metricQueryServiceAddress, BlobCacheService blobCacheService,
        FatalErrorHandler fatalErrorHandler, TaskExecutorPartitionTracker partitionTracker, BackPressureSampleService backPressureSampleService) {

    super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
    // ...
}

Notice the super(...) call. Let's see which superclass it goes to.

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    // ...
}

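It turns out to be RpcEndpoint.

For subclasses of RpcEndpoint, onStart() is invoked once the endpoint has been started; the constructor itself does not call it.

Back in TaskManagerRunner, the constructor has now finished. Going back up to runTaskManager,

the next line is: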

/**
 * Start the TaskManager
 */
taskManagerRunner.start();


--------------------------------------


public final void start() {
    rpcServer.start();
}

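TaskManagerRunner.start() calls taskManager.start(), which is the RpcEndpoint.start() method shown above. It starts the endpoint's RPC server, which in effect sends a start message to the endpoint itself. Handling that message is what eventually invokes onStart().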
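The ordering can be imitated with a single-threaded "mailbox". First the constructor runs. Later, start() posts a message that the endpoint's own main thread processes by calling onStart(). EndpointSketch and TaskExecutorLikeSketch are hypothetical and leave Akka out entirely. They only mirror the sequence described above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: an endpoint whose lifecycle callback runs on its own "main thread".
abstract class EndpointSketch {
    // Single-threaded executor standing in for the endpoint's mailbox / main thread
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "endpoint-main-thread");
        t.setDaemon(true);
        return t;
    });

    // The constructor only sets up state; it does NOT call onStart()
    protected EndpointSketch() {
    }

    // Like rpcServer.start(): post a "start" message to ourselves and return immediately
    public final void start() {
        mainThread.execute(this::onStart);
    }

    protected abstract void onStart();

    public void stop() {
        mainThread.shutdownNow();
    }
}

final class TaskExecutorLikeSketch extends EndpointSketch {
    final CountDownLatch started = new CountDownLatch(1);
    volatile String startThreadName;

    @Override
    protected void onStart() {
        startThreadName = Thread.currentThread().getName(); // runs on the endpoint's main thread
        started.countDown();
    }
}
```

Running lifecycle callbacks on the endpoint's own thread is what lets endpoint code avoid locks: every message, including the start message, is processed one at a time by the same thread.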


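That covers the TaskManagerRunner startup path.

What remains is the last step of this flow: RpcEndpoint's onStart() method (here, TaskExecutor's onStart()). I'll cover it in the next article.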