栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java 数据分批调用接口的正确姿势

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java 数据分批调用接口的正确姿势

一、背景

现实业务开发中,通常为了避免超时、对方接口限制等原因需要对支持批量的接口的数据分批调用。

比如List参数的size可能为 几十个甚至上百个,但是假如对方dubbo接口比较慢,传入50个以上会超时,那么可以每次传入20个,分批执行。

通常很多人会写 for 循环或者 while 循环,非常不优雅,无法复用,而且容易出错。

下面结合 Java8 的 Stream ,Function ,Consumer 等特性实现分批调用的工具类封装和自测。

并给出 CompletableFuture 的异步改进方案。

二、实现

工具类:

package com.chujianyun.common.java8.function;


import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;


public class ExecuteUtil {


    public static  void partitionRun(List dataList, int size, Consumer> consumer) {
 if (CollectionUtils.isEmpty(dataList)) {
     return;
 }
 Preconditions.checkArgument(size > 0, "size must not be a minus");
 Lists.partition(dataList, size).forEach(consumer);
    }

    public static  List partitionCall2List(List dataList, int size, Function, List> function) {

 if (CollectionUtils.isEmpty(dataList)) {
     return new ArrayList<>(0);
 }
 Preconditions.checkArgument(size > 0, "size must not be a minus");

 return Lists.partition(dataList, size)
  .stream()
  .map(function)
  .filter(Objects::nonNull)
  .reduce(new ArrayList<>(),
   (resultList1, resultList2) -> {
resultList1.addAll(resultList2);
return resultList1;
   });


    }

    public static  Map partitionCall2Map(List dataList, int size, Function, Map> function) {
 if (CollectionUtils.isEmpty(dataList)) {
     return new HashMap<>(0);
 }
 Preconditions.checkArgument(size > 0, "size must not be a minus");
 return Lists.partition(dataList, size)
  .stream()
  .map(function)
  .filter(Objects::nonNull)
  .reduce(new HashMap<>(),
   (resultMap1, resultMap2) -> {
resultMap1.putAll(resultMap2);
return resultMap1;
   });


    }
}

待调用的服务(模拟)

package com.chujianyun.common.java8.function;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SomeManager {

    public void aRun(Long id, List data) {

    }

    public List aListMethod(Long id, List data) {
 return new ArrayList<>(0);
    }

    public Map aMapMethod(Long id, List data) {
 return new HashMap<>(0);
    }
}

单元测试:

package com.chujianyun.common.java8.function;

import org.apache.commons.lang3.RandomUtils;
import org.jeasy.random.EasyRandom;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.internal.verification.Times;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;

@RunWith(PowerMockRunner.class)
public class ExecuteUtilTest {

    private EasyRandom easyRandom = new EasyRandom();

    @Mock
    private SomeManager someManager;

    // 测试数据
    private List mockDataList;

    private int total = 30;

    @Before
    public void init() {
 // 构造30条数据
 mockDataList = easyRandom.objects(String.class, 30).collect(Collectors.toList());

    }

    @Test
    public void test_a_run_partition() {
 // mock aRun
 PowerMockito.doNothing().when(someManager).aRun(anyLong(), any());

 // 每批 10 个
 ExecuteUtil.partitionRun(mockDataList, 10, (eachList) -> someManager.aRun(1L, eachList));

 //验证执行了 3 次
 Mockito.verify(someManager, new Times(3)).aRun(anyLong(), any());
    }


    @Test
    public void test_call_return_list_partition() {
 // mock  每次调用返回条数(注意每次调用都是这2个)
 int eachReturnSize = 2;
 PowerMockito
  .doReturn(easyRandom.objects(String.class, eachReturnSize).collect(Collectors.toList()))
  .when(someManager)
  .aListMethod(anyLong(), any());

 // 分批执行
 int size = 4;
 List resultList = ExecuteUtil.partitionCall2List(mockDataList, size, (eachList) -> someManager.aListMethod(2L, eachList));

 //验证执行次数
 int invocations = 8;
 Mockito.verify(someManager, new Times(invocations)).aListMethod(anyLong(), any());

 // 正好几轮
 int turns;
 if (total % size == 0) {
     turns = total / size;
 } else {
     turns = total / size + 1;
 }
 Assert.assertEquals(turns * eachReturnSize, resultList.size());
    }


    @Test
    public void test_call_return_map_partition() {
 // mock  每次调用返回条数
 // 注意:
 // 如果仅调用doReturn一次,那么每次返回都是key相同的Map,
 // 如果需要不覆盖,则doReturn次数和 invocations 相同)
 int eachReturnSize = 3;
 PowerMockito
  .doReturn(mockMap(eachReturnSize))
  .doReturn(mockMap(eachReturnSize))
  .when(someManager).aMapMethod(anyLong(), any());

 // 每批
 int size = 16;
 Map resultMap = ExecuteUtil.partitionCall2Map(mockDataList, size, (eachList) -> someManager.aMapMethod(2L, eachList));

 //验证执行次数
 int invocations = 2;
 Mockito.verify(someManager, new Times(invocations)).aMapMethod(anyLong(), any());

 // 正好几轮
 int turns;
 if (total % size == 0) {
     turns = total / size;
 } else {
     turns = total / size + 1;
 }
 Assert.assertEquals(turns * eachReturnSize, resultMap.size());
    }

