栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty源码解析-Future与Promise

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty源码解析-Future与Promise

前言:

    所谓异步编程,那我们首先要明白什么是同步编程?

    我们常见的编程手段基本都是基于同步策略,client发起一次方法调用,server(也可以是与client处于同一JVM的一个方法)处理请求,处理完成后返回响应给client。client在接收到响应之前处于阻塞状态。整个过程有点像之前 IO简述时,针对BIO、NIO和AIO的分析过程。当然这是基于操作系统层面的,我们今天分析的主要是基于应用层面的。

    同步编程示例图如下:

 

    异步编程时,client在发起一起请求后,立即返回一个结果Future,等server(方法)执行完成后,(执行针对client的回调方法),client获取到对应结果值。在整个过程中client是不发生阻塞的。

    异步编程示例图如下:

 

1.JDK提供的Future异步方案 JDK6已经提供了基于Future的异步方案,示例如下:
public class SimpleFutureDemo {

    public static void main(String[] args) throws Throwable, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // 使用线程池来执行callable任务
        Future f = executor.submit(new Callable() {

            @Override
            public String call() throws Exception {
                // 模拟任务执行
                System.out.println("begin...");
                Thread.sleep(2000);
                System.out.println("end...");
                return "job executed ";
            }
        });

        // 获取执行结果
        System.out.println(f.get());
        System.out.println("main thread end...");
    }
}
// 执行结果如下:
begin...
// 在等待2秒之后才打印出来end
end...
job executed 
// 主线程的这句最后才打印出来,说明任务的执行获取结果集影响到主线程的执行
main thread end...
我们来看下java.util.concurrent.Future提供的方法
public interface Future {
    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 任务是否被取消
    boolean isCancelled();
    
    // 任务是否已完成
    boolean isDone();
    // 获取结果集
    V get() throws InterruptedException, ExecutionException;
    // 在指定时间获取结果集
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

总结:根据上文中的示例,我们在将callable任务提交给ExecutorService后,立即就获取一个Future的结果集,当前主线程不必阻塞等待执行。

但是,当我们调用future.get()方法时,发生了阻塞,一直阻塞到方法执行完成,这里是不太符合我们的预期的。

当然,我们也可以使用另外一种方式,

while (!f.isDone()) {
   // sleep(100);
}
// 获取执行结果
System.out.println(f.get());

不断通过轮询来判断future任务是否已经执行完成。

但是这种方案缺点显而易见,就是主线程一直在执行状态(如果没有sleep的话),CPU消耗过高,那么有没有更好的方案呢?

2.JDK提供的CompletableFuture方案

    在JDK8中,提供了CompletableFuture方案,该方案真正的实现了异步调用,我们不必要阻塞获取结果集,或者不断轮询是否任务已经执行完成,而是通过注册回调函数的方案来实现结果集的获取。

public class SimplePromiseDemo {

    public static void main(String[] args) throws Throwable, ExecutionException {
        // 两个线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(1);
        //jdk1.8之前的实现方式
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
            @Override
            public String get() {
                System.out.println("begin...");
                try {
                    //模拟耗时操作
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end...");
                return "job executed ";
            }
        }, executor);

        //注册future执行成功后的回调函数
        future.thenAccept(e -> System.out.println(e));
        System.out.println("main thread end...");
    }
}

// 打印结果
begin...
// 主线程直接结束,阻塞任务的执行没有影响到主线程
main thread end...
// 2秒后打印出来,
end...
job executed 

总结:结果对比还是比较明显的,在CompletableFuture这种注册回调函数的异步调用方案下,主线程的方法执行没有被callable任务所阻塞,在任务执行完成后,主动执行了future.thenAccept中注册的任务。

这种才是真正的异步执行。

3.Netty提供的异步方案

    作为Netty而言,并没有使用CompletableFuture的方案,而是自己实现了一套异步执行方案。

    我们先来看一下示例代码:

public class NettyPromiseTest {
    public static void main(String[] args) throws Throwable {
        // 使用Netty自定义的线程池组
        EventExecutorGroup group = new DefaultEventExecutorGroup(1);
        Future f = group.submit(new Callable() {

            @Override
            public String call() {
                System.out.println("begin...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end...");
                return "job executed";
            }
        });

        //注册监听,也就是上面的成功回调方法
        f.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future f) throws Exception {
                System.out.println(f.get());
            }
        });

        System.out.println("main thread end...");
    }
}

