栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2.生产者消息发送流程及API使用

2.生产者消息发送流程及API使用

学习尚硅谷kafka教程记录的笔记,视频地址: kafka3.x教程

生产者消息发送流程

​ 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到 Kafka Broker。

​ 在mian线程发送消息到队列前,还经过拦截器、序列化器、分区器。拦截器用于拦截一些数据,序列化器中可以指定消息的key和value的序列化,分区器可以指定消息发送到哪个分区。

​ RecordAccumulator队列默认为32M,每批次大小默认为16kb。

​ Sender线程拉取数据有两个条件达到一个即开始拉取

​ 1、数据累积到batch.size(默认16kb)。

​ 2、sender等待linger.ms(默认0ms),表示没有延迟,来了一条即开始拉取。

​ 发送数据时,NewWorkClient中有缓存的请求,按照kafka中的节点个数,每个节点对应一个队列,一个队列最多可以缓存5个请求。

​ 最后通过Selector打通RecordAccumulator队列到集群的链路,将消息发送到kafka集群中。

​ 发送完成后,kafka对消息做出应答,应答有0、1、-1三种形式。

0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader收到数据后应答。
-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。

具体的流程如下图:

生产者发送消息API使用

引入依赖


        
            org.apache.kafka
            kafka-clients
            3.0.0
        

编写测试代码

public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建 kafka 生产者对象
        KafkaProducer kafkaProducer = new
                KafkaProducer(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new
                    ProducerRecord("first","hello " + i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }

测试:在 hadoop102 上开启 Kafka 消费者,启动项目查看消费者是否接收到消息

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topicname

带回调函数的异步发送

​ 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(Recordmetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

只需要修改ProducerRecord方法的参数,代码如下

// 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new
                    ProducerRecord("first", "hello callback" + i), new Callback() {
                public void onCompletion(Recordmetadata metadata, Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition());
                    } else {
                        e.printStackTrace();
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition() + "发送失败");
                    }
                }
            });
        }
生产者分区

生产者分区的好处:

便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一

块一块数据存储在多台Broker上。

提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

生产者发送消息的分区策略

指明partition的情况下,直接将指明的值作为partition值;没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

自定义分区器

1.定义类实现 Partitioner 接口。2.重写 partition()方法。

public class MyPartition implements Partitioner {
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        String value = o1.toString();
        int partiton = 0;
        if(value.contains("atguigu")){
            partiton = 0;
        }else{
            partiton = 1;
        }
        return partiton;
    }

    public void close() {

    }

    public void configure(Map map) {

    }
}

3.在生产者的配置中添加分区器参数

//自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartition");

生产者如何提高吞吐量

1.批次大小,默认16kb

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

2.修改等待时间,修改为5-100ms

properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

3.修改缓冲区大小,修改为64m

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

4.使用压缩snappy

// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746520.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号