栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

FutureTask的源码学习

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

FutureTask的源码学习

一、简介

1、FutureTask是一个可取消的异步计算。这个类是Future的实现类,有开始和取消一个计算的方法,如果一个计算已经完成可以查看结果。如果在计算没有完成的情况下调用get获取计算结果会阻塞。且一旦任务完成后,计算不能重新开始或被取消,除非计算被runAndReset调用执行。

2、FutureTask被用来去封装一个Callable或者Runnable,一个FutureTask能够被submit作为一个Executor

3、FutureTask 的线程安全由CAS来保证。

二、源码分析

1、成员属性

 public class FutureTask implements RunnableFuture {
        //state表示的任务的状态
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIonAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
        //任务
        private Callable callable;
    
        //存储任务完成以后的结果
        private Object outcome; 
    
        //执行当前任务的线程
        private volatile Thread runner;
    
        //执行当前任务被阻塞的线程
        private volatile WaitNode waiters;
    }

可能有的状态转换:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIonAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

注意:state用volatile修饰的,如果在多线程并发的情况下,某一个线程改变了任务的状态,其他线程都能够立马知道,保证了state字段的可见性。

2、构造函数

 public FutureTask(Callable callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW; 
        }
        
    public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW; 
        }

很好的诠释了FutureTask封装了Runnable或Callable,构造完成后将任务的状态变为NEW。同时注意,封装Runnable时用的Executors的静态方法callable

顺带看下Executors.callable()这个方法,这个方法的功能是把Runnable转换成Callable,代码如下:

public static  Callable callable(Runnable task, T result) {
    if (task == null)
       throw new NullPointerException();
    return new RunnableAdapter(task, result);
}

static final class RunnableAdapter implements Callable {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

所以,FutureTask封装Runnable使用了适配器模式的设计模式

3、核心方法

 //运行任务的方法
    public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable c = callable;  //得到当前任务
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();  //当前任务调用call方法,执行,同时,执行完后将结果返回
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)  //表示任务执行成功
                        set(result);  //CAS改变任务的状态从NEW->COMPLETING->NORMAL,同时将任务返回的结果保存到outcome属性中,再移除并唤醒所有等待线程
                }
            } finally {
                runner = null;
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;  //将任务成功执行完后返回的结果保存到outcome中
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终的状态,表示任务结束
                finishCompletion();   //移除并唤醒所有等待线程
            }
        }
    
    //该方法用于移除并唤醒所有等待线程
    private void finishCompletion() {
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);  //唤醒
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;    
        }
    
    public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW &&
                  UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();   //打断
                    } finally { // 设置成为最终态INTERRUPTED
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();  //移除并唤醒所有等待线程
            }
            return true;
        }
    
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);  //如果任务没有完成或者其他的问题,将阻塞;创建一个新节点存入阻塞栈中
            return report(s);
        }
    
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    
    private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
三、示例

常用使用方式:

  • 第一种方式: Future + ExecutorService
  • 第二种方式: FutureTask + ExecutorService
  • 第三种方式: FutureTask + Thread

第一种方式:Future + ExecutorService

public class FutureDemo {
      public static void main(String[] args) {
          ExecutorService executorService = Executors.newCachedThreadPool();
          Future future = executorService.submit(new Callable() {
              @Override
              public Object call() throws Exception {
                  Long start = System.currentTimeMillis();
                  while (true) {
                      Long current = System.currentTimeMillis();
                     if ((current - start) > 1000) {
                         return 1;
                     }
                 }
             }
         });
  
         try {
             Integer result = (Integer)future.get();
             System.out.println(result);
         }catch (Exception e){
             e.printStackTrace();
         }
     }
}
 

第二种方式:FutureTask + ExecutorService

ExecutorService executor = Executors.newCachedThreadPool();
          Task task = new Task();
          FutureTask futureTask = new FutureTask(task);
          executor.submit(futureTask);

第三种方式:FutureTask + Thread

FutureTask futureTask = new FutureTask(new Task());
        Thread thread = new Thread(futureTask);
        thread.setName("Task thread");
        thread.start();
四、总结

1、FutureTask用来封装Runnable或者Callable接口,可以当成一个任务。

2、在Java并发程序中FutureTask表示一个可以取消的异步运算。它有启动和取消运算、查询运算是否完成和取回运算结果等方法。只有当运算完成的时候结果才能取回,如果运算尚未完成get方法将会阻塞。一个FutureTask对象可以对调用了Callable和Runnable的对象进行包装,由于FutureTask也是调用了Runnable接口所以它可以提交给Executor来执行。

3、FutureTask可用于异步获取执行结果或取消执行任务的场景,通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

4、FutureTask间接继承了Runnable和Callable

5、FutureTask的线程安全由CAS操作来保证

6、FutureTask结果返回机制 :只有任务成功执行完成后,通过get方法能够得到任务返回的结果,其他情况都会导致阻塞。

参考链接
https://www.pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html#futuretask%E7%AE%80%E4%BB%8B

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号