flink源码解析(二)
2024-12-18 10:13:14 # Flink # 源码解析 # Flink心跳检测机制

Flink源码解析(二)

Flink心跳检测机制

Flink对各个组件监控统一使用心跳服务来进行管理

Flink心跳入口

首先心跳检测机制的集群入口是org.apache.flink.runtime.entrypoint.ClusterEntrypoint在这个心跳检测的集群入口里面我们找到初始化类initializeServices

这个类是启动这个集群,然后进行初始化,这里传入的这个配置文件是从flink-conf.yaml中读取的。

Flink心跳初始化

接下来我们点进这个初始化的方法,会发现下面的一个内容

1
2
3
4
5
6
/**
* 心跳服务管理初始化
* */
heartbeatServices = createHeartbeatServices(configuration);


这里的是我们的心跳服务的初始化,我们再点入这个createHeartbeatServices方法

1
2
3
4
5
6
7
/**
* 解析配置参数:
* 进入fromConfiguration
* */
protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
return HeartbeatServices.fromConfiguration(configuration);
}

点入这个解析配置的方法fromConfiguration中看一下

1
2
3
4
5
6
7
8
9
10
11
/**
* 从配置文件中提取
* 心跳间隔heartbeat.interval 默认 10秒
* 心跳超时时间heartbeat.timeout 默认 50秒
* 并创建HeartbeatServices
*/
public static HeartbeatServices fromConfiguration(Configuration configuration) {
long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);
return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}

这里是一个配置文件中获取心跳间隔事件和超时事件,默认心跳间隔时间是每10s监测一次,心跳超时时间是50s外算超时

这里是一个解析传入的配置参数,我们再点入这个HeartbeatServices方法看一下

HeartbeatServices方法允许访问心跳所需的所有服务,包括创建心跳接收和发送

JobMaster中心跳监测内容

然后我们再去JobMaster里面找到和心跳有关的内容

1
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {

这里这个JobMaster实现的是网关的内容和JobMaster服务的内容

下面是JobMaster里面对心跳监测有关的内容

TaskManager心跳检测

这里首先对TaskManager的心跳检测做一个简要的概括

  • 创建taskManager心跳管理

  • 执行:HeartbeatManagerSenderImpl

  • HeartbeatManagerSenderImpl是HeartbeatManagerImpl的子类,

  • 由心跳管理的一方(例如JM)创建,创建后立即开启周期调度线程,

  • 每次遍历自己管理的heartbeatTarget,触发heartbeatTarget.requestHeartbeat,

  • 属于主动触发。

    进入createHeartbeatManagerSender方法后发现return了一个HeartbeatManagerSenderImpl

    1
    2
    3
    4
    5
    public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
    public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
    public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
    public interface HeartbeatTarget<I> {

    我们来到最底层的这个HeartbeatTarget里面来看,这个方法主要是可以接收监控目标发送的心跳请求信息,也可以对监控目标发送心跳请求,心跳响应和心跳请求都可以携带一个心跳数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * Interface for components which can be sent heartbeats and from which one can request a
    * heartbeat response().
    * Both the heartbeat response as well as the heartbeat request can carry a
    * payload. This payload is reported to the heartbeat target and contains additional information.
    * The payload can be empty which is indicated by a null value.
    *
    * 可以发送心跳,可以从其中请求心跳响应
    * 心跳响应和心跳请求都可以携带一个payload。该payload被报告给心跳目标,
    * 并包含附加信息。payload可以是空的,这是由一个空值表示的。
    *
    * @param <I> Type of the payload which is sent to the heartbeat target
    * 发送到心跳目标的payload的类型
    */
    public interface HeartbeatTarget<I> {

    /**
    * 接收监控目标发送来的心跳请求信息
    * @param heartbeatOrigin 标识应报告心跳的机器的资源 ID。
    * @param heartbeatPayload 心跳响应携带的数据
    */
    void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);

    /**
    * 向监控目标发送心跳请求
    * 请求目标的心跳。 每个心跳请求都可以携带一个payload,其中包含心跳目标的附加信息。
    * Requests a heartbeat from the target. Each heartbeat request can carry a payload which contains additional information for the heartbeat target.
    *
    * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
    * 标识发出心跳请求的机器的资源 ID。
    * @param heartbeatPayload Payload of the heartbeat request. Null indicates an empty payload.
    * 心跳请求的Payload。 Null表示空负载。
    */
    void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
    }

    这里是一个接口,我们看一下这个接口中的实现

    这里的逻辑其实就是检查是不是还有心跳,如果检查到还有心跳回应,那么就重置心跳检测的时间,我们点进去看一下reportHeartbeat这个方法

    我们这里再点进去这个reportHeartbeat方法中去

    这里就是重置我们的超时线程

    我们再往下面看,下面是一个

    1
    2
    3
    if (heartbeatPayload != null) {
    heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
    }

这里是一个herartbeatListener调用的reportPayload方法

这里是这个HeartbeatListener的内容,这里的reportPayload方法就是接收到有关心跳的payload就会调用