栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink kafka偏移量

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink kafka偏移量

文章目录
  • Flink kafka偏移量
    • 1 代码
    • 2 设置 offset 五种模式
      • 2.1 kafkaConsumer.setStartFromGroupOffsets();
      • 2.2 kafkaConsumer.setStartFromEarliest();
      • 2.3 kafkaConsumer.setStartFromLatest()
      • 2.4 kafkaConsumer.setStartFromTimestamp()
      • 2.5 kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets)
    • 3 properties配置offset
      • 3.1 earliest
      • 3.2 latest
      • 3.3 none
    • 4 相关指令
      • 4.1 查看所有的group
      • 4.2 查看消费者组的offset
      • 4.3 把某个group的offset设置到最初或指定位置

Flink kafka偏移量

version:1.13.1

1 代码
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // kafka 配置项
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka:9092");
        properties.setProperty("group.id", "test1");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 从 kafka 读取数据
        FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

        // 1.设置偏移量
        Map specificStartOffsets = new HashMap<>();
        // 1.1 topic 第0个分区,offset从第二个开始消费,offset从0开始
        specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L);
        // 1.2 设置其他offset模式
        //kafkaConsumer.setStartFromGroupOffsets();
        //kafkaConsumer.setStartFromEarliest();
        //kafkaConsumer.setStartFromLatest();
        //kafkaConsumer.setStartFromTimestamp()
        kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets);
        DataStream inputDataStream = env.addSource(kafkaConsumer);

        inputDataStream.print("data");
        env.execute();
2 设置 offset 五种模式 2.1 kafkaConsumer.setStartFromGroupOffsets();
  1. 默认值
  2. Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
  3. 测试效果:该默认值效果与latest一致
2.2 kafkaConsumer.setStartFromEarliest();
  1. 从最早的记录开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用
  2. 测试效果:每次消费者重新连接,从头开始消费数据
2.3 kafkaConsumer.setStartFromLatest()
  1. 从最新的数据开始消费
  2. 测试效果:每次消费者重新连接,从最新的数据开始消费
2.4 kafkaConsumer.setStartFromTimestamp()
  1. Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
  2. 测试效果:从topic中指定的时间点开始消费,指定时间点之前的数据忽略
2.5 kafkaConsumer.setStartFromSpecificOffsets(specificStartOffsets)
  1. Flink从topic中指定的offset开始,这个比较复杂,需要手动指定topic,Partition,offset
  2. 测试效果:从topic-Partition的offset开始消费
3 properties配置offset
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
3.1 earliest

当各partition有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,从起始开始消费

3.2 latest

当各partition下有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,消费最新的该partition下的数据

3.3 none

topic各partition都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

4 相关指令 4.1 查看所有的group
kafka-consumer-groups.sh --bootstrap-server master:9092 --list
4.2 查看消费者组的offset
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group test1
4.3 把某个group的offset设置到最初或指定位置
# 把group sys在topic:event上的offcet恢复到最初
[root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group sys --topic event --reset-offsets --to-earliest –execute
TOPIC                          PARTITION  NEW-OFFSET     
event                          0          519
# 【注意】最初的offset不一定是0,比如本例,519位置以前的数据已经过期,所以offset的最初位置就是519
# 恢复offcet到指定位置
[root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group sys --topic event --reset-offsets --to-offset 1000 --execute
[2020-06-17 16:26:06,074] WARN New offset (518) is lower than earliest offset. Value will be set to 519 (kafka.admin.ConsumerGroupCommand$)
TOPIC                          PARTITION  NEW-OFFSET     
event                          0          1000
# 把offcet从当前位置往前移动100个,如果是正数就是往后移动。
[root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group sys --topic event --reset-offsets --shift-by -100 --execute
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583240.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号