栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka开源代码阅读学习之旅(三) - 从一个DEMO入手

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka开源代码阅读学习之旅(三) - 从一个DEMO入手

Kafka开源代码阅读学习之旅(三) - 从一个DEMO入手
  • 一、Producer-生产者
    • 1.**producer类**
  • 二、Producer核心流程
  • 三、Producer初始化
    • 1.初始化KafkaProducer对象
    • 2.初始化KafkaProducer重要参数

一、Producer-生产者

1.producer类
public class Producer extends Thread {
    private final KafkaProducer producer;
    private final String topic;
    private final Boolean isAsync;
    
    public Producer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        //  指定kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        //   client.id一般不做设置
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        //  设置序列化的类。 数据传输的过程中需要进行序列化,消费者获取数据需要反序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //todo: 初始化KafkaProducer对象
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        // todo: 一直会往kafka发送数据
        while (true) {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            //isAsync , kafka发送数据的时候,有两种方式
            //todo:1: 异步发送   isAsync=true
            //todo:2: 同步发送   isAsync=false
            if (isAsync) { // Send asynchronously
                //todo:异步发送
                //这样的方式,性能比较好,我们生产代码用的就是这种方式。
                producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
            } else { // Send synchronously
                try {
                    //todo:同步发送
                    //todo:发送一条消息,等这条消息所有的后续工作都完成以后才继续下一条消息的发送。
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();  //阻塞方法
                    System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
        }
    }
}
二、Producer核心流程
  1. ProducerInterceptors是一个拦截器,对发送的数据进行拦截处理
  2. Serializer对消息的key和value进行序列化
  3. 通过使用分区器作用在每一条消息上,实现数据分发进入到topic不同的分区之中
  4. RecordAccumulator缓存消息,实现批量发送
  5. Sender从RecordAccumulator中获取消息
  6. 构建ClientRequest对象
  7. 将ClientRequest交到NetWorkClient准备发送
  8. ClientRequest将请求放入到KafkaChannel的缓存
  9. 发送请求到kafka集群
  10. Sender线程接受服务端发送的响应
  11. 执行绑定的回调函数
三、Producer初始化 1.初始化KafkaProducer对象
 private final KafkaProducer producer;
 //todo: 初始化KafkaProducer对象
        producer = new KafkaProducer<>(props);
2.初始化KafkaProducer重要参数
 Properties props = new Properties();
        //  指定kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        //   client.id一般不做设置
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        //  设置序列化的类。 数据传输的过程中需要进行序列化,消费者获取数据需要反序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/328632.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号