1、FutureTask是一个可取消的异步计算。这个类是Future的实现类,有开始和取消一个计算的方法,如果一个计算已经完成可以查看结果。如果在计算没有完成的情况下调用get获取计算结果会阻塞。且一旦任务完成后,计算不能重新开始或被取消,除非计算被runAndReset调用执行。
2、FutureTask被用来去封装一个Callable或者Runnable,一个FutureTask能够被submit作为一个Executor
3、FutureTask 的线程安全由CAS来保证。
二、源码分析1、成员属性
public class FutureTaskimplements RunnableFuture { //state表示的任务的状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIonAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; //任务 private Callable callable; //存储任务完成以后的结果 private Object outcome; //执行当前任务的线程 private volatile Thread runner; //执行当前任务被阻塞的线程 private volatile WaitNode waiters; }
可能有的状态转换:
NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIonAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
注意:state用volatile修饰的,如果在多线程并发的情况下,某一个线程改变了任务的状态,其他线程都能够立马知道,保证了state字段的可见性。
2、构造函数
public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
很好的诠释了FutureTask封装了Runnable或Callable,构造完成后将任务的状态变为NEW。同时注意,封装Runnable时用的Executors的静态方法callable
顺带看下Executors.callable()这个方法,这个方法的功能是把Runnable转换成Callable,代码如下:
public staticCallable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter (task, result); } static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
所以,FutureTask封装Runnable使用了适配器模式的设计模式
3、核心方法
//运行任务的方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable; //得到当前任务
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //当前任务调用call方法,执行,同时,执行完后将结果返回
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran) //表示任务执行成功
set(result); //CAS改变任务的状态从NEW->COMPLETING->NORMAL,同时将任务返回的结果保存到outcome属性中,再移除并唤醒所有等待线程
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; //将任务成功执行完后返回的结果保存到outcome中
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终的状态,表示任务结束
finishCompletion(); //移除并唤醒所有等待线程
}
}
//该方法用于移除并唤醒所有等待线程
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); //唤醒
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt(); //打断
} finally { // 设置成为最终态INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion(); //移除并唤醒所有等待线程
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); //如果任务没有完成或者其他的问题,将阻塞;创建一个新节点存入阻塞栈中
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
三、示例
常用使用方式:
- 第一种方式: Future + ExecutorService
- 第二种方式: FutureTask + ExecutorService
- 第三种方式: FutureTask + Thread
第一种方式:Future + ExecutorService
public class FutureDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Future future = executorService.submit(new Callable
第二种方式:FutureTask + ExecutorService
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask futureTask = new FutureTask(task);
executor.submit(futureTask);
第三种方式:FutureTask + Thread
FutureTask四、总结futureTask = new FutureTask (new Task()); Thread thread = new Thread(futureTask); thread.setName("Task thread"); thread.start();
1、FutureTask用来封装Runnable或者Callable接口,可以当成一个任务。
2、在Java并发程序中FutureTask表示一个可以取消的异步运算。它有启动和取消运算、查询运算是否完成和取回运算结果等方法。只有当运算完成的时候结果才能取回,如果运算尚未完成get方法将会阻塞。一个FutureTask对象可以对调用了Callable和Runnable的对象进行包装,由于FutureTask也是调用了Runnable接口所以它可以提交给Executor来执行。
3、FutureTask可用于异步获取执行结果或取消执行任务的场景,通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。
4、FutureTask间接继承了Runnable和Callable
5、FutureTask的线程安全由CAS操作来保证
6、FutureTask结果返回机制 :只有任务成功执行完成后,通过get方法能够得到任务返回的结果,其他情况都会导致阻塞。
参考链接
https://www.pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html#futuretask%E7%AE%80%E4%BB%8B



