栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink通过本地文件持久化算子状态并重启恢复数据

Flink通过本地文件持久化算子状态并重启恢复数据

Flink在运行过程中, 难免会因为一些异常导致服务终止, 因为Flink的优势在于处理实时数据, 所以重启的话, 可能会导致部分数据指标不正确, 会丢失部分数据, 比如统计最近一小时数据, 运行半小时终止, 再次重启, 也只能重新开启统计. 但Flink可以通过state来解决这个问题, 将状态保存在内存, 文件系统或者db中, 持久化后, 即可实现故障后重启继续计算.

以下示例是通过kafka作为数据源, 统计各message出现的次数, 利用keyBy, process和窗口富函数实现state初始化及更新的.

示例
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkStateTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 每5s进行一次checkpoint
        env.enableCheckpointing(5000);
        // 恰好只消费一次 根据要求切换模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoint时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 最大并发产生checkpoint个数
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // job cancel之后checkpoint相关文件是否会清理
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // backend保存位置
        env.setStateBackend(new FsStateBackend("file:///Users/guands/dev/checkpoints"));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "flink");
        FlinkKafkaConsumer kafka = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), kafkaProps);
        // kafka的offset开始位置
        kafka.setStartFromGroupOffsets();
        // 每次生成checkpoint时提交offset 保证数据不会丢失
        kafka.setCommitOffsetsOnCheckpoints(true);
        DataStreamSource dataStreamByEventTime = env.addSource(kafka);
        dataStreamByEventTime
                .keyBy((KeySelector) s -> s)
                .process(new KeyedProcessFunction() {
                    @Override
                    public void processElement(String value, KeyedProcessFunction.Context ctx, Collector out) throws Exception {
                        long count = 0;
                        if (countState.value() != null) {
                            count = countState.value();
                        }
                        count++;
                        countState.update(count);
                        System.out.println("count: " + count);
                        out.collect(value);
                    }

                    private ValueState countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // 初始化state 获取当前key对应的个数
                        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("test_count", Long.class));
                    }
                }).print();
        env.execute();
    }

}

运行后会在对应目录下产生一些文件

canncel job后通过以下命令重启flink job, 一般使用最新的checkpoint文件来恢复state, 避免无用计算.

重启
flink run -s D:Usersguandsdevcheckpoints7be830af951177a89815c8ab450b3c41chk-6_metadata -c com.....FlinkStateTest D:flink-1.0-SNAPSHOT.jar

再次生产kafka消息, 可以看到基数在上次基础上累加. 重启成功!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/326403.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号