目录:controller层、MQ 消息发送器接口、RabbitMQ的配置项、RabbitMQBeanProcessor、
MQ 线程池配置、mq清除商户登录信息通知、mq支付订单商户通知
代码选自jeepay
controller层:
@RestController
@RequestMapping("/api/mchNotify")
public class MchNotifyController extends CommonCtrl {

    @Autowired private MchNotifyRecordService mchNotifyService;
    @Autowired private IMQSender mqSender;

    /**
     * Re-sends a merchant notification that previously failed.
     *
     * <p>Steps: load the record by id, reject it unless its state is
     * {@code MchNotifyRecord.STATE_FAIL}, call
     * {@code updateIngAndAddNotifyCountLimit} on the mapper, then publish a
     * {@link PayOrderMchNotifyMQ} message carrying the id.
     *
     * @param notifyId id of the notification record to resend
     * @return a failure response when no record exists for {@code notifyId};
     *         otherwise a success response wrapping the record as it was loaded
     *         <em>before</em> the update call
     * @throws BizException when the record is not in the failed state
     */
    @PreAuthorize("hasAuthority('ENT_MCH_NOTIFY_RESEND')")
    @RequestMapping(value="resend/{notifyId}", method = RequestMethod.POST)
    public ApiRes resend(@PathVariable("notifyId") Long notifyId) {
        MchNotifyRecord mchNotify = mchNotifyService.getById(notifyId);
        if (mchNotify == null) {
            return ApiRes.fail(ApiCodeEnum.SYS_OPERATION_FAIL_SELETE);
        }
        if (mchNotify.getState() != MchNotifyRecord.STATE_FAIL) {
            throw new BizException("请选择失败的通知记录");
        }

        // Update the record before queueing the resend.
        // Fixed: the accessor is getBaseMapper(); "getbaseMapper" does not exist.
        mchNotifyService.getBaseMapper().updateIngAndAddNotifyCountLimit(notifyId);

        // Hand the actual resend over to the MQ consumer.
        mqSender.send(PayOrderMchNotifyMQ.build(notifyId));

        return ApiRes.ok(mchNotify);
    }
}
MQ 消息发送器 接口
/**
 * Abstraction over the message-queue vendor used to publish messages.
 * One implementation is selected per MQ vendor.
 */
public interface IMQSender {
/**
 * Publishes the given message without delay.
 *
 * @param mqModel message definition providing the MQ name, send type and payload
 */
void send(AbstractMQ mqModel);
/**
 * Publishes the given message after a delay.
 *
 * @param mqModel message definition providing the MQ name, send type and payload
 * @param delay   delay before delivery; the RabbitMQ implementation in this file
 *                multiplies it by 1000 before setting the message delay, so the
 *                unit appears to be seconds (confirm for other vendors)
 */
void send(AbstractMQ mqModel, int delay);
}
有三个实现类,根据自己使用的mq类型来选择:
这里看rabbitmq的:
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
// @ConditionalOnProperty makes this bean active only when the configured
// MQ vendor property equals the RabbitMQ value.
public class RabbitMQSender implements IMQSender {

    /** Factor converting the caller-supplied delay into the value set on the message. */
    private static final int MILLIS_PER_SECOND = 1000;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a message immediately.
     *
     * <p>QUEUE messages use the MQ name as routing key; every other type is
     * published to the per-message fanout exchange.
     *
     * @param mqModel message definition providing name, type and payload
     */
    @Override
    public void send(AbstractMQ mqModel) {
        if (mqModel.getMQType() == MQSendTypeEnum.QUEUE) {
            rabbitTemplate.convertAndSend(mqModel.getMQName(), mqModel.toMessage());
        } else {
            sendBroadcast(mqModel);
        }
    }

