入站从pipeline的头部开始执行
出站从pipeline的尾部开始执行
服务端代码
/**
 * Server side of the Long codec demo.
 *
 * <p>Binds a Netty NIO server and installs {@code MyEncode}, {@code MyDecode} and
 * {@code MyServerHandler} (in that order) on every accepted channel.
 */
public class NettyServer {
    /** TCP port the server listens on. */
    private static final int PORT = 6668;

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // A single boss thread accepts connections; the worker group serves the accepted channels.
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // The type argument is required: without it initChannel(SocketChannel)
                    // does not override ChannelInitializer#initChannel and the class fails to compile.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new MyEncode());
                            pipeline.addLast(new MyDecode());
                            pipeline.addLast(new MyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            // Block the main thread until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
public class MyServerHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long aLong) throws Exception { System.out.println("服务端接收到消息"+aLong); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // System.out.println("服务端发送数据"); // ctx.writeAndFlush(98765L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
客户端代码
/**
 * Client side of the Long codec demo.
 *
 * <p>Connects to the demo server and installs {@code MyEncode}, {@code MyDecode} and
 * {@code MyClientHanlder} (in that order) on the channel.
 */
public class NettyClient {
    /** Host of the demo server. */
    private static final String HOST = "localhost";
    /** TCP port of the demo server. */
    private static final int PORT = 6668;

    /**
     * Connects to the server and blocks until the channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    // The type argument is required: without it initChannel(SocketChannel)
                    // does not override ChannelInitializer#initChannel and the class fails to compile.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new MyEncode());
                            pipeline.addLast(new MyDecode());
                            pipeline.addLast(new MyClientHanlder());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
            // Block the main thread until the connection is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            loopGroup.shutdownGracefully();
        }
    }
}
public class MyClientHanlder extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long aLong) throws Exception { // System.out.println("客户端接收消息"+aLong); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("发送信息"); // ctx.writeAndFlush(1234L); // long 是8字节 解码一次即可 // "abcdabcdabcdabcd" 是十六字节 所以解码两次 ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
编解码
/**
 * Outbound encoder that serialises a {@link Long} as 8 bytes.
 *
 * <p>The type argument restricts this encoder to {@code Long} messages; it is also what makes
 * {@link #encode} a valid override.
 */
public class MyEncode extends MessageToByteEncoder<Long> {

