栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot基于Redis发布订阅集群下WebSocket的解决方案

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot基于Redis发布订阅集群下WebSocket的解决方案

一、背景

单机节点下,WebSocket连接成功后,可以直接发送消息。而多节点下,连接时通过nginx会代理到不同节点。

假设一开始用户连接了node1的socket服务。触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点。这就导致了消息发送失败。

为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送。

二、解决方案(springboot 基于 Redis发布订阅)

1、依赖

    

	org.springframework.boot
	spring-boot-starter-data-redis

 

  org.springframework.boot
  spring-boot-starter-websocket

2、创建业务处理类 Demo.class,该类可以实现MessageListener接口后重写onMessage方法,也可以不实现,自己写方法。

import com.alibaba.fastjson.JSON;
import com.dy.service.impl.OrdersServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
 

@Component
public class Demo implements MessageListener {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  @Override
  public void onMessage(Message message, byte[] pattern) {
    logger.info("消息订阅成功---------");
    logger.info("内容:"+message.getBody());
    logger.info("交换机:"+message.getChannel());
  }
}

3、创建PubSubConfig配置类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 

@Configuration
@EnableCaching
public class PubSubConfig {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  //如果是多个交换机,则参数为(RedisConnectionFactory connectionFactory,
  //MessageListenerAdapter listenerAdapter,
  //MessageListenerAdapter listenerAdapter2)
  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
 MessageListenerAdapter listenerAdapter) {
 
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // 可以添加多个 messageListener,配置不同的交换机
    container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));
    //container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));
    return container;
  }
 
  
  @Bean
  MessageListenerAdapter listenerAdapter(Demo demo) {
    logger.info("----------------消息监听器加载成功----------------");
    // onMessage 就是方法名,基于反射调用
    return new MessageListenerAdapter(demo, "onMessage");
  }
 
  
  //@Bean
  //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {
  //  logger.info("----------------消息监听器加载成功----------------");
  //  return new MessageListenerAdapter(subCheckOrder, "onMessage");
  //}
 
  @Bean
  StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }
}

4、消息发布

@Autowired
private RedisTemplate redisTemplate;
 
redisTemplate.convertAndSend("channel:demo", "我是内容");
三、具体用法
  • socket连接成功。
  • socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送。集群下的socket都能订阅到消息,但是只有之前连接成功的节点能推送成功,其余的无法推送。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/129223.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号