- 1 Netty线程模型
- 1.1 传统阻塞 I/O 服务模型
- 1.2 Reactor线程模型
- 1.2.1 单 Reactor 单线程模型
- 1.2.2 单Reactor多线程
- 1.2.3 主从 Reactor 多线程
- 1.2.4 Reactor线程模型小结
- 1.3 Netty线程模型
- 1.3.1 简单版Netty模型
- 1.3.2 进阶版Netty模型
- 1.3.3 详细版Netty模型
- 2 Netty快速入门案例-TCP服务
- 2.1 服务端代码实现
- 2.2 客户端代码实现
- 3 Netty任务队列
- 3.1 用户自定义的普通任务
- 3.2 用户自定义定时任务
不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先需要了解下 各个线程模 式, 最后看看 Netty 线程模型有什么优越性。目前存在的线程模型有:
- 传统阻塞 I/O 服务模型
- Reactor 模式
- Reactor 单线程;
- 单 Reactor多线程;
- 主从 Reactor多线程
- 模型特点
- 采用阻塞 IO 模式获取输入的数据
- 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回
- 存在问题
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费
针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:
- 基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
- 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
IO复用结合线程池,就是Reactor模式的基本设计思想。
Reactor 模式中核心组成
- Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
- Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
-
处理流程
- Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求 ;
- Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发 ;
- 建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对 象处理连接完成后的后续业务处理 ;
- Handler 会完成 Read→业务处理→Send 的完整业务流程。
-
优缺点
- 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
- 缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度 O(1) 的情况
- 处理流程
- Reactor 对象通过 Select 监控客户端请求事件,收到事件后,通过 Dispatch 进行分发
- 如果建立连接请求,则右 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理完成连接后的各种事件
- 如果不是连接请求,则由 Reactor 分发调用连接对应的 handler 来处理
- handler 只负责响应事件,不做具体的业务处理,通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务
- worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
- handler 收到响应后,通过 send 将结果返回给 client
- 优缺点
- 优点:可以充分的利用多核 cpu 的处理能力
- 缺点:多线程数据共享和访问比较复杂,Reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。
- 处理流程
- Reactor 主线程 MainReactor 对象通过 select 监听连接事件,收到事件后,通过 Acceptor 处理连接事件
- 当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor
- subreactor 将连接加入到连接队列进行监听,并创建 handler 进行各种事件处理
- 当有新事件发生时,subreactor 就会调用对应的 handler 处理
- handler 通过 read 读取数据,分发给后面的 worker 线程处理
- worker 线程池分配独立的 worker 线程进行业务处理,并返回结果
- handler 收到响应的结果后,再通过 send 将结果返回给 client
- Reactor 主线程可以对应多个 Reactor 子线程,即 MainRecator 可以关联多个 SubReactor
- 3中线程模型生活场景类比
- 单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服务
- 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待
- 主从 Reactor 多线程,多个前台接待员,多个服务生
- Reactor 模式具有如下的优点
- 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的
- 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
- 扩展性好,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
- 复用性好,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性
Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。
1.3.1 简单版Netty模型- BossGroup 线程维护 Selector,只关注 Accecpt
- 当接收到 Accept 事件,获取到对应的 SocketChannel,封装成 NIOScoketChannel 并注册到 Worker 线程(事件循环),并进行维护
- 当 Worker 线程监听到 Selector 中通道发生自己感兴趣的事件后,就进行处理(就由 handler),注意 handler 已经加入到通道
- 有两组线程池:BossGroup 和 WorkerGroup,BossGroup 中的线程专门负责和客户端建立 连接,WorkerGroup 中的线程专门负责处理连接上的读写,BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup;
- BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一 个 Selector,用于监听注册在其上的 Channel;
- 每个 BossGroup 中的线程循环执行以下三个步骤
- 轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到 WorkerGroup 中某个线程上的 Selector 上
- 处理任务队列的任务,即 runAllTasks
- 每个 WorkerGroup 中的线程循环执行以下三个步骤
- 轮询 read,write 事件
- 处理 I/O 事件,即 read,write 事件,在对应 NioScocketChannel 处理
- 处理任务队列的任务,即 runAllTasks
- 每个 Worker NIOEventLoop 处理业务时,会使用 pipeline,pipeline 中包含了 channel,即通过 pipeline 可以获取到对应通道,管道中维护了很多的处理器
- Netty 抽象出两组线程池:BossGroup 和 WorkerGroup。每个线程池中都有 NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的 线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是 NioEventLoopGroup
- NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就 是一个 NioEventLoo
- NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯
- NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop
- 每个 BossNioEventLoop 循环执行的步骤有 3 步:
- 轮询 accept 事件
- 处理 accept 事件,与 client 建立连接,生成 NioScocketChannel,并将其注册到某个 worker NIOEventLoop 上的 Selector
- 处理任务队列的任务,即 runAllTasks
- 每个 WorkerNIOEventLoop 循环执行的步骤:
- 轮询 read,write 事件
- 处理 I/O 事件,即 read,write 事件,在对应 NioScocketChannel 处理
- 处理任务队列的任务,即 runAllTasks
- 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器 (拦截处理器、过滤处理器、自定义处理器等)。
package com.warybee.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1.创建bossGroup线程组: 处理网络连接事件 线程数默认为: 2 * 处理器线程数
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//3.创建服务端启动助手
ServerBootstrap bootstrap=new ServerBootstrap();
//4.设置线程组
bootstrap.group(bossGroup,workerGroup)
//5.设置服务端通道实现
.channel(NioServerSocketChannel.class)
//6.参数设置-设置线程队列中等待连接个数
.option(ChannelOption.SO_BACKLOG,128)
//7.参数设置-设置活跃状态,child是设置workerGroup
.childOption(ChannelOption.SO_KEEPALIVE,true)
8.创建一个通道初始化对象
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//9.向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("服务端就绪");
//10.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture channelFuture = bootstrap.bind(9999).sync();
//11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
Netty服务端自定义Handler
package com.warybee.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx:"+ctx);
ByteBuf byteBuffer=(ByteBuf)msg;
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
System.out.println("客户端发送的消息是:"+byteBuffer.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",StandardCharsets.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
2.2 客户端代码实现
package com.warybee.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//创建线程组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
//启动客户端, 等待连接服务端,
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9999).sync();
//关闭通道和关闭连接池
cf.channel().closeFuture().sync();
}
finally {
eventExecutors.shutdownGracefully();
}
}
}
Netty客户端自定义Handler
package com.warybee.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", StandardCharsets.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf=(ByteBuf) msg;
System.out.println("服务端回复的消息:"+byteBuf.toString(StandardCharsets.UTF_8));
System.out.println("服务器地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3 Netty任务队列
在使用Netty的时候,我们的业务处理都是放到我们自定义的handler里面,那么如果handler里面有一些执行比较耗时的操作的话,依旧会出现线程阻塞的情况,那么怎么来处理呢?
我们可以回过头去看看Netty的模型图,里面有一块是TaskQueue,这个就是Netty提供给我们的任务队列,可以用来异步处理任务,它是和channel是一一绑定的。
在定义Handler里面通过ChannelHandlerContext 获取channel
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
//TODO 这里可以执行耗时任务
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));
}
});
System.out.println("client "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", StandardCharsets.UTF_8));
}
3.2 用户自定义定时任务
任务是提交到 scheduleTaskQueue中
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//用户自定义定时任务,5秒后执行
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//TODO 这里可以执行耗时任务
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
}



