栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

futuretask源码分析(推荐)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

futuretask源码分析(推荐)

FutureTask只实现RunnableFuture接口:

该接口继承了java.lang.Runnable和Future接口,也就是继承了这两个接口的特性。

1.可以不必直接继承Thread来生成子类,只要实现run方法,且把实例传入到Thread构造函数,Thread就可以执行该实例的run方法了( Thread(Runnable) )。

2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成。也可以中断执行,判断执行状态等。

FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。

如: 1. 取消任务执行

2. 查询任务是否执行完成

3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。

注意:一旦任务执行完成,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)
FutureTask支持执行两种任务, Callable 或者 Runnable的实现类。且可把FutureTask实例交由Executor执行。

源码部分(很简单):

public class FutureTask implements RunnableFuture {
  
  
  private volatile int state;
  private static final int NEW     = 0;
  private static final int COMPLETING  = 1;
  private static final int NORMAL    = 2;
  private static final int EXCEPTIonAL = 3;
  private static final int CANCELLED  = 4;
  private static final int INTERRUPTING = 5;
  private static final int INTERRUPTED = 6;
  
  private Callable callable;
  
  private Object outcome; // non-volatile, protected by state reads/writes
  
  private volatile Thread runner;
  
  private volatile WaitNode waiters;
  
  @SuppressWarnings("unchecked")
  private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
      return (V)x;
    if (s >= CANCELLED)
      throw new CancellationException();
    throw new ExecutionException((Throwable)x);
  }
  
  public FutureTask(Callable callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    // ensure visibility of callable
  }
  
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;    // ensure visibility of callable
  }
  //判断任务是否已取消(异常中断、取消等)
  public boolean isCancelled() {
    return state >= CANCELLED;
  }
  
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
      return false;
    if (mayInterruptIfRunning) {
      if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
 return false;
      Thread t = runner;
      if (t != null)
 t.interrupt();
      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
      return false;
    finishCompletion();
    return true;
  }
  
  public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }
  
  public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
      throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
      (s = awaitDone(true, unit.tonanos(timeout))) <= COMPLETING)
      throw new TimeoutException();
    return report(s);
  }
  
  protected void done() { }
  
  protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
    }
  }
  
  protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = t;
      UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
      finishCompletion();
    }
  }
  
  public void run() {
    //只有当任务状态=new时才被运行继续执行
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
      return;
    try {
      Callable c = callable;
      if (c != null && state == NEW) {
 V result;
 boolean ran;
 try {
   //调用Callable的Call方法
   result = c.call();
   ran = true;
 } catch (Throwable ex) {
   result = null;
   ran = false;
   setException(ex);
 }
 if (ran)
   set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
 handlePossibleCancellationInterrupt(s);
    }
  }
  
  protected boolean runAndReset() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
      return false;
    boolean ran = false;
    int s = state;
    try {
      Callable c = callable;
      if (c != null && s == NEW) {
 try {
   c.call(); // don't set result
   ran = true;
 } catch (Throwable ex) {
   setException(ex);
 }
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      s = state;
      if (s >= INTERRUPTING)
 handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
  }
  
  private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us. Let's spin-wait patiently.
    if (s == INTERRUPTING)
      while (state == INTERRUPTING)
 Thread.yield(); // wait out pending interrupt
    // assert state == INTERRUPTED;
    // We want to clear any interrupt we may have received from
    // cancel(true). However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
  }
  
  static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
  }
  
  private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
      if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
 for (;;) {
   Thread t = q.thread;
   if (t != null) {
     q.thread = null;
     LockSupport.unpark(t);
   }
   WaitNode next = q.next;
   if (next == null)
     break;
   q.next = null; // unlink to help gc
   q = next;
 }
 break;
      }
    }
    done();
    callable = null;    // to reduce footprint
  }
  
  private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      
      if (Thread.interrupted()) {
 removeWaiter(q);
 throw new InterruptedException();
      }
      int s = state;
      if (s > COMPLETING) {
 if (q != null)
   q.thread = null;
 return s;
      }
      else if (s == COMPLETING) // cannot time out yet
 Thread.yield();
      else if (q == null)
 q = new WaitNode();
      else if (!queued)
 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
      q.next = waiters, q);
      else if (timed) {
 nanos = deadline - System.nanoTime();
 if (nanos <= 0L) {
   removeWaiter(q);
   return state;
 }
 LockSupport.parkNanos(this, nanos);
      }
      else
 LockSupport.park(this);
    }
  }
  
  private void removeWaiter(WaitNode node) {
    if (node != null) {
      node.thread = null;
      retry:
      for (;;) {     // restart on removeWaiter race
 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
   s = q.next;
   if (q.thread != null)
     pred = q;
   else if (pred != null) {
     pred.next = s;
     if (pred.thread == null) // check for race
continue retry;
   }
   else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
 q, s))
     continue retry;
 }
 break;
      }
    }
  }
  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long stateOffset;
  private static final long runnerOffset;
  private static final long waitersOffset;
  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class k = FutureTask.class;
      stateOffset = UNSAFE.objectFieldOffset
 (k.getDeclaredField("state"));
      runnerOffset = UNSAFE.objectFieldOffset
 (k.getDeclaredField("runner"));
      waitersOffset = UNSAFE.objectFieldOffset
 (k.getDeclaredField("waiters"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }
}

总结

以上就是本文关于futuretask源码分析(推荐)的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:Java利用future及时获取多线程运行结果、浅谈Java多线程处理中Future的妙用(附源码)、futuretask用法及使用场景介绍等,有什么问题可以随时留言,欢迎大家一起交流讨论。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/143864.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号