flink ... Aborting checkpoint. Failure reason: Not all required tasks are currently running.

Exception log:

...
(s8,1651649852323,-0.5016369965544841)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0	collect sensor data	[(s8,1651649847591,-1.2380488385396649)]
(s8,1651649847591,-1.2380488385396649)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0	collect sensor data	[(s10,1651649838365,0.094858647175857)]
(s10,1651649838365,0.094858647175857)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0	collect sensor data	[(s5,1651649835715,-1.0281408001674686)]
(s5,1651649835715,-1.0281408001674686)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0	collect sensor data	[(s9,1651649840604,0.7907248303726941)]
(s9,1651649840604,0.7907248303726941)
>>>>>>>>>>>>>>>>>>>>>>>> task: 0	collect sensor data	[(s6,1651649845531,2.15487589803033)]
(s6,1651649845531,2.15487589803033)
eeeeeeeeeeeeeeeeeeeeee	error: 	An exception occurred in run(..) functioncurrent sensor timestamp is:	1651649844382
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (dad8a279f4f968c9c537f4830779725a) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: An unstoppable error has occurred in SensorDataGenerator by	2022-5-4 15:24:52
	at cn.jiayeli.SensorDataGenerator.run(SensorDataGenerator.java:72)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

[Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (dad8a279f4f968c9c537f4830779725a).
[flink-akka.actor.default-dispatcher-29] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 dad8a279f4f968c9c537f4830779725a.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (dad8a279f4f968c9c537f4830779725a) switched from RUNNING to FAILED on 577037fe-22c0-4534-81fc-67a167074b6a @ localhost (dataPort=-1).
java.lang.RuntimeException: An unstoppable error has occurred in SensorDataGenerator by	2022-5-4 15:24:52
	at cn.jiayeli.SensorDataGenerator.run(SensorDataGenerator.java:72)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
[flink-akka.actor.default-dispatcher-29] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Clearing resource requirements of job 45a00d9a600c4c1302d5ed2f585f3bef
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0. 
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job operator state demo (45a00d9a600c4c1302d5ed2f585f3bef) switched from state RUNNING to RESTARTING.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job operator state demo (45a00d9a600c4c1302d5ed2f585f3bef) switched from state RESTARTING to RUNNING.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 45a00d9a600c4c1302d5ed2f585f3bef from Checkpoint 12 @ 1651649083468 for 45a00d9a600c4c1302d5ed2f585f3bef located at file:/home/kuro/workspace/bigdata/wordCount/src/main/resources/ckdir/45a00d9a600c4c1302d5ed2f585f3bef/chk-12.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (91c3ef7e9304e91d75a781c73f0eb1c0) switched from CREATED to SCHEDULED.
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Received resource requirements from job 45a00d9a600c4c1302d5ed2f585f3bef: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (91c3ef7e9304e91d75a781c73f0eb1c0) switched from SCHEDULED to DEPLOYING.
[flink-akka.actor.default-dispatcher-30] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Sink: Print to Std. Out (1/1) (attempt #1) with attempt id 91c3ef7e9304e91d75a781c73f0eb1c0 to 577037fe-22c0-4534-81fc-67a167074b6a @ localhost (dataPort=-1) with allocation id e89ec2565f336c52122cad2cda394963
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot e89ec2565f336c52122cad2cda394963.
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0), deploy into slot with allocation id e89ec2565f336c52122cad2cda394963.
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0) switched from CREATED to DEPLOYING.
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0) [DEPLOYING].
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@619837ff
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@65a7af14
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.state.StateBackendLoader - State backend loader loads the state backend as HashMapStateBackend
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@20a53dc4
[Source: Custom Source -> Sink: Print to Std. Out (1/1)#1] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#1 (91c3ef7e9304e91d75a781c73f0eb1c0) switched from DEPLOYING to INITIALIZING.
[flink-akka.actor.default-dispatcher-35] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (91c3ef7e9304e91d75a781c73f0eb1c0) switched from DEPLOYING to INITIALIZING.
[Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Failed to trigger checkpoint for job 45a00d9a600c4c1302d5ed2f585f3bef because Checkpoint triggering task Source: Custom Source -> Sink: Print to Std. Out (1/1) of job 45a00d9a600c4c1302d5ed2f585f3bef is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
... (the same "Failed to trigger checkpoint" line repeats 27 more times)

Code:

package cn.jiayeli;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.concurrent.TimeUnit;

public class OperatorSateDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // checkpoint storage directory
        env.getCheckpointConfig().setCheckpointStorage(URI.create("file:///home/kuro/workspace/bigdata/wordCount/src/main/resources/ckdir"));
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        // the timeout is in milliseconds; TimeUnit.SECONDS.toSeconds(15) would pass 15 ms, not 15 s
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(15));
        // checkpoint interval: 10 s
        env.getCheckpointConfig().setCheckpointInterval(1000 * 10);
        env.setStateBackend(new HashMapStateBackend());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(5)));
        Logger logger = LoggerFactory.getLogger(OperatorSateDemo.class.getName());
        DataStreamSource<Tuple3<String, Long, Double>> tuple3DataStreamSource = env.addSource(new SensorDataGenerator());

        tuple3DataStreamSource.print();

        try {
            env.execute("operator state demo");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}
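`CheckpointConfig.setCheckpointTimeout` takes a value in milliseconds, so an argument of `TimeUnit.SECONDS.toSeconds(15)` evaluates to 15, i.e. a 15 ms timeout rather than 15 s. The JDK-only snippet below (no Flink needed) prints both conversions side by side; `TimeoutUnits` and `timeoutMillis` are hypothetical names used only for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: shows which number would reach setCheckpointTimeout(long millis)
public class TimeoutUnits {

    // Converts a timeout given in seconds into the milliseconds Flink expects
    static long timeoutMillis(long seconds) {
        return TimeUnit.SECONDS.toMillis(seconds);
    }

    public static void main(String[] args) {
        // Seconds converted to seconds: 15, which Flink would read as 15 ms
        System.out.println("toSeconds(15) = " + TimeUnit.SECONDS.toSeconds(15));
        // What a 15 s timeout actually needs: 15000 ms
        System.out.println("toMillis(15)  = " + timeoutMillis(15));
    }
}
```

In the job itself this amounts to `env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(15));`.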


SensorDataGenerator

package cn.jiayeli;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;


public class SensorDataGenerator extends RichParallelSourceFunction<Tuple3<String, Long, Double>> implements CheckpointedFunction {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    private static final Random random = new Random();

    private static final Logger logger = LoggerFactory.getLogger(SensorDataGenerator.class.getName());

    // sensor list
    List<String> sensorList = Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10");

    // sensor data
    Tuple3<String, Long, Double> sensorData;

    // sensor timestamp; an instance field (not static) so that after a restart its value comes only from restored state
    private Long timestamp;

    // holds the current timestamp so it can be restored after a failure
    private ListState<Long> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
        Object lock = ctx.getCheckpointLock();

        while (isRunning) {

            // nextInt(bound) is never negative, whereas Math.abs(Integer.MIN_VALUE) is
            int randomNum = random.nextInt(Integer.MAX_VALUE);

            String sensorId = sensorList.get(randomNum % sensorList.size());
            // simulate out-of-order timestamps within a 15-minute range
            timestamp = timestamp == null ? System.currentTimeMillis() - (random.nextInt() % (1000 * 60 * 15)) : timestamp - (random.nextInt() % (1000 * 10));

            double temperature = random.nextGaussian();

            // inject a failure on roughly 1% of iterations
            if (randomNum % 100 == 0) {
                logger.debug("An exception occurred in run(..) function" + "current sensor timestamp is:\t" + timestamp);
                System.out.println("eeeeeeeeeeeeeeeeeeeeee\terror: \tAn exception occurred in run(..) function" + "current sensor timestamp is:\t" + timestamp);
                throw new RuntimeException("An unstoppable error has occurred in SensorDataGenerator by\t" + DateFormat.getDateTimeInstance().format(System.currentTimeMillis()));
            }

            try {
                TimeUnit.SECONDS.sleep(randomNum % 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            sensorData = Tuple3.of(sensorId, timestamp, temperature);
            logger.debug(">>>>>>>>>>>>>>>>>>>>>>>> task: " + getRuntimeContext().getIndexOfThisSubtask() + "\tcollect sensor data\t[" + sensorData + "]");
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>> task: " + getRuntimeContext().getIndexOfThisSubtask() + "\tcollect sensor data\t[" + sensorData + "]");
            // emit under the checkpoint lock so that emission and snapshotState() do not interleave
            synchronized (lock) {
                ctx.collect(sensorData);
            }
        }

    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {

        System.out.println("sssssssssssssssssssssssssssssssssssss:\ttask: " + getRuntimeContext().getIndexOfThisSubtask() + "\tsnapshot state by\t"
                + DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.FULL).format(System.currentTimeMillis())
                + "state content:\t" + timestamp);
        System.out.println("sssssssssssssssssssssssssssssssssssss:\tcheckpointId:\t" + context.getCheckpointId() + "\t" + "CheckpointTimestamp:\t" + context.getCheckpointTimestamp());
        listState.clear();
        // only persist a real value: the Long serializer cannot write a null element
        if (timestamp != null) {
            listState.add(timestamp);
        }

    }


    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        listState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("sensor timestamp", Types.LONG));

        // Use ONE iterator (for-each). Calling listState.get().iterator() in the loop condition creates
        // a fresh iterator on every pass, so that loop never ends once non-empty state has been restored.
        for (Long restored : listState.get()) {
            timestamp = restored;
        }

        System.out.println("iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii:\ttask id : " + getRuntimeContext().getIndexOfThisSubtask() + "\tinitializer:\t[" + timestamp + "]");
    }
}
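A loop of the form `while (listState.get().iterator().hasNext()) { timestamp = listState.get().iterator().next(); }` requests a brand-new iterator on every pass. Each `iterator()` call starts again from the first element, so once the state has been restored with at least one element, `hasNext()` is always true and the loop never exits. One plausible reading of the log above (not confirmed by the author) is that `initializeState` never returns after the restore from chk-12: the task stays in INITIALIZING (the "iiii..." line is never printed), and every checkpoint attempt is aborted with "Not all required tasks are currently running". The JDK-only sketch below reproduces both patterns on a plain `List<Long>` standing in for the restored state. `RestoredStateLoop`, `lastOf` and `spinsForever` are hypothetical names, and the pass cap exists only so that the demo terminates.

```java
import java.util.Arrays;
import java.util.Collections;

public class RestoredStateLoop {

    // Correct pattern: for-each obtains ONE iterator and keeps the last element
    static Long lastOf(Iterable<Long> restored) {
        Long last = null;
        for (Long value : restored) {
            last = value;
        }
        return last;
    }

    // Buggy pattern, with a hypothetical safety cap so the demo can stop.
    // Returns true if the loop was still spinning after maxPasses passes.
    static boolean spinsForever(Iterable<Long> restored, int maxPasses) {
        int passes = 0;
        Long timestamp = null;
        while (restored.iterator().hasNext()) {     // fresh iterator on every pass
            timestamp = restored.iterator().next(); // always the first element again
            if (++passes >= maxPasses) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // one restored timestamp (value is illustrative)
        Iterable<Long> restored = Arrays.asList(1651649844382L);
        System.out.println("lastOf       = " + lastOf(restored));
        System.out.println("spinsForever = " + spinsForever(restored, 1_000_000));
        System.out.println("empty state  = " + spinsForever(Collections.<Long>emptyList(), 10));
    }
}
```

The empty-state case also explains why the first run of the job succeeds: with nothing restored, `hasNext()` is false immediately and the buggy loop is skipped.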

Cause:
When the checkpoint was triggered, no task was running. But Flink only triggers a checkpoint once the tasks are RUNNING, yet the error above was raised right after the task had switched from DEPLOYING to INITIALIZING. Why?
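Strictly speaking, the checkpoint is not triggered *because* the task is INITIALIZING. Flink's periodic checkpoint scheduler follows the job status rather than the status of individual tasks. As soon as the job switches back to RUNNING (the "RESTARTING to RUNNING" line in the log), the Checkpoint Timer resumes firing every interval (10 s here). On each tick the coordinator first checks that every task that must start the checkpoint (here the single source task) is RUNNING. If one is not, it aborts that attempt with the message seen in the log. The JDK-only sketch below is a simplified, hypothetical model of that pre-check, not Flink's actual `CheckpointCoordinator` code. `TriggerPrecheck`, `TaskState` and `tryTrigger` are invented names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the coordinator's pre-trigger check
public class TriggerPrecheck {

    // Subset of the task states that appear in the log
    enum TaskState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FAILED }

    // Returns null if the checkpoint may be triggered, otherwise the abort reason
    static String tryTrigger(Map<String, TaskState> triggeringTasks) {
        for (Map.Entry<String, TaskState> task : triggeringTasks.entrySet()) {
            if (task.getValue() != TaskState.RUNNING) {
                return "Checkpoint triggering task " + task.getKey()
                        + " is not being executed at the moment. Aborting checkpoint."
                        + " Failure reason: Not all required tasks are currently running.";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String source = "Source: Custom Source -> Sink: Print to Std. Out (1/1)";
        Map<String, TaskState> tasks = new LinkedHashMap<>();
        tasks.put(source, TaskState.INITIALIZING);

        // The timer keeps ticking while the job is RUNNING; each tick re-runs the check
        for (int tick = 1; tick <= 3; tick++) {
            String reason = tryTrigger(tasks);
            System.out.println("tick " + tick + ": " + (reason == null ? "triggered" : reason));
        }

        // Only after the task itself reaches RUNNING does the check pass
        tasks.put(source, TaskState.RUNNING);
        System.out.println("after RUNNING: " + (tryTrigger(tasks) == null ? "triggered" : "aborted"));
    }
}
```

In this model a task that stays in INITIALIZING forever produces an endless series of identical aborts. That is the pattern in the log.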

Solution:

  • Stack Overflow: when the source has no data, suspend its thread permanently
  • Still unresolved
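The Stack Overflow suggestion in the list above addresses a related case. If a source's `run()` returns, for example because a bounded source has run out of data, the source task finishes. In Flink versions that cannot checkpoint after tasks have finished (the capability introduced by FLIP-147), later checkpoints are then aborted with the same "Not all required tasks are currently running" reason. The workaround keeps `run()` alive, and therefore the task RUNNING, until `cancel()` is called. It does not by itself help a task that never leaves INITIALIZING. The JDK-only sketch below shows that idle-wait shape. `IdleSourceSketch` is a hypothetical stand-in for a `SourceFunction`, with a `Consumer` playing the role of `ctx.collect`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for a SourceFunction whose run() must not return early
public class IdleSourceSketch {

    private final Queue<String> pending;
    private final Object idleLock = new Object();
    private volatile boolean isRunning = true;

    IdleSourceSketch(String... records) {
        this.pending = new ArrayDeque<>(Arrays.asList(records));
    }

    // Emits everything available, then waits until cancel() instead of returning
    void run(Consumer<String> collector) throws InterruptedException {
        while (isRunning && !pending.isEmpty()) {
            collector.accept(pending.poll());
        }
        synchronized (idleLock) {
            while (isRunning) {
                idleLock.wait(); // no data left: stay alive so the task remains RUNNING
            }
        }
    }

    // Sets the flag first, then wakes the waiting run() thread
    void cancel() {
        isRunning = false;
        synchronized (idleLock) {
            idleLock.notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        IdleSourceSketch source = new IdleSourceSketch("a", "b");
        Thread t = new Thread(() -> {
            try {
                source.run(r -> System.out.println("emit " + r));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        t.join(200);
        System.out.println("still running after data ran out: " + t.isAlive());
        source.cancel();
        t.join(1000);
        System.out.println("stopped after cancel: " + !t.isAlive());
    }
}
```

Because the flag is checked under the same monitor that `cancel()` uses for `notifyAll()`, the wake-up cannot be lost between the check and the `wait()`.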
Please credit the source when reposting: article reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/860255.html