package cn.itcast.streaming.task;
import cn.itcast.entity.ItcastDataObj;
import cn.itcast.streaming.sink.SrcDataToHbaseSink;
import cn.itcast.streaming.sink.SrcDataToHbaseSinkOptimizer;
import cn.itcast.streaming.sink.VehicleDetailSinkFunction;
import cn.itcast.utils.FlinkUtil;
import cn.itcast.utils.JsonParseUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KafkaSourceDataTask {
    public static void main(String[] args) throws Exception {
        // Load conf.properties from the classpath and wrap it in a ParameterTool
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(
                KafkaSourceDataTask.class.getClassLoader().getResourceAsStream("conf.properties"));
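        /*
         * A hedged sketch of the conf.properties keys this class reads; every key
         * below appears in the code, but the values are illustrative assumptions:
         *
         *   hdfsUri=hdfs://node01:8020
         *   bootstrap.servers=node01:9092,node02:9092,node03:9092
         *   kafka.topic=vehicledata
         *   auto.offset.reset=earliest
         *   enable.auto.commit=false
         */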
        //TODO 1) Initialize the Flink streaming execution environment
        System.setProperty("HADOOP_USER_NAME", "root");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register the parameters globally so every operator can read them
        env.getConfig().setGlobalJobParameters(parameterTool);

        //TODO 2) Process data on event time (required for windowing and watermarks)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //TODO 3) Enable checkpointing
        //TODO 3.1: trigger a checkpoint every 30 seconds
        env.enableCheckpointing(30 * 1000);
        //TODO 3.2: use exactly-once mode so records are neither lost nor consumed twice
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //TODO 3.3: enforce a minimum pause between checkpoints so back-to-back checkpoints do not degrade throughput
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
        //TODO 3.4: time out a checkpoint that takes longer than 20 seconds
        env.getCheckpointConfig().setCheckpointTimeout(20000);
        //TODO 3.5: allow at most one checkpoint to be in flight at any time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //TODO 3.6: retain externalized checkpoints when the job is cancelled (by default they are deleted on cancellation)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //TODO 3.7: do not fail the job when a checkpoint cannot be completed
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        //TODO 3.8: store state in RocksDB (local disk + HDFS) with incremental checkpoints enabled
        String bashHdfsUri = parameterTool.getRequired("hdfsUri");
        try {
            env.setStateBackend(new RocksDBStateBackend(
                    bashHdfsUri + "/flink/checkpoint/" + KafkaSourceDataTask.class.getSimpleName(), true));
        } catch (IOException e) {
            e.printStackTrace();
        }

        //TODO 4) Configure the restart strategy (fixed-delay, failure-rate, or no restart); no restart is used here
        env.setRestartStrategy(RestartStrategies.noRestart());
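        // A hedged alternative for reference: a fixed-delay strategy (the 3 retries
        // and the 10-second delay are illustrative assumptions, not project values):
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
        //        org.apache.flink.api.common.time.Time.seconds(10)));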
        //TODO 5) Build the Flink Kafka consumer and its configuration
        Properties props = new Properties();
        //TODO 5.1: Kafka broker address list
        props.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
        //TODO 5.2: consumer group id
        props.setProperty("group.id", "KafkaSourceDataTask04");
        //TODO 5.3: enable dynamic partition discovery, probing every 30 seconds
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        //TODO 5.4: where to start reading when no committed offset exists
        props.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset", "earliest"));
        //TODO 5.5: disable Kafka auto-commit; offsets are managed by Flink's checkpoints instead
        props.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", "false"));
        //TODO 5.6: create the Kafka consumer instance
        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(
                parameterTool.getRequired("kafka.topic"),
                new SimpleStringSchema(),
                props
        );
        //TODO 5.7: commit offsets back to Kafka as part of each completed checkpoint
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        //TODO 6) Add the Kafka consumer to the environment as a source
        DataStream<String> dataStreamSource = env.addSource(kafkaConsumer);
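        // The unused imports (ItcastDataObj, JsonParseUtil, the HBase and vehicle-detail
        // sinks) and the jump from TODO 6 to TODO 13 suggest that steps 7-12 of the
        // full task parse each JSON record and fan it out to sinks. A minimal sketch
        // of that parsing step, assuming a JsonParseUtil method of roughly this shape
        // (parseDataJson and getErrorData are hypothetical names, not the real API):
        //SingleOutputStreamOperator<ItcastDataObj> vehicleDataStream = dataStreamSource
        //        .map(JsonParseUtil::parseDataJson)
        //        .filter(obj -> StringUtils.isEmpty(obj.getErrorData()));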
        // Print the raw stream for testing
        dataStreamSource.print();
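        // A hedged sketch (not wired up in the original job): the StreamingFileSink
        // imports suggest the raw records are also archived to HDFS. The target path,
        // the "yyyyMMdd" bucket format, and the rolling thresholds below are
        // illustrative assumptions; the sink is left unattached so the job still only prints.
        StreamingFileSink<String> hdfsSink = StreamingFileSink
                .forRowFormat(new Path(bashHdfsUri + "/flink/srcdata"), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMdd"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        // roll a part file every 15 minutes, after 5 idle minutes, or at 128 MB
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(128L * 1024 * 1024)
                        .build())
                .build();
        //dataStreamSource.addSink(hdfsSink);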
        //TODO 13) Submit and run the job
        env.execute();
    }
}