栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

补习系列(13)-springboot redis 与发布订阅

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

补习系列(13)-springboot redis 与发布订阅

一、订阅发布

订阅发布是一种常见的设计模式,常见于消息系统的场景。
如下面的图:

[图来自百科]
消息发布者是消息载体的生产者,其通过某些主题来向调度中心发送消息;
而消息订阅者会事先向调度中心订阅其**“感兴趣”**的主题,随后会获得新消息。
在这里,调度中心是一个负责消息控制中转的逻辑实体,可以是消息队列如ActiveMQ,也可以是Web服务等等。

常见应用
  • 微博,每个用户的粉丝都是该用户的订阅者,当用户发完微博,所有粉丝都将收到他的动态;
  • 新闻,资讯站点通常有多个频道,每个频道就是一个主题,用户可以通过主题来做订阅(如RSS),这样当新闻发布时,订阅者可以获得更新。

二、Redis 与订阅发布

Redis 支持 (pub/sub) 的订阅发布能力,客户端可以通过channel(频道)来实现消息的发布及接收。

  1. 客户端通过 SUBSCRIBE 命令订阅 channel;

  1. 客户端通过PUBLISH 命令向channel 发送消息;

而后,订阅 channel的客户端可实时收到消息。

除了简单的SUBSCRIBE/PUBLISH命令之外,Redis还支持订阅某一个模式的主题(正则表达式),
如下:

PSUBSCRIBE  /topic/cars
    @Bean
    public Jackson2JsonRedisSerializer jackson2JsonSerializer() {
 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
  Object.class);

 // 初始化objectmapper
 ObjectMapper mapper = new ObjectMapper();
 mapper.setSerializationInclusion(Include.NON_NULL);
 mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
 jackson2JsonRedisSerializer.setObjectMapper(mapper);
 return jackson2JsonRedisSerializer;
    }

    
    @Bean
    public RedisTemplate redisTemplate(JedisConnectionFactory connectionFactory,
     Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {

 RedisTemplate template = new RedisTemplate();
 template.setConnectionFactory(connectionFactory);

 // 设置key/hashkey序列化
 RedisSerializer stringSerializer = new StringRedisSerializer();
 template.setKeySerializer(stringSerializer);
 template.setHashKeySerializer(stringSerializer);

 // 设置值序列化
 template.setValueSerializer(jackson2JsonRedisSerializer);
 template.setHashValueSerializer(jackson2JsonRedisSerializer);
 template.afterPropertiesSet();

 return template;
    }

C. 发布消息

消息发布,需要先指定一个ChannelTopic对象,随后通过RedisTemplate方法操作。

@Service
public class RedisPubSub {  
    private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class);

    @Autowired
    private RedisTemplate redisTemplate;

    private ChannelTopic topic = new ChannelTopic("/redis/pubsub");

  
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    private void schedule() {
 logger.info("publish message");
 publish("admin", "hey you must go now!");
    }

    
    public void publish(String publisher, String content) {
 logger.info("message send {} by {}", content, publisher);

 SimpleMessage pushMsg = new SimpleMessage();
 pushMsg.setContent(content);
 pushMsg.setCreateTime(new Date());
 pushMsg.setPublisher(publisher);

 redisTemplate.convertAndSend(topic.getTopic(), pushMsg);
    }

上述代码使用一个定时器(@Schedule)来做发布,为了保证运行需要在主类中启用定时器注解:

@EnableScheduling
@SpringBootApplication
public class BootSampleRedis{
...
}

D. 接收消息

定义一个消息接收处理的Bean:

    @Component
    public static class MessageSubscriber {

 public void onMessage(SimpleMessage message, String pattern) {
     logger.info("topic {} received {} ", pattern, JsonUtil.toJson(message));
 }
    }

接下来,利用 MessageListenerAdapter 可将消息通知到Bean方法:

 @Bean
 public MessageListenerAdapter listener(Jackson2JsonRedisSerializer jackson2JsonRedisSerializer,
  MessageSubscriber subscriber) {
     MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
     adapter.setSerializer(jackson2JsonRedisSerializer);
     adapter.afterPropertiesSet();
     return adapter;
 }

最后,关联到消息发布的Topic:

 
 @Bean
 public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
  MessageListenerAdapter listener) {

     RedisMessageListenerContainer container = new RedisMessageListenerContainer();
     container.setConnectionFactory(connectionFactory);
     container.addMessageListener(listener, new PatternTopic("/redis/*"));
     return container;
 }

运行结果
启动程序,从控制台可输出:

.RedisPubSub : publish message
.RedisPubSub : message send hey you must go now! by admin
.RedisPubSub : topic /redis/* received {"publisher":"admin","content":"hey you must go now!","createTime":1543418694007} 

这样,我们便完成了订阅发布功能。

小结

消息订阅发布是分布式系统中的常用手段,也经常用来实现系统解耦、性能优化等目的;
当前小节结合SpringBoot 演示了 Redis订阅发布(pub/sub)的实现,在部分场景下可以参考使用。
欢迎继续关注"美码师的补习系列-springboot篇" ,期待更多精彩内容-

原文链接:https://www.cnblogs.com/littleatp/p/10035786.html

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号