WebSocket 实战
- 配置 WebSocketConfig
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
- 创建 WebSocket 服务端
@Log4j2
@Component
@ServerEndpoint(value = "/api/websocket/queue/{userId}")
public class WebSocketQueue {
// 当前会话(与某个客户端的连接会话,需要通过它来给客户端发送数据)
private Session session;
// 总会话集合:key:userId value:Session
private static Map sessionPool = new HashMap<>();
// session 与 userId 关系集合:key:sessionId value:userId
private static Map sessionIds = new HashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId){
this.session = session;
sessionPool.put(userId,session);
sessionIds.put(session.getId(),userId);
log.info("Queue-用户:{},新连接加入!当前在线人数为:{}",userId,getOnlineNum());
}
@OnClose
public void onClose(){
log.info("【webSocket-onClose】关闭连接调用session:{}",sessionIds);
sessionPool.remove(sessionIds.get(session.getId())); //断开连接,移除用户
sessionIds.remove(session.getId()); //断开连接移除用户
log.info("Queue-有一连接关闭!当前在线人数为" + getOnlineNum());
}
@OnMessage
public void onMessage(String message, Session session) {
if("ping".equals(message)){ log.info("Queue-定时请求,防止掉线;心跳sessionId:{},sessionName:{}",session.getId(),sessionIds.get(session.getId())); return;}
log.info("Queue-来自客户端的消息:{}" , message);
sendMessage(message,sessionIds.get(session.getId()));
}
@OnError
public void onError(Session session, Throwable error){
log.info("【webSocket-onError】发生错误时调用 session:{}",session);
log.info("Queue-webSocket发生错误:session:{}-{}",session.getId(),error.getMessage());
error.printStackTrace();
throw new baseErrorException("TOPIC-onError发送客户信息失败,错误:"+error.getMessage());
}
public static void sendMessage(String message,String userId) throws baseErrorException{
log.info("【webSocket消息发送】发送号:{}-发送消息:{}",userId,message);
Session s = sessionPool.get(userId);
if (s!=null){
try {
s.getBasicRemote().sendText(message);
} catch (IOException e) {
log.info("【webSocket发送接口】Queue-发送客户信息失败,错误:"+e.getMessage());
}
}
}
public synchronized int getOnlineNum() {
return sessionPool.size();
}
public synchronized void getOnlineUsers() {
for (String key : sessionIds.keySet()){
log.info("Queue-所有在线用户:{}",sessionIds.get(key));
}
}
public synchronized void sendAll(String msg) {
for (String key : sessionIds.keySet()){
sendMessage(msg,sessionIds.get(key));
}
}
}
- 服务器给客户端发送消息
// 伪代码
WebSocketQueue.sendMessage(message, userId);
- 客户端
webSocket demo