Flink Source Code Series No.6: Job Submission, Starting the ResourceManager (per-job on YARN)


Chapter 1: create builds and starts the ResourceManager

Let us return to the place where the ResourceManager is first created:

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

@Override
public DispatcherResourceManagerComponent create(
		Configuration configuration,
		Executor ioExecutor,
		RpcService rpcService,
		HighAvailabilityServices highAvailabilityServices,
		BlobServer blobServer,
		HeartbeatServices heartbeatServices,
		MetricRegistry metricRegistry,
		ArchivedExecutionGraphStore archivedExecutionGraphStore,
		MetricQueryServiceRetriever metricQueryServiceRetriever,
		FatalErrorHandler fatalErrorHandler) throws Exception {

		// ...
		// TODO create the ResourceManager
		resourceManager = resourceManagerFactory.createResourceManager(
			configuration,
			ResourceID.generate(),
			rpcService,
			highAvailabilityServices,
			heartbeatServices,
			fatalErrorHandler,
			new ClusterInformation(hostname, blobServer.getPort()),
			webMonitorEndpoint.getRestBaseUrl(),
			metricRegistry,
			hostname,
			ioExecutor);

		// ...

		// TODO create and start the Dispatcher, which in turn starts the JobManager
		log.debug("Starting Dispatcher.");
		dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
			highAvailabilityServices.getDispatcherLeaderElectionService(),
			fatalErrorHandler,
			new HaServicesJobGraphStoreFactory(highAvailabilityServices),
			ioExecutor,
			rpcService,
			partialDispatcherServices);

		// TODO start the ResourceManager
		log.debug("Starting ResourceManager.");
		resourceManager.start();

		resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
		dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

		return new DispatcherResourceManagerComponent(
			dispatcherRunner,
			DefaultResourceManagerService.createFor(resourceManager),
			dispatcherLeaderRetrievalService,
			resourceManagerRetrievalService,
			webMonitorEndpoint,
			fatalErrorHandler);
		
		// ...
		
}

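As with the other components, the ResourceManager is started through the Akka-based RPC service, which ends up executing the onStart method of the ResourceManager class.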

org.apache.flink.runtime.resourcemanager.ResourceManager#onStart

@Override
public final void onStart() throws Exception {
	try {
		// TODO start the ResourceManager services
		startResourceManagerServices();
	} catch (Throwable t) {
		final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), t);
		onFatalError(exception);
		throw exception;
	}
}
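
The shape of onStart can be illustrated in isolation: a start call fires a lifecycle hook, and any failure inside the hook is reported and rethrown wrapped in a dedicated exception. The following is a minimal sketch in plain Java, not Flink code. The names SketchEndpoint, SketchResourceManager and SketchStartException are hypothetical, and the real endpoint is driven by the RPC framework rather than by a direct method call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an RPC endpoint whose start() fires the onStart() hook.
abstract class SketchEndpoint {
    private boolean started;

    final void start() {
        onStart();          // the hook runs first; a failure leaves the endpoint unstarted
        started = true;
    }

    boolean isStarted() {
        return started;
    }

    protected abstract void onStart();
}

// Hypothetical exception type playing the role of ResourceManagerException.
class SketchStartException extends RuntimeException {
    SketchStartException(String message, Throwable cause) {
        super(message, cause);
    }
}

class SketchResourceManager extends SketchEndpoint {
    final List<String> steps = new ArrayList<>();
    private final boolean failOnStart;

    SketchResourceManager(boolean failOnStart) {
        this.failOnStart = failOnStart;
    }

    @Override
    protected void onStart() {
        try {
            startServices();
        } catch (Throwable t) {
            // Report the failure, then rethrow it wrapped with a descriptive message.
            steps.add("fatal-error-reported");
            throw new SketchStartException("Could not start the sketch ResourceManager", t);
        }
    }

    private void startServices() {
        if (failOnStart) {
            throw new IllegalStateException("service start failed");
        }
        steps.add("services-started");
    }
}
```

Calling new SketchResourceManager(false).start() records the single step "services-started", while the failing variant throws SketchStartException with the original exception as its cause.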

org.apache.flink.runtime.resourcemanager.ResourceManager#startResourceManagerServices

private void startResourceManagerServices() throws Exception {
	try {
		leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

		// TODO initialize
		initialize();

		// TODO start the leader election service
		leaderElectionService.start(this);
		jobLeaderIdService.start(new JobLeaderIdActionsImpl());

		registerTaskExecutorMetrics();
	} catch (Exception e) {
		handleStartResourceManagerServicesException(e);
	}
}

Chapter 2: initialize

org.apache.flink.runtime.resourcemanager.ResourceManager#initialize is an abstract method, so we look at a concrete implementation:

org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager#initialize

@Override
protected void initialize() throws ResourceManagerException {
	try {
		resourceManagerDriver.initialize(
				this,
				new GatewayMainThreadExecutor(),
				ioExecutor);
	} catch (Exception e) {
		throw new ResourceManagerException("Cannot initialize resource provider.", e);
	}
}

The concrete implementation of org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver#initialize is as follows.

Here we look at AbstractResourceManagerDriver, the parent class of YarnResourceManagerDriver:

org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver#initialize

@Override
public final void initialize(
		ResourceEventHandler<WorkerType> resourceEventHandler,
		ScheduledExecutor mainThreadExecutor,
		Executor ioExecutor) throws Exception {
	this.resourceEventHandler = Preconditions.checkNotNull(resourceEventHandler);
	this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
	this.ioExecutor = Preconditions.checkNotNull(ioExecutor);

	// TODO in YARN mode, this initializes the YARN-specific part of the ResourceManager
	initializeInternal();
}

org.apache.flink.yarn.YarnResourceManagerDriver#initializeInternal

@Override
protected void initializeInternal() throws Exception {
	final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
	try {
		// TODO create and start the YARN ResourceManager client, used to request resources from YARN
		resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
			yarnHeartbeatIntervalMillis,
			yarnContainerEventHandler);
		resourceManagerClient.init(yarnConfig);
		resourceManagerClient.start();

		// TODO register the ApplicationMaster
		final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
		getContainersFromPreviousAttempts(registerApplicationMasterResponse);
		taskExecutorProcessSpecContainerResourcePriorityAdapter =
			new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
				registerApplicationMasterResponse.getMaximumResourceCapability(),
				ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
	} catch (Exception e) {
		throw new ResourceManagerException("Could not start resource manager client.", e);
	}

	// TODO create and start the YARN NodeManager client, used to launch TaskManagers
	nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
	nodeManagerClient.init(yarnConfig);
	nodeManagerClient.start();
}
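
The split between the final initialize method in the parent class and the initializeInternal hook in the YARN driver is a template-method arrangement. The sketch below is plain Java, not Flink or Hadoop code. The classes SketchDriver and SketchYarnDriver and the recorded step names are hypothetical, and the YARN clients are replaced by log entries so that only the ordering of the steps is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;

// Hypothetical base class: validates and stores shared collaborators, then delegates.
abstract class SketchDriver {
    protected Executor ioExecutor;
    final List<String> log = new ArrayList<>();

    // final, so subclasses cannot skip the null checks or reorder the steps.
    final void initialize(Executor ioExecutor) {
        this.ioExecutor = Objects.requireNonNull(ioExecutor);
        log.add("common-fields-set");
        initializeInternal();
    }

    // Deployment-specific part, supplied by each subclass.
    protected abstract void initializeInternal();
}

// Hypothetical YARN-flavoured subclass; each log entry stands for one step in the listing above.
class SketchYarnDriver extends SketchDriver {
    @Override
    protected void initializeInternal() {
        log.add("resource-manager-client-started");   // client used to request containers
        log.add("application-master-registered");     // registration with the cluster manager
        log.add("node-manager-client-started");       // client used to launch worker processes
    }
}
```

After new SketchYarnDriver().initialize(Runnable::run), the log holds the four entries in the order written, which mirrors the sequence the real driver goes through: shared setup first, then the resource manager client, the registration, and finally the node manager client.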

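Chapter 3: Electing and starting the ResourceManager

The call leaderElectionService.start(this) here works much like the one seen earlier when starting the JobManager.

Among the implementations of org.apache.flink.runtime.leaderelection.LeaderElectionService#start, we look at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService, which is the same class that was used when starting the Dispatcher.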

org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService#start

@Override
public void start(LeaderContender contender) throws Exception {
	Preconditions.checkNotNull(contender, "Contender must not be null.");
	Preconditions.checkState(leaderContender == null, "Contender was already set.");
 
	LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
 
	synchronized (lock) {
 
		client.getUnhandledErrorListenable().addListener(this);
 
		leaderContender = contender;
 
		// TODO start listening
		leaderLatch.addListener(this);
		leaderLatch.start();
 
		cache.getListenable().addListener(this);
		cache.start();
 
		client.getConnectionStateListenable().addListener(listener);
 
		running = true;
	}
}

The details are not repeated here. Once the listener detects the state change, the isLeader method is invoked directly.

org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService#isLeader

@Override
public void isLeader() {
	synchronized (lock) {
		if (running) {
			issuedLeaderSessionID = UUID.randomUUID();
			clearConfirmedLeaderInformation();

			if (LOG.isDebugEnabled()) {
				LOG.debug(
					"Grant leadership to contender {} with session ID {}.",
					leaderContender.getDescription(),
					issuedLeaderSessionID);
			}

			// TODO internally creates the dispatcher, jobManager, or resourceManager, depending on the contender
			leaderContender.grantLeadership(issuedLeaderSessionID);
		} else {
			LOG.debug("Ignoring the grant leadership notification since the service has " +
				"already been stopped.");
		}
	}
}
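
The start and isLeader methods above form a callback handshake: the contender registers itself, and when leadership is acquired the service issues a fresh session ID and hands it to the contender. The following is a minimal sketch of that handshake in plain Java with no ZooKeeper involved. SketchContender and SketchElectionService are hypothetical names, and isLeader is called by hand here, whereas in the real service it is triggered by the leader latch.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical counterpart of LeaderContender, reduced to the one callback used here.
interface SketchContender {
    void grantLeadership(UUID sessionId);
}

class SketchElectionService {
    private final Object lock = new Object();
    private SketchContender contender;
    private boolean running;
    private UUID issuedSessionId;

    void start(SketchContender contender) {
        Objects.requireNonNull(contender, "Contender must not be null.");
        synchronized (lock) {
            if (this.contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            this.contender = contender;
            running = true;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    // Stand-in for the notification that this instance has become leader.
    void isLeader() {
        synchronized (lock) {
            if (running) {
                issuedSessionId = UUID.randomUUID();          // new ID for every grant
                contender.grantLeadership(issuedSessionId);
            }
            // If the service was stopped, the notification is ignored.
        }
    }

    // Lets the contender check later that a session ID is still the current one.
    boolean hasLeadership(UUID sessionId) {
        synchronized (lock) {
            return running && sessionId.equals(issuedSessionId);
        }
    }
}
```

A contender can be as small as a lambda that records the ID it receives. A notification that arrives after stop() is dropped, which corresponds to the else branch in the listing above.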

The implementations of org.apache.flink.runtime.leaderelection.LeaderContender#grantLeadership:

Since this time it is the ResourceManager that is being started, the implementation we follow is ResourceManager.

org.apache.flink.runtime.resourcemanager.ResourceManager#grantLeadership

@Override
public void grantLeadership(final UUID newLeaderSessionID) {
	// TODO handling after a successful election
	final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
		.thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());

	final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
		(acceptLeadership) -> {
			if (acceptLeadership) {
				// confirming the leader session ID might be blocking,
				leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
			}
		},
		ioExecutor);

	confirmationFuture.whenComplete(
		(Void ignored, Throwable throwable) -> {
			if (throwable != null) {
				onFatalError(ExceptionUtils.stripCompletionException(throwable));
			}
		});
}
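
grantLeadership is a three-stage CompletableFuture pipeline: accept leadership on one executor, confirm it on another, and route any failure to a fatal-error handler. The sketch below reproduces that chain using only JDK classes. SketchLeadershipChain, its fields, and the boolean accept parameter (which stands in for the result of tryAcceptLeadership) are hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class SketchLeadershipChain {
    final AtomicReference<UUID> confirmed = new AtomicReference<>();
    final AtomicReference<Throwable> fatal = new AtomicReference<>();

    CompletableFuture<Void> grantLeadership(
            UUID sessionId, boolean accept, Executor mainThread, Executor io) {
        // Assumption: no earlier state clean-up is pending, so this is already complete.
        CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

        // Stage 1: decide on the "main thread" whether leadership is accepted.
        CompletableFuture<Boolean> acceptFuture = clearStateFuture.thenComposeAsync(
                ignored -> CompletableFuture.completedFuture(accept), mainThread);

        // Stage 2: confirm on the I/O executor, since confirming may block.
        CompletableFuture<Void> confirmationFuture = acceptFuture.thenAcceptAsync(
                accepted -> {
                    if (accepted) {
                        confirmed.set(sessionId);
                    }
                },
                io);

        // Stage 3: any exception from the earlier stages ends up in the fatal-error slot.
        return confirmationFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                fatal.set(throwable);
            }
        });
    }
}
```

Passing Runnable::run for both executors runs every stage on the calling thread, which is convenient for experimenting. In the method above the two executors differ so that a potentially blocking confirmation does not stall the endpoint's main thread.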

org.apache.flink.runtime.resourcemanager.ResourceManager#tryAcceptLeadership

private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
	if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
		final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

		log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

		// clear the state if we've been the leader before
		if (getFencingToken() != null) {
			clearStateInternal();
		}

		setFencingToken(newResourceManagerId);

		// TODO start the ResourceManager services
		startServicesOnLeadership();

		return prepareLeadershipAsync().thenApply(ignored -> true);
	} else {
		return CompletableFuture.completedFuture(false);
	}
}
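
tryAcceptLeadership does three things in order: it verifies that the session ID is still current, clears state left over from an earlier term, and installs the new fencing token. The sketch below shows why that order matters, in plain Java. SketchFencedManager, registerWorker and the worker list are hypothetical, and the currently issued ID is passed in as a parameter instead of being queried from an election service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class SketchFencedManager {
    private UUID fencingToken;                       // null until leadership is first accepted
    final List<String> registeredWorkers = new ArrayList<>();
    int stateClears;

    boolean tryAcceptLeadership(UUID newSessionId, UUID currentlyIssuedId) {
        // A grant whose ID is no longer the issued one is stale and is declined.
        if (!newSessionId.equals(currentlyIssuedId)) {
            return false;
        }
        // Clear state only if this instance has been leader before.
        if (fencingToken != null) {
            registeredWorkers.clear();
            stateClears++;
        }
        fencingToken = newSessionId;
        return true;
    }

    // Requests carry the token the caller believes is current; mismatches are rejected.
    boolean registerWorker(UUID callerToken, String workerId) {
        if (fencingToken == null || !fencingToken.equals(callerToken)) {
            return false;
        }
        registeredWorkers.add(workerId);
        return true;
    }
}
```

Once a second term is accepted, a caller still holding the first token gets false from registerWorker. This is the general idea behind a fencing token: messages addressed to a previous leadership term cannot modify the state of the current one.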

org.apache.flink.runtime.resourcemanager.ResourceManager#startServicesOnLeadership

private void startServicesOnLeadership() {
	// TODO start the heartbeat services between the RM and the TaskManagers and JobManagers
	startHeartbeatServices();

	// TODO start the slotManager
	slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

	onLeadership();
}

At this point Flink has started the ResourceManager, along with the slotManager inside it.

When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/301952.html