栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Java操作Kafka收发消息demo

Java操作Kafka收发消息demo

通过Java程序来进行Kafka收发消息的演示

Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下:


        2.11
        1.7.21
        2.0.0
        1.18.8
        4.11
        2.2.4
        1.5.4
        2.3.1
    


    org.apache.kafka
    kafka-clients
    ${kafka.version}

创建生产者
package com.demo.kafkademo.ch1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerFastStart {
    // Kafka集群地址
    private static final String brokerList = "192.168.33.129:9092";
    // 主题名称-之前已经创建
    private static final String topic = "topicone";
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置key序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //另外一种写法
        //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // 设置值序列化器
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置集群地址
        properties.put("bootstrap.servers", brokerList);
        // KafkaProducer 线程安全
        KafkaProducer producer = new KafkaProducer<>(properties);
        ProducerRecord record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
        try {
            producer.send(record);
            //Recordmetadata recordmetadata = producer.send(record).get();
            //System.out.println("part:" + recordmetadata.partition() + ";topic:" + recordmetadata.topic());
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

消费者
package com.demo.kafkademo.ch1;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;


public class ConsumerFastStart {
    // Kafka集群地址
    private static final String brokerList = "192.168.33.129:9092";
    // 主题名称-之前已经创建
    private static final String topic = "topicone";
    // 消费组
    private static
    final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);

        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords records =
                    consumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord record : records) {
                System.out.println(record.value());
            }
        }
    }
}

先启动消费端,再启动生产端进行消息的发送

附:

注意 : waring:使用java连接linux下kafka集群需要设置hosts绑定;
kafka 安装目录 config/server.properties 文件 其中 listeners=PLAINTEXT://:9092
改为listeners=PLAINTEXT://192.168.33.129:9092 (加上kafka服务所在虚拟机ip)
否则会出现异常: Connection to node 1 (localhost/127.0.0.1:9092) could not be established

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487084.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号