    /**
     * Writes the value into the outbound buffer.
     *
     * @param channelHandlerContext context of this handler
     * @param aLong the value to encode
     * @param byteBuf the buffer the encoded bytes are written to
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Long aLong, ByteBuf byteBuf) throws Exception {
        // Trace output so the demo shows when the encoder actually runs.
        System.out.println("编码---------------------");
        byteBuf.writeLong(aLong);
    }
}
public class MyDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
总结
1.从上边的运行结果可以看出,只有符合编码条件才会编码
ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));
这里写入的是ByteBuf而不是Long类型,与编码器MessageToByteEncoder的泛型类型(Long)不匹配,所以不会经过编码器,而是直接传给下一个出站处理器。
2.解码时如果收到的数据长度超过一次解码所需的字节数(本例中一个long为8字节),则decode方法会被多次调用
ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));
字符串是16字节,所以进行两次解码(以上边例子为例)。
粘包和拆包——发现问题
由于TCP是面向字节流的协议,没有消息保护边界,因此需要在接收端自行处理消息边界问题。
若不处理,则会出现的现象
服务端代码
/**
 * Server side of the sticky-packet demo.
 *
 * <p>Installs only {@code MyServerHandler} on each accepted channel, with no framing codec,
 * so the handler sees the raw inbound buffers.
 */
public class NettyServer {
    /** TCP port the server listens on. */
    private static final int PORT = 6668;

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // A single boss thread accepts connections; the worker group serves the accepted channels.
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // The type argument is required: without it initChannel(SocketChannel)
                    // does not override ChannelInitializer#initChannel and the class fails to compile.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new MyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            // Block the main thread until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
handler
public class MyServerHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception { byte[] b = new byte[msg.readableBytes()]; msg.readBytes(b); System.out.println("服务端接收到消息:"+new String(b, CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // System.out.println("服务端发送数据"); // ctx.writeAndFlush(98765L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
客户端代码
/**
 * Client side of the sticky-packet demo.
 *
 * <p>Connects to the demo server and installs only {@code MyClientHanlder}, with no framing codec.
 */
public class NettyClient {
    /** Host of the demo server. */
    private static final String HOST = "localhost";
    /** TCP port of the demo server. */
    private static final int PORT = 6668;

    /**
     * Connects to the server and blocks until the channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    // The type argument is required: without it initChannel(SocketChannel)
                    // does not override ChannelInitializer#initChannel and the class fails to compile.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new MyClientHanlder());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
            // Block the main thread until the connection is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            loopGroup.shutdownGracefully();
        }
    }
}
handler
/**
 * Client-side handler for the sticky-packet demo.
 *
 * <p>On connect it sends ten short messages back to back without any framing.
 */
public class MyClientHanlder extends SimpleChannelInboundHandler<ByteBuf> {

    /** Number of messages sent when the channel becomes active. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Intentionally does nothing; inbound data is ignored by the client in this demo.
     *
     * @param channelHandlerContext context of this handler
     * @param msg the inbound buffer
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
        // Inbound data is ignored in this demo.
    }

    /** Sends "hello0" through "hello9", each as its own write-and-flush. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            ByteBuf buf = Unpooled.copiedBuffer("hello" + i, CharsetUtil.UTF_8);
            ctx.writeAndFlush(buf);
        }
    }

    /** Closes the channel and prints the stack trace of the failure. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        cause.printStackTrace();
    }
}
运行结果展示
第一次运行
服务端接收到消息:hello0hello1hello2hello3hello4hello5hello6hello7hello8hello9
第二次运行
服务端接收到消息:hello0
服务端接收到消息:hello1hello2
服务端接收到消息:hello3
服务端接收到消息:hello4
服务端接收到消息:hello5
服务端接收到消息:hello6
服务端接收到消息:hello7
服务端接收到消息:hello8
服务端接收到消息:hello9
第三次运行
服务端接收到消息:hello0
服务端接收到消息:hello1
服务端接收到消息:hello2hello3hello4hello5hello6hello7hello8hello9
粘包和拆包——解决问题
自定义协议+编解码器
协议实体类
/**
 * Message frame of the custom protocol: a length field plus the raw content bytes.
 *
 * <p>Accessors and constructors are generated by the Lombok annotations below.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageProtocol {
// Length value carried in the frame; callers in this file set it to content.length.
private int length;
// Raw message bytes.
private byte[] content;
}
服务端
/**
 * Server side of the custom-protocol demo.
 *
 * <p>Installs {@code MyMessageEncode}, {@code MyMessageDecode} and {@code MyServerHandler}
 * (in that order) on every accepted channel.
 */
public class NettyServer {
    /** TCP port the server listens on. */
    private static final int PORT = 6668;

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // A single boss thread accepts connections; the worker group serves the accepted channels.
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // The type argument is required: without it initChannel(SocketChannel)
                    // does not override ChannelInitializer#initChannel and the class fails to compile.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new MyMessageEncode());
                            pipeline.addLast(new MyMessageDecode());
                            pipeline.addLast(new MyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            // Block the main thread until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
handler
public class MyServerHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol msg) throws Exception { System.out.println("服务端接收到消息:"+new String(msg.getContent(), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // System.out.println("服务端发送数据"); // ctx.writeAndFlush(98765L); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
客户端
/**
 * Client side of the custom-protocol demo.
 *
 * <p>Connects to the demo server and installs {@code MyMessageEncode}, {@code MyMessageDecode}
 * and {@code MyClientHanlder} (in that order) on the channel.
 */
public class NettyClient {
    /** Host of the demo server. */
    private static final String HOST = "localhost";
    /** TCP port of the demo server. */
    private static final int PORT = 6668;

    /**
     * Connects to the server and blocks until the channel is closed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    // The type argument is required: without it initChannel(SocketChannel)
                    // does not override ChannelInitializer#initChannel and the class fails to compile.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new MyMessageEncode());
                            pipeline.addLast(new MyMessageDecode());
                            pipeline.addLast(new MyClientHanlder());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
            // Block the main thread until the connection is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            loopGroup.shutdownGracefully();
        }
    }
}
handler
/**
 * Client-side handler for the custom-protocol demo.
 *
 * <p>On connect it sends ten {@code MessageProtocol} frames, each carrying its own length.
 */
public class MyClientHanlder extends SimpleChannelInboundHandler<MessageProtocol> {

    /** Number of frames sent when the channel becomes active. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Intentionally does nothing; inbound frames are ignored by the client in this demo.
     *
     * @param channelHandlerContext context of this handler
     * @param msg the decoded frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol msg) throws Exception {
        // Inbound frames are ignored in this demo.
    }

    /** Sends "测试用例0" through "测试用例9", one frame per message. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            // Encode explicitly as UTF-8: the receiving handler decodes with UTF-8, and the
            // no-argument getBytes() would use the platform default charset instead.
            byte[] content = ("测试用例" + i).getBytes(CharsetUtil.UTF_8);
            ctx.writeAndFlush(new MessageProtocol(content.length, content));
        }
    }

    /** Closes the channel and prints the stack trace of the failure. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        cause.printStackTrace();
    }
}
编解码器
/**
 * Outbound encoder for {@code MessageProtocol}: a 4-byte length followed by the content bytes.
 *
 * <p>The type argument restricts this encoder to {@code MessageProtocol} messages; it is also
 * what makes {@link #encode} a valid override.
 */
public class MyMessageEncode extends MessageToByteEncoder<MessageProtocol> {

    /**
     * Writes the length field and then the content into the outbound buffer.
     *
     * @param channelHandlerContext context of this handler
     * @param messageProtocol the frame to encode
     * @param byteBuf the buffer the encoded bytes are written to
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {
        // Length prefix first, so the receiver knows how many content bytes follow.
        byteBuf.writeInt(messageProtocol.getLength());
        byteBuf.writeBytes(messageProtocol.getContent());
    }
}
public class MyMessageDecode extends ReplayingDecoder{ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
效果
每次运行都不出现粘包
服务端接收到消息:测试用例0
服务端接收到消息:测试用例1
服务端接收到消息:测试用例2
服务端接收到消息:测试用例3
服务端接收到消息:测试用例4
服务端接收到消息:测试用例5
服务端接收到消息:测试用例6
服务端接收到消息:测试用例7
服务端接收到消息:测试用例8
服务端接收到消息:测试用例9



