Flink Source Code Analysis (6)
2024-12-23 14:51:52 # Flink # Source Code Analysis # Dispatcher Startup Flow


Dispatcher Startup Flow

Next, let's look at the factory class that creates the Dispatcher.

We come back to the three factory classes we created earlier:

log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory
.createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);

We don't see the actual start call here, so we'll need to dig deeper. Before that, let's look at the overall structure of the Dispatcher class.

Find the Dispatcher class and examine its declaration:

public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<DispatcherId> implements DispatcherGateway {

The first thing we notice is that Dispatcher is an abstract class, which tells us the concrete behavior lives in its subclasses, so we'll need to keep drilling down.

It also declares many fields; some are initialized by the subclasses and some by the parent class.

Let's first summarize what Step 12 does.

/**
 * Step 12:
 * Create and start the Dispatcher.
 * dispatcherRunner:        DispatcherRunnerLeaderElectionLifecycleManager
 * dispatcherRunnerFactory: DefaultDispatcherRunnerFactory
 *
 * Role of the Dispatcher:
 * The Dispatcher component receives job submissions, persists jobs,
 * spawns a new JobMaster to execute each job, recovers the execution of all jobs
 * when the JobManager node crashes and restarts, and manages the state
 * of each job's JobMaster. Its base class is Dispatcher.
 *
 * Dispatcher as an abstract class:
 * - extends FencedRpcEndpoint to expose RPC (Remote Procedure Call) services;
 * - implements LeaderContender to take part in leader election;
 * - implements DispatcherGateway to expose the methods that REST components call via RPC;
 * - implements SubmittedJobGraphListener to react to changes in persisted job information.
 * MiniDispatcher and StandaloneDispatcher are its two subclass implementations:
 * MiniDispatcher serves Per-Job mode (one cluster per job), while
 * StandaloneDispatcher serves Session mode (one cluster running multiple jobs).
 */

We'll mainly be looking at StandaloneDispatcher here.
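To keep that hierarchy straight, here is a tiny plain-Java mock of the structure described in the comment above. The names mirror Flink's, but the bodies are empty placeholders and the String fencing token stands in for the real DispatcherId; this is an illustration, not Flink code.

interface DispatcherGateway { }                        // RPC methods exposed to the REST layer
abstract class PermanentlyFencedRpcEndpoint<F> { }     // RPC endpoint bound to a fixed fencing token
abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<String> implements DispatcherGateway { }
class StandaloneDispatcher extends Dispatcher { }      // Session mode: one cluster runs many jobs
class MiniDispatcher extends Dispatcher { }            // Per-Job mode: one cluster per job

public class HierarchySketch {
    public static void main(String[] args) {
        Dispatcher dispatcher = new StandaloneDispatcher();
        System.out.println(dispatcher instanceof DispatcherGateway); // true
    }
}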

That was the high-level overview; now let's walk through the details.

First, go back to the factory from before and find this spot:

log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory
.createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);

There's still no start here, so let's look at createDispatcherRunner:

@Override
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices) throws Exception {

    // dispatcherLeaderProcessFactoryFactory = SessionDispatcherLeaderProcessFactoryFactory
    final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
        jobGraphStoreFactory,
        ioExecutor,
        rpcService,
        partialDispatcherServices,
        fatalErrorHandler);

    /**
     * Note:
     * 1st argument: ZooKeeperLeaderElectionService
     * 3rd argument: the factory just produced by SessionDispatcherLeaderProcessFactoryFactory
     */
    return DefaultDispatcherRunner.create(
        leaderElectionService,
        fatalErrorHandler,
        dispatcherLeaderProcessFactory);
}


---------------------------------------------------------
dispatcherRunner = dispatcherRunnerFactory
    .createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler,
        new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);
log.debug("Starting ResourceManager.");

Here we see createFactory initializing the leader-process factory, passing in the service-related pieces, the HA-related pieces, and a factory for the JobGraph store; a sketch of this layering follows.
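To make this "factory that makes a factory" layering concrete, here is a standalone sketch; none of these types are Flink's, the names are only illustrative. The outer factory bakes in the long-lived services once, and the factory it returns only needs the per-election leader session id.

import java.util.UUID;

public class FactoryLayeringSketch {
    interface DispatcherLeaderProcess { void start(); }
    interface DispatcherLeaderProcessFactory { DispatcherLeaderProcess create(UUID leaderSessionId); }

    // Stand-in for SessionDispatcherLeaderProcessFactoryFactory.createFactory(...):
    // the long-lived services are captured here, once.
    static DispatcherLeaderProcessFactory createFactory(String jobGraphStoreFactory) {
        return sessionId -> () ->
            System.out.println("leader process " + sessionId + " using " + jobGraphStoreFactory);
    }

    public static void main(String[] args) {
        DispatcherLeaderProcessFactory factory = createFactory("ha-job-graph-store");
        // Each election round only has to supply the new leader session id:
        factory.create(UUID.randomUUID()).start();
    }
}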

Now let's step into DefaultDispatcherRunner.create. What gets passed in is the leader election service initialized earlier, together with the dispatcherLeaderProcessFactory we just created:

public static DispatcherRunner create(LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {

    /**
     * Note:
     * 1st argument: ZooKeeperLeaderElectionService
     * 3rd argument: the factory produced by SessionDispatcherLeaderProcessFactoryFactory
     */
    final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(leaderElectionService, fatalErrorHandler,
        dispatcherLeaderProcessFactory);

    /**
     * Note: start the DispatcherRunner's lifecycle
     * 1st argument: dispatcherRunner = DefaultDispatcherRunner
     * 2nd argument: leaderElectionService = ZooKeeperLeaderElectionService
     */
    return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService);
}

Let's look at the createFor call in that final return:

public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {

    /**
     * Note:
     * 1st argument: dispatcherRunner = DefaultDispatcherRunner
     * 2nd argument: leaderElectionService = ZooKeeperLeaderElectionService
     */
    return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}


--------------------------------------------------------
private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;

    /**
     * Note: start the leader election.
     * Argument:  dispatcherRunner = DefaultDispatcherRunner
     * Called on: leaderElectionService = ZooKeeperLeaderElectionService
     * Inside this election service, the leaderContender is the DefaultDispatcherRunner.
     */
    leaderElectionService.start(dispatcherRunner);
}

This is where the election actually happens: the leader election service passed in earlier is started with dispatcherRunner as the contender.

@Override
public void start(LeaderContender contender) throws Exception {
    Preconditions.checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);

    synchronized(lock) {

        client.getUnhandledErrorListenable().addListener(this);

        // Note: which contender this is depends on the caller
        leaderContender = contender;

        /**
         * Note: Flink's election, like HBase's, is implemented with Curator, the ZooKeeper API framework.
         * 1. leaderLatch.start() is what actually runs the election.
         * 2. When the election finishes:
         *    on success: isLeader() is called
         *    on failure: notLeader() is called
         */
        leaderLatch.addListener(this);
        leaderLatch.start();

        /**
         * Note: register listeners; once the election ends:
         * 1. if we became Leader,   isLeader()  is called back
         * 2. if we became Follower, notLeader() is called back
         */
        cache.getListenable().addListener(this);
        cache.start();

        client.getConnectionStateListenable().addListener(listener);

        running = true;
    }
}

We've landed back in the ZooKeeper election method, which works much the same as in the earlier sections; a standalone Curator sketch follows.
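Since the comments note that Flink's election, like HBase's, is built on ZooKeeper's Curator framework, here is a minimal standalone sketch of the LeaderLatch pattern; the ZooKeeper address and latch path are placeholder assumptions.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
            .newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/sketch/dispatcher-leader");
        latch.addListener(new LeaderLatchListener() {
            @Override public void isLeader()  { System.out.println("won the election");  } // -> grantLeadership(...)
            @Override public void notLeader() { System.out.println("lost the election"); } // -> revokeLeadership()
        });
        latch.start(); // joining the latch is what actually runs the election

        Thread.sleep(5_000); // give the callbacks a moment, then clean up
        latch.close();
        client.close();
    }
}

In Flink the listener is the ZooKeeperLeaderElectionService itself, so winning the latch is what drives the isLeader() callback we look at next.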

Once the election succeeds, isLeader() is invoked, which leads to grantLeadership():

@Override
public void isLeader() {
    synchronized(lock) {
        if(running) {
            issuedLeaderSessionID = UUID.randomUUID();
            clearConfirmedLeaderInformation();

            if(LOG.isDebugEnabled()) {
                LOG.debug("Grant leadership to contender {} with session ID {}.", leaderContender.getDescription(), issuedLeaderSessionID);
            }

            /**
             * Note: hand out leadership.
             * In this flow, leaderContender = DefaultDispatcherRunner
             * (in the earlier section it was the WebMonitorEndpoint);
             * grantLeadership then runs the confirmation logic.
             */
            leaderContender.grantLeadership(issuedLeaderSessionID);

        } else {
            LOG.debug("Ignoring the grant leadership notification since the service has " + "already been stopped.");
        }
    }
}

Let's look inside grantLeadership; we are now in DefaultDispatcherRunner:

@Override
public void grantLeadership(UUID leaderSessionID) {

    /**
     * Start the Dispatcher service
     */
    runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}

-------------------------------------------------------


private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {

    // First stop the existing dispatcher leader process, if any
    stopDispatcherLeaderProcess();

    // Create a SessionDispatcherLeaderProcess
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;

    /**
     * Note: then start the new one.
     * This calls SessionDispatcherLeaderProcess.start()
     */
    FutureUtils.assertNoException(previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}

The logic here: first stop the existing leader process, then create a new SessionDispatcherLeaderProcess and assign it, and finally chain the new process's start onto the previous process's termination future with thenRun, so the new process starts only once the old one has fully shut down. A standalone sketch of that handover pattern follows.
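In this sketch, stopOldProcess and the printed start are hypothetical stand-ins for the real leader processes; chaining on the termination future is what guarantees the old and new processes never overlap.

import java.util.concurrent.CompletableFuture;

public class LeaderHandoverSketch {
    // Stand-in for stopDispatcherLeaderProcess(): returns a future that
    // completes once the old process has fully terminated.
    static CompletableFuture<Void> stopOldProcess() {
        System.out.println("stopping old dispatcherLeaderProcess");
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> terminationFuture = stopOldProcess();
        // Like previousDispatcherLeaderProcessTerminationFuture.thenRun(newProcess::start):
        // the new process starts only after the old one is gone.
        terminationFuture
            .thenRun(() -> System.out.println("starting new dispatcherLeaderProcess"))
            .join();
    }
}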

Let's jump straight to that final start and enter AbstractDispatcherLeaderProcess:

@Override
public final void start() {

    /**
     * Note: calls startInternal()
     */
    runIfStateIs(State.CREATED, this::startInternal);
}

--------------------------------------------------

final void runIfStateIs(State expectedState, Runnable action) {
    runIfState(expectedState::equals, action);
}

-------------------------------------------------------

private void startInternal() {
    log.info("Start {}.", getClass().getSimpleName());
    state = State.RUNNING;
    // SessionDispatcherLeaderProcess
    onStart();
}
-------------------------------------------------------
// The onStart() invoked here is the one defined in SessionDispatcherLeaderProcess

@Override
protected void onStart() {

    /**
     * Start services: launch the JobGraphStore,
     * the component used to persist JobGraphs
     */
    startServices();

    // Recover jobs asynchronously
    onGoingRecoveryOperation = recoverJobsAsync()

        /**
         * Step into createDispatcherIfRunning()
         */
        .thenAccept(this::createDispatcherIfRunning)
        .handle(this::onErrorIfRunning);
}

This starts the JobGraphStore, the component used to persist JobGraphs:

jobGraphStore.start(this);

After that, the jobs can be recovered:

private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {

    runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
}

------------------------------------------------------------

private void createDispatcher(Collection<JobGraph> jobGraphs) {

    /**
     * dispatcherGatewayServiceFactory = DefaultDispatcherGatewayServiceFactory
     * Step into create
     */
    final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
        DispatcherId.fromUuid(getLeaderSessionId()),
        jobGraphs,
        jobGraphStore);
    completeDispatcherSetup(dispatcherService);
}

----------------------------------------------------------
@Override
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(DispatcherId fencingToken, Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {

    // Note: the Dispatcher's default bootstrap, holding the collection of jobs to recover
    final DispatcherBootstrap bootstrap = new DefaultDispatcherBootstrap(recoveredJobs);

    final Dispatcher dispatcher;
    try {

        /**
         * Create the Dispatcher instance
         * dispatcherFactory = SessionDispatcherFactory
         */
        dispatcher = dispatcherFactory.createDispatcher(rpcService, fencingToken, bootstrap,
            // Note: PartialDispatcherServicesWithJobGraphStore
            PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
    } catch(Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    /**
     * Note: the Dispatcher is itself an RpcEndpoint; once started,
     * it sends itself a hello message to confirm it is up
     */
    dispatcher.start();

    // Note: wrap the dispatcher in a gateway service and return it
    return DefaultDispatcherGatewayService.from(dispatcher);
}

Following the chain all the way down, we finally see the call we were looking for: dispatcher.start(). A sketch of the lifecycle pattern behind that start() follows.
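As a minimal plain-Java sketch of that pattern (assuming nothing about Flink internals beyond what this walkthrough shows): a final start() flips the lifecycle state and then invokes an overridable onStart() hook, which is why we will shortly go hunting for an onStart method in Dispatcher's parent class. The real RpcEndpoint additionally routes onStart through its RPC main thread.

public class LifecycleSketch {
    // Template-method lifecycle: start() is final, onStart() is the subclass hook.
    static abstract class Endpoint {
        private boolean started;
        public final void start() {
            if (started) { return; }
            started = true;
            onStart(); // subclass hook, like Dispatcher.onStart()
        }
        protected abstract void onStart();
    }

    public static void main(String[] args) {
        Endpoint dispatcherLike = new Endpoint() {
            @Override protected void onStart() { System.out.println("starting dispatcher services"); }
        };
        dispatcherLike.start(); // prints: starting dispatcher services
    }
}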

First, let's look at the createDispatcher method:

@Override
public StandaloneDispatcher createDispatcher(RpcService rpcService, DispatcherId fencingToken, DispatcherBootstrap dispatcherBootstrap,
        PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {

    /**
     * Note: build a StandaloneDispatcher
     * Step into StandaloneDispatcher
     */
    return new StandaloneDispatcher(rpcService, fencingToken, dispatcherBootstrap,
        DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE));
}

-------------------------------------------------------
public StandaloneDispatcher(RpcService rpcService, DispatcherId fencingToken, DispatcherBootstrap dispatcherBootstrap,
        DispatcherServices dispatcherServices) throws Exception {
    /**
     * Delegates to the Dispatcher constructor
     */
    super(rpcService, fencingToken, dispatcherBootstrap, dispatcherServices);
}
---------------------------------------------

Now let's click into the parent class and see whether it has an onStart method.

It does:

@Override
public void onStart() throws Exception {
    try {

        /**
         * Start the Dispatcher services.
         *
         * Two scenarios:
         * 1. first-time startup
         * 2. recovery after a failure
         *    2.1 restart the suspended jobs (checking whether each job should be recovered)
         */
        // First thing after starting
        startDispatcherServices();

    } catch(Exception e) {
        final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
        onFatalError(exception);
        throw exception;
    }

    /**
     * Second thing:
     * resume execution of all interrupted jobs.
     * Step into initialize
     */
    dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
}

Startup here covers two scenarios: a first-time start, and cluster recovery after a failure.

private void startDispatcherServices() throws Exception {
    try {
        /**
         * Register metrics on the JobManager metric group,
         * e.g. how many jobs are currently running
         */
        registerDispatcherMetrics(jobManagerMetricGroup);
    } catch(Exception e) {
        handleStartDispatcherServicesException(e);
    }
}

------------------------------------------------------------------------
private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) {
    jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerFutures.size());
}



So the first thing startup does is register a gauge on the JobManager metric group reporting how many jobs are currently running, and with that the method is done; a sketch of this gauge pattern follows.
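In this small standalone sketch, the Gauge interface mirrors org.apache.flink.metrics.Gauge and a plain map stands in for jobManagerRunnerFutures. The point is that the gauge stores nothing itself: it re-reads the map size every time the metric reporter polls it.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GaugeSketch {
    interface Gauge<T> { T getValue(); } // mirrors org.apache.flink.metrics.Gauge

    public static void main(String[] args) {
        Map<String, Object> jobManagerRunnerFutures = new ConcurrentHashMap<>();
        // Same idea as jobManagerMetricGroup.gauge(NUM_RUNNING_JOBS, ...):
        Gauge<Long> numRunningJobs = () -> (long) jobManagerRunnerFutures.size();

        System.out.println(numRunningJobs.getValue()); // 0
        jobManagerRunnerFutures.put("job-1", new Object());
        System.out.println(numRunningJobs.getValue()); // 1, reflects the map automatically
    }
}

With the gauge registered, startDispatcherServices returns, and we move on to the second task: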

/**
 * Second thing:
 * resume execution of all interrupted jobs.
 * Step into initialize
 */
dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());

-----------------------------------------------------
@Override
public void initialize(final Dispatcher dispatcher, ScheduledExecutor scheduledExecutor) {

    /**
     * Re-launch the recovered JobGraphs
     */
    launchRecoveredJobGraphs(dispatcher, recoveredJobs);

    // Clean up after the re-launch
    recoveredJobs.clear();
}

---------------------------------------------------------
protected void launchRecoveredJobGraphs(final Dispatcher dispatcher, final Collection<JobGraph> recoveredJobGraphs) {
    checkNotNull(dispatcher);
    checkNotNull(recoveredJobGraphs);

    /**
     * Loop over the recovered jobs and re-run each one
     */
    for (JobGraph recoveredJob : recoveredJobGraphs) {
        dispatcher.runRecoveredJob(recoveredJob);
    }
}


This is the job-recovery work: it loops over the recovered JobGraphs and re-runs each job:

void runRecoveredJob(final JobGraph recoveredJob) {
    checkNotNull(recoveredJob);
    /**
     * Step into runJob
     */
    FutureUtils.assertNoException(runJob(recoveredJob).handle(handleRecoveredJobStartError(recoveredJob.getJobID())));
}

---------------------------------------------------------------------
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    /**
     * Make sure the job to recover is not already running
     */
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

    /**
     * Create the JobManagerRunner;
     * the JobManagerRunner builds the JobMaster, which is responsible for running the job
     */
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

    /**
     * Register the job being recovered in the jobManagerRunnerFutures map
     */
    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

    /**
     * Note: start the JobManagerRunner
     */
    return jobManagerRunnerFuture.thenApply(

        /**
         * this::startJobManagerRunner
         */
        FunctionUtils.uncheckedFunction(this::startJobManagerRunner)

    ).thenApply(
        FunctionUtils.nullFn()
    ).whenCompleteAsync(
        (ignored, throwable) -> {
            if(throwable != null) {
                jobManagerRunnerFutures.remove(jobGraph.getJobID());
            }
        }, getMainThreadExecutor());
}


This is the core recovery logic. Let's first look at how the JobManagerRunner (and with it the JobMaster) gets created:

private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    return CompletableFuture.supplyAsync(() -> {
        try {

            /*************************************************
             *
             * Note: createJobManagerRunner returns a JobManagerRunnerImpl;
             * while the JobManagerRunnerImpl is initialized, it creates a JobMaster object
             */
            return jobManagerRunnerFactory
                .createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices,
                    jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler);
        } catch(Exception e) {
            throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
        }
    }, rpcService.getExecutor());
}

Note that this does not start a new RPC service: it submits the runner construction to the RpcService's executor via CompletableFuture.supplyAsync, so construction failures surface through the returned future; a sketch of the pattern is below. After that, let's go one level deeper into createJobManagerRunner:
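In this standalone sketch of the supplyAsync pattern, a single-thread executor stands in for rpcService.getExecutor() and a plain string stands in for the JobManagerRunner.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncSketch {
    public static void main(String[] args) {
        ExecutorService rpcExecutor = Executors.newSingleThreadExecutor(); // stand-in for rpcService.getExecutor()
        CompletableFuture<String> runnerFuture = CompletableFuture.supplyAsync(() -> {
            try {
                // stand-in for jobManagerRunnerFactory.createJobManagerRunner(...)
                return "JobManagerRunner(job-1)";
            } catch (Exception e) {
                // construction failures surface through the future,
                // like the JobExecutionException wrapping in the real code
                throw new CompletionException(e);
            }
        }, rpcExecutor);

        System.out.println(runnerFuture.join());
        rpcExecutor.shutdown();
    }
}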

@Override
public JobManagerRunner createJobManagerRunner(
        JobGraph jobGraph,
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        JobManagerSharedServices jobManagerServices,
        JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

    final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
    final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
    final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
    final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);

    /*************************************************
     *
     * Note: build the DefaultJobMasterServiceFactory
     */
    final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
        jobMasterConfiguration,
        slotPoolFactory,
        schedulerFactory,
        rpcService,
        highAvailabilityServices,
        jobManagerServices,
        heartbeatServices,
        jobManagerJobMetricGroupFactory,
        fatalErrorHandler,
        schedulerNGFactory,
        shuffleMaster);

    /*************************************************
     *
     * Note: return a JobManagerRunnerImpl,
     * which is responsible for starting the JobMaster
     */
    return new JobManagerRunnerImpl(
        jobGraph,
        jobMasterFactory,
        highAvailabilityServices,
        jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
        jobManagerServices.getScheduledExecutorService(),
        fatalErrorHandler);
}

So this method itself starts nothing: it reads the configuration, builds the slot pool, scheduler and shuffle-master factories, wires in the HA and heartbeat services, and finally returns a JobManagerRunnerImpl.

Now let's step back out to runJob:

/**
 * Register the job being recovered in the jobManagerRunnerFutures map
 */
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

/**
 * Note: start the JobManagerRunner
 */
return jobManagerRunnerFuture.thenApply(

    /**
     * this::startJobManagerRunner
     */
    FunctionUtils.uncheckedFunction(this::startJobManagerRunner)

).thenApply(
    FunctionUtils.nullFn()
).whenCompleteAsync(
    (ignored, throwable) -> {
        if(throwable != null) {
            jobManagerRunnerFutures.remove(jobGraph.getJobID());
        }
    }, getMainThreadExecutor());

Below that, the job being recovered is added to the map, and then startJobManagerRunner is invoked:

private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
    final JobID jobId = jobManagerRunner.getJobID();

    FutureUtils.assertNoException(
        jobManagerRunner.getResultFuture().handleAsync((ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
            // check if we are still the active JobManagerRunner by checking the identity
            final JobManagerRunner currentJobManagerRunner = Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
                .map(future -> future.getNow(null)).orElse(null);
            //noinspection ObjectEquality
            if(jobManagerRunner == currentJobManagerRunner) {
                if(archivedExecutionGraph != null) {
                    jobReachedGloballyTerminalState(archivedExecutionGraph);
                } else {
                    final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                    if(strippedThrowable instanceof JobNotFinishedException) {
                        jobNotFinished(jobId);
                    } else {
                        jobMasterFailed(jobId, strippedThrowable);
                    }
                }
            } else {
                log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
            }

            return null;
        }, getMainThreadExecutor()));

    /*************************************************
     *
     * Note: start the jobManagerRunner
     */
    jobManagerRunner.start();

    return jobManagerRunner;
}

Before starting anything, this method registers a handler on the runner's result future: when the job reaches a globally terminal state (or fails), the handler first checks by identity that this runner is still the current one registered for the jobId, and only then reacts; after registering the handler, it finally calls jobManagerRunner.start(). A standalone sketch of that result-handler pattern follows.
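In this sketch, a direct executor stands in for getMainThreadExecutor() and a string stands in for the ArchivedExecutionGraph.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ResultHandlerSketch {
    public static void main(String[] args) {
        Executor mainThreadExecutor = Runnable::run; // stand-in for getMainThreadExecutor()
        CompletableFuture<String> resultFuture = new CompletableFuture<>(); // stand-in for jobManagerRunner.getResultFuture()

        // Registered before start(), exactly like startJobManagerRunner does:
        resultFuture.handleAsync((archivedExecutionGraph, throwable) -> {
            if (throwable == null) {
                System.out.println("job reached globally terminal state: " + archivedExecutionGraph);
            } else {
                System.out.println("job master failed: " + throwable.getMessage());
            }
            return null;
        }, mainThreadExecutor);

        resultFuture.complete("FINISHED"); // the handler above fires now
    }
}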

Now let's look at that start:

@Override
public void start(LeaderContender contender) throws Exception {
    Preconditions.checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);

    synchronized(lock) {

        client.getUnhandledErrorListenable().addListener(this);

        // Note: which contender this is depends on the caller
        leaderContender = contender;

        /**
         * Note: Flink's election, like HBase's, is implemented with Curator, the ZooKeeper API framework.
         * 1. leaderLatch.start() is what actually runs the election.
         * 2. When the election finishes:
         *    on success: isLeader() is called
         *    on failure: notLeader() is called
         */
        leaderLatch.addListener(this);
        leaderLatch.start();

        /**
         * Note: register listeners; once the election ends:
         * 1. if we became Leader,   isLeader()  is called back
         * 2. if we became Follower, notLeader() is called back
         */
        cache.getListenable().addListener(this);
        cache.start();

        client.getConnectionStateListenable().addListener(listener);

        running = true;
    }
}

This is much like before: start registers the listeners and launches the election, then isLeader() fires on success and grantLeadership() is called on the contender. (Note that on this JobManagerRunner path the contender is the JobManagerRunnerImpl itself, whose grantLeadership verifies the job and starts the JobMaster; the version shown below is the Dispatcher-side grantLeadership from earlier, which the flow mirrors.)

@Override
public void grantLeadership(UUID leaderSessionID) {

    /**
     * Start the Dispatcher service
     */
    runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}

-------------------------------------------------------------
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {

    // First stop the existing dispatcher leader process, if any
    stopDispatcherLeaderProcess();

    // Create a SessionDispatcherLeaderProcess
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;

    /**
     * Note: then start the new one.
     * This calls SessionDispatcherLeaderProcess.start()
     */
    FutureUtils.assertNoException(previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}

And with that, the startup has looped back to the flow we already walked through.

That is the overall Dispatcher startup flow: createDispatcherRunner → ZooKeeper/Curator leader election → SessionDispatcherLeaderProcess.start() → create and start the Dispatcher (a StandaloneDispatcher in Session mode) → start the dispatcher services and recover persisted jobs → create and start a JobManagerRunner (and its JobMaster) for each job.