主启动
/**
 * Spring Boot entry point that starts the Netty UDP listener once the
 * application context has been created.
 *
 * <p>The bind host and port are read from the {@code netty.url} and
 * {@code netty.port} properties.
 */
@SpringBootApplication
public class UdpListenerApplication implements CommandLineRunner {

    /** Port the UDP server binds to ({@code netty.port}). */
    @Value("${netty.port}")
    private int port;

    /** Host name or address handed to {@link InetSocketAddress} ({@code netty.url}). */
    @Value("${netty.url}")
    private String url;

    @Autowired
    private BootNettyUdpServer bootNettyUdpServer;

    /**
     * Builds the bind address from the configured host and port and starts the UDP server.
     *
     * <p>{@code BootNettyUdpServer.start} waits on the channel's close future, so this
     * method does not return while the server is running.
     *
     * @param args command-line arguments forwarded by Spring Boot (unused)
     * @throws Exception declared by {@link CommandLineRunner#run}
     */
    @Override
    public void run(String... args) throws Exception {
        InetSocketAddress address = new InetSocketAddress(url, port);
        // Nothing has been bound yet at this point, so announce the attempt rather than
        // claiming success; a failed bind is reported by the server itself.
        System.out.println("服务端正在启动:" + url + ":" + port);
        bootNettyUdpServer.start(address);
    }

    /**
     * Launches the Spring application.
     *
     * <p>Because {@link #run(String...)} blocks for the lifetime of the UDP server,
     * {@code SpringApplication.run} returns (and the line below is printed) only after
     * the runner has returned.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        SpringApplication.run(UdpListenerApplication.class, args);
        System.out.println("UdpListenerApplication启动成功......");
    }
}
Server
/**
 * Netty-based UDP server. Binds a {@link NioDatagramChannel} to a given address and
 * routes inbound datagrams to the injected {@link BootNettyUdpSimpleChannelInboundHandler}.
 */
@Service
public class BootNettyUdpServer {

    private static final Logger log = LoggerFactory.getLogger(BootNettyUdpServer.class);

    /** Size used for both the UDP receive and send socket buffers (1 MiB). */
    private static final int SOCKET_BUFFER_BYTES = 1024 * 1024;

    @Autowired
    BootNettyUdpSimpleChannelInboundHandler bootNettyUdpSimpleChannelInboundHandler;

    /**
     * Binds the UDP channel to {@code address} and blocks the calling thread until the
     * channel is closed.
     *
     * <p>Failures (including a failed bind) are logged rather than rethrown. The event
     * loop group is shut down on every exit path so its threads are not left behind.
     *
     * @param address local address to bind the datagram channel to
     */
    public void start(InetSocketAddress address) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup)
                    .channel(NioDatagramChannel.class) // UDP channel
                    .handler(bootNettyUdpSimpleChannelInboundHandler) // inbound datagram handler
                    .option(ChannelOption.SO_BROADCAST, true) // allow broadcast
                    .option(ChannelOption.SO_RCVBUF, SOCKET_BUFFER_BYTES)
                    .option(ChannelOption.SO_SNDBUF, SOCKET_BUFFER_BYTES);

            // Bind and wait for the bind to finish; only then is the server really up.
            ChannelFuture f = b.bind(address).sync();
            log.info("[UDP 启动了] {}", address);

