Flink Source Code Analysis (8)
2024-12-26 10:54:24 # Flink # Source Code Analysis # JobManager Startup Summary (2)

JobManager Startup Flow Analysis

JobManager

Responsible for executing one specific job; a cluster may have multiple JobManagers running at the same time. The JobManager receives Flink jobs, schedules Tasks, and manages TaskManagers; it is only started once a job is received from a client.

ResourceManager

Handles the resource requests for TaskSlots.

After a TaskManager starts, it keeps sending heartbeats to the ResourceManager.
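That heartbeat loop can be sketched with a plain ScheduledExecutorService. This is a toy model (all names are hypothetical), not Flink's actual HeartbeatManager:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a TaskManager-style heartbeat loop: a scheduled task
// periodically "reports" to a target; here it just counts the reports.
public class HeartbeatSketch {
    public static int runHeartbeats(long intervalMs, long durationMs) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // stands in for "TaskManager reports its status to the ResourceManager"
        scheduler.scheduleAtFixedRate(sent::incrementAndGet, 0, intervalMs, TimeUnit.MILLISECONDS);
        Thread.sleep(durationMs);
        scheduler.shutdownNow();
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // several heartbeats should have fired over the window
        System.out.println(runHeartbeats(20, 150) >= 3);
    }
}
```

In the real runtime the heartbeat payload carries slot and resource status, and missed heartbeats trigger timeout handling on the receiving side.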

Dispatcher

The Dispatcher exposes a REST interface to accept job submissions; it is responsible for starting the JobManager, submitting jobs, and running the Web UI.

WebMonitorEndpoint

When a client submits a job to the Flink cluster, the WebMonitorEndpoint receives the request and decides which Handler should process it.

The Flink cluster nodes run a ResourceManager and a Dispatcher. When a client submits a job to the cluster (the client first builds the job into a JobGraph object), the Dispatcher schedules a JobManager to manage that job's execution, and the JobManager requests the resources needed to run its Tasks from the ResourceManager.

Runtime

The Flink Runtime is the core of the Flink engine: it runs both streaming and batch jobs while providing high availability and scalability. It uses a Master-Worker architecture, where the Master node is the JobManager and the Worker node is the TaskManager.

Runtime Architecture

In the runtime architecture, the JobManager contains REST, the Dispatcher, the ResourceManager, and the JobMaster; the TaskManager contains the TaskExecutor.

  • The main body of REST, the WebMonitorEndpoint, receives HTTP requests from clients and provides various REST services, such as job and cluster metrics, job details, and job operations.
  • The Dispatcher's main job is to receive the JobMaster operations forwarded by REST, and to start and manage JobMasters.
  • The JobMaster is mainly responsible for scheduling the job's execution and coordinating checkpoints.
  • The ResourceManager manages resources under the different deployment modes (mainly requesting and releasing resources, and tracking resource state).
  • The TaskExecutor divides its resources (CPU, memory, etc.) into logical Slots, onto which the job's Tasks are scheduled.

Rest

REST mainly provides HTTP services for the client and the front end; its core is the WebMonitorEndpoint.

First, we can see that DispatcherRestEndpoint is a subclass of WebMonitorEndpoint; it overrides the initializeHandlers method:

@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {

/**
* Call the superclass to initialize the handlers
*/
List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(localAddressFuture);

// Add the Dispatcher specific handlers
final Time timeout = restConfiguration.getTimeout();

/**
* JobSubmitHandler: the job submission handler.
* When we submit a job, the WebMonitorEndpoint receives it and dispatches
* to the JobSubmitHandler; the actual work happens in handleRequest()
*/
JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(leaderRetriever, timeout, responseHeaders, executor, clusterConfiguration);
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
return handlers;
}

From this we can see that when a job is submitted, it first reaches the WebMonitorEndpoint, is routed through the DispatcherRestEndpoint, and is then handled by the JobSubmitHandler; the actual processing happens in handleRequest().

This corresponds to the "Submit New Job" feature of the Flink Web UI, where a jar can be uploaded and submitted.

Let's take a look at handleRequest in JobSubmitHandler:

/**
* Receives the client's request (SubmitJob)
*/
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
@Nonnull DispatcherGateway gateway) throws RestHandlerException {

// Get the uploaded files from the request, including the serialized JobGraph file
final Collection<File> uploadedFiles = request.getUploadedFiles();
final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(File::getName, Path::fromLocalFile));

if(uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", nameToFile.size(), uploadedFiles.size()),
HttpResponseStatus.BAD_REQUEST);
}

/**
* Get the request body
*/
final JobSubmitRequestBody requestBody = request.getRequestBody();

if(requestBody.jobGraphFileName == null) {
throw new RestHandlerException(String.format("The %s field must not be omitted or be null.", JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
HttpResponseStatus.BAD_REQUEST);
}

/**
* What the server receives from the client is really a JobGraph:
* the client hands the JobGraph over to the JobManager,
* and the JobSubmitHandler ultimately processes it
*/
CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

/**
* Get the program jars
*/
Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

/**
* Get the dependency (artifact) jars
*/
Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

/**
* Upload the JobGraph, the program jars, and the dependency jars (e.g. to HDFS)
*/
CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

/**
* Submit the job
*/
CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(
/**
* Submit the job:
* gateway = DispatcherGateway,
* so the Dispatcher is responsible for handling the submission
*/
jobGraph -> gateway.submitJob(jobGraph, timeout)
);
return jobSubmissionFuture.thenCombine(jobGraphFuture,

// Build and return the response
(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())
);
}

As we can see, this code handles the requests behind "Submit New Job" in the Web UI: uploading a jar, listing jars, deleting a jar, running a jar, generating the execution graph, and so on.
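The chain of futures in handleRequest (load the graph, upload its files, submit via the gateway, then combine the ack with the graph to build the response URL) can be modeled with plain CompletableFuture composition. This is a toy sketch with hypothetical stand-in methods, not Flink code:

```java
import java.util.concurrent.CompletableFuture;

// Toy model of the future chaining in handleRequest:
// loadJobGraph -> uploadJobGraphFiles -> submitJob, then thenCombine builds "/jobs/<id>".
public class SubmitFlowSketch {

    static CompletableFuture<String> loadJobGraph() {
        // stands in for loadJobGraph(requestBody, nameToFile): yields a job id
        return CompletableFuture.completedFuture("job-42");
    }

    static CompletableFuture<String> uploadJobGraphFiles(CompletableFuture<String> graph) {
        // stands in for uploading the graph, the program jars, and the dependency jars
        return graph.thenApply(g -> g);
    }

    static CompletableFuture<Boolean> submitJob(String jobId) {
        // stands in for gateway.submitJob(jobGraph, timeout)
        return CompletableFuture.completedFuture(true);
    }

    public static String submit() {
        CompletableFuture<String> jobGraphFuture = loadJobGraph();
        CompletableFuture<String> finalizedFuture = uploadJobGraphFiles(jobGraphFuture);
        // thenCompose chains the async submission onto the finalized graph
        CompletableFuture<Boolean> ackFuture = finalizedFuture.thenCompose(SubmitFlowSketch::submitJob);
        // thenCombine joins the ack with the graph future to build the response path
        return ackFuture.thenCombine(jobGraphFuture, (ack, graph) -> "/jobs/" + graph).join();
    }

    public static void main(String[] args) {
        System.out.println(submit());  // /jobs/job-42
    }
}
```

The real handler never blocks with join(); it returns the composed future to Netty, which completes the HTTP response asynchronously.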

All handlers inherit from AbstractHandler.

Let's pick an arbitrary handler and take a look:

//    JobIdsHandler
JobIdsHandler jobIdsHandler = new JobIdsHandler(leaderRetriever, timeout, responseHeaders, JobIdsWithStatusesOverviewHeaders.getInstance());

-------------------------------------------
public class JobIdsHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {

All of the handlers share the following characteristics:

  • All handlers live under org.apache.flink.runtime.rest.handler.
  • Every handler has a messageHeaders attribute, which describes the request URL, parameter types, request parameters, response type, response status codes, the HTTP method, and whether file uploads are accepted.
  • When needed, a handler uses the WebMonitor's leaderRetriever and resourceManagerRetriever fields to query the Dispatcher and the ResourceManager for job and resource information.

Now let's go back to the WebMonitorEndpoint:

public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender, JsonArchivist {

The WebMonitorEndpoint extends RestServerEndpoint and implements LeaderContender and JsonArchivist.

Let's look at the parent class RestServerEndpoint, which contains a start method:

public final void start() throws Exception {
synchronized(lock) {
Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
log.info("Starting rest endpoint.");
final Router router = new Router();
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

/**
* Step 22:
* initialize all the handlers
* (see the initializeHandlers implementation)
*/
handlers = initializeHandlers(restAddressFuture);

/**
Sort all the handlers using RestHandlerUrlComparator,
so that they are ordered like the following
(one URL maps to one handler):
* /jobs
* /jobs/overview
* /jobs/:jobid
* /jobs/:jobid/config
*/
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

checkAllEndpointsAndHandlersAreUnique(handlers);
handlers.forEach(handler -> registerHandler(router, handler, log));

/**
* Set up the Netty server channel pipeline
*/
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
RouterHandler handler = new RouterHandler(router, responseHeaders);

// SSL should be the first handler in the pipeline
if(isHttpsEnabled()) {
ch.pipeline().addLast("ssl", new RedirectingSslHandler(restAddress, restAddressFuture, sslHandlerFactory));
}

ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
.addLast(new ChunkedWriteHandler())
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));
}
};

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

/**
* Bootstrap the Netty server
*/
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(initializer);

Iterator<Integer> portsIterator;
try {
portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch(IllegalConfigurationException e) {
throw e;
} catch(Exception e) {
throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
}

// Bind a port, polling through the configured port range
int chosenPort = 0;
while(portsIterator.hasNext()) {
try {
chosenPort = portsIterator.next();
final ChannelFuture channel;
if(restBindAddress == null) {
channel = bootstrap.bind(chosenPort);
} else {
channel = bootstrap.bind(restBindAddress, chosenPort);
}
serverChannel = channel.syncUninterruptibly().channel();
break;
} catch(final Exception e) {
if(!(e instanceof org.jboss.netty.channel.ChannelException || e instanceof java.net.BindException)) {
throw e;
}
}
}

if(serverChannel == null) {
throw new BindException("Could not start rest endpoint on any port in port range " + restBindPortRange);
}

log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if(bindAddress.getAddress().isAnyLocalAddress()) {
advertisedAddress = this.restAddress;
} else {
advertisedAddress = bindAddress.getAddress().getHostAddress();
}
final int port = bindAddress.getPort();

log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

restAddressFuture.complete(restBaseUrl);

state = State.RUNNING;
/**
* startInternal(): leader election
* and the periodic cleanup task
*/
startInternal();
}
}

The first step is initializing a large number of handlers; let's step into that:

handlers = initializeHandlers(restAddressFuture);


-------------------------------------------

protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {

/**
* Initialize the handler container
*/
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

// WebSubmissionExtension
final Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers = initializeWebSubmissionHandlers(
localAddressFuture);
handlers.addAll(webSubmissionHandlers);
final boolean hasWebSubmissionHandlers = !webSubmissionHandlers.isEmpty();

final Time timeout = restConfiguration.getTimeout();

// ClusterOverviewHandler
ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(leaderRetriever, timeout, responseHeaders,
ClusterOverviewHeaders.getInstance());

We can see that this initialization creates a large number of handlers.

Back in RestServerEndpoint:

/**
Sort all the handlers using RestHandlerUrlComparator,
so that they are ordered like the following
(one URL maps to one handler):
* /jobs
* /jobs/overview
* /jobs/:jobid
* /jobs/:jobid/config
*/
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

checkAllEndpointsAndHandlersAreUnique(handlers);
/**
* Iterate over all the handlers and register them with the Router
*/
handlers.forEach(handler -> registerHandler(router, handler, log));

The key part here is iterating over all the handlers and registering them with the Router.

Let's step into registerHandler:

private static void registerHandler(Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
switch(httpMethod) {
case GET:
router.addGet(handlerURL, handler);
break;
case POST:
router.addPost(handlerURL, handler);
break;
case DELETE:
router.addDelete(handlerURL, handler);
break;
case PATCH:
router.addPatch(handlerURL, handler);
break;
default:
throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
}
}

This shows that the DispatcherRestEndpoint registers each handler into the Router's list for the handler's HTTP method.

Let's see what the Router class contains:

public class Router<T> {
// handler registration info
private final Map<HttpMethod, MethodlessRouter<T>> routers =
new HashMap<HttpMethod, MethodlessRouter<T>>();

private final MethodlessRouter<T> anyMethodRouter =
new MethodlessRouter<T>();

private T notFound;



----------------------------------------



final class MethodlessRouter<T> {
private static final Logger log = LoggerFactory.getLogger(MethodlessRouter.class);

// A path pattern can only point to one target
private final Map<PathPattern, T> routes = new LinkedHashMap<>();



--------------------------------------------
final class PathPattern {
private final String pattern;

//--------------------------------------------------------------------------
private final String[] tokens;
/**
* The pattern must not contain query, example:
* {@code constant1/constant2?foo=bar}.
*
* <p>The pattern will be stored without slashes at both ends.
*/
public PathPattern(String pattern) {
if (pattern.contains("?")) {
throw new IllegalArgumentException("Path pattern must not contain query");
}

this.pattern = removeSlashesAtBothEnds(checkNotNull(pattern, "pattern"));
this.tokens = this.pattern.split("/");
}

So routers is a Map&lt;HttpMethod, MethodlessRouter&gt; mapping each HTTP method to a MethodlessRouter. The routes field of a MethodlessRouter is in turn a Map from a PathPattern to a handler, where a PathPattern holds the full request path together with that path split into an array of tokens on the path separator.
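The two-level structure described above (HTTP method → path pattern → handler, with ":param" tokens matching any path segment) can be sketched in self-contained Java. All names here are hypothetical; this models the idea, not Flink's actual Router:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the Router structure: method -> (tokenized pattern -> handler).
public class MiniRouter<T> {
    private final Map<String, Map<String, T>> routers = new HashMap<>();

    public void add(String method, String pattern, T handler) {
        routers.computeIfAbsent(method, m -> new LinkedHashMap<>()).put(trim(pattern), handler);
    }

    public T route(String method, String path) {
        Map<String, T> routes = routers.getOrDefault(method, new LinkedHashMap<>());
        String[] request = trim(path).split("/");
        for (Map.Entry<String, T> e : routes.entrySet()) {
            if (matches(e.getKey().split("/"), request)) {
                return e.getValue();
            }
        }
        return null;  // no route matched
    }

    private static boolean matches(String[] pattern, String[] request) {
        if (pattern.length != request.length) return false;
        for (int i = 0; i < pattern.length; i++) {
            // a ":param" token matches any single path segment, mirroring PathPattern
            if (!pattern[i].startsWith(":") && !pattern[i].equals(request[i])) return false;
        }
        return true;
    }

    private static String trim(String p) {
        // stored without slashes at both ends, as PathPattern's javadoc states
        return p.replaceAll("^/+|/+$", "");
    }

    public static void main(String[] args) {
        MiniRouter<String> router = new MiniRouter<>();
        router.add("GET", "/jobs/overview", "overviewHandler");
        router.add("GET", "/jobs/:jobid", "jobDetailsHandler");
        System.out.println(router.route("GET", "/jobs/overview"));  // overviewHandler
        System.out.println(router.route("GET", "/jobs/abc123"));    // jobDetailsHandler
    }
}
```

This also shows why the handlers are sorted before registration: literal patterns like /jobs/overview must be tried before parameterized ones like /jobs/:jobid, or the parameter would swallow the literal path.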

The Router's handler registration info is therefore structured as a two-level map: HTTP method → path pattern → handler.

Now back to step 22 in RestServerEndpoint: the next part starts the Netty server.

(I'll leave the finer details of the Netty server startup for a later post; I haven't fully worked through this part yet.)

After all handlers are initialized and registered with the Router, a NettyServer is created and started to expose the REST services to the outside.

Creating and starting the NettyServer has two parts: initializing the channel pipeline, and binding a port to start the server.
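The port-binding half can be sketched without Netty: iterate over a port range and keep the first successful bind, as the polling loop in start() does. A toy model using a plain ServerSocket (names are hypothetical):

```java
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Toy model of "bind to the first free port in a range".
public class PortRangeBind {
    public static ServerSocket bindInRange(int from, int to) throws Exception {
        for (int port = from; port <= to; port++) {
            try {
                ServerSocket socket = new ServerSocket();
                socket.bind(new InetSocketAddress(port));
                return socket;                 // first port that binds wins
            } catch (BindException e) {
                // port occupied: keep polling, as RestServerEndpoint does
            }
        }
        throw new BindException("Could not bind on any port in range " + from + "-" + to);
    }

    public static void main(String[] args) throws Exception {
        ServerSocket s = bindInRange(18081, 18090);
        System.out.println("bound to port " + s.getLocalPort());
        s.close();
    }
}
```

The real code does the same thing with bootstrap.bind(...), and throws a BindException only after every port in rest.bind-port has been tried.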

Continuing downward:

/**
* Leader election
* and the periodic cleanup task
*/
startInternal();



----------------------------------------------


/**
* Leader election and cleanup of temporary files
*/
@Override
public void startInternal() throws Exception {
/**
* 1. Run leader election:
* this = webMonitorEndpoint
* ZooKeeperLeaderElectionService.start()
* elected leader: isLeader(); otherwise: notLeader()
*/
leaderElectionService.start(this);

/**
* 2. Start a periodic task that deletes the temporary and cache files
* left behind after an ExecutionGraphEntry has finished executing
*/
startExecutionGraphCacheCleanupTask();

if(hasWebUI) {
log.info("Web frontend listening at {}.", getRestBaseUrl());
}
}

The leaderElectionService.start(this) call goes through the same ZooKeeper leader election path and the isLeader method we covered earlier, so I won't repeat that here. Let's look at the periodic file-cleanup method below it:

private void startExecutionGraphCacheCleanupTask() {
final long cleanupInterval = 2 * restConfiguration.getRefreshInterval();

/**
* Run the cleanup task periodically:
* it invokes executionGraphCache::cleanup,
* which deletes temporary files and task-execution cache files
*/
executionGraphCleanupTask = executor
.scheduleWithFixedDelay(executionGraphCache::cleanup, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
}


------------------------------------
@Override
public void cleanup() {
long currentTime = System.currentTimeMillis();
/**
* Remove on timeout:
* if an ExecutionGraphEntry has exceeded its configured time to live,
* it is removed
*/
// remove entries which have exceeded their time to live
cachedExecutionGraphs.values().removeIf((ExecutionGraphEntry entry) -> currentTime >= entry.getTTL());
}

This removes the cache entries whose time to live has expired.
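The TTL-based eviction above can be modeled with a self-contained cache whose cleanup() mirrors the values().removeIf(...) call. A sketch with hypothetical names, not Flink's ExecutionGraphCache:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of TTL-based cleanup: entries record an absolute expiry time,
// and cleanup() drops the expired ones with removeIf.
public class TtlCache<K, V> {
    static final class Entry<V> {
        final V value;
        final long ttl;  // absolute expiry timestamp in ms, like ExecutionGraphEntry.getTTL()
        Entry(V value, long ttl) { this.value = value; this.ttl = ttl; }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();

    public void put(K key, V value, long ttlMillis) {
        entries.put(key, new Entry<>(value, System.currentTimeMillis() + ttlMillis));
    }

    public int size() { return entries.size(); }

    public void cleanup() {
        long now = System.currentTimeMillis();
        // remove entries which have exceeded their time to live
        entries.values().removeIf(e -> now >= e.ttl);
    }

    public static void main(String[] args) throws InterruptedException {
        TtlCache<String, String> cache = new TtlCache<>();
        cache.put("g1", "graph1", 10);      // expires almost immediately
        cache.put("g2", "graph2", 60_000);  // still alive after cleanup
        Thread.sleep(50);
        cache.cleanup();
        System.out.println(cache.size());   // 1
    }
}
```

In the real endpoint, this cleanup runs on the scheduled executor at twice the Web UI refresh interval, as startExecutionGraphCacheCleanupTask computes above.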