Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下:
创建生产者2.11 1.7.21 2.0.0 1.18.8 4.11 2.2.4 1.5.4 2.3.1 org.apache.kafka kafka-clients ${kafka.version}
package com.demo.kafkademo.ch1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerFastStart {
// Kafka集群地址
private static final String brokerList = "192.168.33.129:9092";
// 主题名称-之前已经创建
private static final String topic = "topicone";
public static void main(String[] args) {
Properties properties = new Properties();
// 设置key序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//另外一种写法
//properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 设置值序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置集群地址
properties.put("bootstrap.servers", brokerList);
// KafkaProducer 线程安全
KafkaProducer producer = new KafkaProducer<>(properties);
ProducerRecord record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
try {
producer.send(record);
//Recordmetadata recordmetadata = producer.send(record).get();
//System.out.println("part:" + recordmetadata.partition() + ";topic:" + recordmetadata.topic());
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
消费者
package com.demo.kafkademo.ch1;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerFastStart {
// Kafka集群地址
private static final String brokerList = "192.168.33.129:9092";
// 主题名称-之前已经创建
private static final String topic = "topicone";
// 消费组
private static
final String groupId = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", groupId);
KafkaConsumer consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(5000));
for (ConsumerRecord record : records) {
System.out.println(record.value());
}
}
}
}
先启动消费端,再启动生产端进行消息的发送
附:注意 : waring:使用java连接linux下kafka集群需要设置hosts绑定;
kafka 安装目录 config/server.properties 文件 其中 listeners=PLAINTEXT://:9092
改为listeners=PLAINTEXT://192.168.33.129:9092 (加上kafka服务所在虚拟机ip)
否则会出现异常: Connection to node 1 (localhost/127.0.0.1:9092) could not be established



