栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

最完善的rabbitmq消息队列工具类

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

最完善的rabbitmq消息队列工具类

包含:

1 生产者补单队列

2 生产者重试机制

3 消费者重试机制

4 可靠性投递,最终事务一致原则

5 消费端限流 (服务端限流内存和磁盘配置此处不涉及)

rabbitConfig全局配置

package org.jeecg.boot.starter.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
@Slf4j
public class RabbitConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate xigmaTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        template.setMandatory(true);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

//    
//    @Bean("customContainerFactory")
//    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
//                                                                 ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConcurrentConsumers(1);//默认并发1
//        factory.setMaxConcurrentConsumers(1);//最大并发1
//        factory.setPrefetchCount(1);//每个消费者每次监听时可拉取,对消费者限流防止消费者压垮
//        configurer.configure(factory, connectionFactory);
//        return factory;
//    }
//
//    
//    @Bean("customContainerFactory2")
//    public SimpleRabbitListenerContainerFactory containerFactory2(SimpleRabbitListenerContainerFactoryConfigurer configurer,
//                                                                 ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConcurrentConsumers(1);//默认并发1
//        factory.setMaxConcurrentConsumers(5);//最大并发5
//        factory.setPrefetchCount(1);//每个消费者每次监听时可拉取,对消费者限流防止消费者压垮
//        configurer.configure(factory, connectionFactory);
//        return factory;
//    }
//    
//    @Bean("customContainerFactory3")
//    public SimpleRabbitListenerContainerFactory containerFactory3(SimpleRabbitListenerContainerFactoryConfigurer configurer,
//                                                                  ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConcurrentConsumers(5);//默认并发5
//        factory.setMaxConcurrentConsumers(20);//最大并发20
//        factory.setPrefetchCount(5);//每个消费者每次监听时可拉取,对消费者限流防止消费者压垮
//        configurer.configure(factory, connectionFactory);
//        return factory;
//    }

}
MqConstant常量
package org.jeecg.boot.starter.rabbitmq.constant;


public class MqConstant {

    
    public final static String ORDER_DIRECT_EXCHAGE = "order_direct_exchange"; //创建订单交换机

    
    public final static String ORDER_DIRECT_QUEUE = "order_direct_queue"; //创建订单队列
    public final static String HANDLER_ORDER_DIRECT_QUEUE = "handler_order_direct_queue"; //创建配单交队列

    
    public final static String ORDER_DIRECT_KEY= "order_direct_key"; //创建订单key


 }

控制器

package org.jeecg.modules.rabbitmq.controller;

import java.util.*;
import java.util.stream.Collectors;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.common.util.UUIDGenerator;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.rabbitmq.entity.RabbitmqHandleOrder;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.jeecg.modules.rabbitmq.rabbitmq.producer.OrderProducer;
import org.jeecg.modules.rabbitmq.service.IRabbitmqHandleOrderService;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;

import org.jeecg.modules.rabbitmq.service.IRabbitmqOrderService;
import org.jeecgframework.poi.excel.ExcelImportUtil;
import org.jeecgframework.poi.excel.def.NormalExcelConstants;
import org.jeecgframework.poi.excel.entity.ExportParams;
import org.jeecgframework.poi.excel.entity.ImportParams;
import org.jeecgframework.poi.excel.view.JeecgEntityExcelView;
import org.jeecg.common.system.base.controller.JeecgController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.servlet.ModelAndView;
import com.alibaba.fastjson.JSON;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.jeecg.common.aspect.annotation.AutoLog;


@Api(tags = "rabbitmq_handle_order")
@RestController
@RequestMapping("/rabbitmq/rabbitmqHandleOrder")
@Slf4j
public class RabbitmqHandleOrderController extends JeecgController {

    @Autowired
    private OrderProducer orderProducer;
    @Autowired
    private IRabbitmqOrderService rabbitmqOrderService;
    
    @AutoLog(value = "rabbitmq_handle_order-添加")
    @ApiOperation(value = "rabbitmq_handle_order-添加", notes = "rabbitmq_handle_order-添加")
    @PostMapping(value = "/add")
    @Transactional
    public Result add(@RequestBody RabbitmqHandleOrder rabbitmqHandleOrder) throws JsonProcessingException {
        RabbitmqOrder order = new RabbitmqOrder().setId(rabbitmqHandleOrder.getId()).setName("测试").setOrderTime(new Date());
        boolean save = rabbitmqOrderService.save(order);
        if(save){
            //如果订单保存成功开始派单
            orderProducer.sendHandler(order);
        }

        return Result.OK("添加成功!");
    }



}

