SprinngBoot下集成webSock的几种方式
基于kafka配置推送
package cn.microvideo.flowac.sys.util;
import cn.microvideo.flowac.sys.entity.LocalSysMsgForWSBean;
import cn.microvideo.flowac.sys.entity.WSMsgModuleEnum;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class WebSocketUtil {
private static WebSocketUtil instance;
// TODO 暂时使用kafkaOne对象,后续可能会更改为系统自己的kafka
@Resource(name = "kafkaOneTemplate")
private KafkaTemplate kafkaTemplate;
@Value("${localSysMessageForWSTopic}")
private String localSysMessageForWSTopic;
@PostConstruct
public void init() {
instance = this;
}
public static void sendForWS(Object message, WSMsgModuleEnum moduleEnum) {
LocalSysMsgForWSBean bean = createBean(message, moduleEnum, 0);
instance.kafkaTemplate.send(instance.localSysMessageForWSTopic, GsonUtil.toJson(bean));
}
public static void sendForWSByUnit(Object message, WSMsgModuleEnum moduleEnum, String unitCode) {
LocalSysMsgForWSBean bean = createBean(message, moduleEnum, 1);
bean.setUnitCode(unitCode);
instance.kafkaTemplate.send(instance.localSysMessageForWSTopic, GsonUtil.toJson(bean));
}
public static void sendForWSByUser(Object message, WSMsgModuleEnum moduleEnum, String userId) {
LocalSysMsgForWSBean bean = createBean(message, moduleEnum, 2);
bean.setUserId(userId);
instance.kafkaTemplate.send(instance.localSysMessageForWSTopic, GsonUtil.toJson(bean));
}
private static LocalSysMsgForWSBean createBean(Object message, WSMsgModuleEnum moduleEnum, Integer sendType) {
LocalSysMsgForWSBean bean = new LocalSysMsgForWSBean();
bean.setModule(moduleEnum.getModule());
bean.setMessageType(moduleEnum.getMessageType());
bean.setSendType(sendType);
bean.setMessage(message);
return bean;
}
}
基于session配置推送
package com.microvideo.ewcp.configration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
package com.microvideo.ewcp.configration;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
@Component
@ServerEndpoint("/websocket/{sid}/{rid}")
public class WebSocketServer {
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String sid="";
public static final String event = "fk_centerid";//在线单位集合key
@OnOpen
public void onOpen(Session session,@PathParam("sid") String sid,@PathParam("rid") String rid) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
this.sid=sid;
}
@OnClose
public void onClose(@PathParam("rid") String rid) {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
}
@OnMessage
public void onMessage(String message, Session session) {
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
for (WebSocketServer item : webSocketSet) {
try {
//这里可以设定只推送给这个sid的,为null则全部推送
if(sid==null) {
item.sendMessage(message);
}else if(item.sid.equals(sid)){
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}