Flink Source Code Analysis (6): Dispatcher Startup Flow

Next, let's look at the factory class that creates the Dispatcher.
We are still inside the three factory classes we created earlier:
log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
        highAvailabilityServices.getDispatcherLeaderElectionService(),
        fatalErrorHandler,
        new HaServicesJobGraphStoreFactory(highAvailabilityServices),
        ioExecutor,
        rpcService,
        partialDispatcherServices);
There is no start call visible here, so we will have to dig deeper. Before doing that, let's look at the class hierarchy of the Dispatcher code.
Open the Dispatcher class and look at its declaration:
public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<DispatcherId> implements DispatcherGateway {
The first thing to notice is that Dispatcher is an abstract class, so the concrete behavior we are after may live in a subclass rather than here.
It also declares a lot of fields: some are initialized by the parent class and some by the subclass.
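As a rough illustration of this parent/child split, the pattern looks something like the sketch below. The class names are made up for illustration and are not the actual Flink types.

// Minimal sketch of the initialization split between an abstract base and a concrete
// subclass. The names here are hypothetical and only illustrate the pattern.
abstract class BaseEndpoint {
    protected final String address;             // initialized by the parent constructor

    protected BaseEndpoint(String address) {
        this.address = address;
    }

    protected abstract void onStart();          // lifecycle hook the subclass fills in
}

class StandaloneLikeEndpoint extends BaseEndpoint {
    private final Runnable bootstrap;           // initialized by the subclass

    StandaloneLikeEndpoint(String address, Runnable bootstrap) {
        super(address);                         // parent initializes its share of the fields
        this.bootstrap = bootstrap;             // child initializes the rest
    }

    @Override
    protected void onStart() {
        bootstrap.run();
    }
}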
Let's first go over what this step twelve does at a high level.
The implementation we mainly care about is the StandaloneDispatcher.
That is the rough overview; now let's walk through it in detail.
First, go back to the factory we saw earlier and find this spot:
log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
        highAvailabilityServices.getDispatcherLeaderElectionService(),
        fatalErrorHandler,
        new HaServicesJobGraphStoreFactory(highAvailabilityServices),
        ioExecutor,
        rpcService,
        partialDispatcherServices);
Still no start call, so let's look into createDispatcherRunner:
@Override
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices) throws Exception {

    final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
            dispatcherLeaderProcessFactoryFactory.createFactory(
                    jobGraphStoreFactory,
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices,
                    fatalErrorHandler);

    return DefaultDispatcherRunner.create(
            leaderElectionService,
            fatalErrorHandler,
            dispatcherLeaderProcessFactory);
}
---------------------------------------------------------
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
        highAvailabilityServices.getDispatcherLeaderElectionService(),
        fatalErrorHandler,
        new HaServicesJobGraphStoreFactory(highAvailabilityServices),
        ioExecutor,
        rpcService,
        partialDispatcherServices);

log.debug("Starting ResourceManager.");
Here createFactory builds the DispatcherLeaderProcessFactory, passing in the service-related and HA-related dependencies as well as the factory for the JobGraph store.
Next, step into DefaultDispatcherRunner.create. It simply takes the leader election service initialized above together with the dispatcherLeaderProcessFactory we just created and builds the runner from them:
public static DispatcherRunner create(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {

    final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(
            leaderElectionService,
            fatalErrorHandler,
            dispatcherLeaderProcessFactory);

    return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService);
}
Now let's look at the createFor call that is returned at the end:
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(
        T dispatcherRunner,
        LeaderElectionService leaderElectionService) throws Exception {
    return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}
--------------------------------------------------------
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner,
        LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;

    leaderElectionService.start(dispatcherRunner);
}
This is where leader election actually kicks in: the leader election service passed in earlier is started with the dispatcherRunner as the contender.
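Before diving into the ZooKeeper implementation, here is the contract in miniature. The types below are made up and only mirror the shape of the hand-off: whatever is handed to start(...) later receives a grantLeadership callback once leadership is won.

import java.util.UUID;

// Hedged sketch of the contender/election-service hand-off. SimpleContender and
// SimpleElectionService are hypothetical names, not Flink's real interfaces.
interface SimpleContender {
    void grantLeadership(UUID leaderSessionId);
}

class SimpleElectionService {
    private SimpleContender contender;

    void start(SimpleContender contender) {
        this.contender = contender;
        // The real service would now start a ZooKeeper leader latch; for this sketch
        // we pretend leadership is granted immediately.
        this.contender.grantLeadership(UUID.randomUUID());
    }
}

The actual start method we end up in is ZooKeeperLeaderElectionService.start: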
@Override
public void start(LeaderContender contender) throws Exception {
    Preconditions.checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);

    synchronized (lock) {
        client.getUnhandledErrorListenable().addListener(this);

        leaderContender = contender;

        leaderLatch.addListener(this);
        leaderLatch.start();

        cache.getListenable().addListener(this);
        cache.start();

        client.getConnectionStateListenable().addListener(listener);

        running = true;
    }
}
We have landed in the ZooKeeper election method again, which is much the same as what we saw before.
Once the election succeeds, isLeader is invoked, which in turn calls grantLeadership.
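Under the hood this relies on Curator's LeaderLatch, which the start method above registers a listener on and starts. As a rough, self-contained sketch of that mechanism (the connection string and latch path are placeholders), the callback pattern looks like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Hedged sketch of the Curator LeaderLatch pattern that ZooKeeperLeaderElectionService
// builds on: register a listener, start the latch, and react in isLeader()/notLeader().
public class LeaderLatchSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/leader/dispatcher");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Flink's election service would call leaderContender.grantLeadership(...) here.
                System.out.println("became leader");
            }

            @Override
            public void notLeader() {
                System.out.println("lost leadership");
            }
        });
        latch.start();

        Thread.sleep(10_000);   // keep the process alive long enough to observe callbacks
        latch.close();
        client.close();
    }
}

Back in Flink, the isLeader implementation that fires on that callback is: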
@Override
public void isLeader() {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = UUID.randomUUID();
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                    "Grant leadership to contender {} with session ID {}.",
                    leaderContender.getDescription(),
                    issuedLeaderSessionID);
            }

            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            LOG.debug("Ignoring the grant leadership notification since the service has " +
                "already been stopped.");
        }
    }
}
Let's look inside grantLeadership; this time we end up in DefaultDispatcherRunner:
@Override
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}
-------------------------------------------------------
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();

    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    FutureUtils.assertNoException(
        previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}
The logic here is: stop any existing leader process, create a new one for the granted session ID, assign it, and then start the new process via thenRun once the previous process has fully terminated.
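The interesting detail is that thenRun chaining: the new leader process only starts after the previous one's termination future completes, so two leader processes never run at the same time. A minimal sketch of that hand-over, using a hypothetical class rather than the Flink implementation:

import java.util.concurrent.CompletableFuture;

// Hedged sketch of "start the new leader process only after the old one terminated".
// LeaderProcessSwitch is a made-up name; only the future chaining mirrors the real code.
class LeaderProcessSwitch {
    private CompletableFuture<Void> previousTerminationFuture =
            CompletableFuture.completedFuture(null);

    void startNewProcess(Runnable newProcessStart) {
        // Chain the new start behind the previous termination, mirroring
        // previousDispatcherLeaderProcessTerminationFuture.thenRun(newProcess::start).
        previousTerminationFuture = previousTerminationFuture.thenRun(newProcessStart);
    }

    public static void main(String[] args) {
        LeaderProcessSwitch switcher = new LeaderProcessSwitch();
        switcher.startNewProcess(() -> System.out.println("leader process #1 started"));
        switcher.startNewProcess(() -> System.out.println("leader process #2 started"));
    }
}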
Let's jump straight to that final start call, which brings us into AbstractDispatcherLeaderProcess:
@Override
public final void start() {
    runIfStateIs(State.CREATED, this::startInternal);
}
--------------------------------------------------
final void runIfStateIs(State expectedState, Runnable action) {
    runIfState(expectedState::equals, action);
}
-------------------------------------------------------
private void startInternal() {
    log.info("Start {}.", getClass().getSimpleName());
    state = State.RUNNING;
    onStart();
}
-------------------------------------------------------
@Override
protected void onStart() {
    startServices();

    onGoingRecoveryOperation = recoverJobsAsync()
        .thenAccept(this::createDispatcherIfRunning)
        .handle(this::onErrorIfRunning);
}
startServices() starts the jobGraphStore, the component used to persist JobGraphs:
jobGraphStore.start(this);
After that, the jobs can be recovered.
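Before looking at the actual code below, here is a minimal, hedged sketch of what that "recover job graphs asynchronously, then create the dispatcher only if the process is still running" chain amounts to. All names and the simplified types are made up for illustration.

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hedged sketch of the recovery chain in onStart(): recover asynchronously, then hand
// the recovered job graphs to the dispatcher-creation step, guarded by a running flag.
class RecoverySketch {
    private volatile boolean running = true;

    CompletableFuture<Collection<String>> recoverJobsAsync() {
        // Stand-in for reading the persisted JobGraphs back from the JobGraphStore.
        return CompletableFuture.supplyAsync(() -> List.of("job-1", "job-2"));
    }

    void createDispatcherIfRunning(Collection<String> recoveredJobs) {
        if (running) {
            System.out.println("creating dispatcher with " + recoveredJobs.size() + " recovered jobs");
        }
    }

    void onStart() {
        recoverJobsAsync().thenAccept(this::createDispatcherIfRunning);
    }
}

The real chain in Flink looks like this: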
private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
    runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
}
------------------------------------------------------------
private void createDispatcher(Collection<JobGraph> jobGraphs) {
    final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
        DispatcherId.fromUuid(getLeaderSessionId()),
        jobGraphs,
        jobGraphStore);

    completeDispatcherSetup(dispatcherService);
}
----------------------------------------------------------
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {

    final DispatcherBootstrap bootstrap = new DefaultDispatcherBootstrap(recoveredJobs);

    final Dispatcher dispatcher;
    try {
        dispatcher = dispatcherFactory.createDispatcher(
            rpcService,
            fencingToken,
            bootstrap,
            PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
Following the chain down, we finally reach the start call we were looking for, dispatcher.start(), and this is also where the Dispatcher itself is created.
Let's look at the createDispatcher method:
@Override
public StandaloneDispatcher createDispatcher(
        RpcService rpcService,
        DispatcherId fencingToken,
        DispatcherBootstrap dispatcherBootstrap,
        PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {

    return new StandaloneDispatcher(
        rpcService,
        fencingToken,
        dispatcherBootstrap,
        DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE));
}
-------------------------------------------------------
public StandaloneDispatcher(
        RpcService rpcService,
        DispatcherId fencingToken,
        DispatcherBootstrap dispatcherBootstrap,
        DispatcherServices dispatcherServices) throws Exception {
    super(rpcService, fencingToken, dispatcherBootstrap, dispatcherServices);
}
From here, step into the parent class (Dispatcher) and check whether it has an onStart method. It does:
@Override
public void onStart() throws Exception {
    try {
        startDispatcherServices();
    } catch (Exception e) {
        final DispatcherException exception = new DispatcherException(
            String.format("Could not start the Dispatcher %s", getAddress()), e);
        onFatalError(exception);
        throw exception;
    }

    dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
}
This startup covers two situations: the first is a fresh start, and the second is recovery of the cluster after a failure.
private void startDispatcherServices() throws Exception {
    try {
        registerDispatcherMetrics(jobManagerMetricGroup);
    } catch (Exception e) {
        handleStartDispatcherServicesException(e);
    }
}
------------------------------------------------------------------------
private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
    jobManagerMetricGroup.gauge(
        MetricNames.NUM_RUNNING_JOBS,
        () -> (long) jobManagerRunnerFutures.size());
}
The first thing this startup does is register a metric on the JobManager metric group: a gauge that reports how many jobs are currently running. That is all this method does.
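For context, a gauge here is essentially a named callback that the metrics system polls whenever it reports. A minimal sketch of that idea, using a toy metric group rather than Flink's MetricGroup API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hedged sketch of what registering the numRunningJobs gauge boils down to:
// the group stores a named Supplier and polls it when metrics are reported.
// TinyMetricGroup is a made-up class, not Flink's MetricGroup.
class TinyMetricGroup {
    private final Map<String, Supplier<Long>> gauges = new ConcurrentHashMap<>();

    void gauge(String name, Supplier<Long> gauge) {
        gauges.put(name, gauge);
    }

    void report() {
        gauges.forEach((name, g) -> System.out.println(name + " = " + g.get()));
    }
}

Seen this way, the registration above simply hands over a callback that reads the current size of jobManagerRunnerFutures each time metrics are collected. With that in mind, let's look at the second thing onStart does: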
dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
-----------------------------------------------------
@Override
public void initialize(final Dispatcher dispatcher, ScheduledExecutor scheduledExecutor) {
    launchRecoveredJobGraphs(dispatcher, recoveredJobs);
    recoveredJobs.clear();
}
---------------------------------------------------------
protected void launchRecoveredJobGraphs(final Dispatcher dispatcher, final Collection<JobGraph> recoveredJobGraphs) {
    checkNotNull(dispatcher);
    checkNotNull(recoveredJobGraphs);

    for (JobGraph recoveredJob : recoveredJobGraphs) {
        dispatcher.runRecoveredJob(recoveredJob);
    }
}
This is the job recovery step: it loops over the recovered JobGraphs and recovers each job.
void runRecoveredJob(final JobGraph recoveredJob) {
    checkNotNull(recoveredJob);

    FutureUtils.assertNoException(
        runJob(recoveredJob).handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
}
---------------------------------------------------------------------
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    return jobManagerRunnerFuture
        .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
        .thenApply(FunctionUtils.nullFn())
        .whenCompleteAsync(
            (ignored, throwable) -> {
                if (throwable != null) {
                    jobManagerRunnerFutures.remove(jobGraph.getJobID());
                }
            },
            getMainThreadExecutor());
}
This is the core of the recovery logic. Let's first look at how the JobManagerRunner for the JobMaster is created:
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    return CompletableFuture.supplyAsync(
        () -> {
            try {
                return jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler);
            } catch (Exception e) {
                throw new CompletionException(
                    new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
            }
        },
        rpcService.getExecutor());
}
Here the runner is created asynchronously on the RpcService's executor. Let's look one level deeper:
@Override
public JobManagerRunner createJobManagerRunner(
        JobGraph jobGraph,
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        JobManagerSharedServices jobManagerServices,
        JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

    final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
    final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
    final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
    final ShuffleMaster<?> shuffleMaster =
        ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);

    final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
        jobMasterConfiguration,
        slotPoolFactory,
        schedulerFactory,
        rpcService,
        highAvailabilityServices,
        jobManagerServices,
        heartbeatServices,
        jobManagerJobMetricGroupFactory,
        fatalErrorHandler,
        schedulerNGFactory,
        shuffleMaster);

    return new JobManagerRunnerImpl(
        jobGraph,
        jobMasterFactory,
        highAvailabilityServices,
        jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
        jobManagerServices.getScheduledExecutorService(),
        fatalErrorHandler);
}
The creation wires up a whole series of components: the JobMaster configuration, the slot pool and scheduler factories, the shuffle master, and a JobMasterServiceFactory that carries the HA and heartbeat services, and finally returns a JobManagerRunnerImpl.
Now let's step back out:
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

return jobManagerRunnerFuture
    .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
    .thenApply(FunctionUtils.nullFn())
    .whenCompleteAsync(
        (ignored, throwable) -> {
            if (throwable != null) {
                jobManagerRunnerFutures.remove(jobGraph.getJobID());
            }
        },
        getMainThreadExecutor());
Here the future for the job to be recovered is registered in jobManagerRunnerFutures, keyed by its job ID, and then startJobManagerRunner is invoked.
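The bookkeeping around this is worth spelling out: the runner future is tracked per job ID and removed again if anything in the chain fails. A hedged, simplified sketch of that pattern, with illustrative names only:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the bookkeeping in runJob(): the runner future is registered per
// job id and removed again if the chain fails. All names here are illustrative.
class JobBookkeepingSketch {
    private final Map<String, CompletableFuture<String>> runnerFutures = new ConcurrentHashMap<>();

    CompletableFuture<Void> runJob(String jobId) {
        CompletableFuture<String> runnerFuture =
                CompletableFuture.supplyAsync(() -> "runner-for-" + jobId);

        runnerFutures.put(jobId, runnerFuture);

        return runnerFuture
                .thenApply(runner -> {
                    System.out.println("starting " + runner);   // stand-in for startJobManagerRunner
                    return runner;
                })
                .<Void>thenApply(ignored -> null)
                .whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        runnerFutures.remove(jobId);             // undo the registration on failure
                    }
                });
    }
}

The actual startJobManagerRunner it then calls looks like this: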
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
    final JobID jobId = jobManagerRunner.getJobID();

    FutureUtils.assertNoException(
        jobManagerRunner.getResultFuture().handleAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                final JobManagerRunner currentJobManagerRunner =
                    Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
                        .map(future -> future.getNow(null))
                        .orElse(null);

                if (jobManagerRunner == currentJobManagerRunner) {
                    if (archivedExecutionGraph != null) {
                        jobReachedGloballyTerminalState(archivedExecutionGraph);
                    } else {
                        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                        if (strippedThrowable instanceof JobNotFinishedException) {
                            jobNotFinished(jobId);
                        } else {
                            jobMasterFailed(jobId, strippedThrowable);
                        }
                    }
                } else {
                    log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                }

                return null;
            },
            getMainThreadExecutor()));

    jobManagerRunner.start();

    return jobManagerRunner;
}
Here the runner's job ID is taken, a completion handler is attached to its result future, and at the very end jobManagerRunner.start() is called.
Let's look at that start:
@Override
public void start(LeaderContender contender) throws Exception {
    Preconditions.checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);

    synchronized (lock) {
        client.getUnhandledErrorListenable().addListener(this);

        leaderContender = contender;

        leaderLatch.addListener(this);
        leaderLatch.start();

        cache.getListenable().addListener(this);
        cache.start();

        client.getConnectionStateListenable().addListener(listener);

        running = true;
    }
}
This is much the same as before: start registers the listeners and starts the leader latch for the election, then isLeader fires and leads us to grantLeadership again:
@Override
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}
-------------------------------------------------------------
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();

    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    FutureUtils.assertNoException(
        previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}
From here the startup loops back to the flow we already covered.
And that is the overall Dispatcher startup flow.