Flink源码解析（六）：Dispatcher启动流程。接下来我们看一下这里创建 Dispatcher 所用的工厂类（dispatcherRunnerFactory）。
// Call site that builds the DispatcherRunner. It is passed the HA services' dispatcher leader
// election service, the fatal error handler, a HaServicesJobGraphStoreFactory wrapping
// highAvailabilityServices, the I/O executor, the RPC service and the partial dispatcher services.
// NOTE(review): the leading "1 2 3 4" is line-number gutter residue from the scraped page, not code.
1 2 3 4 log.debug("Starting Dispatcher." ); dispatcherRunner = dispatcherRunnerFactory .createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, new HaServicesJobGraphStoreFactory (highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);
// Header of the abstract Dispatcher class. It extends PermanentlyFencedRpcEndpoint parameterized
// with DispatcherId and implements DispatcherGateway. The class body is not shown in this excerpt.
1 public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint <DispatcherId> implements DispatcherGateway {
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// Repeats the DispatcherRunner call site: logs "Starting Dispatcher." and then obtains the runner
// from dispatcherRunnerFactory.createDispatcherRunner with the HA leader election service,
// fatal error handler, job graph store factory, I/O executor, RPC service and partial services.
1 2 3 4 log.debug("Starting Dispatcher." ); dispatcherRunner = dispatcherRunnerFactory .createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, new HaServicesJobGraphStoreFactory (highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);
// createDispatcherRunner override. It first asks dispatcherLeaderProcessFactoryFactory for a
// DispatcherLeaderProcessFactory, built from the job graph store factory, I/O executor, RPC service,
// partial dispatcher services and fatal error handler. It then delegates to
// DefaultDispatcherRunner.create together with the leader election service.
// The text after the dashed separator repeats the call site that invokes this method. That call
// site is followed by the next startup step's log line ("Starting ResourceManager.").
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public DispatcherRunner createDispatcherRunner ( LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, JobGraphStoreFactory jobGraphStoreFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception { final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory( jobGraphStoreFactory, ioExecutor, rpcService, partialDispatcherServices, fatalErrorHandler); return DefaultDispatcherRunner.create( leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory); } --------------------------------------------------------- dispatcherRunner = dispatcherRunnerFactory .createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, new HaServicesJobGraphStoreFactory (highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices); log.debug("Starting ResourceManager." );
// Static factory. It constructs a DefaultDispatcherRunner from the leader election service, fatal
// error handler and leader process factory. It returns that runner wrapped by
// DispatcherRunnerLeaderElectionLifecycleManager.createFor, which also receives the leader election
// service.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static DispatcherRunner create (LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception { final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner (leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory); return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService); }
// createFor wraps any object that is both a DispatcherRunner and a LeaderContender.
// The private constructor stores both arguments and immediately calls
// leaderElectionService.start(dispatcherRunner). The runner therefore enters leader election as
// soon as the lifecycle manager is constructed.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor (T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception { return new DispatcherRunnerLeaderElectionLifecycleManager <>(dispatcherRunner, leaderElectionService); } -------------------------------------------------------- private DispatcherRunnerLeaderElectionLifecycleManager (T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception { this .dispatcherRunner = dispatcherRunner; this .leaderElectionService = leaderElectionService; leaderElectionService.start(dispatcherRunner); }
// start() of the ZooKeeper leader election service (as its log message says). It validates the
// contender, then under `lock` it:
// - registers this service as an unhandled-error listener on the client,
// - records the contender,
// - attaches itself to leaderLatch and starts the latch,
// - attaches itself to the cache and starts it,
// - registers the connection-state listener,
// - sets running = true.
// NOTE(review): the `leaderContender == null` precondition is evaluated before entering
// synchronized (lock), while the assignment happens inside it. If start() can be called
// concurrently, both callers could pass the check. Consider moving the check under the lock.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public void start (LeaderContender contender) throws Exception { Preconditions.checkNotNull(contender, "Contender must not be null." ); Preconditions.checkState(leaderContender == null , "Contender was already set." ); LOG.info("Starting ZooKeeperLeaderElectionService {}." , this ); synchronized (lock) { client.getUnhandledErrorListenable().addListener(this ); leaderContender = contender; leaderLatch.addListener(this ); leaderLatch.start(); cache.getListenable().addListener(this ); cache.start(); client.getConnectionStateListenable().addListener(listener); running = true ; } }
// isLeader callback, run under `lock`. If the service is running, it issues a fresh random session
// ID, clears any previously confirmed leader information, and grants leadership to the contender
// with that ID. Otherwise it only logs that the grant notification is ignored because the service
// has been stopped.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public void isLeader () { synchronized (lock) { if (running) { issuedLeaderSessionID = UUID.randomUUID(); clearConfirmedLeaderInformation(); if (LOG.isDebugEnabled()) { LOG.debug("Grant leadership to contender {} with session ID {}." , leaderContender.getDescription(), issuedLeaderSessionID); } leaderContender.grantLeadership(issuedLeaderSessionID); } else { LOG.debug("Ignoring the grant leadership notification since the service has " + "already been stopped." ); } } }
// grantLeadership runs startNewDispatcherLeaderProcess only if the runner is still running
// (via runActionIfRunning).
// startNewDispatcherLeaderProcess stops the current leader process and creates a new one for the
// given session ID. It starts the new process only after
// previousDispatcherLeaderProcessTerminationFuture completes, and FutureUtils.assertNoException
// guards that chained future.
// NOTE(review): presumably stopDispatcherLeaderProcess() updates
// previousDispatcherLeaderProcessTerminationFuture. Its body is not shown; confirm.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void grantLeadership (UUID leaderSessionID) { runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID)); } ------------------------------------------------------- private void startNewDispatcherLeaderProcess (UUID leaderSessionID) { stopDispatcherLeaderProcess(); dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID); final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess; FutureUtils.assertNoException(previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start)); }
// Leader process startup chain:
// - start() runs startInternal only when the state is CREATED. runIfStateIs delegates to
//   runIfState with expectedState::equals as the predicate.
// - startInternal logs, moves the state to RUNNING and calls onStart().
// - onStart() calls startServices(), then recovers jobs asynchronously and creates the dispatcher
//   if the process is still running. Errors are routed through onErrorIfRunning, and the resulting
//   future is kept in onGoingRecoveryOperation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Override public final void start () { runIfStateIs(State.CREATED, this ::startInternal); } -------------------------------------------------- final void runIfStateIs (State expectedState, Runnable action) { runIfState(expectedState::equals, action); } ------------------------------------------------------- private void startInternal () { log.info("Start {}." , getClass().getSimpleName()); state = State.RUNNING; onStart(); } ------------------------------------------------------- @Override protected void onStart () { startServices(); onGoingRecoveryOperation = recoverJobsAsync() .thenAccept(this ::createDispatcherIfRunning) .handle(this ::onErrorIfRunning); }
// Starts the job graph store, passing `this` as the argument (presumably as a listener). The
// enclosing method is not shown; it is likely the leader process's startServices(). Confirm.
1 jobGraphStore.start(this );
// Dispatcher creation after job recovery:
// - createDispatcherIfRunning creates the dispatcher only while the leader process is RUNNING.
// - createDispatcher builds a DispatcherGatewayService via dispatcherGatewayServiceFactory, then
//   calls completeDispatcherSetup. It passes a DispatcherId derived from the leader session ID,
//   the recovered job graphs and jobGraphStore.
// - The factory's create() wraps the recovered jobs in a DefaultDispatcherBootstrap and creates
//   the Dispatcher, wrapping any failure in FlinkRuntimeException. It then starts the Dispatcher
//   and returns a DefaultDispatcherGatewayService for it.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private void createDispatcherIfRunning (Collection<JobGraph> jobGraphs) { runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs)); } ------------------------------------------------------------ private void createDispatcher (Collection<JobGraph> jobGraphs) { final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore); completeDispatcherSetup(dispatcherService); } ---------------------------------------------------------- @Override public AbstractDispatcherLeaderProcess.DispatcherGatewayService create (DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, JobGraphWriter jobGraphWriter) { final DispatcherBootstrap bootstrap = new DefaultDispatcherBootstrap (recoveredJobs); final Dispatcher dispatcher; try { dispatcher = dispatcherFactory.createDispatcher(rpcService, fencingToken, bootstrap, PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException ("Could not create the Dispatcher rpc endpoint." , e); } dispatcher.start(); return DefaultDispatcherGatewayService.from(dispatcher); }
// Dispatcher factory: creates a StandaloneDispatcher. Its DispatcherServices are built from the
// partial services plus job graph store, together with DefaultJobManagerRunnerFactory.INSTANCE.
// The StandaloneDispatcher constructor only forwards its arguments to the Dispatcher superclass.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override public StandaloneDispatcher createDispatcher (RpcService rpcService, DispatcherId fencingToken, DispatcherBootstrap dispatcherBootstrap, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception { return new StandaloneDispatcher (rpcService, fencingToken, dispatcherBootstrap, DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE)); } ------------------------------------------------------- public StandaloneDispatcher (RpcService rpcService, DispatcherId fencingToken, DispatcherBootstrap dispatcherBootstrap, DispatcherServices dispatcherServices) throws Exception { super (rpcService, fencingToken, dispatcherBootstrap, dispatcherServices); } ---------------------------------------------
// Dispatcher.onStart starts the dispatcher services. Any exception is wrapped in a
// DispatcherException naming the dispatcher address, reported via onFatalError, and rethrown.
// On success it initializes the dispatcher bootstrap with this dispatcher and the RPC service's
// scheduled executor.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void onStart () throws Exception { try { startDispatcherServices(); } catch (Exception e) { final DispatcherException exception = new DispatcherException (String.format("Could not start the Dispatcher %s" , getAddress()), e); onFatalError(exception); throw exception; } dispatcherBootstrap.initialize(this , this .getRpcService().getScheduledExecutor()); }
// startDispatcherServices, as shown, only registers dispatcher metrics. Any exception goes to
// handleStartDispatcherServicesException.
// registerDispatcherMetrics registers a NUM_RUNNING_JOBS gauge that reports the current size of
// jobManagerRunnerFutures.
// NOTE(review): the gauge lambda reads jobManagerRunnerFutures from whichever thread samples
// metrics. Whether that map is safe to read off the main thread is not visible here; confirm.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void startDispatcherServices () throws Exception { try { registerDispatcherMetrics(jobManagerMetricGroup); } catch (Exception e) { handleStartDispatcherServicesException(e); } } ------------------------------------------------------------------------ private void registerDispatcherMetrics (MetricGroup jobManagerMetricGroup) { jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> (long ) jobManagerRunnerFutures.size()); }
// Bootstrap initialization. The first statement repeats the call site from Dispatcher.onStart.
// initialize() launches every recovered job graph on the dispatcher and then clears recoveredJobs.
// launchRecoveredJobGraphs null-checks both arguments and calls dispatcher.runRecoveredJob for
// each job graph.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 dispatcherBootstrap.initialize(this , this .getRpcService().getScheduledExecutor()); ----------------------------------------------------- @Override public void initialize (final Dispatcher dispatcher, ScheduledExecutor scheduledExecutor) { launchRecoveredJobGraphs(dispatcher, recoveredJobs); recoveredJobs.clear(); } --------------------------------------------------------- protected void launchRecoveredJobGraphs (final Dispatcher dispatcher, final Collection<JobGraph> recoveredJobGraphs) { checkNotNull(dispatcher); checkNotNull(recoveredJobGraphs); for (JobGraph recoveredJob : recoveredJobGraphs) { dispatcher.runRecoveredJob(recoveredJob); } }
// runRecoveredJob runs the job and attaches handleRecoveredJobStartError for its job ID.
// runJob works in these steps:
// - requires that no runner future is registered for the job ID yet,
// - registers the future returned by createJobManagerRunner,
// - starts the runner once it is created and maps the result to Void,
// - on the main thread executor, removes the registration again if any step failed.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 void runRecoveredJob (final JobGraph recoveredJob) { checkNotNull(recoveredJob); FutureUtils.assertNoException(runJob(recoveredJob).handle(handleRecoveredJobStartError(recoveredJob.getJobID()))); } --------------------------------------------------------------------- private CompletableFuture<Void> runJob (JobGraph jobGraph) { Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID())); final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph); jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture); return jobManagerRunnerFuture.thenApply( FunctionUtils.uncheckedFunction(this ::startJobManagerRunner) ).thenApply( FunctionUtils.nullFn() ).whenCompleteAsync( (ignored, throwable) -> { if (throwable != null ) { jobManagerRunnerFutures.remove(jobGraph.getJobID()); } }, getMainThreadExecutor()); }
// Creates the JobManagerRunner asynchronously on the RPC service's executor via
// jobManagerRunnerFactory. A creation failure is wrapped in a JobExecutionException
// ("Could not instantiate JobManager.") inside a CompletionException.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private CompletableFuture<JobManagerRunner> createJobManagerRunner (JobGraph jobGraph) { final RpcService rpcService = getRpcService(); return CompletableFuture.supplyAsync(() -> { try { return jobManagerRunnerFactory .createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory (jobManagerMetricGroup), fatalErrorHandler); } catch (Exception e) { throw new CompletionException (new JobExecutionException (jobGraph.getJobID(), "Could not instantiate JobManager." , e)); } }, rpcService.getExecutor()); }
// JobManagerRunner factory method:
// - derives the job master configuration, slot pool factory, scheduler factory, scheduler-NG
//   factory and shuffle master from the configuration,
// - bundles them into a DefaultJobMasterServiceFactory,
// - returns a JobManagerRunnerImpl for the job graph. The runner uses a class-loader lease
//   registered with the library cache manager for this job ID, plus the shared scheduled executor.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Override public JobManagerRunner createJobManagerRunner ( JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception { final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration); final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration); final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration); final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration); final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory ( jobMasterConfiguration, slotPoolFactory, schedulerFactory, rpcService, highAvailabilityServices, jobManagerServices, heartbeatServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, schedulerNGFactory, shuffleMaster); return new JobManagerRunnerImpl ( jobGraph, jobMasterFactory, highAvailabilityServices, jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()), jobManagerServices.getScheduledExecutorService(), fatalErrorHandler); }
// Excerpt from runJob. It registers the runner future under the job ID, then starts the runner
// once created and maps the result to Void. If anything failed, it removes the registration on
// the main thread executor.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture); return jobManagerRunnerFuture.thenApply( FunctionUtils.uncheckedFunction(this ::startJobManagerRunner) ).thenApply( FunctionUtils.nullFn() ).whenCompleteAsync( (ignored, throwable) -> { if (throwable != null ) { jobManagerRunnerFutures.remove(jobGraph.getJobID()); } }, getMainThreadExecutor());
// startJobManagerRunner attaches a result handler to the runner, then starts the runner and
// returns it. The handler runs on the main thread executor and acts only if this runner is still
// the completed one registered for its job ID; otherwise it logs that a newer runner exists.
// - A non-null ArchivedExecutionGraph leads to jobReachedGloballyTerminalState.
// - A JobNotFinishedException (after stripping the CompletionException) leads to jobNotFinished.
// - Any other failure leads to jobMasterFailed.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private JobManagerRunner startJobManagerRunner (JobManagerRunner jobManagerRunner) throws Exception { final JobID jobId = jobManagerRunner.getJobID(); FutureUtils.assertNoException( jobManagerRunner.getResultFuture().handleAsync((ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { final JobManagerRunner currentJobManagerRunner = Optional.ofNullable(jobManagerRunnerFutures.get(jobId)) .map(future -> future.getNow(null )).orElse(null ); if (jobManagerRunner == currentJobManagerRunner) { if (archivedExecutionGraph != null ) { jobReachedGloballyTerminalState(archivedExecutionGraph); } else { final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); if (strippedThrowable instanceof JobNotFinishedException) { jobNotFinished(jobId); } else { jobMasterFailed(jobId, strippedThrowable); } } } else { log.debug("There is a newer JobManagerRunner for the job {}." , jobId); } return null ; }, getMainThreadExecutor())); jobManagerRunner.start(); return jobManagerRunner; }
// Repeat of the ZooKeeper leader election service's start(). After validating the contender, it
// does the following under `lock`:
// - registers itself as an unhandled-error listener,
// - records the contender,
// - starts the leader latch and the cache with itself as listener,
// - registers the connection-state listener,
// - sets running = true.
// NOTE(review): the `leaderContender == null` check is outside synchronized (lock), so concurrent
// start() calls (if possible) could both pass it. Consider checking under the lock.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public void start (LeaderContender contender) throws Exception { Preconditions.checkNotNull(contender, "Contender must not be null." ); Preconditions.checkState(leaderContender == null , "Contender was already set." ); LOG.info("Starting ZooKeeperLeaderElectionService {}." , this ); synchronized (lock) { client.getUnhandledErrorListenable().addListener(this ); leaderContender = contender; leaderLatch.addListener(this ); leaderLatch.start(); cache.getListenable().addListener(this ); cache.start(); client.getConnectionStateListenable().addListener(listener); running = true ; } }
// Repeat of grantLeadership / startNewDispatcherLeaderProcess. If the runner is running, it stops
// the current leader process and creates a new one for the session ID. The new process starts
// only after previousDispatcherLeaderProcessTerminationFuture completes.
// NOTE(review): presumably stopDispatcherLeaderProcess() updates that termination future. Its body
// is not shown; confirm.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void grantLeadership (UUID leaderSessionID) { runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID)); } ------------------------------------------------------------- private void startNewDispatcherLeaderProcess (UUID leaderSessionID) { stopDispatcherLeaderProcess(); dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID); final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess; FutureUtils.assertNoException(previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start)); }