栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

使用Redis zset做消息队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用Redis zset做消息队列

目录

背景实现思路执行效果结语和代码

背景

按理来说,Redis做消息队列应该使用Stream。没错,但那是5.0及以上的功能,奈何公司用的是3.2.9的,没法用Stream。PubSub的消息又不持久化,所以选择了zset,不仅有序利用score特性还能做到延迟队列的效果。 实现思路

1、有一个线程不断轮询到时间的队列。要注意空轮询的问题。

... 省略...
  while (!Thread.interrupted() && !destroyFlag) {
... 省略...
}

2、查询到时间的redis数据,zset score实现这个功能。

Set strings = stringRedisTemplate.opsForZSet().rangeByScore(queueName, 0, System.currentTimeMillis());

3、序列号成对象,抢占消息。

... 省略 ...
  // 有可能是集群,多个节点,设置抢占
        for (RedisQueueMessage redisQueueMessage : msgList) {
            // TODO 同一个消息可能被多个进程获取到,之后通过删除的方式竞争,那些没抢到的相当于白取了一次任务;可以用lua优化抢占这块
            //TODO 建议增加抢占日志
            if (remove(redisQueueMessage)) { // 为true 表示抢占到了
                return redisQueueMessage;
            }
        }
   ...省略..

4、这个时候就是处理消息了,为了应对其他消息,我用了策略模式+模板方法。

...省略...
RedisQueueProcessService redisQueueProcessService = adapterHandler(redisQueueMessage.getBeanName());
... 省略...
    private RedisQueueProcessService adapterHandler(String beanName) {
       return applicationContext.getBean(beanName, RedisQueueProcessService.class);
    }

5、执行真正的处理逻辑

 
    private void invokeHandler(RedisQueueMessage redisQueueMessage, RedisQueueProcessService redisQueueProcessService) {
        RedisQueueProcessResp result = null;
        try {
            //TODO 建议增加真正消费之前的日志
            result = redisQueueProcessService.handler(redisQueueMessage);
            ifFailAgainAddQueue(redisQueueMessage, result);
            //TODO 建议增加真正消费之后的日志
        } catch (Exception e) {
            //TODO 可以限制下重试次数,如果再失败,后续人工补偿
            // 执行出现异常重新加入队列
            push(redisQueueMessage);
            System.out.println("执行业务代码程序异常");
            //TODO 建议增加真正消费之后的日志,出异常的情况
            throw new RuntimeException("执行业务代码异常");
        }
    }

完整核心代码

 package com.springboot.sample.redis;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.springboot.sample.bean.RedisQueueMessage;
import com.springboot.sample.bean.RedisQueueProcessResp;
import com.springboot.sample.redis.process.RedisQueueProcessService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Component
public class DelayingQueueService implements InitializingBean {

    private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();

    
    private volatile boolean destroyFlag = false;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private ApplicationContext applicationContext;

    // 设定空轮询最大次数
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;

