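Flink Source Code Analysis (5): The Second Factory, the resourceManager Implementation
In the previous part we covered how WebMonitorEndpoint is implemented. Now let's look at how ResourceManager is implemented.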
Let's go back to ClusterEntrypoint and click into the create method from there.
Keep clicking down until you reach DefaultDispatcherResourceManagerComponentFactory, and find step 11, which creates the resourceManager:
```java
resourceManager = resourceManagerFactory.createResourceManager(
        configuration,
        ResourceID.generate(),
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        fatalErrorHandler,
        new ClusterInformation(hostname, blobServer.getPort()),
        webMonitorEndpoint.getRestBaseUrl(),
        metricRegistry,
        hostname);

// ---------------------------------------------------------

public ResourceManager<T> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        MetricRegistry metricRegistry,
        String hostname) throws Exception {

    final ResourceManagerMetricGroup resourceManagerMetricGroup =
            ResourceManagerMetricGroup.create(metricRegistry, hostname);
    final SlotManagerMetricGroup slotManagerMetricGroup =
            SlotManagerMetricGroup.create(metricRegistry, hostname);

    final ResourceManagerRuntimeServices resourceManagerRuntimeServices =
            createResourceManagerRuntimeServices(configuration, rpcService, highAvailabilityServices, slotManagerMetricGroup);

    return createResourceManager(configuration, resourceId, rpcService, highAvailabilityServices,
            heartbeatServices, fatalErrorHandler, clusterInformation, webInterfaceUrl,
            resourceManagerMetricGroup, resourceManagerRuntimeServices);
}
```
Click into the createResourceManager call in the final return statement:
```java
@Override
protected ResourceManager<ResourceID> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        ResourceManagerRuntimeServices resourceManagerRuntimeServices) {

    final Time standaloneClusterStartupPeriodTime =
            ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);

    return new StandaloneResourceManager(
            rpcService,
            resourceId,
            highAvailabilityServices,
            heartbeatServices,
            resourceManagerRuntimeServices.getSlotManager(),
            ResourceManagerPartitionTrackerImpl::new,
            resourceManagerRuntimeServices.getJobLeaderIdService(),
            clusterInformation,
            fatalErrorHandler,
            resourceManagerMetricGroup,
            standaloneClusterStartupPeriodTime,
            AkkaUtils.getTimeoutAsTime(configuration));
}
```
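This returns a StandaloneResourceManager object. Note that it is constructed from a whole set of services, including the rpcService, the slot manager and the job leader ID service. Next, let's look at its class hierarchy.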
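Here is how to view a class hierarchy in IDEA.
First, click on the class name (here StandaloneResourceManager) and right-click; the context menu has a Diagrams > Show Diagram entry.
Clicking it opens a diagram of the type hierarchy.
In the diagram, interfaces are shown in green and classes in blue.
Here we mainly care about the blue part, i.e. the inheritance relationships between the classes.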
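In this hierarchy, going from StandaloneResourceManager up through ResourceManager and FencedRpcEndpoint to RpcEndpoint, there is an onStart hook method. We'll start reading from the bottom, with StandaloneResourceManager, and follow the constructor chain upward: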
```java
// StandaloneResourceManager
public StandaloneResourceManager(
        RpcService rpcService,
        ResourceID resourceId,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        SlotManager slotManager,
        ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
        JobLeaderIdService jobLeaderIdService,
        ClusterInformation clusterInformation,
        FatalErrorHandler fatalErrorHandler,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        Time startupPeriodTime,
        Time rpcTimeout) {
    super(rpcService, resourceId, highAvailabilityServices, heartbeatServices, slotManager,
            clusterPartitionTrackerFactory, jobLeaderIdService, clusterInformation,
            fatalErrorHandler, resourceManagerMetricGroup, rpcTimeout);
    this.startupPeriodTime = Preconditions.checkNotNull(startupPeriodTime);
}

// ---------------------------------------------------------------
// ResourceManager

private CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

public ResourceManager(
        RpcService rpcService,
        ResourceID resourceId,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        SlotManager slotManager,
        ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
        JobLeaderIdService jobLeaderIdService,
        ClusterInformation clusterInformation,
        FatalErrorHandler fatalErrorHandler,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        Time rpcTimeout) {

    super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null);

    this.resourceId = checkNotNull(resourceId);
    this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
    this.heartbeatServices = checkNotNull(heartbeatServices);
    this.slotManager = checkNotNull(slotManager);
    this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
    this.clusterInformation = checkNotNull(clusterInformation);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);

    this.jobManagerRegistrations = new HashMap<>(4);
    this.jmResourceIdRegistrations = new HashMap<>(4);
    this.taskExecutors = new HashMap<>(8);
    this.taskExecutorGatewayFutures = new HashMap<>(8);

    this.jobManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
    this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();

    this.clusterPartitionTracker = checkNotNull(clusterPartitionTrackerFactory).get(
            (taskExecutorResourceId, dataSetIds) -> taskExecutors.get(taskExecutorResourceId)
                    .getTaskExecutorGateway()
                    .releaseClusterPartitions(dataSetIds, rpcTimeout)
                    .exceptionally(throwable -> {
                        log.debug("Request for release of cluster partitions belonging to data sets {} was not successful.",
                                dataSetIds, throwable);
                        throw new CompletionException(throwable);
                    }));
}

// -----------------------------------------------------------------
// FencedRpcEndpoint

protected FencedRpcEndpoint(RpcService rpcService, String endpointId, @Nullable F fencingToken) {
    super(rpcService, endpointId);

    Preconditions.checkArgument(
            rpcServer instanceof FencedMainThreadExecutable,
            "The rpcServer must be of type %s.",
            FencedMainThreadExecutable.class.getSimpleName());

    this.fencingToken = fencingToken;
    this.unfencedMainThreadExecutor = new UnfencedMainThreadExecutor((FencedMainThreadExecutable) rpcServer);
    this.fencedMainThreadExecutor = new MainThreadExecutor(
            getRpcService().fenceRpcServer(rpcServer, fencingToken),
            this::validateRunsInMainThread);
}

// -------------------------------------------------------------------
// RpcEndpoint

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");

    this.rpcServer = rpcService.startServer(this);

    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
```
Let's click into the startServer method called here:
```java
@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
    checkNotNull(rpcEndpoint, "rpc endpoint");

    final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
    final ActorRef actorRef = actorRegistration.getActorRef();
    final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();

    LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

    final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
    final String hostname;
    Option<String> host = actorRef.path().address().host();
    if (host.isEmpty()) {
        hostname = "localhost";
    } else {
        hostname = host.get();
    }

    Set<Class<?>> implementedRpcGateways =
            new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

    implementedRpcGateways.add(RpcServer.class);
    implementedRpcGateways.add(AkkaBasedEndpoint.class);

    final InvocationHandler akkaInvocationHandler;

    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                actorTerminationFuture,
                ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                captureAskCallstacks);

        implementedRpcGateways.add(FencedMainThreadExecutable.class);
    } else {
        akkaInvocationHandler = new AkkaInvocationHandler(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                actorTerminationFuture,
                captureAskCallstacks);
    }

    ClassLoader classLoader = getClass().getClassLoader();

    @SuppressWarnings("unchecked")
    RpcServer server = (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
            akkaInvocationHandler);

    return server;
}
```
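First, it registers an Akka RPC actor for the endpoint and obtains its ActorRef.
Then it gets the actor's address and hostname.
Next, it builds an akkaInvocationHandler from the address, hostname, actorRef and related settings (a FencedAkkaInvocationHandler here, because ResourceManager is a FencedRpcEndpoint).
Finally, it passes this akkaInvocationHandler to Proxy.newProxyInstance to create the RpcServer.
The resulting RpcServer therefore wraps the actorRef, and it is returned to the caller.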
```java
@SuppressWarnings("unchecked")
RpcServer server = (RpcServer) Proxy.newProxyInstance(
        classLoader,
        implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
        akkaInvocationHandler);
```
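The code above uses Java's dynamic proxy mechanism, Proxy.newProxyInstance.
I still need to go over this logic again, since I haven't really studied Java dynamic proxies before.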
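To make the dynamic proxy step less opaque, here is a minimal, self-contained sketch of java.lang.reflect.Proxy. The GreetingGateway interface and RecordingHandler class are hypothetical stand-ins: where Flink's AkkaInvocationHandler turns each call on the proxy into a message for the actor, this handler just records the call and answers it locally. The point is the mechanism: every call on the proxy object is routed to InvocationHandler.invoke.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class DynamicProxyDemo {

    // Hypothetical gateway interface, standing in for an RpcGateway.
    interface GreetingGateway {
        String greet(String name);
    }

    // Every call made on the proxy ends up in invoke().
    static class RecordingHandler implements InvocationHandler {
        final List<String> messages = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            String argument = (args == null || args.length == 0) ? "" : String.valueOf(args[0]);
            // Record the call as if it were a message sent to a remote actor.
            messages.add(method.getName() + "(" + argument + ")");
            return "Hello, " + argument;
        }
    }

    static GreetingGateway createProxy(RecordingHandler handler) {
        // Same call shape as in startServer: class loader, interfaces, handler.
        return (GreetingGateway) Proxy.newProxyInstance(
                DynamicProxyDemo.class.getClassLoader(),
                new Class<?>[] {GreetingGateway.class},
                handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        GreetingGateway gateway = createProxy(handler);
        System.out.println(gateway.greet("ResourceManager")); // Hello, ResourceManager
        System.out.println(handler.messages);                  // [greet(ResourceManager)]
    }
}
```

In startServer the interface array is implementedRpcGateways, so the returned proxy implements the endpoint's gateway interfaces as well as RpcServer, which is why it can later be used as the endpoint's own gateway.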
Going back up to StandaloneResourceManager, we see that the construction chain ends there.
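Note that onStart is not called by the constructors themselves: it is a lifecycle hook that the RPC framework invokes once the endpoint has been started (via resourceManager.start(), covered at the end of this post).
In RpcEndpoint, onStart does nothing.
Its subclass FencedRpcEndpoint does not override onStart either.
Only in ResourceManager do we find an actual onStart implementation: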
```java
@Override
public void onStart() throws Exception {
    try {
        startResourceManagerServices();
    } catch (Exception e) {
        final ResourceManagerException exception = new ResourceManagerException(
                String.format("Could not start the ResourceManager %s", getAddress()), e);
        onFatalError(exception);
        throw exception;
    }
}

// ---------------------------------------------------

private void startResourceManagerServices() throws Exception {
    try {
        leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

        initialize();

        leaderElectionService.start(this);
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());

        registerTaskExecutorMetrics();
    } catch (Exception e) {
        handleStartResourceManagerServicesException(e);
    }
}
```
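Clicking into initialize, we find that it does nothing in standalone mode. The next step is leaderElectionService.start(this). Without high availability configured, this is a StandaloneLeaderElectionService, whose start method simply grants leadership to the contender right away, using the fixed DEFAULT_LEADER_ID: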
```java
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());

// -----------------------------------------------------------
// StandaloneLeaderElectionService

@Override
public void start(LeaderContender newContender) throws Exception {
    if (contender != null) {
        // the service must only be started once
        throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
    }

    contender = Preconditions.checkNotNull(newContender);

    // no election: grant leadership to the contender immediately
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
```
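The standalone case boils down to "the only contender wins immediately". The sketch below models that with hypothetical Contender and ImmediateLeaderElectionService types (they are not Flink classes); an all-zero UUID plays the role of HighAvailabilityServices.DEFAULT_LEADER_ID in this sketch.

```java
import java.util.Objects;
import java.util.UUID;

public class StandaloneLeaderElectionSketch {

    // Hypothetical, simplified version of the LeaderContender role.
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Fixed leader ID used in this sketch (all-zero UUID).
    static final UUID FIXED_LEADER_ID = new UUID(0L, 0L);

    // Hypothetical service: no election at all, the single contender wins immediately.
    static class ImmediateLeaderElectionService {
        private Contender contender;

        void start(Contender newContender) {
            if (contender != null) {
                throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
            }
            contender = Objects.requireNonNull(newContender, "contender");
            contender.grantLeadership(FIXED_LEADER_ID);
        }
    }

    // Records the leader session ID it was granted.
    static class RecordingContender implements Contender {
        UUID grantedSessionId;

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            grantedSessionId = leaderSessionId;
        }
    }

    public static void main(String[] args) {
        ImmediateLeaderElectionService service = new ImmediateLeaderElectionService();
        RecordingContender resourceManager = new RecordingContender();
        service.start(resourceManager);
        System.out.println(resourceManager.grantedSessionId); // 00000000-0000-0000-0000-000000000000
    }
}
```

With a real HA setup (for example ZooKeeper), grantLeadership would only be called once an actual election has been won; the standalone variant skips that step entirely.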
Next, click into the grantLeadership method:
```java
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
    final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
            .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());

    final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
            (acceptLeadership) -> {
                if (acceptLeadership) {
                    leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
                }
            },
            getRpcService().getExecutor());

    confirmationFuture.whenComplete(
            (Void ignored, Throwable throwable) -> {
                if (throwable != null) {
                    onFatalError(ExceptionUtils.stripCompletionException(throwable));
                }
            });
}

// -----------------------------------------------------------

private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

        log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

        if (getFencingToken() != null) {
            clearStateInternal();
        }

        setFencingToken(newResourceManagerId);

        startServicesOnLeadership();

        return prepareLeadershipAsync().thenApply(ignored -> true);
    } else {
        return CompletableFuture.completedFuture(false);
    }
}

// ----------------------------------------------

protected void startServicesOnLeadership() {
    startHeartbeatServices();

    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
}
```
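The first part is asynchronous programming with CompletableFuture, which I'm not very familiar with yet and is worth studying separately. In short: once any previous state has been cleared, tryAcceptLeadership runs on the main thread; if it returns true, the leadership is confirmed on the RPC service's executor; and any failure along the way is treated as fatal. Inside tryAcceptLeadership we find startServicesOnLeadership, so let's see what it starts.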
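As a rough model of grantLeadership's future chain, the sketch below uses only java.util.concurrent. Two single-thread executors stand in for the unfenced main-thread executor and the RPC service's executor, and tryAccept is a hypothetical substitute for tryAcceptLeadership that simply reports whether we still hold leadership.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class LeadershipFutureSketch {

    // Hypothetical stand-in for tryAcceptLeadership: true if we still hold leadership.
    static CompletableFuture<Boolean> tryAccept(boolean stillLeader) {
        return CompletableFuture.completedFuture(stillLeader);
    }

    // Returns true if leadership ended up confirmed.
    static boolean grantLeadership(boolean stillLeader) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();  // ~ unfenced main-thread executor
        ExecutorService rpcExecutor = Executors.newSingleThreadExecutor(); // ~ getRpcService().getExecutor()
        AtomicBoolean confirmed = new AtomicBoolean(false);
        try {
            CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

            // Step 1: once old state is cleared, try to accept leadership on the main thread.
            CompletableFuture<Boolean> acceptFuture =
                    clearStateFuture.thenComposeAsync(ignored -> tryAccept(stillLeader), mainThread);

            // Step 2: if accepted, confirm leadership on a different executor.
            CompletableFuture<Void> confirmationFuture = acceptFuture.thenAcceptAsync(accepted -> {
                if (accepted) {
                    confirmed.set(true);
                }
            }, rpcExecutor);

            // Step 3: any failure along the chain would be treated as fatal.
            confirmationFuture.whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    System.err.println("fatal error: " + throwable);
                }
            });

            confirmationFuture.join(); // wait only so this demo can return the result
            return confirmed.get();
        } finally {
            mainThread.shutdown();
            rpcExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(grantLeadership(true));  // true
        System.out.println(grantLeadership(false)); // false
    }
}
```

Flink itself never blocks on these futures; the join() exists only so the demo can return a value.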
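There are actually two calls here: startHeartbeatServices starts the heartbeat services, and slotManager.start starts two scheduled tasks. Let's click into the first one: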
```java
private void startHeartbeatServices() {
    taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
            resourceId,
            new TaskManagerHeartbeatListener(),
            getMainThreadExecutor(),
            log);

    jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
            resourceId,
            new JobManagerHeartbeatListener(),
            getMainThreadExecutor(),
            log);
}

// ------------------------------------------------------------------
// HeartbeatServices

public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
        ResourceID resourceId,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log) {

    return new HeartbeatManagerSenderImpl<>(
            heartbeatInterval,
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            mainThreadExecutor,
            log);
}
```
Let's click into HeartbeatManagerSenderImpl:
```java
HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log) {
    this(
            heartbeatPeriod,
            heartbeatTimeout,
            ownResourceID,
            heartbeatListener,
            mainThreadExecutor,
            log,
            new HeartbeatMonitorImpl.Factory<>());
}

HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log,
        HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
    super(
            heartbeatTimeout,
            ownResourceID,
            heartbeatListener,
            mainThreadExecutor,
            log,
            heartbeatMonitorFactory);

    this.heartbeatPeriod = heartbeatPeriod;
    mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}

@Override
public void run() {
    if (!stopped) {
        log.debug("Trigger heartbeat request.");
        for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
            requestHeartbeat(heartbeatMonitor);
        }

        getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
    }
}
```
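Constructing a HeartbeatManagerSenderImpl effectively starts its run method, because the constructor calls mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);.

This method takes a Runnable as the task, here this (the current object). In Java, a class that implements Runnable uses its run() method as the task's entry point. The delay passed to schedule() is 0L milliseconds, so the task is scheduled for immediate execution on the main thread executor; in other words, run() runs right after the constructor finishes, as soon as the main thread picks it up. At the end of run(), the object schedules itself again with a delay of heartbeatPeriod, so heartbeat requests keep going out periodically until the manager is stopped.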
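The "schedule yourself, then reschedule at the end of run()" pattern can be reproduced with a plain ScheduledExecutorService. SelfReschedulingSender is a hypothetical class: incrementing a counter stands in for sending heartbeat requests to all targets, and the stopped flag plays the same role as in HeartbeatManagerSenderImpl.run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SelfReschedulingSender implements Runnable {
    private final ScheduledExecutorService executor;
    private final long periodMillis;
    private final AtomicInteger rounds = new AtomicInteger();
    private final CountDownLatch firstRounds;
    private volatile boolean stopped;

    SelfReschedulingSender(ScheduledExecutorService executor, long periodMillis, int roundsToWaitFor) {
        this.executor = executor;
        this.periodMillis = periodMillis;
        this.firstRounds = new CountDownLatch(roundsToWaitFor);
        // Same trick as HeartbeatManagerSenderImpl: hand ourselves to the executor with zero delay,
        // so run() starts as soon as the executor thread picks it up.
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        if (!stopped) {
            rounds.incrementAndGet(); // stands in for "request a heartbeat from every target"
            firstRounds.countDown();
            executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS); // next round
        }
    }

    void stop() {
        stopped = true;
    }

    // Runs a sender until it has completed `wanted` rounds (or 5 s pass), then stops it.
    static int runRounds(int wanted, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        SelfReschedulingSender sender = new SelfReschedulingSender(executor, periodMillis, wanted);
        try {
            sender.firstRounds.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sender.stop();
            executor.shutdownNow();
        }
        return sender.rounds.get();
    }

    public static void main(String[] args) {
        System.out.println("rounds: " + runRounds(3, 10L));
    }
}
```

Rescheduling at the end of run() (instead of using a fixed-rate schedule) means the next round is only planned after the current one has finished, and stopping is just a matter of flipping the flag.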
Now let's look at the run method, starting with the requestHeartbeat call inside it:
```java
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
    O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
    final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

    heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
}
```
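It first retrieves the payload for this target from the heartbeat listener, then gets the monitored HeartbeatTarget and sends it a heartbeat request carrying that payload. Because run() reschedules itself every heartbeatPeriod, every target receives such a request once per heartbeat interval (heartbeat.interval, 10 s by default). On the receiving end, the request is handled by HeartbeatManagerImpl.requestHeartbeat: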
```java
// HeartbeatManagerImpl, on the receiving side
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
    if (!stopped) {
        log.debug("Received heartbeat request from {}.", requestOrigin);

        final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

        if (heartbeatTarget != null) {
            if (heartbeatPayload != null) {
                heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
            }

            heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
        }
    }
}
```
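On the receiving side, reportHeartbeat records that a heartbeat arrived from requestOrigin, the listener is handed the payload via heartbeatListener.reportPayload(requestOrigin, heartbeatPayload), and a reply carrying the receiver's own payload is sent back with heartbeatTarget.receiveHeartbeat(...).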
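The request/reply round trip can be modeled with two plain objects. Everything here (the Target interface, Sender, Receiver and the payload strings) is hypothetical and much simpler than Flink's HeartbeatTarget and HeartbeatManagerImpl; it only shows the direction of the calls: the sender calls requestHeartbeat on the target, and the target reports the payload and answers with receiveHeartbeat.

```java
import java.util.HashMap;
import java.util.Map;

public class HeartbeatRoundTripSketch {

    // Hypothetical, much-simplified heartbeat target.
    interface Target {
        void requestHeartbeat(String origin, String payload);
        void receiveHeartbeat(String origin, String payload);
    }

    // Receiving side (think: a TaskManager).
    static class Receiver implements Target {
        final String id;
        final Map<String, Target> knownRequesters = new HashMap<>();
        final Map<String, String> reportedPayloads = new HashMap<>();

        Receiver(String id) {
            this.id = id;
        }

        @Override
        public void requestHeartbeat(String origin, String payload) {
            Target requester = knownRequesters.get(origin); // ~ reportHeartbeat(requestOrigin)
            if (requester != null) {
                if (payload != null) {
                    reportedPayloads.put(origin, payload); // ~ heartbeatListener.reportPayload(...)
                }
                requester.receiveHeartbeat(id, "report-from-" + id); // reply with our own payload
            }
        }

        @Override
        public void receiveHeartbeat(String origin, String payload) {
            // not used on this side in the sketch
        }
    }

    // Sending side (think: the ResourceManager).
    static class Sender implements Target {
        final String id;
        final Map<String, String> lastPayloadFrom = new HashMap<>();

        Sender(String id) {
            this.id = id;
        }

        void sendHeartbeatRequest(Target target, String payload) {
            target.requestHeartbeat(id, payload);
        }

        @Override
        public void requestHeartbeat(String origin, String payload) {
            // not used on this side in the sketch
        }

        @Override
        public void receiveHeartbeat(String origin, String payload) {
            lastPayloadFrom.put(origin, payload); // a real monitor would also reset its timeout here
        }
    }

    public static void main(String[] args) {
        Sender rm = new Sender("rm");
        Receiver tm = new Receiver("tm-1");
        tm.knownRequesters.put("rm", rm);

        rm.sendHeartbeatRequest(tm, "rm-payload");
        System.out.println(tm.reportedPayloads); // {rm=rm-payload}
        System.out.println(rm.lastPayloadFrom);  // {tm-1=report-from-tm-1}
    }
}
```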
Now back to the ResourceManager code for the next part:
```java
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

// -----------------------------------------------------
// SlotManagerImpl

@Override
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
    LOG.info("Starting the SlotManager.");

    this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);

    started = true;

    taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(() -> checkTaskManagerTimeouts()),
            0L,
            taskManagerTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

    slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
            0L,
            slotRequestTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

    registerSlotManagerMetrics();
}
```
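The first scheduled task, checkTaskManagerTimeouts, looks for TaskManagers that have been idle (no allocated slots) for longer than taskManagerTimeout and releases them. Detecting TaskManagers that have actually died is the job of the heartbeat manager shown above, not of this check.
The second scheduled task, checkSlotRequestTimeouts, checks whether pending slot requests have been fulfilled within slotRequestTimeout (5 minutes by default) and fails the ones that have not.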
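Here is a sketch of the first check, assuming a hypothetical registry that stores, for each idle TaskManager, the time it became idle (it is not Flink's TaskManagerRegistration). It also shows the scheduling pattern used in start: the scheduler thread only hands the check over to the main-thread executor, so the registry is only ever touched by one thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class IdleTaskManagerCheckSketch {
    private final long idleTimeoutMillis;
    // Hypothetical registry: TaskManager id -> time (ms) at which it became idle.
    private final Map<String, Long> idleSince = new HashMap<>();
    private final List<String> released = new ArrayList<>();

    IdleTaskManagerCheckSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    void markIdle(String taskManagerId, long sinceMillis) {
        idleSince.put(taskManagerId, sinceMillis);
    }

    List<String> released() {
        return released;
    }

    // Must only run on the main thread, so the collections need no locking.
    void checkTaskManagerTimeouts(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= idleTimeoutMillis) {
                released.add(entry.getKey()); // a real SlotManager would release the TaskManager here
                it.remove();
            }
        }
    }

    // Same shape as SlotManager.start: the scheduler thread only enqueues work onto the main thread.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, Executor mainThreadExecutor, long periodMillis) {
        return scheduler.scheduleWithFixedDelay(
                () -> mainThreadExecutor.execute(() -> checkTaskManagerTimeouts(System.currentTimeMillis())),
                0L,
                periodMillis,
                TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        IdleTaskManagerCheckSketch check = new IdleTaskManagerCheckSketch(30_000L);
        check.markIdle("tm-1", 0L);      // idle for 40 s at t = 40 s
        check.markIdle("tm-2", 20_000L); // idle for 20 s at t = 40 s
        check.checkTaskManagerTimeouts(40_000L);
        System.out.println(check.released()); // [tm-1]
    }
}
```

Passing the current time in as a parameter keeps the check itself deterministic; only start() reads the wall clock.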
```java
registerSlotManagerMetrics();

// ------------------------------------------

private void registerSlotManagerMetrics() {
    slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_AVAILABLE, () -> (long) getNumberFreeSlots());
    slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_TOTAL, () -> (long) getNumberRegisteredSlots());
}
```
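At the very end, two metrics (gauges) are registered:
one reports how many task slots are currently free (available),
and the other reports the total number of registered task slots.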
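Gauges are evaluated lazily: the lambda is registered once and computes the current value every time the metric is read. The sketch below shows that with a hypothetical map of Supplier<Long> in place of Flink's MetricGroup, and with hypothetical metric names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SlotGaugeSketch {
    // Hypothetical metric registry; in Flink, MetricGroup#gauge plays this role.
    private final Map<String, Supplier<Long>> gauges = new LinkedHashMap<>();
    private int registeredSlots;
    private int freeSlots;

    void registerSlotManagerMetrics() {
        // The lambdas capture `this`, so each read sees the current slot counts.
        gauges.put("slotsAvailable", () -> (long) freeSlots);
        gauges.put("slotsTotal", () -> (long) registeredSlots);
    }

    long read(String metricName) {
        return gauges.get(metricName).get();
    }

    void registerTaskManager(int slots) {
        registeredSlots += slots;
        freeSlots += slots;
    }

    void allocateSlot() {
        if (freeSlots == 0) {
            throw new IllegalStateException("no free slot");
        }
        freeSlots--;
    }

    public static void main(String[] args) {
        SlotGaugeSketch slots = new SlotGaugeSketch();
        slots.registerSlotManagerMetrics(); // registered before any slots exist
        slots.registerTaskManager(4);
        slots.allocateSlot();
        System.out.println(slots.read("slotsAvailable")); // 3
        System.out.println(slots.read("slotsTotal"));     // 4
    }
}
```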
That's about it for resourceManager for now; let's go back to ClusterEntrypoint.
In DefaultDispatcherResourceManagerComponentFactory there is this call:
```java
resourceManager.start();

// -------------------------------------------------------
// AkkaInvocationHandler

@Override
public void start() {
    rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
```
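This is what actually starts the endpoint: resourceManager.start() calls start() on its RpcServer proxy, which lands in the invocation handler shown above and sends a ControlMessages.START message to the endpoint's own actor. When the actor processes that message, it switches to the started state and calls the endpoint's onStart() hook, which is how ResourceManager.onStart() gets invoked.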
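The "start by sending yourself a message" idea can be modeled with a single-threaded executor acting as the actor's mailbox. Endpoint and ControlMessage below are hypothetical and far simpler than Flink's AkkaRpcActor; the sketch only shows that construction and onStart are separate steps, and that onStart runs on the mailbox thread when START is processed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SelfStartMailboxSketch {

    enum ControlMessage { START }

    // Hypothetical endpoint; a single-threaded executor plays the actor's mailbox.
    static class Endpoint {
        private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
        private final List<String> events = Collections.synchronizedList(new ArrayList<>());
        private boolean started; // only touched on the mailbox thread

        Endpoint() {
            events.add("constructed"); // the constructor does not call onStart()
        }

        // Like AkkaInvocationHandler.start(): just send START to ourselves.
        void start() {
            tell(ControlMessage.START);
        }

        private void tell(ControlMessage message) {
            mailbox.execute(() -> handle(message));
        }

        private void handle(ControlMessage message) {
            if (message == ControlMessage.START && !started) {
                started = true;
                onStart(); // the lifecycle hook runs while processing START
            }
        }

        protected void onStart() {
            events.add("onStart");
        }

        // Waits until all previously sent messages are processed, then returns a snapshot.
        List<String> drainAndGetEvents() {
            try {
                mailbox.submit(() -> { }).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            }
            synchronized (events) {
                return new ArrayList<>(events);
            }
        }

        void close() {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        Endpoint endpoint = new Endpoint();
        System.out.println(endpoint.drainAndGetEvents()); // [constructed]
        endpoint.start();
        System.out.println(endpoint.drainAndGetEvents()); // [constructed, onStart]
        endpoint.close();
    }
}
```

Running onStart inside the mailbox means it executes on the same thread as every later message, which is the property Flink's main-thread model relies on.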