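Flink Source Code Analysis (12): The TaskManagerRunner Startup Flow
The previous posts covered how the TaskExecutor receives a request from the ResourceManager and hands the result over to the JobMaster.
Now let's look at the startup flow of TaskManagerRunner.
We start from the main method of TaskManagerRunner, which is the entry point for starting a TaskManager.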
```java
public static void main(String[] args) throws Exception {
    // log environment details and install the signal / shutdown hooks
    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

    if (maxOpenFileHandles != -1L) {
        LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    } else {
        LOG.info("Cannot determine the maximum number of open file descriptors");
    }

    // the actual startup
    runTaskManagerSecurely(args, ResourceID.generate());
}
```
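Everything before the last statement is environment logging and checks, which we skip for now. The interesting call is runTaskManagerSecurely; judging by the name, it has something to do with security.
It receives args, the startup arguments. Let's see what the second argument, ResourceID.generate(), is.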
```java
public static ResourceID generate() {
    return new ResourceID(new AbstractID().toString());
}

// -----------------------------------------------

public AbstractID() {
    this.lowerPart = RND.nextLong();
    this.upperPart = RND.nextLong();
}
```
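The second argument is simply a random value: two longs drawn from a random number generator. It serves as the unique ID of this TaskManager.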
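To make the idea concrete, here is a minimal sketch of a random 128-bit ID built from two longs. The class RandomIdSketch and its hex formatting are assumptions made for illustration; this is not Flink's AbstractID, whose byte layout may differ.

```java
import java.util.Random;

// Hypothetical stand-in for the ID generation described above; not Flink's AbstractID.
final class RandomIdSketch {
    private static final Random RND = new Random();

    private final long lowerPart;
    private final long upperPart;

    RandomIdSketch() {
        // two independent random 64-bit halves
        this.lowerPart = RND.nextLong();
        this.upperPart = RND.nextLong();
    }

    // Render each half as 16 zero-padded hex digits, 32 digits in total.
    @Override
    public String toString() {
        return String.format("%016x%016x", lowerPart, upperPart);
    }

    public static void main(String[] args) {
        System.out.println(new RandomIdSketch());
    }
}
```

With 128 random bits, two TaskManagers picking the same ID is practically impossible, which is presumably why no coordination is needed to generate it.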
Next, the runTaskManagerSecurely method.
```java
public static void runTaskManagerSecurely(String[] args, ResourceID resourceID) {
    try {
        // parse the startup arguments and load the configuration
        Configuration configuration = loadConfiguration(args);

        runTaskManagerSecurely(configuration, resourceID);
    } catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("TaskManager initialization failed.", strippedThrowable);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }
}
```
It first loads the configuration, that is, it parses flink-conf.yaml, and then calls the overload runTaskManagerSecurely(Configuration, ResourceID). Let's look at that overload.
```java
public static void runTaskManagerSecurely(Configuration configuration, ResourceID resourceID) throws Exception {
    replaceGracefulExitWithHaltIfConfigured(configuration);

    // load plugins
    final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

    // initialize the file system
    FileSystem.initialize(configuration, pluginManager);

    // install the security configuration
    SecurityUtils.install(new SecurityConfiguration(configuration));

    SecurityUtils.getInstalledContext().runSecured(() -> {
        runTaskManager(configuration, resourceID, pluginManager);
        return null;
    });
}
```
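First, replaceGracefulExitWithHaltIfConfigured: if the corresponding option enables a hard halt, then when the process exits it skips the normal graceful exit path and is terminated forcibly right away.
Then the plugins are loaded. We already looked at plugin loading in the ResourceManager posts.
Then the file system is initialized and the security configuration is installed.
Finally the TaskManager is started inside the installed security context: the Callable handed to runSecured calls runTaskManager.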
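The "run a Callable inside a context" pattern used by runSecured can be sketched as follows. SecurityContextSketch, NoOpContextSketch and RunSecuredDemo are hypothetical names invented for this illustration and are not Flink's classes. In this no-op variant the Callable simply runs on the calling thread; a context with real credentials could wrap the call in a login step instead.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified security context; an assumption for illustration only.
interface SecurityContextSketch {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// A context that adds no security: it invokes the Callable directly.
final class NoOpContextSketch implements SecurityContextSketch {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }
}

final class RunSecuredDemo {
    // The lambda stands in for runTaskManager(...); it reports the thread it ran on.
    static String startInside(SecurityContextSketch context) {
        try {
            return context.runSecured(() -> Thread.currentThread().getName());
        } catch (Exception e) {
            throw new IllegalStateException("startup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(startInside(new NoOpContextSketch()));
    }
}
```

Returning a value from the Callable is also why the original lambda ends with return null: the startup itself produces no result.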
```java
public static void runTaskManager(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception {
    // build the runner (this initializes all services) ...
    final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId, pluginManager);

    // ... and start it
    taskManagerRunner.start();
}
```
Here a TaskManagerRunner is constructed and then started. Let's look at the TaskManagerRunner constructor first.
```java
public TaskManagerRunner(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception {
    this.configuration = checkNotNull(configuration);
    this.resourceId = checkNotNull(resourceId);

    timeout = AkkaUtils.getTimeoutAsTime(configuration);

    // thread pool sized by the number of CPU cores
    this.executor = java.util.concurrent.Executors
        .newScheduledThreadPool(Hardware.getNumberCPUCores(), new ExecutorThreadFactory("taskmanager-future"));

    // HA services
    highAvailabilityServices = HighAvailabilityServicesUtils
        .createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

    // RPC service
    rpcService = createRpcService(configuration, highAvailabilityServices);

    // heartbeat services
    HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

    // metrics
    metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration),
        ReporterSetup.fromConfiguration(configuration, pluginManager));

    final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);

    // blob cache
    blobCacheService = new BlobCacheService(configuration, highAvailabilityServices.createBlobStore(), null);

    final ExternalResourceInfoProvider externalResourceInfoProvider = ExternalResourceUtils
        .createStaticExternalResourceInfoProvider(ExternalResourceUtils.getExternalResourceAmountMap(configuration),
            ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));

    // build the TaskExecutor from everything created above
    taskManager = startTaskManager(this.configuration, this.resourceId, rpcService, highAvailabilityServices, heartbeatServices,
        metricRegistry, blobCacheService, false, externalResourceInfoProvider, this);

    this.terminationFuture = new CompletableFuture<>();
    this.shutdown = false;

    MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}
```
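The code is a bit long, so let's walk through it from the top.
It starts by reading a timeout from the configuration; that is the timeout field.
Then it creates a thread pool sized by the number of CPU cores. Hardware is a helper for hardware information and is used here to get the core count.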
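A pool like this can be sketched with the JDK alone. In the sketch below, Runtime.getRuntime().availableProcessors() stands in for Hardware.getNumberCPUCores(), and namedFactory is a hypothetical replacement for ExecutorThreadFactory; the thread naming scheme is an assumption, not Flink's.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CorePoolSketch {
    // Hypothetical thread factory: names threads "<prefix>-thread-<n>" and marks them as daemons.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // One core thread per available CPU core.
    static ScheduledExecutorService newPool(String prefix) {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newScheduledThreadPool(cores, namedFactory(prefix));
    }

    // Schedules a single task and returns the name of the thread that ran it.
    static String runOnce(String prefix) {
        ScheduledExecutorService pool = newPool(prefix);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.schedule(task, 10, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("taskmanager-future"));
    }
}
```

Naming the threads makes them easy to spot in a thread dump, which is a common reason to pass a custom factory instead of using the default one.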
Next the HA services are initialized, which is worth a quick look.
```java
public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor,
        AddressResolution addressResolution) throws Exception {

    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);

    switch (highAvailabilityMode) {
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

            final String resourceManagerRpcUrl = AkkaRpcServiceUtils
                .getRpcUrl(hostnamePort.f0, hostnamePort.f1, AkkaRpcServiceUtils.createWildcardName(ResourceManager.RESOURCE_MANAGER_NAME),
                    addressResolution, configuration);
            final String dispatcherRpcUrl = AkkaRpcServiceUtils
                .getRpcUrl(hostnamePort.f0, hostnamePort.f1, AkkaRpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
                    addressResolution, configuration);
            final String webMonitorAddress = getWebMonitorAddress(configuration, addressResolution);

            return new StandaloneHaServices(resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);

        case ZOOKEEPER:
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

            return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, blobStoreService);

        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);

        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}
```
It first reads the HA mode from the configuration and then creates the services for that mode. We usually use the ZooKeeper HA mode:
```java
case ZOOKEEPER:
    BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

    return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, blobStoreService);
```
First a BlobStoreService is created. Blobs are how large files are stored and handled; we have seen this before.
After that a ZooKeeperHaServices is created, which is the HA service itself.
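The "read the mode, then switch on it" structure can be sketched in isolation. Everything below is a simplified assumption: the configuration is a plain Map, the key name "high-availability" is taken as given, the parsing rule is invented for the example, and the returned strings merely stand in for the real service objects.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

final class HaModeSketch {
    enum Mode { NONE, ZOOKEEPER, FACTORY_CLASS }

    // Assumed configuration key for this sketch.
    static final String HA_KEY = "high-availability";

    // Hypothetical parsing rule: missing or "none" means NONE, "zookeeper" means ZOOKEEPER,
    // anything else is treated as the name of a custom factory class.
    static Mode fromConfig(Map<String, String> config) {
        String value = config.get(HA_KEY);
        if (value == null || value.trim().isEmpty()) {
            return Mode.NONE;
        }
        String upper = value.trim().toUpperCase(Locale.ROOT);
        if (upper.equals("NONE")) {
            return Mode.NONE;
        }
        if (upper.equals("ZOOKEEPER")) {
            return Mode.ZOOKEEPER;
        }
        return Mode.FACTORY_CLASS;
    }

    // Mirrors the shape of the switch above; the strings stand in for service instances.
    static String describeServices(Map<String, String> config) {
        Mode mode = fromConfig(config);
        switch (mode) {
            case NONE:
                return "standalone";
            case ZOOKEEPER:
                return "zookeeper";
            case FACTORY_CLASS:
                return "custom:" + config.get(HA_KEY).trim();
            default:
                throw new IllegalStateException("Recovery mode " + mode + " is not supported.");
        }
    }

    public static void main(String[] args) {
        System.out.println(describeServices(Collections.singletonMap(HA_KEY, "zookeeper")));
    }
}
```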
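Back in the TaskManagerRunner constructor, we continue downward.
Next an RpcService is initialized. It is used for communication between the nodes.
Then the heartbeat services are initialized. This step is important, so read the two remarks after the code carefully.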
```java
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

// -------------------------------------------------

public static HeartbeatServices fromConfiguration(Configuration configuration) {
    // heartbeat interval
    long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);

    // heartbeat timeout
    long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

    return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}
```
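Heartbeats matter here because:
The slot state shared by the TaskExecutor and the JobMaster is kept in sync through the periodic heartbeats between the JobMaster and the TaskExecutor.
The slot state shared by the TaskExecutor and the ResourceManager is kept in sync through the periodic heartbeats in which the TaskExecutor reports to the ResourceManager.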
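How an interval and a timeout work together can be shown with a small sketch. HeartbeatMonitorSketch is a hypothetical class, not Flink's HeartbeatManager; time is passed in explicitly so the behaviour is deterministic, and the numbers in main are example values only.

```java
// Hypothetical monitor for one heartbeat target; an illustration, not Flink's implementation.
final class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private long lastHeartbeatMillis;

    HeartbeatMonitorSketch(long timeoutMillis, long nowMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = nowMillis;
    }

    // Called whenever a heartbeat from the target arrives.
    void reportHeartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    // The target counts as lost once no heartbeat arrived for longer than the timeout.
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        long interval = 10_000L; // example value: one heartbeat every 10 s
        long timeout = 50_000L;  // example value: give up after 50 s of silence
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(timeout, 0L);
        monitor.reportHeartbeat(interval);
        System.out.println(monitor.isTimedOut(interval + timeout));     // false, exactly at the limit
        System.out.println(monitor.isTimedOut(interval + timeout + 1)); // true, one millisecond over
    }
}
```

Choosing a timeout that is several intervals long means a single delayed or lost heartbeat does not immediately mark the other side as dead.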
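Below that comes the metrics and monitoring setup, which we skip for now.
Then blobCacheService is initialized. It periodically checks for expired job resources and handles large files.
After that the TaskExecutor is created: all the instances built so far are passed to startTaskManager, which wraps them into a single object. Let's look at that method.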
```java
public static TaskExecutor startTaskManager(Configuration configuration, ResourceID resourceID, RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry,
        BlobCacheService blobCacheService, boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    checkNotNull(configuration);
    checkNotNull(resourceID);
    checkNotNull(rpcService);
    checkNotNull(highAvailabilityServices);

    LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

    String externalAddress = rpcService.getAddress();

    // resource spec of this TaskExecutor
    final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

    // wrap the configuration
    TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration
        .fromConfiguration(configuration, resourceID, externalAddress, localCommunicationOnly, taskExecutorResourceSpec);

    Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils
        .instantiateTaskManagerMetricGroup(metricRegistry, externalAddress, resourceID,
            taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

    // I/O thread pool
    final ExecutorService ioExecutor = Executors
        .newFixedThreadPool(taskManagerServicesConfiguration.getNumIoThreads(), new ExecutorThreadFactory("flink-taskexecutor-io"));

    // the service components of the TaskManager
    TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
        taskManagerServicesConfiguration,
        blobCacheService.getPermanentBlobService(),
        taskManagerMetricGroup.f1,
        ioExecutor,
        fatalErrorHandler);

    TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration
        .fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);

    String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();

    return new TaskExecutor(
        rpcService,
        taskManagerConfiguration,
        highAvailabilityServices,
        taskManagerServices,
        externalResourceInfoProvider,
        heartbeatServices,
        taskManagerMetricGroup.f0,
        metricQueryServiceAddress,
        blobCacheService,
        fatalErrorHandler,
        new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
        createBackPressureSampleService(configuration, rpcService.getScheduledExecutor())
    );
}
```
First it builds the resource spec of the machine the TaskExecutor runs on: final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
Let's look at what is inside.
```java
public class TaskExecutorResourceSpec {
    private final CPUResource cpuCores;

    private final MemorySize taskHeapSize;

    private final MemorySize taskOffHeapSize;

    private final MemorySize networkMemSize;

    private final MemorySize managedMemorySize;

    // ...
}
```
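The fields describe resources such as CPU cores, task memory, network memory and managed memory, and they are initialized here from the configuration.
Next the configuration is wrapped into a TaskManagerServicesConfiguration.
Then ioExecutor is initialized. It is a thread pool that provides I/O services.
Then the service components that the TaskManager offers at runtime are initialized by calling TaskManagerServices.fromConfiguration. Let's look at it.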
```java
public static TaskManagerServices fromConfiguration(TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        PermanentBlobService permanentBlobService, MetricGroup taskManagerMetricGroup, ExecutorService ioExecutor,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    // pre-start checks
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    // start the IOManager's async threads
    final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration, taskEventDispatcher, taskManagerMetricGroup, ioExecutor);

    final int listeningDataPort = shuffleEnvironment.start();

    final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
        taskManagerServicesConfiguration.getResourceID(),
        taskManagerServicesConfiguration.getExternalAddress(),
        // a configured external data port overrides the port the shuffle environment listens on
        taskManagerServicesConfiguration.getExternalDataPort() > 0
            ? taskManagerServicesConfiguration.getExternalDataPort()
            : listeningDataPort);

    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

    final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
        taskManagerServicesConfiguration.getNumberOfSlots(),
        taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
        taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
        taskManagerServicesConfiguration.getPageSize(),
        ioExecutor);

    final JobTable jobTable = DefaultJobTable.create();

    final JobLeaderService jobLeaderService = new DefaultJobLeaderService(
        unresolvedTaskManagerLocation,
        taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

    final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

    final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

    for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
        stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
    }

    final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
        taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
        stateRootDirectoryFiles,
        ioExecutor);

    final boolean failOnJvmMetaspaceOomError = taskManagerServicesConfiguration.getConfiguration()
        .getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);

    final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
        permanentBlobService,
        BlobLibraryCacheManager.defaultClassLoaderFactory(
            taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
            taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
            failOnJvmMetaspaceOomError ? fatalErrorHandler : null));

    return new TaskManagerServices(
        unresolvedTaskManagerLocation,
        taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
        ioManager,
        shuffleEnvironment,
        kvStateService,
        broadcastVariableManager,
        taskSlotTable,
        jobTable,
        jobLeaderService,
        taskStateManager,
        taskEventDispatcher,
        ioExecutor,
        libraryCacheManager
    );
}
```
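First the temporary directories are checked.
Then a TaskEventDispatcher is created. It is responsible for dispatching task events between StreamTasks.
Then an I/O manager is initialized, here the asynchronous IOManagerAsync. Asynchronous I/O is used in many places, for example in communication: the caller issues an I/O request and then waits for its completion instead of doing the I/O itself.
Next the shuffle environment is initialized. It handles the shuffle between tasks: several tasks run in the task slots of one TaskManager, and data going from an upstream task to a downstream task sometimes has to be shuffled.
Below that, shuffleEnvironment.start() starts a Netty server and client through which upstream tasks hand data to downstream tasks. This is mostly Netty-related code, so it is not discussed here; it is worth reading when you have time.
Further down comes KvStateService, which takes care of state management.
Then BroadcastVariableManager, which handles broadcast variables.
Then TaskSlotTable is initialized. It stores the various mappings that belong to the task slots.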
```java
final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
    taskManagerServicesConfiguration.getNumberOfSlots(),
    taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
    taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
    taskManagerServicesConfiguration.getPageSize(),
    ioExecutor);
```
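The first argument is the number of slots.
The second is the resource spec.
The third is the shutdown timeout of the timer service that watches the timeouts of the individual task slots.
The fourth is the memory page size.
The fifth is the I/O executor.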
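To give a feel for what "storing the mappings of task slots" means, here is a deliberately tiny sketch. SlotTableSketch is hypothetical and only tracks which allocation occupies which slot index; the real TaskSlotTable also deals with resources, timers and tasks, none of which is modelled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified slot table; an illustration, not Flink's TaskSlotTable.
final class SlotTableSketch {
    private final int numberOfSlots;
    private final Map<Integer, String> allocationBySlot = new HashMap<>();
    private final Map<String, Integer> slotByAllocation = new HashMap<>();

    SlotTableSketch(int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("The number of slots has to be larger than 0.");
        }
        this.numberOfSlots = numberOfSlots;
    }

    // Binds an allocation ID to a slot index; fails if the index is invalid or either side is already in use.
    boolean allocate(int slotIndex, String allocationId) {
        if (slotIndex < 0 || slotIndex >= numberOfSlots) {
            return false;
        }
        if (allocationBySlot.containsKey(slotIndex) || slotByAllocation.containsKey(allocationId)) {
            return false;
        }
        allocationBySlot.put(slotIndex, allocationId);
        slotByAllocation.put(allocationId, slotIndex);
        return true;
    }

    // Releases the slot held by the given allocation, if there is one.
    boolean free(String allocationId) {
        Integer slotIndex = slotByAllocation.remove(allocationId);
        if (slotIndex == null) {
            return false;
        }
        allocationBySlot.remove(slotIndex);
        return true;
    }

    int freeSlots() {
        return numberOfSlots - allocationBySlot.size();
    }

    public static void main(String[] args) {
        SlotTableSketch table = new SlotTableSketch(2);
        table.allocate(0, "allocation-a");
        System.out.println(table.freeSlots());
    }
}
```

Keeping the mapping in both directions makes lookups by slot index and by allocation ID equally cheap, at the cost of updating two maps together.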
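Then a jobTable is initialized to store information about jobs.
After that a jobLeaderService is created.
We have seen this one before: the jobLeaderService watches for the leader information of the JobMaster.
You can also read how jobLeaderService is used in the TaskExecutor code; we will not go into it here.
Further down, stateRootDirectoryFiles is built. These are the directories used to recover local state.
taskStateManager manages the local state.
Once all components are initialized, they are wrapped into a TaskManagerServices instance, which is returned.
We are now back in the startTaskManager code of TaskManagerRunner, at the call TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(...).
At this point the whole set of service components has been initialized.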
```java
return new TaskExecutor(
    rpcService,
    taskManagerConfiguration,
    highAvailabilityServices,
    taskManagerServices,
    externalResourceInfoProvider,
    heartbeatServices,
    taskManagerMetricGroup.f0,
    metricQueryServiceAddress,
    blobCacheService,
    fatalErrorHandler,
    new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
    createBackPressureSampleService(configuration, rpcService.getScheduledExecutor())
);
```
Here all the services created so far are bundled into a single TaskExecutor instance. Let's step into its constructor.
```java
public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices haServices,
        TaskManagerServices taskExecutorServices, ExternalResourceInfoProvider externalResourceInfoProvider,
        HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String metricQueryServiceAddress,
        BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler, TaskExecutorPartitionTracker partitionTracker,
        BackPressureSampleService backPressureSampleService) {

    super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

    checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");

    this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
    this.taskExecutorServices = checkNotNull(taskExecutorServices);
    this.haServices = checkNotNull(haServices);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.partitionTracker = partitionTracker;
    this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
    this.blobCacheService = checkNotNull(blobCacheService);
    this.metricQueryServiceAddress = metricQueryServiceAddress;
    this.backPressureSampleService = checkNotNull(backPressureSampleService);
    this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);

    this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
    this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
    this.jobTable = taskExecutorServices.getJobTable();
    this.jobLeaderService = taskExecutorServices.getJobLeaderService();
    this.unresolvedTaskManagerLocation = taskExecutorServices.getUnresolvedTaskManagerLocation();
    this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
    this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
    this.kvStateService = taskExecutorServices.getKvStateService();
    this.ioExecutor = taskExecutorServices.getIOExecutor();
    this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();

    this.hardwareDescription = HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());

    this.resourceManagerAddress = null;
    this.resourceManagerConnection = null;
    this.currentRegistrationTimeoutId = null;

    final ResourceID resourceId = taskExecutorServices.getUnresolvedTaskManagerLocation().getResourceID();

    // heartbeat managers for the JobMaster and the ResourceManager
    this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
    this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}
```
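Two fields are especially important here: jobManagerHeartbeatManager and resourceManagerHeartbeatManager.
They implement the heartbeats mentioned earlier, one with the JobMaster and one with the ResourceManager.
That is the end of this constructor, so let's go back up.
The result of startTaskManager is assigned to taskManager.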
```java
taskManager = startTaskManager(this.configuration, this.resourceId, rpcService, highAvailabilityServices, heartbeatServices,
    metricRegistry, blobCacheService, false, externalResourceInfoProvider, this);
```
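In other words, the TaskExecutor instance we just created is assigned to the taskManager field.
Let's go back once more to the TaskExecutor constructor that startTaskManager calls in its return statement.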
```java
public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices haServices,
        TaskManagerServices taskExecutorServices, ExternalResourceInfoProvider externalResourceInfoProvider,
        HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String metricQueryServiceAddress,
        BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler, TaskExecutorPartitionTracker partitionTracker,
        BackPressureSampleService backPressureSampleService) {

    super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

    // ...
}
```
The constructor calls its superclass constructor, so let's step in and see what the parent class is.
```java
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    // ...
}
```
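It turns out to be RpcEndpoint.
For any subclass of RpcEndpoint, the onStart method is called once the endpoint has been started.
Back in TaskManagerRunner, the constructor has now finished, so we go up again to the runTaskManager method.
The next statement there is: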
```java
taskManagerRunner.start();

// --------------------------------------

public final void start() {
    rpcServer.start();
}
```
This confirms that the endpoint is up: on start it sends a message to itself.
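The combination of "start sends the endpoint a message" and "subclasses get an onStart callback" can be sketched with a single-threaded mailbox. EndpointSketch and TaskExecutorSketch are hypothetical classes written for this post; the real RpcEndpoint is backed by an actor system, which is not modelled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical endpoint: start() only enqueues a message; the hook runs when the mailbox is processed.
abstract class EndpointSketch {
    private final Queue<String> mailbox = new ArrayDeque<>();
    private boolean started;

    // Sends a START message to the endpoint's own mailbox.
    final void start() {
        mailbox.add("START");
    }

    // Drains the mailbox; the first START message triggers the onStart hook exactly once.
    final void processMailbox() {
        String message;
        while ((message = mailbox.poll()) != null) {
            if ("START".equals(message) && !started) {
                started = true;
                onStart();
            }
        }
    }

    // Lifecycle hook implemented by subclasses.
    protected abstract void onStart();
}

final class TaskExecutorSketch extends EndpointSketch {
    final List<String> log = new ArrayList<>();

    @Override
    protected void onStart() {
        log.add("onStart");
    }

    public static void main(String[] args) {
        TaskExecutorSketch endpoint = new TaskExecutorSketch();
        endpoint.start();
        endpoint.processMailbox();
        System.out.println(endpoint.log);
    }
}
```

Routing the start signal through the mailbox means onStart runs in the same place as every other message handler, so the hook needs no extra synchronization in this model.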
That is the whole TaskManagerRunner flow.
What remains is the onStart method of the RpcEndpoint from the end of the flow above. I will cover it in the next post.