栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

webSocket实现点对点、群发消息通知

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

webSocket实现点对点、群发消息通知

第一:配置Websocket请求的路径
package com.ella.operation.server.client;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * Spring configuration that enables WebSocket support and registers the handler
 * together with its handshake interceptor.
 */
@Configuration
@EnableWebSocket
public class H5WebSocketConfig implements WebSocketConfigurer {

    /** URL pattern the handler is mapped to. */
    private static final String HANDLER_PATH = "/myHandler/{type}/{ID}";

    /** Origin pattern passed to the registration; "*" accepts every origin. */
    private static final String ALLOWED_ORIGINS = "*";

    /**
     * Maps {@link MyHandler} to {@link #HANDLER_PATH}, sets the allowed origins and attaches
     * {@link WebSocketInterceptor} to the handshake.
     *
     * @param registry registry supplied by Spring for handler registration
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                .addHandler(new MyHandler(), HANDLER_PATH)
                .setAllowedOrigins(ALLOWED_ORIGINS)
                .addInterceptors(new WebSocketInterceptor());
    }
}

第二:配置Websocket拦截器

package com.ella.operation.server.client;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpSession;
import java.util.Map;

/**
 * Handshake interceptor that validates the connection URL and exposes the user code to the
 * WebSocket session.
 *
 * <p>The last two path segments of the request URI are read as {@code {type}/{userCode}}. Only the
 * types {@code operation} and {@code composition} are accepted.
 */
@Slf4j
public class WebSocketInterceptor implements HandshakeInterceptor {

    /**
     * Runs before the handshake; returning {@code false} aborts it.
     *
     * <p>On success the user code is stored in {@code attributes} under
     * {@code "WEBSOCKET_USER_ID"}.
     *
     * @param request            the handshake request; its URI path is parsed
     * @param serverHttpResponse the handshake response (unused)
     * @param webSocketHandler   the target handler (unused)
     * @param attributes         attribute map this method writes the user code into
     * @return {@code true} to continue the handshake, {@code false} when the path does not end in
     *         {@code {type}/{userCode}} with an accepted type and a non-blank user code
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
        // Use the path only: parsing the full URI string would append any query string
        // ("?token=...") to the last segment and corrupt the user code.
        String path = request.getURI().getPath();
        if (path == null) {
            log.warn("Handshake rejected, uri has no path:{}", request.getURI());
            return false;
        }

        String[] segments = path.split("/");
        if (segments.length < 2) {
            log.warn("Handshake rejected, path too short:{}", path);
            return false;
        }
        String userCode = segments[segments.length - 1];
        String type = segments[segments.length - 2];

        if (!"operation".equals(type) && !"composition".equals(type)) {
            return false;
        }
        if (userCode.trim().isEmpty()) {
            log.warn("Handshake rejected, blank userCode in path:{}", path);
            return false;
        }

        // No HttpSession is requested here: nothing in the handshake reads it, and asking for one
        // would create a server-side session for every connection attempt.
        attributes.put("WEBSOCKET_USER_ID", userCode);
        log.info("当前session userCode:{}", userCode);
        return true;
    }

    /**
     * Runs after the handshake has completed; only emits a debug log entry.
     *
     * @param serverHttpRequest  the handshake request (unused)
     * @param serverHttpResponse the handshake response (unused)
     * @param webSocketHandler   the target handler (unused)
     * @param e                  exception passed in by the framework (unused)
     */
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
        log.debug("进来webSocket的afterHandshake拦截器!");
    }
}

第三:消息处理器(WebSocketHandler 实现)

package com.ella.operation.server.client;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Slf4j
@Service
public class MyHandler implements WebSocketHandler {

    //在线用户列表
    private static final Map users;
    private static final Map> usersMap;


    static {
        users = new HashMap<>();
        usersMap = new HashMap<>();
    }

    //新增socket
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
//        log.info("成功建立连接");
//        String ID = session.getUri().toString().split("ID=")[1];
//        log.info("ID:"+ID);
//        if (ID != null) {
//            users.put(ID, session);
//            session.sendMessage(new TextMessage("成功建立socket连接"));
//            log.info("ID:"+users.get(ID));
//            log.info("session:"+session);
//        }
//        log.info("当前在线人数:"+users.size());

        String requestUri = session.getUri().toString();
        log.error(requestUri);
        String[] requestUriArr = requestUri.split("/");
        String userCode = requestUriArr[requestUriArr.length - 1];
        String type = requestUriArr[requestUriArr.length - 2];
        if (StringUtils.isNotBlank(userCode)) {
            putUserToMap(type, userCode, session);
            session.sendMessage(new TextMessage("成功建立socket连接"));
            log.info("user_code:{},type:{}", users.get(userCode + "_" + type), type);
            log.info("session:" + session);
        }
        log.info("成功建立连接 user_code:{},type:{}", userCode, type);
    }

    //接收socket信息
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) throws Exception {
        

    }

    
    public boolean sendMessageToUser(String clientId, String type, TextMessage message) {
        Map map = getUserMap(type);
        if (null == map) {
            return false;
        }
        WebSocketSession session = map.get(clientId);
        if (null == session) {
            return false;
        }
        log.info("sendMessage:" + session);
        if (!session.isOpen()) {
            return false;
        }
        try {
            session.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    
    public boolean sendMessageToUser(String clientId, TextMessage message) {
        System.out.println(users.size());
        WebSocketSession session = users.get(clientId);
        if (session == null) {
            return false;
        }
        log.info("sendMessage:" + session);
        if (!session.isOpen()) {
            return false;
        }
        try {
            session.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    
    public boolean sendMessageToAllUsers(TextMessage message) {
        boolean allSendSuccess = true;
        Set clientIds = users.keySet();
        WebSocketSession session = null;
        for (String clientId : clientIds) {
            try {
                session = users.get(clientId);
                if (session.isOpen()) {
                    session.sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
                allSendSuccess = false;
            }
        }

        return allSendSuccess;
    }

    
    public boolean sendMessageToAllUsers(String type,TextMessage message) {
        Map map = getUserMap(type);
        if (null == map) {
            return false;
        }
        boolean allSendSuccess = true;
        Set clientIds = map.keySet();
        WebSocketSession session = null;
        for (String clientId : clientIds) {
            try {
                session = map.get(clientId);
                if (session.isOpen()) {
                    session.sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
                allSendSuccess = false;
            }
        }
        return allSendSuccess;
    }
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            session.close();
        }
        log.info("连接出错");
        users.remove(getClientId(session));
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        log.info("连接已关闭:" + status);
        users.remove(getClientId(session));
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    
    private Integer getClientId(WebSocketSession session) {
        try {
            Integer clientId = (Integer) session.getAttributes().get("WEBSOCKET_USERID");
            return clientId;
        } catch (Exception e) {
            return null;
        }
    }

    public Map getUserMap(String type) {
        Map map = usersMap.get(type);
        return map;
    }

    public void putUserToMap(String type, String userCode, WebSocketSession session) {
        Map map = getUserMap(type);
        if (null == map) {
            map = new HashMap<>();
        }
        map.put(userCode, session);
        usersMap.put(type, map);
    }

//    @Scheduled(fixedRate = 2000)
//    public  void sen() {
//        Boolean b = sendMessageToUser("H2",new TextMessage("H22222222222"));
//    }
//
//    @Scheduled(fixedRate = 5000)
//    public  void sen1() {
//        Boolean b = sendMessageToUser("H1",new TextMessage("H111111111111"));
//    }
}

第四:应用

// Payload carrying the book code and the operation type.
Map<String, Object> compositionMap = new HashMap<>(2);
compositionMap.put("baseBookCode", fromDetail.getBaseBookCode());
compositionMap.put("operationType", Constant.MESSAGE_TYPE_COMPOSITION_DELETE_PAGE);
// Notify the composition tool that a page was deleted.
handler.sendMessageToAllUsers(Constant.MESSAGE_TYPE_COMPOSITION, new TextMessage(JSONObject.toJSONString(compositionMap)));

第五:依赖



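<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>javax</groupId>
    <artifactId>javaee-api</artifactId>
    <version>7.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>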

接口说明:

**开发者:** 

- 王庆乾


**简要描述:** 

- WebSocket 协议(基于 TCP 的长连接),用于服务端向客户端推送消息

**请求URL:** 
-  `ws://ip:端口/myHandler/{type}/{userCode}`
-  `type编排工具恒传composition,工作台恒传operation,userCode对应登录人的用户编码`
-  `具体业务根据响应参数中的operationType判断获取对应消息`

如:

socket = new WebSocket("ws://ip:端口/myHandler/operation/111");

表示用户 111 以工作台(operation)身份建立连接,之后服务端即可向该用户推送消息(例如弹窗通知)。

**业务说明:**
-  `1、任务通知     归属   工作台`
-  `2、图书删除页   归属   工作台`

**1、任务通知     归属   工作台:**

**响应参数:**

在原有响应参数的基础上新增 operationType 参数,值为 TASK

**2、图书删除页   归属   工作台:**

**响应参数:**

|参数名|类型|说明|
|:----|:---|:-----|
|bookName  |string |图书名称  |
|userName  |string |操作人名称   |
|operationTime  |string |操作时间   |
|pageNum  |string |页号   |
|operationType  |string |操作类型=OPERATION_DELETE_PAGE   |
 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/831364.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号