    // deadline 以及任务穿插逻辑处理  ,业务处理事件可能是5毫秒
    private long timeoutMillis = TimeUnit.MILLISECONDS.tonanos(5);

    
    @Value("${redisQueue.name}")
    public String queueName;


    
    public Boolean push(RedisQueueMessage redisQueueMessage)  {
        Boolean addFlag = null;
        try {
            addFlag = stringRedisTemplate.opsForZSet().add(queueName, mapper.writevalueAsString(redisQueueMessage), redisQueueMessage.getDelayTime());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return addFlag;
    }

    
    public Boolean remove(RedisQueueMessage redisQueueMessage) {
        Long remove = 0L;
        try {
            remove = stringRedisTemplate.opsForZSet().remove(queueName, mapper.writevalueAsString(redisQueueMessage));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return remove > 0 ? true : false;
    }


    
    public RedisQueueMessage dequeue() {
        Set strings = stringRedisTemplate.opsForZSet().rangeByScore(queueName, 0, System.currentTimeMillis());
        if (strings == null) {
            return null;
        }
        List msgList = strings.stream().map(msg -> {
            RedisQueueMessage redisQueueMessage = null;
            try {
                redisQueueMessage = mapper.readValue(msg, RedisQueueMessage.class);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return redisQueueMessage;
        }).collect(Collectors.toList());

        // 有可能是集群,多个节点,设置抢占
        for (RedisQueueMessage redisQueueMessage : msgList) {
            // TODO 同一个消息可能被多个进程获取到,之后通过删除的方式竞争,那些没抢到的相当于白取了一次任务;可以用lua优化抢占这块
            //TODO 建议增加抢占日志
            if (remove(redisQueueMessage)) { // 为true 表示抢占到了
                return redisQueueMessage;
            }
        }
        return null;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        Thread thread = new Thread("loop-redis-queue") {
            @Override
            public void run() {
                int selectCnt = 0;

                while (!Thread.interrupted() && !destroyFlag) {
                    long currentTimeNanos = System.nanoTime();

                    RedisQueueMessage redisQueueMessage = dequeue();
                    System.out.println("拉取的数据 " + redisQueueMessage);
                    if (!StringUtils.isEmpty(redisQueueMessage)) {
                        try {
                            RedisQueueProcessService redisQueueProcessService = adapterHandler(redisQueueMessage.getBeanName());

                            invokeHandler(redisQueueMessage,redisQueueProcessService);

                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
                    selectCnt++;


                    // 解决空轮询问题
                    long time = System.nanoTime();
                    System.out.println("执行纳秒数" + (time - currentTimeNanos));
                    System.out.println(time + " -- " + (time - TimeUnit.MILLISECONDS.tonanos(timeoutMillis)) + "--" + currentTimeNanos);
                    // 当前时间减去阻塞使用的时间  >= 上面的当前时间
                    if (time - timeoutMillis >= currentTimeNanos) {
                        // 有效的轮询
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // 如果空轮询次数大于等于SELECTOR_AUTO_REBUILD_THRESHOLD 默认512
                        selectCnt = 1;
                        threadSleep();
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }

    private void invokeHandler(RedisQueueMessage redisQueueMessage, RedisQueueProcessService redisQueueProcessService) {
        RedisQueueProcessResp result = null;
        try {
            //TODO 建议增加真正消费之前的日志
            result =  redisQueueProcessService.handler(redisQueueMessage);
            ifFailAgainAddQueue(redisQueueMessage, result);
            //TODO 建议增加真正消费之后的日志
        } catch (Exception e) {
            //TODO 可以限制下重试次数,如果再失败,后续人工补偿
            // 执行出现异常重新加入队列
            push(redisQueueMessage);
            System.out.println("执行业务代码程序异常");
            //TODO 建议增加真正消费之后的日志,出异常的情况
            throw new RuntimeException("执行业务代码异常");
        }
    }

    protected void ifFailAgainAddQueue(RedisQueueMessage redisQueueMessage, RedisQueueProcessResp result) {
        if (!StringUtils.isEmpty(result) && HttpStatus.OK.value() != result.getCode()) {
            //TODO 可以限制下重试次数,如果再失败,后续人工补偿
            // 错误要重新加入队列
            push(redisQueueMessage);
        }
    }


    private RedisQueueProcessService adapterHandler(String beanName) {
       return applicationContext.getBean(beanName, RedisQueueProcessService.class);
    }

    private void threadSleep() {
        try {
            System.out.println("睡眠了");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    @PreDestroy
    public void destroy() throws Exception {
        System.out.println("应用程序已关闭");
        this.destroyFlag = true;
    }



}

执行效果

请求地址 http://localhost:8080/redis/sendMessage?msg=%E5%BC%A0%E4%B8%89&delay=3,delay=3延迟了3秒
结语和代码

核心逻辑就是一个线程不断轮询redis,有数据的时候先抢占,抢到了就找对应的处理类,执行业务代码。这中间可以记录下消费日志,方便人工补偿和排查问题。完整代码地址:https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.1

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762780.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号