- 前言
- 一、Netty是什么?
- 二、使用步骤
- 引入库
- 请求消息类
- Netty配置
- 服务端
- Netty服务端启动器
- Netty服务端主处理器
- Netty服务端监听消息处理器
- 客户端
- Netty客户端启动器
- Netty客户端监听消息处理器
- 公共
- Netty属性配置
- 编码器
- 解码器
- Netty开关注解
- Netty客户服务端开关整合注解
- Netty客户端开关注解
- Netty服务端开关注解
- 项目启动类开启netty注解
- 配置netty服务器相关信息
- 三、效果展示
- 四、总结
- 五、项目源码
前言
本文介绍了springboot集成netty的配置以及使用方式,同时提供一种方式解决常见的tcp粘包拆包问题。
提示:以下是本篇文章正文内容,下面案例可供参考
一、Netty是什么?是一个高性能、高可靠性的基于NIO封装的网络应用框架
二、使用步骤 引入库请求消息类io.netty netty-all
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageBean {
private Integer len;
private byte[] content;
public MessageBean(Object object){
content = JSONUtil.toJsonStr(object).getBytes(StandardCharsets.UTF_8);
len = content.length;
}
}
Netty配置
@Configuration
@EnableConfigurationProperties
public class NettyConfig {
@Autowired
MyNettyProperties myNettyProperties;
@Bean
public NioEventLoopGroup boosGroup(){
return new NioEventLoopGroup(myNettyProperties.getBoss());
}
@Bean
public NioEventLoopGroup workerGroup(){
return new NioEventLoopGroup(myNettyProperties.getWorker());
}
@Bean
public ServerBootstrap serverBootstrap(){
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boosGroup(),workerGroup()) // 指定使用的线程组
.channel(NioServerSocketChannel.class) // 指定使用的通道
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,myNettyProperties.getTimeout()) // 指定连接超时时间
.childHandler(new ServerHandler()); // 指定worker处理器
return serverBootstrap;
}
@Bean
public Bootstrap bootstrap(){
// 新建一组线程池
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(myNettyProperties.getBoss());
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(eventExecutors) // 指定线程组
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class) // 指定通道
.handler(new ClientHandler()); // 指定处理器
return bootstrap;
}
}
服务端
Netty服务端启动器
@Component
@Slf4j
public class ServerBoot {
@Autowired
ServerBootstrap serverBootstrap;
@Resource
NioEventLoopGroup boosGroup;
@Resource
NioEventLoopGroup workerGroup;
@Autowired
MyNettyProperties nettyProperties;
@PostConstruct
public void start() throws InterruptedException {
// 绑定端口启动
serverBootstrap.bind(nettyProperties.getPort()).sync();
serverBootstrap.bind(nettyProperties.getPortSalve()).sync();
log.info("启动Netty多端口服务器: {},{}",nettyProperties.getPort(),nettyProperties.getPortSalve());
}
@PreDestroy
public void close() throws InterruptedException {
log.info("优雅得关闭Netty服务器");
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
Netty服务端主处理器
public class ServerHandler extends ChannelInitializer{ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new MessageDecodeHandler()); pipeline.addLast(new MessageEncodeHandler()); pipeline.addLast(new ServerListenerHandler()); } }
Netty服务端监听消息处理器
@Slf4j @ChannelHandler.Sharable public class ServerListenerHandler extends SimpleChannelInboundHandler{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("{}客户端连接进来了",ctx.channel().remoteAddress()); ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("{}连接断开了",ctx.channel().remoteAddress()); ctx.fireChannelInactive(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBean messageBean) throws Exception { String remoteAddress = channelHandlerContext.channel().remoteAddress().toString(); log.info("来自客户端{}的消息{}", remoteAddress,new String(messageBean.getContent(), CharsetUtil.UTF_8)); channelHandlerContext.writeAndFlush(new MessageBean("收到了客户端"+ remoteAddress + "的消息")); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("{}连接出异常了",ctx.channel().remoteAddress()); log.error(ExceptionUtil.printStackTrace((Exception) cause)); ctx.close(); } }
客户端 Netty客户端启动器
@Component
public class ClientBoot {
@Autowired
Bootstrap bootstrap;
@Autowired
MyNettyProperties myNettyProperties;
public Channel connect() throws InterruptedException {
// 连接服务器
ChannelFuture channelFuture = bootstrap.connect(myNettyProperties.getHost(), myNettyProperties.getPort()).sync();
// 监听关闭
Channel channel = channelFuture.channel();
return channel;
}
public Channel connectSlave() throws InterruptedException {
// 连接服务器
ChannelFuture channelFuture = bootstrap.connect(myNettyProperties.getHost(), myNettyProperties.getPort()).sync();
// 监听关闭
Channel channel = channelFuture.channel();
channel.closeFuture().sync();
return channel;
}
public void sendMsg(MessageBean messageBean) throws InterruptedException {
connect().writeAndFlush(messageBean);
}
}
Netty客户端监听消息处理器
@Slf4j @ChannelHandler.Sharable public class ClientListenerHandler extends SimpleChannelInboundHandler{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("{}连上了服务器",ctx.channel().remoteAddress()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("{}断开了服务器",ctx.channel().remoteAddress()); ctx.fireChannelInactive(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageBean messageBean) throws Exception { log.info("来自服务端的消息:{}",new String(messageBean.getContent(), CharsetUtil.UTF_8)); channelHandlerContext.channel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("{}连接出异常了",ctx.channel().remoteAddress()); log.error(ExceptionUtil.printStackTrace((Exception) cause)); ctx.close(); } }
公共 Netty属性配置
@ConfigurationProperties(prefix = "netty")
@Data
@Configuration
public class MyNettyProperties {
private Integer boss;
private Integer worker;
private Integer timeout = 30000;
private Integer port = 7000;
private Integer portSalve = 7001;
private String host = "127.0.0.1";
}
编码器
public class MessageEncodeHandler extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext channelHandlerContext, MessageBean messageBean, ByteBuf byteBuf) throws Exception { byteBuf.writeInt(messageBean.getLen()); byteBuf.writeBytes(messageBean.getContent()); } }
解码器
public class MessageDecodeHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
Netty开关注解 Netty客户服务端开关整合注解
@Import(ServerBoot.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableNettyServer {
}
Netty客户端开关注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(ClientBoot.class)
public @interface EnableNettyClient {
}
Netty服务端开关注解
@Import(ServerBoot.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EnableNettyServer {
}
项目启动类开启netty注解
@SpringBootApplication
@Slf4j
@EnableNetty
public class SpringBootExampleApplication {
public static void main(String[] args){
SpringApplication.run(SpringBootExampleApplication.class, args);
}
}
配置netty服务器相关信息
# netty配置 netty: boss: 1 worker: 4 timeout: 6000 port: 7000 portSalve: 7001 host: 127.0.0.1三、效果展示
浏览器访问: http://localhost:8080/netty/sendStudent?name=张三&age=20
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-2-1:131720:c.g.n.c.h.ClientListenerHandler:27] - /127.0.0.1:7000连上了服务器
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-4-1:131720:c.g.n.s.h.ServerListenerHandler:29] - /127.0.0.1:1823客户端连接进来了
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-4-1:131735:c.g.n.s.h.ServerListenerHandler:54] - 来自客户端/127.0.0.1:1823的消息{"name":"张三","age":20}
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-2-1:131735:c.g.n.c.h.ClientListenerHandler:50] - 来自服务端的消息:收到了客户端/127.0.0.1:1823的消息
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-4-1:131735:c.g.n.s.h.ServerListenerHandler:40] - /127.0.0.1:1823连接断开了
[INFO ] 2022-05-12 14:33:29 [nioEventLoopGroup-2-1:131735:c.g.n.c.h.ClientListenerHandler:37] - /127.0.0.1:7000断开了服务器
四、总结
本文在springboot中集成了netty,并通过一种自定义编解码器方式解决常见的tcp粘包拆包问题,整理成单独模块被其他springboot项目调用。
五、项目源码使用示例源码地址:
https://gitee.com/teajoy/springboot-modules/tree/master/springboot-example
netty模块源码地址:
https://gitee.com/teajoy/springboot-modules/tree/master/springboot-netty



