栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

linux本地连接kafka生产者和消费者可以成功但是通过javaAPI操作生产者发送消息不成功报错org.apache.kafka.common.errors.TimeoutException

linux本地连接kafka生产者和消费者可以成功但是通过javaAPI操作生产者发送消息不成功报错org.apache.kafka.common.errors.TimeoutException

通过命令行工具连接服务器本地操作(kafka-console-producer.sh和kafka-console-consumer.sh)是能够相互通信的,producer发布的信息consumer能够接收到。

但是
java通过kafka-client的API写的代码始终不能跟kafka通信:java producer的消息发不出去, java comsumer也收不到任何消息。
仔细检查了下代码中IP、端口都没有写错。
服务器的防火墙也是关闭的。

解决办法

将kafka/config/server.properties文件中advertised.listeners改为如下属性。
192.168.32.XXX是我虚拟机的IP。

advertised.listeners=PLAINTEXT://192.168.32.XXX:9092

修改后消费者的命令监控的ip也要从loacalhost改为192.168.32.XXX

 kafka-console-consumer.sh --bootstrap-server 192.168.32.XXX:9092 --topic topic_1 --from-beginning

不然也无法接收到消息`在

      {
        Map configs = new HashMap<>();
//        设置连接Kafka的初始连接⽤到的服务器地址
        configs.put("bootstrap.servers","192.168.32.129:9092");
//        设置key的序列化器
        configs.put("key.serializer", IntegerSerializer.class);
//        设置value的序列化器
        configs.put("value.serializer", StringSerializer.class);
        
//        configs.put("acks","all");
//        
//        configs.put("reties","3");


        KafkaProducer producer=new KafkaProducer(configs);

        List
headers = new ArrayList<>(); headers.add(new RecordHeader("biz.name","producer.demo".getBytes())); ProducerRecord record = new ProducerRecord( "topic_1", // 主题名称 0, // 分区编号,现在只有⼀个分区,所以是0 0, // 数字作为key "message 0", // 字符串作为value headers ); //消息同步确认 final Future future = producer.send(record); final Recordmetadata metadata = future.get(); System.out.println("主题"+ metadata.topic()); System.out.println("分区" + metadata.partition()); System.out.println("偏移量" + metadata.offset()); //关闭生产者 producer.close(); }

成功后


idea返回结果

消费者接受发了两次message 0

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487135.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号