            // Park the calling thread until the channel is closed so the server keeps running.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("UDP server on {} was interrupted", address, e);
        } catch (Exception e) {
            log.error("UDP server on {} stopped with an exception", address, e);
        } finally {
            // Runs on normal channel close as well as on failure.
            workerGroup.shutdownGracefully();
        }
    }
}
Handler
@Component
public class BootNettyUdpSimpleChannelInboundHandler extends SimpleChannelInboundHandler {
// GBK character set; not referenced by the channelRead0 code visible in this section.
private static final Charset CHARSET = Charset.forName("GBK");
// Logger for this handler.
private static Logger logger = LoggerFactory.getLogger(BootNettyUdpSimpleChannelInboundHandler.class);
// Injected collaborator; channelRead0 passes each populated frame to parserManager.parser(...).
@Autowired
ParserManager parserManager;
/**
 * Handles one inbound UDP datagram: determines the frame length, validates it, fills a
 * {@code frame} with the body bytes and trailing 2-byte ender, and passes it to
 * {@code parserManager}.
 *
 * <p>Datagrams are dropped (with a log entry) when the frame length is zero, negative,
 * above 1024, too short to hold the 4-byte prefix plus 2-byte ender, or longer than the
 * bytes actually received. Any exception is logged and not propagated.
 *
 * <p>NOTE(review): the visible class declaration extends the raw
 * {@code SimpleChannelInboundHandler}; this {@code DatagramPacket}-typed override
 * presumably requires {@code SimpleChannelInboundHandler<DatagramPacket>} — confirm.
 *
 * <p>NOTE(review): the frame keeps a reference to the original {@code datagramPacket};
 * if the base handler releases the message after this method returns, the parser must
 * not use that packet later — confirm against {@code ParserManager}.
 *
 * @param ctx            channel context the datagram arrived on
 * @param datagramPacket the received datagram
 * @throws Exception declared by the overridden method; not thrown in practice because
 *                   exceptions are caught and logged here
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {
    // Offsets taken from how the frame is sliced below: body starts 4 bytes into the
    // frame and the last 2 bytes are the ender.
    final int bodyOffset = 4;
    final int enderBytes = 2;

    // Work on a private copy so index changes never touch the original packet that is
    // stored in the frame. The copy is an extra reference-counted buffer and must be
    // released here, otherwise it leaks.
    DatagramPacket packetCopy = null;
    try {
        logger.info("服务器接收到了数据");
        packetCopy = datagramPacket.copy();
        ByteBuf byteBuf = packetCopy.content();
        logger.info("接收到的数据,hexDump:{}", ByteBufUtil.hexDump(byteBuf));

        frame frame = new frame();
        int len = getframeLength(byteBuf, frame);
        // The placeholder was missing before, so the length never reached the log.
        logger.info("帧长度为{}", len);

        if (len == 0) {
            return;
        }
        if (len < 0) {
            logger.info("帧长度小于0,无效帧");
            // An unconnected UDP channel has no remote address; the peer is the packet's sender.
            logger.error("recv a invalid frame, peer ip is: {}, close channel: {}, recv buffer: {}",
                    datagramPacket.sender(), ctx.channel(), ByteBufUtil.hexDump(byteBuf));
            return;
        }

        if (!ctx.channel().isActive()) {
            logger.info("channel is not active");
        }
        if (len > 1024) {
            logger.error("frame len exceed limit len 1024, close it");
            return;
        }
        // Reject frames that cannot be sliced: too short for prefix + ender, or claiming
        // more bytes than were received. Without this the frame could be dispatched with
        // bytes from beyond the received data before the failure surfaced.
        if (len < bodyOffset + enderBytes || len > byteBuf.readableBytes()) {
            logger.error("frame len {} does not fit received data ({} readable bytes), peer: {}, drop it",
                    len, byteBuf.readableBytes(), datagramPacket.sender());
            return;
        }

        int frameStart = byteBuf.readerIndex();
        frame.setframeLength(len);
        frame.setInsertTime(DateUtils.currentDate());
        frame.setCtx(ctx);
        frame.setDatagramPacket(datagramPacket);
        // getBytes returns a fresh byte[] so the body outlives the buffer released below.
        frame.setOrigin(ByteBufUtil.getBytes(byteBuf, frameStart + bodyOffset, len - bodyOffset - enderBytes));
        frame.setEnder(byteBuf.getUnsignedShort(frameStart + len - enderBytes));

        parserManager.parser(frame);
        logger.info("frame is {}", frame);
    } catch (Exception e) {
        logger.error("BootNettyUdpSimpleChannelInboundHandler.channelRead0发生异常", e);
    } finally {
        if (packetCopy != null) {
            packetCopy.release();
        }
    }
}