消费者源码解析,基于0.10.0.1版本
初始化 consumer 对象
// 主要初始化的几个组件:new几个对象,NetworkClient、ConsumerCoordinator、Fetcher
new KafkaConsumer(properties);
拉取数据
ConsumerRecords records = kafkaConsumer.poll(5000);
// 核心方法:
org.apache.kafka.clients.consumer.KafkaConsumer#pollonce
// 确保 coordinator 准备好,确定哪台服务器是 coordinator
coordinator.ensureCoordinatorReady();
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
// 寻找该组对应的 coordinator,返回一个 future 对象,之后对该对象进行轮询,直到拿到返回值。拿到返回值后判断 coordinator 是否正常
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendGroupCoordinatorRequest
// 寻找一台负载较低,未完成请求比较少的节点,连接该节点获取coordinator
Node node = this.client.leastLoadedNode();
return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
.compose(new RequestFutureAdapter() {
@Override
public void onSuccess(ClientResponse response, RequestFuture future) {
// 根据服务端返回的相应信息,封装coordinator
handleGroupmetadataResponse(response, future);
// start sending heartbeats only if we have a valid generation
if (generation > 0)
// 开启心跳任务
heartbeatTask.reset();
// 执行任务调度
client.schedule(this, now);
// 往一个延时队列里面塞入一个任务
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#schedule
delayedTasks.add(task, at);
// 这个延时队列有个死循环的代码,只要往里面加任务,就会拿出来执行
public void poll(long now) {
while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
Entry entry = tasks.poll();
entry.task.run(now);
}
}
}
});
// 服务端执行 ApiKeys.GROUP_COORDINATOR 的请求逻辑。
// kafka.server.KafkaApis是server端执行各种kafka请求的地方
case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
kafka.server.KafkaApis#handleGroupCoordinatorRequest
// 计算 coordinator 在 __consumer_offsets 主题的哪个分区下
kafka.coordinator.GroupCoordinator#partitionFor
// group id 的 hash 值对 __consumer_offsets 主题的分区数取模
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupmetadataTopicPartitionCount
// coordinator准备好后,就开始发送 join group 请求,同时选举出 leader consumer
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#ensurePartitionAssignment
if (subscriptions.partitionsAutoAssigned()) // 调用 subscribe 方法时会调用 setSubscriptionType(SubscriptionType.AUTO_TOPICS); 从而这个判断会返为true
coordinator.ensurePartitionAssignment();
// join group 的方法
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureActiveGroup
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendJoinGroupRequest
return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
.compose(new JoinGroupResponseHandler());
// 服务端执行 ApiKeys.JOIN_GROUP 的请求逻辑
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
kafka.server.KafkaApis#handleJoinGroupRequest
kafka.coordinator.GroupCoordinator#handleJoinGroup
// 判断当前组的状态处于哪个阶段,执行不同的方法
kafka.coordinator.GroupCoordinator#doJoinGroup
// 这里决定了哪个 consumer 是 leader consumer
kafka.coordinator.Groupmetadata#add
def add(memberId: String, member: Membermetadata) {
assert(supportsProtocols(member.protocols))
// 从这个方法可以看出哪个 consumer 先来,哪个就是 leader consumer
if (leaderId == null)
leaderId = memberId
members.put(memberId, member)
}
// 回调函数 compose(new JoinGroupResponseHandler()) 这里当发生 join group 请求返回成功后会回调
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandler#handle
// 组里的所有成员都会发送 sync group 的请求,但是只有 leader 需要制定消费数据的方案
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#onJoinLeader
// 制定消费数据方案
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#performAssignment
// 发送 sync group 请求,顺便带上消费数据方案
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendSyncGroupRequest
return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
.compose(new SyncGroupResponseHandler());
} else {
onJoinFollower().chain(future);
}
// 服务端执行 ApiKeys.SYNC_GROUP 的请求逻辑
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
kafka.server.KafkaApis#handleSyncGroupRequest
// 根据组的不同状态执行不同逻辑
kafka.coordinator.GroupCoordinator#doSyncGroup
// 由于 join group 已经完成,所以状态应该是AwaitingSync,这里不管是 leader 还是 follower 都会进入这里
case AwaitingSync =>
// 准备好回调,用来等下下发数据消费方案
group.get(memberId).awaitingSyncCallback = responseCallback
// if (memberId == group.leaderId)
// 下发数据消费方案到各个消费者
setAndPropagateAssignment(group, assignment)
// 下发数据消费方案
kafka.coordinator.GroupCoordinator#propagateAssignment
for (member <- group.allMembermetadata) {
if (member.awaitingSyncCallback != null) {
// 调用刚才准备好的的回调下发数据消费方案
member.awaitingSyncCallback(member.assignment, errorCode)
member.awaitingSyncCallback = null
// reset the session timeout for members after propagating the member's assignment.
// This is because if any member's session expired while we were still awaiting either
// the leader sync group or the storage callback, its expiration will be ignored and no
// future heartbeat expectations will not be scheduled.
completeAndScheduleNextHeartbeatExpiration(group, member)
}
// 更改消费者组状态为 stable
group.transitionTo(Stable)
未完待续