栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot集成Redis实现消息队列的方法(stringRedisTemplate.executePipelined)的用法

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot集成Redis实现消息队列的方法(stringRedisTemplate.executePipelined)的用法

引入依赖:


  org.springframework.boot
  spring-boot-starter-data-redis

消息提供者:

@Override
    public void produce(GiveVouchersParamDTO giveVouchersParamDTO) {
        // TODO:转成JSON格式
        String msg = JsonUtil.encode(giveVouchersParamDTO);
        // TODO: 左放入消息队列
        stringRedisTemplate.opsForList().leftPush(MESSAGE_KEY, msg);
    }

消息消费者:

@Override
    public void blockingConsume() {

//管道返回对应值
        List obj = stringRedisTemplate.executePipelined(new RedisCallback() {
            @Nullable
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                //队列没有元素会阻塞操作,直到队列获取新的元素或超时,
                //阻塞线程每隔20s超时执行一次。该方法解决了 CPU 空转的问题。
                //MESSAGE_KEY: redisKye
                return connection.bLPop(20, MESSAGE_KEY.getBytes());
            }
        }, new StringRedisSerializer());

        if (obj == null) {
            return;
        }
        // 对管道返回值进行对应处理
        for (Object value : obj) {
            if (value != null) {
                if (value instanceof List) {
                    List list = (List) value;
                    for (String v : list) {
                        if (v != null) {
                            if (!v.startsWith("{")) {
                                continue;
                            }
                            try {
                                
                                GiveVouchersParamDTO giveVouchersParamDTO =   JsonUtil.decode(v,GiveVouchersParamDTO.class);
                                log.info(v);
                                log.info(giveVouchersParamDTO );
                            } catch (Exception e) {
                                log.error(e.getMessage(), e);
                            } 
                        }
                    }
                }
            }
        }
    }
 

启动线程去监听消息消费:
@PostConstruct:被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器调用一次,类似于Serclet的init()方法。被 @PostConstruct修饰的方法会在构造函数之后,init()方法之前运行
如果想在生成对象时完成某些初始化操作,而偏偏这些初始化操作又依赖于依赖注入,那么久无法在构造函数中实现。为此,可以使用@PostConstruct注解一个方法来完成初始化,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。

@Component
public class RedisMqJob {

    @Autowired
    RedisMQService redisMQService;

    @PostConstruct
    public void consume() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        redisMQService.blockingConsume();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        thread.start();
    }
}
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号