    private Map mockMap(int size) {
 Map result = new HashMap<>(size);
 for (int i = 0; i < size; i++) {

// 极力保证key不重复
     result.put(easyRandom.nextObject(String.class) + RandomUtils.nextInt(), easyRandom.nextInt());
 }
 return result;
    }


}

注意:

1 判空

.filter(Objects::nonNull) 

这里非常重要,避免又一次调用返回 null,而导致空指针异常。

2 实际使用时可以结合apollo配置, 灵活设置每批执行的数量,如果超时随时调整

3 用到的类库

集合工具类: commons-collections4、guava (可以不用)

这里的list划分子list也可以使用stream的 skip ,limit特性自己去做,集合判空也可以不借助collectionutils.

构造数据:easy-random

单元测试框架: Junit4 、 powermockito、mockito

4 大家可以加一些更强大的功能,如允许设置每次调用的时间间隔、并行或并发调用等。

三、改进

以上面的List接口为例,将其改为异步版本:

    public static  List partitionCall2ListAsync(List dataList,
 int size,
 ExecutorService executorService,
 Function, List> function) {

 if (CollectionUtils.isEmpty(dataList)) {
     return new ArrayList<>(0);
 }
 Preconditions.checkArgument(size > 0, "size must not be a minus");

 List>> completableFutures = Lists.partition(dataList, size)
  .stream()
  .map(eachList -> {
      if (executorService == null) {
   return CompletableFuture.supplyAsync(() -> function.apply(eachList));
      } else {
   return CompletableFuture.supplyAsync(() -> function.apply(eachList), executorService);
      }

  })
  .collect(Collectors.toList());


 CompletableFuture allFinished = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
 try {
     allFinished.get();
 } catch (Exception e) {
     throw new RuntimeException(e);
 }
 return completableFutures.stream()
  .map(CompletableFuture::join)
  .filter(CollectionUtils::isNotEmpty)
  .reduce(new ArrayList(), ((list1, list2) -> {
      List resultList = new ArrayList<>();
      if(CollectionUtils.isNotEmpty(list1)){
  resultList.addAll(list1);
  }

      if(CollectionUtils.isNotEmpty(list2)){
    resultList.addAll(list2);
  }
      return resultList;
  }));
    }

测试代码:

    
 // 测试数据
    private List mockDataList;

    private int total = 300;

    private AtomicInteger atomicInteger;

    @Before
    public void init() {
 // 构造total条数据
 mockDataList = easyRandom.objects(String.class, total).collect(Collectors.toList());

    }



@Test
 public void test_call_return_list_partition_async() {

 ExecutorService executorService = Executors.newFixedThreadPool(10);

 atomicInteger = new AtomicInteger(0);
 Stopwatch stopwatch = Stopwatch.createStarted();
 // 分批执行
 int size = 2;
 List resultList = ExecuteUtil.partitionCall2ListAsync(mockDataList, size, executorService, (eachList) -> someCall(2L, eachList));

 Stopwatch stop = stopwatch.stop();
 log.info("执行时间: {} 秒", stop.elapsed(TimeUnit.SECONDS));

 Assert.assertEquals(total, resultList.size());
 // 正好几轮
 int turns;
 if (total % size == 0) {
     turns = total / size;
 } else {
     turns = total / size + 1;
 }
 log.info("共调用了{}次", turns);
 Assert.assertEquals(turns, atomicInteger.get());
    
      // 顺序也一致
 for(int i =0; i< mockDataList.size();i++){
     Assert.assertEquals((Integer) mockDataList.get(i).length(), resultList.get(i));
 }
    }


  
    private List someCall(Long id, List strList) {

 log.info("当前-->{},strList.size:{}", atomicInteger.incrementAndGet(), strList.size());
 try {
     TimeUnit.SECONDS.sleep(2L);
 } catch (InterruptedException e) {
     e.printStackTrace();
 }
 return strList.stream()
  .map(String::length)
  .collect(Collectors.toList());
    }

通过异步可以尽可能快得拿到执行结果。

四、总结

1 要灵活运用Java 8 的 特性简化代码

2 要注意代码的封装来使代码更加优雅,复用性更强

3 要利用来构造单元测试的数据框架如 java-faker和easy-random来提高构造数据的效率

4 要了解性能改进的常见思路:合并请求、并发、并行、缓存等。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号