栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java-Kafka笔记

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java-Kafka笔记

Kafka设置偏移量

        Kafka的偏移量其实是顺序递增,简单理解就是自增id,从0开始。

        Kafka一般不会重复消费的,因为Kafka的broke会记录每个消费者组的消费offset,每次开启消费的时候,会从这个offset开始消费。

        Kafka如何消费已经消费过的数据

        办法1:开启另一个消费者组,进行消费。因为新的消费者组的offset肯定是从0开始的。

        办法2:在这个消费者组中,重新设置偏移量进行消费。

        还有其他的啥办法不,这个我不知道了。就先记到这里吧。

    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put("bootstrap.servers", "173.14.22.49:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //设置消费的偏移量,如果以前消费过则接着消费,如果没有就从头开始消费
        properties.put("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", false);

        //消费者组
        properties.put("group.id", "he1qdaqwq2iow_1234");
        KafkaConsumer consumer = new KafkaConsumer(properties);
        
        //注意一定不要用订阅模式
        //consumer.subscribe(Collections.singleton("topicName"));
   

        TopicPartition topicPartition = new TopicPartition("topicName", 0);

        consumer.assign(Arrays.asList(new TopicPartition("topicName",0),new TopicPartition("topicName",1)));
        consumer.seek(new TopicPartition("topicName",0), 6);



        while (flag) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(3000));
            System.out.println("Get record size : " + records.count());
            for (ConsumerRecord record : records) {
                // 循环打印消息记录
                //处理消息
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                consumer.commitAsync();
            }

            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        consumer.close();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/705123.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号