栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

yyds网络编程Netty

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

yyds网络编程Netty

1、Netty的简介

1.1、Netty 是基于 Java NIO 的client-server(客户端服务器)框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程,但是你仍然可以使用底层的 API。

1.2、Netty 的内部实现是很复杂的,但是 Netty 提供了简单易用的API从网络处理代码中解耦业务逻辑。Netty 是完全基于 NIO 实现的,所以整个 Netty 都是异步的。

1.3、Netty 是最流行的 NIO 框架,它已经得到成百上千的商业、商用项目验证,许多框架和开源组件的底层 rpc 都是使用的 Netty,比如我们常用的 Dubbo、RocketMQ、Elasticsearch、gRPC等都用到了Netty。
1.4、支持多种协议 如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。

2、为什么要用Netty?

2.1 统一的 API,支持多种传输类型,阻塞和非阻塞的。
2.2 简单而强大的线程模型。
2.3 功能强大,预置了编解码器解决 TCP 粘包/拆包功能,支持多种主流协议;
2.4 真正的无连接数据包套接字支持。
2.5 比直接使用 Java 核心 API 有更高的吞吐量、更低的延迟、更低的资源消耗和更少的内存复制。
2.6 安全性不错,有完整的 SSL/TLS 以及 StartTLS 支持。
2.7 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入。
2.8 成熟稳定,经历了大型项目的使用和考验,而且很多开源项目都使用到了 Netty, 比如我们经常接触的 Dubbo、RocketMQ 等等。
2.9 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼。

3、Netty的应用场景

3.1 作为 RPC 框架的网络通信工具
比如我们的微服务框架中,不同的服务之间经常需要相互调用,这个时候就需要RPC框架了。

3.2 实现一个实时通讯系统
使用 Netty 我们可以实现一个可以聊天功能,比如微信的实时通讯系统。

Netty是基于Java NIO实现的,不可少的要讲一下NIO的基本概念:

阻塞(Block)与非阻塞(Non-Block)

阻塞(Block):需要等待缓冲区中的数据准备好过后才处理其他的事情,否则一直等待在那里。

非阻塞(Non-Block):进程访问数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。

同步(Synchronous)与异步(Asynchronous)

同步(Synchronous):客户端发出请求到服务端,无论服务端是否阻塞以及其他情况,只要服务端同步返回结果,就认定是同步。

异步(Asynchronous):客户端发出请求到服务端,同步返回值中没有结果,Client只需要等待通知,收到通知后才能拉取结果,就是异步。

Java BIO与NIO的不同之处 BIO(传统IO):

传统的 java.io 包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用时可靠的线性顺序。

NIO(Non-blocking/New I/O):

NIO 是一种同步非阻塞的 I/O 模型,于 Java 1.4 中引入,对应 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。NIO 提供了与传统 BIO 模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。


Java NIO 和 BIO 之间第一个最大的区别是,BIO 是面向流的,NIO 是面向缓冲区的。

NIO的特点:

一个线程可以处理多个通道,减少线程创建数量,并且读或写不阻塞,就算没有数据可读写也不会发生阻塞导致线程资源的浪费。

Netty的执行流程:

接下来贴出Client 和Server代码讲解Netty 核心组件负责的主要功能。 Server代码:
   public void start(InetSocketAddress address) {
       //配置服务端的NIO线程组
       EventLoopGroup bossGroup = new NioEventLoopGroup(1);
       EventLoopGroup workerGroup = new NioEventLoopGroup();
       try {
           ServerBootstrap bootstrap = new ServerBootstrap()
                   .group(bossGroup, workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .localAddress(address)
                   .childHandler(new NettyServerChannelInitializer())
                   .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024*1024))
                   .childOption(ChannelOption.SO_KEEPALIVE, true);
           // 绑定端口,开始接收进来的连接
           ChannelFuture future = bootstrap.bind(address).sync();
           log.info("netty服务器开始监听端口:" + address.getPort());
           //关闭channel和块,直到它被关闭
           future.channel().closeFuture().sync();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           //关闭主线程组
           bossGroup.shutdownGracefully();
           //关闭工作线程组
           workerGroup.shutdownGracefully();
       }
   }
Client代码:
    public void Client(InetSocketAddress address) {
     NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new DemoSocketClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            if(eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    }
Channel:

Channel 接口是 Netty 对网络操作抽象类,它除了包括基本的 I/O 操作,如 bind()、connect()、read()、write() 等。

比较常用的Channel接口实现类是NioServerSocketChannel(服务端)和NioSocketChannel(客户端),这两个 Channel 可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。

而且我们对数据的读取和写入要通过Channel管道,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。

EventLoop:

主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。

EventloopGroup:

顾名思义就是EventLoop的组。

EventLoopGroup 包含多个 EventLoop,每一个 EventLoop 通常内部包含一个线程,上面我们已经说了 EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理,并且 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,即 Thread 和 EventLoop 属于 1 : 1 的关系,从而保证线程安全。

从上图结合以下的代码图可以看出:

其中 bossGroup 用于接收连接,workerGroup 用于具体的处理(消息的读写以及其他逻辑处理)。
当客户端通过 connect 方法连接服务端时,bossGroup 处理客户端连接请求。当客户端处理完成后,会将这个连接提交给 workerGroup 来处理,然后 workerGroup 负责处理其 IO 相关操作。

NioEventLoopGroup 初始化分析

讲解之前先讲一下EventLoopGroup 和NioEventLoopGroup 到底是什么关系,如下图

从上图中可以看出,在Netty中,EventLoopGroup和NioEventLoopGroup都是一个线程池,Netty中的EventLoopGroup接口直接继承JDK中的EventExecutorGroup接口。Netty中的NioEventLoopGroup是继承了MultithreadEventLoopGroup抽象类。

而MultithreadEventLoopGroup继承MultithreadEventExecutorGroup抽象类。MultithreadEventExecutorGroup抽象类实现了Netty中的EventLoopGroup接口。如果还不明白的话可以看一下继承关系:

这样是否更好的理解呢

下面具体分析,从 NioEventLoopGroup 的初始化进入源码分析

    
    public NioEventLoopGroup() {
        this(0);
    }
   
    public NioEventLoopGroup(int nThreads) {
     // 第二个参数是这个group所包含的executor
        this(nThreads, (Executor)null);
    }
   
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
       // 第三个参数是provider,其用于提供selector及selectable的channel,
        // 这个provider是当前JVM中唯一的一个单例的provider
        this(nThreads, executor, SelectorProvider.provider());
    }
  public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        // 第四个参数是一个选择策略工厂实例
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    }
构造函数中指定的每个参数含义:

nThreads :DEFAULT_EVENT_LOOP_THREADS//默认当前cpu的核数的2
倍。

SelectorProvider.provider():

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction() {
                    public SelectorProvider run() {
                           // 1. java.nio.channels.spi.SelectorProvider 属性指定实现类
                            if (loadProviderFromProperty())
                                return provider;
                                    // 2. SPI 指定实现类
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

从代码中可以看出这是一个单例的provider。

SelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默认选择策略工厂实例。
chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//选择器工厂实例

这里主要的是重点讲一下MultithreadEventExecutorGroup构造函数

   protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        this.terminatedChildren = new AtomicInteger();
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        } else {
          //如果没有指定线程工厂,那么构造一个默认的线程工厂
            if (executor == null) {
             // 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程
                executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
            }

            this.children = new EventExecutor[nThreads];

            int j;
            for(int i = 0; i < nThreads; ++i) {
                boolean success = false;
                boolean var18 = false;

                try {
                    var18 = true;
                     // 创建eventLoop
                    this.children[i] = this.newChild((Executor)executor, args);
                       // 在NioEventLoopGroup中,chlidren的每个元素其实都是一个NioEventLoop
                    success = true;
                    var18 = false;
                } catch (Exception var19) {
                    throw new IllegalStateException("failed to create a child event loop", var19);
                } finally {
                    if (var18) {
                      // 在创建这些eventLoop过程中,只要有一个创建失败,则关闭之前所有已经创建好的eventLoop
                        if (!success) {
                            int j;
                            for(j = 0; j < i; ++j) {
                                // 关闭之前所有已经创建好的eventLoop
                                this.children[j].shutdownGracefully();
                            }
      // 终止所有eventLoop上所执行的任务
                            for(j = 0; j < i; ++j) {
                                EventExecutor e = this.children[j];

                                try {
                                    while(!e.isTerminated()) {
                                        e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                                    }
                                } catch (InterruptedException var20) {
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                        }

                    }
                }

                if (!success) {
                    for(j = 0; j < i; ++j) {
                        this.children[j].shutdownGracefully();
                    }

                    for(j = 0; j < i; ++j) {
                        EventExecutor e = this.children[j];

                        try {
                            while(!e.isTerminated()) {
                                e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException var22) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
    // 创建一个选择器
            this.chooser = chooserFactory.newChooser(this.children);
            FutureListener terminationListener = new FutureListener() {
                public void operationComplete(Future future) throws Exception {
                    if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
                        MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
                    }

                }
            };
            EventExecutor[] var24 = this.children;
            j = var24.length;
	//添加监听器,在NioEventLoop被关闭时触发
            for(int var26 = 0; var26 < j; ++var26) {
                EventExecutor e = var24[var26];
                e.terminationFuture().addListener(terminationListener);
            }

            Set childrenSet = new linkedHashSet(this.children.length);
            Collections.addAll(childrenSet, this.children);
            this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    }
 
newDefaultThreadFactory: 

cexecutor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
如果没有指定线程工厂(ThreadFactory),那么默认调用newDefaultThreadFactory方法来构造一个线程工厂。

点击下一步:

public DefaultThreadFactory(Class poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);//参数poolType为newDefaultThreadFactory的class,false表示线程不是后台线
        //程,Thread.NORM_PRIORITY,是正常的线程的优先级(三个优先级:MIN_PRIORITY = 1;NORM_PRIORITY = 5;MAX_PRIORITY = 10;)。
    } 

点击下一步:

  public DefaultThreadFactory(Class poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

toPoolName(poolType):比如我们给定当前newDefaultThreadFactory的poolType为io.netty.util.concurrent.newDefaultThreadFactory,那么经过toPoolName()方法返回为newDefaultThreadFactory。

点击下一步:

  public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
        this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
    }

在该构造方法中加入了线程组参数。

点击下一步:

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        this.nextId = new AtomicInteger();
        ObjectUtil.checkNotNull(poolName, "poolName");
        if (priority >= 1 && priority <= 10) {
            this.prefix = poolName + '-' + poolId.incrementAndGet() + '-';
            this.daemon = daemon;
            this.priority = priority;
            this.threadGroup = threadGroup;
        } else {
            throw new IllegalArgumentException("priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
    }

到这里指定了一些参数:
c//poolId:private static final AtomicInteger poolId = new AtomicInteger();保证线程安全。
cthis.daemon = daemon;//是否后台线程
cthis.priority = priority;//优先级
cthis.threadGroup = threadGroup;//线程组


newDefaultThreadFactory这个说完回到原点说一下newDefaultThreadFactory(线程执行器)这个构造方法,每次执行任务都会通过它来创建一个线程实体。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = (ThreadFactory)ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    public void execute(Runnable command) {
        this.threadFactory.newThread(command).start();
    }
}

传递进来的threadFactory为DefaultThreadFactory创建的默认参数,这里面会构造NioEventLoop线程命名规则为nioEventLoop-1-xxx,当线程执行的时候会调用execute()方法,这里会创建一个FastThreadLocalThread线程。

public class DefaultThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
}

通过上面的newThread()来创建一个线程,然后初始化线程对象数据,最终会调用到Thread.init()中。

接下来可以看到创建了一个children 数组,根据需要创建对应数量的数组
cthis.children = new EventExecutor[nThreads];
因为每个 NioEventLoopGroup 都是 NioEventLoop 的集合,所以这里的 children 数组就是当前 NioEventLoopGroup 的 NioEventLoop。讲解一下就行,这个不是重点。

重点代码是NioEventLoop 的创建,所以往下走找到:
cthis.children[i] = this.newChild((Executor)executor, args);;
这行代码,因为当前是 NioEventLoopGroup 的创建,所以找到子类的 newChild 实现。

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory)args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2], queueFactory);
    }

