Exception:
...
(s8,1651649852323,-0.5016369965544841)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0 collect sensor data [(s8,1651649847591,-1.2380488385396649)]
(s8,1651649847591,-1.2380488385396649)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0 collect sensor data [(s10,1651649838365,0.094858647175857)]
(s10,1651649838365,0.094858647175857)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0 collect sensor data [(s5,1651649835715,-1.0281408001674686)]
(s5,1651649835715,-1.0281408001674686)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0 collect sensor data [(s9,1651649840604,0.7907248303726941)]
(s9,1651649840604,0.7907248303726941)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0 collect sensor data [(s6,1651649845531,2.15487589803033)]
(s6,1651649845531,2.15487589803033)
eeeeeeeeeeeeeeeeeeeeee error: An exception occurred in run(..) functioncurrent sensor timestamp is: 1651649844382
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (dad8a279f4f968c9c537f4830779725a) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: An unstoppable error has occurred in SensorDataGenerator by 2022-5-4 15:24:52
at cn.jiayeli.SensorDataGenerator.run(SensorDataGenerator.java:72)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (dad8a279f4f968c9c537f4830779725a).
[flink-akka.actor.default-dispatcher-29] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 dad8a279f4f968c9c537f4830779725a.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (dad8a279f4f968c9c537f4830779725a) switched from RUNNING to FAILED on 577037fe-22c0-4534-81fc-67a167074b6a @ localhost (dataPort=-1).
java.lang.RuntimeException: An unstoppable error has occurred in SensorDataGenerator by 2022-5-4 15:24:52
at cn.jiayeli.SensorDataGenerator.run(SensorDataGenerator.java:72)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
[flink-akka.actor.default-dispatcher-29] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 45a00d9a600c4c1302d5ed2f585f3bef
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job operator state demo (45a00d9a600c4c1302d5ed2f585f3bef) switched from state RUNNING to RESTARTING.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job operator state demo (45a00d9a600c4c1302d5ed2f585f3bef) switched from state RESTARTING to RUNNING.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 45a00d9a600c4c1302d5ed2f585f3bef from Checkpoint 12 @ 1651649083468 for 45a00d9a600c4c1302d5ed2f585f3bef located at file:/home/kuro/workspace/bigdata/wordCount/src/main/resources/ckdir/45a00d9a600c4c1302d5ed2f585f3bef/chk-12.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (91c3ef7e9304e91d75a781c73f0eb1c0) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Received resource requirements from job 45a00d9a600c4c1302d5ed2f585f3bef: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (91c3ef7e9304e91d75a781c73f0eb1c0) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Sink: Print to Std. Out (1/1) (attempt #1) with attempt id 91c3ef7e9304e91d75a781c73f0eb1c0 to 577037fe-22c0-4534-81fc-67a167074b6a @ localhost (dataPort=-1) with allocation id e89ec2565f336c52122cad2cda394963
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot e89ec2565f336c52122cad2cda394963.
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0), deploy into slot with allocation id e89ec2565f336c52122cad2cda394963.
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0) switched from CREATED to DEPLOYING.
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0) [DEPLOYING].
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@619837ff
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@65a7af14
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend loader loads the state backend as HashMapStateBackend
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@20a53dc4
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0) switched from DEPLOYING to INITIALIZING.
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (91c3ef7e9304e91d75a781c73f0eb1c0) switched from DEPLOYING to INITIALIZING.
[Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 45a00d9a600c4c1302d5ed2f585f3bef because Checkpoint triggering task Source: Custom Source -> Sink: Print to Std. Out (1/1) of job 45a00d9a600c4c1302d5ed2f585f3bef is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
... (the line above repeats 27 more times)
Code:
package cn.jiayeli;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.concurrent.TimeUnit;

public class OperatorSateDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // checkpoint storage directory
        env.getCheckpointConfig().setCheckpointStorage(URI.create("file:///home/kuro/workspace/bigdata/wordCount/src/main/resources/ckdir"));
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        // the timeout is given in milliseconds: 15 s
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(15));
        // checkpoint interval: 10 s
        env.getCheckpointConfig().setCheckpointInterval(1000 * 10);
        env.setStateBackend(new HashMapStateBackend());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(5)));
        Logger logger = LoggerFactory.getLogger(OperatorSateDemo.class.getName());
        DataStreamSource<Tuple3<String, Long, Double>> tuple3DataStreamSource = env.addSource(new SensorDataGenerator());
        tuple3DataStreamSource.print();
        try {
            env.execute("operator state demo");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
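`CheckpointConfig.setCheckpointTimeout` takes its argument in milliseconds. `TimeUnit.SECONDS.toSeconds(15)` converts seconds to seconds, so it yields 15, which Flink reads as a 15 ms timeout; that is probably not what was intended. The standalone snippet below (plain JDK, hypothetical class name) just compares the two conversions.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper class comparing the two unit conversions. */
public class CheckpointTimeoutUnits {

    /** Seconds converted to seconds: 15, which Flink would read as 15 ms. */
    public static long originalTimeout() {
        return TimeUnit.SECONDS.toSeconds(15);
    }

    /** Seconds converted to milliseconds: 15000, i.e. 15 s. */
    public static long intendedTimeout() {
        return TimeUnit.SECONDS.toMillis(15);
    }

    public static void main(String[] args) {
        System.out.println("toSeconds(15) = " + originalTimeout()); // prints "toSeconds(15) = 15"
        System.out.println("toMillis(15)  = " + intendedTimeout()); // prints "toMillis(15)  = 15000"
    }
}
```

In the job itself this would read `env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(15));`.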
SensorDataGenerator:
package cn.jiayeli;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class SensorDataGenerator extends RichParallelSourceFunction<Tuple3<String, Long, Double>> implements CheckpointedFunction {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private static final Random random = new Random();
    private static final Logger logger = LoggerFactory.getLogger(SensorDataGenerator.class.getName());
    // sensor list
    List<String> sensorList = Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10");
    // sensor data
    Tuple3<String, Long, Double> sensorData;
    // sensor timestamp
    private static Long timestamp;
    // holds the current timestamp; persisted by checkpoints for fault tolerance
    private ListState<Long> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
        Object lock = ctx.getCheckpointLock();
        while (isRunning) {
            int randomNum = Math.abs(random.nextInt());
            String sensorId = sensorList.get(randomNum % sensorList.size());
            // simulate out-of-order timestamps within 15 minutes
            timestamp = timestamp == null
                    ? System.currentTimeMillis() - (random.nextInt() % (1000 * 60 * 15))
                    : timestamp - (random.nextInt() % (1000 * 10));
            double temperature = random.nextGaussian();
            if (randomNum % 100 == 0) {
                logger.debug("An exception occurred in run(..) function" + "current sensor timestamp is:\t" + timestamp);
                System.out.println("eeeeeeeeeeeeeeeeeeeeee\terror: \tAn exception occurred in run(..) function" + "current sensor timestamp is:\t" + timestamp);
                throw new RuntimeException("An unstoppable error has occurred in SensorDataGenerator by\t" + DateFormat.getDateTimeInstance().format(System.currentTimeMillis()));
            }
            try {
                TimeUnit.SECONDS.sleep(randomNum % 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sensorData = Tuple3.of(sensorId, timestamp, temperature);
            logger.debug(">>>>>>>>>>>>>>>>>>>>>>>> task: " + getRuntimeContext().getIndexOfThisSubtask() + "\tcollect sensor data\t[" + sensorData + "]");
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>> task: " + getRuntimeContext().getIndexOfThisSubtask() + "\tcollect sensor data\t[" + sensorData + "]");
            // emit under the checkpoint lock so emission and snapshotState() do not interleave
            synchronized (lock) {
                ctx.collect(sensorData);
            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        System.out.println("sssssssssssssssssssssssssssssssssssss:\ttask: " + getRuntimeContext().getIndexOfThisSubtask() + "\tsnapshot cn.jiayeli.state by\t" + DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.FULL).format(System.currentTimeMillis()) + "cn.jiayeli.state content:\t" + timestamp);
        System.out.println("sssssssssssssssssssssssssssssssssssss:\tcheckpointId:\t" + context.getCheckpointId() + "\t" + "CheckpointTimestamp:\t" + context.getCheckpointTimestamp());
        listState.update(Arrays.asList(timestamp));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        listState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("sensor timestamp", Types.LONG));
        // Walk the restored state exactly once. Calling listState.get().iterator() inside a
        // while condition creates a fresh iterator on every call and never ends for non-empty state.
        for (Long restored : listState.get()) {
            timestamp = restored;
        }
        System.out.println("iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii:\ttask id : " + getRuntimeContext().getIndexOfThisSubtask() + "\tinitializer:\t[" + timestamp + "]");
    }
}
Cause:
At checkpoint time no task was running. But Flink only triggers a checkpoint while the task is in the RUNNING state, and in the log above the trigger happened right after the task switched from DEPLOYING to INITIALIZING. Why?
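A hypothesis worth checking, based on reading the code rather than on a confirmed fix: the task is stuck in INITIALIZING because `initializeState` never returns. A loop of the form `while (listState.get().iterator().hasNext()) { timestamp = listState.get().iterator().next(); }` asks for a brand-new iterator on every call, so for non-empty state `hasNext()` is always true. On the first start the state is empty and the loop is skipped; after the restart the job restores from chk-12, the state holds a timestamp, and the loop spins forever. That fits the log: the task switches to INITIALIZING, the `iiii... initializer` line does not appear in the excerpt, and the Checkpoint Timer keeps firing on schedule, with each attempt aborted ("Aborting checkpoint ... Not all required tasks are currently running"). So no checkpoint is actually taken while the task initializes; the attempts are simply rejected. The plain-Java sketch below (hypothetical class; a `List` stands in for `ListState.get()`) contrasts the two loops, and the broken one gets an iteration guard only so the demo terminates.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical demo class; a List stands in for ListState.get(). */
public class RestoreLoopDemo {

    /**
     * Same shape as while (state.iterator().hasNext()) { state.iterator().next(); }.
     * Each call creates a new iterator positioned at the first element, so the loop
     * only stops because of the guard added for this demo.
     */
    public static int brokenLoopIterations(Iterable<Long> state, int guard) {
        int iterations = 0;
        while (state.iterator().hasNext() && iterations < guard) {
            state.iterator().next(); // always returns the first element
            iterations++;
        }
        return iterations;
    }

    /** Walks the state once and keeps the last value; null if the state is empty. */
    public static Long restoreLast(Iterable<Long> state) {
        Long last = null;
        for (Long ts : state) {
            last = ts;
        }
        return last;
    }

    public static void main(String[] args) {
        List<Long> restored = Arrays.asList(1651649844382L); // sample timestamp
        System.out.println(brokenLoopIterations(restored, 1_000));       // 1000 (stopped only by the guard)
        System.out.println(restoreLast(restored));                       // 1651649844382
        System.out.println(restoreLast(Collections.<Long>emptyList()));  // null (fresh start)
    }
}
```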
Solution:
- Stack Overflow: when the source has no data, suspend its thread indefinitely
- Still unresolved
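The Stack Overflow suggestion addresses a related but different case: with a legacy `SourceFunction`, once `run()` returns the task goes to FINISHED, and in Flink versions that cannot checkpoint after tasks finish, periodic checkpoints then stop being triggered. Parking the thread keeps the task RUNNING. In the log above the task never got past INITIALIZING, so this may not be the fix for this particular job. The plain-Java sketch below is only an analogy under that assumption: `ParkUntilCancelSource` is a hypothetical class, and the `Consumer` stands in for `SourceContext.collect`. It emits a finite list, then blocks on a latch that `cancel()` releases.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

/**
 * Plain-Java analogy (hypothetical class, no Flink dependency): a finite source
 * emits its data, then parks until cancel() instead of returning.
 */
public class ParkUntilCancelSource {
    private final CountDownLatch cancelled = new CountDownLatch(1);
    private volatile boolean running = true;

    /** Emits every item, then blocks until cancel() is called. */
    public void run(List<String> items, Consumer<String> collector) throws InterruptedException {
        for (String item : items) {
            if (!running) {
                return;
            }
            collector.accept(item);
        }
        // No more data: wait here rather than letting run() return (which would finish the task).
        cancelled.await();
    }

    /** Called from another thread; stops emission and releases the parked run(). */
    public void cancel() {
        running = false;
        cancelled.countDown();
    }
}
```

In a real `SourceFunction`, the equivalent would be to block inside `run(SourceContext)` after the last `ctx.collect(...)` and release that wait from `cancel()`.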



