栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka中消费者的基本实现

Kafka中消费者的基本实现

Maven必须配置


    org.apache.kafka
    kafka-clients
    2.4.1

Java实现

package com.qf.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {

    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        // 1.设置参数
        // 使用 Properties对象 存放键值对(参数配置信息)
        Properties props = new Properties();
        // 键值对中存入 bootstrap_server => kafka节点或集群(用逗号隔开)
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.232.136:9092,192.168.232.136:9093,192.168.232.136:9094");
        // 设置消费组名
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // 把需要发送的key从字符串序列化为字节数组
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 把需要发送的value从字符串序列化为字节数组
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 2.创建消费者的客户端,传入参数(默认是最新+1开始消费)
        KafkaConsumer consumer = new KafkaConsumer(props);

        // 3.消费者订阅主题列表
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

     
        while (true){
            // 4.poll消息
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
            // 5.循环打印获取的消息
            for(ConsumerRecord record: records){
                System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

手动同步提交offset

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 设置手动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true){
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));

            // 代码逻辑...

            //手动同步提交
            if (records.count() > 0){
                // 当前线程会阻塞直到offset提交成功
                // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
                consumer.commitSync();
            }
        }
    }
}

手动异步提交offset

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 设置手动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true){
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
       
            // 代码逻辑...

            //手动异步提交
            if (records.count() > 0){
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map map, Exception e) {
                        if (e != null){
                            System.err.println("Commit failed for " + map);
                            System.err.println("Commit failed exception: " + e.getStackTrace());
                        }
                    }
                });
            }
        }
    }
}

指定分区消费

// 替换第3步,指定该主题0号分区进行消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

消息回溯消费

// 替换第3步,消息回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

指定offset消费

// 替换第3步,指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

从指定时间点消费

// 替换第3步,从指定时间点开始消费

// 1.获取主题下所有分区
List topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// 2.获取一小时起的时间点
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
// 3. 创建一个hashMap
HashMap map = new HashMap<>();
// 4. 循环所有分区,将分区和时间点(分区=>1小时前时间点)存入hashMap中
for(PartitionInfo par: topicPartitions)
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
// 5.根据分区和时间hashMap,拿到分区和偏移量(分区对象->偏移量对象)存入map中
Map parMap = consumer.offsetsForTimes(map);
// 6.遍历键值对(key分区对象 -> value偏移量对象)
for(Map.Entry entry: parMap.entrySet()){
        // 7. 获取键(分区对象)
        TopicPartition key = entry.getKey();
        // 8. 获取值(偏移量对象)
        OffsetAndTimestamp value = entry.getValue();
        // 9. 校验,为空则跳过
        if (key==null || value==null) continue;
        // 10.获取实际偏移量
        long offset = value.offset();
        // 11.打印分区和偏移量
        System.out.println("partition-" + key.partition() + "|offset-" + offset);
        System.out.println();
        // 12.偏移量不为Null则进入设置
        if (value != null){
            // 13.设置指定偏移量开始消费
            consumer.assign(Arrays.asList(key));
            consumer.seek(key, offset);
        }
}

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663318.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号