栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

深入浅出理解kafka原理系列之:kafka消费者参数设置

深入浅出理解kafka原理系列之:kafka消费者参数设置

深入浅出理解kafka原理系列之:kafka消费者参数设置
    • 1.max.poll.records
    • 2.max.poll.interval.ms
    • 3.enable.auto.commit
    • 4.heartbeat.interval.ms
    • 5.session.timeout.ms
    • 6.auto.offset.reset
    • 6.指定分区消费
    • 7.消息回溯消费
    • 8.指定offset消费
    • 9.指定时间点开始消费

1.max.poll.records
  • 默认情况下,消费者一次会poll500条消息
  • 一次poll到500条,就直接执行for循环
  • 一次没有poll到500条,且时间在1秒内,长轮询继续poll,要么到500条,要么到1s
  • 如果多次poll都没到500条,且1秒时间到了,那么直接for循环
props.put("max.poll.records",500);

实例:

            while (true) {  //1)
                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                for(ConsumerRecord record : records){
                    System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
2.max.poll.interval.ms
  • 可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者。
  • 会触发rebalance机制,rebalance机制会造成性能开销,可以通过设置max.poll.records参数,让一次poll的消息条数少一点。
props.put("max.poll.interval.ms",30*1000);
3.enable.auto.commit
  • 指定了消费者是否自动提交消费位移,默认为true。
  • 如果需要减少重复消费或者数据丢失,你可以设置为false。
  • 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
props.put("enable.auto.commit",false);
4.heartbeat.interval.ms
  • consumer给broker发送心跳的间隔时刻,每隔1s
props.put("heartbeat.interval.ms",1000);
5.session.timeout.ms
props.put("session.timeout.ms",10*1000);
  • kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
6.auto.offset.reset
  • 默认latest:新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。
  • earliest:可以通过以下设置,让新的消费者第一次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量+1)
//新消费组从头消费
props.put("auto.offset.reset","earliest");
6.指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
7.消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
8.指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
consumer.seek(new TopicPartition(TOPIC_NAME,0),10);
9.指定时间点开始消费
        //从指定时间点开始消费
        List partitionInfos = consumer.partitionsFor(TOPIC_NAME);
        //从1小时前开始消费
        long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        HashMap map = new HashMap<>();
        for (PartitionInfo partitionInfo : partitionInfos){
            map.put(new TopicPartition(TOPIC_NAME, partitionInfo.partition()),fetchDataTime);
        }

        //分区,偏移量
        Map parMap = consumer.offsetsForTimes(map);

        for(Map.Entry entry : parMap.entrySet()){
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if(key == null || value == null) continue;
            long offset = value.offset();
            System.out.println("partition-" + key.partition() + "  |offset-" + offset);
            System.out.println();

            consumer.assign(Collections.singletonList(key));
            consumer.seek(key,offset);

        }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671234.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号