Flink源码解析(七)
2024-12-23 17:05:20 # Flink # 源码解析 # JobManager启动总结

JobManager启动总结

工厂类开始总结

WebMonitorEndPoint

webMonitorEndpoint实现了很多的handle

这些handler也就对应着界面用户想要看的一些数据,比如集群使用量啊,flink当前状态啊各种的内容,这些的处理器就是这些handler,这是在创建工厂类的时候做的

然后在start启动的时候首先就去初始化了这些handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 步骤二十二:
* 初始化各种Handler
* 查看initializeHandlers代码实现
*/
handlers = initializeHandlers(restAddressFuture);

/**
针对所有的 Handlers 进行排序,排序规则:RestHandlerUrlComparator
sort the handlers such that they are ordered the following:
一个url对应一个handler
* /jobs
* /jobs/overview
* /jobs/:jobid
* /jobs/:jobid/config
*/
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

然后去

1
2
3
4
5
/**
* 启动清理的定时任务
* 选举机制
*/
startInternal();

去zk里面选举然后走isleader选举,选举玩开启定时任务,主要是删除一些临时文件和缓存文件,这里会cleanup,看看这个文件的时间,如果超过了ttl时间就删除

然后就结束了

ResourceManager

首先创建实例

然后看一下这个StandaloneResourceManager体系

一直点到最后的rpcendpoint看rpcserver服务

这里和一开始的akka服务就相关了,主要是动态代理actorRef对象

然后我们去他的体系里面的类关系中找onstart方法,发现在ResourceManager中,再到grantLeadership

然后确认leader之前需要首先判断是否存在这个leaderReasourceManager然后两个方法,一个是启动心跳一个是开启两个定时任务

启动心跳就是监听了TaskManager的心跳和JobManager的心跳

开启的两个定时任务是一个是检查TaskManager的心跳,一个是请求solt检查是否超时5分钟

在这两个定时任务下面还有一个监控,这个监控是监控taskslot使用多少和taskslot剩余多少

然后resourceManager启动成功后会给自己发一条启动成功的信息

Dispatcher

这个的主要还是先选举,选举之后走isleader

这个内容比较多暂不总结,可以看前面的详细的

http://www.luckiness.cc/2024/12/20/Flink%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90(%E5%85%AD)/

这里有一些点说一下

isleader后走onstart

这里的onstart分为了两种情况,一种是第一次启动的,一种是失败后的,这种失败后的需要恢复

onstart的第一件事是监控有多少job在运行,然后循环恢复,初始化一个JobManagerRunnerImpl,先把需要恢复的id和需要恢复的JobManagerRunnerImpl放在jobManagerRunner组中,然后启动JobManagerRunner进行恢复

启动了一个存储jobGraph的组件,然后启动的时候他也是给自己发送了一个消息证明已经启动了,因为和resourceManager一样都是走的RpcEndpoint