前言:读源码将,不能一行一行分析,需要带着目的去读,最后重要的步骤串联起来
上一节我们将阅读前准备做完了,这一节我们就开始来看发送消息-实例化KafkaProducer时都做了什么?
1、kafka.examples.Producer作为Debug整个生产者的入口,它其实也就是一个线程类
public class Producer extends Thread
整个类我们关注的重点在 new KafkaProducer<>(props)
public Producer(final String topic,
final Boolean isAsync,
final String transactionalId,
final boolean enableIdempotency,
final int numRecords,
final int transactionTimeoutMs,
final CountDownLatch latch) {
Properties props = new Properties();
// 服务器地址 bootstrap.servers => localhost:9092
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
// 客户端ID(每个客户端唯一) client.id => DemoProducer
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
// 消息的 Key 的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// 消息的 Value 的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 发送消息超时时间
if (transactionTimeoutMs > 0) {
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
}
// 事务ID
if (transactionalId != null) {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
}
// 是否启动幂等性 false
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
this.numRecords = numRecords;
this.latch = latch;
}
2、org.apache.kafka.clients.producer.KafkaProducer
1、类成员变量
private final String clientId;
// Visible for testing
final Metrics metrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final Producermetadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
private final Serializer keySerializer;
private final Serializer valueSerializer;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final ProducerInterceptors interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
2、消息发送流程(暂不用细看)
3、构造方法分析
整个带参构造方法很长,可以将其分为以下几步:
1.配置一些用户自定义的参数
… 设置一些默认配置,详见下面代码部分…
2.Metric监控信息
3.设置分区器,默认使用的DefaultPartitioner
… 设置一些默认配置,详见下面代码部分…
4.设置序列化器
5.设置拦截器
… 设置一些默认配置,详见下面代码部分…
6.创建了一个核心的组件RecordAccumulator,缓存消息
7.获取 kafka 集群主机列表
8.设置metadata元数据
9.初始化Sender线程
10.初始化KafkaThread(守护线程)
11.启动KafkaThread
… 设置一些默认配置,详见下面代码部分…
为了大家阅读体验,这里只贴该类构造器方法
KafkaProducer(Map3、org.apache.kafka.clients.producer.KafkaProducer#newSenderconfigs, Serializer keySerializer, Serializer valueSerializer, Producermetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) { ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)); try { // 1.配置一些用户自定义的参数 Map userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = time; // kafka在0.11之后支持事务 String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); // 配置client id编号,一般不配置 this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); LogContext logContext; if (transactionalId == null) logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId)); else logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId)); log = logContext.logger(KafkaProducer.class); log.trace("Starting the Kafka producer"); // metric是一些监控信息,我们一般分析源码的时候 不需要去关心 Map metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class, Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); JmxReporter jmxReporter = new JmxReporter(); jmxReporter.configure(userProvidedConfigs); reporters.add(jmxReporter); MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time, metricsContext); // 设置分区器,默认使用的DefaultPartitioner this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); // 发送失败后重试时间间隔 retry.backoff.ms 默认100ms long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); // 设置序列化器 也支持自定义序列化器 if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true); } else { config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); this.keySerializer = keySerializer; } if (valueSerializer == null) { this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false); } else { config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); this.valueSerializer = valueSerializer; } // load interceptors and make sure they get clientId userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false); // 设置拦截器 类似于一个过滤器 List > interceptorList = (List) configWithClientId.getConfiguredInstances( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); if (interceptors != null) this.interceptors = interceptors; else this.interceptors = new ProducerInterceptors<>(interceptorList); ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); // max.request.size 生产者往服务端发送消息的时候,规定一条消息最大多大? // 如果你超过了这个规定消息的大小,你的消息就不能发送过去。 // 默认是1M,这个值偏小,在生产环境中,我们需要修改这个值。 // 经验值是10M。但是大家也可以根据自己公司的情况来。 this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); // 指定缓冲区缓存大小 this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); // kafka是支持压缩数据的,这儿设置压缩格式。 // 提高你的系统的吞吐量,你可以设置压缩格式。 // 一次发送出去的消息就更多。生产者这儿会消耗更多的cpu. this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); // buffer满了或者metadata获取不到(比如leader挂了), 或者序列化没完成分区函数没计算完等等情况下的最大阻塞时间,默认60000ms (60秒) this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); // 发送消息上报成功或失败的最大时间 int deliveryTimeoutMs = configureDeliveryTimeout(config, log); this.apiVersions = new ApiVersions(); this.transactionManager = configureTransactionState(config, logContext); // 创建了一个核心的组件RecordAccumulator,缓存消息 // 创建消息收集器,它会将为消息申请内存、消息压缩(如果需要)并压如到待发送消息缓存队列中 this.accumulator = new RecordAccumulator(logContext, // 每个批次的大小16KB config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), // 压缩 this.compressionType, // 没有达到批次最大限制时 延迟发送的时间 默认0ms lingerMs(config), retryBackoffMs, deliveryTimeoutMs, metrics, PRODUCER_METRIC_GROUP_NAME, time, apiVersions, transactionManager, // 初始化32M的缓冲区 new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME)); // 获取 kafka 集群主机列表 List addresses = ClientUtils.parseAndValidateAddresses( config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)); if (metadata != null) { // 元数据存在时就直接赋值 this.metadata = metadata; } else { // 不存在就创建一个metadata对象,主要参数都是超时时间 this.metadata = new Producermetadata(retryBackoffMs, config.getLong(ProducerConfig.metaDATA_MAX_AGE_CONFIG), config.getLong(ProducerConfig.metaDATA_MAX_IDLE_CONFIG), logContext, clusterResourceListeners, Time.SYSTEM); // 更新元数据 this.metadata.bootstrap(addresses); } this.errors = this.metrics.sensor("errors"); // 初始化Sender线程 this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; // 创建了一个线程(守护线程 setDaemon(daemon)),然后里面传进去了一个sender对象。 // 内部包含了线程代码 // 实现把业务代码与线程代码进行隔离,这样会显得清晰点 this.ioThread = new KafkaThread(ioThreadName, this.sender, true); // 启动线程 this.ioThread.start(); // WARN 记录用户自定义了但是未使用到的配置 config.logUnused(); // 注册一个监控和日志插件 AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121 close(Duration.ofMillis(0), true); // now propagate the exception throw new KafkaException("Failed to construct kafka producer", t); } }
上面我们讲了KafkaProducer构造方法,其中最重要的还是Sender线程
public class Sender implements Runnable
所以我们接着看newSender方法
Sender newSender(LogContext logContext, KafkaClient kafkaClient, Producermetadata metadata) {
// 允许发送失败的条数, 幂等和顺序: max.in.flight.requests.per.connection = 1
// 需要保证 max.in.flight.requests.per.connection 参数的值不能大于5 否则会报出 ConfigException:
int maxInflightRequests = configureInflightRequests(producerConfig);
// 超时配置 默认30s, 自定义配置至少需要大于10s. ReplicaLagTimeMaxMs = 10000L
// 这应该大于 replica.lag.time.max.ms(代理配置)以减少由于不必要的生产者重试而导致消息重复的可能性。
// replica.lag.time.max.ms: Kafka判断ISR中的follower和leader同步的根据是Broker的配置参数 replica.lag.time.max.ms 默认是10s
// 就是说如果follower落后leader 10s,则认为他失效了,会踢出ISR集合。
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
// KafkaChannel负责基于socket的连接,认证,数据读取发送。
// 它包含TransportLayer和Authenticator两个部分。TransportLayer负责数据交互,Authenticator负责安全验证
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
// 监控 看源码的时候可忽略
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// 初始化了一个重要的网络管理的组件
// NetworkClient 是后面Sender线程和服务端进行网络I/O的核心类
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
// connections.max.idle.ms: 默认值是9分钟,一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接。
// 可以设置为-1 ,不回收连接,减少频繁的创建和销毁连接
new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
metadata,
clientId,
// max.in.flight.requests.per.connection:默认是5
// 发送数据的时候,其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker消息后,消息没有响应的个数。
// 因为kafka有重试机制,所以有可能会造成数据乱序,如果想要保证有序,这个值要把设置为1.
maxInflightRequests,
// reconnect.backoff.ms:socket尝试重新连接指定主机的时间间隔
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
// send.buffer.bytes:socket发送数据的缓冲区的大小,默认值是128K
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
// receive.buffer.bytes:socket接受数据的缓冲区的大小,默认值是32K
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
// 获取acks参数 默认-1 如果用户还显式地指定了 acks 参数,那么还需要保证这个参数的值为 -1(all)
// 如果不为 -1(这个参数的值默认为1),那么也会报出 ConfigException:
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
}
PS:1、其中有些细节 我就不一一讲了,比如:
需要保证 max.in.flight.requests.per.connection 参数的值不能大于5 否则会报出 ConfigException
acks如果不为 -1(这个参数的值默认为1),那么也会报出 ConfigException
2、很多配置需要大家自己看代码org.apache.kafka.clients.producer.ProducerConfig,就没有挨着讲了,因为这个类中都给了对应的解释以及默认值了。
解释:
默认值:



