栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot如何实现,避免消息重复消费

SpringBoot如何实现,避免消息重复消费

利用redis实现,因此先导入Redis依赖:


    org.springframework.boot
    spring-boot-starter-data-redis

编写配置文件,增加redis的ip和端口配置

spring:
  rabbitmq:
    host: 192.168.200.129
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual
    publisher-/confirm/i-type: simple
    publisher-returns: true
  redis:
    host: 192.168.200.129
    port: 6379

修改生产者,

@Test
void contextLoads() throws IOException {
    CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//data,id的d巧记
    
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);//增加一个data即封装id的对象,方法重载
    System.in.read();
}

修改消费者,利用redis和setnx方法的特点和id控制标记法,解决消息被消费者重复消费的问题

@Autowired
private StringRedisTemplate redisTemplate;//换类StringRedisTemplate是RedisTemplate的子类,更加强大

@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
    //0. 通过消息属性的头,或者spring被动返回的消息相关,数据比如获取MessageId
    String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    
    //1. 设置key到Redis
    if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {//原来是10秒测试redis中是否有消费完消息后id为1的数据,设置为10秒太快消失不便于下面的测试查看
        //2. 消费消息,暂时先打印消息来模拟消费,至于消息以后用来干啥得看具体的需求↓
        System.out.println("接收到消息:" + msg);

        //3. 设置key的value为1
        redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
        //4.  手动ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }else {
        //5. 获取Redis中的value即可 如果是1,手动ack
        if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

启动测试,可以看到效果。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/721401.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号