栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

netty中ChannelFuture.sync()的作用是什么?

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

netty中ChannelFuture.sync()的作用是什么?

如果你使用过 netty,你一定见过下面两行代码,它们可以说是创建一个 netty server 的标配代码

1
2
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();

不知道你有没想过这里面的 sync() 的作用是什么,如果去掉会有什么问题?

bootstrap.bind(port).sync() 分析

先来一步一步分析下第一行代码中的 sync 的作用

现象跟踪

为了方便调试,我把主要的代码贴到的下面,至于 EchoServerHandler 比较简单,你可以随便写个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(eventLoopGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new EchoServerHandler());
            }
        });

    ChannelFuture future = bootstrap.bind(port).sync();
    future.channel().closeFuture().sync();
} finally {
    eventLoopGroup.shutdownGracefully();
}

回到这行代码

1
ChannelFuture future = bootstrap.bind(port).sync();

我们可以把它拆成两行

1
2
ChannelFuture future1 = bootstrap.bind(port);
ChannelFuture future2 = future1.sync();

debug 下你会发现,future1 和 future2 其实是同一个对象

那么这个 sync 有什么作用呢?从名字上看是同步,猜测执行后会等待某种事件。将断点放在 future2 执行前再次 debug

你会发现事情起了变化,future1 执行 toString() 的结果中有一个 incomplete 字样,是一个未执行完成的状态

继续执行 future1.sync() 并查看状态

此时 future1 的状态变成了 success。这说明 future1.sync() 会等待异步事件执行完成,并且返回自身,可通过以下代码进一步验证

1
2
3
ChannelFuture future1 = bootstrap.bind(port);
Thread.sleep(1000);
ChannelFuture future2 = future1.sync();

在 future1.sync() 之前 sleep 1 秒钟,等待异步事件执行完,再次 debug 查看 future 状态

此时 future1 已经是 success 状态

代码分析

bootstrap.bind(port) 返回了一个 future 对象,它是一个 AbstractBootstrap$PendingRegistrationPromise 类型的实例

PendingRegistrationPromise 持有了 channel 的引用,并且往 ChannelFuture 注册了一个 Listener,当 ChannelFuture 的任务处理完成后,根据执行结果是否有异常决定执行 promise.setFailure 或 promise.registered 方法,若执行成功,则继续执行 doBind0 方法进行绑定操作。

回到上层,接着跟踪 future.sync 的内部执行逻辑,逐步跟踪进行,会发现执行到 DefaultPromise 的 await 方法

isDone 会判断 result 的状态

1
2
3
private static boolean isDone0(Object result) {
    return result != null && result != UNCANCELLABLE;
}

因为此时 result 为 null,所以返回 false,继续往下执行

最终会执行到 while 循环中,将 waiters 变量的值加一,并且进入到 Object.wait() 中,等待被 notify 或 notifyAll 唤醒

1
2
3
4
5
6
7
8
9
10
11
while (!isDone()) {
    // waiters变量的值加一
    incWaiters();
    try {
        // Object.wait(),等待被唤醒
        wait();
    } finally {
        // waiters变量的值加一
        decWaiters();
    }
}

那么它是被谁唤醒的呢?debug 跟踪 promise.registered 的执行,会发现它并没有修改 promise 的执行状态,继续跟踪 doBind0 方法

1
2
3
4
5
6
7
8
9
doBind0 
  -> ChannelFuture.bind 
  -> AbstractChannel.bind 
  -> pipeline.bind 
  -> tail.bind 
  -> next.invokeBind 
  -> ((ChannelOutboundHandler) handler()).bind 
  -> unsafe.bind 
  -> AbstractChannel$AbstractUnsafe.bind

经过层层地跟踪,最终会执行到 AbstractUnsafe.bind 方法

注意最后一行,它是用来设置 future 的执行结果状态的,可以看到在执行前的状态是 uncancellable。跟踪进去

1
2
3
4
5
protected final void safeSetSuccess(ChannelPromise promise) {
    if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
        logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
    }
}

关键的代码是 promise.trySuccess

1
2
3
DefaultChannelPromise.trySuccess 
  -> DefaultPromise.trySuccess(null) 
  -> DefaultPromise.setSuccess0(null)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
AtomicReferenceFieldUpdater RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        // 通知waiter
        if (checkNotifyWaiters()) {
            // 通知listener
            notifyListeners();
        }
        return true;
    }
    return false;
}

最终会通过 cas 操作将 AbstractBootstrap$PendingRegistrationPromise 的 result 属性设置为 SUCCESS,然后通知 waiter 和 listener

继续跟踪 checkNotifyWaiters 方法

1
2
3
4
5
6
7
8
private synchronized boolean checkNotifyWaiters() {
    // waiter的数量大于0
    if (waiters > 0) {
        // 唤醒所有的waiter
        notifyAll();
    }
    return listeners != null;
}

它会检查 waiters 变量的值,若大于 0 则说明有线程执行了 Object.wait 方法,此时通过 notifyAll 唤醒所有的线程,至此也就将整个过程串联了起来。

总结

bootstrap.bind() 返回一个 AbstractBootstrap$PendingRegistrationPromise 对象,它本质上是一个 DefaultPromise 对象,实现了 Future 接口

future.sync() 最终会使 DefaultPromise 的属性 waiters 值加一,然后调用 Object.wait 方法阻塞等待

bootstrap.bind() 会提交绑定事件到 EventLoop 中执行,待 socket 绑定地址成功后会调用 DefaultPromise 的 trySuccess 方法更改其状态并通知所有 waiter

此处 future.sync() 目的是等待异步的 socket 绑定事件完成

future.channel().closeFuture().sync() 分析

有了上面的经验,再看这行代码就轻松多了

代码分析

为了便于理解,我将这行代码拆开并加上了注释

1
2
3
4
5
6
7
8
// 获取ServerSocketChannel
Channel channel = future.channel();

// 获取closeFuture
ChannelFuture closeFuture = channel.closeFuture();

// 同步等待closeFuture执行完成
closeFuture.sync();

这里面需要关注的是 closeFuture,它的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final CloseFuture closeFuture = new CloseFuture(this);

static final class CloseFuture extends DefaultChannelPromise {
    CloseFuture(AbstractChannel ch) {
        super(ch);
    }

    @Override
    public ChannelPromise setSuccess() {
        throw new IllegalStateException();
    }

    @Override
    public ChannelPromise setFailure(Throwable cause) {
        throw new IllegalStateException();
    }

    @Override
    public boolean trySuccess() {
        throw new IllegalStateException();
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        throw new IllegalStateException();
    }

    boolean setClosed() {
        return super.trySuccess();
    }
}

可以看到 CloseFuture 类只提供了一个 setClosed 方法,调用后会将其自身的状态设置为 SUCCESS。

如果还未调用过 setClosed 方法,执行 closeFuture.sync() 方法会阻塞在 Object.wait() 上,等待被唤醒。

那么在什么情况下会调用 setClosed 方法呢?跟踪代码引用情况,你会发现在 register 和 close 两个时机都有可能调用

它们之间的区别是,在 register 阶段,只有当出现异常的情况下会调用 closeFuture.setClosed() 方法

而在 close 阶段,因为本身就是要关闭,所以不管成功或出现异常都会调用,只是在抛异常的情况下会记录异常栈

总结
1
2
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();

再回顾下代码,这里面共有两个 Future 对象:

  • bind 方法返回的 future 用于等待底层网络组件启动完成
  • closeFuture 用于等待网络组件关闭完成
  • 本文作者: gorden5566
  • 本文链接: netty中ChannelFuture.sync()的作用是什么? - gorden5566
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/606062.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号