rabbitmq交换机队列和key创建

package org.jeecg.modules.rabbitmq.rabbitmq.config;


import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class InitConfig {

    
    @Bean("orderDirectExchange")
    public DirectExchange getOrderDirectExchange() {
        //交换机持久化 durable=true
        return new DirectExchange(MqConstant.ORDER_DIRECT_EXCHAGE,true,false);
    }

    @Bean("orderQueue")
    public Queue getOrderQueue() {
        //队列持久化 durable=true
        return new Queue(MqConstant.ORDER_DIRECT_QUEUE, true, false, false);
    }

    @Bean
    public Binding bindOrder(@Qualifier("orderQueue") Queue queue, @Qualifier("orderDirectExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(MqConstant.ORDER_DIRECT_KEY);
    }


    @Bean("handlerQueue")
    public Queue getHandlerQueue() {
        //队列持久化 durable=true
        return new Queue(MqConstant.HANDLER_ORDER_DIRECT_QUEUE, true, false, false);
    }

    @Bean
    public Binding bindHandler(@Qualifier("handlerQueue") Queue queue, @Qualifier("orderDirectExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(MqConstant.ORDER_DIRECT_KEY);
    }
}

订单生产者补单消费

package org.jeecg.modules.rabbitmq.rabbitmq.cosume;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.rabbitmq.entity.RabbitmqHandleOrder;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.jeecg.modules.rabbitmq.service.IRabbitmqHandleOrderService;
import org.jeecg.modules.rabbitmq.service.IRabbitmqOrderService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;


@Component
@Slf4j
public class OrderAgainComsume {
    @Autowired
    private IRabbitmqOrderService rabbitmqOrderService;

    @RabbitHandler
    @RabbitListener(queues = MqConstant.ORDER_DIRECT_QUEUE)
    @Transactional
    public void process(Channel channel, Message message) throws IOException {
        try {

            log.info("订单消费者成功接收消息:{}", message.getMessageProperties().getMessageId());//消息id必须唯一当前就是订单id
            ObjectMapper mapper=new ObjectMapper();
            RabbitmqOrder order = mapper.readValue(message.getBody(), RabbitmqOrder.class);
            //查询订单是否存在
            RabbitmqOrder byId = rabbitmqOrderService.getById(order.getId());
            if(null==byId){
                rabbitmqOrderService.save(order);
            }
            log.info("订单已存在无需补单");
            // 确认收到消息,只确认当前消费者的一个消息收到,手动签收消息,通知mq服务端删除改消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
         //    throw new NullPointerException();
        } catch (Exception e) {
            e.printStackTrace();
            //次重复消息投递
            log.info("消费消息失败: {}", message.getMessageProperties().getDeliveryTag());
            // 拒绝消息,并且不再重新进入队列,重试机制两个条件1抛出异常,2不能重入队列而是重试,此处设置false
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            //记录重试次数日志,重试次数超过全局设置的5次开始进行定时job补偿或者人工补偿,
            // 注意不建议开启死性队列  原因 1:当前补偿一般是网络延迟等重试一次就可以解决。2:服务挂了重启重试一次也可以解决。3:代码本身判断有问题无法消费(这种情况大概率要人工补偿了),所以死性队列没多大意义
            //记录日志库逻辑...
            throw e;
        }
    }
}

 派单消费者

package org.jeecg.modules.rabbitmq.rabbitmq.cosume;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.rabbitmq.entity.RabbitmqHandleOrder;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.jeecg.modules.rabbitmq.service.IRabbitmqHandleOrderService;
import org.jeecg.modules.rabbitmq.service.IRabbitmqOrderService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.Map;

import static com.alibaba.fastjson.JSONValidator.Type.Object;


@Component
@Slf4j
public class OrderComsume {
    @Autowired
    private IRabbitmqHandleOrderService rabbitmqHandleOrderService;

    @RabbitHandler
    @RabbitListener(queues = MqConstant.ORDER_DIRECT_QUEUE)
    @Transactional
    public void process(Channel channel, Message message) throws IOException {
        try {

            log.info("消费者成功接收消息:{}", message.getMessageProperties().getDeliveryTag());
            ObjectMapper mapper=new ObjectMapper();
            RabbitmqOrder order = mapper.readValue(message.getBody(), RabbitmqOrder.class);
            RabbitmqHandleOrder rabbitmqHandleOrder=new RabbitmqHandleOrder();
            rabbitmqHandleOrder.setOrderId(order.getId());
            rabbitmqHandleOrderService.save(rabbitmqHandleOrder);
            // 确认收到消息,只确认当前消费者的一个消息收到,手动签收消息,通知mq服务端删除改消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
         //    throw new NullPointerException();
        } catch (Exception e) {
            e.printStackTrace();
            //次重复消息投递
            log.info("消费消息失败: {}", message.getMessageProperties().getDeliveryTag());
            // 拒绝消息,并且不再重新进入队列,重试机制两个条件1抛出异常,2不能重入队列而是重试,此处设置false
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            //记录重试次数日志,重试次数超过全局设置的5次开始进行定时job补偿或者人工补偿,
            // 注意不建议开启死性队列  原因 1:当前补偿一般是网络延迟等重试一次就可以解决。2:服务挂了重启重试一次也可以解决。3:代码本身判断有问题无法消费(这种情况大概率要人工补偿了),所以死性队列没多大意义
            //记录日志库逻辑...
            throw e;
        }
    }
}

派单生产者

package org.jeecg.modules.rabbitmq.rabbitmq.producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.callback.CustomConfirmAndReturnCallback;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.UUID;


@Component
@Slf4j
public class OrderProducer extends CustomConfirmAndReturnCallback {
    @Autowired
    private RabbitTemplate xigmaTemplate;

    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        xigmaTemplate.setConfirmCallback(this);
        //指定 ReturnCallback
        xigmaTemplate.setReturnCallback(this);
    }

    public void sendHandler(RabbitmqOrder order) throws JsonProcessingException {
        log.info("发送消息成功:{}",order.getId());
        //创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
        CorrelationData correlationData = new CorrelationData(order.getId());
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
        properties.setMessageId(order.getId());
        properties.setPriority(0);
        properties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
        properties.setContentType("utf-8");
        ObjectMapper mapper=new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(order);
        Message message = new Message(bytes, properties);
        xigmaTemplate.convertAndSend(MqConstant.ORDER_DIRECT_EXCHAGE, MqConstant.ORDER_DIRECT_KEY, message,correlationData);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
       // super.confirm(correlationData, isSendSuccess, error);
        String orderId=correlationData.getId();
        log.info("重写确认机制:{}",orderId);
        if(isSendSuccess){
            log.info("重写发送成功:{}",orderId);
        }else{
            log.info("重写发送失败原因:{}",error);
            RabbitmqOrder rabbitmqOrder=new RabbitmqOrder();
            rabbitmqOrder.setId(orderId);
            try {
                //重试机制重新发送,注意当前生产者发送变成死循环,需要全局配置重试机制,超过重试次数就不执行了
                sendHandler(rabbitmqOrder);
                //记录重试次数日志,重试次数超过全局设置的5次开始进行定时job补偿或者人工补偿
                //记录日志库
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }

    }
}

application全局配置消费者限流等信息

#rabbitmq配置
  rabbitmq:
    host: pe-boot-rabbitmq
    username: admin
    password: admin
    port: 5672
    virtual-host: zzq
    connection-timeout: 15000
    #开启confirm模式
    publisher-confirms: true
    #开启return模式,前提是下面的mandatory设置为true否则会删除消息
    publisher-returns: true
     #消费者端开启自动ack模式
    template.mandatory: true
    #新版本publisher-confirms已经修改为publisher-confirm-type,默认为NONE,CORRELATED值是发布消息成功到交换器会触发回调
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual
        #并发消费者的最大值
        max-concurrency: 5
        #并发消费者的初始化值
        concurrency: 1
        #每个消费者每次监听时可拉取
        prefetch: 1
        # 重试机制
        retry:
          #是否开启消费者重试
          enabled: true 
          #最大重试次数
          max-attempts: 5
          #重试间隔时间(单位毫秒)
          initial-interval: 5000
          #重试最大时间间隔(单位毫秒)
          max-interval: 10000
          #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
          multiplier: 2 

pom文件依赖

  
        
            org.springframework.cloud
            spring-cloud-starter-bus-amqp
        

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/880855.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号