// 打印结果如下:
begin...
// 没有阻塞,main线程直接结束
main thread end...
    
// 2秒后打印出来
end...
job executed

总结:Netty提供的这种异步方案,与上面2中的CompletableFuture比较相似,都是方法调用后返回一个Future,future注册一个回调函数,用于方法完成后的回调。

这些均不影响主线程的后续方法操作。

4.Netty异步策略源码分析

    Netty提供的这种异步策略,还是比较适合在我们实际的工作中的使用的,那它是怎么实现的呢?怎么完成从同步调用到异步调用的转换的呢?我们来看下。

4.1 io.netty.util.concurrent.Future及其实现类
public interface Future extends java.util.concurrent.Future {
 
    // 以下是抽取的重要的方法
    
    // 任务执行是否成功
    boolean isSuccess();
    // 添加监听方法
    Future addListener(GenericFutureListener> listener);
    // 删除监听方法
    Future removeListener(GenericFutureListener> listener);
    // 获取结果集
    V getNow();
}

    可以看到,Netty有自己的Future接口,它继承了java.util.concurrent.Future接口。

在JDK提供的Future的基础上,Netty的Future又提供了几个重要方法,里面最重要的就是addListener()添加监听方法,正是此方法实现了对任务的异步监听。

    该Future接口有哪些重要的实现类呢,如下图所示:

 

其中ChannelPromise接口又添加了一些特定方法

public interface ChannelPromise extends ChannelFuture, Promise {
    Channel channel();
    ChannelPromise setSuccess(Void result);
    ChannelPromise setSuccess();
    
    // 来自于Promise的实现
    Promise setSuccess(V result);
    Promise setFailure(Throwable cause);
}
4.2 EventExecutorGroup.submit(Callable task)的执行 类似于JDK中ExecutorService.submit(Callable task)并返回Future,Netty中的EventExecutorGroup.submit也返回io.netty.util.concurrent.Future。我们来看下其具体实现
// DefaultEventExecutorGroup.submit具体在父类中实现AbstractEventExecutor
// AbstractEventExecutor.submit
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    @Override
    public  Future submit(Callable task) {
        // 继续交由父类执行,也就是JDK中的AbstractExecutorService
        return (Future) super.submit(task);
    }
}

// java.util.concurrent.AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        // 以下两个方法均被子类覆写,参考下面两个方法
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    @Override
    protected final  RunnableFuture newTaskFor(Runnable runnable, T value) {
        return new PromiseTask(this, runnable, value);
    }
}

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        // 将包装后的task(PromiseTask)添加到taskQueue中,
        // 添加完成之后,主线程会有不断轮询taskQueue中的任务进行执行
        // 类似于JDK的ExecutorService
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                ...
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
}

所以,EventExecutorGroup的sumbit方法,不是直接执行callable任务,而是将任务封装成PromiseTask然后添加到taskQueue队列中,后续通过线程中的run()方法不断从taskQueue中获取task,执行其方法。

所以,我们来看下PromiseTask.run()方法

class PromiseTask extends DefaultPromise implements RunnableFuture {
	public void run() {
        try {
            if (setUncancellableInternal()) {
                // 执行任务
                V result = runTask();
                // 调用所有监听实现
                setSuccessInternal(result);
            }
        } catch (Throwable e) {
            setFailureInternal(e);
        }
    }
    
    // 执行任务
    final V runTask() throws Exception {
        final Object task = this.task;
        if (task instanceof Callable) {
            // 如果是callable类型,则直接调用其call方法
            return ((Callable) task).call();
        }
        ((Runnable) task).run();
        return null;
    }
    
    protected final Promise setSuccessInternal(V result) {
        // 在这里,调用父类实现
        super.setSuccess(result);
        clearTaskAfterCompletion(true, COMPLETED);
        return this;
    }
}

public class DefaultPromise extends AbstractFuture implements Promise {
    @Override
    public Promise setSuccess(V result) {
        if (setSuccess0(result)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATeR.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                // 在这里
                notifyListeners();
            }
            return true;
        }
        return false;
    }
    
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
    
    // 最终调用在这里,获取所有的监听器,依次调用
    private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            // only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                    // need to be in a finally block.
                    notifyingListeners = false;
                    return;
                }
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }
}
总结:

学习各种异步操作的方式,以后在我们创建高并发的中间件时大有裨益。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/683379.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号