本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.
Flink 在实现 SourceReaderbase 类时,并没有直接使用 JDK 自带的BlockingQueue 阻塞队列来缓冲 fetcher 线程获取的元素,而是自定义1个阻塞队列,即 FutureCompletingBlockingQueue 类。
FutureCompletingBlockingQueue 的入队实现基本和 ArrayBlockingQueue实现类似,当队列已满时,会通过 Condition 阻塞等待队列空出位置,当队列中空出位置时,该线程会被 siganl 唤醒,执行入队操作。
在出队实现上,如take()方法,FutureCompletingBlockingQueue 并没有像入队操作一样基于 Condition 的阻塞唤醒机制,而是采用 CompletableFuture 异步获取队列的可用状态,当队列可用时(代表有可出队的元素),整个过程完全异步,避免了队列为空时,执行 take() 方法的线程被阻塞挂起。
如果你不熟悉啥是 CompletableFuture,可以参阅我之前写过的一篇博客:
Java多线程系列–Future接口和CompletableFuture类
下面看一下 FutureCompletingBlockingQueue 的具体实现。
类构造// 基于 CompletableFuture 表示队列可用性 public static final CompletableFuture入队AVAILABLE = getAvailableFuture(); // 阻塞队列的容量 private final int capacity; // 当前线程持有的 future 队列可用性 private CompletableFuture currentFuture; // 锁 private final Lock lock; // 元素的容器 @GuardedBy("lock") private final Queue queue; // 等待插入元素的线程队列 @GuardedBy("lock") private final Queue notFull; // 等待插入元素的线程所持有的 Condition 及其 Flag @GuardedBy("lock") private ConditionAndFlag[] putConditionAndFlags; public FutureCompletingBlockingQueue() { this(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue()); } public FutureCompletingBlockingQueue(int capacity) { checkArgument(capacity > 0, "capacity must be > 0"); this.capacity = capacity; // 底层的元素容器是1个双向队列 ArrayDeque this.queue = new ArrayDeque<>(capacity); // 加锁基于 ReentrantLock this.lock = new ReentrantLock(); this.putConditionAndFlags = new ConditionAndFlag[1]; this.notFull = new ArrayDeque<>(); // 因为初始队列为空,所以初始 currentFuture 为不可用 this.currentFuture = new CompletableFuture<>(); }
FutureCompletingBlockingQueue 提供了 put(int threadIndex, T element) 方法来完成入队操作。
public boolean put(int threadIndex, T element) throws InterruptedException {
if (element == null) {
throw new NullPointerException();
}
lock.lockInterruptibly();
try {
// 若队列长度大于等于容量
while (queue.size() >= capacity) {
// 检查线程是否是被唤醒的线程
// 若是被唤醒的线程,则重置唤醒状态为 false,直接返回 false,元素插入失败
if (getAndResetWakeUpFlag(threadIndex)) {
return false;
}
// 创建或获取该线程对应的 Condition,并插入到 notFull队列中
// 调用 cond.await() 进入阻塞,等待 signal 唤醒
waitonPut(threadIndex);
}
// 执行到这的代码有2种情况:
// (1) queue.size() >= capacity,插入线程先阻塞,等待 queue 中有空位后被 signal 唤醒
// (2) queue.size() < capacity,代表 queue 当前就有空位,直接进行插入
enqueue(element);
return true;
} finally {
lock.unlock();
}
}
@GuardedBy("lock")
private boolean getAndResetWakeUpFlag(int threadIndex) {
maybeCreateCondition(threadIndex);
// 如果当前线程的唤醒标记为 true,就将唤醒标记重置为 false,并返回 true
// 如果当前线程的唤醒标记为 false,则直接返回 false
if (putConditionAndFlags[threadIndex].getWakeUp()) {
putConditionAndFlags[threadIndex].setWakeUp(false);
return true;
}
return false;
}
@GuardedBy("lock")
private void waitonPut(int fetcherIndex) throws InterruptedException {
maybeCreateCondition(fetcherIndex);
Condition cond = putConditionAndFlags[fetcherIndex].condition();
// 每个等待插入元素的线程均会依次添加到 notFull 队列中
notFull.add(cond);
cond.await();
}
@GuardedBy("lock")
private void enqueue(T element) {
// 获取该元素入队前的队列长度
final int sizeBefore = queue.size();
// 将该元素插入队列
queue.add(element);
// 首元素入队后,标记该队列为可用状态
if (sizeBefore == 0) {
moveToAvailable();
}
if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
// 通知下一个等待插入元素的线程
signalNextPutter();
}
}
@GuardedBy("lock")
private void moveToAvailable() {
final CompletableFuture current = currentFuture;
if (current != AVAILABLE) {
currentFuture = AVAILABLE;
current.complete(null);
}
}
@GuardedBy("lock")
private void signalNextPutter() {
if (!notFull.isEmpty()) {
// 从等待队列中取出队头,进行 siganl 唤醒
notFull.poll().signal();
}
}
出队
FutureCompletingBlockingQueue 提供了2种出队方法:
poll()
若当前队列不为空,执行元素出队并返回;若当前队列为空,则直接返回 null。
当队列为空时,该方法是非阻塞的。
take()
若当前队列不为空,执行元素出队并返回;若当前队列为空,则阻塞等待队列不为空时,然后再执行元素出队并返回。
当队列为空时,该方法是阻塞的。
poll()public T poll() {
lock.lock();
try {
// 若 queue 为空,则将该队列标记为不可用,直接返回 null
if (queue.size() == 0) {
moveToUnAvailable();
return null;
}
// 通过 dequeue() 方法执行出队操作
return dequeue();
} finally {
lock.unlock();
}
}
@GuardedBy("lock")
private T dequeue() {
// 获取出队前的队列长度
final int sizeBefore = queue.size();
// 执行出队操作
final T element = queue.poll();
// 若出队前的队列长度为 capacity
// 此时出队了1个元素,所以队列中有了1个空缺
// 同时,等待插入元素的线程队列不为空
if (sizeBefore == capacity && !notFull.isEmpty()) {
// 取出等待队列中的头线程,siganl 唤醒其进行元素插入
signalNextPutter();
}
// 尾元素出队后,标记该队列不可用
if (queue.isEmpty()) {
moveToUnAvailable();
}
// 返回出队元素
return element;
}
take()
@VisibleForTesting
public T take() throws InterruptedException {
T next;
// 通过 poll() 非阻塞执行出队操作
// 若 poll() 返回元素非 null,则证明出队成功,直接跳过 while 循环,返回该元素
// 若 poll() 返回元素为 null,则方法会阻塞在 while 循环上
// 在 while 循环内调用 getAvailabilityFuture().get()
// 当队列中不为空时(代表有可出队的元素),则异步通知该线程继续执行,进行 while 循环的下一次 (next = poll()) == null 操作
// 之所以用 CompletableFuture 来进行队列可用状态的获取,主要是因为 CompletableFuture 是完全异步的,避免线程 busy 等待
while ((next = poll()) == null) {
// use the future to wait for availability to avoid busy waiting
try {
getAvailabilityFuture().get();
} catch (ExecutionException | CompletionException e) {
// this should never happen, but we propagate just in case
throw new FlinkRuntimeException("exception in queue future completion", e);
}
}
return next;
}
public CompletableFuture getAvailabilityFuture() {
return currentFuture;
}
FutureCompletingBlockingQueue 的其他方法实现都比较简单,读者可自行阅读。
本文到此结束,感谢阅读!



