许多需求计算量都在扩大,比如合同下的门店会有三四千个,计算这些门店的数据在进行聚合,对于服务的内存和接口执行时间有着很大的影响。
针对越来越大容量、并发高的接口或者其他计算方法,同一时间在运行的计算维度进行限制,比要计算门店设备,服务最多支持10000个门店同时在计算,相当于把资源到计算的对象维度。基于这个原因,作者编写了一个计算限流工具。
不同于参数限制,工具针对的是服务所有线程对于该计算维度的限流。
二、切面工具类 1、注解要设置针对的方法、对象、参数和限制数量
当限制的计算资源就在入参中,paramName就不用设置了,否则就需要在对象中取出针对的参数
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CountLimit {
String methodName();
String objectName();
String paramName();
int limit();
}
2、切面
切面依赖redis,将计算资源的数量进行比较看是否超过限制。
当计算完成之后再更改redis资源,让其他线程的计算任务可以正常执行。
当然在过程中需要保证redis的比较与资源数量设置的幂等性,采用了作者封装的另外一个工具:分布式代理锁。有兴趣的同学可以看看Redis分布式代理锁的两种实现_tingmailang的博客-CSDN博客,代理锁工具是基于Redisson通过两种方式实现代理分布式锁:
1、ThreadLocal线程缓存 + AOP切面
2、AOP切面 + 入参固定
这里使用的是ThreadLocal线程缓存 + AOP切面
针对计算资源超出限制,作者示例是做一个等待,超出一定时间再进行打断,在等待过程中尝试进入计算。
@Slf4j
@Aspect
@Component
public class CountLimitAspect {
@Resource
private RedissonClient redissonClient;
@Pointcut("@annotation(com.enmonster.platform.amb.aspect.querylimit.annotation.CountLimit)")
public void lockPointCut() {
}
@Around("lockPointCut() && @annotation(countLimit)")
public Object around(ProceedingJoinPoint joinPoint, CountLimit countLimit) throws Throwable {
LocalDateTime start = LocalDateTime.now();
String inter = countLimit.methodName();
String objectName = countLimit.objectName();
String par = countLimit.paramName();
Object[] args = joinPoint.getArgs();
String[] paramNames = ((CodeSignature) joinPoint.getSignature()).getParameterNames();
Map param = new HashMap<>();
for (int i = 0; i < paramNames.length; i++) {
param.put(paramNames[i], args[i]);
}
//获取限制的参数
List queryPar;
String key;
int count;
try {
if (Objects.isNull(par)) {
//如果没有设置参数,说明在入参中
queryPar = (List) param.get(objectName);
} else {
//说明在入参的某个对象中,有一个参数是进行限流
Object o = param.get(objectName);
queryPar = (List) this.getFieldValueByName(par, o);
}
count = queryPar.size();
key = inter + par;
LockUtil.set(key);
while (!this.checkExceed(key, count, countLimit.limit())) {
//是否超出等待时间
if (start.plusSeconds(countLimit.waitTime()).isBefore(LocalDateTime.now())) {
throw new BusinessException(ErrorCodeEnum.EXCEED_LIMIT_COUNT_ERROR);
}
//将等待时长划分为20份
Thread.sleep(countLimit.waitTime() * 1000 / 20);
}
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.EXCEED_LIMIT_COUNT_ERROR, e.getMessage());
}
try {
return joinPoint.proceed();
} finally {
LockUtil.set(key);
this.reduce(key, count);
}
}
@RedisLock(key = RedisConsts.QUERY_LIMIT_LOCK, atuoRemove = true)
public boolean checkExceed(String key, int count, int limit) {
RBucket bucket = redissonClient.getBucket(RedisConsts.QUERY_LIMIT_COUNT + key);
int now = 0;
if (bucket.isExists()) {
now = bucket.get();
}
if (now + count > limit) {
return false;
} else {
bucket.set(now + count);
return true;
}
}
@RedisLock(key = RedisConsts.QUERY_LIMIT_LOCK, atuoRemove = true)
public void reduce(String key, int count) {
RBucket bucket = redissonClient.getBucket(RedisConsts.QUERY_LIMIT_COUNT + key);
int now = bucket.get();
bucket.set(now - count);
}
private static Object getFieldValueByName(String fieldName, Object o) {
try {
String firstLetter = fieldName.substring(0, 1).toUpperCase();
String getter = "get" + firstLetter + fieldName.substring(1);
Method method = o.getClass().getMethod(getter, new Class[]{});
Object value = method.invoke(o, new Object[]{});
return value;
} catch (Exception e) {
log.error("获取属性值失败!" + e, e);
}
return null;
}
}
三、使用
这里作者使用一个接口做示例,其实任何一个计算方法都可以使用,加@CountLimit注解,把限流参数填一下就可以。
@ApiOperation("测试计算限流")
@ApiResponses(@ApiResponse(code = HttpStatus.SC_OK, message = "测试计算限流"))
@PostMapping("/count-limit")
@MethodLogger
@CountLimit(interfaceName = "countLimit", objectName = "request", paramName = "shopIdList", limit = 10000)
public BaseResponse countLimit(@RequestBody CheckDeviceBaseRequestDTO request) {
return BaseResponse.createSuccessResult(offLineReportFacade.closeBill(request));
}
四、总结
目前对于计算资源的需求越来越大,很多需求上线之前根本没法估算会遇到多大的qps,也就没法知道到底有多少资源同时在计算,有兴趣的同学可以试试作者的计算限流工具,至少不要让服务崩掉。