    /**
     * Publishes a message with a delivery delay.
     *
     * <p>Only QUEUE messages are delayed; they go through the delayed exchange.
     * Broadcast messages have no delay support here, so {@code delay} is
     * ignored for them and they are published immediately.
     *
     * @param mqModel message definition providing name, type and payload
     * @param delay   delay, multiplied by 1000 before being set on the message
     * @throws ArithmeticException if {@code delay * 1000} does not fit in an {@code int}
     */
    @Override
    public void send(AbstractMQ mqModel, int delay) {
        if (mqModel.getMQType() != MQSendTypeEnum.QUEUE) {
            sendBroadcast(mqModel);
            return;
        }

        // Fixed: "Math.toIntExact(delay * 1000)" multiplied in int arithmetic, so an
        // overflow wrapped silently before the range check ever ran.
        final int delayMillis = Math.multiplyExact(delay, MILLIS_PER_SECOND);

        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, mqModel.getMQName(), mqModel.toMessage(), message -> {
            message.getMessageProperties().setDelay(delayMillis);
            return message;
        });
    }

    /** Publishes to the message's fanout exchange; the routing key is meaningless for fanout. */
    private void sendBroadcast(AbstractMQ mqModel) {
        rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage());
    }
}
PayOrderMchNotifyMQ的 build方法:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PayOrderMchNotifyMQ extends AbstractMQ {
public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY";
private MsgPayload payload;
@Data
@AllArgsConstructor
public static class MsgPayload {
private Long notifyId;
}
@Override
public String getMQName() {
return MQ_NAME;
}
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
public static PayOrderMchNotifyMQ build(Long notifyId){
return new PayOrderMchNotifyMQ(new MsgPayload(notifyId));
}
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}
RabbitMQ的配置项
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
public class RabbitMQConfig {
public static final String DELAYED_EXCHANGE_NAME = "delayedExchange";
public static final String FANOUT_EXCHANGE_NAME_PREFIX = "fanout_exchange_";
@Autowired
@Qualifier(DELAYED_EXCHANGE_NAME)
private CustomExchange delayedExchange;
@Autowired
private RabbitMQBeanProcessor rabbitMQBeanProcessor;
@PostConstruct
public void init(){
// 获取到所有的MQ定义
Set> set = ClassUtil.scanPackageBySuper(ClassUtil.getPackage(AbstractMQ.class), AbstractMQ.class);
for (Class> aClass : set) {
// 实例化
AbstractMQ amq = (AbstractMQ) ReflectUtil.newInstance(aClass);
// 注册Queue === new Queue(name), queue名称/bean名称 = mqName
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName(),
BeanDefinitionBuilder.rootBeanDefinition(Queue.class).addConstructorArgValue(amq.getMQName()).getBeanDefinition());
// 广播模式
if(amq.getMQType() == MQSendTypeEnum.BROADCAST){
// 动态注册交换机, 交换机名称/bean名称 = FANOUT_EXCHANGE_NAME_PREFIX + amq.getMQName()
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),
BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () ->{
// 普通FanoutExchange 交换机
return new FanoutExchange(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),true,false);
// 支持 延迟的 FanoutExchange 交换机, 配置无效果。
// Map args = new HashMap<>();
// args.put("x-delayed-type", ExchangeTypes.FANOUT);
// return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
).getBeanDefinition()
);
}else{
// 延迟交换机与Queue进行绑定, 绑定Bean名称 = mqName_DelayedBind
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName() + "_DelayedBind",
BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () ->
BindingBuilder.bind(SpringBeansUtil.getBean(amq.getMQName(), Queue.class)).to(delayedExchange).with(amq.getMQName()).noargs()
).getBeanDefinition()
);
}
}
}
}
RabbitMQBeanProcessor
/**
 * Captures the {@link BeanDefinitionRegistry} so that other components can
 * register bean definitions later, and declares the shared delayed exchange.
 *
 * <p>{@link BeanDefinitionRegistryPostProcessor} extends the standard
 * BeanFactoryPostProcessor SPI: it allows further bean definitions to be
 * registered before regular BeanFactoryPostProcessor detection starts.
 */
@Configuration
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
public class RabbitMQBeanProcessor implements BeanDefinitionRegistryPostProcessor {

    /** Registry handed in by the container; stored for later use. */
    protected BeanDefinitionRegistry beanDefinitionRegistry;

    /** Stores the registry; registers nothing itself. */
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        this.beanDefinitionRegistry = beanDefinitionRegistry;
    }

    /** Intentionally empty: no bean-factory post-processing is needed. */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
    }

    /**
     * Declares the delayed exchange: a durable, non-auto-delete custom exchange
     * of type "x-delayed-message" whose underlying routing type is "direct".
     */
    @Bean(name = RabbitMQConfig.DELAYED_EXCHANGE_NAME)
    CustomExchange delayedExchange() {
        // Fixed: was a raw "Map"; the exchange arguments are String -> Object.
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
}
MQ 线程池配置
/**
 * Thread pool used by the asynchronous MQ message receivers.
 */
@Configuration
@EnableAsync
public class MqThreadExecutor {

    /** Bean name of the executor; equal to the name of the {@code @Bean} method below. */
    public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor";

    /**
     * Builds the executor: 20 core threads, up to 300 threads, a buffer queue
     * of 10 tasks and a 60 second idle keep-alive.
     *
     * @return the initialized executor
     */
    @Bean
    public Executor mqQueue4PayOrderMchNotifyExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        pool.setThreadNamePrefix("payOrderMchNotifyExecutor-");

