- producer有事务
//FlinkKafkaProducer 默认不读取全局配置而是写死默认值AT_LEAST_onCE 在创建KafkaProducer时要指定时间语义 详见: new FlinkKafkaProducer<>() Optional> customPartitioner = Optional.of(new FlinkFixedPartitioner<>()); flinkKafkaProducer = new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), producerProp, customPartitioner.orElse(null), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); //生产者的事务超时属性 使用EXACTLY_ONCE需要增加 producerProp.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 5); //设置事务ID,这里用了类名做唯一ID producerProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getClassName()); //开启幂等性 producerProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true"); producerProp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
- consumer 有偏移量 且设置读已提交
//设置为读未提交 consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
- 中间有checkpoint 所以flink整合kafka 可以实现完整的精准一次性
- 注意: flink-kafka精准一次的致命弱点: 在进行checkpoint时才会将数据提交到kafka,导致 流处理会转换成微批处理(微批的大小为checkpoint的间隔时间)
- 特别注意如果是本地只用导入flink-connector-kafka_2.12即可
- 如果是flink集群上需要导入以下三个包才能正常使用kafka 因为flink-connector-kafka包中把后面两个排除掉了
- flink-connector-kafka_2.12-1.12.0
- flink-connector-kafka-base_2.12-1.12.0
- kafka-clients-2.2.0
KafkaConsumerApp关键代码org.apache.flink flink-connector-kafka_2.12${flink.version} org.apache.flink flink-connector-base1.12.0 本地测试依赖 org.apache.kafka kafka-clients2.4.1 org.apache.flink flink-connector-kafka_2.12${flink.version}
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(TimeUnit.MINUTES.toMillis(1));
Properties consumerProp = new Properties();
consumerProp.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
FlinkKafkaConsumer kafkaComsumer = KafkaUtil.getKafkaComsumer("sleepy.test.lq_test2",consumerProp);
DataStreamSource source = env.addSource(kafkaComsumer);
source.map(item-> {
log.debug("KafkaConsumerApp: " + item);
return item;
});
source.addSink(KafkaUtil.getKafkaProducer("sleepy.test.lq_test3"));
execute();
KafkaProducerApp 关键代码
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(TimeUnit.MINUTES.toMillis(1));
FlinkKafkaConsumer kafkaComsumer = KafkaUtil.getKafkaComsumer("sleepy.test.lq_test");
DataStreamSource source = env.addSource(kafkaComsumer);
Properties producerProp = new Properties();
producerProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, getClassName());
producerProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
producerProp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
FlinkKafkaProducer kafkaProducer = KafkaUtil.getKafkaProducer("sleepy.test.lq_test2",producerProp);
source.addSink(kafkaProducer);
execute();
测试步骤
- 打开KafkaConsumerApp, 再打开KafkaProducerApp
- 向topic:sleepy.test.lq_test发送数据KafkaProducerApp接受到数据后会以事务的方式发送到topic:sleepy.test.lq_test2(先发送未提交的数据,checkpoint时再执行提交)
- KafkaConsumerApp会收到topic:sleepy.test.lq_test2中未提交的数据, 但是我们设置的模式是READ_COMMITTED(读已提交)则数据不会往下面发,也不会处理,等到上游KafkaProducerApp执行checkpoint时会将数据提交,此时KafkaConsumerApp才会处理数据并且打印debug日志最后发往topic:sleepy.test.lq_test3
- 如果KafkaConsumerApp设置的模式为READ_UNCOMMITTED(读未提交)则会及时的处理消息,未提交的消息也会被消费到. 此时无法保证两个flink程序整条链路的精准一次性.
public enum Semantic {
EXACTLY_ONCE,
总结建议
- 实时数仓最好使用AT_LEAST_ONCE+下游幂等性来保证端到端的精准一次.这样可以最大程度保证实时性.
- 整条链路使用精准一次性,可以在以下情况同时满足时使用.但需要提前说明利弊
- 没有幂等性保证
- 下游无法找出唯一键去重
- 业务要求数据的精准一次
- KafkaProducer 默认是按照Task的ID%分区数 来向下游进行消息发送的. 如果上游只有一个分区内有数据则只会发往下游topic的一个分区中, 可以使用自定义分区器,来指定数据随机发往下游某个分区
public static FlinkKafkaPartitionergetRandomPartitioner(){ FlinkKafkaPartitioner flinkKafkaPartitioner = new FlinkKafkaPartitioner () { @Override public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) { return partitions[RandomUtils.nextInt(0, partitions.length)]; } }; return flinkKafkaPartitioner; } public static FlinkKafkaProducer getKafkaProducer(String topic, Properties prop,FlinkKafkaPartitioner flinkKafkaPartitioner) { //如果传入了配置则对默认配置进行覆盖 if (prop != null) { producerProp.putAll(prop); } FlinkKafkaProducer flinkKafkaProducer; if(flinkKafkaPartitioner==null){ flinkKafkaProducer=new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), producerProp); }else{ flinkKafkaProducer= new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), producerProp, Optional.of(flinkKafkaPartitioner)); } return flinkKafkaProducer; }
- 2.4.1的kafkaCliet向低版本kafka 发送数据如果使用KafkaSerializationSchema 会导致数据无法发送进去
- flink1.11.1 有bug KafkaProducer 因为没有重写open方法往kafka下游发送 只会发往固定的0分区, flink-1.11.3已修复
- SimpleStringSchema使用注意无法处理null, 源码如下
@Override
public String deserialize(byte[] message) {
return new String(message, charset);
}
@Override
public byte[] serialize(String element) {
return element.getBytes(charset);
}
解决方法-改造后
public static class StringSchema extends SimpleStringSchema {
@Override
public String deserialize(byte[] message) {
return message != null ? new String(message, StandardCharsets.UTF_8) : "";
}
}



