栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java多线程之异步Future机制的原理和实现

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java多线程之异步Future机制的原理和实现

项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:

 import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AddTask implements Callable {

 private int a,b;
 
 public AddTask(int a, int b) {
 this.a = a;
 this.b = b;
 }
 
 @Override
 public Integer call throws Exception {
 Integer result = a + b;
 return result;
 }
 
 public static void main(String[] args) throws InterruptedException, ExecutionException {
 ExecutorService executor = Executors.newSingleThreadExecutor;
 //JDK目前为止返回的都是FutureTask的实例
 Future future = executor.submit(new AddTask(1, 2));
 Integer result = future.get;// 只有当future的状态是已完成时(future.isDone = true),get方法才会返回
 }
} 

虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:

 public interface Future {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled;
  boolean isDone;
  V get throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
} 

由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:

 package future;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


public interface IFuture extends Future { 
 boolean isSuccess; // 是否成功 
 V getNow; //立即返回结果(不管Future是否处于完成状态)
 Throwable cause; //若执行失败时的原因
    boolean isCancellable; //是否可以取消
 IFuture await throws InterruptedException; //等待future的完成
 boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成
 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
    IFuture awaitUninterruptibly; //等待future的完成,不响应中断
    boolean awaitUninterruptibly(long timeoutMillis);//超时等待future的完成,不响应中断
 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
 IFuture addListener(IFutureListener l); //当future完成时,会通知这些加进来的监听器
 IFuture removeListener(IFutureListener l);
 
} 

接下来就一起来实现这个IFuture,在这之前要说明下Object.wait,Object.notifyAll方法,因为整个Future实现的原���的核心就是这两个方法.看看JDK里面的解释:

 public class Object {
  
  public final void wait throws InterruptedException {
    wait(0);
  }

  
  public final native void notifyAll;
} 

知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await等一系列的方法时,如果Future还未完成,那么就调用future.wait 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll方法来唤醒之前因为调用过wait方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):

 package future;

import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


public class AbstractFuture implements IFuture {

 protected volatile Object result; // 需要保证其可见性
    
 protected Collection> listeners = new CopyOnWriteArrayList>;

 
 private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal;

 @Override
 public boolean cancel(boolean mayInterruptIfRunning) {
 if (isDone) { // 已完成了不能取消
  return false;
 }

 synchronized (this) {
  if (isDone) { // double check
  return false;
  }
  result = new CauseHolder(new CancellationException);
  notifyAll; // isDone = true, 通知等待在该对象的wait的线程
 }
 notifyListeners; // 通知监听器该异步操作已完成
 return true;
 }
 
 @Override
 public boolean isCancellable {
 return result == null;
 }
 
 @Override
 public boolean isCancelled {
 return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
 }

 @Override
 public boolean isDone {
 return result != null;
 }

 @Override
 public V get throws InterruptedException, ExecutionException {
 await; // 等待执行结果

 Throwable cause = cause;
 if (cause == null) { // 没有发生异常,异步操作正常结束
  return getNow;
 }
 if (cause instanceof CancellationException) { // 异步操作被取消了
  throw (CancellationException) cause;
 }
 throw new ExecutionException(cause); // 其他异常
 }

