所谓异步编程,那我们首先要明白什么是同步编程?
我们常见的编程手段基本都是基于同步策略,client发起一次方法调用,server(也可以是与client处于同一JVM的一个方法)处理请求,处理完成后返回响应给client。client在接收到响应之前处于阻塞状态。整个过程有点像之前 IO简述时,针对BIO、NIO和AIO的分析过程。当然这是基于操作系统层面的,我们今天分析的主要是基于应用层面的。
同步编程示例图如下:
异步编程时,client在发起一起请求后,立即返回一个结果Future,等server(方法)执行完成后,(执行针对client的回调方法),client获取到对应结果值。在整个过程中client是不发生阻塞的。
异步编程示例图如下:
1.JDK提供的Future异步方案
JDK6已经提供了基于Future的异步方案,示例如下:
public class SimpleFutureDemo {
public static void main(String[] args) throws Throwable, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(1);
// 使用线程池来执行callable任务
Future f = executor.submit(new Callable() {
@Override
public String call() throws Exception {
// 模拟任务执行
System.out.println("begin...");
Thread.sleep(2000);
System.out.println("end...");
return "job executed ";
}
});
// 获取执行结果
System.out.println(f.get());
System.out.println("main thread end...");
}
}
// 执行结果如下:
begin...
// 在等待2秒之后才打印出来end
end...
job executed
// 主线程的这句最后才打印出来,说明任务的执行获取结果集影响到主线程的执行
main thread end...
我们来看下java.util.concurrent.Future提供的方法
public interface Future {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否被取消
boolean isCancelled();
// 任务是否已完成
boolean isDone();
// 获取结果集
V get() throws InterruptedException, ExecutionException;
// 在指定时间获取结果集
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
总结:根据上文中的示例,我们在将callable任务提交给ExecutorService后,立即就获取一个Future的结果集,当前主线程不必阻塞等待执行。
但是,当我们调用future.get()方法时,发生了阻塞,一直阻塞到方法执行完成,这里是不太符合我们的预期的。
当然,我们也可以使用另外一种方式,
while (!f.isDone()) {
// sleep(100);
}
// 获取执行结果
System.out.println(f.get());
不断通过轮询来判断future任务是否已经执行完成。
但是这种方案缺点显而易见,就是主线程一直在执行状态(如果没有sleep的话),CPU消耗过高,那么有没有更好的方案呢?
2.JDK提供的CompletableFuture方案在JDK8中,提供了CompletableFuture方案,该方案真正的实现了异步调用,我们不必要阻塞获取结果集,或者不断轮询是否任务已经执行完成,而是通过注册回调函数的方案来实现结果集的获取。
public class SimplePromiseDemo {
public static void main(String[] args) throws Throwable, ExecutionException {
// 两个线程的线程池
ExecutorService executor = Executors.newFixedThreadPool(1);
//jdk1.8之前的实现方式
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
System.out.println("begin...");
try {
//模拟耗时操作
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end...");
return "job executed ";
}
}, executor);
//注册future执行成功后的回调函数
future.thenAccept(e -> System.out.println(e));
System.out.println("main thread end...");
}
}
// 打印结果
begin...
// 主线程直接结束,阻塞任务的执行没有影响到主线程
main thread end...
// 2秒后打印出来,
end...
job executed
总结:结果对比还是比较明显的,在CompletableFuture这种注册回调函数的异步调用方案下,主线程的方法执行没有被callable任务所阻塞,在任务执行完成后,主动执行了future.thenAccept中注册的任务。
这种才是真正的异步执行。
3.Netty提供的异步方案作为Netty而言,并没有使用CompletableFuture的方案,而是自己实现了一套异步执行方案。
我们先来看一下示例代码:
public class NettyPromiseTest {
public static void main(String[] args) throws Throwable {
// 使用Netty自定义的线程池组
EventExecutorGroup group = new DefaultEventExecutorGroup(1);
Future f = group.submit(new Callable() {
@Override
public String call() {
System.out.println("begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end...");
return "job executed";
}
});
//注册监听,也就是上面的成功回调方法
f.addListener(new FutureListener() {
@Override
public void operationComplete(Future f) throws Exception {
System.out.println(f.get());
}
});
System.out.println("main thread end...");
}
}
// 打印结果如下:
begin...
// 没有阻塞,main线程直接结束
main thread end...
// 2秒后打印出来
end...
job executed
总结:Netty提供的这种异步方案,与上面2中的CompletableFuture比较相似,都是方法调用后返回一个Future,future注册一个回调函数,用于方法完成后的回调。
这些均不影响主线程的后续方法操作。
4.Netty异步策略源码分析Netty提供的这种异步策略,还是比较适合在我们实际的工作中的使用的,那它是怎么实现的呢?怎么完成从同步调用到异步调用的转换的呢?我们来看下。
4.1 io.netty.util.concurrent.Future及其实现类
public interface Future extends java.util.concurrent.Future {
// 以下是抽取的重要的方法
// 任务执行是否成功
boolean isSuccess();
// 添加监听方法
Future addListener(GenericFutureListener extends Future super V>> listener);
// 删除监听方法
Future removeListener(GenericFutureListener extends Future super V>> listener);
// 获取结果集
V getNow();
}
可以看到,Netty有自己的Future接口,它继承了java.util.concurrent.Future接口。
在JDK提供的Future的基础上,Netty的Future又提供了几个重要方法,里面最重要的就是addListener()添加监听方法,正是此方法实现了对任务的异步监听。
该Future接口有哪些重要的实现类呢,如下图所示:
其中ChannelPromise接口又添加了一些特定方法
public interface ChannelPromise extends ChannelFuture, Promise4.2 EventExecutorGroup.submit(Callable task)的执行 类似于JDK中ExecutorService.submit(Callable task)并返回Future,Netty中的EventExecutorGroup.submit也返回io.netty.util.concurrent.Future。我们来看下其具体实现{ Channel channel(); ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); // 来自于Promise的实现 Promise setSuccess(V result); Promise setFailure(Throwable cause); }
// DefaultEventExecutorGroup.submit具体在父类中实现AbstractEventExecutor
// AbstractEventExecutor.submit
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
@Override
public Future submit(Callable task) {
// 继续交由父类执行,也就是JDK中的AbstractExecutorService
return (Future) super.submit(task);
}
}
// java.util.concurrent.AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
// 以下两个方法均被子类覆写,参考下面两个方法
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
@Override
protected final RunnableFuture newTaskFor(Runnable runnable, T value) {
return new PromiseTask(this, runnable, value);
}
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
// 将包装后的task(PromiseTask)添加到taskQueue中,
// 添加完成之后,主线程会有不断轮询taskQueue中的任务进行执行
// 类似于JDK的ExecutorService
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
...
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
}
所以,EventExecutorGroup的sumbit方法,不是直接执行callable任务,而是将任务封装成PromiseTask然后添加到taskQueue队列中,后续通过线程中的run()方法不断从taskQueue中获取task,执行其方法。
所以,我们来看下PromiseTask.run()方法
class PromiseTask总结:extends DefaultPromise implements RunnableFuture { public void run() { try { if (setUncancellableInternal()) { // 执行任务 V result = runTask(); // 调用所有监听实现 setSuccessInternal(result); } } catch (Throwable e) { setFailureInternal(e); } } // 执行任务 final V runTask() throws Exception { final Object task = this.task; if (task instanceof Callable) { // 如果是callable类型,则直接调用其call方法 return ((Callable ) task).call(); } ((Runnable) task).run(); return null; } protected final Promise setSuccessInternal(V result) { // 在这里,调用父类实现 super.setSuccess(result); clearTaskAfterCompletion(true, COMPLETED); return this; } } public class DefaultPromise extends AbstractFuture implements Promise { @Override public Promise setSuccess(V result) { if (setSuccess0(result)) { return this; } throw new IllegalStateException("complete already: " + this); } private boolean setValue0(Object objResult) { if (RESULT_UPDATeR.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { // 在这里 notifyListeners(); } return true; } return false; } private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); } // 最终调用在这里,获取所有的监听器,依次调用 private void notifyListenersNow() { Object listeners; synchronized (this) { // only proceed if there are listeners to notify and we are not already notifying listeners. if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; listeners = this.listeners; this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener>) listeners); } synchronized (this) { if (this.listeners == null) { // Nothing can throw from within this method, so setting notifyingListeners back to false does not // need to be in a finally block. notifyingListeners = false; return; } listeners = this.listeners; this.listeners = null; } } } }
学习各种异步操作的方式,以后在我们创建高并发的中间件时大有裨益。



