栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

Redis 的发布订阅功能在 SpringBoot 中的应用

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Redis 的发布订阅功能在 SpringBoot 中的应用

认识 Redis 的发布订阅功能

关于 Redis 发布订阅的功能介绍可以参考:订阅与发布。下面我来介绍下 Redis 发布订阅功能的特性和适用场景。

Redis 发布订阅功能的特性
  • 消息的发送者与接收者之间通过 channel 绑定:channel 可以是确定的字符串,也可以基于模式匹配
  • 客户端可以订阅任意多个 channel
  • 发送者发送的消息无法持久化,所以可能会造成消息丢失
  • 由于消息无法持久化,所以,消费者无法收到在订阅 channel 之间发送的消息
  • 发送者与客户端之间的消息发送与接收不存在 ACK 机制
Redis 发布订阅功能的适用场景

由于没有消息持久化与 ACK 的保证,所以,Redis 的发布订阅功能并不可靠。这也就导致了它的应用场景很有限,建议用于实时与可靠性要求不高的场景。例如:

  • 消息推送
  • 内网环境的消息通知

总之,Redis 发布订阅功能足够简单,如果没有过多的要求,且不想搭建 Kafka、RabbitMQ 这样的可靠型消息系统时,可以考虑尝试使用 Redis。

Redis 发布订阅功能在 SpringBoot 中的关键类 SpringBoot 版本

  org.springframework.boot
  spring-boot-starter-parent
  2.0.2.RELEASE
  

Spring Data Redis 实现发布订阅功能非常简单,只有这样的几个类:Topic、MessageListener、RedisMessageListenerContainer。下面对它们进行解释:

org.springframework.data.redis.listener.Topic

消息发送者与接收者之间的 channel 定义,有两个实现类:

  • org.springframework.data.redis.listener.ChannelTopic:一个确定的字符串
  • org.springframework.data.redis.listener.PatternTopic:基于模式匹配
org.springframework.data.redis.connection.MessageListener

一个回调接口,消息监听器,用于接收发送到 channel 的消息,接口定义如下:

package org.springframework.data.redis.connection;

import org.springframework.lang.Nullable;


public interface MessageListener {

	
	void onMessage(Message message, @Nullable byte[] pattern);
}
org.springframework.data.redis.listener.RedisMessageListenerContainer

用于消息监听,需要将 Topic 和 MessageListener 注册到 RedisMessageListenerContainer 中。这样,当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,客户端通过 onMessage 拿到消息后,自行处理。

Redis 发布订阅功能在 SpringBoot 中的实践

说明:当前给出的示例代码使用 ChannelTopic,可以自行测试使用 PatternTopic。

  • VO 对象定义:CityInfo
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CityInfo implements Serializable {

    
    private String city;

    
    private Double longitude;

    
    private Double latitude;
}
  • 配置类定义:RedisConfig
import listener.SubscribeListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;


@Configuration
public class RedisConfig {

    private final RedisConnectionFactory redisConnectionFactory;

    @Autowired
    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
 this.redisConnectionFactory = redisConnectionFactory;
    }

    
    @Bean
    public SubscribeListener listener() {
 return new SubscribeListener();
    }

    
    @Bean
    public ChannelTopic channelTopic() {
 return new ChannelTopic("city");
    }

    
    @Bean
    public PatternTopic patternTopic() {
 return new PatternTopic("/city
    @Bean
    public RedisMessageListenerContainer messageListenerContainer() {

 RedisMessageListenerContainer container = new RedisMessageListenerContainer();
 container.setConnectionFactory(redisConnectionFactory);

 // 可以修改成 patternTopic, 看一看 MessageListener 中监听的数据
 container.addMessageListener(listener(), channelTopic());
 return container;
    }
}
  • MessageListener 接口实现类:SubscribeListener
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;


public class SubscribeListener implements MessageListener {

    
    @Override
    public void onMessage(Message message, byte[] pattern) {

 String body = new String(message.getBody());
 String channel = new String(message.getChannel());
 String pattern_ = new String(pattern);

 System.out.println(body);
 System.out.println(channel);
 System.out.println(pattern_);// 如果是 ChannelTopic, 则 channel 字段与 pattern 字段值相同
    }
}
  • 测试用例:RedisPubSubTest
import com.alibaba.fastjson.JSON;
import Application;
import vo.CityInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest(classes = {Application.class}, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class RedisPubSubTest {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private ChannelTopic topic;

    @Test
    public void testRedisPubSub() {

 redisTemplate.convertAndSend(
  topic.getTopic(),
  JSON.toJSONString(new CityInfo("hefei", 117.17, 31.52))
 );
    }
}

执行测试用例,可以看到如下打印信息:

2019-03-12 17:54:41.699  INFO 5627 --- [enerContainer-1] io.lettuce.core.EpollProvider     : Starting without optional epoll library
2019-03-12 17:54:41.703  INFO 5627 --- [enerContainer-1] io.lettuce.core.KqueueProvider    : Starting without optional kqueue library
2019-03-12 17:54:42.354  INFO 5627 --- [    main] com.imooc.ad.service.RedisPubSubTest     : Started RedisPubSubTest in 8.364 seconds (JVM running for 12.321)
{"city":"hefei","latitude":31.52,"longitude":117.17}
city
city
2019-03-12 17:54:42.936  INFO 5627 --- [Thread-4] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2812b107: startup date [Tue Mar 12 17:54:35 CST 2019]; root of context hierarchy
2019-03-12 17:54:42.939  INFO 5627 --- [Thread-4] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Process finished with exit code 0

·······························
欢迎关注课程:

基于 SpringCloud 微服务架构下 广告系统设计与实现

JAVA分布式优惠券系统后台 手把手实战开发

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号