newChild 方法,是一个抽象方法, 它的任务是实例化 EventLoop 对象。

我们接着看

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
   //newTaskQueue创建队列(jctools)
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    //设置nio selectorProvider
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    //设置select策略选择器,负责控制nio loop逻辑
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    //selectorTuple其实就是一个简单的bean,内部存有原生selector和包装后的selector
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

NioEventLoop构造方法中就是实例化一个 NioEventLoop 对象, 然后返回。NioEventLoop构造函数中会保存provider和事件轮询器selector,在其父类中还会创建一个MpscQueue队列,然后保存线程执行器executor。

再回过头来想一想,MultithreadEventExecutorGroup 内部维护了一个 EventExecutor[] children数组, Netty 的 EventLoopGroup 的实现机制其实就建立在 MultithreadEventExecutorGroup 之上。

最后总结一下 EventLoopGroup 的初始化过程:

1、EventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor 2、children 数组,数组长度是nThreads。
3、如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2。
4、MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组
抽象方法 newChild是在NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例。

####Bootstrap 和 ServerBootstrap:
Bootstrap :客户端启动配置类

ServerBootstrap:服务端启动配置类

Bootstrap 也可以通过 bind() 方法绑定本地的一个端口,作为 UDP 协议通信中的一端。
ServerBootstrap通常使用 bind() 方法绑定本地的端口上,然后等待客户端的连接。
Bootstrap 只需要配置一个线程组— EventLoopGroup ,而 ServerBootstrap需要配置两个线程组— EventLoopGroup ,一个用于接收连接,一个用于具体的处理,主要用于绑定我们创建的 EventLoopGroup,指定 Channel 的类型以及绑定 Channel 处理器等操作,主要做的都是给 ServerBootstrap与 Bootstrap 的属性赋值操作,所以称其为配置类。

Bootstrap与 ServerBootstrap 的继承关系


AbstractBootstrap是一个抽象类,Bootstrap和ServerBootstrap的都继承自AbstractBootstrap抽象类。AbstractBootstrap提供了类似于建造者模式的相关方法。

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {

    
    volatile EventLoopGroup group;
    
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory channelFactory;
    
    private volatile SocketAddress localAddress;
    
    private final Map, Object> options = new linkedHashMap, Object>();
    
    private final Map, Object> attrs = new linkedHashMap, Object>();
    
    private volatile ChannelHandler handler;

    
    AbstractBootstrap(AbstractBootstrap bootstrap) {
        group = bootstrap.group;
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
    }

    
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) { // 不允许重复设置
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    
    public B channel(Class channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory(channelClass));
    }

    
    @SuppressWarnings({"unchecked", "deprecation"})
    public B channelFactory(io.netty.channel.ChannelFactory channelFactory) {
        return channelFactory((ChannelFactory) channelFactory);
    }

    
    public B localAddress(SocketAddress localAddress) {
        this.localAddress = localAddress;
        return self();
    }

    
    public  B option(ChannelOption option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            // 空,意味着移除
            synchronized (options) {
                options.remove(option);
            }
        } else { // 非空,进行修改
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }

    
    @Override
    @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
    public abstract B clone();

    
    public ChannelFuture register() {
        validate();
        return initAndRegister();
    }

    
    public ChannelFuture bind() {
        //1.校验服务启动的必要参数
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        //2.绑定地址端口
        return doBind(localAddress);
    }

     
    abstract void init(Channel channel) throws Exception;

    
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }

    
    public abstract AbstractBootstrapConfig config();

    
    static void setChannelOptions(Channel channel, Map, Object> options, InternalLogger logger) {
        for (Map.Entry, Object> e : options.entrySet()) {
            setChannelOption(channel, e.getKey(), e.getValue(), logger);
        }
    }
}

这里面大部分方法都是设置相关的参数,对于channel方法需要注意,channel设置通道类型,内部实际上根据传入的类型参数初始化好了ChannelFactory channelFactory这个Channel工厂对象,后续会使用该工厂对象来生产Channel对象。

Server和Client的源码解析 ServerBootstrap服务端启动配置
public void start(InetSocketAddress address) {
		// 1.bossGroup 用于接收连接,workerGroup 用于具体的处理
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
			//2.创建服务端启动引导/辅助类:ServerBootstrap
            ServerBootstrap bootstrap = new ServerBootstrap()
				//3.给引导类配置两大线程组,确定了线程模型
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(address)
                    .childHandler(new NettyServerChannelInitializer())
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024*1024))
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(address).sync();
            log.info("netty服务器开始监听端口:" + address.getPort());
            //关闭channel和块,直到它被关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭主线程组
            bossGroup.shutdownGracefully();
            //关闭工作线程组
            workerGroup.shutdownGracefully();
        }
    }

接下来我们看一下group方法有什么

   public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        } else {
            this.childGroup = (EventLoopGroup)ObjectUtil.checkNotNull(childGroup, "childGroup");
            return this;
        }
    }

