栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink-Exactly-once一致性系列实践1

Flink-Exactly-once一致性系列实践1

Flink-Exactly-once系列实践-KafkaToKafka

文章目录

Flink-Exactly-once系列实践-KafkaToKafka一、Kafka输入输出流工具类二、统计字符个数案例三、消费者消费kafka的事务数据总结与可能出现的问题


一、Kafka输入输出流工具类

代码如下(示例):

//获取kafkaStream流
    public static  DataStream getKafkaDataStream(ParameterTool parameterTool,Class clazz,StreamExecutionEnvironment env) throws IllegalAccessException, InstantiationException {
        //加入到flink的环境全局配置中,后续可以通过上下文获取该工具类,总而得到想要的值
        env.getConfig().setGlobalJobParameters(parameterTool);


        //kafka配置项
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        properties.setProperty("group.id",parameterTool.get("group.idsource"));
        properties.setProperty("auto.offset.reset",parameterTool.get("auto.offset.reset"));
        properties.setProperty("enable.auto.commit",parameterTool.get("enable.auto.commit", String.valueOf(false)));
        String topics = parameterTool.get("Consumertopics");

        //序列化类实例化
        DeserializationSchema deserializationSchema = clazz.newInstance();

        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);

        flinkKafkaConsumer.setStartFromEarliest();
        //开启kafka的offset与checkpoint绑定
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);

        return env.addSource(flinkKafkaConsumer);
    }



    //获取kafka生产者通用方法

    
    public static  FlinkKafkaProducer getFlinkKafkaProducer(ParameterTool parameterTool,KafkaSerializationSchema kafkaSerializationSchema){
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        properties.setProperty("group.id",parameterTool.get("group.idsink"));
//        properties.setProperty("transaction.max.timeout.ms",parameterTool.get("transaction.max.timeout.ms"));
        properties.setProperty("transaction.timeout.ms",parameterTool.get("transaction.timeout.ms"));
        properties.setProperty("client.id", "flinkOutputTopicClient");
        String topics = parameterTool.get("Producetopice");

        return new FlinkKafkaProducer(topics,kafkaSerializationSchema,properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    }

注意点事项
一、消费者注意项
1.flinkKafkaConsumer.setCommitOffsetsonCheckpoints(true),将kafka自动提交offset关闭并且与flink的CheckPoint绑定。
2.bootstrap.servers kafka的broker host
3.setStartFromEarliest()设置kafka的消息消费从最初位置开始
二、生产者注意项
1.transaction.timeout.ms 默认情况下Kafka Broker 将transaction.max.timeout.ms设置为15分钟,我们需要将此值设置低于15分钟
2.FlinkKafkaProducer.Semantic.EXACTLY_ONCE设置kafka为精确一次

二、统计字符个数案例

代码如下(示例):

public static void main(String[] args) throws Exception {
        //1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.设置并行度
        env.setParallelism(4);

        //3.设置CK和状态后端
        CkAndStateBacked.setCheckPointAndStateBackend(env,"FS");

        //4.获取kafkaStream流
        InputStream kafkaPropertiesStream = KafkaToKafkaExacitly.class.getClassLoader().getResourceAsStream("kafka.properties");
        ParameterTool parameterTool=ParameterTool.fromPropertiesFile(kafkaPropertiesStream);
        //将配置流放到全局flink运行时环境
        env.getConfig().setGlobalJobParameters(parameterTool);
        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        Class stringSchemaClass = simpleStringSchema.getClass();
        DataStream kafkaDataStream = KafkaUtil.getKafkaDataStream(parameterTool, stringSchemaClass, env);

        System.out.println("==================================================");
        kafkaDataStream.print();

        //5.map包装成value,1
        SingleOutputStreamOperator> tupleStream = kafkaDataStream.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                if("error".equals(value)){
                    throw new RuntimeException("发生异常!!!");
                }
                return new Tuple2<>(value, 1);
            }
        });
        tupleStream.print();
        //6.按照value进行分组,并且统计value的个数
        SingleOutputStreamOperator> reduceStream = tupleStream.keyBy(new KeySelector, String>() {
            @Override
            public String getKey(Tuple2 value) throws Exception {
                return value.f0;
            }
        }).reduce(new ReduceFunction>() {
            @Override
            public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });


        System.out.println("=====================================================");
        reduceStream.print();
        //7.将数据输出到kafka
        FlinkKafkaProducer> flinkKafkaProducer = KafkaUtil.getFlinkKafkaProducer(parameterTool, new KafkaSerializationSchema>() {
            @Override
            public void open(SerializationSchema.InitializationContext context) throws Exception {
                System.out.println("=========正在向KafkaProduce输出数据!!!=============");
            }

            @Override
            public ProducerRecord serialize(Tuple2 element, @Nullable Long timestamp) {
                String producetopics = parameterTool.get("Producetopice");
                String result = element.toString();
                return new ProducerRecord(producetopics, result.getBytes(StandardCharsets.UTF_8));
            }
        });
        reduceStream.addSink(flinkKafkaProducer).name("kafkasinktest").uid("kafkasink");

        //任务执行
        env.execute("KafkaToKafkaTest");
    }

注意事项:
这里使用的是本地FSstateBackend,注意你的路径的设置,以hdfs://或者file://为地址标识符,否则Flink的文件系统将无法识别。

三、消费者消费kafka的事务数据
ublic static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", "*****");
        sourceProperties.setProperty("group.id", "****");
        //端到端一致性:消费数据时需要配置isolation.level=read_committed(默认值为read_uncommitted)
        sourceProperties.put("isolation.level", "read_committed");

        FlinkKafkaConsumer ConsumerKafka = new FlinkKafkaConsumer<>("*****", new SimpleStringSchema(), sourceProperties);
        ConsumerKafka.setStartFromEarliest();
        DataStreamSource dataStreamSource = env.addSource(ConsumerKafka);

        dataStreamSource.print();

        env.execute();
    }

isolation.level这里设置为read_committed(默认为read_uncommitted)
这里可以看到以你CheckPoint设置的时间,来批量展示kafka生产者的消息。

总结与可能出现的问题

以上是flink 实现kafka的精确一次的测试例子,这里还有一点要注意,就是小伙伴们的kafka的配置里面

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
default.replication.factor=1

这四个参数里面default.replication.factor是你kafka真正每个topic的副本数量,但是在开启事务也就是flink的addsink的时候会默认继承两阶段提交的方式,这里transaction.state.log.replication.factor一定要大于或者等于transaction.state.log.min.isr,否则你的kafka集群不满足事务副本复制的基本属性,会一直不成功,那么你的CheckPoint就会超时过期,从而导致任务的整体失败。

kafka集群第一次有消费者消费消息时会自动创建 __consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50,在开启事务性的情况下就会首先会获得一个全局的TransactionCoordinator id和transactional producer并且生成唯一的序列号等
类似于一下的例子来唯一标识当前事务的消息对应的offset,以及标识

[2022-03-24 21:07:40,022] INFO [TransactionCoordinator id=0] Initialized transactionalId Keyed Reduce -> (Sink: Print to Std. Out, Sink: kafkasinktest)-b0c5e26be6392399cc3c8a38581a81c2-8 with producerId 11101 and producer epoch 8 on partition __transaction_state-18 (kafka.coordinator.transaction.TransactionCoordinator)

当flink任务出现异常的情况下,kafka会把以及提交但是未标记可以消费的数据直接销毁,或者正常的情况下,会正式提交(本质是修改消息的标志位),之后对于消费者在开启isolation.level的时候就可以读取以及标记为可以读取的message!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774229.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号