栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java连接kafka查询对应topic消费具体信息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java连接kafka查询对应topic消费具体信息

首先需要创建maven项目导入坐标


            org.apache.kafka
            kafka_2.11
            0.10.2.1
        

接下来,直接上代码

package com.example.demo;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;


public class KafkaTest2 {
    @Test
    public void DemoTest() throws ExecutionException, InterruptedException {

        Properties properties=new Properties();
        //主机信息查看kafka/config对应目录下advertised.host.name
        properties.put("bootstrap.servers","对应的主机ip:端口号");
        //这个id在kafka目录下使用bin/kafka-consumer-groups.sh --zookeeper 对应的zookeeperip:2181 --list命令查看
        properties.put("group.id", "test");
        
        properties.put("enable.auto.commit", "true");
        
        properties.put("auto.commit.interval.ms", "1000");
        
        properties.put("auto.offset.reset", "earliest");
        
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);

        kafkaConsumer.assign(Arrays.asList(new TopicPartition("对应的topic",0)));
        
//        kafkaConsumer.subscribe(Arrays.asList("mt1"));
        Map> stringListMap = kafkaConsumer.listTopics();

        System.out.println(stringListMap.get("对应的topic").stream());
        ConsumerRecords records = kafkaConsumer.poll(100);
        //查询最新的topic信息数量
        records.count();
//  查询topic具体内容
//        for (ConsumerRecord record : records) {
//            //Thread.sleep(1000);
//            System.out.printf("offset = %d, value = %s", record.offset(), record.value());
//            System.out.println();
//        }
//如果想实时获取可以使用while
		//        while (true) {
//            ConsumerRecords records = kafkaConsumer.poll(100);
//            for (ConsumerRecord record : records) {
//                //Thread.sleep(1000);
//                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
//                System.out.println();
//            }
//        }
    }

}

另外,查询所有的topic可以在kafka目录下使用bin/kafka-topics.sh --zookeeper 对应的zookeeperip:2181 --list
也可以使用java代码:

        ZkUtils zkUtils = ZkUtils.apply("对应的zookeeperip:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Map topics = JavaConverters.mapAsJavaMapConverter(AdminUtils.fetchAllTopicConfigs(zkUtils))
                .asJava();
        for (Map.Entry entry : topics.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            System.out.println(key + ":" + value);
        }
        zkUtils.close();
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/345250.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号