本文章简单流程追踪一下Kafka从服务端拉取数据到客户端的流程。
看完本文,你将会大致了解消费者数据消费的过程。
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor"); KafkaConsumerkafkaConsumer = new KafkaConsumer<>(properties); // 订阅主题 kafkaConsumer.subscribe(Lists.newArrayList("test")); while (true) { // 拉取数据 ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord records : consumerRecords) { System.out.println(records); } }
以上是一个简单的消费者demo,在开始源码之前务必 需要了解一下消费者消费初始化流程。
3.消费者消费初始化流程大致流程:
1.每个consumer都发送JoinGroup请求。
2.coordinator选出一个consumer作为leader。
3.把要消费的topic信息发送给leader消费者。
4.leader会制定一个消费方案,并把消费方案发给coordinator。
5.coordinator把消费方案下发给每个consumer。
6.每个consumer都会和coordinator保持心跳,默认3秒,一旦超时45秒,该消费者就会被移除,触发在平衡。
下面开始我们将重点分析数据拉取流程。
4.KafkaConsumerKafkaConsumer的实例化:
private KafkaConsumer(ConsumerConfig config, DeserializerkeyDeserializer, Deserializer valueDeserializer) { try { // 创建客户端对象 // reconnect.backoff.ms 连接重试时间 50ms // reconnect.backoff.max.ms 最大连接重试时间 1s // send.buffer.bytes 发送缓存 128kb // receive.buffer.bytes 64kb NetworkClient netClient = new NetworkClient( new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext), this.metadata, config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), heartbeatIntervalMs); // 消费者分区分配策略 this.assignors = config.getConfiguredInstances( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class); // 协助消费者初始化 // auto.commit.interval.ms 自动提交offset 5s this.coordinator = groupId == null ? null : new ConsumerCoordinator(logContext, this.client, groupId, maxPollIntervalMs, sessionTimeoutMs, config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); // 消费者抓取器 // fetch.min.bytes 最小一次抓取的字节数 // fetch.max.bytes 最大抓取的字节数 // fetch.max.wait.ms 抓取的等待时间 // max.poll.records 一次性返回的最大条数 this.fetcher = new Fetcher<>( logContext, this.client, config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), apiVersions); log.debug("Kafka consumer initialized"); } }
实例化过程中初始化了几个比较重要的组件。
5.consumer.subscribepublic void subscribe(Collectiontopics, ConsumerRebalanceListener listener) { try { log.info("Subscribed to topic(s): {}", Utils.join(topics, ", ")); // 订阅主题 (判断你是否需要更新订阅的主题) 注册了一个监听器,,有其他消费者挂了,需要重新分配 if (this.subscriptions.subscribe(new HashSet<>(topics), listener)) // 在平衡了,更新订阅信息 metadata.requestUpdateForNewTopics(); } finally { release(); } }
subscribe()方法:
public synchronized boolean subscribe(Set6.consumer.poll()topics, ConsumerRebalanceListener listener) { // 注册负载均衡监听器 registerRebalanceListener(listener); // 自动订阅模式 setSubscriptionType(SubscriptionType.AUTO_TOPICS); // 是否需要更改订阅的主题 return changeSubscription(topics); }
private ConsumerRecordspoll(final Timer timer, final boolean includemetadataInTimeout) { try { do { if (includemetadataInTimeout) { // 1.消费者 或者 消费者组的初始化 // try to update assignment metadata BUT do not need to block on the timer for join group updateAssignmentmetadataIfNeeded(timer, false); } // 2.抓取数据 final Map >> records = pollForFetches(timer); if (!records.isEmpty()) { // 3.拦截器处理数据 return this.interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); } finally { } }
拉取数据,然后在经过拦截器处理数据,最后返回。
pollForFetches(timer):
// 开始拉取数据
fetcher.sendFetches();
public synchronized int sendFetches() {
Map fetchRequestMap = prepareFetchRequests();
for (Map.Entry entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
// maxWaitMs 最大等待时间,默认500ms
// minBytes 最少抓取一个字节
// maxBytes 最大抓取多少数据 默认50m
final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget())
.rackId(clientRackId);
// 发送拉取数据的请求
RequestFuture future = client.send(fetchTarget, request);
future.addListener(new RequestFutureListener() {
// 拉取成功的回调
@Override
public void onSuccess(ClientResponse resp) {
// 把数据添加到队列里面
completedFetches.add(new CompletedFetch(partition, partitionData,
metricAggregator, batches, fetchOffset, responseVersion));
}
});
}
}
发送拉取数据的请求,拉取到的数据放到队列里面。
5.原理图完结。



