栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka: 按partition同步手动提交offset

Kafka: 按partition同步手动提交offset

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;
import java.util.function.Consumer;

public class ManualSubmitOffsetByPartition {
    public static void main(String[] args) {
        //定义topic
        String topic="testTopic2";
        //定义broker
        String server="localhost:9092";
        //定义消费者组
        String group="consumerGroupTest2";

        //定义Properties对象来构建kafka Consumer
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        //构建Kafka Consumer
        KafkaConsumer myConsumer=new KafkaConsumer<>(properties);
        //订阅topic
        myConsumer.subscribe(Arrays.asList(topic));
        try{
            while(true){
                //每隔2秒从服务器获取消息
                ConsumerRecords records=myConsumer.poll(Duration.ofMillis(2000));
                //从ConsumerRecords对象获取所有的TopicPartition集合
                Set partitions = records.partitions();
                //遍历TopicPartition集合
                for(TopicPartition topicPartition: partitions){
                    //获取收到的消息中属于某个partition的所有消息记录
                    List> recordList = records.records(topicPartition);
                    //消费消息
                    recordList.forEach(new Consumer>() {
                        @Override
                        public void accept(ConsumerRecord stringStringConsumerRecord) {
                            System.out.println(stringStringConsumerRecord.value());
                        }
                    });
                    //获取某个partition中最大的消息offset
                    long latestOffsetInOneTopicPartition=recordList.get(recordList.size()-1).offset();
                    //提交某个partition的消费offset
                    myConsumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndmetadata(latestOffsetInOneTopicPartition+1)));
                }
            }
        }catch (Exception ex){
            myConsumer.close();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761576.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号