 @Override
 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 if (await(timeout, unit)) {// 超时等待执行结果
  Throwable cause = cause;
  if (cause == null) {// 没有发生异常,异步操作正常结束
  return getNow;
  }
  if (cause instanceof CancellationException) {// 异步操作被取消了
  throw (CancellationException) cause;
  }
  throw new ExecutionException(cause);// 其他异常
 }
 // 时间到了异步操作还没有结束, 抛出超时异常
 throw new TimeoutException;
 }

 @Override
 public boolean isSuccess {
 return result == null ? false : !(result instanceof CauseHolder);
 }

 @SuppressWarnings("unchecked")
 @Override
 public V getNow {
 return (V) (result == SUCCESS_SIGNAL ? null : result);
 }

 @Override
 public Throwable cause {
 if (result != null && result instanceof CauseHolder) {
  return ((CauseHolder) result).cause;
 }
 return null;
 }

 @Override
 public IFuture addListener(IFutureListener listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }
 if (isDone) { // 若已完成直接通知该监听器
  notifyListener(listener);
  return this;
 }
 synchronized (this) {
  if (!isDone) {
  listeners.add(listener);
  return this;
  }
 }
 notifyListener(listener);
 return this;
 }

 @Override
 public IFuture removeListener(IFutureListener listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }

 if (!isDone) {
  listeners.remove(listener);
 }

 return this;
 }

 @Override
 public IFuture await throws InterruptedException {
 return await0(true);
 }

 
 private IFuture await0(boolean interruptable) throws InterruptedException {
 if (!isDone) { // 若已完成就直接返回了
  // 若允许终端且被中断了则抛出中断异常
  if (interruptable && Thread.interrupted) {
  throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted.");
  }

  boolean interrupted = false;
  synchronized (this) {
  while (!isDone) {
   try {
   wait; // 释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyAll方法
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }
  }
  }
  if (interrupted) {
  // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的, 
  // 这里重新设置以便让其它代码知道这里被中断了。
  Thread.currentThread.interrupt;
  }
 }
 return this;
 }
 
 @Override
 public boolean await(long timeoutMillis) throws InterruptedException {
 return await0(TimeUnit.MILLISECONDS.tonanos(timeoutMillis), true);
 }
 
 @Override
 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
 return await0(unit.tonanos(timeout), true);
 }

 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
 if (isDone) {
  return true;
 }

 if (timeoutNanos <= 0) {
  return isDone;
 }

 if (interruptable && Thread.interrupted) {
  throw new InterruptedException(toString);
 }

 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime;
 long waitTime = timeoutNanos;
 boolean interrupted = false;

 try {
  synchronized (this) {
  if (isDone) {
   return true;
  }

  if (waitTime <= 0) {
   return isDone;
  }

  for (;;) {
   try {
   wait(waitTime / 1000000, (int) (waitTime % 1000000));
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }

   if (isDone) {
   return true;
   } else {
   waitTime = timeoutNanos - (System.nanoTime - startTime);
   if (waitTime <= 0) {
    return isDone;
   }
   }
  }
  }
 } finally {
  if (interrupted) {
  Thread.currentThread.interrupt;
  }
 }
 }

 @Override
 public IFuture awaitUninterruptibly {
 try {
  return await0(false);
 } catch (InterruptedException e) { // 这里若抛异常了就无法处理了
  throw new java.lang.InternalError;
 }
 }
 
 @Override
 public boolean awaitUninterruptibly(long timeoutMillis) {
 try {
  return await0(TimeUnit.MILLISECONDS.tonanos(timeoutMillis), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }

 @Override
 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
 try {
  return await0(unit.tonanos(timeout), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }

 protected IFuture setFailure(Throwable cause) {
 if (setFailure0(cause)) {
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }

 private boolean setFailure0(Throwable cause) {
 if (isDone) {
  return false;
 }

 synchronized (this) {
  if (isDone) {
  return false;
  }
  result = new CauseHolder(cause);
  notifyAll;
 }

 return true;
 }

 protected IFuture setSuccess(Object result) {
 if (setSuccess0(result)) { // 设置成功后通知监听器
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }

 private boolean setSuccess0(Object result) {
 if (isDone) {
  return false;
 }

 synchronized (this) {
  if (isDone) {
  return false;
  }
  if (result == null) { // 异步操作正常执行完毕的结果是null
  this.result = SUCCESS_SIGNAL;
  } else {
  this.result = result;
  }
  notifyAll;
 }
 return true;
 }

 private void notifyListeners {
 for (IFutureListener l : listeners) {
  notifyListener(l);
 }
 }

 private void notifyListener(IFutureListener l) {
 try {
  l.operationCompleted(this);
 } catch (Exception e) {
  e.printStackTrace;
 }
 }

 private static class SuccessSignal {

 }

 private static final class CauseHolder {
 final Throwable cause;

 CauseHolder(Throwable cause) {
  this.cause = cause;
 }
 }
} 

那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:

 package future.test;

import future.IFuture;
import future.IFutureListener;


public class DelayAdder {
 
 public static void main(String[] args) {
 new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener {
  
  @Override
  public void operationCompleted(IFuture future) throws Exception {
  System.out.println(future.getNow);
  }
  
 });
 }
 
 public DelayAdditionFuture add(long delay, int a, int b) {
 DelayAdditionFuture future = new DelayAdditionFuture; 
 new Thread(new DelayAdditionTask(delay, a, b, future)).start;
 return future;
 }
 
 private class DelayAdditionTask implements Runnable {

 private long delay;
 
 private int a, b;
 
 private DelayAdditionFuture future;
 
 public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
  super;
  this.delay = delay;
  this.a = a;
  this.b = b;
  this.future = future;
 }

 @Override
 public void run {
  try {
  Thread.sleep(delay);
  Integer i = a + b;
  // TODO 这里设置future为完成状态(正常执行完毕)
  future.setSuccess(i);
  } catch (InterruptedException e) {
  // TODO 这里设置future为完成状态(异常执行完毕)
  future.setFailure(e.getCause);
  }
 }
 
 }
} package future.test;

import future.AbstractFuture;
import future.IFuture;
//只是把两个方法对外暴露
public class DelayAdditionFuture extends AbstractFuture {
 
 @Override
 public IFuture setSuccess(Object result) {
 return super.setSuccess(result);
 }
 
 @Override
 public IFuture setFailure(Throwable cause) {
 return super.setFailure(cause);
 }
 
} 

可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/149371.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号