        // Sizing: minimum threads kept, maximum threads allowed, buffered tasks.
        pool.setCorePoolSize(20);
        pool.setMaxPoolSize(300);
        pool.setQueueCapacity(10);

        // Idle time allowed before a thread is released.
        pool.setKeepAliveSeconds(60);

        // Rejection policy for when the pool has reached its maximum size:
        // CALLER_RUNS executes the task on the submitting thread instead of a pool thread.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        pool.initialize();
        return pool;
    }
}
mq清除商户登录信息通知
public static final String MQ_NAME = "QUEUE_CLEAN_MCH_LOGIN_AUTH_CACHE";
private MsgPayload payload;
@Data
@AllArgsConstructor
public static class MsgPayload {
private List userIdList;
}
@Override
public String getMQName() {
return MQ_NAME;
}
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
public static CleanMchLoginAuthCacheMQ build(List userIdList){
return new CleanMchLoginAuthCacheMQ(new MsgPayload(userIdList));
}
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}
对应消息接收器:
/**
 * RabbitMQ listener for the clean-login-cache queue: parses the raw text
 * into a payload and forwards it to the vendor-independent receiver.
 * Runs on the executor named by {@code @Async}.
 *
 * @param msg raw message text taken from the queue
 */
@Override
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@RabbitListener(queues = CleanMchLoginAuthCacheMQ.MQ_NAME)
public void receiveMsg(String msg) {
    CleanMchLoginAuthCacheMQ.MsgPayload payload = CleanMchLoginAuthCacheMQ.parse(msg);
    mqReceiver.receive(payload);
}
}
/**
 * Business handler for the "clean merchant login auth cache" message:
 * deletes the cached login tokens of every user listed in the payload.
 */
@Slf4j
@Component
public class CleanMchLoginAuthCacheMQReceiver implements CleanMchLoginAuthCacheMQ.IMQReceiver {

    /**
     * Removes all Redis keys matching each user's token-key pattern.
     *
     * @param payload message body holding the user ids; a null or empty list
     *                is logged and ignored
     */
    @Override
    public void receive(CleanMchLoginAuthCacheMQ.MsgPayload payload) {

        log.info("成功接收删除商户用户登录的订阅通知, msg={}", payload);

        // Fixed: raw List/Collection restored to List<Long>/Collection<String>;
        // the typed for-each loops below do not compile against raw types.
        List<Long> userIdList = payload.getUserIdList();
        if (userIdList == null || userIdList.isEmpty()) {
            log.info("用户ID为空");
            return;
        }

        for (Long sysUserId : userIdList) {
            // Look up every cached token key of this user (wildcard on the second key part).
            Collection<String> cacheKeyList = RedisUtil.keys(CS.getCacheKeyToken(sysUserId, "*"));
            if (cacheKeyList == null || cacheKeyList.isEmpty()) {
                continue;
            }
            for (String cacheKey : cacheKeyList) {
                // Delete the user's cached login entry.
                RedisUtil.del(cacheKey);
            }
        }

        log.info("无权限登录用户信息已清除");
    }
}
mq支付订单商户通知
/**
 * RabbitMQ adapter for the pay-order merchant-notify queue.
 * Active only when RabbitMQ is the configured vendor and a
 * {@code PayOrderMchNotifyMQ.IMQReceiver} bean exists.
 */
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
@ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class)
public class PayOrderMchNotifyRabbitMQReceiver implements IMQMsgReceiver {

    /** Vendor-independent handler that performs the actual work. */
    @Autowired
    private PayOrderMchNotifyMQ.IMQReceiver mqReceiver;

    /**
     * Parses the raw text into a payload and forwards it to the handler.
     * The {@code @Async} argument selects which executor runs this method.
     *
     * @param msg raw message text taken from the queue
     */
    @Override
    @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
    @RabbitListener(queues = PayOrderMchNotifyMQ.MQ_NAME)
    public void receiveMsg(String msg) {
        PayOrderMchNotifyMQ.MsgPayload payload = PayOrderMchNotifyMQ.parse(msg);
        mqReceiver.receive(payload);
    }
}
PayOrderMchNotifyMQ:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PayOrderMchNotifyMQ extends AbstractMQ {
public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY";
private MsgPayload payload;
@Data
@AllArgsConstructor
public static class MsgPayload {
private Long notifyId;
}
@Override
public String getMQName() {
return MQ_NAME;
}
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
public static PayOrderMchNotifyMQ build(Long notifyId){
return new PayOrderMchNotifyMQ(new MsgPayload(notifyId));
}
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}



