包含:
1 生产者补单队列
2 生产者重试机制
3 消费者重试机制
4 可靠性投递,最终事务一致原则
5 消费端限流 (服务端限流内存和磁盘配置此处不涉及)
rabbitConfig全局配置
package org.jeecg.boot.starter.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
 * Global RabbitMQ configuration: exposes the {@link RabbitTemplate} bean used by producers.
 */
@Configuration
@Slf4j
public class RabbitConfig {

    /** Injected caching connection factory; the active code in this class does not read it. */
    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * Creates a prototype-scoped {@link RabbitTemplate} with mandatory routing enabled and a
     * Jackson JSON message converter.
     *
     * @param connectionFactory connection factory the template publishes through
     * @return a freshly configured template (one instance per lookup because of the prototype scope)
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate xigmaTemplate(ConnectionFactory connectionFactory) {
        final Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonConverter);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    // Disabled examples of dedicated listener container factories with different
    // concurrency / prefetch limits (consumer-side throttling).
    //
    // @Bean("customContainerFactory")
    // public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
    //                                                              ConnectionFactory connectionFactory) {
    //     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //     factory.setConcurrentConsumers(1);    // initial concurrency 1
    //     factory.setMaxConcurrentConsumers(1); // maximum concurrency 1
    //     factory.setPrefetchCount(1);          // messages each consumer may pull at a time; throttles the consumer
    //     configurer.configure(factory, connectionFactory);
    //     return factory;
    // }
    //
    // @Bean("customContainerFactory2")
    // public SimpleRabbitListenerContainerFactory containerFactory2(SimpleRabbitListenerContainerFactoryConfigurer configurer,
    //                                                               ConnectionFactory connectionFactory) {
    //     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //     factory.setConcurrentConsumers(1);    // initial concurrency 1
    //     factory.setMaxConcurrentConsumers(5); // maximum concurrency 5
    //     factory.setPrefetchCount(1);          // messages each consumer may pull at a time; throttles the consumer
    //     configurer.configure(factory, connectionFactory);
    //     return factory;
    // }
    //
    // @Bean("customContainerFactory3")
    // public SimpleRabbitListenerContainerFactory containerFactory3(SimpleRabbitListenerContainerFactoryConfigurer configurer,
    //                                                               ConnectionFactory connectionFactory) {
    //     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //     factory.setConcurrentConsumers(5);     // initial concurrency 5
    //     factory.setMaxConcurrentConsumers(20); // maximum concurrency 20
    //     factory.setPrefetchCount(5);           // messages each consumer may pull at a time; throttles the consumer
    //     configurer.configure(factory, connectionFactory);
    //     return factory;
    // }
}
MqConstant常量
package org.jeecg.boot.starter.rabbitmq.constant;
/**
 * Names of the RabbitMQ exchange, queues and routing key used by the order flow.
 */
public class MqConstant {

    /** Direct exchange that order-creation messages are published to. */
    public static final String ORDER_DIRECT_EXCHAGE = "order_direct_exchange";

    /** Queue carrying order-creation messages. */
    public static final String ORDER_DIRECT_QUEUE = "order_direct_queue";

    /** Queue carrying order-dispatch (handler) messages. */
    public static final String HANDLER_ORDER_DIRECT_QUEUE = "handler_order_direct_queue";

    /** Routing key for order-creation messages. */
    public static final String ORDER_DIRECT_KEY = "order_direct_key";
}
控制器
package org.jeecg.modules.rabbitmq.controller;
import java.util.*;
import java.util.stream.Collectors;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.common.util.UUIDGenerator;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.rabbitmq.entity.RabbitmqHandleOrder;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.jeecg.modules.rabbitmq.rabbitmq.producer.OrderProducer;
import org.jeecg.modules.rabbitmq.service.IRabbitmqHandleOrderService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.modules.rabbitmq.service.IRabbitmqOrderService;
import org.jeecgframework.poi.excel.ExcelImportUtil;
import org.jeecgframework.poi.excel.def.NormalExcelConstants;
import org.jeecgframework.poi.excel.entity.ExportParams;
import org.jeecgframework.poi.excel.entity.ImportParams;
import org.jeecgframework.poi.excel.view.JeecgEntityExcelView;
import org.jeecg.common.system.base.controller.JeecgController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.servlet.ModelAndView;
import com.alibaba.fastjson.JSON;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.jeecg.common.aspect.annotation.AutoLog;
@Api(tags = "rabbitmq_handle_order")
@RestController
@RequestMapping("/rabbitmq/rabbitmqHandleOrder")
@Slf4j
public class RabbitmqHandleOrderController extends JeecgController {
@Autowired
private OrderProducer orderProducer;
@Autowired
private IRabbitmqOrderService rabbitmqOrderService;
@AutoLog(value = "rabbitmq_handle_order-添加")
@ApiOperation(value = "rabbitmq_handle_order-添加", notes = "rabbitmq_handle_order-添加")
@PostMapping(value = "/add")
@Transactional
public Result> add(@RequestBody RabbitmqHandleOrder rabbitmqHandleOrder) throws JsonProcessingException {
RabbitmqOrder order = new RabbitmqOrder().setId(rabbitmqHandleOrder.getId()).setName("测试").setOrderTime(new Date());
boolean save = rabbitmqOrderService.save(order);
if(save){
//如果订单保存成功开始派单
orderProducer.sendHandler(order);
}
return Result.OK("添加成功!");
}
}
rabbitmq交换机队列和key创建
package org.jeecg.modules.rabbitmq.rabbitmq.config;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the order exchange, the two durable queues and their bindings.
 * Both queues are bound to the same direct exchange with the same routing key.
 */
