栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

如何利用RabbitMQ的延迟队列实现文章一段时间后自动处理审核

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何利用RabbitMQ的延迟队列实现文章一段时间后自动处理审核

问题

假设有一个需求是文章发布后如果一段时间没有被审核的话就会自动成为已审核状态,或者也可以是成为未通过审核状态。总之需求就是自打文章发布那一刻起就随时携带了一个“定时器”,到点就自动执行某些操作。
根据这个特性我们很容易想到利用cron表达式,每隔1s或2s去检查此时距离文章发布时是否达到了指定时间,如果超过了指定时间就执行一段目标程序。

		UPDATe `p_post`
        SET `status` = 1//通过审核状态
        WHERe
        status = 0
        AND
        NOW() >= DATE_ADD(time,INTERVAL 1 DAY)//距离发布时间time已经过了1天以上

可是数据多的情况下这样未免太耗时了,所以我们使用延迟队列来解决这个需求。

解决

关于rabbitmq和延迟队列以及死信队列的概念就不多介绍了,可以参考相关文章,这里直接给出我的代码实现。

maven
	
      org.springframework.boot
      spring-boot-starter-amqp
    
YML
  rabbitmq:
    host: 
    port: 5672
    password: 
    username: 
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
    virtual-host: 
    #    确认回调
    publisher-/confirm/i-type: correlated
    #    失败回调
    publisher-returns: true
RabbitMq的配置常量
package com.zhuhodor.server.rabbitMq.config;


public class RabbitMqConstant {
	//12个小时
    public static final int DELAY_TIME = 12*60*60*1000;

    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.check.deadletter.exchange";
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.check.business.exchange";

    public static final String DELAY_QUEUEA_NAME = "delay.queue.check.business.queuea";
    public static final String DELAY_QUEUEB_NAME = "delay.queue.check.business.queueb";
    public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.check.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.check.deadletter.queueb";

    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.check.business.queuea.routingkey";
    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.check.business.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.check.deadletter.delay_12h.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.check.deadletter.delay_xh.routingkey";

}

RabbitMq的Config

这里主要配置业务交换器,业务队列,死信队列,死信交换器

package com.zhuhodor.server.rabbitMq.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Slf4j
public class RabbitMqConfig {
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);

        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
//        rabbitTemplate.setMandatory(true);
        
        rabbitTemplate.set/confirm/iCallback(((correlationData, ack, cause) -> {
            String msgId = correlationData.getId();
            if (ack){
                log.info("{}====================>消息发送成功", msgId);
//                mailLogService.update(new UpdateWrapper().set("status", 1).eq("msgId", msgId));
            }else {
                log.error("{}============消息发送失败", msgId);
            }
        }));
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        
        rabbitTemplate.setReturnsCallback((msg)->{
            log.info("{}=======================>消息发送到queue时失败",msg.getMessage());
        });
        return rabbitTemplate;
    }

    // 业务延时队列
    // 绑定到对应的死信交换机
    @Bean
    public Queue delayQueueA(){
    	//直接new Queue的方法
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("DirectQueue",true,true,false);

		//利用QueueBuilder的方法
        Map args = new HashMap<>(2);
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", RabbitMqConstant.DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", RabbitMqConstant.DEAD_LETTER_QUEUEA_ROUTING_KEY);
        // x-message-ttl  声明队列的TTL
        args.put("x-message-ttl", RabbitMqConstant.DELAY_TIME);
        return QueueBuilder.durable(RabbitMqConstant.DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    // 声明死信队列A 用于接收需要延时处理的消息
    @Bean
    public Queue deadLetterQueueA(){
        return new Queue(RabbitMqConstant.DEAD_LETTER_QUEUEA_NAME);
    }

    //业务交换机
    @Bean
    public DirectExchange delayDirectExchange() {
        //autoDelete 没有任何消费者就自动删除,设为false
        return new DirectExchange(RabbitMqConstant.DELAY_EXCHANGE_NAME,true,false);
    }

    //死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        //autoDelete 没有任何消费者就自动删除,设为false
        return new DirectExchange(RabbitMqConstant.DEAD_LETTER_EXCHANGE,true,false);
    }

    // 声明延时队列A绑定关系
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                 @Qualifier("delayDirectExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstant.DELAY_QUEUEA_ROUTING_KEY);
    }

    // 声明死信队列A绑定关系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstant.DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

}

生产者
@Component
public class DelayMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(Post msg){
        rabbitTemplate.convertAndSend(RabbitMqConstant.DELAY_EXCHANGE_NAME, RabbitMqConstant.DELAY_QUEUEA_ROUTING_KEY, msg);
    }
}
死信队列延迟消费
@Component
@Slf4j
public class DeadLetterConsumer {
    @Autowired
    private IPostService postService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMqConstant.DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        channel.basicQos(1);//一次只给消费者发一条消息
        Post post = (Post)rabbitTemplate.getMessageConverter().fromMessage(message);
        log.info("死信队列A收到消息:{}", post.toString());
        //文章在数据库里的最新更新时间,因为发布文章后有可能会对文章进行修改,导致队列中的不是最新的文章
        LocalDateTime lastTime = postService.getOne(new QueryWrapper().eq("id", post.getId()).select("time")).getTime();
        //如果数据库里的文章修改时间比队列里的时间晚了5纳秒,则认为队列里的消息是最新的
        if (lastTime.compareTo(post.getTime()) < 5){
            if (postService.update(new UpdateWrapper()
                    .eq("id", Integer.valueOf(post.getId()))
                    .set("status", 1))){//消费消息,修改数据库里的文章状态
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }else {//数据库更新的时候出现异常,把消息重新放回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            }
        }else {
            log.info("丢弃死信队列A的消息");
            //队列里的文章不是最新的,直接丢弃,不放回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    }
}
Controller
@ApiOperation(value = "保存帖子")
    @PostMapping("/save")
    public Result savePost(@RequestBody Post post){
        post.setTime(LocalDateTime.now());
        if(postService.savePost(post)){
            messageProducer.sendMsg(post);//放入消息队列
            return Result.success("提交成功!",post);
        } else {
            return Result.fail("提交出错了!");
        }
    }
总结

实现消息队列有三种方式

    给每条消息设置TTL给队列设置TTL,此时队列里的所有消息都有过期时间利用插件

我用的是第2种方法。第一种方法的缺点是RabbitMQ只会检查队列头部的消息是否过期,如果过期则丢到死信队列,所以如果队列中第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先放入死信队列。
而第三种需要自己下载插件,略显麻烦。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/785820.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号