- 简介
- 继承体系
- 源码分析
- Runnable接口
- Callable接口
- Future接口
- 成员属性
- 构造方法
- 成员方法
- run()
- get()
- cancel()
- 总结
在Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建多线程,但是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,因此Java 1.5之后提供了Callable和Future接口,通过它们就可以在任务执行完毕之后得到任务的执行结果。
继承体系 源码分析 Runnable接口public interface Runnable {
public abstract void run();
}
可以看到Runnable接口既没有返回值也没有抛出异常
Callable接口//Runnable是没有返回结果的任务,而Callable则是有返回结果的任务 public interface Callable{ V call() throws Exception; }
可以看到Callable是个泛型接口,泛型V就是要call()方法返回的类型。Callable接口和Runnable接口很像,都可以被另外一个线程执行,但是,Runnable不会返回数据也不能抛出异常。
Future接口public interface Future{ boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
- Future只是一个接口,不能直接用来创建对象,FutureTask是Future的实现类
// 表示当前task的状态 private volatile int state; // 表示当前任务尚未执行 private static final int NEW = 0; // 表示当前任务正在结束,尚未完全结束,一种临界状态 private static final int COMPLETING = 1; // 表示当前任务正常结束 private static final int NORMAL = 2; // 表示当前任务执行过程中发生了异常。 内部封装的 callable.run() 向上抛出异常了 private static final int EXCEPTIonAL = 3; // 表示当前任务被取消 private static final int CANCELLED = 4; // 表示当前任务中断中.. private static final int INTERRUPTING = 5; // 表示当前任务已中断 private static final int INTERRUPTED = 6; private Callable构造方法callable; private Object outcome; // non-volatile, protected by state reads/writes private volatile Thread runner; private volatile WaitNode waiters;
public FutureTask(Callablecallable) { // 非空校验 if (callable == null) throw new NullPointerException(); // callable就是我们自己的任务 this.callable = callable; // 设置当前任务状态为NEW: 表示当前任务尚未执行 this.state = NEW; // ensure visibility of callable 确保可调用文件的可见性 } public FutureTask(Runnable runnable, V result) { // 使用装饰者模式将runnable转换为了callable接口,外部线程通过get获取 // 当前任务执行结束时,结果可能为 null 也可能为传进来的值 this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } // callable public static Callable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter (task, result); } // RunnableAdapter static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
- 当参数为Callable时,就直接赋值给callable
- 当参数为Runnable时,就还要传入一个返回结果,并且用装饰者模式把Runnable装饰成Callable
RunnableFuture 接口
public interface RunnableFutureextends Runnable, Future { void run(); }
- RunnableFuture 接口继承了Runnable和Future接口。而FutureTask重写run方法
- 运行当前任务,其中涉及setException、set、handlePossibleCancellationInterrupt
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
// 如果执行到这里,说明当前task一定是 NEW 状态,而且当前线程也抢占TASK成功!
try {
// callable 就是我们自己封装逻辑的callable任务 或者装饰后的runnable
Callable c = callable;
// 再次检验、防止空指针异常、防止外部线程 cancel掉当前任务。
if (c != null && state == NEW) {
// 结果的引用
V result;
// true 表示callable.run 代码块执行成功 未抛出异常
// false 表示callable.run 代码块执行失败 抛出异常
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 设置异常信息
setException(ex);
}
if (ran)
// 正常运行结束
// 设置正常结束的结果
set(result);
}
} finally {
// 将当前执行任务的线程置为null
runner = null;
// 当前任务的状态
int s = state;
if (s >= INTERRUPTING)
// 说明当前任务处于中断中或者已中断状态
// 让出cpu,不断的判断是否是中断中...
handlePossibleCancellationInterrupt(s);
}
}
- 判断当前任务的state是否等于NEW(任务未执行),如果不为NEW则说明任务或者已经执行过,或者已经被取消,直接返回。
- 如果状态为NEW则接着会通过unsafe类把任务执行线程引用采用CAS保存在runner字段中,如果保存失败,则直接返回。
- 执行任务,设置任务返回结果。
- 如果任务执行发生异常,则调用setException()方法保存异常信息
- set 以CAS的方式设置结果v给outcome
protected void set(V v) {
// 使用CAS方式设置当前任务状态为完成中(一种临界状态)
// 也有可能会失败,就是其他线程在本线程CAS的时候,就把task取消了
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
// 将结果v赋值给outcome之后,马上会将当前任务状态修改为NORMAL(正常结束状态)
STATE.setRelease(this, NORMAL); // final state 最终的状态
// 唤醒之前挂起的线程
finishCompletion();
}
}
- finishCompletion 移除并唤醒所有等待线程,执行done,置空callable
private void finishCompletion() {
// 遍历阻塞队列 q指向waiters链表的头结点
for (WaitNode q; (q = waiters) != null;) {
// 使用cas设置 waiters为null
// 为了防止外部线程使用cancel取消当前任务,也会触发finishCompletion方法。(小概率事件)
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 自旋
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒当前节点对应的线程(在awaitDone方法最后一个else判断中park,在此处唤醒)
LockSupport.unpark(t);
}
WaitNode next = q.next;
// next == null 说明是最后一个节点,则直接break即可
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 模板方法,可以被覆盖
done();
// 将callable 设置为null helpGC
callable = null; // to reduce footprint
}
- setException以CAS的方式设置异常信息t给outcome
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 引用的是 callable 向上层抛出来的异常。
outcome = t;
// 将当前任务的状态 修改为 EXCEPTIonAL(发生了异常)
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
- handlePossibleCancellationInterrupt,状态为中断中…则会让出cpu。在cancel里面会有设置为中断中状态
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
// 如果是中断中,则让出cpu
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
get()
- 获取当前任务执行结束后得到的结果,其中涉及awaitDone、report
public V get() throws InterruptedException, ExecutionException {
int s = state;
// COMPLETING(尚未完全结束,一种临界状态)
if (s <= COMPLETING)
// 说明当前任务还没有结束,当前线程就会被阻塞
// awaitDone执行完后会返回task当前状态,如果该方法执行期间,task被中断了,则会直接抛出中断异常:
// awaitDone是futureTask实现阻塞的关键方法: 等待任务执行完毕,如果任务取消或者超时则停止!
s = awaitDone(false, 0L);
return report(s);
}
- 如果此时任务已经执行完毕则会直接返回任务结果,如果任务还没执行完毕,则调用方会阻塞直到任务执行结束返回结果为止
- awaitDone 是futureTask实现阻塞的关键方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// deadline=0 不会超时
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 引用当前线程封装成 WaitNode对象(头插、头取的一个队列。)
WaitNode q = null;
// 表示当前线程 waitNode对象是否入队/压栈
boolean queued = false;
// 自旋
for (;;) {
// 判断阻塞线程是否被中断,如果被中断则在等待队列中删除该节点并抛出InterruptedException异常
if (Thread.interrupted()) {
// 当前线程节点出队
removeWaiter(q);
// 上抛,使get方法抛出中断异常。
throw new InterruptedException();
}
// 假设当前线程是被其它线程使用unpark(thread) 唤醒的话,会正常自旋,走下面逻辑:
// 获取当前任务最新状态
int s = state;
// 条件成立:说明当前任务已经有结果了.. (可能是正常完成、异常、中断、取消等等)
if (s > COMPLETING) {
// 条件成立:说明已经为当前线程创建过WaitNode了,此时需要将 node.thread = null helpGC
if (q != null)
q.thread = null;
// 直接返回当前状态.
return s;
}
// 条件成立:说明当前任务接近完成状态(表示任务已经结束但是任务执行线程还没来得及给outcome赋值)
// 这里让当前线程释放cpu让其他线程优先执行 ,进行下一次抢占cpu:
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 条件成立:第一次自旋,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象,也就是创建一个结点
else if (q == null)
q = new WaitNode();
// 条件成立:第二次自旋,当前线程已经创建 WaitNode对象了,但是node对象还未入队
else if (!queued){
// 当前线程node节点 next 指向原队列的头节点 waiters 一直指向队列的头!
q.next = waiters;
// cas方式设置waiters引用指向当前线程node, 成功的话 queued == true 否则,可能其它线程先你一步入队了。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
}
// 第三次自旋,会到这里:表示是否设置了超时时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 已经超时的话,移除等待节点
removeWaiter(q);
return state;
}
// 未超时,将当前线程挂起指定时间
LockSupport.parkNanos(this, nanos);
}
else
// 走到这里,当前get操作的线程就会被park了。线程状态会变为 WAITING状态,相当于休眠了..
// 除非有其它线程将你唤醒 或者 将当前线程中断。
// 如果当前线程被其他线程唤醒,醒来时,还是从这里向下继续执行(继续进入自旋for进行条件判断)
// (在上面的finishCompletion中会唤醒这个挂起的线程!)
LockSupport.park(this);
}
}
- 判断调用get()的线程是否被其他线程中断,如果是的话则在等待队列中删除对应节点然后抛出InterruptedException异常。
- 获取任务当前状态,如果当前任务状态大于COMPLETING则表示任务执行完成,则把thread字段置null(协助GC)并返回结果。
- 如果任务处于COMPLETING状态,则表示任务已经处理完成(正常执行完成或者执行出现异常),但是执行结果或者异常原因还没有保存到outcome字段中。这个时候调用线程让出执行权让其他线程优先执行。
- 如果等待节点为空,则构造一个等待节点WaitNode。
- 如果第四步中新建的节点还没如队列,则CAS的把该节点加入waiters队列的首节点。
- 阻塞等待。
- report去获取最终task执行结束得到的结果
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
// 正常情况下,outcome 保存的是callable运行结束的结果
// 非正常情况下,保存的是 callable 抛出的异常。
Object x = outcome;
// 条件成立(正常情况):当前任务状态正常结束
if (s == NORMAL)
// 直接返回callable运算结果
return (V)x;
// 条件成立(非正常情况):当前任务是被取消或中断状态
if (s >= CANCELLED)
// 抛异常!
throw new CancellationException();
// 执行到这,说明callable接口实现中,是有bug的...
throw new ExecutionException((Throwable)x);
}
cancel()
- 将当前线程的任务取消(中断)掉
public boolean cancel(boolean mayInterruptIfRunning) {
// tate == NEW 成立,表示当前任务处于运行中或者处于线程池任务队列中.
// mayInterruptIfRunning为true则修改为中断中..,为false则修改为任务被取消
// 条件成立:说明CAS修改状态成功,可以去执行下面逻辑了,否则返回false,表示cancel失败。
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
// 执行当前FutureTask 的线程,有可能现在是null,是null 的情况是: 当前任务在 队列中,还没有线程获取到它呢
Thread t = runner;
if (t != null)
// 给runner线程一个中断信号
t.interrupt();
} finally { // final state
// 设置任务状态为 中断完成。
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
// 唤醒所有get()阻塞的线程。
finishCompletion();
}
return true;
}
- 根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
- 如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
- 唤醒所有在get()方法等待的线程
- 通过FutureTask不仅能够获取任务执行的结果,还有感知到任务执行的异常,甚至还可以取消任务
- FutureTask其实就是典型的异常调用的实现方式
- 比如RPC框架常用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的
参考文章
线程池源码分析_01 FutureTask源码分析
FutureTask源码解析(JDK1.8)
FutureTask源码解读



