项目会使用sentinel,架构师意见再引用hystrix太重了,所以我这里就自己写了一套请求合并
OK不OK等过个一年半载再来看吧(如果到时候这篇帖子还在,那多半是OK的了),可能后续会再回头来优化也不一定
核心逻辑
2. 代码 自定义注解RequestCollapser
- 定义注解 RequestCollapser,标记哪些查询需要做请求合并
- 编写对应的批量查询方法,返回值需要按Map形式固定(见示例代码)
- 定义AOP对 RequestCollapser注解的方法进行 拦截,将固定毫秒数内的请求全部拦截下来放入一个容器中
- 定义一个 schedule 线程每X毫秒(这里给的50)轮询一次存放请求的容器,做批量查询操作
- 判断请求数,大于 maxSize 的,立即执行批量查询操作
- 通过 CompletableFuture.complete 操作,将数据返回到对应线程
- 对应请求的线程用 futureRest.get(maxWaitMillisecond, TimeUnit.MILLISECONDS); 获取批量查询返回的结果;超过 maxWaitMillisecond 毫秒未拿到结果,则放行,走对应service方法去数据库查询
package com.mea.pay.transaction.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RequestCollapser {
String batchMethod();
//
//long timerDelayInMilliseconds() default 50L;
int maxRequest() default 1000;
boolean requestCache() default false;
}
封装请求 CollasperRequestInfo
package com.mea.pay.transaction.helper;
import com.mea.pay.transaction.annotation.RequestCollapser;
import lombok.Getter;
import lombok.Setter;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@Getter
@Setter
public class CollasperRequestInfo {
private final ConcurrentHashMap
切面 RequestCollapserAspect 拦截service方法调用
package com.mea.pay.transaction.aspect;
import com.mea.pay.common.exception.BusinessException;
import com.mea.pay.transaction.annotation.RequestCollapser;
import com.mea.pay.transaction.helper.CollasperRequestInfo;
import com.mea.pay.transaction.task.RequestCollapserTask;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j
@Aspect
@SuppressWarnings({"unchecked", "rawtypes"})
@Configuration
public class RequestCollapserAspect {
private static final ReadWriteLock RWLOCK = new ReentrantReadWriteLock();
public static final Lock READ_LOCK = RWLOCK.readLock();
public static final Lock WRITE_LOCK = RWLOCK.writeLock();
private static final Map METHOD_CONTAINER = new ConcurrentHashMap(16);
@Value("${transaction.request-collapser.execute-immediately-threshold-value: 500}")
private long maxSize;
@Value("${transaction.request-collapser.execute-interval-millisecond: 50}")
private int executeIntervalMillisecond;
@Value("${transaction.request-collapser.max-wait-millisecond: 500}")
private int maxWaitMillisecond;
private ScheduledThreadPoolExecutor requestCollapserExecutor;
@PostConstruct
void initBatchSelectThreadPool() {
// 如果阻塞了?单次执行时间超过50毫秒呢?
requestCollapserExecutor = new ScheduledThreadPoolExecutor(1,
(r) -> new Thread(r, "request_collapser_pool_" + r.hashCode()),
(r, e) -> log.error("execution is blocked because the thread bounds and queue capacities are reached")
);
requestCollapserExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
// 每50毫秒执行一次 RequestCollapserTask 任务
int initialDelayMillisecond = 1000;
requestCollapserExecutor.scheduleAtFixedRate(new RequestCollapserTask(METHOD_CONTAINER),
initialDelayMillisecond,
executeIntervalMillisecond,
TimeUnit.MILLISECONDS);
}
@Around("execution(* com.mea.pay.transaction.service..*.*(..)) && @annotation(com.mea.pay.transaction.annotation.RequestCollapser)")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
Object[] args = pjp.getArgs();
if (args == null || args.length != 1) {
throw new BusinessException(" the multi-parameter method is not supported temporary ");
}
String needRquestCollasperMethod = pjp.getSignature().toShortString();
// 取请求合并的缓存
CollasperRequestInfo collasperRequestInfo = getCollasperRequestInfo(pjp, needRquestCollasperMethod);
ConcurrentHashMap> requestContainer = collasperRequestInfo.getRequestContainer();
// 如果累积的请求数在 executeIntervalMillisecond 毫秒内大于了 MAX_SIZE 个,则立即执行批量查询
if (requestContainer.mappingCount() >= maxSize) {
// 注:这里执行后,下次执行的时间间隔可能小于 executeIntervalMillisecond 毫秒 - 这个问题暂时不处理,没有想到好的处理方式
//System.out.println("=====立即执行");
requestCollapserExecutor.execute(new RequestCollapserTask(METHOD_CONTAINER));
}
CompletableFuture future = new CompletableFuture();
RequestCollapserAspect.READ_LOCK.lock();
try {
requestContainer.putIfAbsent(args[0], future);
} finally {
RequestCollapserAspect.READ_LOCK.unlock();
}
Object resObject = null;
// 这里需要从容器中获取 futureRest,不能直接用前面的 future,因为存在多个线程使用同一个查询条件的情况,这时只有一个线程能成功执行 requestContainer.putIfAbsent
CompletableFuture futureRest = requestContainer.get(args[0]);
try {
resObject = futureRest.get(maxWaitMillisecond, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
log.error("future.get exception:" + e.getMessage());
}
if (resObject != null) {
// 正常拿到批量查询结果,直接返回
return resObject;
}
// 等待超时,直接执行service方法,去数据库查询
return pjp.proceed();
}
private CollasperRequestInfo getCollasperRequestInfo(ProceedingJoinPoint pjp, String needRquestCollasperMethod) throws NoSuchMethodException {
CollasperRequestInfo collasperRequestInfo = METHOD_CONTAINER.get(needRquestCollasperMethod);
if (collasperRequestInfo == null) {
RequestCollapser requestCollapser = ((MethodSignature) pjp.getSignature()).getMethod().getAnnotation(RequestCollapser.class);
Object target = pjp.getTarget();
Class> targetClass = target.getClass();
String batchMethodName = requestCollapser.batchMethod();
Method batchMethod = targetClass.getDeclaredMethod(batchMethodName, List.class);
METHOD_CONTAINER.putIfAbsent(needRquestCollasperMethod, new CollasperRequestInfo(target, batchMethod, requestCollapser));
// methodContainer.putIfAbsent 无论是否执行成功,这里一定有值了
collasperRequestInfo = METHOD_CONTAINER.get(needRquestCollasperMethod);
}
return collasperRequestInfo;
}
}
serviceImpl示例
package com.mea.pay.transaction.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.mea.pay.transaction.annotation.RequestCollapser; import com.mea.pay.transaction.dos.PaymentRecordDO; import com.mea.pay.transaction.mapper.PaymentRecordMapper; import com.mea.pay.transaction.service.PaymentRecordService; import org.springframework.stereotype.Service; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @Service public class PaymentRecordServiceImpl extends ServiceImpl3. 测试implements PaymentRecordService { @Override @RequestCollapser(batchMethod = "getPaymentRecordByIds") public PaymentRecordDO getPaymentRecordById(Long id) { return super.getById(id); } @Override public Map getPaymentRecordByIds(List ids) { List paymentRecords = super.listByIds(ids); if (paymentRecords == null) { return Collections.emptyMap(); } return paymentRecords.stream().collect(Collectors.toMap(PaymentRecordDO::getId, Function.identity())); } @Override @RequestCollapser(batchMethod = "getPaymentRecordByRequestIds") public PaymentRecordDO getPaymentRecordByRequestId(String requestId) { return super.getOne(new LambdaQueryWrapper () {{ eq(PaymentRecordDO::getRequestId, requestId); }}); } @Override public Map getPaymentRecordByRequestIds(List requestIds) { List records = super.list(new LambdaQueryWrapper () {{ in(PaymentRecordDO::getRequestId, requestIds); }}); if (records == null) { return Collections.emptyMap(); } return records.stream().collect(Collectors.toMap(PaymentRecordDO::getRequestId, Function.identity())); } }
我这里测试时设置的 maxSize 为 15
JMeter配置
这里并发量过大会出现如下异常,不过这个和服务器性能/最大并发量、tomcat线程数挂钩,这个先不管,先用着试试
Connection reset by peer: socket write error
OK,觉得有帮助的话点个赞吧



