一.导入Netty依赖
io.netty netty-all4.1.25.Final
二.搭建websocket服务器
@Component
public class WebSocketServer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap server;
private ChannelFuture future;
public void start() {
future = server.bind(9001);
System.out.println("netty server - 启动成功");
}
public WebSocketServer() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketInitializer());
}
}
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // ------------------ // 用于支持Http协议 // ------------------ // websocket基于http协议,需要有http的编解码器 pipeline.addLast(new HttpServerCodec()); // 对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用 //设置单次请求的文件的大小 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); //webSocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 添加Netty空闲超时检查的支持 // 1. 读空闲超时(超过一定的时间会发送对应的事件消息) // 2. 写空闲超时 // 3. 读写空闲超时 pipeline.addLast(new IdleStateHandler(4, 8, 12)); //添加心跳处理 pipeline.addLast(new HearBeatHandler()); // 添加自定义的handler pipeline.addLast(new ChatHandler()); } }
四.创建Netty监听器
@Component public class NettyListener implements ApplicationListener{ @Resource private WebSocketServer websocketServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(event.getApplicationContext().getParent() == null) { try { websocketServer.start(); } catch (Exception e) { e.printStackTrace(); } } } }
五.建立消息通道
public class UserChannelMap {
// private static Map userChannelMap;
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static ConcurrentHashMap userChannelMap = new ConcurrentHashMap<>();
private UserChannelMap(){}
public static void put(String userNum, Channel channel) {
userChannelMap.put(userNum, channel);
}
public static void remove(String userNum) {
userChannelMap.remove(userNum);
}
public static void removeByChannelId(String channelId) {
if(!StringUtils.isNotBlank(channelId)) {
return;
}
for (String s : userChannelMap.keySet()) {
Channel channel = userChannelMap.get(s);
if(channelId.equals(channel.id().asLongText())) {
System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联");
userChannelMap.remove(s);
UserService userService = SpringUtil.getBean(UserService.class);
userService.logout(s);
break;
}
}
}
public static void print() {
for (String s : userChannelMap.keySet()) {
System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id());
}
}
public static Channel get(String receiverNum) {
return userChannelMap.get(receiverNum);
}
public static ChannelGroup getChannelGroup() {
return channelGroup;
}
public static ConcurrentHashMap getUserChannelMap(){
return userChannelMap;
}
}
六.自定义消息类型
public class Message {
private Integer type;
private String message;
private Object ext;
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public MarketChatRecord getChatRecord() {
return marketChatRecord;
}
public void setChatRecord(MarketChatRecord chatRecord) {
this.marketChatRecord = chatRecord;
}
public Object getExt() {
return ext;
}
public void setExt(Object ext) {
this.ext = ext;
}
@Override
public String toString() {
return "Message{" +
"type=" + type +
", marketChatRecord=" + marketChatRecord +
", ext=" + ext +
'}';
}
}
七.创建处理消息的handler
public class ChatHandler extends SimpleChannelInboundHandler{ private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketframe msg) throws Exception { // 当接收到数据后会自动调用 // 获取客户端发送过来的文本消息 Gson gson = new Gson(); log.info("服务器收到消息:{}",msg.text()); System.out.println("接收到消息数据为:" + msg.text()); Message message = gson.fromJson(msg.text(), Message.class); //根据业务要求进行消息处理 switch (message.getType()) { // 处理客户端连接的消息 case 0: // 建立用户与通道的关联 // 处理客户端发送好友消息 break; case 1: // 处理客户端的签收消息 break; case 2: // 将消息记录设置为已读 break; case 3: // 接收心跳消息 break; default: break; } } // 当有新的客户端连接服务器之后,会自动调用这个方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded 被调用"+ctx.channel().id().asLongText()); // 添加到channelGroup 通道组 UserChannelMap.getChannelGroup().add(ctx.channel()); // clients.add(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("{异常:}"+cause.getMessage()); // 删除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); ctx.channel().close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText()); //删除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); UserChannelMap.print(); } }
八.处理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
if(idleStateEvent.state() == IdleState.READER_IDLE) {
System.out.println("读空闲事件触发...");
}
else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
System.out.println("写空闲事件触发...");
}
else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
System.out.println("---------------");
System.out.println("读写空闲事件触发");
System.out.println("关闭通道资源");
ctx.channel().close();
}
}
}
}
搭建完成后调用测试
1.页面访问http://localhost:9001/ws
2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。
到此这篇关于SpringBoot+Netty+WebSocket实现消息发送的示例代码的文章就介绍到这了,更多相关SpringBoot Netty WebSocket消息发送内容请搜索考高分网以前的文章或继续浏览下面的相关文章希望大家以后多多支持考高分网!



