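Problem
Flink deployed in the test environment could not store checkpoints, and triggering a savepoint with the flink command failed as well. The target directory was created in HDFS, but no files were ever written to it.
The Flink web console showed the job's checkpoint stuck in the IN_PROGRESS state.
Running flink savepoint (with DEBUG level enabled in log4j-cli.properties) showed the client polling the status and receiving IN_PROGRESS over and over until it timed out.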
2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil [] - -Dio.netty.allocator.type: pooled
2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil [] - -Dio.netty.threadLocalDirectBufferSize: 0
2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil [] - -Dio.netty.maxThreadLocalCharBufferSize: 16384
2021-11-19 08:34:29,329 INFO org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
Waiting for response...
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxCapacityPerThread: 4096
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxSharedCapacityFactor: 2
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.linkCapacity: 16
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.ratio: 8
2021-11-19 08:34:29,916 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"request-id":"2acc7bbbc0ef3a19a595ffeb85c1706a"}.
2021-11-19 08:34:29,981 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,011 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,042 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,059 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,081 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,094 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,135 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,149 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,230 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
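The client behaviour visible in this log, repeatedly fetching the savepoint status until it stops being IN_PROGRESS or a deadline passes, can be sketched roughly as follows. This is only an illustration and not Flink's actual client code: the class `SavepointStatusPoller`, its methods, and the `Supplier` standing in for the HTTP request are all hypothetical, and the only thing taken from the log is the shape of the response body.

```java
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SavepointStatusPoller {
    // Matches the "status" object as it appears in the logged REST responses.
    private static final Pattern STATUS_ID =
            Pattern.compile("\"status\"\\s*:\\s*\\{\\s*\"id\"\\s*:\\s*\"([A-Z_]+)\"");

    /** Extracts the status id from a response body, or "UNKNOWN" if it is absent. */
    public static String statusId(String responseBody) {
        Matcher m = STATUS_ID.matcher(responseBody);
        return m.find() ? m.group(1) : "UNKNOWN";
    }

    /** Polls until the status is no longer IN_PROGRESS or the deadline passes. */
    public static String poll(Supplier<String> fetchBody, long timeoutMillis, long intervalMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        String status = statusId(fetchBody.get());
        while ("IN_PROGRESS".equals(status) && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
                break;
            }
            status = statusId(fetchBody.get());
        }
        return status;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the HTTP GET: always IN_PROGRESS, like the stuck job.
        Supplier<String> stuck =
                () -> "{\"status\":{\"id\":\"IN_PROGRESS\"},\"operation\":null}";
        System.out.println(poll(stuck, 300, 50));
    }
}
```

With a source that never leaves IN_PROGRESS, the loop simply runs until the deadline, which matches what the CLI log shows.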
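The Job Manager log in the Flink console (DEBUG level enabled in log4j.properties, with %t added to the pattern to print the thread name) shows "Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267." After the checkpoint is triggered, no further related log lines or exceptions appear.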
2021-11-19 08:34:29,723 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxCapacityPerThread: 4096
2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxSharedCapacityFactor: 2
2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.linkCapacity: 16
2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.ratio: 8
2021-11-19 08:34:29,850 INFO flink-akka.actor.default-dispatcher-19 org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering savepoint for job 9e97360ca975514b4a91369b05431267.
2021-11-19 08:34:29,880 INFO Checkpoint Timer org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267.
2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.
2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.
2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-17 org.apache.flink.runtime.jobmaster.JobMaster [] - Received heartbeat request from 86c79c1b6c206f760550c3773b560a98.
2021-11-19 08:34:38,959 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Received heartbeat from f671b9c4e094cdf0975ea0ae43b50319.
2021-11-19 08:34:38,969 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Received heartbeat from container_1637051640864_0050_01_000002.
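The same job checkpoints fine on the standalone Flink deployed on 10.0.11.66 and on my development machine, but not on the YARN cluster deployed on 10.0.11.21-24. So I suspected YARN or some other environmental factor.
Troubleshooting process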
Looking at the Flink source, I located where the last useful log line, "Triggering checkpoint", is emitted:
org.apache.flink.runtime.checkpoint.CheckpointCoordinator#createPendingCheckpoint
private PendingCheckpoint createPendingCheckpoint(
        long timestamp,
        CheckpointProperties props,
        CheckpointPlan checkpointPlan,
        boolean isPeriodic,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CompletableFuture<CompletedCheckpoint> onCompletionPromise) {

    synchronized (lock) {
        try {
            // since we haven't created the PendingCheckpoint yet, we need to check the
            // global state here.
            preCheckGlobalState(isPeriodic);
        } catch (Throwable t) {
            throw new CompletionException(t);
        }
    }

    final PendingCheckpoint checkpoint =
            new PendingCheckpoint(
                    job,
                    checkpointID,
                    timestamp,
                    checkpointPlan,
                    OperatorInfo.getIds(coordinatorsToCheckpoint),
                    masterHooks.keySet(),
                    props,
                    checkpointStorageLocation,
                    onCompletionPromise);

    trackPendingCheckpointStats(checkpoint);

    synchronized (lock) {
        pendingCheckpoints.put(checkpointID, checkpoint);

        ScheduledFuture<?> cancellerHandle =
                timer.schedule(
                        new CheckpointCanceller(checkpoint),
                        checkpointTimeout,
                        TimeUnit.MILLISECONDS);

        if (!checkpoint.setCancellerHandle(cancellerHandle)) {
            // checkpoint is already disposed!
            cancellerHandle.cancel(false);
        }
    }

    LOG.info(
            "Triggering checkpoint {} (type={}) @ {} for job {}.",
            checkpointID,
            checkpoint.getProps().getCheckpointType(),
            timestamp,
            job);
    return checkpoint;
}
Since "Triggering checkpoint" was logged successfully, nothing went wrong here, so I looked at the caller:
org.apache.flink.runtime.checkpoint.CheckpointCoordinator#startTriggeringCheckpoint
private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
try {
synchronized (lock) {
preCheckGlobalState(request.isPeriodic);
}
// we will actually trigger this checkpoint!
Preconditions.checkState(!isTriggering);
isTriggering = true;
final long timestamp = System.currentTimeMillis();
CompletableFuture<CheckpointPlan> checkpointPlanFuture =
checkpointPlanCalculator.calculateCheckpointPlan();
final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
checkpointPlanFuture
.thenApplyAsync(
plan -> {
try {
CheckpointIdAndStorageLocation
checkpointIdAndStorageLocation =
initializeCheckpoint(
request.props,
request.externalSavepointLocation);
return new Tuple2<>(
plan, checkpointIdAndStorageLocation);
} catch (Throwable e) {
throw new CompletionException(e);
}
},
executor)
.thenApplyAsync(
(checkpointInfo) ->
createPendingCheckpoint(
timestamp,
request.props,
checkpointInfo.f0,
request.isPeriodic,
checkpointInfo.f1.checkpointId,
checkpointInfo.f1.checkpointStorageLocation,
request.getonCompletionFuture()),
timer);
final CompletableFuture<?> coordinatorCheckpointsComplete =
pendingCheckpointCompletableFuture.thenComposeAsync(
(pendingCheckpoint) ->
OperatorCoordinatorCheckpoints
.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint,
pendingCheckpoint,
timer),
timer);
// We have to take the snapshot of the master hooks after the coordinator checkpoints
// has completed.
// This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
// ExternallyInducedSource is used.
final CompletableFuture<?> masterStatesComplete =
coordinatorCheckpointsComplete.thenComposeAsync(
ignored -> {
// If the code reaches here, the pending checkpoint is guaranteed to
// be not null.
// We use FutureUtils.getWithoutException() to make compiler happy
// with checked
// exceptions in the signature.
PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(
pendingCheckpointCompletableFuture);
return snapshotMasterState(checkpoint);
},
timer);
FutureUtils.assertNoException(
CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
.handleAsync(
(ignored, throwable) -> {
final PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(
pendingCheckpointCompletableFuture);
Preconditions.checkState(
checkpoint != null || throwable != null,
"Either the pending checkpoint needs to be created or an error must have been occurred.");
if (throwable != null) {
// the initialization might not be finished yet
if (checkpoint == null) {
onTriggerFailure(request, throwable);
} else {
onTriggerFailure(checkpoint, throwable);
}
} else {
if (checkpoint.isDisposed()) {
onTriggerFailure(
checkpoint,
new CheckpointException(
CheckpointFailureReason
.TRIGGER_CHECKPOINT_FAILURE,
checkpoint.getFailureCause()));
} else {
// no exception, no discarding, everything is OK
final long checkpointId =
checkpoint.getCheckpointId();
snapshotTaskState(
timestamp,
checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
checkpoint
.getCheckpointPlan()
.getTasksToTrigger());
coordinatorsToCheckpoint.forEach(
(ctx) ->
ctx.afterSourceBarrierInjection(
checkpointId));
// It is possible that the tasks has finished
// checkpointing at this point.
// So we need to complete this pending checkpoint.
if (!maybeCompleteCheckpoint(checkpoint)) {
return null;
}
onTriggerSuccess();
}
}
return null;
},
timer)
.exceptionally(
error -> {
if (!isShutdown()) {
throw new CompletionException(error);
} else if (findThrowable(
error, RejectedExecutionException.class)
.isPresent()) {
LOG.debug("Execution rejected during shutdown");
} else {
LOG.warn("Error encountered during shutdown", error);
}
return null;
}));
} catch (Throwable throwable) {
onTriggerFailure(request, throwable);
}
}
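This method is the key. I went on a logging spree, adding LOG output after nearly every line and inside every sub-method, then rebuilt Flink and deployed it to the test machine 10.0.11.24 (the only node I kept for the experiment). Each build took about 20 minutes, and I went through this cycle many times.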
pendingCheckpointCompletableFuture
.thenComposeAsync(
(pendingCheckpoint) -> {
LOG.warn(
"pendingCheckpointCompletableFuture.thenComposeAsync >>>>>>>> pendingCheckpoint:{},Thread.currentThread():{}",
pendingCheckpoint,
Thread.currentThread().getId());
return OperatorCoordinatorCheckpoints
.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint,
pendingCheckpoint,
timer);
},
timer);
Eventually I found that execution did enter org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion, but the CompletableFuture it returned was never carried forward by thenComposeAsync.
org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(
        final Collection<OperatorCoordinatorCheckpointContext> coordinators,
        final PendingCheckpoint checkpoint,
        final Executor acknowledgeExecutor)
        throws Exception {
    LOG.warn(
            "triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> checkpoint:{}", checkpoint);
    try {
        final CompletableFuture<AllCoordinatorSnapshots> snapshots =
                triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId());
        LOG.warn(
                "triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> snapshots:{},acknowledgeExecutor:{}",
                snapshots,
                acknowledgeExecutor);
        // TODO: 2021/11/18 the callback below never ran; to check whether an error was
        // preventing it from running, the original thenAcceptAsync was replaced with handleAsync
        return snapshots.handleAsync(
                (allSnapshots, err) -> {
                    if (err != null) {
                        LOG.error("snapshots.thenAcceptAsync >>>>> err", err);
                    } else {
                        try {
                            LOG.warn(
                                    "snapshots.thenAcceptAsync >>>>> checkpoint:{}, allSnapshots.snapshots:{}",
                                    checkpoint,
                                    allSnapshots.snapshots);
                            acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
                        } catch (Exception e) {
                            LOG.error("snapshots.thenAcceptAsync >>>>> Exception", e);
                            throw new CompletionException(e);
                        }
                    }
                    return null;
                },
                acknowledgeExecutor);
    } catch (Exception ex) {
        LOG.error("triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> Exception", ex);
        throw ex;
    }
}

public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
        final Collection<OperatorCoordinatorCheckpointContext> coordinators,
        final PendingCheckpoint checkpoint,
        final Executor acknowledgeExecutor)
        throws CompletionException {
    try {
        LOG.warn(
                "OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion >>>>> checkpoint:{}",
                checkpoint);
        return triggerAndAcknowledgeAllCoordinatorCheckpoints(
                coordinators, checkpoint, acknowledgeExecutor);
    } catch (Exception e) {
        LOG.error(
                "OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion >>>>> Exception",
                e);
        throw new CompletionException(e);
    }
}
The CompletableFuture produced by snapshots.thenAcceptAsync never ran its callback, and it was unclear where things were stuck.
return snapshots.thenAcceptAsync(
(allSnapshots) -> {
try {
acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
} catch (Exception e) {
throw new CompletionException(e);
}
},
acknowledgeExecutor);
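Swapping thenAcceptAsync for handleAsync while debugging is useful because the two react differently to a failed upstream future: a thenAcceptAsync callback is skipped when the upstream completes exceptionally, whereas a handleAsync callback always runs and receives the error. The sketch below is a standalone illustration of that difference with made-up names (`HandleVsAccept` and its methods are not Flink code).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class HandleVsAccept {
    /** Returns whether a thenAcceptAsync callback ran for a failed upstream future. */
    public static boolean acceptCallbackRuns() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> next = failed.thenAcceptAsync(value -> ran.set(true));
        try {
            next.join();
        } catch (CompletionException expected) {
            // the failure is propagated; the callback itself was skipped
        }
        return ran.get();
    }

    /** Returns whether a handleAsync callback ran, and saw the error, for a failed upstream. */
    public static boolean handleCallbackSeesError() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        AtomicBoolean sawError = new AtomicBoolean(false);
        CompletableFuture<Void> next =
                failed.handleAsync(
                        (value, err) -> {
                            sawError.set(err != null);
                            return null;
                        });
        next.join(); // completes normally because the handler returned a value
        return sawError.get();
    }

    public static void main(String[] args) {
        System.out.println("thenAcceptAsync callback ran: " + acceptCallbackRuns());
        System.out.println("handleAsync callback saw error: " + handleCallbackSeesError());
    }
}
```

If the handleAsync callback logs nothing either, as happened here, a swallowed exception is ruled out and the callback is simply never being scheduled.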
I then noticed that the thread name in these log lines was always "Checkpoint Timer".
2021-11-19 08:34:29,880 INFO Checkpoint Timer org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267.
On the test machine, I used arthas to find out what this "Checkpoint Timer" thread was doing.
# Switch to the yarn user, because the Flink process is started by yarn; otherwise attaching is not allowed
su yarn
# Note: the yarn account does not allow login, so /etc/passwd has to be edited to read yarn:x:985:984:Hadoop Yarn:/var/lib/hadoop-yarn:/bin/bash
yarn@node24:/tmp$ java -jar arthas-boot.jar
[INFO] arthas-boot version: 3.5.3
[INFO] Found existing java process, please choose one and input the serial number of the process, eg : 1. Then hit ENTER.
* [1]: 4176317 org.apache.flink.yarn.YarnTaskExecutorRunner
  [2]: 2775993 org.apache.hadoop.yarn.server.nodemanager.NodeManager
  [3]: 4176146 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
# Choose the org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint process
3
[INFO] arthas home: /var/lib/hadoop-yarn/.arthas/lib/3.5.4/arthas
[INFO] Try to attach process 4176146
[INFO] Attach process 4176146 success.
[INFO] arthas-client connect 127.0.0.1 3658
[ARTHAS ASCII banner]
version 3.5.4
main_class
pid 4176146
time 2021-11-19 09:16:06
# Look up the thread named Checkpoint Timer
[arthas@4176146]$ thread -all | grep Checkpoint Timer
86 Checkpoint Timer main 5 WAITING 0.0 0.000 0:0.020 false true
# Inspect this thread
[arthas@4176146]$ thread 86
"Checkpoint Timer" Id=86 WAITING on java.util.concurrent.CompletableFuture$WaitNode@7dbca052
    at sun.misc.Unsafe.park(Native Method)
    - waiting on java.util.concurrent.CompletableFuture$WaitNode@7dbca052
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$WaitNode.block(CompletableFuture.java:271)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3226)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:319)
    at java.util.concurrent.CompletableFuture.access$000(CompletableFuture.java:111)
    at java.util.concurrent.CompletableFuture$AsyncCompose.exec(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$Async.run(CompletableFuture.java:428)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[arthas@4176146]$
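When attaching an external tool is inconvenient, a similar lookup can be done from inside a JVM with the standard Thread.getAllStackTraces() call. The following is a minimal sketch under that assumption; the class `ThreadStateProbe`, its methods, and the demo thread name are invented for illustration, and it only sees threads of the JVM it runs in.

```java
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

public class ThreadStateProbe {
    /** Returns the state of the first live thread with the given name, or null if none. */
    public static Thread.State stateOf(String threadName) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().equals(threadName)) {
                return t.getState();
            }
        }
        return null;
    }

    /** Prints state and stack frames of every live thread with the given name. */
    public static void dump(String threadName) {
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            if (e.getKey().getName().equals(threadName)) {
                System.out.println("\"" + threadName + "\" " + e.getKey().getState());
                for (StackTraceElement frame : e.getValue()) {
                    System.out.println("    at " + frame);
                }
            }
        }
    }

    /** Starts a daemon thread that parks forever, and returns once it is parked. */
    public static Thread startParkedThread(String name) {
        Thread t =
                new Thread(
                        () -> {
                            while (true) {
                                LockSupport.park(); // may wake spuriously, hence the loop
                            }
                        },
                        name);
        t.setDaemon(true);
        t.start();
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        return t;
    }

    public static void main(String[] args) {
        startParkedThread("demo-parked-thread"); // hypothetical stand-in for a stuck worker
        dump("demo-parked-thread");
    }
}
```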
Sure enough, the thread was just sitting there, "WAITING on java.util.concurrent.CompletableFuture". Why would it wait like this? Back to the source.
Inner:
return snapshots.thenAcceptAsync(
(allSnapshots) -> {
try {
acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
} catch (Exception e) {
throw new CompletionException(e);
}
},
acknowledgeExecutor);
Outer:
final CompletableFuture<?> coordinatorCheckpointsComplete =
        pendingCheckpointCompletableFuture.thenComposeAsync(
                (pendingCheckpoint) ->
                        OperatorCoordinatorCheckpoints
                                .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                        coordinatorsToCheckpoint,
                                        pendingCheckpoint,
                                        timer),
                timer);
Both the outer thenComposeAsync and the inner stage are given the same Executor argument. Tracing where this timer comes from leads to org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#enableCheckpointing, where the CheckpointCoordinator instance is created.
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#enableCheckpointing
checkpointCoordinatorTimer =
        Executors.newSingleThreadScheduledExecutor(
                new DispatcherThreadFactory(
                        Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator =
        new CheckpointCoordinator(
                jobInformation.getJobId(),
                chkConfig,
                operatorCoordinators,
                checkpointIDCounter,
                checkpointStore,
                checkpointStorage,
                ioExecutor,
                checkpointsCleaner,
                new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                SharedStateRegistry.DEFAULT_FACTORY,
                failureManager,
                createCheckpointPlanCalculator(),
                new ExecutionAttemptMappingProvider(getAllExecutionVertices()));
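The Executor named timer turns out to be a single-thread executor created by Executors.newSingleThreadScheduledExecutor. Could the single thread be the problem? The executor has only one thread; the outer async stage is already occupying it, the inner async stage needs a thread from the same executor, and there is none left.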
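The starvation mechanism suspected here can be shown in isolation. The sketch below is not the Flink code path: it makes the outer task block explicitly on an inner task submitted to the same single-thread executor, which starves on any JDK, and uses a timeout to detect it. The class name, method name, and the "demo-timer" thread name are all hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleThreadStarvation {
    /** Returns true if the outer task finished within the timeout, false if it was starved. */
    public static boolean outerCompletes(boolean blockOnInner, long timeoutMillis) {
        ExecutorService single =
                Executors.newSingleThreadExecutor(
                        r -> {
                            Thread t = new Thread(r, "demo-timer");
                            t.setDaemon(true); // let the JVM exit even if the thread is stuck
                            return t;
                        });
        try {
            Future<?> outer =
                    single.submit(
                            () -> {
                                // The inner task is queued behind the outer task on the only thread.
                                Future<?> inner = single.submit(() -> {});
                                if (blockOnInner) {
                                    try {
                                        inner.get(); // waits for a task that cannot start
                                    } catch (Exception e) {
                                        throw new IllegalStateException(e);
                                    }
                                }
                            });
            outer.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("non-blocking outer completes: " + outerCompletes(false, 1000));
        System.out.println("blocking outer completes: " + outerCompletes(true, 1000));
    }
}
```

The stack trace captured with arthas has the same shape: the only worker thread is parked inside waitingGet while the work it is waiting for has nowhere to run.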
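I wrote my own example, but running it locally did not support this guess: nothing hung. Not ready to give up, I searched online and found someone asking the very same question, with confirmation that a hang is possible:
How to use CompletableFuture.thenComposeAsync()?
I ran that example locally as well, and it still did not hang. Then I remembered that the problem only occurs in the test environment.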
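import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class CompletableFutureTest {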
public static void main(String[] args) {
ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor();
CompletableFuture.runAsync(() ->
{
System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
}, singleThreadExecutor).thenComposeAsync((Void unused) ->
{
return CompletableFuture.runAsync(() ->
{
System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
}, singleThreadExecutor);
}, singleThreadExecutor).join();
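System.out.println("finished");
// Shut the executor down so the JVM can exit once the chain has completed
singleThreadExecutor.shutdown();
// My own example mimics Flink's checkpoint code more closely, but the problem lies only in thenComposeAsync, so the result is the same
// CompletableFuture<Void> cf = CompletableFuture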
// .supplyAsync(() -> "aaaaa", scheduledExecutorService)
// .thenComposeAsync((str) -> {
// return CompletableFuture.runAsync(() -> {
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(str.concat(" bbbbbbbbbbbb"));
// }, scheduledExecutorService);
// }, scheduledExecutorService);
// CompletableFuture
// .allOf(cf.thenApplyAsync((str) -> {
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("ddddddd");
// return "ccccc";
// }, scheduledExecutorService), cf)
// .handleAsync((str, err) -> {
// System.out.println(str);
// return null;
// }, scheduledExecutorService).join();
// scheduledExecutorService.shutdown();
}
}
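I then ran it on 10.0.11.24, the problematic test machine, and sure enough it hung.
Then I ran it on 10.0.11.66, the machine without the problem.
Comparing JDK versions. Problem machine 24:
Healthy machine 66:
In other words, certain older JDK versions have an issue where a CompletableFuture chain can hang when the Executor does not have enough threads; newer JDK versions fixed it.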
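To compare machines without leaving a hung process behind, the same thenComposeAsync chain can be wrapped in a timeout. This is a sketch along the lines of the example above rather than a definitive test; `ComposeProbe`, `run`, and the "probe-timer" thread name are hypothetical, and the result depends on the JDK the probe runs on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ComposeProbe {
    /** Returns "completed" if the chain finishes within the timeout, otherwise "stuck". */
    public static String run(long timeoutMillis) {
        ScheduledExecutorService single =
                Executors.newSingleThreadScheduledExecutor(
                        r -> {
                            Thread t = new Thread(r, "probe-timer");
                            t.setDaemon(true); // a stuck worker must not keep the JVM alive
                            return t;
                        });
        try {
            // Outer stage, compose function and inner stage all share the one thread.
            CompletableFuture<Void> composed =
                    CompletableFuture.runAsync(() -> {}, single)
                            .thenComposeAsync(
                                    unused -> CompletableFuture.runAsync(() -> {}, single),
                                    single);
            composed.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "stuck";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(5000));
    }
}
```

Running it with the JDK under suspicion and with a newer one gives a quick yes/no answer for each installation.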
I switched the JDK version on problem machine 24.
That did not help, though: the Job Manager log showed JAVA_HOME still pointing to the old JDK.
2021-11-19 10:17:27,420 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting YarnJobClusterEntrypoint (Version: 1.13.1, Scala: 2.11, Rev:a7f3192, Date:2021-05-25T12:02:11+02:00)
2021-11-19 10:17:27,421 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - OS current user: yarn
2021-11-19 10:17:27,712 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Current Hadoop/Kerberos user: root
2021-11-19 10:17:27,714 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.20-b23
2021-11-19 10:17:27,715 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Maximum heap size: 429 MiBytes
2021-11-19 10:17:27,715 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JAVA_HOME: /usr/lib/jvm/j2sdk1.8-oracle
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Hadoop version: 3.0.0-cdh6.3.2
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JVM Options:
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Xmx469762048
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Xms469762048
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -XX:MaxMetaspaceSize=268435456
2021-11-19 10:17:27,719 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Dlog.file=/yarn/
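I tried several approaches, but JAVA_HOME in the YARN environment (/usr/lib/jvm/j2sdk1.8-oracle) kept pointing to the old JDK. Impatient to see the result, I simply overwrote /usr/lib/jvm/j2sdk1.8-oracle with the new JDK's files and retried.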
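Because the JVM that actually runs a process may differ from the one configured in a shell, it can help to ask the JVM itself. The snippet below is a small sketch that prints roughly the same facts the ClusterEntrypoint log reports, using standard system properties; the class name `JvmInfo` is made up, and it describes only the JVM it is launched with.

```java
public class JvmInfo {
    /** Describes the running JVM using standard system properties and the environment. */
    public static String describe() {
        return "java.version=" + System.getProperty("java.version")
                + ", java.vm.name=" + System.getProperty("java.vm.name")
                + ", java.vm.version=" + System.getProperty("java.vm.version")
                + ", java.home=" + System.getProperty("java.home")
                + ", JAVA_HOME=" + System.getenv("JAVA_HOME"); // null if the variable is unset
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Here java.home is the installation the JVM was started from, while JAVA_HOME is only an environment variable and may point somewhere else.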
The savepoint succeeded!
2021-11-19 10:48:44,377 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 10:48:44,699 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/ef1a9b56a942da679f0aaaf7602b710b/savepoints/3829b863721270f8b536a8ca8910d6e3
2021-11-19 10:48:44,716 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"COMPLETED"},"operation":{"location":"hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e"}}.
2021-11-19 10:48:44,718 INFO org.apache.flink.client.cli.CliFrontend [] - Savepoint completed. Path: hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e
Savepoint completed. Path: hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e
2021-11-19 10:48:44,718 INFO org.apache.flink.client.cli.CliFrontend [] - You can resume your program from this savepoint with the run command.
You can resume your program from this savepoint with the run command.
2021-11-19 10:48:44,721 DEBUG org.apache.flink.runtime.rest.RestClient [] - Shutting down rest endpoint.



