
Kafka demo pitfalls 01


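I've recently been working on a web project that uses Kafka, so I first wrote a small demo for practice. It did not start well: the very first run threw an error, haha. Fine, errors are good, so let's go fix it!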

1. Create the Order class
public class Order {

    private Long orderId;
    private int count;

    public Order(Long orderId, int count) {
        this.orderId = orderId;
        this.count = count;
    }

    public Order() {
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}

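2. Add the dependencies

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>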
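3. Producer demo
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducerDemo01 {
    // 1. Topic name
    private static final String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) {
        // 2. Configuration
        Properties properties = new Properties();
        // Kafka broker addresses (host:port)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.108:9002,192.168.100.108:9003,192.168.100.108:9004");
        // Serializers for the record key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=0: the producer does not wait for any broker acknowledgement,
        // so the offset reported for each record is always -1
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        // 3. Create the producer with this configuration
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 4. Build the message
        // Create the order object
        Order order = new Order(1001L, 2);
        // Key: the order id; value: the order serialized as JSON
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
                String.valueOf(order.getOrderId()),
                JSON.toJSONString(order));
        try {
            // Synchronous send: block until the send completes or fails
            RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("Synchronous send result: " + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's network resources
            producer.close();
        }
    }
}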

4. Run it

5. It fails (LOL…)

"C:Program FilesJavajdk1.8.0_301binjava.exe" "-javaagent:D:IntelliJ IDEA 2019.3.1libidea_rt.jar=55245:D:IntelliJ IDEA 2019.3.1bin" -Dfile.encoding=UTF-8 -classpath "C:Program FilesJavajdk1.8.0_301jrelibcharsets.jar;C:Program FilesJavajdk1.8.0_301jrelibdeploy.jar;C:Program FilesJavajdk1.8.0_301jrelibextaccess-bridge-64.jar;C:Program FilesJavajdk1.8.0_301jrelibextcldrdata.jar;C:Program FilesJavajdk1.8.0_301jrelibextdnsns.jar;C:Program FilesJavajdk1.8.0_301jrelibextjaccess.jar;C:Program FilesJavajdk1.8.0_301jrelibextjfxrt.jar;C:Program FilesJavajdk1.8.0_301jrelibextlocaledata.jar;C:Program FilesJavajdk1.8.0_301jrelibextnashorn.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunec.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunjce_provider.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunmscapi.jar;C:Program FilesJavajdk1.8.0_301jrelibextsunpkcs11.jar;C:Program FilesJavajdk1.8.0_301jrelibextzipfs.jar;C:Program FilesJavajdk1.8.0_301jrelibjavaws.jar;C:Program FilesJavajdk1.8.0_301jrelibjce.jar;C:Program FilesJavajdk1.8.0_301jrelibjfr.jar;C:Program FilesJavajdk1.8.0_301jrelibjfxswt.jar;C:Program FilesJavajdk1.8.0_301jrelibjsse.jar;C:Program FilesJavajdk1.8.0_301jrelibmanagement-agent.jar;C:Program FilesJavajdk1.8.0_301jrelibplugin.jar;C:Program FilesJavajdk1.8.0_301jrelibresources.jar;C:Program FilesJavajdk1.8.0_301jrelibrt.jar;D:IdeaProjectskafka-demotargetclasses;D:mavenRepMavencomalibabafastjson1.2.47fastjson-1.2.47.jar;D:mavenRepMavenorgapachekafkakafka-clients2.4.1kafka-clients-2.4.1.jar;D:mavenRepMavencomgithublubenzstd-jni1.4.3-1zstd-jni-1.4.3-1.jar;D:mavenRepMavenorglz4lz4-java1.6.0lz4-java-1.6.0.jar;D:mavenRepMavenorgxerialsnappysnappy-java1.1.7.3snappy-java-1.1.7.3.jar;D:mavenRepMavenorgslf4jslf4j-api1.7.28slf4j-api-1.7.28.jar" com.hao.kafka.MyProducerDemo01
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic my-replicated-topic not present in metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1299)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:963)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
	at com.hao.kafka.MyProducerDemo01.main(MyProducerDemo01.java:37)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic my-replicated-topic not present in metadata after 60000 ms.

Process finished with exit code 0
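
The message "Topic my-replicated-topic not present in metadata after 60000 ms" means the producer could not fetch metadata for the topic before its blocking timeout (60000 ms by default) ran out. Two common causes are that the addresses in bootstrap.servers are not reachable, or that the topic does not exist. A quick way to rule out the first one is a plain TCP check against each configured address. The sketch below is only an illustration: the class name BootstrapCheck and its methods are made up for this example and use nothing but the JDK. A successful connection shows only that something is listening on that port, not that it is a Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of kafka-clients.
class BootstrapCheck {

    /** Splits a bootstrap.servers string ("host1:port1,host2:port2") into addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing port in: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection to the address opens within timeoutMs. */
    static boolean isReachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host here; resolution and connect failures are both IOExceptions
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The addresses used in the producer demo above
        String servers = "192.168.100.108:9002,192.168.100.108:9003,192.168.100.108:9004";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 1000));
        }
    }
}
```

If every address prints reachable=false, the problem is most likely the address or port in the producer configuration (or a cluster that is not running) rather than the topic itself.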

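Dear readers, it is finally solved!
Here is what it took:

step1. Start the Kafka cluster and make sure it is up

step2. Look up the Kafka broker node configuration in ZooKeeper

step3. Correct the IP configured in the code

step4. Success! That's a wrap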
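
Steps 2 and 3 can be sketched in code. Each broker registers itself in ZooKeeper under /brokers/ids/<id> as a JSON document that includes an "endpoints" array, which can be read for example with the ZooKeeper shell that ships with Kafka (get /brokers/ids/0). The sketch below pulls host:port pairs out of such documents and joins them into a bootstrap.servers value. Everything specific here is an assumption for illustration: the class name BrokerEndpoints, the sample JSON strings and the ports 9092 to 9094 are hypothetical, not the values from the author's cluster, and the regular expression is a shortcut rather than a full JSON parser.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of kafka-clients.
class BrokerEndpoints {

    // Matches entries such as "PLAINTEXT://192.168.100.108:9092" and captures host:port
    private static final Pattern ENDPOINT = Pattern.compile("\"\\w+://([^\"/]+:\\d+)\"");

    /** Extracts every host:port pair from one broker registration document. */
    static List<String> hostPorts(String brokerJson) {
        List<String> result = new ArrayList<>();
        Matcher matcher = ENDPOINT.matcher(brokerJson);
        while (matcher.find()) {
            result.add(matcher.group(1));
        }
        return result;
    }

    /** Joins the endpoints of all brokers into a bootstrap.servers string. */
    static String bootstrapServers(List<String> brokerJsons) {
        List<String> all = new ArrayList<>();
        for (String json : brokerJsons) {
            all.addAll(hostPorts(json));
        }
        return String.join(",", all);
    }

    public static void main(String[] args) {
        // Hypothetical sample data in the shape of /brokers/ids/<id> znodes
        List<String> znodes = Arrays.asList(
                "{\"listener_security_protocol_map\":{\"PLAINTEXT\":\"PLAINTEXT\"},"
                        + "\"endpoints\":[\"PLAINTEXT://192.168.100.108:9092\"],"
                        + "\"host\":\"192.168.100.108\",\"port\":9092,\"version\":4}",
                "{\"listener_security_protocol_map\":{\"PLAINTEXT\":\"PLAINTEXT\"},"
                        + "\"endpoints\":[\"PLAINTEXT://192.168.100.108:9093\"],"
                        + "\"host\":\"192.168.100.108\",\"port\":9093,\"version\":4}");
        System.out.println(bootstrapServers(znodes));
    }
}
```

The resulting string is what would go into ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, provided those addresses are reachable from the machine running the producer.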
When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/761749.html