栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ防止重复消费

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ防止重复消费

RabbitConfig 

/**
 * Declares the messaging topology of the demo: one queue, one fanout
 * exchange, and the binding that connects them.
 */
@Configuration
@EnableRabbit
public class RabbitConfig {

    /** Name of the demo queue. */
    private static final String QUEUE_NAME = "fanoutQueue";

    /** Name of the demo fanout exchange. */
    private static final String EXCHANGE_NAME = "fanoutExchange";

    // No routing key is declared; the original author notes that it has no
    // effect for this fanout setup.

    /**
     * Registers the demo queue.
     *
     * @return a queue named {@code fanoutQueue}
     */
    @Bean
    public Queue fanoutQueue() {
        Queue queue = new Queue(QUEUE_NAME);
        return queue;
    }

    /**
     * Registers the demo fanout exchange.
     *
     * @return an exchange named {@code fanoutExchange}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange exchange = new FanoutExchange(EXCHANGE_NAME);
        return exchange;
    }

    /**
     * Binds the demo queue to the demo fanout exchange.
     *
     * @return the binding between {@link #fanoutQueue()} and {@link #fanoutExchange()}
     */
    @Bean
    public Binding fanoutExchangeBinding() {
        Queue queue = fanoutQueue();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}

producer

/**
 * Publishes demo messages through {@link RabbitTemplate}.
 *
 * <p>NOTE(review): both methods call {@code convertAndSend(String, Object)}.
 * Per the Spring AMQP Javadoc that overload treats its first argument as a
 * routing key on the template's default exchange, so the
 * {@code fanoutExchange} declared in {@code RabbitConfig} is presumably not
 * involved when a queue name is passed here. Confirm this is intended.
 */
@Component
public class MyProducer {

    /** Pattern used for the {@code createTime} entry; the formatter is immutable, so it is shared. */
    private static final DateTimeFormatter CREATE_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds a sample JSON payload (simulating parameters posted by a front
     * end), wraps it in a persistent message with a random message id, and
     * sends it.
     *
     * @param queueName value passed as the first argument of {@code convertAndSend}
     * @return the constant string {@code "sendjsonObject"}
     */
    public String sendjsonObject(String queueName) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "Icomxchishi@qq.com");
        jsonObject.put("phone", "03803880388");

        jsonObject.put("age", 15);
        jsonObject.put("sex","女");
        jsonObject.put("name", "messageData");

        jsonObject.put("data", Arrays.asList("张三","12345678",12,"男"));
        jsonObject.put("timestamp", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();

        // Encode explicitly as UTF-8: the message advertises "utf-8" as its
        // content encoding and the consumer decodes the body as UTF-8, so the
        // platform default charset must not be used here.
        byte[] body = jsonString.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // The message id is what the consumer uses to detect redelivered messages.
        String messageId = UUID.randomUUID().toString();
        Message message = MessageBuilder.withBody(body)
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                // NOTE(review): delivery tags are normally assigned by the broker
                // on delivery; this producer-side value is presumably not
                // transmitted. Kept to preserve the original behavior.
                .setDeliveryTag(System.currentTimeMillis())
                .setContentEncoding("utf-8")
                .setMessageId(messageId)
                .build();
        rabbitTemplate.convertAndSend(queueName, message);
        return "sendjsonObject";
    }

    /**
     * Sends a sample map payload containing a random message id, a text, the
     * current time, and a small list.
     *
     * @param queueName value passed as the first argument of {@code convertAndSend}
     * @return the constant string {@code "sendMapMessage"}
     */
    public String sendMessageMap(String queueName) {
        String messageId = UUID.randomUUID().toString();
        String messageData = "my message!";
        String createTime = LocalDateTime.now().format(CREATE_TIME_FORMAT);
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        map.put("msg","随便");
        map.put("data", Arrays.asList("张三","12345678",12,"男"));
        rabbitTemplate.convertAndSend(queueName, map);
        return "sendMapMessage";
    }
}

consumer(以 sendjsonObject() 方法为例)

@Slf4j
@Component
public class MyConsumer {

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisClient redisClient;//本人封装的,可以使用redisTemplate
    @Autowired
    private UserService userService;
    @Autowired
    private LoggerService loggerService;


    
    @RabbitListener(queues = "fanoutQueue")
    public void sendjsonObject(Message message, Channel channel) throws Exception{//消费者消息重复
        log.info("MyConsumer  消费者收到消息:{}" , JSONObject.toJSONString(message));
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        String messageIdRedis = //(String) redisTemplate.opsForValue().get("messageId");
        redisClient.getString("messageId");
        if (messageId != null) {
            if (messageId.equals(messageIdRedis)) {
                return;
            }
        }
        System.out.println("number==" + channel.getChannelNumber());
        JSONObject jsonObject = JSONObject.parseObject(msg);
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        try {
            //具体业务,如:插入或修改user,这里用数据库可以不用redis做判断
            Integer age = (Integer) jsonObject.get("age");
            String name = (String) jsonObject.get("name");
            String sex = (String) jsonObject.get("sex");
            LocalDateTime localDateTime = LocalDateTime.now();
            Date dataTime = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
            User user = new User();
            user.setSex(sex);
            user.setName(name);
            user.setDataTime(dataTime);
            user.setAge(age);
            int i = 0;
            if (user != null){
                i = userService.savetUser(user);//新增或修改
                redisClient.setKey(user.getId() + "", user,  5_000L);// 写入缓存,便于操作延时业务
                //redisTemplate.opsForList().rightPushAll(user.getId(), Arrays.asList(user));
                //loggerService.saveLog(jsonObject);//日志记录
            }
            log.info("收到消息obj:" + user + ",id=" + user.getId());
            log.info("保存了" + i + "记录");
            channel.basicAck(deliverTag,true);//手动设置ack,消费成功,确认消息
        }catch (Exception e){
            try {
                //异常返回false,就重新回到队列
                channel.basicNack(deliverTag, false, true);
            } catch (Exception ioException) {
                log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);
            }
            log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);
            
            //channel.basicReject(deliverTag, false);
        }
        //redisTemplate.opsForValue().set(messageId, messageId, 30, TimeUnit.SECONDS);//时间具体根据业务定
        redisClient.setKey(messageId,messageId,3_000L);// 写入缓存
        log.info("消费消息jsonObject:" + jsonObject + ",messageId:" + messageId);
    }
    


    @RabbitListener(queues = "fanoutQueue")
    public void sendMessageMap(Map map){
        log.info("收到消息map:" + map.toString());
    }



   
}
ProducerController

/**
 * HTTP endpoints under {@code producer} that trigger the demo producer.
 */
@RestController
@RequestMapping("producer")
public class ProducerController {

    /** Queue name handed to every producer call. */
    private static final String TARGET_QUEUE = "fanoutQueue";

    @Autowired
    private MyProducer myProducer;

    /**
     * Triggers the JSON-message producer. Request parameters are simulated
     * inside the producer, so none are read here.
     *
     * @return whatever {@code MyProducer.sendjsonObject} returns
     */
    @RequestMapping("/sendjsonObject")
    public String sendjsonObject() {
        String outcome = myProducer.sendjsonObject(TARGET_QUEUE);
        return outcome;
    }

    /**
     * Triggers the map-message producer.
     *
     * @return whatever {@code MyProducer.sendMessageMap} returns
     */
    @GetMapping("/sendMessageMap")
    public String sendMessageMap() {
        String outcome = myProducer.sendMessageMap(TARGET_QUEUE);
        return outcome;
    }
}

application.yml

server:
  port: 8081

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
    # NOTE(review): newer MySQL Connector/J versions name this class
    # com.mysql.cj.jdbc.Driver - confirm against the driver version in use.
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password:
    dbcp2:
      initial-size: 5
      max-total: 5
      max-wait-millis: 200
      min-idle: 5

  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password:

  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-returns: true
    # NOTE(review): newer Spring Boot versions replace this key with
    # publisher-confirm-type - confirm against the Boot version in use.
    publisher-confirms: true
    connection-timeout: 5000
    # NOTE(review): MyConsumer acknowledges messages manually, which presumably
    # requires listener.simple.acknowledge-mode: manual. It is not set here
    # because every listener on the container would then have to ack manually.

# Alternative way to print SQL statements. This is a MyBatis setting; it was
# previously nested under spring.rabbitmq, where it is not a RabbitMQ property.
# NOTE(review): assumes mybatis-spring-boot-starter; confirm the key prefix.
mybatis:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

logging:
  level:
    com:
      acong:
        dao: debug
  # NOTE(review): newer Spring Boot versions use logging.file.name - confirm.
  file: d:/logs/redis.log
  

效果图:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/841065.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号