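I've recently been learning Kafka on my own, starting with the simplest possible demo.
I ran into a problem: the producer kept running but never managed to send a message. When I finally checked the log, it said the connection had been refused.
I searched online for a long time and found all kinds of conflicting advice, but all I wanted was for clients on any IP to be able to connect. After a lot of fiddling, I got it working based on what I found online.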
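Edit
server.properties
and change the settings as shown below, where arm-64 is the hostname of the machine Kafka is installed on. Clients connect to whatever address the broker advertises, so arm-64 must also resolve on the client machine (for example through an entry in its hosts file).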
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://arm-64:9092
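Before running any Kafka client, it can help to confirm that the advertised address is actually reachable from the client machine, since that is the address clients are told to connect to. The following is only a rough sketch using plain JDK sockets, not a Kafka API: the class name AdvertisedListenerCheck and its helper methods are made up for illustration, and it assumes the listener string has the simple form listener_name://host:port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Kafka): checks whether the host and port
// from an advertised.listeners entry can be reached from this machine.
class AdvertisedListenerCheck {

    // Splits e.g. "PLAINTEXT://arm-64:9092" into {"arm-64", "9092"}.
    public static String[] parseHostPort(String listener) {
        int schemeEnd = listener.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Expected name://host:port but got: " + listener);
        }
        String hostPort = listener.substring(schemeEnd + 3);
        int colon = hostPort.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing port in: " + listener);
        }
        return new String[] { hostPort.substring(0, colon), hostPort.substring(colon + 1) };
    }

    // Returns true if a TCP connection succeeds within the timeout.
    // An unresolvable hostname or a refused connection both return false.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the value used in server.properties above.
        String listener = args.length > 0 ? args[0] : "PLAINTEXT://arm-64:9092";
        String[] hostPort = parseHostPort(listener);
        boolean reachable = canConnect(hostPort[0], Integer.parseInt(hostPort[1]), 3000);
        System.out.println(hostPort[0] + ":" + hostPort[1]
                + (reachable ? " is reachable" : " is NOT reachable from this machine"));
    }
}
```

If this reports that the address is not reachable, the usual suspects are a hostname the client cannot resolve, a firewall, or a broker that is not listening on that port.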
Here is the test code as well.
Producer
package com.example.demo_kafka2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Producer
public class Demo1 {
    public static void main(String[] args) {
        // 1. Kafka configuration
        Properties properties = new Properties();
        // Address of the Kafka broker
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.147.17.93:9092");
        // Number of retries if a send fails
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Serializer for the message key
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for the message value
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Wrap the message to send: topic, key, value
        ProducerRecord<String, String> record = new ProducerRecord<>("IT-topic", "100001", "hello kafka");
        // 3. Send the message (asynchronous: the record is buffered first)
        producer.send(record);
        // 4. Close the producer. This is required: close() flushes buffered records,
        //    so without it the program may exit before the message is actually sent
        producer.close();
    }
}
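With the default settings, a producer that cannot reach the advertised address keeps retrying for a long time before giving up, which matches the "keeps running but never sends" symptom described above. One possible way to surface the failure sooner is to shorten the producer timeouts. The sketch below only builds the Properties object using the plain string config keys, so it has no Kafka dependency; the class name FailFastProducerConfig and the timeout values are assumptions chosen for illustration, not recommendations. Kafka requires delivery.timeout.ms to be at least linger.ms plus request.timeout.ms, so the values are picked to respect that.

```java
import java.util.Properties;

// Hypothetical helper: producer settings that give up quickly when the
// broker cannot be reached, instead of retrying for minutes.
class FailFastProducerConfig {

    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // How long send() may block while waiting for metadata (illustrative value)
        props.setProperty("max.block.ms", "10000");
        // How long to wait for a broker response to a single request (illustrative value)
        props.setProperty("request.timeout.ms", "5000");
        // No artificial batching delay
        props.setProperty("linger.ms", "0");
        // Upper bound on the time to report success or failure after send() returns;
        // must be >= linger.ms + request.timeout.ms
        props.setProperty("delivery.timeout.ms", "15000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("10.147.17.93:9092");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The resulting Properties can be passed to the KafkaProducer constructor in place of the one built by hand in the demo.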
Consumer
package com.example.demo_kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Consumer
public class ConsumerQuickStart {
    public static void main(String[] args) {
        // 1. Kafka configuration
        Properties properties = new Properties();
        // Address of the Kafka broker
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.147.17.93:9092");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        // Deserializers for the message key and value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 3. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("IT-topic"));
        // 4. Keep the thread alive and poll for new messages
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }
    }
}
Maven configuration
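<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo_kafka2</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo_kafka2</name>
    <description>demo_kafka2</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>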