在group方法中,会继续调用父类的group方法,通过类继承图我们知道,super.group(parentGroup)其实调用的就是AbstractBootstrap的group方法。AbstractBootstrap中group代码如下:

    public B group(EventLoopGroup group) {
        ObjectUtil.checkNotNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        } else {
            this.group = group;
            return this.self();
        }
    }

这里只是初始化,都是为了后面的操作做准备,如果大家感兴趣可以进去看看。

c这里我们主要讲的是bootstrap.bind(address).sync()

我们点击bind进去源码看一下

public ChannelFuture bind(SocketAddress localAddress) {
// 验证group与channelFactory是否为null
        this.validate();
        return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
        
    }

这里主要是校验了 Bootstrap 的 group 与 channelFactory 是否绑定成功。
我们接着点击doBind进去

    private ChannelFuture doBind(final SocketAddress localAddress) {
       // 创建、初始化channel,并将其注册到selector,返回一个异步结果
        final ChannelFuture regFuture = this.initAndRegister();
            // 从异步结果中获取channel
        final Channel channel = regFuture.channel();
            // 若异步操作执行过程中出现了异常,则直接返回异步对象(直接结束)
        if (regFuture.cause() != null) {
            return regFuture;
              // 处理异步操作完成的情况(可能是正常结束,或发生异常,或任务取消,这些情况都属于有结果的情况)
        } else if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
              // 绑定指定的端口
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
        //出来异步操作还没有结果的的情况
            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
             // 为异步操作添加监听
            regFuture.addListener(new ChannelFutureListener() {
            // 若异步操作具有了结果(即完成),则触发该方法的执行
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    // 异步操作执行过程中出现了问题
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else { // 异步操作正常结果
                        promise.registered();
                            // 绑定指定的端口
                        AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                    }

                }
            });
            return promise;
        }
    }  

首先我们从上往下分几步走:

1、ChannelPromise 与 ChannelFuture有什么关系?

ChannelFuture内部提供了修改当前 Future 状态的方法。在 ChannelFuture 的基础上实现了设置最终状态的修改方法,而 ChannelFuture 只可以查询当前异步操作的结果,不可以修改当前异步结果的 Future 。这里需要知道的就是 ChannelPromise 可以修改当前异步结果的状态。

ChannelPromise是ChannelFuturer的子类,点击ChannelPromise进去查看就知道它继承了ChannelFuture,其定义了一些方法如setSuccess()、setFailure()和sync()等。

2、final ChannelFuture regFuture = this.initAndRegister();

initAndRegister:方法中主要做三件事:
1、创建 Channel 实例。
2.、初始化 Channel。
3.、注册 Channel到selector;

    final ChannelFuture initAndRegister() {
        Channel channel = null;

        try {
           // 创建channel
           // channelFactory 是在调用 AbstractBootstrap 类的
           // public B channel(Class channelClass) 方法中设置的
           // 默认为 ReflectiveChannelFactory 类工厂即通过反射的方式生成 Channel 的工厂
           //
           // .channel(NioServerSocketChannel.class) 设置的是 NioServerSocketChannel 类
           // 则此处生成了 NioServerSocketChannel 类实例,调用的是 NioServerSocketChannel 类的无参构造方法
            channel = this.channelFactory.newChannel();
                 // 初始化channel
            this.init(channel);
        } catch (Throwable var3) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
            }

            return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
   // 将channel注册到selector
        ChannelFuture regFuture = this.config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }
第一步:使用channelFactory.newChannel() 创建 Channel

首先我们debug模式调试一下channelFactory.newChannel()看结果:

发现这个channelFactory返回的类型是ReflectiveChannelFactory,并且里面的属性clazz的值是NioServerSocketChannel.class。那么NioServerSocketChannel.class是怎么来的呢?



从上图中可以看出赋值是通过bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)实现的。
上图中的channel这个方法的作用就是给channelFactory赋值以及给ReflectiveChannelFactory中的clazz赋值。

现在我们点击newChannel找到对应的newChannel方法

public T newChannel() {
        try {
         // 调用无参构造器创建channel
            return (Channel)this.constructor.newInstance();
        } catch (Throwable var2) {
            throw new ChannelException("Unable to create Channel from class " + this.constructor.getDeclaringClass(), var2);
        }
    }

我们再 Server 端传入的 channel 类为NioServerSocketChannel.class ,所以上面看的 constructor.newInstance(); 对应的也就是 NioServerSocketChannel 的无参构造。这样我们继续点击 NioServerSocketChannel构造函数。

// NIO中的provider,其用于创建selector与channel。并且是单例的
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioServerSocketChannel() {
    // DEFAULT_SELECTOR_PROVIDER 静态变量
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

我们接着点击newSocket

  private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException var2) {
            throw new ChannelException("Failed to open a server socket.", var2);
        }
    }

这里返回Java NIO 原生的 Channel,最后将 NIO 原生的Channel 包装成 NioServerSocketChannel。
继续点击 this(newSocket(DEFAULT_SELECTOR_PROVIDER))进入下面的代码:

   public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
     // 参数1:父channel
    // 参数2:NIO原生channel
    // 参数3:指定当前channel所关注的事件为  接受连接
        super((Channel)null, channel, 16);
            // 用于对channel进行配置的属性集合
        this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
    }

方法的作用是调用父类构造函数,并且生成一个配置类对象。到此channel的创建就算完成了。

第二步:使用this.init(channel)初始化 Channel

我们点击进入init方法

