栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Kafka学习】源码系列(二)发送消息-实例化KafkaProducer

【Kafka学习】源码系列(二)发送消息-实例化KafkaProducer

前言:读源码将,不能一行一行分析,需要带着目的去读,最后重要的步骤串联起来

上一节我们将阅读前准备做完了,这一节我们就开始来看发送消息-实例化KafkaProducer时都做了什么?

1、kafka.examples.Producer

作为Debug整个生产者的入口,它其实也就是一个线程类

public class Producer extends Thread

整个类我们关注的重点在 new KafkaProducer<>(props)

	public Producer(final String topic,
                   final Boolean isAsync,
                   final String transactionalId,
                   final boolean enableIdempotency,
                   final int numRecords,
                   final int transactionTimeoutMs,
                   final CountDownLatch latch) {
       Properties props = new Properties();
       // 服务器地址 bootstrap.servers => localhost:9092
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
       // 客户端ID(每个客户端唯一) client.id => DemoProducer
       props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
       // 消息的 Key 的序列化方式
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
       // 消息的 Value 的序列化方式
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       // 发送消息超时时间
       if (transactionTimeoutMs > 0) {
           props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
       }
       // 事务ID
       if (transactionalId != null) {
           props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
       }
       // 是否启动幂等性 false
       props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);

       producer = new KafkaProducer<>(props);
       this.topic = topic;
       this.isAsync = isAsync;
       this.numRecords = numRecords;
       this.latch = latch;
   }
2、org.apache.kafka.clients.producer.KafkaProducer 1、类成员变量
    
    private final String clientId;
    // Visible for testing
    final Metrics metrics;
    
    private final Partitioner partitioner;
    
    private final int maxRequestSize;
    
    private final long totalMemorySize;
    
    private final Producermetadata metadata;
    
    private final RecordAccumulator accumulator;
    
    private final Sender sender;
    
    private final Thread ioThread;
    
    private final CompressionType compressionType;
    private final Sensor errors;
    
    private final Time time;
    
    private final Serializer keySerializer;
    
    private final Serializer valueSerializer;
    
    private final ProducerConfig producerConfig;
    
    private final long maxBlockTimeMs;
    
    private final ProducerInterceptors interceptors;
    
    private final ApiVersions apiVersions;
    
    private final TransactionManager transactionManager;
2、消息发送流程(暂不用细看)

3、构造方法分析

整个带参构造方法很长,可以将其分为以下几步:
1.配置一些用户自定义的参数
设置一些默认配置,详见下面代码部分
2.Metric监控信息
3.设置分区器,默认使用的DefaultPartitioner
设置一些默认配置,详见下面代码部分
4.设置序列化器
5.设置拦截器
设置一些默认配置,详见下面代码部分
6.创建了一个核心的组件RecordAccumulator,缓存消息
7.获取 kafka 集群主机列表
8.设置metadata元数据
9.初始化Sender线程
10.初始化KafkaThread(守护线程)
11.启动KafkaThread
设置一些默认配置,详见下面代码部分

