I. ClusterEntrypoint
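When a user starts a cluster with the session-mode CLI command, the Flink cluster startup script first calls the main() method of a concrete ClusterEntrypoint subclass (for example StandaloneSessionClusterEntrypoint) to start and run the corresponding type of cluster environment.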
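In other words, ClusterEntrypoint is the entry class of the whole cluster, and its subclasses provide the main() method. At runtime, every service is triggered and started through the ClusterEntrypoint (CE) class, which in turn creates and initializes the core components.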
First, let's look at the inheritance hierarchy of the abstract CE class, shown in the figure below.
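As the hierarchy shows, ClusterEntrypoint falls into two categories:
SessionClusterEntrypoint
Creates a single cluster that can run multiple jobs at the same time. Resource utilization is higher, but if the cluster goes down, many jobs are affected.
JobClusterEntrypoint
Also called Per-job mode: a separate cluster is created for each job, so a cluster failure affects only that one job.
Within each category, the Standalone subclass corresponds to standalone deployment (for example, a local cluster), while the Mesos and YARN subclasses correspond to the schedulers of those cluster managers.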
Now let's start from the main() method of StandaloneSessionClusterEntrypoint and see how ClusterEntrypoint starts the cluster:
public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
EntrypointClusterConfiguration entrypointClusterConfiguration = null;
final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
try {
entrypointClusterConfiguration = commandLineParser.parse(args);
} catch (FlinkParseException e) {
LOG.error("Could not parse command line arguments {}.", args, e);
commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
System.exit(1);
}
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
// after the configuration above, start the cluster via ClusterEntrypoint.runClusterEntrypoint
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
The last line shows that, after a series of configuration and logging setup steps, main() finally calls ClusterEntrypoint.runClusterEntrypoint. Let's see what that method does.
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
clusterEntrypoint.startCluster();// ⭐ this line starts the cluster
} catch (ClusterEntrypointException e) {
LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
final int returnCode;
if (throwable != null) {
returnCode = RUNTIME_FAILURE_RETURN_CODE;
} else {
returnCode = applicationStatus.processExitCode();
}
LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
System.exit(returnCode);
});
}
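The ⭐ line calls CE.startCluster() to continue the startup. runClusterEntrypoint then registers a callback with clusterEntrypoint.getTerminationFuture().whenComplete(): when the cluster terminates, a throwable is mapped to RUNTIME_FAILURE_RETURN_CODE, otherwise the exit code comes from applicationStatus.processExitCode(), and the process exits with that code.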
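To make the exit-code logic concrete, here is a minimal, self-contained sketch of the same callback pattern using plain CompletableFuture. DemoStatus, TerminationDemo and the exit-code values are hypothetical stand-ins, not Flink classes; instead of calling System.exit, the callback records the code so it can be inspected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified status type; Flink's ApplicationStatus defines its own exit codes.
enum DemoStatus {
    SUCCEEDED(0), FAILED(1);

    private final int exitCode;

    DemoStatus(int exitCode) {
        this.exitCode = exitCode;
    }

    int processExitCode() {
        return exitCode;
    }
}

class TerminationDemo {
    // Illustrative value only, standing in for ClusterEntrypoint.RUNTIME_FAILURE_RETURN_CODE.
    static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    // Registers a callback like runClusterEntrypoint does, but records the exit code
    // instead of calling System.exit. -1 means "not terminated yet".
    static AtomicInteger watch(CompletableFuture<DemoStatus> terminationFuture) {
        AtomicInteger exitCode = new AtomicInteger(-1);
        terminationFuture.whenComplete((status, throwable) -> {
            int code = (throwable != null) ? RUNTIME_FAILURE_RETURN_CODE : status.processExitCode();
            exitCode.set(code);
        });
        return exitCode;
    }
}
```

Note that runClusterEntrypoint does not block: it only registers the callback, and the exit happens later on whichever thread completes the termination future.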
Next, let's see what startCluster() does:
public void startCluster() throws ClusterEntrypointException {
LOG.info("Starting {}.", getClass().getSimpleName());
try {
configureFileSystems(configuration);// configure the file systems
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
runCluster(configuration);// ⭐ continue the startup inside the security context
return null;
});
} catch (Throwable t) {
final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
try {
// clean up any partial state
shutDownAsync(
ApplicationStatus.FAILED,
ExceptionUtils.stringifyException(strippedThrowable),
false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
strippedThrowable.addSuppressed(e);
}
throw new ClusterEntrypointException(
String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
strippedThrowable);
}
}
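runSecured() takes a Callable and runs it with the installed security context in effect; the work inside (here, runCluster()) is unchanged. The sketch below assumes a simplified interface with the same shape as, as far as I know, Flink's SecurityContext.runSecured(Callable<T>); DemoSecurityContext, ThreadLocalSecurityContext and the thread-local "principal" are hypothetical stand-ins for real credentials such as a Hadoop login user.

```java
import java.util.concurrent.Callable;

// Hypothetical minimal version of the SecurityContext idea: run a Callable with some context installed.
interface DemoSecurityContext {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// Installs a thread-local "principal" for the duration of the call, then restores the previous value.
class ThreadLocalSecurityContext implements DemoSecurityContext {
    static final ThreadLocal<String> CURRENT_PRINCIPAL = new ThreadLocal<>();

    private final String principal;

    ThreadLocalSecurityContext(String principal) {
        this.principal = principal;
    }

    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        String previous = CURRENT_PRINCIPAL.get();
        CURRENT_PRINCIPAL.set(principal);
        try {
            return securedCallable.call();
        } finally {
            // never leak the context to code running after runSecured returns
            if (previous == null) {
                CURRENT_PRINCIPAL.remove();
            } else {
                CURRENT_PRINCIPAL.set(previous);
            }
        }
    }
}
```

Because the Callable may throw, runSecured declares throws Exception; this is why startCluster wraps the whole call in a try/catch and converts failures into a ClusterEntrypointException.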
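Note the ⭐ line: runCluster is still a ClusterEntrypoint method, but it is invoked through securityContext.runSecured(...), so it runs inside the installed security context rather than being called directly. Now let's look at runCluster: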
private void runCluster(Configuration configuration) throws Exception {
synchronized (lock) {
// ⭐ initialize the basic services the runtime cluster needs, e.g. HAServices and the common RpcService
initializeServices(configuration);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
// ⭐ create the cluster component clusterComponent,
// ⭐ which contains the ResourceManager, the Dispatcher and the WebMonitorEndpoint
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new AkkaQueryServiceRetriever(
metricQueryServiceActorSystem,
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
this);
clusterComponent.getShutDownFuture().whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
null,
true);
}
});
}
}
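This step first initializes the basic services (initializeServices), writes the RPC address and port into the configuration, and then calls create() on dispatcherResourceManagerComponentFactory to create and start the cluster components. Let's continue with create():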
@Override
public DispatcherResourceManagerComponent<T> create(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<U> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    JobManagerMetricGroup jobManagerMetricGroup = null;
    T dispatcher = null;

    try {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService, DispatcherGateway.class, DispatcherId::fromUuid, 10, Time.milliseconds(50L));

        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 10, Time.milliseconds(50L));

        final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
            "DispatcherRestEndpoint");

        final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
        final MetricFetcher metricFetcher = updateInterval == 0
            ? VoidMetricFetcher.INSTANCE
            : MetricFetcherImpl.fromConfiguration(
                configuration, metricQueryServiceRetriever, dispatcherGatewayRetriever, executor);

        // ⭐ create the webMonitorEndpoint
        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            blobServer,
            executor,
            metricFetcher,
            highAvailabilityServices.getWebMonitorLeaderElectionService(),
            fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start(); // ⭐ start the webMonitorEndpoint

        final String hostname = getHostname(rpcService);

        jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
            metricRegistry,
            hostname,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

        // ⭐ create the ResourceManager
        resourceManager = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup);

        final HistoryServerArchivist historyServerArchivist =
            HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        // ⭐ create the Dispatcher
        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServicePath(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);

        log.debug("Starting ResourceManager.");
        resourceManager.start(); // ⭐ start the ResourceManager
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");
        dispatcher.start(); // ⭐ start the Dispatcher
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);

    } catch (Exception exception) {
        // clean up all started components
        if (dispatcherLeaderRetrievalService != null) {
            try {
                dispatcherLeaderRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        if (resourceManagerRetrievalService != null) {
            try {
                resourceManagerRetrievalService.stop();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
        }

        final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

        if (webMonitorEndpoint != null) {
            terminationFutures.add(webMonitorEndpoint.closeAsync());
        }
        if (resourceManager != null) {
            terminationFutures.add(resourceManager.closeAsync());
        }
        if (dispatcher != null) {
            terminationFutures.add(dispatcher.closeAsync());
        }

        final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

        try {
            terminationFuture.get();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (jobManagerMetricGroup != null) {
            jobManagerMetricGroup.close();
        }

        throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
    }
}
The places where components are created and started are marked with ⭐.
As you can see, ClusterEntrypoint ultimately creates and starts three components: WebMonitorEndpoint, ResourceManager and Dispatcher. Let's look at how each of them is started.
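create() follows a "start in order, clean up everything on failure" pattern: any exception during startup triggers closing every component created so far, and exceptions raised during cleanup are attached to the original one via ExceptionUtils.firstOrSuppressed. The sketch below reproduces that pattern in plain Java. DemoComponent and ComponentStarter are hypothetical, and firstOrSuppressed here is a re-implementation of what I understand that helper's contract to be, not Flink's code. For simplicity it closes components synchronously, whereas Flink calls closeAsync() and waits for all the futures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle interface standing in for Flink's RpcEndpoint / RestServerEndpoint.
interface DemoComponent {
    String name();
    void start() throws Exception;
    void close() throws Exception;
}

class ComponentStarter {
    // Keep the first exception; attach later ones to it as suppressed exceptions.
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        if (previous != newException) {
            previous.addSuppressed(newException);
        }
        return previous;
    }

    // Starts components in order; on failure, closes every component touched so far and rethrows.
    static void startAll(List<DemoComponent> components, List<String> closedLog) throws Exception {
        List<DemoComponent> created = new ArrayList<>();
        try {
            for (DemoComponent component : components) {
                created.add(component);
                component.start();
            }
        } catch (Exception exception) {
            for (DemoComponent component : created) {
                try {
                    component.close();
                    closedLog.add(component.name());
                } catch (Exception e) {
                    exception = firstOrSuppressed(e, exception);
                }
            }
            throw new Exception("Could not start components.", exception);
        }
    }
}
```

Attaching cleanup failures as suppressed exceptions keeps the root cause (the startup failure) at the top of the stack trace while still recording what went wrong during cleanup.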
II. WebMonitorEndpoint
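WebMonitorEndpoint implements a RESTful service backend on top of the Netty networking framework. Its REST interfaces serve all REST requests, including those from the Flink Web UI, for example requests for cluster monitoring metrics.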
If you haven't worked with Netty or REST APIs before, it is worth reading an introduction to them first.
Next, let's look at this class's inheritance hierarchy.
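WME's parent class RestServerEndpoint implements the REST backend on Netty and declares abstract methods (such as initializeHandlers) through which subclasses initialize and implement their own handlers. WebMonitorEndpoint and its subclass DispatcherRestEndpoint (DRE) use this to add handlers for the REST interfaces of their own business logic.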
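MiniDispatcherRestEndpoint (MiniDRE) is a trimmed-down DRE: it does not load the handler used to submit a JobGraph, because it serves clusters whose job is already fixed at startup (it is created for JobClusterEntrypoint, i.e. per-job mode).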
First, let's revisit the last code block of the previous section to see how AbstractDispatcherResourceManagerComponentFactory creates and starts the WME:
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
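configuration, // cluster configuration
dispatcherGatewayRetriever, // retrieves the address of the currently active (leader) DispatcherGateway; RPC calls to the Dispatcher go through it, and submitted JobGraphs are ultimately sent to the Dispatcher via the DispatcherGateway
resourceManagerGatewayRetriever, // same idea for the ResourceManagerGateway; e.g. TaskManagersHandler queries it for the TaskManagers' monitoring information
blobServer, // service that stores temporary binary large objects (BLOBs)
executor, // thread pool that processes WebMonitorEndpoint requests
metricFetcher, // fetches metrics from the JobManager and the TaskManagers
highAvailabilityServices.getWebMonitorLeaderElectionService(), // leader election service for high availability
fatalErrorHandler // handler called when the WME hits a fatal error
);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();// ⭐ start the webMonitorEndpoint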
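All the parameters needed for startup are listed and commented above. Finally, webMonitorEndpoint.start() starts the service. start() is a final method defined in RestServerEndpoint; let's see what it does: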
public final void start() throws Exception {
synchronized (lock) {
// check that RestServerEndpoint.state is CREATED
Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
// start the REST endpoint
log.info("Starting rest endpoint.");
final Router router = new Router();// the Router maps request paths to handlers
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
// initialize the handlers
handlers = initializeHandlers(restAddressFuture);
Collections.sort(
handlers,
RestHandlerUrlComparator.INSTANCE);
// register the handlers one by one
handlers.forEach(handler -> {
registerHandler(router, handler, log);
});
// create the ChannelInitializer that sets up the pipeline of each new channel
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// create the RouterHandler, which dispatches incoming requests to the matching handler
RouterHandler handler = new RouterHandler(router, responseHeaders);
// SSL should be the first handler in the pipeline
if (isHttpsEnabled()) {
ch.pipeline().addLast("ssl",
new RedirectingSslHandler(restAddress, restAddressFuture, sslHandlerFactory));
}
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
.addLast(new ChunkedWriteHandler())
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));
}
};
// create the bossGroup (accepts connections) and the workerGroup (handles channel I/O)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
// create the ServerBootstrap
bootstrap = new ServerBootstrap();
// bind the bossGroup, the workerGroup and the initializer
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
// parse the candidate ports from restBindPortRange
Iterator<Integer> portsIterator;
try {
portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
}
// try the ports from portsIterator until one that is not in use can be bound
int chosenPort = 0;
while (portsIterator.hasNext()) {
try {
chosenPort = portsIterator.next();
final ChannelFuture channel;
if (restBindAddress == null) {
channel = bootstrap.bind(chosenPort);
} else {
channel = bootstrap.bind(restBindAddress, chosenPort);
}
serverChannel = channel.syncUninterruptibly().channel();
break;
} catch (final Exception e) {
// continue if the exception is due to the port being in use, fail early otherwise
if (!(e instanceof org.jboss.netty.channel.ChannelException || e instanceof java.net.BindException)) {
throw e;
}
}
}
if (serverChannel == null) {
throw new BindException("Could not start rest endpoint on any port in port range " + restBindPortRange);
}
// the ServerBootstrap has been bound successfully
log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress()) {
advertisedAddress = this.restAddress;
} else {
advertisedAddress = bindAddress.getAddress().getHostAddress();
}
final int port = bindAddress.getPort();
log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
restAddressFuture.complete(restBaseUrl);
// set the state to RUNNING
state = State.RUNNING;
// call the internal start method to start the remaining RestEndpoint services
startInternal();
}
}
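The purpose of each part is annotated above. At the end, startInternal() starts the remaining services; it is an abstract method declared in RestServerEndpoint, and each subclass provides its own implementation.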
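The port-selection loop in start() can be sketched with the JDK alone: parse the configured range, then try each port and move on only when the failure is a BindException (port in use), failing fast on anything else. parsePortRange is a hypothetical, simplified stand-in for NetUtils.getPortRangeFromString that accepts forms like "8081" or "50100-50200,50300", and java.net.ServerSocket replaces Netty's ServerBootstrap.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PortRangeBinder {
    // Hypothetical parser for comma-separated ports and "start-end" ranges.
    static Iterator<Integer> parsePortRange(String range) {
        List<Integer> ports = new ArrayList<>();
        for (String part : range.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(p));
            } else {
                int start = Integer.parseInt(p.substring(0, dash).trim());
                int end = Integer.parseInt(p.substring(dash + 1).trim());
                if (start > end) {
                    throw new IllegalArgumentException("Invalid port range: " + p);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports.iterator();
    }

    // Tries each port in turn: a port in use means "try the next one", any other error fails fast.
    static ServerSocket bindFirstFree(String range) throws IOException {
        Iterator<Integer> ports = parsePortRange(range);
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress("127.0.0.1", port));
                return socket;
            } catch (BindException e) {
                socket.close(); // port already in use
            } catch (IOException e) {
                socket.close();
                throw e;
            }
        }
        throw new BindException("Could not bind to any port in range " + range);
    }
}
```

Parsing a range into an iterator first keeps the binding loop simple: it only needs hasNext()/next(), exactly like the portsIterator loop above.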
III. Dispatcher
The Dispatcher involves many components; the figure below gives a rough overview.
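- Dispatcher: receives and distributes the jobs in the cluster. The client submits a job to the Dispatcher through ClusterClient, and the Dispatcher starts a JobManager for the JobGraph.
- DispatcherRunner: starts and manages the Dispatcher component and supports leader election.
- DispatcherLeaderProcess: manages the Dispatcher's lifecycle and provides recovery of JobGraphs.
- DispatcherGatewayService: used to obtain the DispatcherGateway.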
I haven't fully worked through this part of the source yet; I'll come back to it when I study job submission.
IV. ResourceManager
The RM's responsibilities are fairly well known, so I won't go into detail here.
First, the RM's inheritance hierarchy:
The inheritance chain is RpcEndpoint → FencedRpcEndpoint → ResourceManager.
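RpcEndpoint: base class of RPC endpoints. Every component that offers remote calls must extend it, which is why Dispatcher and JobMaster also appear beneath it.
FencedRpcEndpoint: adds a layer of high-availability support in the form of a fencing token, so that messages addressed to a stale leader session are rejected.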
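The idea behind fencing fits in a few lines: each leader session gets a token, and a message is processed only if it carries the current token, so requests aimed at a former leader are dropped. FencedEndpointDemo is a hypothetical illustration using a UUID token, not the actual FencedRpcEndpoint API (the RM uses a ResourceManagerId as its fencing token).

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical, simplified model of fencing; this is not Flink's FencedRpcEndpoint API.
class FencedEndpointDemo {
    // null until this endpoint has been granted leadership
    private UUID fencingToken;

    // In the RM, tryAcceptLeadership plays this role via setFencingToken(newResourceManagerId).
    void setFencingToken(UUID newToken) {
        this.fencingToken = newToken;
    }

    // A message is processed only if it carries the current token; messages addressed
    // to an older leader session (or sent before any leadership) are rejected.
    boolean accepts(UUID messageToken) {
        return fencingToken != null && Objects.equals(fencingToken, messageToken);
    }
}
```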
Taking StandaloneResourceManagerFactory.createResourceManager() as an example, let's see how the RM is created:
public ResourceManager<ResourceID> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
    final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration =
        ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
    // first create the runtime services, then build and return the RM
    final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
        resourceManagerRuntimeServicesConfiguration,
        highAvailabilityServices,
        rpcService.getScheduledExecutor());

    return new StandaloneResourceManager(
        rpcService,
        getEndpointId(),
        resourceId,
        highAvailabilityServices,
        heartbeatServices,
        resourceManagerRuntimeServices.getSlotManager(),
        metricRegistry,
        resourceManagerRuntimeServices.getJobLeaderIdService(),
        clusterInformation,
        fatalErrorHandler,
        jobManagerMetricGroup);
}
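As the code shows, a ResourceManagerRuntimeServices object is created before the RM is returned; its fromConfiguration method creates the two internal services SlotManager and JobLeaderIdService.
Creating the RM also requires RpcService, HeartbeatServices, HighAvailabilityServices and other services, which were created beforehand and are passed in as parameters.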
1. SlotManager
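Once the RM (together with its SlotManager) has been created, ResourceManager.start() is called to start the RM component. Because RM extends RpcEndpoint, it is essentially an RPC service: starting the RM means starting its corresponding RpcServer. After that RPC service is up, RpcEndpoint calls RM.onStart() to start the RM's other core internal services, which completes the RM startup.
Specifically, RM.onStart() calls startResourceManagerServices() to start the other internal components. Its code is as follows: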
private void startResourceManagerServices() throws Exception {
try {
// get the leader election service from the high-availability services
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
// initialize
initialize();
// start leader election with this RM as the contender; leadership is granted via grantLeadership()
leaderElectionService.start(this);
// start the JobLeaderIdService
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
// register metrics for slots and TaskExecutors
registerSlotAndTaskExecutorMetrics();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
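The call leaderElectionService.start(this) above eventually leads to the RM's grantLeadership() being called (for example immediately in standalone mode without HA, or once this contender wins the election with HA), which makes the current node the RM leader. The method is: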
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
// chain the asynchronous steps
final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
.thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
(acceptLeadership) -> {
if (acceptLeadership) {
// confirming the leader session ID might be blocking
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
}
},
getRpcService().getExecutor());
confirmationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
onFatalError(ExceptionUtils.stripCompletionException(throwable));
}
});
}
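grantLeadership() chains three asynchronous stages: tryAcceptLeadership runs on the (unfenced) main-thread executor, the possibly blocking confirmation runs on the RPC service's executor, and any failure is routed to the fatal-error handler. The sketch below models that chain with plain CompletableFuture. LeadershipDemo, its fields and the stillLeader flag are hypothetical, and the two Executor parameters stand in for getUnfencedMainThreadExecutor() and getRpcService().getExecutor().

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the grantLeadership chain; names are illustrative, not Flink's API.
class LeadershipDemo {
    final AtomicReference<UUID> confirmedSession = new AtomicReference<>();
    final AtomicReference<Throwable> fatalError = new AtomicReference<>();
    private final CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);
    private final boolean stillLeader;

    LeadershipDemo(boolean stillLeader) {
        this.stillLeader = stillLeader;
    }

    private CompletableFuture<Boolean> tryAcceptLeadership(UUID session) {
        // the real method also sets the fencing token and starts heartbeats and the SlotManager
        return CompletableFuture.completedFuture(stillLeader);
    }

    CompletableFuture<Void> grantLeadership(UUID session, Executor mainThread, Executor ioExecutor) {
        CompletableFuture<Boolean> accept =
            clearStateFuture.thenComposeAsync(ignored -> tryAcceptLeadership(session), mainThread);
        CompletableFuture<Void> confirmation = accept.thenAcceptAsync(accepted -> {
            if (accepted) {
                confirmedSession.set(session); // stands in for confirmLeaderSessionID
            }
        }, ioExecutor);
        return confirmation.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                fatalError.set(throwable); // stands in for onFatalError
            }
        });
    }
}
```

Running the confirmation on a separate executor keeps a potentially blocking call off the main thread, which must stay free to process RPC messages.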
The tryAcceptLeadership(newLeaderSessionID) call in grantLeadership() starts the heartbeat services and the SlotManager. Its code is:
private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

        log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

        // clear the state if we've been the leader before
        if (getFencingToken() != null) {
            clearStateInternal();
        }

        setFencingToken(newResourceManagerId);

        // start the heartbeat services
        startHeartbeatServices();

        // start the SlotManager
        slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

        return prepareLeadershipAsync().thenApply(ignored -> true);
    } else {
        return CompletableFuture.completedFuture(false);
    }
}
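startHeartbeatServices() starts the heartbeat services for both kinds of components the RM talks to: JobManagers and TaskManagers.
In addition, slotManager.start() starts the SlotManager service. Its code is: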
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
LOG.info("Starting the SlotManager.");
// validate the arguments and store them
this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
resourceActions = Preconditions.checkNotNull(newResourceActions);
// mark the SlotManager as started
started = true;
// periodic check that releases TaskManagers which have been idle longer than taskManagerTimeout
taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(
() -> checkTaskManagerTimeouts()),
0L,
taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
// periodic check for timed-out slot requests
slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(
() -> checkSlotRequestTimeouts()),
0L,
slotRequestTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
}
With that, the RM is up and running and can interact with the TaskManagers and JobManagers.
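The two periodic checks in SlotManager.start() share one pattern: a scheduled executor only triggers the check, and the check itself is handed to the main-thread executor so that it runs single-threaded alongside the other RM logic. A minimal sketch with JDK executors follows; PeriodicCheckDemo and its thread names are hypothetical, and the check body is a placeholder.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class PeriodicCheckDemo {
    // Named daemon threads, so a forgotten shutdown does not keep the JVM alive.
    private static ThreadFactory named(String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(named("demo-scheduler"));
    private final ExecutorService mainThread =
        Executors.newSingleThreadExecutor(named("demo-main-thread"));

    final CountDownLatch checksRun;
    volatile String lastCheckThread;

    PeriodicCheckDemo(int expectedChecks) {
        this.checksRun = new CountDownLatch(expectedChecks);
    }

    // The scheduler only triggers; the check itself runs on the single main-thread executor.
    ScheduledFuture<?> start(long periodMillis) {
        return scheduler.scheduleWithFixedDelay(
            () -> mainThread.execute(this::checkTimeouts),
            0L,
            periodMillis,
            TimeUnit.MILLISECONDS);
    }

    private void checkTimeouts() {
        // placeholder: a real check would scan registrations and release or fail the timed-out ones
        lastCheckThread = Thread.currentThread().getName();
        checksRun.countDown();
    }

    void shutdown() {
        scheduler.shutdownNow();
        mainThread.shutdownNow();
    }
}
```

Keeping the returned ScheduledFuture allows the check to be cancelled later, which is presumably why SlotManager stores taskManagerTimeoutCheck and slotRequestTimeoutCheck in fields.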
V. TaskManager
To be written.



