栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

任务处理基类

任务处理基类


public abstract class baseTask {
    //定义parameterTool工具类
    public static ParameterTool parameterTool;
    public static String appName;

    
    static {
        try {
            parameterTool = ParameterTool.fromPropertiesFile(baseTask.class.getClassLoader().getResourceAsStream("conf.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //todo 1)初始化flink流式处理的开发环境
    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    
    public static StreamExecutionEnvironment getEnv(String className){
        System.setProperty("HADOOP_USER_NAME", "root");
        //设置全局的参数(使用的时候可以直接用法:getRuntimeContext())
        env.getConfig().setGlobalJobParameters(parameterTool);
        //todo  2)按照事件时间处理数据(terminalTimeStamp)进行窗口的划分和水印的添加
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //为了后续进行测试方便,将并行度设置为1,在生产环境一定不要设置代码级别的并行度,可以设置client级别的并行度
        env.setParallelism(1);

        //todo 3)开启checkpoint
        //  3.1:设置每隔30s周期性开启checkpoint
        env.enableCheckpointing(30*1000);
        //  3.2:设置检查点的model、exactly-once、保证数据一次性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //  3.3:设置两次checkpoint的时间间隔,避免两次间隔太近导致频繁的checkpoint,而出现业务处理能力下降
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20 * 1000);
        //  3.4:设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(20*1000);
        //  3.5:设置checkpoint的最大尝试次数,同一个时间有几个checkpoint在运行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //  3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //  3.7:设置执行job过程中,保存检查点错误时,job不失败
        env.getCheckpointConfig().setFailonCheckpointingErrors(false);
        //  3.8:设置检查点的存储位置,使用rocketDBStateBackend,存储本地+hdfs分布式文件系统,可以进行增量检查点
        String hdfsbasePath = parameterTool.getRequired("hdfsUri");
        try {
            env.setStateBackend(new RocksDBStateBackend(hdfsbasePath+"/flink/checkpoint/"+ className));
        } catch (IOException e) {
            e.printStackTrace();
        }
        //todo  4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
        //todo  4.1:如果开启了checkpoint,默认不停的重启,没有开启checkpoint,无重启策略
        env.setRestartStrategy(RestartStrategies.fallBackRestart());

        appName = className;
        //返回env对象
        return env;
    }

    
    public static DataStream createKafkaStream(Class clazz) throws IllegalAccessException, InstantiationException {
        //todo  5)创建flink消费kafka数据的对象,指定kafka的参数信息
        Properties props = new Properties();
        // 5.1:设置kafka的集群地址
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameterTool.getRequired("bootstrap.servers"));
        // 5.2:设置消费者组id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, parameterTool.getRequired("kafka.group.id")+appName);
        // 5.3:设置kafka的分区感知(动态感知)
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        // 5.4:设置key和value的反序列化(可选)
        // 5.5:设置是否自动递交offset
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, parameterTool.get("enable.auto.reset", "earliest"));
        // 5.6:创建kafka的消费者实例
        FlinkKafkaConsumer011 kafkaConsumer011= new FlinkKafkaConsumer011(
                parameterTool.getRequired("kafka.topic"),
                clazz.newInstance(),
                props
        );
        // 5.7:设置自动递交offset保存到检查点
        kafkaConsumer011.setCommitOffsetsonCheckpoints(true);
        //todo  6)将kafka消费者对象添加到环境中
        DataStreamSource streamSource = env.addSource(kafkaConsumer011);
        //返回消费到的数据
        return streamSource;
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389550.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号