栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

dubbo源码分析第十篇一提供者端通信NettyServer

dubbo源码分析第十篇一提供者端通信NettyServer

文章目录

原理图源码分析

DubboProtocol.openServer

createServer通过HeaderExchanger创建ExchangeServerTransporter完成bind NettyServer构建

doOpen实现netty编排 总结

原理图

与dubbo消费者的exchange transport codec基本一致区别在于 transport层虽都是netty,一个是NioServerSocketChannel,一个是NioSocketChannel
源码分析 DubboProtocol.openServer

提供者通信入口createServer创建服务

  private void openServer(URL url) {
        // ip:port 进行server级别的复用
        String key = url.getAddress();
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
            	线程安全
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        创建server
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                server.reset(url);
            }
        }
    }
createServer

调用Exchangers 完成ExchangeServer创建

private ExchangeServer createServer(URL url) {
    ...... 删除url配置参数代码
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    ...... 删除其他代码

    return server;
}
通过HeaderExchanger创建ExchangeServer

handler结构同nettyclientDecodeHandler不同于DubboCodec,DecodeHandler是更精细化的编解码针对dubbo的rpc业务层面,而DubboCodec针对网络传输层面

DecodeHandlerHeaderExchangeHandlerExchangeHandlerAdapter
进一步完成Request内部的dubbo协议解码 用于方法调用完成请求响应映射查找dubbo exportor集合进行服务调用
public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";
    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
Transporter完成bind

通过spi选择NettyTransporterNettyTransporter 负责NettyServer构建

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    return getTransporter().bind(url, handler);
}

public class NettyTransporter implements Transporter {
    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
}
NettyServer构建

完成handler增强 MultiMessageHandler HeartbeatHandler AllChannelHandler设置DubboCountCodec和handler装饰者到NettyServer的codec和handler属性doOpen完成netty编排

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
      
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    设置codec 和handler属性
    super(url, handler);
    netty编排
    doOpen();
 
}
doOpen实现netty编排

完成codec 和 exchange层handler向netty框架的注入编排

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    构建NettyServerHandler
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    构建NettyCodecAdapter
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // 启动服务端socketchannel
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}
总结

结合第九篇,概述dubbo remote模块三层架构[exhcange,transport codec]介绍编解码器以及各层实现基本作用,handler装饰者结构中,各handler基本作用介绍dubbo的客户端和服务端的可复用共享模型[ip:port]

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/706436.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号