JobManager Startup Process Walkthrough

The JobManager is responsible for the execution of one specific job, and a cluster can have multiple JobManagers running at the same time. A JobManager receives a Flink job, schedules its Tasks, and manages TaskManagers; it is only started once a client submits a job.
ResourceManager

Handles the allocation of TaskSlot resources.
After a TaskManager starts, it continuously sends heartbeats to the ResourceManager.
Dispatcher

The Dispatcher exposes a REST interface for job submission; it is responsible for starting the JobManager, submitting the job, and running the Web UI.
WebMonitorEndpoint

When a client submits a job to the Flink cluster, the WebMonitorEndpoint receives the request and decides which Handler should process it.
A Flink cluster node runs the ResourceManager and the Dispatcher. When a client submits a job to the cluster (the client builds the job into a JobGraph object), the Dispatcher schedules a JobManager to manage that job's execution, and the JobManager requests the resources needed to run its Tasks from the ResourceManager.
Runtime

The Flink Runtime is the core of the Flink engine: it runs both streaming and batch jobs while providing high availability and scalability. The Runtime uses a Master-Worker architecture, where the Master node is the JobManager and the Worker nodes are TaskManagers.
Runtime architecture

In the runtime architecture, the JobManager process contains the REST endpoint, the Dispatcher, the ResourceManager, and the JobMaster; the TaskManager process contains the TaskExecutor.
The main REST component, the WebMonitorEndpoint, receives HTTP requests from clients and provides the various REST services: job and cluster metrics, job information, operations on jobs, and so on.
The Dispatcher's main role is to accept the JobMaster-related requests forwarded by the REST layer, and to start and manage JobMasters.
The JobMaster is mainly responsible for scheduling the job's execution and coordinating its checkpoints.
The ResourceManager manages resources under the different deployment modes (mainly requesting and releasing resources, and tracking resource state).
The TaskExecutor divides its resources (CPU, memory, etc.) into logical Slots, onto which the job's Tasks are scheduled to run.
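As a rough illustration of slot division (the even split and the numbers are simplifying assumptions for this sketch, not Flink's actual memory model):

```java
public class SlotSketch {
    // Hypothetical even split of a TaskExecutor's managed memory across its slots.
    static long memoryPerSlot(long managedMemoryBytes, int numberOfSlots) {
        return managedMemoryBytes / numberOfSlots;
    }

    public static void main(String[] args) {
        // A TaskManager with 4 GiB of managed memory and 4 slots
        System.out.println(memoryPerSlot(4L << 30, 4)); // prints 1073741824 (1 GiB per slot)
    }
}
```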
Rest

The REST layer mainly provides HTTP services to clients and the frontend; its core is the WebMonitorEndpoint.
First, note that DispatcherRestEndpoint is a subclass of WebMonitorEndpoint and overrides the initializeHandlers method:
```java
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(
        final CompletableFuture<String> localAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
            super.initializeHandlers(localAddressFuture);

    final Time timeout = restConfiguration.getTimeout();

    JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
            leaderRetriever,
            timeout,
            responseHeaders,
            executor,
            clusterConfiguration);

    handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));

    return handlers;
}
```
From this we can see that when a job is submitted, the request first reaches the WebMonitorEndpoint, is routed to the DispatcherRestEndpoint, and is then handled by the JobSubmitHandler; the actual work happens in handleRequest().
This corresponds to the "Submit New Job" feature of the Flink Web UI, i.e. submitting a jar through the web interface.
Let's look at handleRequest in JobSubmitHandler:
```java
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(
        @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
        @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    final Collection<File> uploadedFiles = request.getUploadedFiles();
    final Map<String, Path> nameToFile = uploadedFiles.stream()
            .collect(Collectors.toMap(File::getName, Path::fromLocalFile));

    if (uploadedFiles.size() != nameToFile.size()) {
        throw new RestHandlerException(
                String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                        uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                        nameToFile.size(),
                        uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST);
    }

    final JobSubmitRequestBody requestBody = request.getRequestBody();

    if (requestBody.jobGraphFileName == null) {
        throw new RestHandlerException(
                String.format("The %s field must not be omitted or be null.",
                        JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
    }

    CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

    Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

    Collection<Tuple2<String, Path>> artifacts =
            getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

    CompletableFuture<JobGraph> finalizedJobGraphFuture =
            uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

    CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(
            jobGraph -> gateway.submitJob(jobGraph, timeout));

    return jobSubmissionFuture.thenCombine(jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}
```
As we can see, this handles the requests behind "Submit New Job" in the Web UI: uploading jars, listing jars, deleting jars, running a jar, generating the execution graph, and so on.
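The interesting part of handleRequest is how it chains the asynchronous steps with CompletableFuture. A minimal, self-contained sketch of that chaining (the job id and "ACK" strings are made-up stand-ins for JobGraph and Acknowledge, not Flink types):

```java
import java.util.concurrent.CompletableFuture;

public class SubmitFlowSketch {
    // Mirrors handleRequest's future chaining with plain strings.
    static String submit(String jobId) {
        CompletableFuture<String> jobGraphFuture = CompletableFuture.completedFuture(jobId);

        // thenCompose: once the job graph is ready, kick off the submission
        // (the role gateway.submitJob plays in the real handler).
        CompletableFuture<String> ackFuture =
                jobGraphFuture.thenCompose(id -> CompletableFuture.completedFuture("ACK"));

        // thenCombine: when both the ack and the job graph are available,
        // build the "/jobs/<id>" response body just as handleRequest does.
        return ackFuture.thenCombine(jobGraphFuture, (ack, id) -> "/jobs/" + id).join();
    }

    public static void main(String[] args) {
        System.out.println(submit("job-42")); // prints "/jobs/job-42"
    }
}
```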
All handlers inherit from AbstractHandler.
Let's pick an arbitrary handler and take a look:
```java
JobIdsHandler jobIdsHandler = new JobIdsHandler(
        leaderRetriever,
        timeout,
        responseHeaders,
        JobIdsWithStatusesOverviewHeaders.getInstance());
-------------------------------------------
public class JobIdsHandler extends AbstractRestHandler<
        RestfulGateway, EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
```
All handlers share the following characteristics:
All handlers live under org.apache.flink.runtime.rest.handler.
Every handler has a messageHeaders attribute describing the request URL, parameter types, request parameters, response type, response status code, HTTP method, and whether file uploads are accepted.
When needed, a handler uses the WebMonitorEndpoint's leaderRetriever and resourceManagerRetriever fields to reach the Dispatcher and the ResourceManager and fetch job and resource information.
Now let's go back to the WebMonitorEndpoint:
```java
public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint
        implements LeaderContender, JsonArchivist {
```
WebMonitorEndpoint extends RestServerEndpoint and implements LeaderContender and JsonArchivist.
Let's look at the parent class, RestServerEndpoint.
It has a start method:
```java
public final void start() throws Exception {
    synchronized (lock) {
        Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

        log.info("Starting rest endpoint.");

        final Router router = new Router();
        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

        handlers = initializeHandlers(restAddressFuture);

        Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

        checkAllEndpointsAndHandlersAreUnique(handlers);
        handlers.forEach(handler -> registerHandler(router, handler, log));

        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                RouterHandler handler = new RouterHandler(router, responseHeaders);

                if (isHttpsEnabled()) {
                    ch.pipeline().addLast("ssl",
                            new RedirectingSslHandler(restAddress, restAddressFuture, sslHandlerFactory));
                }

                ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new FileUploadHandler(uploadDir))
                        .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                        .addLast(new ChunkedWriteHandler())
                        .addLast(handler.getName(), handler)
                        .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };

        NioEventLoopGroup bossGroup =
                new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
        NioEventLoopGroup workerGroup =
                new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

        bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(initializer);

        Iterator<Integer> portsIterator;
        try {
            portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
        } catch (IllegalConfigurationException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
        }

        int chosenPort = 0;
        while (portsIterator.hasNext()) {
            try {
                chosenPort = portsIterator.next();
                final ChannelFuture channel;
                if (restBindAddress == null) {
                    channel = bootstrap.bind(chosenPort);
                } else {
                    channel = bootstrap.bind(restBindAddress, chosenPort);
                }
                serverChannel = channel.syncUninterruptibly().channel();
                break;
            } catch (final Exception e) {
                if (!(e instanceof org.jboss.netty.channel.ChannelException
                        || e instanceof java.net.BindException)) {
                    throw e;
                }
            }
        }

        if (serverChannel == null) {
            throw new BindException("Could not start rest endpoint on any port in port range " + restBindPortRange);
        }

        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

        final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
        final String advertisedAddress;
        if (bindAddress.getAddress().isAnyLocalAddress()) {
            advertisedAddress = this.restAddress;
        } else {
            advertisedAddress = bindAddress.getAddress().getHostAddress();
        }
        final int port = bindAddress.getPort();

        log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

        restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

        restAddressFuture.complete(restBaseUrl);

        state = State.RUNNING;

        startInternal();
    }
}
```
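The port-selection loop at the heart of start() can be reproduced with plain java.net sockets. This is a sketch of the try-next-port idea only; ServerSocket stands in for the Netty bootstrap, and none of this is Flink's code:

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.List;

public class PortRangeBindSketch {
    // Walk the candidate ports in order and keep the first one that binds,
    // like the while (portsIterator.hasNext()) loop in start().
    static ServerSocket bindFirstAvailable(Iterable<Integer> ports) throws IOException {
        for (int port : ports) {
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                // Port already taken: swallow the error and try the next candidate.
            }
        }
        throw new BindException("Could not bind on any port in the given range");
    }

    public static void main(String[] args) throws IOException {
        // Occupy a port, then show the binder skips it (port 0 = any free port).
        try (ServerSocket taken = new ServerSocket(0)) {
            int busy = taken.getLocalPort();
            try (ServerSocket chosen = bindFirstAvailable(List.of(busy, 0))) {
                System.out.println(chosen.getLocalPort() != busy); // prints true
            }
        }
    }
}
```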
The first thing it does is initialize a large number of handlers; let's step into initializeHandlers:
```java
handlers = initializeHandlers(restAddressFuture);
-------------------------------------------
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(
        final CompletableFuture<String> localAddressFuture) {
    ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

    final Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers =
            initializeWebSubmissionHandlers(localAddressFuture);
    handlers.addAll(webSubmissionHandlers);
    final boolean hasWebSubmissionHandlers = !webSubmissionHandlers.isEmpty();

    final Time timeout = restConfiguration.getTimeout();

    ClusterOverviewHandler clusterOverviewHandler = new ClusterOverviewHandler(
            leaderRetriever,
            timeout,
            responseHeaders,
            ClusterOverviewHeaders.getInstance());
```
We can see that this method instantiates the many different handlers.
Back in RestServerEndpoint:
```java
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

checkAllEndpointsAndHandlersAreUnique(handlers);
handlers.forEach(handler -> registerHandler(router, handler, log));
```
The main thing here is iterating over all handlers and registering each one with the Router.
Let's step into registerHandler:
```java
private static void registerHandler(
        Router router, String handlerURL, HttpMethodWrapper httpMethod, ChannelInboundHandler handler) {
    switch (httpMethod) {
        case GET:
            router.addGet(handlerURL, handler);
            break;
        case POST:
            router.addPost(handlerURL, handler);
            break;
        case DELETE:
            router.addDelete(handlerURL, handler);
            break;
        case PATCH:
            router.addPatch(handlerURL, handler);
            break;
        default:
            throw new RuntimeException("Unsupported http method: " + httpMethod + '.');
    }
}
```
This shows that the DispatcherRestEndpoint registers each handler with the Router under the route list for that handler's HTTP method.
Let's see what the Router class contains:
```java
public class Router<T> {
    private final Map<HttpMethod, MethodlessRouter<T>> routers =
            new HashMap<HttpMethod, MethodlessRouter<T>>();

    private final MethodlessRouter<T> anyMethodRouter = new MethodlessRouter<T>();

    private T notFound;
----------------------------------------
final class MethodlessRouter<T> {
    private static final Logger log = LoggerFactory.getLogger(MethodlessRouter.class);

    private final Map<PathPattern, T> routes = new LinkedHashMap<>();
--------------------------------------------
final class PathPattern {
    private final String pattern;
    private final String[] tokens;

    public PathPattern(String pattern) {
        if (pattern.contains("?")) {
            throw new IllegalArgumentException("Path pattern must not contain query");
        }
        this.pattern = removeSlashesAtBothEnds(checkNotNull(pattern, "pattern"));
        this.tokens = this.pattern.split("/");
    }
```
So routers is a Map&lt;HttpMethod, MethodlessRouter&gt; mapping each HTTP method to its own MethodlessRouter; the routes field inside a MethodlessRouter is a Map from PathPattern to handler; and a PathPattern holds the full request path plus that path split on the "/" separator into an array of tokens.
This gives the handler registration information in the Router a two-level structure: HTTP method → MethodlessRouter, and inside it, path pattern → handler.
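The two-level lookup described above can be sketched with plain maps. Strings stand in for Netty's HttpMethod and for the handler objects, and only exact paths are matched, without the ":param" token matching the real PathPattern supports:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class RouterSketch {
    // method -> (path pattern -> handler name); mirrors the shape of
    // Map<HttpMethod, MethodlessRouter<T>> with its routes map inside.
    final Map<String, Map<String, String>> routers = new HashMap<>();

    void add(String method, String pathPattern, String handler) {
        routers.computeIfAbsent(method, m -> new LinkedHashMap<>()).put(pathPattern, handler);
    }

    String route(String method, String path) {
        Map<String, String> byPath = routers.get(method);
        return byPath == null ? null : byPath.get(path);
    }

    public static void main(String[] args) {
        RouterSketch router = new RouterSketch();
        // Same URL, different methods -> different handlers.
        router.add("GET", "jobs", "JobIdsHandler");
        router.add("POST", "jobs", "JobSubmitHandler");
        System.out.println(router.route("GET", "jobs"));  // prints JobIdsHandler
        System.out.println(router.route("POST", "jobs")); // prints JobSubmitHandler
    }
}
```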
Let's return to step twenty-two, i.e. RestServerEndpoint; what follows is starting the Netty server.
I'll set the details of the Netty server startup aside for now and come back to them later; I haven't fully understood this part yet.
After all handlers are initialized and registered with the Router, a NettyServer is created and started to expose the REST services to the outside.
Creating and starting the NettyServer has two parts: initializing the channel pipeline, and binding the port to start the server.
Let's continue downward:
```java
startInternal();
----------------------------------------------
@Override
public void startInternal() throws Exception {
    leaderElectionService.start(this);
    startExecutionGraphCacheCleanupTask();

    if (hasWebUI) {
        log.info("Web frontend listening at {}.", getRestBaseUrl());
    }
}
```
The leaderElectionService.start(this) at the beginning still goes through the ZooKeeper leader election path and eventually calls the isLeader method, which was covered earlier, so we'll skip it here. Let's look at the scheduled cleanup task below it:
```java
private void startExecutionGraphCacheCleanupTask() {
    final long cleanupInterval = 2 * restConfiguration.getRefreshInterval();
    executionGraphCleanupTask = executor.scheduleWithFixedDelay(
            executionGraphCache::cleanup,
            cleanupInterval,
            cleanupInterval,
            TimeUnit.MILLISECONDS);
}
------------------------------------
@Override
public void cleanup() {
    long currentTime = System.currentTimeMillis();

    cachedExecutionGraphs.values().removeIf(
            (ExecutionGraphEntry entry) -> currentTime >= entry.getTTL());
}
```
This periodically removes cached ExecutionGraph entries whose time-to-live has expired.
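The same TTL-eviction pattern can be reproduced with a plain map. The Entry class below is a stand-in mimicking ExecutionGraphEntry and its getTTL, not the Flink class:

```java
import java.util.HashMap;
import java.util.Map;

public class TtlCacheSketch {
    // Stand-in for ExecutionGraphEntry: a value plus its absolute expiry time.
    static final class Entry {
        final String value;
        final long ttlMillis; // timestamp after which the entry is stale

        Entry(String value, long ttlMillis) {
            this.value = value;
            this.ttlMillis = ttlMillis;
        }
    }

    final Map<String, Entry> cache = new HashMap<>();

    // Same idea as cleanup() above: drop every entry whose TTL has passed.
    void cleanup(long now) {
        cache.values().removeIf(entry -> now >= entry.ttlMillis);
    }

    public static void main(String[] args) {
        TtlCacheSketch c = new TtlCacheSketch();
        c.cache.put("job-1", new Entry("graph-1", 100)); // expires at t=100
        c.cache.put("job-2", new Entry("graph-2", 500)); // expires at t=500
        c.cleanup(200); // at t=200, only job-1 is stale
        System.out.println(c.cache.keySet()); // prints [job-2]
    }
}
```

In the real code the caller is the scheduled executor, which invokes cleanup at a fixed delay of twice the refresh interval.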