void init(Channel channel) throws Exception {
    // 获取serverBootstrap中的options属性
    final Map, Object> options = options0();
    // 将options属性设置到channel
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 获取serverBootstrap中的attrs属性
    final Map, Object> attrs = attrs0();
    synchronized (attrs) {
        // 遍历attrs属性
        for (Entry, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey key = (AttributeKey) e.getKey();
            // 将当前遍历的attr初始化到channel
            channel.attr(key).set(e.getValue());
        }
    }

    // 获取channel的pipeline
    ChannelPipeline p = channel.pipeline();

    // 将serverBootstrap中所有以child开头的属性写入到局部变量,
    // 然后将它们初始化到childChannel中
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry, Object>[] currentChildOptions;
    final Entry, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    p.addLast(new ChannelInitializer() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 将ServerBootstrapAcceptor处理器添加到pipeline
                    // ServerBootstrapAcceptor处理器用于接收ServerBootstrap中的属性值,
                    // 我们通常称其为连接处理器
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
 

这里面主要分为3步走:
1、设置options和attrs属性并给channel和config赋值,意思就是ServerBootstrap 与 Bootstrap 在调用 doBind() 之前通过option() 与 attr() 设置的参数值,其中 options 属性设置到了 Channel 的 config 属性中,attrs 是直接被设置在了 Channel 上的。

2、设置childOptions和childAttrs:在设置完 options 属性与 attrs 属性时,接着获取了当前 channel 的 pipeline,接下来还是获取我们在 doBind() 之前设置的属性值,以 child 开头的方法 childOption() 与 childAttr() 设置的属性值。

3、添加ChannelInitializer执行器:

pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }

为添加ServerBootstrapAcceptor连接器做准备。

这里讲一下ServerBootstrapAcceptor构造器

类继承结构图:

因为它负责客户端Channel的建立和初始化,因此需要childChannel相关的配置信息,主要属性有:
private final EventLoopGroup childGroup; //Reactor模型中的WorkerGroup
private final ChannelHandler childHandler;// 客户端Channel的ChannelHandler
private final Entry, Object>[] childOptions;客户端Channel的Options
private final Entry, Object>[] childAttrs;// 客户端Channel的Attrs
private final Runnable enableAutoReadTask;// 启用自动读取的任务

那么ServerBootstrapAcceptor是如何触发事件的呢?

从ServerBootstrapAcceptor方法中我们可以看出它重写了channelRead和exceptionCaught两个方法方法。

当客户端连接发送到服务端时,NioEventLoop会接收客户端连接,创建SocketChannel,并触发channelRead回调,如下代码:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // msg为客户端发送来的数据,其为NioSocketChannel,即子channel,childChannel
    final Channel child = (Channel) msg;

    // 将来自于ServerBootstrap的child开头属性初始化到childChannel中(childHandler、childOptions、childAttrs)
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry, Object> e: childAttrs) {
        child.attr((AttributeKey) e.getKey()).set(e.getValue());
    }

    try {
       
		//将客户端Channel注册到WorkerGroup:
		//1.next()轮询出一个EventLoop.register()
		//2.Channel.Unsafe.register(),Channel注册到Selector
		//3.触发各种回调
		//Channel一旦注册到EventLoop,就由该EventLoop负责处理它整个生命周期的所有事件。
        //需要注意的是,这里的selector与父channel所注册的selector不是同一个
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
            	// 如果注册失败,强制关闭连接
                if (!future.isSuccess()) {
                // 底层就是调用原生JDK的关闭方法:javaChannel().close();
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
 

概括一下就是做了两件事:
1、初始化 childChannel。
2、将成功从 client 连接过来的 channel 注册到 selector 上。

接下来是exceptionCaught:转播异常

当ChannelRead出现异常时,Netty会强制关闭连接,调用forceClose(child, var5)方法关闭连接。

// 强制关闭连接
private static void forceClose(Channel child, Throwable t) {
    child.unsafe().closeForcibly();
    logger.warn("Failed to register an accepted channel: {}", child, t);
}

底层还是调用了JDK底层的SocketChannel.close()方法关闭连接。

ChannelRead事件触发异常,Pipeline会传播异常事件,执行exceptionCaught回调,ServerBootstrapAcceptor面对异常时,会暂停1秒停止接受客户端连接,等待ServerSocketChannel恢复,并将异常事件传播出去。

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // 1秒内停止接收新客户端
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(this.enableAutoReadTask, 1L, TimeUnit.SECONDS);
            }
                // 将异常事件传播出去
            ctx.fireExceptionCaught(cause);
        }

至此,channel初始化代码已经讲解完。

第三步:this.config().group().register(channel) 将channel注册到selector

首先我们点击register方法进去

根据 NioEventLoopGroup 的继承体系就可以直接找到 实现 MultithreadEventLoopGroup。因为只有 MultithreadEventLoopGroup 在其继承体系中。

继续点击

   public ChannelFuture register(Channel channel) {
        return this.next().register(channel);
    }

这里需要了解下 next() 方法,因为我们现在是 eventLoopGroup next() 就是从当前 group 中获取一个 EventLoop,然后这里在继续跟进需要找 EventLoop 继承体系中实现 register 方法的类:SingleThreadEventLoop。

 
 public ChannelFuture register(Channel channel) {
    // 创建一个 ChannelPromise 然后注册
     return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
 }
  //  这里继续点击 promise.channel().unsafe().register(this, promise)中的 register
 public ChannelFuture register(ChannelPromise promise) {
     ObjectUtil.checkNotNull(promise, "promise");
     promise.channel().unsafe().register(this, promise);
     return promise;
 }
