- 前言
- 1. 原生NIO存在的问题如下
- 一、三种Reactor模型
- 1. 单Reactor模型
- 2. 单Reactor多线程
- 3. 主从Reactor多线程
- 二、工作原理
- 1.一般模型
- 2.原理
- 三、编程实例
- NettyServer
- NettyServerHandler
- NettyClient
- NettyClientHandler
前言
介绍Netty的三种模型及其工作原理 , Netty编程Demo
1. 原生NIO存在的问题如下- NIO的类库,API复杂
- 需要对多线程和网络编程很熟悉,学习成本高
- 开发成本高 , 难以处理网络异常带来的问题
- 原生NIO会存在一些自有的BUG
提示:以下是本篇文章正文内容,下面案例可供参考
一、三种Reactor模型Reactor模式基本实现
- 基于IO复用模型,多个连接使用一个阻塞对象 , 只需要在一个阻塞对象等待
- 基于线程池复用线程资源 , 不再需要为每个连接创建线程
通过一个或多个输入同时传递给服务处理器(ServerHandler)
服务器端的程序会处理传入的多个请求,并将其同步分派到相应的处理线程 (网络服务高并发处理的关键)
reactor模式的核心组成 :
- reactor在一个单独的线程中运行 , 只负责监听和分发事件
- handler用于处理实际的业务
reactor和handler所在的线程是同一个线程
适用于客户端数量少, 业务处理速度很快的情况
- reactor对象通过select来监控客户端请求事件 , 在收到事件后通过dispatch分发事件
- 在建立连接后 , 通过Acceptor的accept处理连接请求 , 然后创建一个handler对象处理业务
- 如果不是连接请求 , 则由reactor对象分发调用连接对应的handler来做处理
- handler只负责响应事件 , 在读取数据后 , 把数据传给worker线程池的某个线程 , 由这个独立的线程做业务处理 , 最后将结果返回给handler
- 由handler将数据返回给客户端 , 而不是reactor
缺点 : 多线程数据共享 , 这比单个线程复杂 ; 单线程reactor处理所有的事件监听响应 , 而且handler也是运行在这单个reactor线程里的 , 在高并发场景容易出现性能瓶颈
3. 主从Reactor多线程相对于单reactor多线程模型 , 主从的方式是将handler从单个reactor中拆分出来了 , 将原先的reactor分成了两层
同时 , 我们的SubReactor是可以由多个的 , subreactor负责处理I/O的读取
二、工作原理netty主要基于主从reactor多线程模型做了改进
1.一般模型- bossgroup维护了一个selector , 只负责做Accept
- 在接收到Accept事件后 , 获取对应的SocketChannel并封装成NIOSocketChannel , 将其注册到worker线程中去
- 当worker线程监听到selector中发生了指定的事件后 , 交给handler去完成 , 此时handler已经加入到通道中去了
- Netty抽象出两组线程池 BossGroup ,专门用于接收客户端的连接 , WorkerGroup专门负责网络的读写
- BossGroup 和WorkerGroup的类型都是 NioEventLoopGroup
- NioEventLoopGroup相当于一个事件循环组 , 这个组中有多个事件循环 , 每个事件循环是一个NioEventLoop
- NioEventLoop 表示一个不断循环的执行处理任务的线程 , 每个NioEventLoop 都有一个selector , 用于监听绑定在其中的socket网络通信
- NioEventLoopGroup是可以包含多个线程的 , 即可以包含多个NioEventLoop , 其数量是可以指定的
- 每个Boss NioEventLoop有三个执行步骤
- 轮询accept事件
- 处理accept事件 , 建立与client的连接 , 生成一个NioSocketChannel , 并将其注册到某个Worker NioEventLoop的selector上
- 处理任务队列的任务 , 即runAllTasks
- 每个Worker NioEventLoop循环执行的步骤
- 轮询 read, write事件
- 在对应的NioSocketChannel上处理 read, write事件
- 处理任务队列的任务 , 即runAllTasks
- 每个Worker NioEventLoop在处理业务时 , 会使用pipeline管道 , pipeline包含channel
编程实例分为客户端和服务器端 , 及其各自的处理器
NettyServerpublic class NettyServer {
public static void main(String[] args) throws InterruptedException {
// bossGroup仅处理连接请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// workerGroup和客户端做业务处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try{
// 创建服务器端启动的对象 , 配置启动参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 链式编程设置参数
bootstrap.group(bossGroup,workerGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioSctpServerChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer() {
// 向pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 将自定义的处理器加入到管道中
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("服务器就绪");
//绑定一个端口并设置为同步
// 启动服务器
ChannelFuture cf = bootstrap.bind(6666).sync();
// 监听关闭通道事件
cf.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 自定义一个Handler需要继承netty规定好的HandlerAdapter
public static void main(String[] args) {
}
// 实质上读取数据操作就是在这里做的
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx = "+ctx);
// 将msg转成ByteBuf
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址"+ctx.channel().remoteAddress());
}
// 数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将数据写入到缓存中并刷新
// 需要对发送的数据做编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,cli",CharsetUtil.UTF_8));
}
// 发生异常后关闭客户端
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NettyClient
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端一个事件循环组即可
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors) // 设置线程组
.channel(NioSocketChannel.class) //设置客户端通道
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端就绪");
// 启动客户端连接服务器
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
eventExecutors.shutdownGracefully();
}
}
}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//通道就绪时触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello ,server", CharsetUtil.UTF_8));
}
// 当通道有读取事件时,触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器地址"+ctx.channel().remoteAddress());
}
//异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}



