栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka 消费特定时间戳之后的消息

Kafka 消费特定时间戳之后的消息

文章目录
    • 前言
    • 代码实现

前言

我们都知道 kafka 可以根据制定的分区和偏移量来消费。但是最近碰到一个需求,需要把之前一周的消息都拉出来做分析,那么就要根据时间戳来进行消费。

代码实现
    public void seekBeforeTimestamp() {
    	// 初始化 kafka
        KafkaConsumer consumer = init();
        
        Set assignment = new HashSet<>();
        // 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。
        // 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofMillis(1000));
            // assignment()方法是用来获取消费者所分配到的分区消息的
            assignment = consumer.assignment();
        }

        System.out.println("assignment.size() = " + assignment.size());

        Map timestampToSearch = new HashMap<>();
        for (TopicPartition tp : assignment) {
            // 设置查询分区时间戳的条件:获取当前时间前周之后的消息
            timestampToSearch.put(tp, System.currentTimeMillis() - 7 * 24 * 3600 * 1000);
        }
        
        Map offsets = consumer.offsetsForTimes(timestampToSearch);

        for (TopicPartition tp : assignment) {
            // 获取该分区的offset以及timestamp
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
            // 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            }
        }
    }

上面代码中的 init() 方法为初始化 kafka 配置的方法,此处略,大家根据自己的需求配置初始化参数即可。这里就说明一下,如果要订阅 Topic 中的全部分区的实现方法。

        // 订阅全部分区
        List partitions = Lists.newArrayList();
        List partitionInfos = consumer.partitionsFor(KAFKA_TOPIC);
        System.out.println("partitionInfos.size() = " + partitionInfos.size());
        if (partitionInfos.size() > 0) {
            for (PartitionInfo info : partitionInfos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        consumer.assign(partitions);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671796.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号