找到以下的代码
 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           ObjectUtil.checkNotNull(eventLoop, "eventLoop");
           if (AbstractChannel.this.isRegistered()) {
               promise.setFailure(new IllegalStateException("registered to an event loop already"));
           } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
               promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
           } else {
           // channel与eventLoop的绑定就发生在这里,
   // 需要注意,这里的eventLoop还没有绑定线程,因为这个线程还没有创建
               AbstractChannel.this.eventLoop = eventLoop;
                 // 判断当前线程与eventLoop所绑定线程是否是同一个线程
               if (eventLoop.inEventLoop()) {
               //核心方法
                   this.register0(promise);
               } else {
                   try {
                     // 执行当前线程所绑定的eventLoop的execute(), 这个execute()会将参数任务写入到任务队列,并创建启动新的线程
           eventLoop.execute(new Runnable() {
                       eventLoop.execute(new Runnable() {
                           public void run() {
                               AbstractUnsafe.this.register0(promise);
                           }
                       });
                   } catch (Throwable var4) {
                       AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                       this.closeForcibly();
                       AbstractChannel.this.closeFuture.setClosed();
                       this.safeSetFailure(promise, var4);
                   }
               }

           }
       }

直接点击this.register0(promise)这个方法

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
                    return;
                }

                boolean firstRegistration = this.neverRegistered;
                AbstractChannel.this.doRegister(); // 绑定,就在这里
                this.neverRegistered = false;
                AbstractChannel.this.registered = true;
                 //这里的invokeHandlerAddedIfNeeded会回调ChannelInitializer的handlerAdded反方,从而将ServerBootstrapAcceptor真正添加到Pipeline中
                AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
                this.safeSetSuccess(promise);
                AbstractChannel.this.pipeline.fireChannelRegistered();
                //这里的isActive()方法会返回false,底层调用NioServerSocketChannel的isActive判断,此时bind操作还没完成
                if (AbstractChannel.this.isActive()) {
                    if (firstRegistration) {
                        AbstractChannel.this.pipeline.fireChannelActive();
                    } else if (AbstractChannel.this.config().isAutoRead()) {
                        this.beginRead();
                    }
                }
            } catch (Throwable var3) {
                this.closeForcibly();
                AbstractChannel.this.closeFuture.setClosed();
                this.safeSetFailure(promise, var3);
            }

        }

点击AbstractChannel.this.doRegister()方法进去

最终调用了AbstractNioChannel里的doRegister方法来进行事件注册。

   protected void doRegister() throws Exception {
        boolean selected = false;

        while(true) {
            try {
              // 在这里进行了注册,将NIO原生channel注册到了NIO原生selector
                this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException var3) {
                if (selected) {
                    throw var3;
                }

                this.eventLoop().selectNow();
                selected = true;
            }
        }
    }

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this)这个就是 channel 注册 Selector 的代码了。
完成事件注册之后,会调用pipeline.invokeHandlerAddedIfNeeded(); 从而调用ChannelInitializer的initChannel方法,也就是执行如下代码,真正将ServerBootstrapAcceptor添加到Pipeline,如下的代码

doBind0() 绑定端口号
public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = ServerBootstrap.this.config.handler();
                if (handler != null) {
                    pipeline.addLast(new ChannelHandler[]{handler});
                }
 
                ch.eventLoop().execute(new Runnable() {
                    public void run() {
                        //4.添加ServerBootstrapAcceptor
                        pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                    }
                });
            }
    private ChannelFuture doBind(final SocketAddress localAddress) {
       // 创建、初始化channel,并将其注册到selector,返回一个异步结果
        final ChannelFuture regFuture = this.initAndRegister();
            // 从异步结果中获取channel
        final Channel channel = regFuture.channel();
            // 若异步操作执行过程中出现了异常,则直接返回异步对象(直接结束)
        if (regFuture.cause() != null) {
            return regFuture;
              // 处理异步操作完成的情况(可能是正常结束,或发生异常,或任务取消,这些情况都属于有结果的情况)
        } else if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
              // 绑定指定的端口
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
        //出来异步操作还没有结果的的情况
            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
             // 为异步操作添加监听
            regFuture.addListener(new ChannelFutureListener() {
            // 若异步操作具有了结果(即完成),则触发该方法的执行
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    // 异步操作执行过程中出现了问题
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else { // 异步操作正常结果
                        promise.registered();
                            // 绑定指定的端口
                        AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                    }

                }
            });
            return promise;
        }
    }  
找到doBind0方法
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            public void run() {
             // 只有当channel初始化注册成功后,才会进行绑定
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }

            }
        });
    }

点击channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);进入方法


最好是调用的AbstractChannel bind方法

继续点击bind方法源码

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return this.tail.bind(localAddress, promise);
    }
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(localAddress, "localAddress");
        if (this.isNotValidPromise(promise, false)) {
            return promise;
        } else {
            final AbstractChannelHandlerContext next = this.findContextOutbound(512);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeBind(localAddress, promise);
            } else {
                safeExecute(executor, new Runnable() {
                    public void run() {
                        next.invokeBind(localAddress, promise);
                    }
                }, promise, (Object)null, false);
            }

            return promise;
        }
    }

继续点击 next.invokeBind(localAddress, promise)方法看源码

 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (this.invokeHandler()) {
            try {
                ((ChannelOutboundHandler)this.handler()).bind(this, localAddress, promise);
            } catch (Throwable var4) {
                notifyOutboundHandlerException(var4, promise);
            }
        } else {
            this.bind(localAddress, promise);
        }

    }

继续点击bind方法看源码
![在这里插入图片描述](https://img-blog.csdnimg.cn/a2717310542f42aca2d053022c2128a7.png?x-oss-process=image/watermark,type
_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBA6Zey5b6X5peg6IGK55qE5Lq6,size_20,color_FFFFFF,t_70,g_se,x_16)

  public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            this.unsafe.bind(localAddress, promise);
        }

继续点击bind方法看源码

