- 要求: 方法调用过程中失败时,系统能够自动重试;重试达到一定次数后,通过钉钉通知开发人员
- 实现设计: 注解,反射,定时任务
- 补偿表
-- Compensation table: one row per failed method invocation that is waiting to be retried.
-- Differences from the original definition:
--   * dataStatus is VARCHAR rather than TINYTEXT so it can carry a DEFAULT and be indexed later.
--   * retryCount defaults to 0 (new rows start with no retries).
--   * gmtModified refreshes on every UPDATE; the application's UPDATE statement never sets it.
--   * idx_a_compensation_busKey supports the lookup by busKey performed before each insert.
CREATE TABLE `a_compensation` (
    `id`              BIGINT(20)    NOT NULL AUTO_INCREMENT COMMENT '数据库主键',
    `busKey`          VARCHAR(255)  NOT NULL COMMENT '业务主键',
    `type`            VARCHAR(20)   NOT NULL DEFAULT 'auto' COMMENT '处理方式:自动(auto),manual(人工介入)',
    `dataStatus`      VARCHAR(20)   NOT NULL DEFAULT '0' COMMENT '数据状态:0待处理,1:成功,2:失败',
    `className`       VARCHAR(200)  NOT NULL COMMENT '完整类名',
    `beanName`        VARCHAR(100)  NOT NULL COMMENT '类名',
    `methodName`      VARCHAR(100)  NOT NULL COMMENT '方法名',
    `reqArgsType`     VARCHAR(500)  NOT NULL COMMENT '方法入参类型',
    `reqArgs`         TEXT          NOT NULL COMMENT '方法入参参数',
    `reqArgsTypeReal` VARCHAR(500)  DEFAULT NULL COMMENT '实际参数类型',
    `retryCount`      BIGINT(10)    NOT NULL DEFAULT '0' COMMENT '重试次数',
    `resultMsg`       VARCHAR(2000) DEFAULT NULL COMMENT '返回信息',
    `gmtCreate`       TIMESTAMP(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '创建时间',
    `gmtModified`     TIMESTAMP(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '更新时间',
    `isValid`         TINYINT(4)    NOT NULL DEFAULT '1' COMMENT '是否有效',
    PRIMARY KEY (`id`),
    KEY `idx_a_compensation_busKey` (`busKey`)
) ENGINE=INNODB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
代码
- 注解
/**
 * Marks a method whose failed invocations should be recorded for compensation (retry).
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Compensate {

    /**
     * Exception types that trigger compensation. Defaults to {@link Exception}.
     */
    Class<?>[] exceptionType() default Exception.class;
}
- 切面类
@Aspect
@Component
public class AnnotationCompensateEntry {
private static final Logger logger = LoggerFactory.getLogger(AnnotationCompensateEntry.class);
@Autowired
private CompensationService compensationService;
@AfterThrowing(pointcut = "@annotation(com.base.annotation.Compensate)", throwing = "e")
public void aopCompensate(JoinPoint joinPoint, Exception e) {
try {
// 1:获取需要的数据,并且进行验证
Method targetMethod = ((MethodSignature) joinPoint.getSignature()).getMethod();
Compensate compensate = targetMethod.getAnnotation(Compensate.class);
Object[] args = joinPoint.getArgs();
if (args.length == 0) {
logger.info("无参方法无法补偿!");
return;
}
//异常类型
Class>[] exceptionTypes = compensate.exceptionType();
for (Class> exceptionType : exceptionTypes) {
// exceptionType 是 e.getClass() 的父类或者一样的。
if (exceptionType.isAssignableFrom(e.getClass())) {
saveCompensationDO(e, joinPoint);
}
}
} catch (Throwable throwable) {
// 异常需要抛出。要不然没办法,业务代码依赖这个报错的信息会被干扰。
logger.error(throwable.getMessage(), throwable);
Throwables.propagateIfPossible(throwable);
}
}
public void saveCompensationDO(Exception e, JoinPoint joinPoint) throws JsonProcessingException {
// 1:获取切面对象的参数
MethodInfo methodInfo = new MethodInfo(joinPoint);
methodInfo.init();
// 2:构建需要补偿的对象
CompensationEntity compensationDO = buildCompensationDO(methodInfo, e);
// 3: 上面根据的数据,得保证是满足数据的完整性。然后进行数据的储存入库。然后结束。
List compensationEntities = compensationService.findByProperty(CompensationEntity.class, "busKey", compensationDO.getBusKey());
if (CollectionUtils.isEmpty(compensationEntities)) {
compensationService.save(compensationDO);
}
}
private CompensationEntity buildCompensationDO(MethodInfo methodInfo, Exception e) {
// 1:存入通用的参数
CompensationEntity compensationDO = new CompensationEntity();
Annotation[] annotations= methodInfo.getTargetClass().getAnnotations();
String springBeanName="";
for (Annotation annotation : annotations) {
if (annotation.annotationType().equals(Service.class)){
springBeanName=methodInfo.getTargetClass().getAnnotation(Service.class).value();
}
if (annotation.annotationType().equals(Component.class)){
springBeanName=methodInfo.getTargetClass().getAnnotation(Component.class).value();
}
}
if (StringUtil.isEmpty(springBeanName)){
springBeanName=methodInfo.getTargetClass().getSimpleName();
}
compensationDO.setBeanName(springBeanName);
compensationDO.setClassName(methodInfo.getClassName());
compensationDO.setMethodName(methodInfo.getMethodName());
compensationDO.setDataStatus(StatusEnum.StatusEnum_INIT.getCode());
compensationDO.setRetryCount(0);
compensationDO.setResultMsg(e.getMessage());
compensationDO.setType(HandleMethodEnum.AUTO.getDes());
// 2:处理请求参数类型
// 一:消息无转换,说明入参是一样的。直接存。 1: 单个->类型 2:多个->数组 3. 无参不处理
Object[] args = methodInfo.getArgs();
ValidateUtils.isFalse(args.length == 0, "无参方法不允许使用补偿注解!");
if (args.length == 1) {
//获取方法参数类型数组
compensationDO.setReqArgs(methodInfo.getJsonArgs());
compensationDO.setReqArgsType(args[0].getClass().getName());
}
if (args.length > 1) {
Class>[] classesReq = methodInfo.getMethod().getParameterTypes();
StringBuilder stringBuffer = new StringBuilder();
for (Class> aClass : classesReq) {
stringBuffer.append(aClass.getName()).append(CommonConstants.Public.COMMA);
}
compensationDO.setReqArgs(methodInfo.getJsonArgs());
compensationDO.setReqArgsType(Object[].class.getName());
compensationDO.setReqArgsTypeReal(stringBuffer.substring(0, stringBuffer.length() - 1));
}
compensationDO.setBusKey(compensationDO.createBusKey());
return compensationDO;
}
}
- 辅助类
@Data
public class MethodInfo {
private JoinPoint joinPoint;
private MethodSignature signature;
private Method method;
private Class> targetClass;
private Object[] args;
private String jsonArgs;
private String className;
private String methodName;
public MethodInfo(JoinPoint joinPoint) {
this.joinPoint = joinPoint;
}
public void init() {
this.signature = (MethodSignature) joinPoint.getSignature();
this.method = signature.getMethod();
this.methodName = method.getName();
this.targetClass = method.getDeclaringClass();
this.className = targetClass.getName();
this.args = joinPoint.getArgs();
if (args.length == 1) {
this.jsonArgs = JsonUtil.toJsonString(args[0]);
} else {
this.jsonArgs = JsonUtil.toJsonString(args);
}
}
}
/**
 * One compensation record. The field names match the columns of the a_compensation table.
 *
 * NOTE(review): accessors such as getClassName()/getBusKey() are used by other code but are
 * not visible in this excerpt; presumably they are generated or were omitted - confirm.
 */
public class CompensationEntity {
// NOTE(review): the id column is declared BIGINT(20) while this field is Integer - confirm the range is sufficient.
private Integer id;
// Key derived from class name, method name and serialized arguments; see createBusKey().
private String busKey;
private String type;
private String dataStatus;
private String className;
private String beanName;
private String methodName;
private String reqArgsType;
// Serialized method arguments.
private String reqArgs;
private String reqArgsTypeReal;
private Integer retryCount;
private String resultMsg;
// Both timestamps are initialised to the construction time of the object.
private Date gmtCreate = new Date();
private Date gmtModified = new Date();
private Boolean isValid = Boolean.TRUE;
/**
 * Primary key, generated with the "native" strategy.
 */
@Id
@GeneratedValue(generator = "generator")
@GenericGenerator(name = "generator", strategy = "native")
@Column(name = "Id")
public Integer getId() {
return id;
}
/**
 * Computes the business key as the MD5 of className + methodName + reqArgs,
 * concatenated without a separator.
 *
 * @return the value returned by MD5Util.string2MD5 for the concatenated string
 */
public String createBusKey() {
return MD5Util.string2MD5(this.getClassName() + this.getMethodName() + this.getReqArgs());
}
}
/**
 * JSON helpers built on one shared Jackson {@link ObjectMapper}.
 */
public class JsonUtil {

    /** Shared mapper; also accessed directly by other classes, so it stays public. */
    public static ObjectMapper mapper;

    static {
        mapper = new ObjectMapper();
        // Serialize only non-null fields.
        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        // Ignore JSON properties that have no counterpart on the Java type.
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    }

    /**
     * Currently identical to {@link #toJsonString(Object)}: the output is not pretty-printed.
     */
    public static String toPrettyJsonString(Object value) {
        return toJsonString(value);
    }

    /**
     * Serializes {@code value} to a JSON string.
     *
     * @throws BusinessException when serialization fails; the message carries the stack
     *                           trace of the cause, as in {@link #fromJsonString}
     */
    public static String toJsonString(Object value) {
        try {
            return mapper.writeValueAsString(value);
        } catch (JsonProcessingException e) {
            throw new BusinessException("Json序列化失败\n" + GatewayUtil.getStackTraceAsString(e), false);
        }
    }

    /**
     * Deserializes {@code content} into an instance of {@code valueType}.
     *
     * @throws BusinessException when the content cannot be read as {@code valueType}
     */
    public static <T> T fromJsonString(String content, Class<T> valueType) {
        try {
            return mapper.readValue(content, valueType);
        } catch (IOException e) {
            throw new BusinessException("Json反序列化失败\n" + GatewayUtil.getStackTraceAsString(e), false);
        }
    }
}
- 补偿那块的逻辑实现
@Service
@Transactional
public class CompensationServiceImpl extends CommonServiceImpl implements CompensationService {
private static final Logger logger = LoggerFactory.getLogger(CompensationServiceImpl.class);
public final static Integer RETRY_LIMIT_COUNT = 3;
@Resource
CompensationMiniDao compensationMiniDao;
/**
 * Retries every record returned by {@code compensationMiniDao.getRetryList()} once and
 * persists the outcome of each attempt.
 *
 * <p>An attempt fails when the target call throws or returns {@code false}. A record is
 * marked as failed once its retry count reaches {@link #RETRY_LIMIT_COUNT}; the check is
 * {@code >=} because manual retries also increase the count and may push it past the limit.
 *
 * @return a human-readable summary (pending / succeeded / failed counts)
 */
@Override
public String autoRetry() {
    StringBuilder summary = new StringBuilder();
    List<CompensationEntity> pending = compensationMiniDao.getRetryList();
    summary.append("待补偿数据:").append(pending.size()).append("条!");
    int successCount = 0;
    int failCount = 0;
    for (CompensationEntity compensationEntity : pending) {
        int retryCount = compensationEntity.getRetryCount() + 1;
        compensationEntity.setRetryCount(retryCount);
        boolean succeeded = false;
        try {
            succeeded = callTargetMethod(compensationEntity);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            // Reflective failures carry their real message on the cause.
            String message = e.getMessage();
            if (message == null && e.getCause() != null) {
                message = e.getCause().getMessage();
            }
            compensationEntity.setResultMsg(message);
        }
        if (succeeded) {
            compensationEntity.setResultMsg(CommonConstants.Public.SUCCESS);
            compensationEntity.setDataStatus(StatusEnum.StatusEnum_SUCCESS.getCode());
            successCount++;
        } else {
            if (retryCount >= RETRY_LIMIT_COUNT) {
                compensationEntity.setDataStatus(StatusEnum.StatusEnum_FAIL.getCode());
            }
            failCount++;
        }
        this.updateBySqlString(buildUpdateSql(compensationEntity));
    }
    summary.append("处理成功:").append(successCount).append("条!");
    summary.append("处理失败:").append(failCount).append("条!");
    return summary.toString();
}
/**
 * Retries a single compensation record on request.
 *
 * <p>The retry count is always increased and the handling type is switched to manual.
 * The status changes only on success; a failed attempt keeps the current status.
 *
 * @param id primary key of the record to retry
 * @return {@code CommonConstants.Public.SUCCESS}, or the failure message when the attempt threw
 */
@Override
public String manualRetry(Integer id) {
    // 1: the record to process and the result to return
    CompensationEntity dealObj = null;
    String result = CommonConstants.Public.SUCCESS;
    try {
        // 2: load the record by id
        List<CompensationEntity> dealObjList = this.findByProperty(CompensationEntity.class, "id", id);
        ValidateUtils.notEmpty(dealObjList, String.format("根据id:%s,查询返回的对象为空", id));
        dealObj = dealObjList.get(0);
        dealObj.setRetryCount(dealObj.getRetryCount() + 1);
        dealObj.setType(HandleMethodEnum.MANUAL.getMsg());
        if (callTargetMethod(dealObj)) {
            dealObj.setDataStatus(StatusEnum.StatusEnum_SUCCESS.getCode());
        }
    } catch (Exception e) {
        // Prefer the cause (reflective failures wrap the real error), but do not assume
        // one exists: validation and lookup failures are thrown without a cause.
        Throwable root = e.getCause() != null ? e.getCause() : e;
        result = root.getMessage() != null ? root.getMessage() : root.toString();
        logger.error(result, e);
    }
    // 3: persist the attempt; on failure only the retry count, type and message change
    if (Objects.nonNull(dealObj)) {
        dealObj.setResultMsg(result);
        this.updateBySqlString(buildUpdateSql(dealObj));
    }
    return result;
}
private String buildUpdateSql(CompensationEntity compensationEntity) {
StringBuilder stringBufferUpdate = new StringBuilder("update a_compensation set dataStatus='");
stringBufferUpdate.append(compensationEntity.getDataStatus()).append("'").append(CommonConstants.Public.COMMA);
stringBufferUpdate.append(" retryCount=").append(compensationEntity.getRetryCount()).append(CommonConstants.Public.COMMA);
stringBufferUpdate.append(" type=").append(""").append(compensationEntity.getType()).append(""");
if (StringUtil.isNotEmpty(compensationEntity.getResultMsg())) {
stringBufferUpdate.append(CommonConstants.Public.COMMA);
stringBufferUpdate.append("resultMsg=").append(""").append(compensationEntity.getResultMsg()).append(""");
}
stringBufferUpdate.append(" where id=").append(compensationEntity.getId());
return stringBufferUpdate.toString();
}
/**
 * Summarises the compensation table: the total number of rows, followed by the number
 * of rows per data status.
 *
 * @return the summary text used for the reminder message
 */
@Override
public String analysisData() {
    List<CompensationEntity> allCompensationEntityData = this.getList(CompensationEntity.class);
    StringBuilder dataStr = new StringBuilder(String.format("补偿表数据总量为:%s条!", allCompensationEntityData.size()));
    if (CollectionUtils.isNotEmpty(allCompensationEntityData)) {
        // Only the per-status counts are needed, so count directly instead of collecting lists.
        Map<String, Long> countByStatus = allCompensationEntityData.stream()
                .collect(Collectors.groupingBy(CompensationEntity::getDataStatus, Collectors.counting()));
        countByStatus.forEach((statusCode, count) -> {
            StatusEnum statusEnum = StatusEnum.getStatusEnumByCode(statusCode);
            ValidateUtils.notNull(statusEnum, String.format("补偿表钉钉提醒的时候,查询的数据状态有问题,状态为:%s", statusCode));
            dataStr.append(statusEnum.getMsg()).append(":").append(count).append("条!");
        });
    }
    return dataStr.toString();
}
private boolean callTargetMethod(CompensationEntity compensationEntity) throws Exception {
// 1: 通过类找到对应的方法
Method targetMethod = null;
try {
Class> targetClass = Class.forName(compensationEntity.getClassName());
ValidateUtils.notNull(targetClass, "补偿时未获取到对应的class bean");
Method[] methods = targetClass.getMethods();
for (Method method : methods) {
// 匹配方法
if (matchMethod(method, compensationEntity)) {
targetMethod = method;
break;
}
}
} catch (ClassNotFoundException e) {
logger.error(e.getMessage(), e);
throw e;
}
try {
//补偿的目标方法中的参数类型
ValidateUtils.notNull(targetMethod, "补偿时未获取到对应的bean需要调用的目标方法");
// 2: 回去方法的参数类型
Class>[] paramTypes = targetMethod.getParameterTypes();
// 3: 从spring 取的对象
Object targetObject = SpringUtils.getBean(StringUtil.firstLowerCase(compensationEntity.getBeanName()));
if (paramTypes.length == 1) {
// 转成实际类型调用目标方法
Object obj = JsonUtil.mapper.readValue(compensationEntity.getReqArgs(), paramTypes[0]);
targetMethod.invoke(targetObject, obj);
} else if (paramTypes.length > 1) {
List
- 定时任务类
/**
 * Scheduled jobs for the compensation mechanism: a periodic automatic retry and a daily
 * DingDing reminder. Both run only on the host whose IP matches the configured value.
 */
@Component
public class CompensateDataSchedulerTask {

    public final static String INTERFACE_NAME = "通用补偿接口";

    private static final Logger logger = LoggerFactory.getLogger(CompensateDataSchedulerTask.class);

    @Resource
    CompensationService compensationService;

    @Resource
    MessageReminderHandler messageReminderHandler;

    @Autowired
    private UtilDictKVService utilDictKVService;

    /**
     * Runs the automatic retry with the configured initial delay (10000) and fixed delay (300000).
     */
    @Scheduled(initialDelay = 10000, fixedDelay = 300000)
    public void retry() {
        if (!exeScheduledIP()) {
            return;
        }
        logger.info("开始进行补偿!");
        String summary = compensationService.autoRetry();
        logger.info(summary);
    }

    /**
     * Sends the compensation-table summary to DingDing on the configured cron schedule.
     */
    @Scheduled(cron = "0 15 10 ? * *")
    public void dingDingCompensationRemind() {
        if (!exeScheduledIP()) {
            return;
        }
        // Assemble the message content.
        String environment = messageReminderHandler.getEnv();
        String summary = compensationService.analysisData();
        String content = "环境:" + environment + "!" + summary;
        DingDingMsgVO message = messageReminderHandler.buildDingDingMsgVO(
                INTERFACE_NAME, content, DictCodeKeyEnum.DING_DING_COMPENSATE_MOBILE);
        String sendResult = messageReminderHandler.dingDingSend(DictCodeKeyEnum.DING_DING_COMPENSATE_URL, message);
        logger.info("CompensateDataSchedulerTask->dingDingRemind->sendResult:" + sendResult);
    }

    /**
     * Tells whether this host is the one configured to run the scheduled jobs.
     *
     * @return true when the local host address equals the configured dictionary value;
     *         false when it differs or the local address cannot be resolved
     */
    private Boolean exeScheduledIP() {
        UtilDictKVEntity configuredHost = utilDictKVService.getKvsByDictCodeKey(
                DictCodeKeyEnum.CURRENT_COMPENSATE_IP.getCode(),
                DictCodeKeyEnum.CURRENT_COMPENSATE_IP.getKey(),
                3600L);
        ValidateUtils.notNull(configuredHost, "未配置执行定时任务的");
        try {
            String localAddress = InetAddress.getLocalHost().getHostAddress();
            return localAddress.equals(configuredHost.getDictValue());
        } catch (UnknownHostException e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }
}



