Netty之ByteBuf原理解析
Netty框架之责任链模式及其应用
前言之前文章解析Netty的责任链框架及bytebuf 分析netty的 可复用 动态扩容、零拷贝机制、达到高效,API使用更加便捷等好处;这篇文章会继续分析 netty的启动类、以及编解码器、各种协议的支持、及tcp粘包拆包的解决
Netty引导 Bootstrap BootStrap是Netty中负责引导服务器和客户端启动,它将ChannelPipeline、 ChannelHandler和EventLoop组织起来,成为一个可实际运行的引用程序。简单来说,引 导一个应用程序是指对它进行配置,并运行起来的过程。 Bootstrap:引导客户端运行
public final class EchoClient {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
ServerBootstrap:引导服务端运行
public class NettyStarter {
public static void main(String[] args) throws InterruptedException {
// 主线程组 处理客户连接
NioEventLoopGroup mainGroup = new NioEventLoopGroup(1);
// 工人线程组,处理客户端的请求 读取 和写入
NioEventLoopGroup subGroup = new NioEventLoopGroup();
// 创建启动器, 并配置
ServerBootstrap boostrap = new ServerBootstrap();
boostrap.group(mainGroup, subGroup).option(ChannelOption.SO_BACKLOG, 1024).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO));
// 绑定端口并使用
Channel channel = boostrap.bind(8081).sync().channel();
// 监听channeal 释放并 停止 线程组
channel.closeFuture().addListeners(future -> {
mainGroup.shutdownGracefully();
subGroup.shutdownGracefully();
});
}
}
- group:设置工作组
- channel:设置传输类型
- option: 启动时配置参数
- handler 对数据添加处理
这里的bind方法 在AbstractBootstrap 中实现,然后根据 doBind方法进去
创建通道的channel方法 这里需要传入对应的类进来,因此在启动的时候需要指定对应的NioSocketChannel.class
public B channel(Class extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory(channelClass));
}
然后在ServerBootstrap 中init方法继续下去创建 pipline
这里会将handler给添加到pipline中
调用到channelRead方法 在eventloop中收到连接或者请求到这里 里面有run 方法循环监听的
当初始化完毕,将channel进行注册
然后这里的ChannelFuture 是获的返回值的。
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
这里 启动的一个线程去注册(initAndRegister)方法,然后判断future 是否注册成功,注册成功,就调用bind方法进行绑定端口,没有成功就添加监听事件,等待注册成功,然后在绑定
最后调用到NioServerSocketChannel的 dobind绑定方法
TCP 粘包 拆包 TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的 划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可 能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。客户端和服务端,都有一个数据缓存区 memory,因为它并不知道业务场景下数据有多大,默认是1024,它会自己定义只有到1024个字节时,才发送,有可能一次请求的数据是小于1024的,也有可能是大于1024的,因此 导致数据不正确。
客户端发送的数据如果大于1024,而一次请求是发送数据的一部分,到服务端的内存中,这时候服务端访问数据,这个数据就是错误的。这就是拆包问题
以及超时时间
出现问题的实例public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 9999);
socket.setTcpNoDelay(true);
OutputStream outputStream = socket.getOutputStream();
// 消息长度固定为 160字节,包含有
// 1. 目标用户ID长度为10, 10 000 000 000 ~ 19 999 999 999
// 2. 消息内容字符串长度最多48。 按一个汉字3字节,内容的最大长度为144字节
byte[] request = new byte[160];
byte[] userId = "10000000000".getBytes();
byte[] content = "测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试".getBytes();
System.arraycopy(userId, 0, request, 0, 10);
System.arraycopy(content, 0, request, 10, content.length);
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
countDownLatch.countDown();
outputStream.write(request);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.await();
for (int i = 0; i < 10; i++) {
outputStream.write(request);
}
Thread.sleep(2000L); // 两秒后退出
socket.close();
}
服务端用一个netty去接收数据
public static void main(String[] args) throws Exception {
// 1、 线程定义
// accept 处理连接的线程池
EventLoopGroup acceptGroup = new NioEventLoopGroup();
// read io 处理数据的线程池
EventLoopGroup readGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, readGroup);
// 2、 选择TCP协议,NIO的实现方式
b.channel(NioServerSocketChannel.class);
b.handler(new LoggingHandler(LogLevel.INFO));
b.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3、 职责链定义(请求收到后怎么处理)
ChannelPipeline pipeline = ch.pipeline();
// TODO 3.2 打印出内容 handdler
pipeline.addLast(new XHandller());
}
});
// 4、 绑定端口
System.out.println("启动成功,端口 9999");
Channel channel = b.bind(new InetSocketAddress(9999)).sync().channel();
System.out.println(channel.localAddress());
channel.closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
readGroup.shutdownGracefully();
}
}
服务端收到的结果是 都凑到一堆去了
Netty中的解决办法 TCP的粘包、拆包问题,可以通过自定义通信协议的方式来解决。通信协议就是通信双方约定好 的数据格式,发送方按照这个数据格式来发送,接受方按照这个格式来解析。 典型的协议包括:定长协议、特殊字符分隔符协议、报文头指定Length等。在确定了使用什么通 信协议的情况下,发送方和接收方要完成的工作不同。 编码: 发送方要将发送的二进制数据转换成协议规定的格式的二进制数据流,称之为编码 (encode),编码功能由编码器(encoder)完成。 解码: 接收方需要根据协议的格式,对二进制数据进行解析,称之为解码(decode),解码功能由 解码器(decoder)完成。 编解码: 既能编码,又能解码,则称之为编码解码器(codec)。这种组件在发送方和接收方都可 以使用。 Netty解码器Netty中主要提供了抽象基类ByteToMessageDecoder 和MessageToMessageDecoder 实现了ChannelInboundHandler 接口。
- ByteToMessageDecoder:用于将接收到的二进制数据(byte)解码,得到完整的请求报文 (Message)。 抽象的类型 例如 json 对象、object
实现:
FixedLengthframeDecoder: 定长协议解码器,可以指定固定的字节数算一个完整的报文 LinebasedframeDecoder: 行分隔符解码器,遇到n或者rn,则认为是一个完整的报文 DelimiterbasedframeDecoder:分隔符解码器,与LinebasedframeDecoder类似,只不过分 隔符可以自己指定 LengthFieldbasedframeDecoder: 长度编码解码器,将报文划分为报文头/报文体,根据报 文头中的Length字段确定报文体的长度,因此报文提的长度是可变的 JsonObjectDecoder: json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时, 则认为是一个完整的json对象或者json数组。- MessageToMessageDecoder:将一个本身就包含完整报文信息的对象转换成另一个Java对象。
public class XDecoder extends ByteToMessageDecoder {
static final int PACKET_SIZE = 160;
// 用来临时保留没有处理过的请求报文
ByteBuf tempMsg = Unpooled.buffer();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
总结
整篇文章主要介绍的是Netty的启动类,及为了解决tcp的粘包和拆包,而出现的编解码器。对源码解析,在springboot中其实也是采用这种方式进行启动,将它进行整合;则需要看springboot start中的整合。



