栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink源码解析系列--FutureCompletingBlockingQueue阻塞队列

Flink源码解析系列--FutureCompletingBlockingQueue阻塞队列

本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.

Flink 在实现 SourceReaderbase 类时,并没有直接使用 JDK 自带的BlockingQueue 阻塞队列来缓冲 fetcher 线程获取的元素,而是自定义1个阻塞队列,即 FutureCompletingBlockingQueue 类。

FutureCompletingBlockingQueue 的入队实现基本和 ArrayBlockingQueue实现类似,当队列已满时,会通过 Condition 阻塞等待队列空出位置,当队列中空出位置时,该线程会被 siganl 唤醒,执行入队操作。

在出队实现上,如take()方法,FutureCompletingBlockingQueue 并没有像入队操作一样基于 Condition 的阻塞唤醒机制,而是采用 CompletableFuture 异步获取队列的可用状态,当队列可用时(代表有可出队的元素),整个过程完全异步,避免了队列为空时,执行 take() 方法的线程被阻塞挂起。

如果你不熟悉啥是 CompletableFuture,可以参阅我之前写过的一篇博客:

Java多线程系列–Future接口和CompletableFuture类

下面看一下 FutureCompletingBlockingQueue 的具体实现。

类构造
// 基于 CompletableFuture 表示队列可用性
public static final CompletableFuture AVAILABLE = getAvailableFuture();

// 阻塞队列的容量
private final int capacity;

// 当前线程持有的 future 队列可用性
private CompletableFuture currentFuture;

// 锁
private final Lock lock;

// 元素的容器
@GuardedBy("lock")
private final Queue queue;

// 等待插入元素的线程队列
@GuardedBy("lock")
private final Queue notFull;

// 等待插入元素的线程所持有的 Condition 及其 Flag
@GuardedBy("lock")
private ConditionAndFlag[] putConditionAndFlags;

public FutureCompletingBlockingQueue() {
	this(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue());
}

public FutureCompletingBlockingQueue(int capacity) {
	checkArgument(capacity > 0, "capacity must be > 0");
	this.capacity = capacity;
	// 底层的元素容器是1个双向队列 ArrayDeque
	this.queue = new ArrayDeque<>(capacity);
	// 加锁基于 ReentrantLock
	this.lock = new ReentrantLock();
	this.putConditionAndFlags = new ConditionAndFlag[1];
	this.notFull = new ArrayDeque<>();

	// 因为初始队列为空,所以初始 currentFuture 为不可用
	this.currentFuture = new CompletableFuture<>();
}
入队

FutureCompletingBlockingQueue 提供了 put(int threadIndex, T element) 方法来完成入队操作。

public boolean put(int threadIndex, T element) throws InterruptedException {
	if (element == null) {
		throw new NullPointerException();
	}
	lock.lockInterruptibly();
	try {
		// 若队列长度大于等于容量
		while (queue.size() >= capacity) {
			// 检查线程是否是被唤醒的线程
			// 若是被唤醒的线程,则重置唤醒状态为 false,直接返回 false,元素插入失败
			if (getAndResetWakeUpFlag(threadIndex)) {
				return false;
			}
			// 创建或获取该线程对应的 Condition,并插入到 notFull队列中
			// 调用 cond.await() 进入阻塞,等待 signal 唤醒
			waitonPut(threadIndex);
		}
		// 执行到这的代码有2种情况:
		// (1) queue.size() >= capacity,插入线程先阻塞,等待 queue 中有空位后被 signal 唤醒
		// (2) queue.size() < capacity,代表 queue 当前就有空位,直接进行插入
		enqueue(element);
		return true;
	} finally {
		lock.unlock();
	}
}

@GuardedBy("lock")
private boolean getAndResetWakeUpFlag(int threadIndex) {
	maybeCreateCondition(threadIndex);
	// 如果当前线程的唤醒标记为 true,就将唤醒标记重置为 false,并返回 true
	// 如果当前线程的唤醒标记为 false,则直接返回 false
	if (putConditionAndFlags[threadIndex].getWakeUp()) {
		putConditionAndFlags[threadIndex].setWakeUp(false);
		return true;
	}
	return false;
}