public final void bind(SocketAddress localAddress, ChannelPromise promise) {
            this.assertEventLoop();
            if (promise.setUncancellable() && this.ensureOpen(promise)) {
                if (Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                    AbstractChannel.logger.warn("A non-root user can't receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested.");
                }
 // 获取当前channel是否被激活。注意,现在还没有被激活,所以其值为false
                boolean wasActive = AbstractChannel.this.isActive();

                try {
                / 绑定
                    AbstractChannel.this.doBind(localAddress);
                } catch (Throwable var5) {
                    this.safeSetFailure(promise, var5);
                    this.closeIfClosed();
                    return;
                }

                if (!wasActive && AbstractChannel.this.isActive()) {
                    this.invokeLater(new Runnable() {
                        public void run() {
                        // 触发重写的channelActivate方法的执行
            }
                            AbstractChannel.this.pipeline.fireChannelActive();
                        }
                    });
                }

                this.safeSetSuccess(promise);
            }
        }

然后调用了NioServerSocketChannel的doBind0方法,在这里调用了java 底层的nio 包

   protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            this.javaChannel().bind(localAddress, this.config.getBacklog());
        } else {
            this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
        }

    }

javaChannel() 即获取 NIO 原生 channel 的方法,再获取到 NIO 原生 channel 之后调用 bind 方法完成绑定。

Client 端代码分析

看完上面的Server代码那对于Client的代码就容易理解了,Client和Srver有重复代码也有相同的逻辑也有不同的地方。从启动类中可以看出他们的区别。

首先还是贴出Client代码看一下

Bootstrap服务端启动配置
  // 创建一个 EventLoopGroup 对象
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
 // 创建 Bootstrap 对象
    Bootstrap bootstrap = new Bootstrap();
     //设置使用的 EventLoopGroup
    bootstrap.group(eventLoopGroup)
        .channel(NioSocketChannel.class)// 设置要被实例化的为 NioSocketChannel 类
        .option(ChannelOption.SO_KEEPALIVE, true); // (4) 配置通道选项ChannelOption,此处设置连接通道保持连接
        .handler(new ChannelInitializer() {// 设置 NioSocketChannel 的处理器
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                pipeline.addLast(new SomeSocketClientHandler());
            }
        });
        // 连接服务器,并同步等待成功,即启动客户端
    ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();
        // 监听客户端关闭,并阻塞等待
    future.channel().closeFuture().sync();
} finally {
    if(eventLoopGroup != null) {
        // 优雅关闭一个对象
        eventLoopGroup.shutdownGracefully();
    }
}

这里额创建 NioEventLoopGroup,这个和 Server 端是一样的,所以不在讲解。

初始化 Bootstrap 启动配置类,配置启动参数,这里也不需要说。

主要需要讲解的地方是bootstrap.connect(“127.0.0.1”, 8080).sync()这里,它与Server的主要区别 :

Server端是启动一个服务端服务,使用的是 bind 绑定当前机器的端口,对外暴露服务,而在 Client 端就是主动去连接 Server 端,与服务器建立连接。所以这里是 connect。

我们进入到启动客户端的地方connect(…)看源码

public ChannelFuture connect(String inetHost, int inetPort) {
    return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
   //验证一些必要配置是否都已经配置过
    validate();
   // 解析远程地址,并进行连接
    return doResolveAndConnect(remoteAddress, config.localAddress());
}

1、createUnresolved方法将输入的连接地址和端口号保存创建创建 InetSocketAddress 对象返回。

2、validate() 方法主要校验 bootstrap 的必须配置是否为空,group、channelFactory与handler是否为空。

3、然后调用 doResolveAndConnect 方法建立连接。

我们点击进入到doResolveAndConnect()方法看源码

 
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // channel的创建、初始化与注册
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // 若channel注册完毕
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            // 解析server端地址并连接
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
   
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
              //如果异步注册对应的 ChanelFuture 未完成,则调用下面方法,添加监听器
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
        
                        promise.registered();
                               // 解析server端地址并连接
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

这里除了doResolveAndConnect0()这个方法不同之外,客户端Channel初始化和注册实现与服务端基本一样,首先通过异步的方式初始化并注册 channel,然后获取异步结果,判断是否异常,处理异常情况。没有异常,判断当前异步方法是否结束,如果结束根据结束的状态处理结束的逻辑,因为结束可以是正常也可以是异常结束。如果是异步结果一直没有结果,那就建立监听,监听异步结果返回时,触发最终逻辑。

如果不明白的可以往上看Server的代码解析,这里主要讲的还是doResolveAndConnect0()方法,

我们继续点击doResolveAndConnect0方法进去看源码

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        // 创建一个地址解析器,其中包含一个地址格式匹配器
        //首先进行域名解析(如果指定的服务端地址时域名而不是
        //IP时需要进行解析)
        final AddressResolver resolver = this.resolver.getResolver(eventLoop);

        // 若解析器不支持该地址 或 该地址已经解析过了,则直接对该地址进行连接,
        // 返回可修改的promise,即成功了就成功,失败了则promise中有失败信息
        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
               //解析成功之后进行连接操作
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }
        // 处理地址没有解析且解析器也支持该地址格式的情况
        // 以异步的方式解析地址
        final Future resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
        // 解析远程地址失败,关闭 Channel ,并回调通知 promise 异常
            final Throwable resolveFailureCause = resolveFuture.cause();
            // 若解析过程中出现异常,则关闭channel,否则连接解析的地址
            if (resolveFailureCause != null) {  
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else { 
                // 处理异步解析成功的情况
                // resolveFuture.getNow() 从异步对象中获取解析结果,即解析过的地址
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }
       // 若解析还没有完成,则为其添加监听器
        resolveFuture.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
              // 解析远程地址失败,关闭 Channel ,并回调通知 promise 异常
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                  // 解析远程地址成功,连接远程地址
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
             // 发生异常,并回调通知 promise 异常
        promise.tryFailure(cause);
    }
    return promise;
}

