JDK1.8
Linux下安装好kafka,这里使用的是版本是kafka_2.11-2.4.1
Windows下安装好IDEA
步骤新建Maven工程
pom.xml依赖
org.apache.kafka kafka-clients2.4.1
新建包org.example
新建类:SimpleProducer.java、SimpleConsumer.java
生产者代码SimpleProducer.java
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms",1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer
消费者代码SimpleConsumer.java
package org.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers","node1:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put("session.timeout.ms","30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");//注意是StringDeserializer
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer
提示:相关属性的理解,可以查看kafka文档:Kafka 中文文档 - ApacheCN
测试
创建主题Hello-Kafka
[hadoop@node1 ~]$ kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic Hello-Kafka
查看主题
[hadoop@node1 ~]$ kafka-topics.sh --list --zookeeper localhost:2181/kafka
运行消费者SimpleConsumer.java
此时如果有生产者生产数据,在消费者控制台将看到信息输出
运行生产者SimpleProducer.java
生产者发送消息后,消费者控制台将有如下输出
注意:截图offset不是从0开始,因为截图时不是第一次运行生产者代码。
在命令行生产消息
[hadoop@node1 kafka]$ kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka >hello >world
消费者终端输出
offset = 30, key = null, value = hello offset = 31, key = null, value = world
demo代码:kafka-api-demo: kafka-api-demo
完成!enjoy it!



