- Flink kafka偏移量
- 1 代码
- 2 设置 offset 五种模式
- 2.1 kafkaConsumer.setStartFromGroupOffsets();
- 2.2 kafkaConsumer.setStartFromEarliest();
- 2.3 kafkaConsumer.setStartFromLatest()
- 2.4 kafkaConsumer.setStartFromTimestamp()
- 2.5 kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets)
- 3 properties配置offset
- 3.1 earliest
- 3.2 latest
- 3.3 none
- 4 相关指令
- 4.1 查看所有的group
- 4.2 查看消费者组的offset
- 4.3 把某个group的offset设置到最初或指定位置
version:1.13.1
1 代码 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// kafka 配置项
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka:9092");
properties.setProperty("group.id", "test1");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 从 kafka 读取数据
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
// 1.设置偏移量
Map specificStartOffsets = new HashMap<>();
// 1.1 topic 第0个分区,offset从第二个开始消费,offset从0开始
specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L);
// 1.2 设置其他offset模式
//kafkaConsumer.setStartFromGroupOffsets();
//kafkaConsumer.setStartFromEarliest();
//kafkaConsumer.setStartFromLatest();
//kafkaConsumer.setStartFromTimestamp()
kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream inputDataStream = env.addSource(kafkaConsumer);
inputDataStream.print("data");
env.execute();
2 设置 offset 五种模式
2.1 kafkaConsumer.setStartFromGroupOffsets();
- 默认值
- Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
- 测试效果:该默认值效果与latest一致
- 从最早的记录开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用
- 测试效果:每次消费者重新连接,从头开始消费数据
- 从最新的数据开始消费
- 测试效果:每次消费者重新连接,从最新的数据开始消费
- Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
- 测试效果:从topic中指定的时间点开始消费,指定时间点之前的数据忽略
- Flink从topic中指定的offset开始,这个比较复杂,需要手动指定topic,Partition,offset
- 测试效果:从topic-Partition的offset开始消费
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");3.1 earliest
当各partition有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,从起始开始消费
3.2 latest当各partition下有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,消费最新的该partition下的数据
3.3 nonetopic各partition都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
4 相关指令 4.1 查看所有的groupkafka-consumer-groups.sh --bootstrap-server master:9092 --list4.2 查看消费者组的offset
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group test14.3 把某个group的offset设置到最初或指定位置
# 把group sys在topic:event上的offcet恢复到最初 [root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group sys --topic event --reset-offsets --to-earliest –execute TOPIC PARTITION NEW-OFFSET event 0 519 # 【注意】最初的offset不一定是0,比如本例,519位置以前的数据已经过期,所以offset的最初位置就是519 # 恢复offcet到指定位置 [root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group sys --topic event --reset-offsets --to-offset 1000 --execute [2020-06-17 16:26:06,074] WARN New offset (518) is lower than earliest offset. Value will be set to 519 (kafka.admin.ConsumerGroupCommand$) TOPIC PARTITION NEW-OFFSET event 0 1000 # 把offcet从当前位置往前移动100个,如果是正数就是往后移动。 [root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group sys --topic event --reset-offsets --shift-by -100 --execute



