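I've recently been working on a web project that uses Kafka. To get started I wrote a small demo for practice, and it failed on the very first run, haha. Fine, an error is a good thing: let's go and fix it!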
1. Create the Order class
public class Order {
    private Long orderId;
    private int count;

    public Order(Long orderId, int count) {
        this.orderId = orderId;
        this.count = count;
    }

    // No-argument constructor, needed by JSON libraries for deserialization
    public Order() {
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
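2. Add the dependencies
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
3. Producer demo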
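import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducerDemo01 {
    // 1. Topic name
    private static final String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) {
        // 2. Configuration
        Properties properties = new Properties();
        // Kafka broker addresses (host:port)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.108:9002,192.168.100.108:9003,192.168.100.108:9004");
        // Serializers for the record key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks setting
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        // 3. Create the producer with this configuration
        Producer<String, String> producer = new KafkaProducer<>(properties);
        // 4. Build the message
        // Create the Order object
        Order order = new Order(1001L, 2);
        // Wrap it in a record: key = order ID, value = the order as JSON
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
                String.valueOf(order.getOrderId()),
                JSON.toJSONString(order));
        try {
            // Synchronous send: get() blocks until the send completes or fails
            RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("Result of synchronous send: " + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's network resources
            producer.close();
        }
    }
}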
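The demo sends synchronously by calling get() on the Future that send() returns, so any failure surfaces as an ExecutionException wrapping the real cause. Here is a minimal sketch of that pattern using only the JDK, with no Kafka involved. The class FutureGetSketch and the helpers failingSend and describe are hypothetical names made up for illustration; failingSend merely stands in for a send that fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

public class FutureGetSketch {
    // Hypothetical stand-in for producer.send(record): a Future that has already failed.
    public static Future<String> failingSend() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("topic not present in metadata"));
        return future;
    }

    // Blocks on the Future and reports either the result or the underlying cause.
    public static String describe(Future<String> future) {
        try {
            return "ok: " + future.get();
        } catch (ExecutionException e) {
            // The interesting exception is the cause, not the wrapper.
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture("offset-0")));
        System.out.println(describe(failingSend()));
    }
}
```

In the real demo the same idea applies: when the catch block for ExecutionException fires, e.getCause() is the exception worth reading first.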
4. Run
5. The error (lol…)
"C:Program FilesJavajdk1.8.0_301binjava.exe" "-javaagent:D:IntelliJ IDEA 2019.3.1libidea_rt.jar=55245:D:IntelliJ IDEA 2019.3.1bin" -Dfile.encoding=UTF-8 -classpath "C:Program FilesJavajdk1.8.0_301jrelibcharsets.jar;C:Program FilesJavajdk1.8.0_301jrelibdeploy.jar;C:Program FilesJavajdk1.8.0_301jrelibextaccess-bridge-64.jar;C:Program FilesJavajdk1.8.0_301jrelibextcldrdata.jar;C:Program FilesJavajdk1.8.0_301jrelibextdnsns.jar;C:Program FilesJavajdk1.8.0_301jrelibextjaccess.jar;C:Program FilesJavajdk1.8.0_301jrelibextjfxrt.jar;C:Program FilesJavajdk1.8.0_301jrelibextlocaledata.jar;C:Program FilesJavajdk1.8.0_301jrelibextnashorn.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunec.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunjce_provider.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunmscapi.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunpkcs11.jar;C:Program FilesJavajdk1.8.0_301jrelibextzipfs.jar;C:Program FilesJavajdk1.8.0_301jrelibjavaws.jar;C:Program FilesJavajdk1.8.0_301jrelibjce.jar;C:Program FilesJavajdk1.8.0_301jrelibjfr.jar;C:Program FilesJavajdk1.8.0_301jrelibjfxswt.jar;C:Program FilesJavajdk1.8.0_301jrelibjsse.jar;C:Program FilesJavajdk1.8.0_301jrelibmanagement-agent.jar;C:Program FilesJavajdk1.8.0_301jrelibplugin.jar;C:Program FilesJavajdk1.8.0_301jrelibresources.jar;C:Program FilesJavajdk1.8.0_301jrelibrt.jar;D:IdeaProjectskafka-demotargetclasses;D:mavenRepMavencomalibabafastjson1.2.47fastjson-1.2.47.jar;D:mavenRepMavenorgapachekafkakafka-clients2.4.1kafka-clients-2.4.1.jar;D:mavenRepMavencomgithublubenzstd-jni1.4.3-1zstd-jni-1.4.3-1.jar;D:mavenRepMavenorglz4lz4-java1.6.0lz4-java-1.6.0.jar;D:mavenRepMavenorgxerialsnappysnappy-java1.1.7.3snappy-java-1.1.7.3.jar;D:mavenRepMavenorgslf4jslf4j-api1.7.28slf4j-api-1.7.28.jar" com.hao.kafka.MyProducerDemo01
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic my-replicated-topic not present in metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1299)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:963)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
    at com.hao.kafka.MyProducerDemo01.main(MyProducerDemo01.java:37)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic my-replicated-topic not present in metadata after 60000 ms.

Process finished with exit code 0
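This message means the producer could not obtain metadata for the topic within max.block.ms (60000 ms by default). Common causes are that the topic does not exist on the cluster, or that the addresses in bootstrap.servers cannot be reached from the client machine. A quick first check for the second case is whether each host:port accepts a TCP connection at all. Below is a rough sketch using only the JDK; the class BootstrapCheck, its methods, and the 3000 ms timeout are hypothetical choices for illustration, not part of the Kafka client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class BootstrapCheck {
    // Splits "host1:port1,host2:port2" into unresolved socket addresses.
    public static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection can be opened within the timeout.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same value as the demo's bootstrap.servers setting.
        String servers = "192.168.100.108:9002,192.168.100.108:9003,192.168.100.108:9004";
        for (InetSocketAddress address : parse(servers)) {
            boolean reachable = canConnect(address.getHostString(), address.getPort(), 3000);
            System.out.println(address.getHostString() + ":" + address.getPort() + " reachable=" + reachable);
        }
    }
}
```

A successful TCP connection is only a first step: the brokers' advertised.listeners also have to be reachable from the client, and the topic itself has to exist. Kafka's default listener port is 9092, so it is also worth confirming that ports 9002 to 9004 really match the listeners configured on the brokers.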
Dear readers, it's finally solved!!!
Here is the result:



