使用策略模式改造公司的一段关于 RabbitMQ 延时队列的一段代码
改造之前的代码如下:
@RabbitListener(queues = "${queue.name}")
public void process(Message content, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
String msg = new String(content.getBody());
log.info("队列接收消息: msg = {}", msg);
RabbitMsg rmsg= JSONObject.parseObject(msg, RabbitMsg.class);
try {
String pType = rmsg.getProcessType();
if ("A".equals(pType)) {
//do something...
} else if ("B".equals(pType)) {
//do something...
} else if ("C".equals(pType)) {
//do something...
} else if ("D".equals(pType)) {
//do something...
} else if ("E".equals(pType)) {
//do something...
}
else if ("ZZ".equals(pType)) {
//do something...
}
} catch (Exception e) {
// 增加try catch,防止消费失败后,一直循环消费
log.error("消息消费异常:参数->{}", msg);
RabbitExceptionInfo info= new RabbitExceptionInfo();
info.setData(rmsg.getData());
info.setTime(DateUtil.format(new Date(), DateUtil.DATETYPE_1));
indo.setRemarks(EXCEPTION_REMARK);
//加入异常队列
produceExceptionMsg(rabbitExceptionInfo);
}
//手动ack
channel.basicAck(tag, false);
}
这样根据一个 processType 来判断做什么事情,用策略模式是比较适合的
动手改造吧
- 先将根据 processType 的处理逻辑抽象出来:
public interface RabbitmqDlxHandler {
void doProcess(RabbitMsg rmsg);
}
- 有接口,就会有具体的接口实现类来完成具体的业务逻辑:
@Component
@ProcessType(value = "A")
public class processTypeA implements RabbitmqDlxHandler {
@Override
void doProcess(RabbitMsg rmsg) {
//do something about processType A...
}
}
@Component
@ProcessType(value = "B")
public class processTypeB implements RabbitmqDlxHandler {
@Override
void doProcess(RabbitMsg rmsg) {
//do something about processType B...
}
}
@Component
@ProcessType(value = "C")
public class processTypeC implements RabbitmqDlxHandler {
@Override
void doProcess(RabbitMsg rmsg) {
//do something about processType C...
}
}
- 定义一个注解 @ProcessType 来标注处理器的类型,并使用此注解将处理器初始化(后面会用到):
@Inherited
@documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ProcessType {
@NotEmpty
String value();
}
- 项目启动,需要先将具体的处理器初始化到上下文中:
@Slf4j
@Component
public class RabbitmqDlxContext implements ApplicationContextAware {
private static final Map HANDLER_MAP = new HashMap<>();
@Autowired
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(ProcessType.class);
if (!CollectionUtils.isEmpty(beansWithAnnotation)) {
beansWithAnnotation.forEach((k, v) -> HANDLER_MAP.put(v.getClass().getAnnotation(ProcessType.class).value(), v.getClass()));
log.info("死信队列处理器初始化完毕:HANDLER_MAP = {}", HANDLER_MAP);
}
}
public RabbitmqDlxHandler getConcertHandler(String processType) {
Class aClass = HANDLER_MAP.get(processType);
if (Objects.isNull(aClass))
throw new ApplicationException("8848", "系统中没有orderType = " + processType+ " 对应的处理器", null);
return (RabbitmqDlxHandler) applicationContext.getBean(aClass);
}
}
- 准备工作完成了,接着来改造 RabbitMQ 延时队列的逻辑代码吧:
@Resource
private RabbitmqDlxContext rabbitmqDlxContext;
@RabbitListener(queues = "${queue.name}")
public void process(Message content, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
String msg = new String(content.getBody());
log.info("队列接收消息: msg = {}", msg);
RabbitMsg rmsg = JSONObject.parseObject(msg, RabbitMsg.class);
try {
String pType = rmsg.getProcessType();
rabbitmqDlxContext.getConcertHandler(pType).doProcess(rmsg);
} catch (Exception e) {
// 增加try catch,防止消费失败后,一直循环消费
log.error("消息消费异常:参数->{}", msg);
RabbitExceptionInfo info= new RabbitExceptionInfo();
info.setData(rmsg.getData());
info.setTime(DateUtil.format(new Date(), DateUtil.DATETYPE_1));
indo.setRemarks(EXCEPTION_REMARK);
//加入异常队列
produceExceptionMsg(rabbitExceptionInfo);
}
//手动ack
channel.basicAck(tag, false);
}
- 后续需要扩展处理器的类型,只需要增加一个处理器然后实现抽象处理器接口,然后加上 @ProcessType 注解即可
- 消息处理异常后,加入异常队列,监听异常队列消费信息,此时需要考虑消息消费的幂等性哟