@Configuration
public class InitConfig {

    /** Durable, non-auto-delete direct exchange for order messages. */
    @Bean("orderDirectExchange")
    public DirectExchange getOrderDirectExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(MqConstant.ORDER_DIRECT_EXCHAGE, durable, autoDelete);
    }

    /** Durable queue for order-creation messages. */
    @Bean("orderQueue")
    public Queue getOrderQueue() {
        return durableQueue(MqConstant.ORDER_DIRECT_QUEUE);
    }

    /** Binds the order queue to the order exchange with the order routing key. */
    @Bean
    public Binding bindOrder(@Qualifier("orderQueue") Queue queue, @Qualifier("orderDirectExchange") DirectExchange exchange) {
        return bindWithOrderKey(queue, exchange);
    }

    /** Durable queue for order-dispatch (handler) messages. */
    @Bean("handlerQueue")
    public Queue getHandlerQueue() {
        return durableQueue(MqConstant.HANDLER_ORDER_DIRECT_QUEUE);
    }

    /** Binds the handler queue to the order exchange with the order routing key. */
    @Bean
    public Binding bindHandler(@Qualifier("handlerQueue") Queue queue, @Qualifier("orderDirectExchange") DirectExchange exchange) {
        return bindWithOrderKey(queue, exchange);
    }

    /** Creates a durable, non-exclusive, non-auto-delete queue with the given name. */
    private static Queue durableQueue(String name) {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(name, durable, exclusive, autoDelete);
    }

    /** Binds {@code queue} to {@code exchange} using {@link MqConstant#ORDER_DIRECT_KEY}. */
    private static Binding bindWithOrderKey(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(MqConstant.ORDER_DIRECT_KEY);
    }
}
订单补单消费者(消费订单队列,订单不存在时补单)
package org.jeecg.modules.rabbitmq.rabbitmq.cosume;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.rabbitmq.entity.RabbitmqHandleOrder;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.jeecg.modules.rabbitmq.service.IRabbitmqHandleOrderService;
import org.jeecg.modules.rabbitmq.service.IRabbitmqOrderService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
@Component
@Slf4j
public class OrderAgainComsume {
@Autowired
private IRabbitmqOrderService rabbitmqOrderService;
@RabbitHandler
@RabbitListener(queues = MqConstant.ORDER_DIRECT_QUEUE)
@Transactional
public void process(Channel channel, Message message) throws IOException {
try {
log.info("订单消费者成功接收消息:{}", message.getMessageProperties().getMessageId());//消息id必须唯一当前就是订单id
ObjectMapper mapper=new ObjectMapper();
RabbitmqOrder order = mapper.readValue(message.getBody(), RabbitmqOrder.class);
//查询订单是否存在
RabbitmqOrder byId = rabbitmqOrderService.getById(order.getId());
if(null==byId){
rabbitmqOrderService.save(order);
}
log.info("订单已存在无需补单");
// 确认收到消息,只确认当前消费者的一个消息收到,手动签收消息,通知mq服务端删除改消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// throw new NullPointerException();
} catch (Exception e) {
e.printStackTrace();
//次重复消息投递
log.info("消费消息失败: {}", message.getMessageProperties().getDeliveryTag());
// 拒绝消息,并且不再重新进入队列,重试机制两个条件1抛出异常,2不能重入队列而是重试,此处设置false
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//记录重试次数日志,重试次数超过全局设置的5次开始进行定时job补偿或者人工补偿,
// 注意不建议开启死性队列 原因 1:当前补偿一般是网络延迟等重试一次就可以解决。2:服务挂了重启重试一次也可以解决。3:代码本身判断有问题无法消费(这种情况大概率要人工补偿了),所以死性队列没多大意义
//记录日志库逻辑...
throw e;
}
}
}
派单消费者
package org.jeecg.modules.rabbitmq.rabbitmq.cosume;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.rabbitmq.entity.RabbitmqHandleOrder;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.jeecg.modules.rabbitmq.service.IRabbitmqHandleOrderService;
import org.jeecg.modules.rabbitmq.service.IRabbitmqOrderService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.Map;
import static com.alibaba.fastjson.JSONValidator.Type.Object;
@Component
@Slf4j
public class OrderComsume {
@Autowired
private IRabbitmqHandleOrderService rabbitmqHandleOrderService;
@RabbitHandler
@RabbitListener(queues = MqConstant.ORDER_DIRECT_QUEUE)
@Transactional
public void process(Channel channel, Message message) throws IOException {
try {
log.info("消费者成功接收消息:{}", message.getMessageProperties().getDeliveryTag());
ObjectMapper mapper=new ObjectMapper();
RabbitmqOrder order = mapper.readValue(message.getBody(), RabbitmqOrder.class);
RabbitmqHandleOrder rabbitmqHandleOrder=new RabbitmqHandleOrder();
rabbitmqHandleOrder.setOrderId(order.getId());
rabbitmqHandleOrderService.save(rabbitmqHandleOrder);
// 确认收到消息,只确认当前消费者的一个消息收到,手动签收消息,通知mq服务端删除改消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// throw new NullPointerException();
} catch (Exception e) {
e.printStackTrace();
//次重复消息投递
log.info("消费消息失败: {}", message.getMessageProperties().getDeliveryTag());
// 拒绝消息,并且不再重新进入队列,重试机制两个条件1抛出异常,2不能重入队列而是重试,此处设置false
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//记录重试次数日志,重试次数超过全局设置的5次开始进行定时job补偿或者人工补偿,
// 注意不建议开启死性队列 原因 1:当前补偿一般是网络延迟等重试一次就可以解决。2:服务挂了重启重试一次也可以解决。3:代码本身判断有问题无法消费(这种情况大概率要人工补偿了),所以死性队列没多大意义
//记录日志库逻辑...
throw e;
}
}
}
派单生产者
package org.jeecg.modules.rabbitmq.rabbitmq.producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.callback.CustomConfirmAndReturnCallback;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.rabbitmq.entity.RabbitmqOrder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.UUID;
@Component
@Slf4j
public class OrderProducer extends CustomConfirmAndReturnCallback {
@Autowired
private RabbitTemplate xigmaTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
xigmaTemplate.setConfirmCallback(this);
//指定 ReturnCallback
xigmaTemplate.setReturnCallback(this);
}
public void sendHandler(RabbitmqOrder order) throws JsonProcessingException {
log.info("发送消息成功:{}",order.getId());
//创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
CorrelationData correlationData = new CorrelationData(order.getId());
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
properties.setMessageId(order.getId());
properties.setPriority(0);
properties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
properties.setContentType("utf-8");
ObjectMapper mapper=new ObjectMapper();
byte[] bytes = mapper.writeValueAsBytes(order);
Message message = new Message(bytes, properties);
xigmaTemplate.convertAndSend(MqConstant.ORDER_DIRECT_EXCHAGE, MqConstant.ORDER_DIRECT_KEY, message,correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
// super.confirm(correlationData, isSendSuccess, error);
String orderId=correlationData.getId();
log.info("重写确认机制:{}",orderId);
if(isSendSuccess){
log.info("重写发送成功:{}",orderId);
}else{
log.info("重写发送失败原因:{}",error);
RabbitmqOrder rabbitmqOrder=new RabbitmqOrder();
rabbitmqOrder.setId(orderId);
try {
//重试机制重新发送,注意当前生产者发送变成死循环,需要全局配置重试机制,超过重试次数就不执行了
sendHandler(rabbitmqOrder);
//记录重试次数日志,重试次数超过全局设置的5次开始进行定时job补偿或者人工补偿
//记录日志库
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
}
application.yml 全局配置(发布确认、消费者限流、重试等)
# RabbitMQ configuration
# NOTE(review): Spring Boot binds these keys under the `spring.rabbitmq` prefix; this excerpt
# presumably sits beneath a `spring:` key in the full application.yml — confirm.
rabbitmq:
  host: pe-boot-rabbitmq
  username: admin
  password: admin
  port: 5672
  virtual-host: zzq
  connection-timeout: 15000
  # Enable publisher confirm mode (legacy flag; see publisher-confirm-type below)
  publisher-confirms: true
  # Enable publisher return mode; requires template.mandatory=true below, otherwise unroutable messages are dropped
  publisher-returns: true
  # Mandatory routing: unroutable messages are returned to the producer instead of being discarded
  template.mandatory: true
  # Newer versions replace publisher-confirms with publisher-confirm-type (default NONE);
  # CORRELATED triggers the confirm callback once the message reaches the exchange
  publisher-confirm-type: correlated
  listener:
    simple:
      # Consumers acknowledge messages manually
      acknowledge-mode: manual
      # Maximum number of concurrent consumers
      max-concurrency: 5
      # Initial number of concurrent consumers
      concurrency: 1
      # Number of messages each consumer may pull at a time (consumer-side throttling)
      prefetch: 1
      # Retry mechanism
      retry:
        # Whether consumer retry is enabled
        enabled: true
        # Maximum number of attempts
        max-attempts: 5
        # Interval before the first retry (milliseconds)
        initial-interval: 5000
        # Upper bound for the retry interval (milliseconds)
        max-interval: 10000
        # Interval multiplier: next interval = previous interval * multiplier, capped at max-interval
        multiplier: 2
pom文件依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>



