Flink源码解析(二)
Flink心跳检测机制
Flink对各个组件监控统一使用心跳服务来进行管理
Flink心跳入口
首先心跳检测机制的集群入口是org.apache.flink.runtime.entrypoint.ClusterEntrypoint
在这个心跳检测的集群入口里面我们找到初始化类initializeServices
这个类是启动这个集群,然后进行初始化,这里传入的这个配置文件是从flink-conf.yaml中读取的。
Flink心跳初始化
接下来我们点进这个初始化的方法,会发现下面的一个内容
1 | /** |
这里的是我们的心跳服务的初始化,我们再点入这个createHeartbeatServices
方法
1 | /** |
点入这个解析配置的方法fromConfiguration
中看一下
1 | /** |
这里是一个配置文件中获取心跳间隔事件和超时事件,默认心跳间隔时间是每10s监测一次,心跳超时时间是50s外算超时
这里是一个解析传入的配置参数,我们再点入这个HeartbeatServices
方法看一下
HeartbeatServices
方法允许访问心跳所需的所有服务,包括创建心跳接收和发送
JobMaster中心跳监测内容
然后我们再去JobMaster
里面找到和心跳有关的内容
1 | public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService { |
这里这个JobMaster实现的是网关的内容和JobMaster服务的内容
下面是JobMaster里面对心跳监测有关的内容
TaskManager心跳检测
这里首先对TaskManager的心跳检测做一个简要的概括
创建taskManager心跳管理
执行:HeartbeatManagerSenderImpl
HeartbeatManagerSenderImpl是HeartbeatManagerImpl的子类,
由心跳管理的一方(例如JM)创建,创建后立即开启周期调度线程,
每次遍历自己管理的heartbeatTarget,触发heartbeatTarget.requestHeartbeat,
属于主动触发。
进入
createHeartbeatManagerSender
方法后发现return了一个HeartbeatManagerSenderImpl
1
2
3
4
5public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
public interface HeartbeatTarget<I> {我们来到最底层的这个
HeartbeatTarget
里面来看,这个方法主要是可以接收监控目标发送的心跳请求信息,也可以对监控目标发送心跳请求,心跳响应和心跳请求都可以携带一个心跳数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* Interface for components which can be sent heartbeats and from which one can request a
* heartbeat response().
* Both the heartbeat response as well as the heartbeat request can carry a
* payload. This payload is reported to the heartbeat target and contains additional information.
* The payload can be empty which is indicated by a null value.
*
* 可以发送心跳,可以从其中请求心跳响应
* 心跳响应和心跳请求都可以携带一个payload。该payload被报告给心跳目标,
* 并包含附加信息。payload可以是空的,这是由一个空值表示的。
*
* @param <I> Type of the payload which is sent to the heartbeat target
* 发送到心跳目标的payload的类型
*/
public interface HeartbeatTarget<I> {
/**
* 接收监控目标发送来的心跳请求信息
* @param heartbeatOrigin 标识应报告心跳的机器的资源 ID。
* @param heartbeatPayload 心跳响应携带的数据
*/
void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);
/**
* 向监控目标发送心跳请求
* 请求目标的心跳。 每个心跳请求都可以携带一个payload,其中包含心跳目标的附加信息。
* Requests a heartbeat from the target. Each heartbeat request can carry a payload which contains additional information for the heartbeat target.
*
* @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
* 标识发出心跳请求的机器的资源 ID。
* @param heartbeatPayload Payload of the heartbeat request. Null indicates an empty payload.
* 心跳请求的Payload。 Null表示空负载。
*/
void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}这里是一个接口,我们看一下这个接口中的实现
这里的逻辑其实就是检查是不是还有心跳,如果检查到还有心跳回应,那么就重置心跳检测的时间,我们点进去看一下
reportHeartbeat
这个方法我们这里再点进去这个
reportHeartbeat
方法中去这里就是重置我们的超时线程
我们再往下面看,下面是一个
1
2
3if (heartbeatPayload != null) {
heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
}
这里是一个herartbeatListener调用的reportPayload方法
这里是这个HeartbeatListener
的内容,这里的reportPayload
方法就是接收到有关心跳的payload就会调用