首先已经安装的Java环境,并且安装了zookeeper服务注册中心。
下载https://kafka.apache.org//
当前最新版本3.1.0
在windows解压后主要关注的是config文件下的server.properties文件,修改:
broker.id=0 //只有一个kafka就不用改 log.dirs = "" //自己要存的目录 zookeeper.connect=localhost:2181//zookeeper没有改动的化默认就行启动
bin/windows/kafka-server-start.bat ./config/server.propertiesjava jar包
生产者org.apache.kafka kafka-clients 3.1.0
public class Producer {
public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//kafka地址,多个地址用逗号分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer kafkaProducer = new KafkaProducer<>(p);
int i = 0;
try {
while (true) {
String msg = "Hello," + i;
ProducerRecord record = new ProducerRecord("test", msg);
kafkaProducer.send(record);
//System.out.println("消息发送成功:" + msg);
Thread.sleep(2000);
i++;
}
} finally {
kafkaProducer.close();
}
}
}
消费者
public class Consumer {
public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");
KafkaConsumer kafkaConsumer = new KafkaConsumer(p);
kafkaConsumer.subscribe(Collections.singletonList("test"));// 订阅消息
while (true) {
ConsumerRecords records = kafkaConsumer.poll(Duration.ZERO.withSeconds(1));
for (ConsumerRecord record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
record.topic(), record.offset(), record.value()));
}
}
}
}



