
Hand-Written Request Collapsing


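1. Preface

The project already uses Sentinel, and our architect felt that pulling in Hystrix as well would be too heavyweight, so I wrote my own request-collapsing implementation.
Whether it holds up remains to be seen; check back in six months to a year (if this post is still around by then, it has most likely worked out). I may also come back and optimize it later.

Core logic

  1. Define an annotation, RequestCollapser, that marks the queries that need request collapsing.
  2. Write a matching batch query method whose return value is always a Map keyed by the query argument (see the sample code).
  3. Define an AOP aspect that intercepts methods annotated with RequestCollapser and puts every request arriving within a fixed number of milliseconds into a container.
  4. Define a scheduled thread that polls the request container every X milliseconds (50 here) and runs the batch query.
  5. Check the number of pending requests; once it reaches maxSize, run the batch query immediately.
  6. Use CompletableFuture.complete to hand the data back to the corresponding thread.
  7. Each requesting thread calls futureRest.get(maxWaitMillisecond, TimeUnit.MILLISECONDS); to obtain the batch query result. If no result arrives within maxWaitMillisecond milliseconds, the request is let through and the service method queries the database directly.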
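The steps above can be condensed into a framework-free sketch. This is not the article's implementation: MiniCollapser, submit, flush and batchLoader are hypothetical names, and Spring AOP is replaced by a plain method call so that the queueing and completion logic stands on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical, framework-free collapser: queues single-key lookups and answers them in batches. */
class MiniCollapser<K, V> implements AutoCloseable {
    private final ConcurrentHashMap<K, CompletableFuture<V>> pending = new ConcurrentHashMap<>();
    private final Function<List<K>, Map<K, V>> batchLoader;
    private final int maxSize;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "mini-collapser");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    MiniCollapser(Function<List<K>, Map<K, V>> batchLoader, long intervalMillis, int maxSize) {
        this.batchLoader = batchLoader;
        this.maxSize = maxSize;
        // Steps 3-4: poll the pending container at a fixed rate.
        scheduler.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Registers a lookup; callers that pass an equal key share one future. */
    CompletableFuture<V> submit(K key) {
        CompletableFuture<V> future = pending.computeIfAbsent(key, k -> new CompletableFuture<>());
        if (pending.size() >= maxSize) {
            scheduler.execute(this::flush); // step 5: flush early when the batch is full
        }
        return future;
    }

    /** Removes the queued keys, runs one batch query, and completes every waiting future. */
    void flush() {
        Map<K, CompletableFuture<V>> batch = new HashMap<>();
        for (K key : new ArrayList<>(pending.keySet())) {
            CompletableFuture<V> future = pending.remove(key);
            if (future != null) {
                batch.put(key, future);
            }
        }
        if (batch.isEmpty()) {
            return;
        }
        try {
            Map<K, V> results = batchLoader.apply(new ArrayList<>(batch.keySet()));
            // Step 6: hand each result back to its waiting caller (null when the key was not found).
            batch.forEach((key, future) -> future.complete(results.get(key)));
        } catch (RuntimeException e) {
            batch.values().forEach(future -> future.completeExceptionally(e));
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A caller would then do something like collapser.submit(id).get(500, TimeUnit.MILLISECONDS) and fall back to a direct query on timeout, which corresponds to step 7.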
2. Code

Custom annotation RequestCollapser
package com.mea.pay.transaction.annotation;

import java.lang.annotation.*;


@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
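public @interface RequestCollapser {

    // Name of the batch query method on the same class; it must take a single List parameter
    String batchMethod();

    //
    //long timerDelayInMilliseconds() default 50L;

    // Presumably the maximum number of requests per batch; the aspect shown below does not read it
    int maxRequest() default 1000;

    // Presumably toggles request caching; the aspect shown below does not read it
    boolean requestCache() default false;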
}
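To make the role of batchMethod concrete, here is a small reflection sketch of how an annotation attribute can be turned into a callable Method. The names Collapse, NameService and callBatch are made up for illustration; only the lookup by name with a single List parameter mirrors what the aspect later in this article does.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchMethodLookupDemo {

    /** Stand-in for RequestCollapser, reduced to the one attribute used here. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Collapse {
        String batchMethod();
    }

    /** Hypothetical service with a single-key query and its batch counterpart. */
    static class NameService {
        @Collapse(batchMethod = "findNames")
        public String findName(Long id) {
            return "name-" + id;
        }

        public Map<Long, String> findNames(List<Long> ids) {
            Map<Long, String> names = new HashMap<>();
            for (Long id : ids) {
                names.put(id, "name-" + id);
            }
            return names;
        }
    }

    /** Reads the annotation on singleMethodName and invokes the batch method it names. */
    @SuppressWarnings("unchecked")
    static Map<Long, String> callBatch(Object target, String singleMethodName, List<Long> ids) {
        try {
            Method single = target.getClass().getDeclaredMethod(singleMethodName, Long.class);
            Collapse collapse = single.getAnnotation(Collapse.class);
            // The batch method is looked up by name and must take exactly one List parameter.
            Method batch = target.getClass().getDeclaredMethod(collapse.batchMethod(), List.class);
            return (Map<Long, String>) batch.invoke(target, ids);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("batch method lookup failed", e);
        }
    }
}
```

Because the lookup is by name, a typo in batchMethod only surfaces at runtime as a NoSuchMethodException, so it is worth covering each annotated method with a test.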

Request wrapper CollasperRequestInfo
package com.mea.pay.transaction.helper;

import com.mea.pay.transaction.annotation.RequestCollapser;
import lombok.Getter;
import lombok.Setter;

import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;


@Getter
@Setter
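public class CollasperRequestInfo {

    // Pending requests: query argument -> future the calling thread waits on
    private final ConcurrentHashMap<Object, CompletableFuture<Object>> requestContainer;

    // Bean that owns the batch query method
    private Object target;

    // Annotation instance found on the intercepted method
    private RequestCollapser requestCollapser;

    // Batch query method resolved from requestCollapser.batchMethod()
    private Method batchMethod;

    // Judging by the name, requests whose batch query is currently running (the task that uses it is not shown here)
    private ConcurrentHashMap<Object, CompletableFuture<Object>> batchSelectingData;

    // Judging by the name, the time of the last batch execution
    private Long lastExecuteTime;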

    public CollasperRequestInfo(Object target, Method batchMethod, RequestCollapser requestCollapser) {
        this.requestContainer = new ConcurrentHashMap<>(1500);
        this.batchSelectingData = new ConcurrentHashMap<>(1500);
        this.target = target;
        this.requestCollapser = requestCollapser;
        this.batchMethod = batchMethod;
    }
}
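The RequestCollapserTask that consumes this container is not included in this article. Since the aspect below adds entries while holding READ_LOCK, one plausible reading is that the task takes WRITE_LOCK while it moves the pending entries out. The sketch below illustrates that pattern only; PendingRequests, add and drain are hypothetical names, and the real task may work differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical container: many threads add concurrently, one drainer takes a consistent snapshot. */
class PendingRequests {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentHashMap<Object, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Adders share the read lock, so they never block each other. */
    CompletableFuture<Object> add(Object key) {
        lock.readLock().lock();
        try {
            CompletableFuture<Object> fresh = new CompletableFuture<>();
            CompletableFuture<Object> existing = pending.putIfAbsent(key, fresh);
            return existing != null ? existing : fresh;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** The drainer holds the write lock, so no add can slip in between the copy and the clear. */
    Map<Object, CompletableFuture<Object>> drain() {
        lock.writeLock().lock();
        try {
            Map<Object, CompletableFuture<Object>> snapshot = new HashMap<>(pending);
            pending.clear();
            return snapshot;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The inverted use of the lock (many "readers" that actually write, one "writer" that drains) is deliberate: the map itself is already thread-safe, and the lock only guarantees that a drain sees a stable set of entries.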

Aspect RequestCollapserAspect, which intercepts service method calls
package com.mea.pay.transaction.aspect;

import com.mea.pay.common.exception.BusinessException;
import com.mea.pay.transaction.annotation.RequestCollapser;
import com.mea.pay.transaction.helper.CollasperRequestInfo;
import com.mea.pay.transaction.task.RequestCollapserTask;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


@Slf4j
@Aspect
@SuppressWarnings({"unchecked", "rawtypes"})
@Configuration
public class RequestCollapserAspect {

    private static final ReadWriteLock RWLOCK = new ReentrantReadWriteLock();
    public static final Lock READ_LOCK = RWLOCK.readLock();
    public static final Lock WRITE_LOCK = RWLOCK.writeLock();

    
    // Intercepted method signature -> its request-collapsing info
    private static final Map<String, CollasperRequestInfo> METHOD_CONTAINER = new ConcurrentHashMap<>(16);

    
    @Value("${transaction.request-collapser.execute-immediately-threshold-value: 500}")
    private long maxSize;

    
    @Value("${transaction.request-collapser.execute-interval-millisecond: 50}")
    private int executeIntervalMillisecond;

    @Value("${transaction.request-collapser.max-wait-millisecond: 500}")
    private int maxWaitMillisecond;

    
    private ScheduledThreadPoolExecutor requestCollapserExecutor;

    
    @PostConstruct
    void initBatchSelectThreadPool() {
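        // What if a run blocks, or a single execution takes longer than 50 ms?
        // (scheduleAtFixedRate then starts the following runs late; it never runs them concurrently.)
        requestCollapserExecutor = new ScheduledThreadPoolExecutor(1,
                (r) -> new Thread(r, "request_collapser_pool_" + r.hashCode()),
                (r, e) -> log.error("execution is blocked because the thread bounds and queue capacities are reached")
        );
        requestCollapserExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
        // Run the RequestCollapserTask every executeIntervalMillisecond milliseconds (50 by default)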
        int initialDelayMillisecond = 1000;
        requestCollapserExecutor.scheduleAtFixedRate(new RequestCollapserTask(METHOD_CONTAINER),
                initialDelayMillisecond,
                executeIntervalMillisecond,
                TimeUnit.MILLISECONDS);
    }


    
    @Around("execution(* com.mea.pay.transaction.service..*.*(..)) && @annotation(com.mea.pay.transaction.annotation.RequestCollapser)")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {

        Object[] args = pjp.getArgs();
        if (args == null || args.length != 1) {
            throw new BusinessException("methods with more than one parameter are not supported yet");
        }

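        String needRquestCollasperMethod = pjp.getSignature().toShortString();
        // Look up the cached request-collapsing info for this method
        CollasperRequestInfo collasperRequestInfo = getCollasperRequestInfo(pjp, needRquestCollasperMethod);
        ConcurrentHashMap<Object, CompletableFuture<Object>> requestContainer = collasperRequestInfo.getRequestContainer();
        // If the pending requests reach maxSize within executeIntervalMillisecond milliseconds, run the batch query immediately
        if (requestContainer.mappingCount() >= maxSize) {
            // Note: after this runs, the gap before the next scheduled run may be shorter than executeIntervalMillisecond.
            // This is not handled for now; I have not found a good way to deal with it.
            //System.out.println("=====execute immediately");
            requestCollapserExecutor.execute(new RequestCollapserTask(METHOD_CONTAINER));
        }

        CompletableFuture<Object> future = new CompletableFuture<>();
        CompletableFuture<Object> futureRest;
        RequestCollapserAspect.READ_LOCK.lock();
        try {
            // Several threads may query with the same argument; only one of them wins putIfAbsent and the others
            // must wait on the winner's future. Using the return value (instead of a later get) also avoids a
            // null future when the task drains the container in between.
            CompletableFuture<Object> existing = requestContainer.putIfAbsent(args[0], future);
            futureRest = existing != null ? existing : future;
        } finally {
            RequestCollapserAspect.READ_LOCK.unlock();
        }

        Object resObject = null;
        try {
            resObject = futureRest.get(maxWaitMillisecond, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("future.get interrupted", e);
        } catch (ExecutionException | TimeoutException e) {
            log.error("future.get exception:" + e.getMessage(), e);
        }
        if (resObject != null) {
            // Got the batch query result; return it directly
            return resObject;
        }
        // Timed out (or the batch returned nothing for this argument): run the service method and query the database directly
        return pjp.proceed();
    }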

    private CollasperRequestInfo getCollasperRequestInfo(ProceedingJoinPoint pjp, String needRquestCollasperMethod) throws NoSuchMethodException {
        CollasperRequestInfo collasperRequestInfo = METHOD_CONTAINER.get(needRquestCollasperMethod);
        if (collasperRequestInfo == null) {
            RequestCollapser requestCollapser = ((MethodSignature) pjp.getSignature()).getMethod().getAnnotation(RequestCollapser.class);
            Object target = pjp.getTarget();
            Class targetClass = target.getClass();
            String batchMethodName = requestCollapser.batchMethod();
            Method batchMethod = targetClass.getDeclaredMethod(batchMethodName, List.class);
            METHOD_CONTAINER.putIfAbsent(needRquestCollasperMethod, new CollasperRequestInfo(target, batchMethod, requestCollapser));
            // Whether or not this thread's putIfAbsent won, the entry is guaranteed to exist at this point
            collasperRequestInfo = METHOD_CONTAINER.get(needRquestCollasperMethod);
        }
        return collasperRequestInfo;
    }
}
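The wait-then-fall-back step at the end of around() can be looked at in isolation. The helper below is a sketch, not part of the article's code: TimedWait, awaitOrFallback and directQuery are hypothetical names, with directQuery standing in for pjp.proceed(). It also shows one way to keep the interrupt flag set when the wait is interrupted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class TimedWait {
    /** Waits up to timeoutMillis for the batch result; otherwise runs the direct query. */
    static <V> V awaitOrFallback(CompletableFuture<V> future, long timeoutMillis, Supplier<V> directQuery) {
        try {
            V value = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            if (value != null) {
                return value;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        } catch (ExecutionException | TimeoutException e) {
            // Batch query failed or was too slow: fall through to the direct query.
        }
        return directQuery.get();
    }
}
```

Note that a null batch result is treated the same as a timeout, so a lookup for a key that does not exist always costs one extra direct query; whether that is acceptable depends on how often misses occur.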
 
serviceImpl example
package com.mea.pay.transaction.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.mea.pay.transaction.annotation.RequestCollapser;
import com.mea.pay.transaction.dos.PaymentRecordDO;
import com.mea.pay.transaction.mapper.PaymentRecordMapper;
import com.mea.pay.transaction.service.PaymentRecordService;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;


@Service
public class PaymentRecordServiceImpl extends ServiceImpl<PaymentRecordMapper, PaymentRecordDO>
        implements PaymentRecordService {

    
    @Override
    @RequestCollapser(batchMethod = "getPaymentRecordByIds")
    public PaymentRecordDO getPaymentRecordById(Long id) {
        return super.getById(id);
    }

    
    @Override
    public Map<Long, PaymentRecordDO> getPaymentRecordByIds(List<Long> ids) {
        List<PaymentRecordDO> paymentRecords = super.listByIds(ids);
        if (paymentRecords == null) {
            return Collections.emptyMap();
        }
        return paymentRecords.stream().collect(Collectors.toMap(PaymentRecordDO::getId, Function.identity()));
    }

    
    @Override
    @RequestCollapser(batchMethod = "getPaymentRecordByRequestIds")
    public PaymentRecordDO getPaymentRecordByRequestId(String requestId) {
        return super.getOne(new LambdaQueryWrapper<PaymentRecordDO>() {{
            eq(PaymentRecordDO::getRequestId, requestId);
        }});
    }

    
    @Override
    public Map<String, PaymentRecordDO> getPaymentRecordByRequestIds(List<String> requestIds) {
        List<PaymentRecordDO> records = super.list(new LambdaQueryWrapper<PaymentRecordDO>() {{
            in(PaymentRecordDO::getRequestId, requestIds);
        }});
        if (records == null) {
            return Collections.emptyMap();
        }
        return records.stream().collect(Collectors.toMap(PaymentRecordDO::getRequestId, Function.identity()));
    }
}
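One detail of the batch methods above is worth a note: the two-argument Collectors.toMap throws IllegalStateException when two elements map to the same key. That cannot happen for a primary key, but it can if a column such as requestId is not unique. The sketch below uses a hypothetical Row type in place of PaymentRecordDO and keeps the first row per key; whether "first wins" is the right rule for real data is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ToMapDemo {
    /** Hypothetical row type standing in for PaymentRecordDO. */
    static class Row {
        final String requestId;
        final long id;

        Row(String requestId, long id) {
            this.requestId = requestId;
            this.id = id;
        }
    }

    /** Indexes rows by requestId; when two rows share a key, the first one wins. */
    static Map<String, Row> indexByRequestId(List<Row> rows) {
        return rows.stream().collect(
                Collectors.toMap(row -> row.requestId, Function.identity(), (first, second) -> first));
    }
}
```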
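3. Testing

For this test I set maxSize to 15.

JMeter configuration

When the concurrency is too high, the exception below appears. It depends on server performance, the maximum supported concurrency, and the Tomcat thread count, so I am setting it aside for now and trying the implementation as it is.
Connection reset by peer: socket write error

OK, if you found this helpful, please give it a like.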
