多线程实现(一)——概念和三种实现多线程实操
多线程实现(二)——线程池实现多线程
多线程实现(三)——JUC异步并发
- 系列文章目录
- 前言
- 一、什么是异步?
- 二、Future
- 三、异步编排
- 3.1 CompletableFutrue
- 3.1.2 串行执行
- 3.1.3 聚合-AND 聚合
- 3.1.4 聚合-OR 聚合
- 3.1.5 异常处理
- 四、异步开启
- 五、 案例演练
前言
并发强调的是N人干同样的事,保证不争抢 。像拔河,要劲往一处使,不能抢着乱用劲。(lock,atomic,synchronize,volatile, cas)
异步强调的是1/N人干不同的事,不该等的别等。像小时候家里草锅做饭,爸爸负责烧锅煮米饭,妈妈在另一个锅负责炒菜,互不妨碍,不需要等米饭好了才能做菜。就是要一起吃饭哈哈哈 (thread pool, future, async,reactive)
提示:以下是本篇文章正文内容,下面案例可供参考
一、什么是异步?异步就是主线程序把某个(些)任务交给子线程去执行,主线程继续往下执行。子线程执行完后把结果给主线程,得到最终的目标结果。就像科研,导师负责主要技术攻关,学生负责能力以内的计算,不用互相等待,将结果给老师,老师再综合数据经过一系列操作得到最终结果。老师就像是主线程,学生就像是子线程,互相执行不等待,拿最终的结果;亦像地铁和乘客,地铁是主线,从起点运行到终点;乘客就像子线程,中途可以下车(子线程执行结束)。
二、FutureFuture 是一个异步计算结果返回接口,目的获取返回值结果。但是 future 在获取返回值结构的时候,方法必须同步阻塞等待返回值结果就像大巴车和乘客。大巴车不到终点,乘客就不能就近下车。大巴车是子线程,乘客是主线程。只能等子线程结束了才能执行主线程。
常用的方法:
Get : 获取结果(等待,阻塞)
Get(timeout) : 获取结果,指定等待时间
Cancel : 取消当前任务
isDone : 判断任务是否已经完成 (轮询)
futrure 对于结果获取不是很方便,只能通过同步阻塞的方式获取结果,或者是轮询的方式
获取到结果;阻塞的方式获取返回值结果与异步的思想相违背,轮询方式又很占用 cpu 资源,
也不能及时得到我们结果。
Future的特点导致需要新的方法来完成我们异步计算返回结果(不同步阻塞),由此有了CompletableFutrue。CompletableFutrue可以帮助我们简化异步编程复杂性,提供了函数式编程的能力,可以通过回调函数的方式处理计算结果:
public class CompletableFutureimplements Future , CompletionStage
CompletableFuture 具有Future的特性,还实现了CompletionStage接口,具备CompletionStage
接口的特性: 串行执行,并行执行,聚合(AND 聚合,OR 聚合)
串行执行就是先执行的线程的结果,影响下一个线程的执行。具有先后关系,即then-然后。类似先做饭后吃饭!
串行方式一
public CompletionStagethenRun(Runnable action); public CompletionStage thenRunAsync(Runnable action);
示例1.1:thenRun(没有返回值,不关心上一步执行结果,只和上一步执行具有顺序关系)
@Slf4j
public class supplyAsyncDemoThenRun {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
log.info("子线程future线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
}).thenRun(()->{
log.info("thenRun运行。。。。。。。。。");
});
//调用异步任务
future.get();
log.info("主线程end..............");
}
}
示例1.2:thenRunAsync(没有返回值,也不关心上一步执行结果,只和上一步执行具有顺序关系)
@Slf4j
public class SupplyAsyncDemoThenRunAsync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
log.info("子线程future线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
}).thenRunAsync(()->{
log.info("thenRunAsync运行。。。。。。。。。");
});
//调用异步任务
future.get();
log.info("主线程end..............");
}
}
拓展: thenRun/thenRunAsync都内部调用了uniRunStage(Executor e, Runnable f) 方法
private CompletableFutureuniRunStage(Executor e, Runnable f) { if (f == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture (); if (e != null || !d.uniRun(this, f, null)) { UniRun c = new UniRun (e, d, this, f); push(c); c.tryFire(SYNC); } return d; } public CompletableFuture thenRun(Runnable action) { //这里我看到Executor入参为null,表示执行第一个任务的时候,传入了一个线程池, //当执行第二个任务的时候调用的是thenRun方法,则第二个任务和第一个任务是共用同一个线程池。 return uniRunStage(null, action); } public CompletableFuture thenRunAsync(Runnable action) { //这里我看到Executor入参为asyncPool,所以它是使用的默认的ForkJoin线程池。 //即:第一个任务使用的是你传入的线程池,第二个任务使用的是ForkJoin线程池, //由于ForkJoin线程池是整个应用程序公用的,该线程池可以在程序启动的时候进行设置, //所以,不建议你在不知道程序其他地方哪里还会使用该线程池的情况下使用。 return uniRunStage(asyncPool, action); } public CompletableFuture thenRunAsync(Runnable action, Executor executor) { //Executor参数传的是外部传入的自定义线程池,所以它是使用的你指定的线程池。 //在执行第一个任务的时候,传入了A线程池,当执行第二个任务的时候调用的是: //thenRunAsync(Runnable action,Executor executor)方法并传入了B线程池, //则第一个任务使用的是A线程池,第二个任务使用的是B线程池。 return uniRunStage(screenExecutor(executor), action); }
串行方式二
public CompletionStage thenApply(Function super T,? extends U> fn); public CompletionStage thenApplyAsync(Function super T,? extends U> fn); public CompletionStage thenApplyAsync(Function super T,? extends U> fn,Executor executor);
示例:thenApply 此方法具有返回值,上一步直接的结果当成传参的传递给 thenApply
T 就是参数类型,U 就是返回值类
@Slf4j
public class SupplyAsyncDemoThenApply {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//thenApply 有返回值,返回值类型是U(跟上一步执行结果有关系),上一步执行结果会被当成参数传递给下一步,参数类型为T
//2.public completionStage thenApply(Function super T,? extend U> fn);
//多线程异步编排
CompletableFuture apply = CompletableFuture.supplyAsync(() -> {
log.info("子线程future线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
}).thenApply((t) -> {
log.info("子线程thenApply运行,参数是:{}", t);
long res = t * 5;
log.info("计算结果是:{}", res);
return res;
});
//调用异步任务
apply.get();
log.info("主线程end..............");
}
}
串行方式三
public CompletionStagethenAccept(Consumer super T> action); public CompletionStage thenAcceptAsync(Consumer super T> action); public CompletionStage thenAcceptAsync(Consumer super T> action, Executor executor);
示例:thenAccept 此方法没有返回值,上一步直接的结果当成传参的传递给 thenAccept
@Slf4j
public class SupplyAsyncDemoThenAccept {
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
13,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//thenAccept 没有返回值(t跟上一步执行结果有关系),上一步执行结果会被当成参数传递给下一步,参数类型为T
//2.public completionStage thenAccept(Consumer super T> action);
//多线程异步编排
CompletableFuture accept = CompletableFuture.supplyAsync(() -> {
log.info("子线程future线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
},threadPool).thenAccept((t) -> {
log.info("子线程thenAccept运行,参数是:{}", t);
long res = t * 5;
log.info("计算结果是:{}", res);
});
//调用异步任务
accept.get();
log.info("主线程end..............");
}
}
串行方式四
public CompletionStage thenCompose(Function super T, ? extends CompletionStage> fn); public CompletionStage thenComposeAsync(Function super T, ? extends CompletionStage> fn); public CompletionStage thenComposeAsync(Function super T, ? extends CompletionStage> fn, Executor executor);
示例:thenCompose 有返回值,上一步直接的结果当成传参的传递给 thenCompose
@Slf4j
public class SupplyAsyncDemoThenCompose {
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
13,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//允许对两个completionStage流水线进行操作,第一个操作完成时,讲第一个操作结果传递给第二个completionStage
//2.public completionStage thenCompose(Function super T,? extends CompletionStage> fn);
//多线程异步编排
CompletableFuture thenCompose = CompletableFuture.supplyAsync(() -> {
log.info("子线程future线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
//第一次执行
}, threadPool).thenCompose(new Function>() {
@Override
public CompletionStage apply(Integer t) {
//第二次执行
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
log.info("子线程thenCompose运行,参数是:{}", t);
long res = t * 5;
log.info("计算结果是:{}", res);
return res;
});
return future;
}
});
//调用异步任务
thenCompose.get();
log.info("主线程end..............");
}
}
3.1.3 聚合-AND 聚合
public CompletionStagethenCombine(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn); public CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn); public CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn,Executor executor)
示例
@Slf4j
public class SupplyAsyncDemoThenCombin {
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
13,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//thenCombine 有返回值,会把两个CompletionStage的任务都执行完毕后,吧两个任务结果一块交给thenCombine去处理
//2.public CompletionStage thenCombine (CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn)
//多线程异步编排
//第一个:CompletionStage
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
log.info("子线程f1线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
}, threadPool);
//第二个:CompletionStage
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
log.info("子线程f2线程start。。。。。。。。。");
int i = 10 / 3;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
}, threadPool);
//合并操作
CompletableFuture thenCombine = f1.thenCombine(f2, (t, u) -> {
log.info("t是第一个执行后的结果:{}", t);
log.info("u是第二个执行后的结果:{}", u);
return t+u;
});
//调用方法
Integer integer = thenCombine.get();
log.info("最终的调用结果是:{}",integer);
log.info("主线程end..............");
}
}
没有返回值的AND聚合
public CompletionStagethenAcceptBoth (CompletionStage extends U> other, BiConsumer super T, ? super U> action); public CompletionStage thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action); public CompletionStage thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action,Executor executor)
@Slf4j
public class SupplyAsyncDemoThenAcceptBoth {
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
13,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//thenAcceptBoth 当两个阶段的CompletionStage都执行完毕后,把结果一块交给thenAcceptBoth
//2.public CompletionStage thenAcceptBoth (CompletionStage extends U> other,BiConsumer super T, ? super U> action);
//多线程异步编排
//第一个:CompletionStage
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
log.info("子线程f1线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
});
//第二个:CompletionStage
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
log.info("子线程f2线程start。。。。。。。。。");
int i = 10 / 3;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
}, threadPool);
//合并操作
CompletableFuture thenAcceptBoth = f1.thenAcceptBoth(f2, (t, u) -> {
log.info("t是第一个执行后的结果:{}", t);
log.info("u是第二个执行后的结果:{}", u);
});
//调用方法
thenAcceptBoth.get();
log.info("主线程end..............");
}
}
3.1.4 聚合-OR 聚合
public CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn); public CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn,Executor executor); public CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn); public CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn,Executor executor); public CompletionStageacceptEither(CompletionStage extends T> other,Consumer ? super T> action
示例代码
@Slf4j
public class SupplyAsyncDemoApplyToEitherAsync {
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
13,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//applyToEitherAsync 针对两个阶段的CompletionStage,将计算速度最快的那个CompletionStage的结果作为下一步处理的消费
//applyToEitherAsync = applyToEither
//2.public CompletionStage applyToEither (CompletionStage extends T> other, Function super T, U> fn);
//多线程异步编排
//第一个:CompletionStage
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
log.info("子线程f1线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("第一个:CompletionStage子线程future线程end。。。。。。。。。");
return i;
});
//第二个:CompletionStage
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
log.info("子线程f2线程start。。。。。。。。。");
int i = 10 / 3;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("第二个:CompletionStage子线程future线程end。。。。。。。。。");
return i;
}, threadPool);
//合并操作
CompletableFuture applyToEither = f1.applyToEither(f2, r -> {
log.info("有个applyToEither任务正在执行,参数是:{}", r);
return r;
});
//调用方法
Integer integer = applyToEither.get();
log.info("最终计算结果:{}",integer);
log.info("主线程end..............");
}
}
3.1.5 异常处理
方法接口
public CompletionStageexceptionally(Function fn); public CompletionStage whenComplete(BiConsumer super T, ? super Throwable> action); public CompletionStage whenCompleteAsync(BiConsumer super T, ? super Throwable> action); public CompletionStage whenCompleteAsync(BiConsumer super T, ? super Throwable> action,Executor executor); public CompletionStage handle(BiFunction super T, Throwable, ? extends U> fn); public CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U> fn); public CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U> fn,Executor executor)
代码示例:exceptionally
@Slf4j
public class SupplyAsyncDemoExceptionally {
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
13,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//exceptionally 异常处理
//2.public CompletionStage exceptionally(Function fn);
//多线程异步编排
//第一个:CompletionStage
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
log.info("子线程f1线程start。。。。。。。。。");
int i = 10 / 0;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("第一个CompletionStage子线程future线程end。。。。。。。。。");
return i;
}).exceptionally((t)->{
log.info("业务执行失败:{}", t.getMessage());
return null;
});
//调用方法
Integer integer = f1.get();
log.info("最终计算结果:{}",integer);
log.info("主线程end..............");
}
代码示例:handle
@Slf4j
public class SupplyAsyncDemoHandle {
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
13,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//1.public static CompletableFuture supplyAsync(Supplier supplier);
//handle 异常处理 = try{}finally{} :对上一步执行的结果进行处理,还可以处理异常任务
//2.public CompletionStage handle (BiFunction super T, Throwable, ? extends U> fn);
//多线程异步编排
//第一个:CompletionStage
CompletableFuture f = CompletableFuture.supplyAsync(() -> {
log.info("子线程f1线程start。。。。。。。。。");
//模拟异常
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("第一个CompletionStage子线程future线程end。。。。。。。。。");
return i;
}).handle((t,u)->{
int res = -1;
if (u!=null){
log.info("业务执行失败:{}", u.getMessage());
}else {
res = t*5;
}
return res;
});
//调用方法
Integer integer = f.get();
log.info("最终计算结果:{}",integer);
log.info("主线程end..............");
}
}
四、异步开启
CompletableFuture 提供了 4 个静态的方法,来创建一个异步操作(异步开启: 从这 4 个静态的方法开发即可)
方式一:runAsync: 没有返回值的方法,不关注返回值 RunAsync : 没有使用自定义线程池,默认使用的线程池 ForkJoinPool.commonPool public static CompletableFuturerunAsync(Runnable runnable); public static CompletableFuture runAsync(Runnable runnable,Executor executor); 方式二: supplyAsync : 有返回值,关注返回值的。 public static CompletableFuture supplyAsync(Supplier supplier); public static CompletableFuture supplyAsync(Supplier supplier,Executor executor
代码示例:runAsync
@Slf4j
public class RunAsyncFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//第一种方法:runAsync:实现异步编排,没有返回值
//没有使用自定义线程池,默认使用的线程池 ForkJoinPool.commonPool
//public static completableFuture runAsync(Runnable runnable)
CompletableFuture future = CompletableFuture.runAsync(() -> {
log.info("子线程future线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
});
log.info("主线程end..............");
//调用异步任务
future.get();
}
}
代码示例:supplyAsync
@Slf4j
public class SupplyAsyncFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.info("主线程start........");
//第二种方法:SupplyAsync:实现异步编排,有返回值
//public static completableFuture supplyAsync(supplier supplier)
CompletableFuture uCompletableFuture = CompletableFuture.supplyAsync(() -> {
log.info("子线程future线程start。。。。。。。。。");
int i = 10 / 2;
log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
log.info("子线程future线程end。。。。。。。。。");
return i;
});
//调用异步任务
Integer integer = uCompletableFuture.get();
log.info("supplyAsync异步编排的返回值结果:{},", integer);
log.info("主线程end..............");
}
}
五、 案例演练
现有商品信息页面,为应对高并发,使用线程池进行异步处理。
代码仅供参考,下方的各个实体类可以依据自己的场景demo自由更换,动动小手,实操一下吧(狗头)。
public SkuItemVo skuItem(Long skuId) {
//新建一个包装类对象
SkuItemVo skuItemVo = new SkuItemVo();
//开启异步编排实现,提升服务性能
//1.根据skuId 查询 sku基本信息
CompletableFuture infoFuture = CompletableFuture.supplyAsync(() -> {
SkuInfoEntity skuInfoEntity = this.getById(skuId);
skuItemVo.setInfo(skuInfoEntity);
return skuInfoEntity;
}, executor);
//2.根据skuId 查询 sku图片信息(多个图片)
CompletableFuture imagesFuture = CompletableFuture.runAsync(() -> {
List imagesList = skuImagesService.list(new QueryWrapper().eq("sku_id", skuId));
skuItemVo.setImages(imagesList);
}, executor);
//3.根据skuId 查询 spu销售属性
CompletableFuture salesFutrue = infoFuture.thenAcceptAsync((res) -> {
//获取sku与之对应的spuId
Long spuId = res.getSpuId();
List saleAttrVos = skuSaleAttrValueService.getSaleAttrs(spuId);
if (!CollectionUtils.isEmpty(saleAttrVos)){
skuItemVo.setAttrSales(saleAttrVos);
}
}, executor);
//4.根据skuId 查询 spu的描述信息
CompletableFuture desFutrue = infoFuture.thenAcceptAsync((res) -> {
//获取sku与之对应的spuId
Long spuId = res.getSpuId();
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getOne(new QueryWrapper().eq("spu_id",spuId));
if (null != spuInfoDescEntity){
skuItemVo.setDesc(spuInfoDescEntity);
}
}, executor);
//5.根据skuId,categoryId 查询 sku分组规格参数属性值
CompletableFuture groupFutrue = infoFuture.thenAcceptAsync((res) -> {
//获取sku与之对应的spuId
Long spuId = res.getSpuId();
//获取商品分类id
Long categoryId = res.getCategoryId();
List attrGroupVos = attrGroupService.getGroupAttr(spuId,categoryId);
skuItemVo.setAttrGroups(attrGroupVos);
if (CollectionUtils.isEmpty(attrGroupVos)){
skuItemVo.setAttrGroups(attrGroupVos);
}
}, executor);
//allOf: 等待所有的任务完成后,才返回结果
try {
CompletableFuture.allOf(infoFuture,imagesFuture,salesFutrue,desFutrue,groupFutrue).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return skuItemVo;
}