@GuardedBy("lock")
private void waitonPut(int fetcherIndex) throws InterruptedException {
	maybeCreateCondition(fetcherIndex);
	Condition cond = putConditionAndFlags[fetcherIndex].condition();
	// 每个等待插入元素的线程均会依次添加到 notFull 队列中
	notFull.add(cond);
	cond.await();
}

@GuardedBy("lock")
private void enqueue(T element) {
	// 获取该元素入队前的队列长度
	final int sizeBefore = queue.size();
	// 将该元素插入队列
	queue.add(element);
	// 首元素入队后,标记该队列为可用状态
	if (sizeBefore == 0) {
		moveToAvailable();
	}
	if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
		// 通知下一个等待插入元素的线程
		signalNextPutter();
	}
}

@GuardedBy("lock")
private void moveToAvailable() {
	final CompletableFuture current = currentFuture;
	if (current != AVAILABLE) {
		currentFuture = AVAILABLE;
		current.complete(null);
	}
}

@GuardedBy("lock")
private void signalNextPutter() {
	if (!notFull.isEmpty()) {
		// 从等待队列中取出队头,进行 siganl 唤醒
		notFull.poll().signal();
	}
}
出队

FutureCompletingBlockingQueue 提供了2种出队方法:

poll()

若当前队列不为空,执行元素出队并返回;若当前队列为空,则直接返回 null。

当队列为空时,该方法是非阻塞的。

take()

若当前队列不为空,执行元素出队并返回;若当前队列为空,则阻塞等待队列不为空时,然后再执行元素出队并返回。

当队列为空时,该方法是阻塞的。

poll()
public T poll() {
	lock.lock();
	try {
		// 若 queue 为空,则将该队列标记为不可用,直接返回 null
		if (queue.size() == 0) {
			moveToUnAvailable();
			return null;
		}
		// 通过 dequeue() 方法执行出队操作
		return dequeue();
	} finally {
		lock.unlock();
	}
}

@GuardedBy("lock")
private T dequeue() {
	// 获取出队前的队列长度
	final int sizeBefore = queue.size();
	// 执行出队操作
	final T element = queue.poll();
	// 若出队前的队列长度为 capacity
	// 此时出队了1个元素,所以队列中有了1个空缺
	// 同时,等待插入元素的线程队列不为空
	if (sizeBefore == capacity && !notFull.isEmpty()) {
		// 取出等待队列中的头线程,siganl 唤醒其进行元素插入
		signalNextPutter();
	}
	// 尾元素出队后,标记该队列不可用
	if (queue.isEmpty()) {
		moveToUnAvailable();
	}
	// 返回出队元素
	return element;
}
take()
@VisibleForTesting
public T take() throws InterruptedException {
	T next;
	// 通过 poll() 非阻塞执行出队操作
	// 若 poll() 返回元素非 null,则证明出队成功,直接跳过 while 循环,返回该元素
	// 若 poll() 返回元素为 null,则方法会阻塞在 while 循环上
	// 在 while 循环内调用 getAvailabilityFuture().get()
	// 当队列中不为空时(代表有可出队的元素),则异步通知该线程继续执行,进行 while 循环的下一次 (next = poll()) == null 操作
	// 之所以用 CompletableFuture 来进行队列可用状态的获取,主要是因为 CompletableFuture 是完全异步的,避免线程 busy 等待
	while ((next = poll()) == null) {
		// use the future to wait for availability to avoid busy waiting
		try {
			getAvailabilityFuture().get();
		} catch (ExecutionException | CompletionException e) {
			// this should never happen, but we propagate just in case
			throw new FlinkRuntimeException("exception in queue future completion", e);
		}
	}
	return next;
}

public CompletableFuture getAvailabilityFuture() {
	return currentFuture;
}

FutureCompletingBlockingQueue 的其他方法实现都比较简单,读者可自行阅读。

本文到此结束,感谢阅读!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773968.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号