栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java连接kafka实现生产者消费者功能

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java连接kafka实现生产者消费者功能

一、功能描述

利用Java连接Kafka,通过API实现生产者和消费者,对于Kafka生产或者消费数据。将日志信息进行输出。

二、依赖导入

首先,创建一个简单的maven的工程并将依赖导入


	org.apache.kafka
	kafka-clients
	${kafka.version}


	log4j
	log4j
	1.2.17


	org.slf4j
	slf4j-log4j12
	1.7.33

三、日志配置
#指定log4j的输出信息
log4j.rootLogger=INFO, stdout, logfile
#指定log4j的标准输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#指定log4j的标准输出的样式
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#指定标准输出的转换的格式
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
#指定日志文件的输出
log4j.appender.logfile=org.apache.log4j.FileAppender
#指定log4j的输出路径文件名
log4j.appender.logfile.File=log/hd.log
#指定日志日志输出样式
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
#指定日志文件的转换格式
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
四、基于Zookeeper的消费者
//进行导包
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ZkConsumer {
    public static void main(String[] args) {
        //初始化配置信息
        Properties config = new Properties();
        //定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092");
        //定义分组信息,相当于kafka脚本命令的-group
        config.put(ConsumerConfig.GROUP_ID_CONFIG,"kb16");
        //定义数据偏移量配置,配置信息有:earliest、latest、none和anything else四种配置
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //定义自动提交时间,时间单位为ms
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,500);
        //定义是否开启自动提交
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //定义消费者的键的反序列化的配置
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        //定义消费者的值的反序列化配置
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        //初始化存放消费者的队列
        KafkaConsumer consumer=new KafkaConsumer<>(config);
        //订阅主题
        consumer.subscribe(Arrays.asList("kb16-test02"));
        //循环遍历进行数据获取
        while(true){
            //迭代器遍历消费者数据
            Iterator> it = consumer.poll(Duration.ofMillis(500)).iterator();
            //如果还有数据
            if(it.hasNext()) {
                //遍历消费者数据,并数据拼接起来
                do {
                    ConsumerRecord record = it.next();
                    StringBuilder builder = new StringBuilder();
                    builder.append(record.topic());
                    builder.append("t");
                    builder.append(record.partition());
                    builder.append("t");
                    builder.append(record.offset());
                    builder.append("t");
                    builder.append(record.timestamp());
                    builder.append("t");
                    builder.append(record.key());
                    builder.append("t");
                    builder.append(record.value());
                    builder.append("t");
                    System.out.println(builder.toString());
                } while (it.hasNext());
            }
        }
        //consumer.close();
    }
}
五、基于Zookeeper的生产者
//导包
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
public class ZkProducer {
    public static void main(String[] args) {
        //初始化配置
        Properties config = new Properties();
        //定义连接的主机信息,相当于kafka脚本命令的--bootstrap-server
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092");
        //定义批次大小信息
        config.put(ProducerConfig.BATCH_SIZE_CONFIG,5);
        //生产者将在请求传输之间到达的任何记录组合成一个批处理请求。
        config.put(ProducerConfig.LINGER_MS_CONFIG,1000);
        //定义确认策略,配置信息有:0、1和all,默认一般为all
        config.put(ProducerConfig.ACKS_CONFIG,"all");
        //定义失败重试的次数
        config.put(ProducerConfig.RETRIES_CONFIG,3);
        //producer -Event Stream->kafka server(java object)
        //定义生产者键的serialization序列化
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        //定义生产者的值的序列化
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
		//初始化生产者队列
        KafkaProducer producer = new KafkaProducer(config);
		//定义主题
        final String TOPIC="kb16-test02";
        //定义偏移量
        final int PART=0;
        for (int i = 0; i < 100; i++) {
            //传入数据进行封装
            ProducerRecord record =
                    new ProducerRecord<>(TOPIC,PART,System.currentTimeMillis(),i,"happy new year"+i);
            //向kafka发送数据
            producer.send(record);
        }
        //关闭生产者
        producer.close();
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/782770.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号