项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AddTask implements Callable{ private int a,b; public AddTask(int a, int b) { this.a = a; this.b = b; } @Override public Integer call throws Exception { Integer result = a + b; return result; } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor; //JDK目前为止返回的都是FutureTask的实例 Future future = executor.submit(new AddTask(1, 2)); Integer result = future.get;// 只有当future的状态是已完成时(future.isDone = true),get方法才会返回 } }
虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:
public interface Future{ boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled; boolean isDone; V get throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:
package future; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public interface IFutureextends Future { boolean isSuccess; // 是否成功 V getNow; //立即返回结果(不管Future是否处于完成状态) Throwable cause; //若执行失败时的原因 boolean isCancellable; //是否可以取消 IFuture await throws InterruptedException; //等待future的完成 boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException; IFuture awaitUninterruptibly; //等待future的完成,不响应中断 boolean awaitUninterruptibly(long timeoutMillis);//超时等待future的完成,不响应中断 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit); IFuture addListener(IFutureListener l); //当future完成时,会通知这些加进来的监听器 IFuture removeListener(IFutureListener l); }
接下来就一起来实现这个IFuture,在这之前要说明下Object.wait,Object.notifyAll方法,因为整个Future实现的原���的核心就是这两个方法.看看JDK里面的解释:
public class Object {
public final void wait throws InterruptedException {
wait(0);
}
public final native void notifyAll;
}
知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await等一系列的方法时,如果Future还未完成,那么就调用future.wait 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll方法来唤醒之前因为调用过wait方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):
package future; import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class AbstractFutureimplements IFuture { protected volatile Object result; // 需要保证其可见性 protected Collection > listeners = new CopyOnWriteArrayList >; private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal; @Override public boolean cancel(boolean mayInterruptIfRunning) { if (isDone) { // 已完成了不能取消 return false; } synchronized (this) { if (isDone) { // double check return false; } result = new CauseHolder(new CancellationException); notifyAll; // isDone = true, 通知等待在该对象的wait的线程 } notifyListeners; // 通知监听器该异步操作已完成 return true; } @Override public boolean isCancellable { return result == null; } @Override public boolean isCancelled { return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; } @Override public boolean isDone { return result != null; } @Override public V get throws InterruptedException, ExecutionException { await; // 等待执行结果 Throwable cause = cause; if (cause == null) { // 没有发生异常,异步操作正常结束 return getNow; } if (cause instanceof CancellationException) { // 异步操作被取消了 throw (CancellationException) cause; } throw new ExecutionException(cause); // 其他异常 } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) {// 超时等待执行结果 Throwable cause = cause; if (cause == null) {// 没有发生异常,异步操作正常结束 return getNow; } if (cause instanceof CancellationException) {// 异步操作被取消了 throw (CancellationException) cause; } throw new ExecutionException(cause);// 其他异常 } // 时间到了异步操作还没有结束, 抛出超时异常 throw new TimeoutException; } @Override public boolean isSuccess { return result == null ? false : !(result instanceof CauseHolder); } @SuppressWarnings("unchecked") @Override public V getNow { return (V) (result == SUCCESS_SIGNAL ? null : result); } @Override public Throwable cause { if (result != null && result instanceof CauseHolder) { return ((CauseHolder) result).cause; } return null; } @Override public IFuture addListener(IFutureListener listener) { if (listener == null) { throw new NullPointerException("listener"); } if (isDone) { // 若已完成直接通知该监听器 notifyListener(listener); return this; } synchronized (this) { if (!isDone) { listeners.add(listener); return this; } } notifyListener(listener); return this; } @Override public IFuture removeListener(IFutureListener listener) { if (listener == null) { throw new NullPointerException("listener"); } if (!isDone) { listeners.remove(listener); } return this; } @Override public IFuture await throws InterruptedException { return await0(true); } private IFuture await0(boolean interruptable) throws InterruptedException { if (!isDone) { // 若已完成就直接返回了 // 若允许终端且被中断了则抛出中断异常 if (interruptable && Thread.interrupted) { throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted."); } boolean interrupted = false; synchronized (this) { while (!isDone) { try { wait; // 释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyAll方法 } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } } } if (interrupted) { // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的, // 这里重新设置以便让其它代码知道这里被中断了。 Thread.currentThread.interrupt; } } return this; } @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(TimeUnit.MILLISECONDS.tonanos(timeoutMillis), true); } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.tonanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { if (isDone) { return true; } if (timeoutNanos <= 0) { return isDone; } if (interruptable && Thread.interrupted) { throw new InterruptedException(toString); } long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime; long waitTime = timeoutNanos; boolean interrupted = false; try { synchronized (this) { if (isDone) { return true; } if (waitTime <= 0) { return isDone; } for (;;) { try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } if (isDone) { return true; } else { waitTime = timeoutNanos - (System.nanoTime - startTime); if (waitTime <= 0) { return isDone; } } } } } finally { if (interrupted) { Thread.currentThread.interrupt; } } } @Override public IFuture awaitUninterruptibly { try { return await0(false); } catch (InterruptedException e) { // 这里若抛异常了就无法处理了 throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(TimeUnit.MILLISECONDS.tonanos(timeoutMillis), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.tonanos(timeout), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } protected IFuture setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setFailure0(Throwable cause) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } result = new CauseHolder(cause); notifyAll; } return true; } protected IFuture setSuccess(Object result) { if (setSuccess0(result)) { // 设置成功后通知监听器 notifyListeners; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(Object result) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } if (result == null) { // 异步操作正常执行完毕的结果是null this.result = SUCCESS_SIGNAL; } else { this.result = result; } notifyAll; } return true; } private void notifyListeners { for (IFutureListener l : listeners) { notifyListener(l); } } private void notifyListener(IFutureListener l) { try { l.operationCompleted(this); } catch (Exception e) { e.printStackTrace; } } private static class SuccessSignal { } private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } } }
那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:
package future.test;
import future.IFuture;
import future.IFutureListener;
public class DelayAdder {
public static void main(String[] args) {
new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener {
@Override
public void operationCompleted(IFuture future) throws Exception {
System.out.println(future.getNow);
}
});
}
public DelayAdditionFuture add(long delay, int a, int b) {
DelayAdditionFuture future = new DelayAdditionFuture;
new Thread(new DelayAdditionTask(delay, a, b, future)).start;
return future;
}
private class DelayAdditionTask implements Runnable {
private long delay;
private int a, b;
private DelayAdditionFuture future;
public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
super;
this.delay = delay;
this.a = a;
this.b = b;
this.future = future;
}
@Override
public void run {
try {
Thread.sleep(delay);
Integer i = a + b;
// TODO 这里设置future为完成状态(正常执行完毕)
future.setSuccess(i);
} catch (InterruptedException e) {
// TODO 这里设置future为完成状态(异常执行完毕)
future.setFailure(e.getCause);
}
}
}
} package future.test;
import future.AbstractFuture;
import future.IFuture;
//只是把两个方法对外暴露
public class DelayAdditionFuture extends AbstractFuture {
@Override
public IFuture setSuccess(Object result) {
return super.setSuccess(result);
}
@Override
public IFuture setFailure(Throwable cause) {
return super.setFailure(cause);
}
}
可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。



