目录
pom依赖
Server端
main方法初始化netty
服务端主动给客户端发送消息
pom依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>
Server端
package com.kc.monitor.core.utils.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Netty TCP server that exchanges plain-text (String) messages with clients.
 *
 * <p>Connected clients are tracked in two public registries so that other
 * components can push messages to them proactively: {@link #userIdMap} is keyed
 * by the user id taken from received messages and {@link #remoteAddressMap} is
 * keyed by the client's remote address string.
 */
public class NettyServer {
    /** Client contexts keyed by the user id parsed from incoming messages. */
    public static ConcurrentHashMap<String, ChannelHandlerContext> userIdMap =
            new ConcurrentHashMap<String, ChannelHandlerContext>();

    /** Client contexts keyed by remote address string, e.g. "/192.168.52.1:59807". */
    public static ConcurrentHashMap<String, ChannelHandlerContext> remoteAddressMap =
            new ConcurrentHashMap<String, ChannelHandlerContext>();

    /** TCP port the server binds to. */
    public static int port = 8081;

    /**
     * Starts the server on {@link #port} and blocks the calling thread until the
     * server channel is closed. Both event loop groups are shut down before the
     * method returns, whether startup succeeded or failed.
     */
    public void initServer() {
        // Accepts incoming client connections (does not process their traffic).
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // Handles read/write for accepted connections, using three threads.
        NioEventLoopGroup workGroup = new NioEventLoopGroup(3);

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // NOTE: no frame decoder is installed, so TCP sticky/split packets are
                        // not handled. Adding a LineBasedFrameDecoder(1024) ahead of the
                        // StringDecoder would frame messages on "\n" / "\r\n" (max 1024 bytes),
                        // but clients would then have to terminate every message with a newline.
                        socketChannel.pipeline().addLast(new StringEncoder()); // outbound String -> bytes
                        socketChannel.pipeline().addLast(new StringDecoder()); // inbound bytes -> String
                        socketChannel.pipeline().addLast(new ServerHandler()); // receives and answers messages
                    }
                });
        try {
            // Bind the port and wait synchronously until binding succeeds.
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动成功:" + port);
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
            System.out.println("关闭服务器");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // e.g. the port is already in use
            e.printStackTrace();
        } finally {
            // Release the event loop threads gracefully.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
/**
 * Inbound handler for String messages: registers clients in the
 * {@code NettyServer} registries, acknowledges every message, and removes the
 * client's registry entries again when the connection goes inactive.
 */
class ServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Handles one decoded message. Protocol format: {@code id/content}.
     * The part before the first '/' (or the whole message when there is no '/')
     * is used as the user id under which this connection is registered.
     *
     * @param channelHandlerContext context of the sending client's channel
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        // Use indexOf rather than split("/")[0]: split drops trailing empty strings,
        // so a message such as "/" yields an empty array and split[0] would throw.
        int separator = msg.indexOf('/');
        String userId = separator < 0 ? msg : msg.substring(0, separator);
        if (!userId.isEmpty()) {
            NettyServer.userIdMap.put(userId, channelHandlerContext);
        }
        System.out.println("接收客户端消息:" + msg);
        // Acknowledge the message to the client.
        channelHandlerContext.writeAndFlush("00");
        InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
        System.out.println("获取客户端ip:" + ipSocket.getAddress().toString());
    }

    /**
     * Registers the newly connected client under its remote address string,
     * e.g. "/192.168.52.1:59807".
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        System.out.println("有客户端连接... 客户端地址" + remoteAddress);
        NettyServer.remoteAddressMap.put(remoteAddress, ctx);
        super.channelActive(ctx);
    }

    /**
     * Removes every registry entry that points at the disconnected client so
     * that later sends do not target a closed connection.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        System.out.println("有客户端断开连接... " + remoteAddress);
        NettyServer.remoteAddressMap.remove(remoteAddress);
        // One connection may have registered several user ids; drop each mapping
        // whose value is this context. Mappings re-registered by a newer
        // connection hold a different context and are left untouched.
        while (NettyServer.userIdMap.values().remove(ctx)) {
            // keep removing until no entry references this context
        }
        super.channelInactive(ctx);
    }
}
main方法初始化netty
/**
 * Spring Boot entry point of the monitor application; also launches the Netty
 * server once the Spring context is up.
 */
@SpringBootApplication
@MapperScan(basePackages = "com.kc.monitor.mapper")
public class MonitorApplication {

    /**
     * Boots Spring first, then starts the Netty server on the calling thread.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MonitorApplication.class, args);
        // Start the Netty server only after the Spring context has been started.
        new NettyServer().initServer();
    }
}
服务端主动给客户端发送消息
package com.kc.monitor.core.utils.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Netty TCP server that exchanges plain-text (String) messages with clients.
 *
 * <p>Connected clients are tracked in two public registries so that other
 * components can push messages to them proactively: {@link #userIdMap} is keyed
 * by the user id taken from received messages and {@link #remoteAddressMap} is
 * keyed by the client's remote address string.
 */
public class NettyServer {
    /** Client contexts keyed by the user id parsed from incoming messages. */
    public static ConcurrentHashMap<String, ChannelHandlerContext> userIdMap =
            new ConcurrentHashMap<String, ChannelHandlerContext>();

    /** Client contexts keyed by remote address string, e.g. "/192.168.52.1:59807". */
    public static ConcurrentHashMap<String, ChannelHandlerContext> remoteAddressMap =
            new ConcurrentHashMap<String, ChannelHandlerContext>();

    /** TCP port the server binds to. */
    public static int port = 8081;

    /**
     * Starts the server on {@link #port} and blocks the calling thread until the
     * server channel is closed. Both event loop groups are shut down before the
     * method returns, whether startup succeeded or failed.
     */
    public void initServer() {
        // Accepts incoming client connections (does not process their traffic).
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // Handles read/write for accepted connections, using three threads.
        NioEventLoopGroup workGroup = new NioEventLoopGroup(3);

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // NOTE: no frame decoder is installed, so TCP sticky/split packets are
                        // not handled. Adding a LineBasedFrameDecoder(1024) ahead of the
                        // StringDecoder would frame messages on "\n" / "\r\n" (max 1024 bytes),
                        // but clients would then have to terminate every message with a newline.
                        socketChannel.pipeline().addLast(new StringEncoder()); // outbound String -> bytes
                        socketChannel.pipeline().addLast(new StringDecoder()); // inbound bytes -> String
                        socketChannel.pipeline().addLast(new ServerHandler()); // receives and answers messages
                    }
                });
        try {
            // Bind the port and wait synchronously until binding succeeds.
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动成功:" + port);
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
            System.out.println("关闭服务器");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // e.g. the port is already in use
            e.printStackTrace();
        } finally {
            // Release the event loop threads gracefully.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
/**
 * Inbound handler for String messages: registers clients in the
 * {@code NettyServer} registries, acknowledges every message, and removes the
 * client's registry entries again when the connection goes inactive.
 */
class ServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Handles one decoded message. Protocol format: {@code id/content}.
     * The part before the first '/' (or the whole message when there is no '/')
     * is used as the user id under which this connection is registered.
     *
     * @param channelHandlerContext context of the sending client's channel
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        // Use indexOf rather than split("/")[0]: split drops trailing empty strings,
        // so a message such as "/" yields an empty array and split[0] would throw.
        int separator = msg.indexOf('/');
        String userId = separator < 0 ? msg : msg.substring(0, separator);
        if (!userId.isEmpty()) {
            NettyServer.userIdMap.put(userId, channelHandlerContext);
        }
        System.out.println("接收客户端消息:" + msg);
        // Acknowledge the message to the client.
        channelHandlerContext.writeAndFlush("00");
        InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
        System.out.println("获取客户端ip:" + ipSocket.getAddress().toString());
    }

    /**
     * Registers the newly connected client under its remote address string,
     * e.g. "/192.168.52.1:59807".
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        System.out.println("有客户端连接... 客户端地址" + remoteAddress);
        NettyServer.remoteAddressMap.put(remoteAddress, ctx);
        super.channelActive(ctx);
    }

    /**
     * Removes every registry entry that points at the disconnected client so
     * that later sends do not target a closed connection.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String remoteAddress = ctx.channel().remoteAddress().toString();
        System.out.println("有客户端断开连接... " + remoteAddress);
        NettyServer.remoteAddressMap.remove(remoteAddress);
        // One connection may have registered several user ids; drop each mapping
        // whose value is this context. Mappings re-registered by a newer
        // connection hold a different context and are left untouched.
        while (NettyServer.userIdMap.values().remove(ctx)) {
            // keep removing until no entry references this context
        }
        super.channelInactive(ctx);
    }
}
main方法初始化netty
/**
 * Spring Boot entry point of the monitor application; also launches the Netty
 * server once the Spring context is up.
 */
@SpringBootApplication
@MapperScan(basePackages = "com.kc.monitor.mapper")
public class MonitorApplication {

    /**
     * Boots Spring first, then starts the Netty server on the calling thread.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MonitorApplication.class, args);
        // Start the Netty server only after the Spring context has been started.
        new NettyServer().initServer();
    }
}
服务端主动给客户端发送消息
发送的前提是已有客户端连接到服务器端。
注意:示例中的address需要改为实际的客户端地址,也可以直接从remoteAddressMap中读取客户端地址。
/**
 * Demo endpoint: pushes {@code msg} from the server to connected clients, once
 * by remote address and once by user id. Clients that are not currently
 * registered in the corresponding {@code NettyServer} map are skipped.
 *
 * @param msg text to send; nothing is sent when it is {@code null}
 */
@GetMapping("/test")
public void add(String msg) {
    System.out.println(NettyServer.remoteAddressMap.size());
    System.out.println(NettyServer.userIdMap.size());
    if (msg == null) {
        // There is nothing to send; avoid handing a null message to the channel.
        return;
    }

    // 1. Send by client address, here to the client at "/192.168.52.1:59807".
    String address = "/192.168.52.1:59807";
    // A single get() replaces containsKey()+get(): the client may disconnect between
    // the two calls, which would make get() return null. instanceof also covers null.
    Object byAddress = NettyServer.remoteAddressMap.get(address);
    if (byAddress instanceof io.netty.channel.ChannelHandlerContext) {
        ((io.netty.channel.ChannelHandlerContext) byAddress).writeAndFlush(msg);
    }

    // 2. Send to the client registered under user id "1".
    String userId = "1";
    Object byUserId = NettyServer.userIdMap.get(userId);
    if (byUserId instanceof io.netty.channel.ChannelHandlerContext) {
        ((io.netty.channel.ChannelHandlerContext) byUserId).writeAndFlush(msg);
    }
}



