- 三、Netty
- 3.1 原生 NIO 存在的问题
- 3.2 线程模型
- 3.3 Netty 模型
- 3.4 入门实例
- 3.5 taskQueue 自定义任务
- 3.6 异步模型原理
- 3.7 HTTP 服务案例
- 3.8 Netty 核心模块
- 3.9 Pipline 组件
- 3.10 进阶实例
- 3.11 心跳检测
- 3.12 WebSocket 长连接
1、NIO 库和 API 复杂,使用麻烦。
2、需要具备 Java 多线程,网络编程能力。
3、开发工总量大,难度大;断线重连,网络拥塞控制等。
4、BIO 的 bug,Epoll Bug 会导致 Selector 空轮询,最终导致 CPU 100%。
3.2 线程模型1、传统阻塞 I/O 模型
特点:
-
阻塞 IO 获取输入数据。
-
每个连接需要独立线程。
缺点:
-
并发数大,创建大量线程,浪费资源。
-
线程无数据可读时,会阻塞在 read,浪费资源。
2、Reactor 模型
- 基于 IO 复用,多个连接公用一个阻塞对象。
- 基于线程池,一个线程处理多个连接。
- 负责监听和分发事件,中介者。
3、单 Reactor 单线程。
上文的NIO 聊天室,就是采用模式的。
优点:
- 简单,所有都在一个线程中完成。
缺点:
- 无法发挥多核 CPU 的性能。
- 线程意外终止或死循环,会导致整个系统不可用,节点故障。
使用场景:客户端少,业务处理块。
4、单 Reactor 多线程。
优点:
- 充分利用多核 CPU。
缺点:
- 多线程数据共享,访问复杂。
- reactor 处理所有事件的监听和响应,在单线程中运行,高并发时出现性能瓶颈。
5、主从 Reactor 多线程。[Netty 基于它]
优点:
- Reactior 父子线程任务明确,父线程只接收新连接,子线程负责业务处理。
- 父子线程交互简单,父线程只需把连接给子线程,子线程无需返回数据。
缺点:
- 实现复杂。
简单版:
详解版:
1、Netty 有两组线程池:BossGroup[只接收新连接],WorkGroup[业务逻辑]。
2、BossGroup 和 WorkGroup 都是 NioEventLoopGroup 类型。
3、每个 NioEventLoop 都有 selector,用于绑定 socket 网络通讯。
3.4 入门实例1、客户端给服务器发送信息,服务器回复客户端 “你好,客户端”。
package com.sugar.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建 bossgroup 和 workergroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 服务器启动对象 配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel
.option(ChannelOption.SO_BACKLOG,128) // 设置线程队列 连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer() {
// 给管道设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("服务器准备完成!");
// 绑定端口 - 同步处理
// 启动服务器
ChannelFuture cf = bootstrap.bind(6666).sync();
// 关闭通道监听
cf.channel().closeFuture().sync();
}finally {
// 出现异常直接关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.sugar.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 回复客户端消息
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端",StandardCharsets.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ctx = " + ctx);
// 接受客户端消息
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送:" + buf.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}
}
一些关系的图示:
3.5 taskQueue 自定义任务1、用户自定义普通任务。
// 解决方案1:用户自定义普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000);
}catch (Exception e) {
System.out.println("自定义普通任务 执行异常");
}
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端2",StandardCharsets.UTF_8));
}
});
2、用户自定义定时任务。
// 解决方案2:用户自定义定时任务
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,客户端2",StandardCharsets.UTF_8));
}
},5, TimeUnit.SECONDS);
3、非当前 Reactor 调用 Channel 方法。
略
3.6 异步模型原理介绍
1、调用者不能立刻得到结果,调用完成后通过回调通知调用者。
2、Netty 的 IO 操作是异步的。
Future
1、异步执行的结果,通过它的方法检测执行是否完成。
2、ChannelFulture 是一个接口,可以添加监听器,监听事件发生时通知监听器。
Future Listener 机制
1、Future 创建时处于非完成状态,可以用过 ChannelFulture 查询执行状态、注册监听器等。
2、案例:实现异步绑定端口,绑定完毕后调用相应监听器输出是否成功。
// 给 ChannelFuture 注册监听器,监控是否注册成功,并输出
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("绑定端口6666成功");
}else {
System.out.println("绑定失败:" + channelFuture.isCancellable());
}
}
});
3、相比传统 IO,线程会被阻塞直到操作完成(这里指绑定端口);异步则不会阻塞,可以去做别的事,当绑定成功,监听器会通知你。
3.7 HTTP 服务案例Netty 可以做 HTTP 服务开发
每个客户端独享一个 Pipline,每个 PipLine 存放多个 Handler。
案例:监听 6677 端口,服务器给客户端回复 “Hello Netty”,并过滤特定请求资源。
服务端:
package com.sugar.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HttpServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpServerInitializer());
ChannelFuture cf = serverBootstrap.bind(6677).sync();
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务器初始化器:
package com.sugar.netty.http; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class HttpServerInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 获取管道 ChannelPipeline pipeline = socketChannel.pipeline(); // 加入解码器 pipeline.addLast("我的解码器",new HttpServerCodec()); // 加入自定义处理器 pipeline.addLast("我的处理器",new HttpServerHandler()); } }
处理器:
package com.sugar.netty.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.net.URI; public class HttpServerHandler extends SimpleChannelInboundHandler3.8 Netty 核心模块{ @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) throws Exception { // 判断 msg 是不是一个 request 请求 if (httpObject instanceof HttpRequest) { System.out.println("管道哈希值:" + ctx.pipeline().hashCode() + " 处理器哈希值:" + this.hashCode()); System.out.println("客户端地址:" + ctx.channel().remoteAddress()); // 请求过滤 HttpRequest request = (HttpRequest) httpObject; URI uri = new URI(request.uri()); if ("/favicon.ico".equals(uri.getPath())) { System.out.println("此请求被过滤"); return; } // 回复浏览器 [回复客户端 - http协议] ByteBuf buf = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8); // 构造 Http 协议 httpResponse DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,buf); response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH,buf.readableBytes()); // 发送 ctx.writeAndFlush(response); } } }
1、Bootstrap,ServerBootstrap
- Bootstrap配置整个 Netty 程序 ,串联组件,是客户端引导类,ServerBootstrap 是服务端引导类。
2、Future,ChannelFuture
- Netty 所有操作都是异步,无需立刻获得结果,而由监听器通知调用者和触发监听事件。
3、Channel
- 网络通信组件,执行异步 IO 操作。
4、Selector
- 基于 Selector 实现多路复用,一个 Selector 可以监听多个 Channel。
- Channel 向 Selector 注册,不断查询 Channel 中是否有就绪事件,
5、ChannelHandler
- 处理 IO 事件,并转发到 Pipline 下一个 Handler 中。
1、Pipline 相当与一个链表,储存了许多 Handler,类似一种链式过滤器模式。
2、Channel 包含的 ChannelPipline 中维护了一个 ChannelHandlerContext 组成的双向链表,每个 ChannelHanderContext 关联着一个 ChannelHandler。
3.10 进阶实例Netty 群聊系统
要求:实现多人群聊系统。服务端监控用户上下线,消息转发;客户端发送消息给其他用户。
package com.sugar.netty.XChatGroup;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupClient {
private final String host;
private final int port;
public GroupClient (String host,int port) {
this.host = host;
this.port = port;
}
public void run () throws InterruptedException {
// 创建线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加处理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("我的解码器",new StringDecoder());
pipeline.addLast("我的编码器",new StringEncoder());
pipeline.addLast("我的业务处理器",new GroupClientHandler());
}
});
ChannelFuture cf = bootstrap.connect(host, port).sync();
cf.channel().closeFuture();
//----------------------------------------------------------------------------------
Channel channel = cf.channel();
System.out.println("[客户端 - " + channel.localAddress() + "] 启动完毕");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
// 发送数据
channel.writeAndFlush(msg + "n") ;
}
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
GroupClient groupClient = new GroupClient("127.0.0.1", 7000);
groupClient.run();
}
}
package com.sugar.netty.XChatGroup; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class GroupClientHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }
package com.sugar.netty.XChatGroup;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupServer {
private final int port;
public GroupServer (int port) {
this.port = port;
}
// 处理客户端请求
public void run () throws InterruptedException {
// 创建线程组
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
try {
// 启动器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加处理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("我的解码器",new StringDecoder());
pipeline.addLast("我的编码器",new StringEncoder());
pipeline.addLast("我的业务处理器",new GroupServerHandler());
}
});
System.out.println("Netty 启动完成");
ChannelFuture cf = bootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
GroupServer groupServer = new GroupServer(7000);
groupServer.run();
}
}
package com.sugar.netty.XChatGroup; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class GroupServerHandler extends SimpleChannelInboundHandler3.11 心跳检测{ // 定义所有 Channel 组,方便管理 private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 推送给其他客户端,当前用户上线了 Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端 - " + channel.remoteAddress() + "] 加入聊天!n"); System.out.println("当前在线用户:" + channelGroup.size()); // 加入频道组 channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端 - " + channel.remoteAddress() + "] 离开聊天!n"); System.out.println("当前在线用户:" + channelGroup.size()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("[客户端 - " + ctx.channel().remoteAddress() + "] 上线!n"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("[客户端 - " + ctx.channel().remoteAddress() + "] 离线!n"); } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // 发送者频道 Channel channel = ctx.channel(); // 转发,除去自己 channelGroup.forEach(ch -> { if (channel != ch) { ch.writeAndFlush("[客户端 - " + channel.remoteAddress() + "] 说:" + s + "n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
实时感知客户端是否存在
心跳检测器会会将事件转发给管道下一个处理器。
// 心跳检测器
pipeline.addLast("心跳检测器",new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
// 心跳处理器
pipeline.addLast("心跳处理器",new ServerHandler());
package com.sugar.netty.HeartBeat;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
public class ServerHandler extends SimpleChannelInboundHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "超时时间:" + eventType);
System.out.println("服务器进行相关处理.");
}
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
}
3.12 WebSocket 长连接
Http 协议是无状态的,请求响应一次,下次仍会建立新连接。
案例:
- 实现基于 WebSocket 的长连接全双工交互。
- 服务器,客户端可以互发消息;并且可以相互感知,谁关闭。
package com.sugar.netty.WebSocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class Server {
public static void main(String[] args) throws InterruptedException {
// 创建线程组
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)) // boss 添加日志处理器
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 基于 http 协议,加入 http 解码器编码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,加入 块处理器
pipeline.addLast(new ChunkedWriteHandler());
// http 数据传输中是分段的,加入 段聚合处理器
pipeline.addLast(new HttpObjectAggregator(8192));
// websocket 数据以帧传输
// 浏览器请求URL: ws://127.0.0.1:7000/hello
// 核心功能将 http协议 ------> ws协议
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 我的业务处理器
pipeline.addLast(new ServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(7000).sync();
cf.channel().closeFuture().sync();
System.out.println("服务器启动完毕");
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
package com.sugar.netty.WebSocket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketframe; import java.time.LocalDateTime; public class ServerHandler extends SimpleChannelInboundHandler{ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // 唯一 id System.out.println("[handlerRemoved] " + ctx.channel().id().asLongText()); System.out.println("[handlerRemoved] " + ctx.channel().id().asShortText()); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 唯一 id System.out.println("[handlerAdded] " + ctx.channel().id().asLongText()); System.out.println("[handlerAdded] " + ctx.channel().id().asShortText()); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketframe textWebSocketframe) throws Exception { // 接受消息 System.out.println("收到消息:" + textWebSocketframe.text()); // 回复消息 ctx.channel().writeAndFlush(new TextWebSocketframe("[服务器] - " + LocalDateTime.now() + " " + textWebSocketframe.text())); } }
WebSocket长连接 Demo



