栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty 实现高并发通讯原理理解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty 实现高并发通讯原理理解

1.前言

 

        最近写了很多关于Netty应用级别的文章,针对为什么选择Netty来实现高并发通讯,Netty实现高并发通讯的原理是什么?今天有时间把我对Netty的一些理解做个简单的说明,如有不对欢迎指正与探讨。

2.Netty工作原理

         上图是我对Netty实现高并发通讯工作机制的一个简单理解,当我们的智能终端接入到网关程序后,首先是由一个线程池进行进行绑定操作,我们叫它(parentGroup),绑定之后将后续的事情交给另外一个线程池进行后续的处理,我们叫它(childGroup)

parentGroup 负责处理客户端的TCP连接请求,如果服务端只有一个端口需要监听,建议将其线程数设置为1,相当于Reactor模型中的Acceptor线程池。

childGroup 负责I/O的读写操作,通过 ServerBootstrap 的 group 方法进行设置,用于后续产生的 Channel 绑定,相当于Reactor模型中的NIO线程池。可以不设置线程数,默认为当前服务器的处理器核心数*2。

        这里会有人疑问,并没有提及Netty如何实现高并发通讯机制,接触过Netty的都清楚,Netty是NIO框架,使用到Selector组件进行实现多路复用,其实通过上图workGroup一个EventLoop管理多个Channel,其实就是采用的Selector来实现的。

        另外Netty采用的是零拷贝ByteBuf,可以减少内存复制,从而减少GC,提升效率,并且其相应的read与write操作也是非常好用的。

3.实现代码解析
package com.gateway.xxxx.netty;

import com.gateway.xxxx.constants.Constant;
import com.gateway.xxxx.netty.codec.frameDecoder;
import com.gateway.xxxx.netty.codec.ProtocolDecoder;
import com.gateway.xxxx.netty.codec.ProtocolEncoder;
import com.gateway.xxxx.netty.handler.BusinessHandler;
import com.gateway.xxxx.netty.handler.LoginHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


@Slf4j
public class TcpServer extends Thread{
    private int port;

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    private ServerBootstrap serverBootstrap = new ServerBootstrap();
    //这里引入计数器的意思是如果开启多个TCP服务,可以通过计数器来判断是否所有的tcp服务都已经正常启动
    private CountDownLatch countDownLatch;

    public TcpServer(int port, CountDownLatch countDownLatch) {
        this.port = port;
        this.countDownLatch = countDownLatch;

        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        //启动一个EventExecutorGroup来处理比较耗时的业务
        final EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(Math.max(threadPoolSize, Runtime.getRuntime().availableProcessors() * 2));
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new IdleStateHandler(HcIotConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
                        ch.pipeline().addLast(new frameDecoder());
                        //添加一个接收数据包预处理业务,和统一的指令下发封装业务
                        ch.pipeline().addLast(ProtocolDecoder.INSTANCE, new ProtocolEncoder());
                        //添加一个登录鉴权
                        ch.pipeline().addLast(LoginHandler.INSTANCE);
                        //添加一个业务处理(业务处理比较耗时,这里可以使用EventExecutorGroup来处理比较耗时的业务)
                        ch.pipeline().addLast(executorGroup,BusinessHandler.INSTANCE);
                    }
                });
    }

    @Override
    public void run() {
        bind();
    }

    
    private void bind() {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("{} TCP服务器启动,端口:{}", protocolType, port);
                countDownLatch.countDown();
            } else {
                log.error("{} TCP服务器启动失败,端口:{}", protocolType, port, future.cause());
                System.exit(-1);
            }
        });
    }

    
    public void shutdown() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        log.info("{} TCP服务器关闭,端口:{}", protocolType, port);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/643473.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号