- 导入架包,注意:netty4.1.6版本底层会报内存泄露。需要我们自己去清空ByteBuf。但是官网说4.1.21版本优化了ByteBuf回收机制。我这使用的netty4.1.25。
maven导入:会自动下载一些依赖的
org.yeauty netty-websocket-spring-boot-starter 0.8.0
- 搭建通道。使用http格式搭建通道,适用于高并发情况。
public class nettyConfig {
private final int port=8888; //端口
private void startServer() {
// 服务端需要2个线程组 boss处理客户端连接 work进行客服端连接之后的处理
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, work).channel(NioServerSocketChannel.class)
.localAddress(port)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //启用池化ByteBuff
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec()); // HTTP 协议解析,用于握手阶段
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(1024 * 1024 * 100));
pipeline.addLast(new NettyHandler()); //自己业务类
pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true, 65535*100*100)); // WebSocket// 握手、控制帧处理
}
});
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); //用于打印内存是否泄露,如果泄露就会打印具体信息。PARANOID:代表每次发送数据都会检查是否内存泄露(测试时候推荐使用)
ChannelFuture f = b.bind(port).sync();
System.out.println(" 启动正在监听: " + f.channel().localAddress());
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭资源
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
@PostConstruct
public void init() {
// 需要开启一个新的线程来执行netty server 服务器
new Thread(new Runnable() {
public void run() {
startServer();
}
}).start();
}
}
- 自己的业务实现类 记住实现这个SimpleChannelInboundHandler类。
@Sharable public class NettyHandler extends SimpleChannelInboundHandler{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().id()); System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress()); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg && msg instanceof FullHttpRequest) { FullHttpRequest request = (FullHttpRequest) msg; log.info("调用 channelRead request.uri() [ {} ]", request.uri()); String uri = request.uri(); if (null != uri && uri.contains("/") && uri.contains("?")) { String[] uriArray = uri.split("\?"); if (null != uriArray && uriArray.length > 1) { String[] paramsArray = uriArray[1].split("="); if (null != paramsArray && paramsArray.length > 1) { //截取出sid后面的值 verificationConn(ctx, paramsArray[1]); //我自己方法 验证sid值是否符合我这边要求。符合后里面开启心跳等。 } } request.setUri("/"); } else { log.info("传参格式有问题 "); ctx.close(); } super.channelRead(ctx, msg); //重新调用父类方法,父类该方法会让ByteBuf 计数清零。从而达到数据被GC回收。如果没有调用该方法内存必会泄露。同时注意:该父类会判断出如果是TextWebSocketframe 数据类型会调用你重写的channelRead0;具体可以参考源码 } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketframe msg) throws Exception { // TODO 处理每次发送过来的数据。 } }
以上就是完成这个需求重要的步骤。缺一不可的。剩余的一些业务自己处理即可。



