栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka消费者读取数据

Kafka消费者读取数据

Consumer测试类

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Properties;

public class Consumer {
    private static Logger logger = LoggerFactory.getLogger("Consumer");
    public static void main(String[] args) {
	    // 1.创建消费者配置
        Properties properties = new Properties();
        properties.put("bootstrap.servers","ip:port");
        // 消费者组
        properties.put("group.id","test");
        // 自动提交offset
        properties.put("enable.auto.commit","true");
        // 自动提交offset的时间间隔
        properties.put("enable.commit.interval.ms","1000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 安全认证
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "PLAIN");
        String jassc = "org.apache.kafka.common.security.plain.PlainLoginModule requiredn"
                + "username="" + "用户名" + ""n"
                + "password="" + "密码" + "";";
        properties.setProperty("sasl.jaas.config", jassc);
        // 创建消费者
        KafkaConsumer consumer = new KafkaConsumer(properties);
        // 订阅Topic
        consumer.subscribe(Collections.singletonList("net_vehicles_position"));

        while(true){
            ConsumerRecords records = consumer.poll(100);
            for(ConsumerRecord record : records){
                String topic = record.topic();
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                logger.info(topic + "t" + offset + "t" + key + "t" + value);

            }
        }
    }
}

maven依赖


        
            org.apache.kafka
            kafka_2.12
            1.0.0
            provided
        

        
            org.apache.kafka
            kafka-clients
            1.0.0
        

        
            org.apache.kafka
            kafka-streams
            1.0.0
        


        
            log4j
            log4j
            1.2.17
        
        
            org.slf4j
            slf4j-api
            1.7.25
        
        
            org.slf4j
            slf4j-log4j12
            1.7.25
            test
        
        
            org.slf4j
            slf4j-simple
            1.7.25
            test
        
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751243.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号