栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java编程操作Kafka

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java编程操作Kafka

目录

 

5. Java编程操作Kafka

5.1 同步生产消息到Kafka中

5.1.1 需求

5.1.2 准备工作

5.1.3 代码开发

 5.2 从Kafka的topic中消费消息

5.2.1 需求

5.2.2 准备工作

5.2.3 开发步骤

5.3 异步使用带有回调函数方法生产消息


Kafka入门及进阶汇总 

5. Java编程操作Kafka

5.1 同步生产消息到Kafka中

5.1.1 需求

接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。

5.1.2 准备工作

5.1.2.1 导入Maven Kafka POM依赖


    
        central
        http://maven.aliyun.com/nexus/content/groups/public//
        
            true
        

        
            true
            always
            fail
        

    




    
    
        org.apache.kafka
        kafka-clients
        2.4.1
    


    
    
        org.apache.commons
        commons-io
        1.3.2
    


    
    
        org.slf4j
        slf4j-log4j12
        1.7.6
    


    
    
        log4j
        log4j
        1.2.16
    




    
        
            org.apache.maven.plugins
            maven-compiler-plugin
            3.7.0
            
                1.8
                1.8
            

        

    

5.1.2.2 导入log4j.properties

将log4j.properties配置文件放入到resources文件夹中

log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n

5.1.2.3 创建包和类

创建包cn.itcast.kafka,并创建KafkaProducerTest类。

5.1.3 代码开发

可以参考以下方式来编写第一个Kafka示例程序

参考以下文档:kafka 2.4.0 API

1 创建用于连接Kafka的Properties配置

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.88.100:9092");

props.put("acks", "all");

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

2 创建一个生产者对象KafkaProducer

3 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值

4 再调用一个Future.get()方法等待响应

5 关闭生产者

参考代码:

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.88.100:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer producer = new KafkaProducer(props);

        // 3. 调用send发送1-100消息到指定Topic test
        for(int i = 0; i < 100; ++i) {
            try {
                // 获取返回值Future,该对象封装了返回值
                Future future = producer.send(new ProducerRecord("test", null, i + ""));
                // 调用一个Future.get()方法等待响应
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 5. 关闭生产者
        producer.close();
    }
}

 5.2 从Kafka的topic中消费消息

5.2.1 需求

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

5.2.2 准备工作

创建KafkaConsumerTest类

5.2.3 开发步骤


1 创建Kafka消费者配置

Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

2 创建Kafka消费者

3 订阅要消费的主题

4 使用一个while循环,不断从Kafka的topic中拉取消息

5 将将记录(record)的offset、key、value都打印出来

参考代码

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer producer = new KafkaProducer(props);

        // 3. 调用send发送1-100消息到指定Topic test
        for(int i = 0; i < 100; ++i) {
            try {
                // 获取返回值Future,该对象封装了返回值
                Future future = producer.send(new ProducerRecord("test", null, i + ""));
                // 调用一个Future.get()方法等待响应
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 5. 关闭生产者
        producer.close();
    }
}

参考官网API文档:

kafka 2.4.0 API

5.3 异步使用带有回调函数方法生产消息


如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

需求:
1.在发送消息出现异常时,能够及时打印出异常信息
2.在发送消息成功时,打印Kafka的topic名字、分区id、offset

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer producer = new KafkaProducer(props);

        // 3. 调用send发送1-100消息到指定Topic test
        for(int i = 0; i < 100; ++i) {
            // 一、同步方式
            // 获取返回值Future,该对象封装了返回值
            // Future future = producer.send(new ProducerRecord("test", null, i + ""));
            // 调用一个Future.get()方法等待响应
            // future.get();

            // 二、带回调函数异步方式
            producer.send(new ProducerRecord("test", null, i + ""), new Callback() {
                @Override
                public void onCompletion(Recordmetadata metadata, Exception exception) {
                    if(exception != null) {
                        System.out.println("发送消息出现异常");
                    }
                    else {
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();

                        System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
                    }
                }
            });
        }

        // 5. 关闭生产者
        producer.close();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/719519.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号