栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink-kafka端到端精准一次性

flink-kafka端到端精准一次性

flink-kafka端到端精准一次性
  • producer有事务
//FlinkKafkaProducer 默认不读取全局配置而是写死默认值AT_LEAST_onCE 在创建KafkaProducer时要指定时间语义 详见: new FlinkKafkaProducer<>()
Optional> customPartitioner = Optional.of(new FlinkFixedPartitioner<>());
            flinkKafkaProducer = new FlinkKafkaProducer<>(topic,
                    new SimpleStringSchema(),
                    producerProp,
                    customPartitioner.orElse(null),
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                    DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
//生产者的事务超时属性 使用EXACTLY_ONCE需要增加
producerProp.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 5);
//设置事务ID,这里用了类名做唯一ID
producerProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getClassName());
//开启幂等性
producerProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
producerProp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
  • consumer 有偏移量 且设置读已提交
//设置为读未提交
consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
  • 中间有checkpoint 所以flink整合kafka 可以实现完整的精准一次性
  • 注意: flink-kafka精准一次的致命弱点: 在进行checkpoint时才会将数据提交到kafka,导致 流处理会转换成微批处理(微批的大小为checkpoint的间隔时间)
代码样例 pom.xml
  • 特别注意如果是本地只用导入flink-connector-kafka_2.12即可
  • 如果是flink集群上需要导入以下三个包才能正常使用kafka 因为flink-connector-kafka包中把后面两个排除掉了
    • flink-connector-kafka_2.12-1.12.0
    • flink-connector-kafka-base_2.12-1.12.0
    • kafka-clients-2.2.0

    org.apache.flink
    flink-connector-kafka_2.12
    ${flink.version}



    org.apache.flink
    flink-connector-base
    1.12.0


    org.apache.kafka
    kafka-clients
    2.4.1

本地测试依赖


    org.apache.flink
    flink-connector-kafka_2.12
    ${flink.version}

KafkaConsumerApp关键代码
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(TimeUnit.MINUTES.toMillis(1));
        Properties consumerProp = new Properties();
        consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
        FlinkKafkaConsumer kafkaComsumer = KafkaUtil.getKafkaComsumer("sleepy.test.lq_test2",consumerProp);
        DataStreamSource source = env.addSource(kafkaComsumer);
        source.map(item-> {
            log.debug("KafkaConsumerApp: " + item);
            return item;
        });
        source.addSink(KafkaUtil.getKafkaProducer("sleepy.test.lq_test3"));
        execute();

KafkaProducerApp 关键代码
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(TimeUnit.MINUTES.toMillis(1));
        FlinkKafkaConsumer kafkaComsumer = KafkaUtil.getKafkaComsumer("sleepy.test.lq_test");
        DataStreamSource source = env.addSource(kafkaComsumer);
        Properties producerProp = new Properties();
        producerProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getClassName());
        producerProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        producerProp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");


        FlinkKafkaProducer kafkaProducer = KafkaUtil.getKafkaProducer("sleepy.test.lq_test2",producerProp);
        source.addSink(kafkaProducer);
        execute();
测试步骤
  1. 打开KafkaConsumerApp, 再打开KafkaProducerApp
  2. 向topic:sleepy.test.lq_test发送数据KafkaProducerApp接受到数据后会以事务的方式发送到topic:sleepy.test.lq_test2(先发送未提交的数据,checkpoint时再执行提交)
  3. KafkaConsumerApp会收到topic:sleepy.test.lq_test2中未提交的数据, 但是我们设置的模式是READ_COMMITTED(读已提交)则数据不会往下面发,也不会处理,等到上游KafkaProducerApp执行checkpoint时会将数据提交,此时KafkaConsumerApp才会处理数据并且打印debug日志最后发往topic:sleepy.test.lq_test3
  4. 如果KafkaConsumerApp设置的模式为READ_UNCOMMITTED(读未提交)则会及时的处理消息,未提交的消息也会被消费到. 此时无法保证两个flink程序整条链路的精准一次性.
源码说明 FlinkKafkaProducer 110行
public enum Semantic {

		
		EXACTLY_ONCE,
总结建议
  • 实时数仓最好使用AT_LEAST_ONCE+下游幂等性来保证端到端的精准一次.这样可以最大程度保证实时性.
  • 整条链路使用精准一次性,可以在以下情况同时满足时使用.但需要提前说明利弊
    1. 没有幂等性保证
    2. 下游无法找出唯一键去重
    3. 业务要求数据的精准一次
其他注意事项
  • KafkaProducer 默认是按照Task的ID%分区数 来向下游进行消息发送的. 如果上游只有一个分区内有数据则只会发往下游topic的一个分区中, 可以使用自定义分区器,来指定数据随机发往下游某个分区
public static FlinkKafkaPartitioner getRandomPartitioner(){
    FlinkKafkaPartitioner flinkKafkaPartitioner = new FlinkKafkaPartitioner() {
        @Override
        public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return partitions[RandomUtils.nextInt(0, partitions.length)];

        }
    };
    return flinkKafkaPartitioner;
}
public static FlinkKafkaProducer getKafkaProducer(String topic, Properties prop,FlinkKafkaPartitioner flinkKafkaPartitioner) {
    //如果传入了配置则对默认配置进行覆盖
    if (prop != null) {
        producerProp.putAll(prop);
    }
    FlinkKafkaProducer flinkKafkaProducer;
    if(flinkKafkaPartitioner==null){
        flinkKafkaProducer=new FlinkKafkaProducer<>(topic,
            new SimpleStringSchema(),
            producerProp);
    }else{
        flinkKafkaProducer= new FlinkKafkaProducer<>(topic,
            new SimpleStringSchema(),
            producerProp, Optional.of(flinkKafkaPartitioner));
    }

    return flinkKafkaProducer;
}
  • 2.4.1的kafkaCliet向低版本kafka 发送数据如果使用KafkaSerializationSchema 会导致数据无法发送进去
  • flink1.11.1 有bug KafkaProducer 因为没有重写open方法往kafka下游发送 只会发往固定的0分区, flink-1.11.3已修复
  • SimpleStringSchema使用注意无法处理null, 源码如下
    @Override
	public String deserialize(byte[] message) {
		return new String(message, charset);
	}
	@Override
	public byte[] serialize(String element) {
		return element.getBytes(charset);
	}

解决方法-改造后

public static class StringSchema extends SimpleStringSchema {
    @Override
    public String deserialize(byte[] message) {
        return message != null ? new String(message, StandardCharsets.UTF_8) : "";
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676989.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号