栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java多服务推送websocket

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java多服务推送websocket

场景:在使用多个服务使用一个websocket地址进行推送,这里使用redis订阅发布来实现。

        
            org.springframework.boot
            spring-boot-starter-data-redis
        
        
            org.springframework.boot
            spring-boot-starter-websocket
        

websocket使用

@Slf4j
@ServerEndpoint(value = "/ws/alarm")
@Component
public class BaseWebSocket {
    private static ConcurrentHashMap webSocketSet = new ConcurrentHashMap();
    private Session session;
    //uid
    private String uid = "";

    @OnOpen
    public void onOpen(Session session) {
        String param=session.getQueryString();
        if(StringUtils.isBlank(param)){
            return;
        }
        if(param.contains("uid=")){
            String[] split = param.split("=");
            this.uid= split[1];
            this.session = session;
            webSocketSet.put(session.getId(), this);//加入map中
        }
    }

    
    @OnClose
    public void onClose(Session session) {
        String sid = session.getId();
        if (StringUtils.isNotBlank(sid)) {
            webSocketSet.remove(sid);
        }
    }

    
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自客户端的消息:" + message);
    }

    @Autowired
    SysUserManageService sysUserManageService;

    
    public void sendToUser(String message) {
        for(String sid:webSocketSet.keySet()){
            sendUserById(message, sid);
        }
    }


    private void sendUserById(String message,String uid){
        try {
            if (webSocketSet.get(uid) != null) {
                webSocketSet.get(uid).sendMessage(message,webSocketSet.get(uid));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    public void sendAll(String message) {
            sendUserById(message, id);
    }


    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

    public void sendMessage(String message,BaseWebSocket webSocketSet) throws IOException {
        synchronized (webSocketSet.session) {
            webSocketSet.session.getBasicRemote().sendText(message);
        }
    }

使用

@Autowired
BaseWebSocket baseWebSocket;       

baseWebSocket.sendToUser(String);

多服务中

websocket服务端

@Configuration
public class MsgListenerConfig extends CachingConfigurerSupport {

    
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory factory){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        //订阅一个通道 该处的通道名是发布消息时的名称
        container.addMessageListener(msgAdapter(),new PatternTopic("appMsg"));
        return container;
    }

    
    @Bean
    MessageListenerAdapter msgAdapter(){
        return new MessageListenerAdapter(new MsgListener());
    }

}
@Component
@Slf4j
public class MsgListener implements MessageListener {
    
    
    @Autowired
    BaseWebSocket baseWebSocket;

    @Override
    public void onMessage(Message message, byte[] bytes) {
        log.info("appMsg:" + message.toString());
        if(baseWebSocket==null){
            baseWebSocket=SpringContextUtil.getBean(BaseWebSocket.class);
        }
        baseWebSocket.sendToUser(message.toString());
    }
}

其他服务端

		
@Autowired
StringRedisTemplate stringRedisTemplate;


stringRedisTemplate.convertAndSend("appMsg", "str");

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/878175.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号