pom依赖:
org.apache.rocketmq rocketmq-client4.9.0 org.apache.rocketmq rocketmq-spring-boot-starter2.2.0 org.apache.rocketmq rocketmq-common4.9.0
配置类:
public class RocketMqConfig {
public static final String PAYED_ORDER_TOPIC = "TRADE_PAYED_ORDER_TOPIC";
public static final String PAYED_ORDER_GROUP = "TRADE_PAYED_ORDER_GROUP";
}
发送类:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MqSender {
@Autowired(required = false)
private RocketMQTemplate rocketMqTemplate;
private final static int DEFAULT_TIMEOUT = 3000;
public void sendRetry(String topic, MqRetryMessage mqRetryMessage){
sendDelay(topic, mqRetryMessage, mqRetryMessage.getDelayLevel());
}
public void sendRetry(String topic, MqRetryMessage mqRetryMessage, SendCallback sendCallback){
sendDelay(topic, mqRetryMessage, sendCallback, mqRetryMessage.getDelayLevel());
}
public void sendDelay(String topic, T message, SendCallback sendCallback, MqDelayLevel delayLevel){
rocketMqTemplate.asyncSend(topic, MqMessage.builder().body(message).build(), sendCallback, DEFAULT_TIMEOUT, delayLevel.getLevel());
}
public void send(String topic, T message, SendCallback sendCallback){
rocketMqTemplate.asyncSend(topic, MqMessage.builder().body(message).build(), sendCallback, DEFAULT_TIMEOUT, 0);
}
public void sendDelay(String topic, T message, MqDelayLevel delayLevel){
sendDelay(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送消息成功 {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("发送消息失败", throwable);
}
}, delayLevel);
}
public void send(String topic, T message){
send(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送消息成功 {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("发送消息失败", throwable);
}
});
}
public void sendRocketDelay(String topic, T message,String from, MqDelayLevel delayLevel){
sendDelay(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("{} 发送消息成功 {}",from, sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("{} 发送消息失败",from, throwable);
}
}, delayLevel);
}
public void sendRocket(String topic, T message,String from){
send(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("{} 发送消息成功 {}",from, sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("{} 发送消息失败",from, throwable);
}
});
}
}
延时级别:同原有的配置,只是加了自己需要的时间
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum MqDelayLevel {
//1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 1w 1m 1y
ONE_SEC(1),
FIVE_SEC(2),
TEN_SEC(3),
THIRTY_SEC(4),
ONE_MIN(5),
TWO_MIN(6),
THREE_MIN(7),
FOUR_MIN(8),
FIVE_MIN(9),
SIX_MIN(10),
SEVEN_MIN(11),
EIGHT_MIN(12),
NINE_MIN(13),
TEN_MIN(14),
TWENTY_MIN(15),
THIRTY_MIN(16),
ONE_HOUR(17),
TWO_HOUR(18),
ONE_DAY(19),
ONE_WEEK(20),
ONE_MonTH(21),
ONE_YEAR(22)
;
private int level;
}
MQ消息封装类:
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.NoArgsConstructor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @AllArgsConstructor @NoArgsConstructor @Builder public class MqMessageimplements Message { private T body; private MessageHeaders headers; @Override public T getPayload() { return body; } @Override public MessageHeaders getHeaders() { return headers; } }
延时尝试类:
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnore; import com.weibu.cloud.common.utils.DateUtil; import com.weibu.cloud.common.utils.uuid.UUID; import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import java.io.Serializable; import java.time.LocalDateTime; @Getter @ToString @Slf4j public class MqRetryMessageimplements Serializable { private static final long serialVersionUID = 408371795120089055L; private final static MqDelayLevel[] DEFAULT_DELAY_LEVEL = { MqDelayLevel.ONE_MIN , MqDelayLevel.FIVE_MIN , MqDelayLevel.TEN_MIN , MqDelayLevel.THIRTY_MIN , MqDelayLevel.ONE_HOUR , MqDelayLevel.TWO_HOUR }; private final String retryId; private final int retryTimes; @JsonFormat(pattern = DateUtil.PATTERN) private final LocalDateTime startTime; private final int maxAttempts; private MqDelayLevel[] delayLevelList = DEFAULT_DELAY_LEVEL; private final T body; public MqRetryMessage(){ this(null, 6); } public MqRetryMessage(T body){ this(body, 6); } public MqRetryMessage(T body, int maxAttempts){ this(body, maxAttempts, 0, null, UUID.randomUUID().toString()); } private MqRetryMessage(T body, int maxAttempts, int retryTimes, MqDelayLevel[] delayLevelList, String retryId){ this.retryTimes = retryTimes; this.maxAttempts = maxAttempts; this.startTime = LocalDateTime.now(); this.body = body; this.retryId = retryId; this.setDelayLevelList(delayLevelList); } public boolean canRetry(){ //maxAttempt 为0时无限制 if(maxAttempts == 0){ return true; } return retryTimes + 1 <= maxAttempts; } public MqRetryMessage nextRetry(){ if(!canRetry()){ throw new IllegalStateException("重试次数已到达最大无法重试"); } return new MqRetryMessage<>(body, maxAttempts, retryTimes + 1, delayLevelList, retryId); } @JsonIgnore public MqDelayLevel getDelayLevel(){ int index = retryTimes; if(retryTimes + 1 > delayLevelList.length){ index = delayLevelList.length - 1; } return delayLevelList[index]; } public void setDelayLevelList(MqDelayLevel[] delayLevelList) { if(delayLevelList != null && delayLevelList.length > 0) { this.delayLevelList = delayLevelList; } } }
MQ消息生产者:
import com.alibaba.fastjson.JSONObject;
import com.weibu.cloud.common.constant.OrderEnum;
import com.weibu.cloud.common.feign.user.UserApi;
import com.weibu.cloud.common.mq.MqSender;
import com.weibu.cloud.common.mq.RocketMqConfig;
import com.weibu.cloud.common.utils.DateUtil;
import com.weibu.cloud.trade.dto.OrderStatisticalItem;
import com.weibu.cloud.trade.dto.UserBrief;
import com.weibu.cloud.trade.entity.WbOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class PayedOrderStatisticalProducer {
@Autowired
private MqSender mqSender;
@Autowired
private UserApi userApi;
public void send(OrderStatisticalItem item){
mqSender.send(RocketMqConfig.ORDER_STATISTICAL_TOPIC, item, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("statistical send success.sendResult:{}", JSONObject.toJSonString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("statistical send failed . :{}",e);
}
});
}
@Async
public void sendList(List list){
log.info("PayedOrderStatisticalProducer.sendList -> list:{}",list);
if (CollectionUtils.isEmpty(list)){
return;
}
list.forEach(wb -> {
OrderStatisticalItem item = entityToItem(wb);
this.send(item);
});
}
public OrderStatisticalItem entityToItem(WbOrder entity){
OrderStatisticalItem item = new OrderStatisticalItem();
item.setOrderType(queryEnumType(entity.getOrderType()));
item.setPayedAmount(entity.getAmountRealPay());
item.setPayedDate(DateUtil.localDateTimeTransLocalDate(entity.getCreateTime()));
item.setPayFlag(true);
List list = new ArrayList<>(2);
list.add(UserBrief.build(entity.getAgentId(),2));
list.add(UserBrief.build(entity.getMerchantId(),3));
item.setUserIds(list);
return item;
}
private OrderEnum.OrderType queryEnumType(Integer orderType){
switch (orderType){
case 0:
return OrderEnum.OrderType.ALL;
case 1:
return OrderEnum.OrderType.AGENT;
case 2:
return OrderEnum.OrderType.BIZ;
case 3:
return OrderEnum.OrderType.MALL;
}
return OrderEnum.OrderType.BIZ;
}
}
MQ消息消费者:
import com.alibaba.fastjson.JSONObject;
import com.weibu.cloud.common.mq.RocketMqConfig;
import com.weibu.cloud.trade.dto.OrderStatisticalItem;
import com.weibu.cloud.trade.service.impl.AsyncStatisticalService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqConfig.ORDER_STATISTICAL_TOPIC,
consumerGroup = RocketMqConfig.ORDER_STATISTICAL_GROUP,
consumeMode = ConsumeMode.ORDERLY, //并行处理,默认并行;ORDERLY 按顺序处理
selectorexpression = "*")
public class PayedOrderStatisticalConsumer implements RocketMQListener {
@Autowired
private AsyncStatisticalService asyncStatisticalService;
@Override
public void onMessage(OrderStatisticalItem item) {
log.info("PayedOrderStatisticalConsumer -> item:{}", JSONObject.toJSonString(item));
statisticalOrder(item);
}
//重试机制
//@Retryable(value = Throwable.class, backoff = @Backoff(value = 600000L, delay = 300000L, multiplier = 5, maxDelay = 3600000L))
@Retryable(value = Throwable.class, backoff = @Backoff(delay = 60000L, multiplier = 2), recover = "liquidationNotifyFailed")
public void statisticalOrder(OrderStatisticalItem item){
log.info("statisticalOrder -> item:{}",JSONObject.toJSonString(item));
asyncStatisticalService.updateUserOrderStatistical(item.getUserIds(),item.getPayedAmount(),item.getPayedDate(),item.getOrderType(),item.getPayFlag());
}
}