为了大家阅读体验,这里只贴该类构造器方法

	KafkaProducer(Map configs,
                  Serializer keySerializer,
                  Serializer valueSerializer,
                  Producermetadata metadata,
                  KafkaClient kafkaClient,
                  ProducerInterceptors interceptors,
                  Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
                valueSerializer));
        try {
            // 1.配置一些用户自定义的参数
            Map userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;

            // kafka在0.11之后支持事务
            String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
            // 配置client id编号,一般不配置
            this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);

            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");

            // metric是一些监控信息,我们一般分析源码的时候 不需要去关心
            Map metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            JmxReporter jmxReporter = new JmxReporter();
            jmxReporter.configure(userProvidedConfigs);
            reporters.add(jmxReporter);
            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                    config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);

            // 设置分区器,默认使用的DefaultPartitioner
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            // 发送失败后重试时间间隔 retry.backoff.ms 默认100ms
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            // 设置序列化器 也支持自定义序列化器
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class);
                this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class);
                this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);

            // 设置拦截器   类似于一个过滤器
            List> interceptorList = (List) configWithClientId.getConfiguredInstances(
                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
            if (interceptors != null)
                this.interceptors = interceptors;
            else
                this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                    valueSerializer, interceptorList, reporters);

            // max.request.size 生产者往服务端发送消息的时候,规定一条消息最大多大?
            // 如果你超过了这个规定消息的大小,你的消息就不能发送过去。
            // 默认是1M,这个值偏小,在生产环境中,我们需要修改这个值。
            // 经验值是10M。但是大家也可以根据自己公司的情况来。
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            // 指定缓冲区缓存大小
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            // kafka是支持压缩数据的,这儿设置压缩格式。
            // 提高你的系统的吞吐量,你可以设置压缩格式。
            // 一次发送出去的消息就更多。生产者这儿会消耗更多的cpu.
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

            // buffer满了或者metadata获取不到(比如leader挂了), 或者序列化没完成分区函数没计算完等等情况下的最大阻塞时间,默认60000ms (60秒)
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            // 发送消息上报成功或失败的最大时间
            int deliveryTimeoutMs = configureDeliveryTimeout(config, log);

            this.apiVersions = new ApiVersions();
            this.transactionManager = configureTransactionState(config, logContext);
            // 创建了一个核心的组件RecordAccumulator,缓存消息
            // 创建消息收集器,它会将为消息申请内存、消息压缩(如果需要)并压如到待发送消息缓存队列中
            this.accumulator = new RecordAccumulator(logContext,
                    // 每个批次的大小16KB
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    // 压缩
                    this.compressionType,
                    // 没有达到批次最大限制时 延迟发送的时间 默认0ms
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    // 初始化32M的缓冲区
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

            // 获取 kafka 集群主机列表
            List addresses = ClientUtils.parseAndValidateAddresses(
                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
            if (metadata != null) {
                // 元数据存在时就直接赋值
                this.metadata = metadata;
            } else {
                // 不存在就创建一个metadata对象,主要参数都是超时时间
                this.metadata = new Producermetadata(retryBackoffMs,
                        config.getLong(ProducerConfig.metaDATA_MAX_AGE_CONFIG),
                        config.getLong(ProducerConfig.metaDATA_MAX_IDLE_CONFIG),
                        logContext,
                        clusterResourceListeners,
                        Time.SYSTEM);
                // 更新元数据
                this.metadata.bootstrap(addresses);
            }
            this.errors = this.metrics.sensor("errors");

            // 初始化Sender线程
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;

            // 创建了一个线程(守护线程 setDaemon(daemon)),然后里面传进去了一个sender对象。
            // 内部包含了线程代码
            // 实现把业务代码与线程代码进行隔离,这样会显得清晰点
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

            // 启动线程
            this.ioThread.start();
            // WARN 记录用户自定义了但是未使用到的配置
            config.logUnused();

            // 注册一个监控和日志插件
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(Duration.ofMillis(0), true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }
3、org.apache.kafka.clients.producer.KafkaProducer#newSender

上面我们讲了KafkaProducer构造方法,其中最重要的还是Sender线程

public class Sender implements Runnable

所以我们接着看newSender方法

	Sender newSender(LogContext logContext, KafkaClient kafkaClient, Producermetadata metadata) {

        // 允许发送失败的条数, 幂等和顺序: max.in.flight.requests.per.connection = 1
        // 需要保证 max.in.flight.requests.per.connection 参数的值不能大于5 否则会报出 ConfigException:
        int maxInflightRequests = configureInflightRequests(producerConfig);
        // 超时配置 默认30s, 自定义配置至少需要大于10s. ReplicaLagTimeMaxMs = 10000L
        // 这应该大于 replica.lag.time.max.ms(代理配置)以减少由于不必要的生产者重试而导致消息重复的可能性。
        // replica.lag.time.max.ms: Kafka判断ISR中的follower和leader同步的根据是Broker的配置参数 replica.lag.time.max.ms 默认是10s
        // 就是说如果follower落后leader 10s,则认为他失效了,会踢出ISR集合。
        int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        // KafkaChannel负责基于socket的连接,认证,数据读取发送。
        // 它包含TransportLayer和Authenticator两个部分。TransportLayer负责数据交互,Authenticator负责安全验证
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
        
        // 监控 看源码的时候可忽略
        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);

        // 初始化了一个重要的网络管理的组件
        // NetworkClient 是后面Sender线程和服务端进行网络I/O的核心类
        KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                // connections.max.idle.ms: 默认值是9分钟,一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接。
                // 可以设置为-1 ,不回收连接,减少频繁的创建和销毁连接
                new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                        this.metrics, time, "producer", channelBuilder, logContext),
                metadata,
                clientId,
                // max.in.flight.requests.per.connection:默认是5
                // 发送数据的时候,其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker消息后,消息没有响应的个数。
                // 因为kafka有重试机制,所以有可能会造成数据乱序,如果想要保证有序,这个值要把设置为1.
                maxInflightRequests,
                // reconnect.backoff.ms:socket尝试重新连接指定主机的时间间隔
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                // send.buffer.bytes:socket发送数据的缓冲区的大小,默认值是128K
                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                // receive.buffer.bytes:socket接受数据的缓冲区的大小,默认值是32K
                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                requestTimeoutMs,
                producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                time,
                true,
                apiVersions,
                throttleTimeSensor,
                logContext);

        // 获取acks参数 默认-1 如果用户还显式地指定了 acks 参数,那么还需要保证这个参数的值为 -1(all)
        // 如果不为 -1(这个参数的值默认为1),那么也会报出 ConfigException:
        short acks = configureAcks(producerConfig, log);
        
        return new Sender(logContext,
                client,
                metadata,
                this.accumulator,
                maxInflightRequests == 1,
                producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
                metricsRegistry.senderMetrics,
                time,
                requestTimeoutMs,
                producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
    }

PS:1、其中有些细节 我就不一一讲了,比如:
需要保证 max.in.flight.requests.per.connection 参数的值不能大于5 否则会报出 ConfigException

acks如果不为 -1(这个参数的值默认为1),那么也会报出 ConfigException

2、很多配置需要大家自己看代码org.apache.kafka.clients.producer.ProducerConfig,就没有挨着讲了,因为这个类中都给了对应的解释以及默认值了。
解释:

默认值:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/653804.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号