这里需要讲一下,服务端地址不仅可以使用IP,还可以使用域名,我们在指定域名时,就需要Netty将其解析为IP地址,实现逻辑也比较简单,默认的解析器为DefaultNameResolver,根据域名解析出IP调用的方法为java.net.InetAddress.getAllByName(hostname),这里有兴趣的小伙伴可以自行了解一下,这里就不在讲解了。

接下来我们继续跟进doConnect方法看源码

private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        //获取到当前Channel的eventLoop后的exeuct以异步的方式去连接服务端地址
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
              //如果没有配置本机地址,则进行远程连接
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                  //准备跟踪
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                // 为promise添加一个异常监听器。连接过程发生异常,则关闭channel
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

继续点击connect方法看源码

public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return this.pipeline.connect(remoteAddress, promise);
    }
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return this.tail.connect(remoteAddress, promise);
    }

public final ChannelFuture connect(
    SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}
//pipeline 获取为节点进行调用连接:

public ChannelFuture connect(
    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//检查输入参数的合法性
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    //检查输入参数的合法性
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

   //查找下一个处理器。因为pipleline是一个链表数据结构,即可通过tail的pre属性获得上一个处理器。经过查找最后得到了定义的内部类DefaultChannelPipeline$HeadContext的对象
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    // 获取处理器节点的executor
    EventExecutor executor = next.executor();
    //检查当前线程是否处于next处理器关联的事件循环器中。如果在循环器中则直接执行,实际执行发现在循环中,接着走该分支
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
    //如果当前线程不在循环其中,则提交一个异步任务
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

继续点击connet方法看源码

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        try {
        //判断是否执行处理器的connect方法,该方法当前情况下返回true,执行下面的操作
        if (this.invokeHandler()) {
            try {
            //调用handler的connect方法来执行真正的连接方法。当前的handler()方法返回的结果就是DefaultChannelPipeline$HeadContext对象本身,因此会就如该类的connect方法
                ((ChannelOutboundHandler)this.handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable var5) {
            //若连接异常则发出异常通知
                notifyOutboundHandlerException(var5, promise);
            }
        } else {
        //若不是执行处理器,则执行当前类中的connect方法
            this.connect(remoteAddress, localAddress, promise);
        }

    }

继续点击connet方法看源码

获取到底层 unsafe 对象进行连接

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
            this.unsafe.connect(remoteAddress, localAddress, promise);
        }

继续点击connect方法看源码

       public final void connect(final SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
       //设置并检查promise合法性
            if (promise.setUncancellable() && this.ensureOpen(promise)) {
                try {
                //检查当前连接是否正在连接中
                    if (AbstractNioChannel.this.connectPromise != null) {
                        throw new ConnectionPendingException();
                    }

                    boolean wasActive = AbstractNioChannel.this.isActive();
                    //调用类NioSocketChannel的doConnect方法执行连接操作。如果返回true表示连接完成,则直接填充promise对象的结果
                    if (AbstractNioChannel.this.doConnect(remoteAddress, localAddress)) {
                        this.fulfillConnectPromise(promise, wasActive);
                    } else {
                    //当doConnect方法返回false的情况下的处理
                        AbstractNioChannel.this.connectPromise = promise;
                        AbstractNioChannel.this.requestedRemoteAddress = remoteAddress;
                        int connectTimeoutMillis = AbstractNioChannel.this.config().getConnectTimeoutMillis();
                        if (connectTimeoutMillis > 0) {
                        //添加一个定时调度任务。在设置的连接超时时间经过之后将promise状态设置为失败,失败原因是超时
                            AbstractNioChannel.this.connectTimeoutFuture = AbstractNioChannel.this.eventLoop().schedule(new Runnable() {
                                public void run() {
                                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                    ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                        AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
                                    }

                                }
                            }, (long)connectTimeoutMillis, TimeUnit.MILLISECONDS);
                        }
                            //给promise对象添加一个监听器。在操作完成的情况下出发该监听器的操作,如果连接被取消,则取消超时futrue和关闭连接
                        promise.addListener(new ChannelFutureListener() {
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isCancelled()) {
                                    if (AbstractNioChannel.this.connectTimeoutFuture != null) {
                                        AbstractNioChannel.this.connectTimeoutFuture.cancel(false);
                                    }

                                    AbstractNioChannel.this.connectPromise = null;
                                    AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());
                                }

                            }
                        });
                    }
                } catch (Throwable var6) {
                //异常通知处理
                    promise.tryFailure(this.annotateConnectException(var6, remoteAddress));
                    this.closeIfClosed();
                }

            }
        }

继续点击doConnect 方法看源码

 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        //若传入localAddress,则先绑定该地址
        if (localAddress != null) {
        // 将localAddress绑定到channel
            this.doBind0(localAddress);
        }

        boolean success = false;

        boolean var5;
        try {
        // 连接server地址,若本次连接成功,则成功;若不成功,则当前channel的连接就绪
            boolean connected = SocketUtils.connect(this.javaChannel(), remoteAddress);
            if (!connected) {
        // 指定其关注的事件为  连接就绪
                this.selectionKey().interestOps(8);
            }

            success = true;
            var5 = connected;
        } finally {
            if (!success) {
                this.doClose();
            }

        }

        return var5;
    }

this.doBind0(localAddress)将Client 端指定的端口号绑定到 channel,localAddress 为配置类设置的 Client 端口号。

SocketUtils的connect方法是执行了真正的连接方法,这里首先在执行时直接进行连接,如果第一次连接成功则直接返回成功,如果失败,注册 Selector 事件 OP_ConNECT ,即将当前 channel 修改为连接就绪,后续执行到 run 方法时就会再次执行连接,直到连接成功,结束当前连接就绪.

c### 到此Client也讲完了,比Server少了很多代码量,整体代码看下来流程都差不多了

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号