众所周知啊,java开启一个线程的方法一般有三种
第一种 继承Thread类,重新他的run方法
第二种 实现Runnable接口,再通过Thread类来开启新的线程
第三种 实现Callable接口搭配FutureTask 然后通过提交到线程池或者通过Thread类来开启新的线程
但其实无论是哪一种,开启一个新的线程都需要Thread类
可以先简单的了解一下这个过程,其实新的线程一开始调用的都是当前这个Thread类的run方法,这个Thread类也是有实现Runnable接口的,而我们重写他的run方法就可以达到开启一个线程执行我们的代码这样子的一个作用
而实现Runnable接口,然后把这个实现类传递给Thread类,为什么也可以达到这个效果呢,我们可以看一下Thread类的run方法的实现
public void run() {
if (target != null) {
target.run();
}
}
这个target就是我们传入的Runnable接口的实现类,所以通过第二种方法也是可以实现
下面来看看第三种,其实第三种FutureTask其实本质也是实现了Runnable接口的,实际的执行还是从FutureTask开始执行的
只不过第三种相比第二种多了一个功能,就是可以获得多线程执行的结果
这也是我们这篇文章所要写的,,来看看他是如何实现的
先看一下测试代码:
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new 出FutureTask实现类 传进去一个Callable实现类
FutureTask futureTask = new FutureTask(new Callable() {
@Override
public String call() throws Exception {
return "call";
}
});
// 开启线程
new Thread(futureTask).start();
// 获得多线程执行结果
System.out.println(futureTask.get());
}
}
上面我们说过了,新的线程一开始都是调用当前这个Thread类的run方法,而Thread类的run方法就是调用我们传进去这个Runnable实现类的run方法,所以我们之间从FutureTask的run方法开始阅读
public void run() {
// state就是当前线程执行的状态
// 如果当前状态不为NEW 或者cas失败 就返回
// cas失败就代表有其他线程也在调用这个run方法出现了竞争
// 所以就让cas成功的那个线程去继续执行就好了
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 这边就是获得我们传入的Callable实现类
Callable c = callable;
// 判断传入的对象是否为null 并且状态为NEW状态
if (c != null && state == NEW) {
V result;
// 是否执行成功的标志位
boolean ran;
try {
// 这边调用我们传入的Callable实现类的call方法并且获得获得返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
// 如果执行成功
if (ran)
// 设置结果值
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
我们来看看 set方法
protected void set(V v) {
// 使用cas去把状态从NEW 替换成 COMPLETING
// 这边可能会有人有疑问 只有一个线程在执行 为什么还要用cas呢
// 因为FutureTask还有其他方法 比如cancel方法 也会涉及到状态的转换
// 所以这边要使用cas
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 设置执行的结果的返回值
outcome = v;
// 设置状态的值
// 这边可能也有疑问,为什么有COMPLETING状态还要再次在这边设置一个NORMAL值呢
// 我个人的理解 因为可见性的问题 outcome 这个变量并没有加volatile关键字
// 然而状态这个变量有加volatile关键字
// 无论是根据先行发生原则 或者 内存屏障的作用
// 这边对状态的设置值 都可以保证 outcome这个变量 对其他线程可见
// 关于可见性的现象 大家可以参考我之前的博客
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 这边再做一下最后的工作
finishCompletion();
}
}
我们进入finishCompletion方法
这个方法大家可能有点不知道为什么要这样子
现在这边解释一下为什么要这样子的原因
因为 可能会有很多个线程来调用FutureTask的get方法来获得线程的执行结果
然后可能线程还没有执行完成,那么这些线程就会陷入阻塞,既然陷入了阻塞,那么得完成之后再来唤醒这些线程啊,所以需要记录下这些阻塞的线程的信息,FutureTask是通过链表来记录的,链表的每一个节点都是WaitNode 类型,这是FutureTask的一个内部类,结构非常简单,读者可以自己看一下
下面这个方法其实就是把阻塞的线程统统唤醒
private void finishCompletion() {
// 判断阻塞链表里面 是否有节点
for (WaitNode q; (q = waiters) != null;) {
// 使用cas 把waiters替换为null waiters也就是这个链表的头节点
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 下面就是遍历链表 把阻塞的线程统统唤醒
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
done();
callable = null;
}
ok 以上就是 FutureTask run方法的执行流程
下面我们再分析一下 他的get方法 看看 是如何去阻塞
public V get() throws InterruptedException, ExecutionException {
// 获得当前状态
int s = state;
// 如果当前状态还是在COMPLETING或者之前
if (s <= COMPLETING)
// 进入这个方法
s = awaitDone(false, 0L);
return report(s);
}
我们进入awaitDone方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 这边是判断需不需要有时间的阻塞
// 因为FutureTask的get方法 可以传入时间
// 就是可以不用一直等待
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 这是链表中的一个节点类型
WaitNode q = null;
// 这是判断节点是否有加进去链表的一个标志位
boolean queued = false;
for (;;) {
// 如果线程调用了打断的方法
if (Thread.interrupted()) {
// 先从链表中删除
removeWaiter(q);
// 在抛出异常
throw new InterruptedException();
}
// 获得当前的状态
int s = state;
// 如果以及完成了就直接返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 或者在COMPLETING状态 就先暂时让出cpu
else if (s == COMPLETING)
Thread.yield();
// 或者节点为null
else if (q == null)
// q 指向一个节点对象
q = new WaitNode();
// 或者节点已经不为null了 但是还没有加进链表里面去
else if (!queued)
// 使用cas加进去链表
// 这边采用的是头插法
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 或者 已经加进去链表了 又需要有时间的阻塞
else if (timed) {
// 下面就是调用有时间的阻塞的逻辑
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 否则就陷入阻塞
else
LockSupport.park(this);
}
}
好的 以上就是get方法的流程
我们来总结一下
其实 FutureTask就是调用我们传进去的Callable接口的实现类的call方法然后来获得执行的结果,最后把结果赋值给类的成员变量,通过状态值来判断当前的执行情况,然后执行完成之后在把阻塞的线程唤醒 其他线程去获得执行结果也是判断当前有没有执行完成,如果没有完成就把当前线程的信息使用链表的形式记录下来然后阻塞住 等执行完成之后唤醒 再去获得结果
以上就是我这篇文章的全部内容
如果有不对的地方 欢迎指正



