栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 定时消息处理场景

RabbitMQ 定时消息处理场景

一、需求场景

需求:后台管理添加消息支持定时发送;

看到这个需求,我第一个想到的是查询用户消息时,只查询发送时间小于当前时间的消息;

这确实是一种解决方案,不过我这边的需求复杂一些,在手机通知中心中也能定时收到消息;【就是从手机顶部滑下来的消息】

这就要用到延迟队列了,延迟队列的实现有好几种,这里主要讲 rabbitMQ 的实现方式;


二、rabbitMQ 实现消息定时发送

具体流程:添加消息时如果发送时间大于当前时间,调用添加延迟队列的方法;

我们会先计算消息发送时间和当前时间的时间差,把这个时间差和这条消息封装成放入死信队列中,如果时间一到死信队列会根据路由键把消息发送到消息推送队列,我们有个监听消息推送队列的方法,当监听到有消息时处理相应的业务逻辑;

注:死信队列可以理解为放一些异常或无法立即被消费的消息;


三、代码

    引入依赖;

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    

    配置文件;

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # 虚拟主机,可以用项目名
        virtual-host: demo
        # 开启发送方消息抵达broker确认回调
        publisher-/confirm/i-type: correlated
        # 开启发送方消息抵达队列确认回调
        publisher-returns: true
        # 只要抵达队列,以异步发送优先回调return/confirm/i
        template:
          mandatory: true
        # 手动ack消息,不使用默认消费端确认
        listener:
          simple:
            acknowledge-mode: manual
    

    配置类:配置交换机,队列和路由键的关联关系;

    @Configuration
    public class MyMQConfig {
    
    	
    	public static final String NOTICE_PUSH_EXCHANGE = "notice_push_exchange";
    	
    	public static final String NOTICE_DL_QUEUE = "notice_dl_queue";
    	
    	public static final String NOTICE_DL_KEY = "notice_dl_key";
    
    	
    	public static final String NOTICE_PUSH_QUEUE = "notice_push_queue";
    	
    	public static final String NOTICE_PUSH_KEY = "notice_push_key";
    
    	
    	@Bean
    	public Exchange noticePushExchange() {
            // 参数说明:交换机名字,是否持久化,是否自动删除,交换机参数
    		return new DirectExchange(NOTICE_PUSH_EXCHANGE, true,false, null);
    	}
    
    	
    	@Bean
    	public Queue deadLetterQueue() {
    		Map args = new HashMap<>(2);
    		// 声明死信交换机
    		args.put("x-dead-letter-exchange", NOTICE_PUSH_EXCHANGE);
    		// 转发路由键
    		args.put("x-dead-letter-routing-key", NOTICE_PUSH_KEY);
            // 参数说明:队列名字,是否持久化,是否排它,是否自动删除,队列参数
            // 排它指的是只能被一个消费者连接使用
    		return new Queue(NOTICE_DL_QUEUE, true, false, false, args);
    	}
    
    	
    	@Bean
    	public Queue noticePushQueue() {
    		return new Queue(NOTICE_PUSH_QUEUE, true, false, false, null);
    	}
    
    	
    	@Bean
    	public Binding deadLetterBinding() {
            // 参数说明:绑定名称【队列名称/】,类型【队列/交换机】,绑定交换机名称,绑定路由键,绑定参数
    		return new Binding(NOTICE_DL_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_DL_KEY, null);
    
    	}
    
    	
    	@Bean
    	public Binding noticePushBinding() {
    		return new Binding(NOTICE_PUSH_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_PUSH_KEY, null);
    	}
    }
    

    添加延迟消息的方法;

    public void addDelayNotice(Notice notice){
        
        // 发送时间和当前时间相差的毫秒数,DateUtil.betweenMs()是hutool工具包的方法
        long betweenMs = DateUtil.betweenMs(notice.getSendTime(), new Date());
        
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
            // 设置编码
            messageProperties.setContentEncoding("utf-8");
            // 设置过期时间,单位毫秒
            messageProperties.setExpiration(Long.toString(betweenMs));
            return message;
        };
        
        // 加入到死信队列
        rabbitTemplate.convertAndSend(MyMQConfig.NOTICE_PUSH_EXCHANGE, MyMQConfig.NOTICE_DL_KEY, notice, messagePostProcessor);
    }
    
    
    @Data
    public class Notice {
        
        // 主键id
    	private Integer id;
        
        // 发送时间
    	@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
    	private Date sendTime;
        
        // 其他属性
    }
    

    监听消息推送队列;

    @Service
    @RabbitListener(queues = {MyMQConfig.NOTICE_PUSH_QUEUE})
    public class NoticePushListener {
    
        @SneakyThrows
        @RabbitHandler
        public void noticePush(Notice notice, Channel channel, Message message){
    
            try {
                // TODO 监听到消息推送队列传来的notice对象,调用推送的方法
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (Exception e) {
                // 拒签,重新放回队列(可能自身服务问题报错等原因)
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            }
        }
    }
    
四、消息重复消费问题

当我们修改还没发送的消息时也会将消息加到死信队列,这时有个消息重复的问题;

在项目中我是这样处理的,在发送消息的方法中比较数据库的发送时间和消息推送队列中Notice对象的发送时间是否一样,如果不一样就不发消息;也可以加个版本号字段判断,每次修改时版本号都加1;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774956.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号