栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

多线程实现(三)——JUC异步并发

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

多线程实现(三)——JUC异步并发

系列文章目录

多线程实现(一)——概念和三种实现多线程实操
多线程实现(二)——线程池实现多线程
多线程实现(三)——JUC异步并发

多线程实现文章目录
  • 系列文章目录
  • 前言
  • 一、什么是异步?
  • 二、Future
  • 三、异步编排
    • 3.1 CompletableFutrue
    • 3.1.2 串行执行
    • 3.1.3 聚合-AND 聚合
    • 3.1.4 聚合-OR 聚合
    • 3.1.5 异常处理
  • 四、异步开启
  • 五、 案例演练


前言

并发强调的是N人干同样的事,保证不争抢 。像拔河,要劲往一处使,不能抢着乱用劲。(lock,atomic,synchronize,volatile, cas)
异步强调的是1/N人干不同的事,不该等的别等。像小时候家里草锅做饭,爸爸负责烧锅煮米饭,妈妈在另一个锅负责炒菜,互不妨碍,不需要等米饭好了才能做菜。就是要一起吃饭哈哈哈 (thread pool, future, async,reactive)


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是异步?

异步就是主线程序把某个(些)任务交给子线程去执行,主线程继续往下执行。子线程执行完后把结果给主线程,得到最终的目标结果。就像科研,导师负责主要技术攻关,学生负责能力以内的计算,不用互相等待,将结果给老师,老师再综合数据经过一系列操作得到最终结果。老师就像是主线程,学生就像是子线程,互相执行不等待,拿最终的结果;亦像地铁和乘客,地铁是主线,从起点运行到终点;乘客就像子线程,中途可以下车(子线程执行结束)。

二、Future

Future 是一个异步计算结果返回接口,目的获取返回值结果。但是 future 在获取返回值结构的时候,方法必须同步阻塞等待返回值结果就像大巴车和乘客。大巴车不到终点,乘客就不能就近下车。大巴车是子线程,乘客是主线程。只能等子线程结束了才能执行主线程。

常用的方法:
Get : 获取结果(等待,阻塞)
Get(timeout) : 获取结果,指定等待时间
Cancel : 取消当前任务
isDone : 判断任务是否已经完成 (轮询)

futrure 对于结果获取不是很方便,只能通过同步阻塞的方式获取结果,或者是轮询的方式
获取到结果;阻塞的方式获取返回值结果与异步的思想相违背,轮询方式又很占用 cpu 资源,
也不能及时得到我们结果。

三、异步编排 3.1 CompletableFutrue

Future的特点导致需要新的方法来完成我们异步计算返回结果(不同步阻塞),由此有了CompletableFutrue。CompletableFutrue可以帮助我们简化异步编程复杂性,提供了函数式编程的能力,可以通过回调函数的方式处理计算结果:

public class CompletableFuture implements Future, CompletionStage

CompletableFuture 具有Future的特性,还实现了CompletionStage接口,具备CompletionStage
接口的特性: 串行执行,并行执行,聚合(AND 聚合,OR 聚合)

3.1.2 串行执行

串行执行就是先执行的线程的结果,影响下一个线程的执行。具有先后关系,即then-然后。类似先做饭后吃饭!

串行方式一

public CompletionStage thenRun(Runnable action);

public CompletionStage thenRunAsync(Runnable action);

示例1.1:thenRun(没有返回值,不关心上一步执行结果,只和上一步执行具有顺序关系)

@Slf4j
public class supplyAsyncDemoThenRun {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            log.info("子线程future线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        }).thenRun(()->{
            log.info("thenRun运行。。。。。。。。。");
        });

        //调用异步任务
        future.get();


        log.info("主线程end..............");


    }
}

示例1.2:thenRunAsync(没有返回值,也不关心上一步执行结果,只和上一步执行具有顺序关系)

@Slf4j
public class SupplyAsyncDemoThenRunAsync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            log.info("子线程future线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        }).thenRunAsync(()->{
            log.info("thenRunAsync运行。。。。。。。。。");
        });

        //调用异步任务
        future.get();

        log.info("主线程end..............");
    }
}

拓展: thenRun/thenRunAsync都内部调用了uniRunStage(Executor e, Runnable f) 方法

 private CompletableFuture uniRunStage(Executor e, Runnable f) {
       if (f == null) throw new NullPointerException();
       CompletableFuture d = new CompletableFuture();
       if (e != null || !d.uniRun(this, f, null)) {
           UniRun c = new UniRun(e, d, this, f);
           push(c);
           c.tryFire(SYNC);
       }
       return d;
 }


public CompletableFuture thenRun(Runnable action) {
	//这里我看到Executor入参为null,表示执行第一个任务的时候,传入了一个线程池,
	//当执行第二个任务的时候调用的是thenRun方法,则第二个任务和第一个任务是共用同一个线程池。
    return uniRunStage(null, action);
}

public CompletableFuture thenRunAsync(Runnable action) {
    //这里我看到Executor入参为asyncPool,所以它是使用的默认的ForkJoin线程池。
	//即:第一个任务使用的是你传入的线程池,第二个任务使用的是ForkJoin线程池,
	//由于ForkJoin线程池是整个应用程序公用的,该线程池可以在程序启动的时候进行设置,
	//所以,不建议你在不知道程序其他地方哪里还会使用该线程池的情况下使用。
   return uniRunStage(asyncPool, action);
}

public CompletableFuture thenRunAsync(Runnable action, Executor executor) {
    //Executor参数传的是外部传入的自定义线程池,所以它是使用的你指定的线程池。
    //在执行第一个任务的时候,传入了A线程池,当执行第二个任务的时候调用的是:
    //thenRunAsync(Runnable action,Executor executor)方法并传入了B线程池,
    //则第一个任务使用的是A线程池,第二个任务使用的是B线程池。
    return uniRunStage(screenExecutor(executor), action);
}

串行方式二

public  CompletionStage thenApply(Function fn);

public  CompletionStage thenApplyAsync(Function fn);

public  CompletionStage thenApplyAsync(Function fn,Executor executor);

示例:thenApply 此方法具有返回值,上一步直接的结果当成传参的传递给 thenApply
T 就是参数类型,U 就是返回值类

@Slf4j
public class SupplyAsyncDemoThenApply {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //thenApply 有返回值,返回值类型是U(跟上一步执行结果有关系),上一步执行结果会被当成参数传递给下一步,参数类型为T
        //2.public  completionStage thenApply(Function fn);

        //多线程异步编排
        CompletableFuture apply = CompletableFuture.supplyAsync(() -> {
            log.info("子线程future线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        }).thenApply((t) -> {
            log.info("子线程thenApply运行,参数是:{}", t);
            long res = t * 5;
            log.info("计算结果是:{}", res);
            return res;
        });

        //调用异步任务
        apply.get();


        log.info("主线程end..............");


    }
}

串行方式三

public CompletionStage thenAccept(Consumer action);

public CompletionStage thenAcceptAsync(Consumer action);

public CompletionStage thenAcceptAsync(Consumer action, Executor executor);

示例:thenAccept 此方法没有返回值,上一步直接的结果当成传参的传递给 thenAccept

@Slf4j
public class SupplyAsyncDemoThenAccept {

	
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            13,
            3,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //thenAccept 没有返回值(t跟上一步执行结果有关系),上一步执行结果会被当成参数传递给下一步,参数类型为T
        //2.public completionStage thenAccept(Consumer action);

        //多线程异步编排
        CompletableFuture accept = CompletableFuture.supplyAsync(() -> {
            log.info("子线程future线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        },threadPool).thenAccept((t) -> {
            log.info("子线程thenAccept运行,参数是:{}", t);
            long res = t * 5;
            log.info("计算结果是:{}", res);
        });

        //调用异步任务
        accept.get();


        log.info("主线程end..............");


    }
}

串行方式四

public  CompletionStage thenCompose(Function> fn);

public  CompletionStage thenComposeAsync(Function> fn);

public  CompletionStage thenComposeAsync(Function> fn, Executor executor);

示例:thenCompose 有返回值,上一步直接的结果当成传参的传递给 thenCompose

@Slf4j
public class SupplyAsyncDemoThenCompose {

	
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            13,
            3,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //允许对两个completionStage流水线进行操作,第一个操作完成时,讲第一个操作结果传递给第二个completionStage
        //2.public  completionStage thenCompose(Function> fn);

        //多线程异步编排
        CompletableFuture thenCompose = CompletableFuture.supplyAsync(() -> {
            log.info("子线程future线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
            //第一次执行
        }, threadPool).thenCompose(new Function>() {
            @Override
            public CompletionStage apply(Integer t) {
                //第二次执行
                CompletableFuture future = CompletableFuture.supplyAsync(() -> {
                    log.info("子线程thenCompose运行,参数是:{}", t);
                    long res = t * 5;
                    log.info("计算结果是:{}", res);
                    return res;
                });

                return future;
            }

        });

        //调用异步任务
       thenCompose.get();


        log.info("主线程end..............");


    }
}


3.1.3 聚合-AND 聚合
public  CompletionStage thenCombine(CompletionStage other,BiFunction fn);

public  CompletionStage thenCombineAsync(CompletionStage other,BiFunction fn);

public  CompletionStage thenCombineAsync(CompletionStage other,BiFunction fn,Executor executor)

示例

@Slf4j
public class SupplyAsyncDemoThenCombin {

    
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            13,
            3,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //thenCombine 有返回值,会把两个CompletionStage的任务都执行完毕后,吧两个任务结果一块交给thenCombine去处理
        //2.public  CompletionStage thenCombine (CompletionStage other,BiFunction fn)

        //多线程异步编排
        //第一个:CompletionStage
        CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f1线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        }, threadPool);

        //第二个:CompletionStage
        CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f2线程start。。。。。。。。。");
            int i = 10 / 3;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        }, threadPool);

        //合并操作
        CompletableFuture thenCombine = f1.thenCombine(f2, (t, u) -> {
            log.info("t是第一个执行后的结果:{}", t);
            log.info("u是第二个执行后的结果:{}", u);
            return t+u;
        });

        //调用方法
        Integer integer = thenCombine.get();
        log.info("最终的调用结果是:{}",integer);

        log.info("主线程end..............");

    }
}

没有返回值的AND聚合

public  CompletionStage thenAcceptBoth (CompletionStage other, BiConsumer action);

public  CompletionStage thenAcceptBothAsync(CompletionStage other,BiConsumer action);

public  CompletionStage thenAcceptBothAsync(CompletionStage other,BiConsumer action,Executor executor)
@Slf4j
public class SupplyAsyncDemoThenAcceptBoth {


    
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            13,
            3,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //thenAcceptBoth 当两个阶段的CompletionStage都执行完毕后,把结果一块交给thenAcceptBoth
        //2.public  CompletionStage thenAcceptBoth (CompletionStage other,BiConsumer action);

        //多线程异步编排
        //第一个:CompletionStage
        CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f1线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        });

        //第二个:CompletionStage
        CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f2线程start。。。。。。。。。");
            int i = 10 / 3;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        }, threadPool);

        //合并操作
        CompletableFuture thenAcceptBoth = f1.thenAcceptBoth(f2, (t, u) -> {
            log.info("t是第一个执行后的结果:{}", t);
            log.info("u是第二个执行后的结果:{}", u);
        });

        //调用方法
        thenAcceptBoth.get();


        log.info("主线程end..............");


    }
}
3.1.4 聚合-OR 聚合
public  CompletionStage applyToEitherAsync(CompletionStage other,Function fn);

public  CompletionStage applyToEitherAsync(CompletionStage other,Function fn,Executor executor);

public  CompletionStage applyToEitherAsync(CompletionStage other,Function fn);

public  CompletionStage applyToEitherAsync(CompletionStage other,Function fn,Executor executor);

public CompletionStage acceptEither(CompletionStage other,Consumer action

示例代码

@Slf4j
public class SupplyAsyncDemoApplyToEitherAsync {


    
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            13,
            3,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //applyToEitherAsync  针对两个阶段的CompletionStage,将计算速度最快的那个CompletionStage的结果作为下一步处理的消费
        //applyToEitherAsync = applyToEither
        //2.public  CompletionStage applyToEither (CompletionStage other, Function fn);

        //多线程异步编排
        //第一个:CompletionStage
        CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f1线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("第一个:CompletionStage子线程future线程end。。。。。。。。。");
            return i;
        });

        //第二个:CompletionStage
        CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f2线程start。。。。。。。。。");
            int i = 10 / 3;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("第二个:CompletionStage子线程future线程end。。。。。。。。。");
            return i;
        }, threadPool);

        //合并操作
        CompletableFuture applyToEither = f1.applyToEither(f2, r -> {
            log.info("有个applyToEither任务正在执行,参数是:{}", r);
            return r;
        });

        //调用方法
        Integer integer = applyToEither.get();
        log.info("最终计算结果:{}",integer);

        log.info("主线程end..............");


    }
}
3.1.5 异常处理

方法接口

public CompletionStage exceptionally(Function fn);

public CompletionStage whenComplete(BiConsumer action);

public CompletionStage whenCompleteAsync(BiConsumer action);

public CompletionStage whenCompleteAsync(BiConsumer action,Executor executor);

public  CompletionStage handle(BiFunction fn);

public  CompletionStage handleAsync(BiFunction fn);

public  CompletionStage handleAsync(BiFunction fn,Executor executor)

代码示例:exceptionally

@Slf4j
public class SupplyAsyncDemoExceptionally {


    
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            13,
            3,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //exceptionally  异常处理
        //2.public CompletionStage exceptionally(Function fn);

        //多线程异步编排
        //第一个:CompletionStage
        CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f1线程start。。。。。。。。。");
            int i = 10 / 0;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("第一个CompletionStage子线程future线程end。。。。。。。。。");
            return i;
        }).exceptionally((t)->{
            log.info("业务执行失败:{}", t.getMessage());
            return null;
        });

        //调用方法
        Integer integer = f1.get();
        log.info("最终计算结果:{}",integer);

        log.info("主线程end..............");
    }

代码示例:handle

@Slf4j
public class SupplyAsyncDemoHandle {


    
    public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            13,
            3,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //1.public static  CompletableFuture supplyAsync(Supplier supplier);
        //handle  异常处理 = try{}finally{} :对上一步执行的结果进行处理,还可以处理异常任务
        //2.public  CompletionStage handle (BiFunction fn);

        //多线程异步编排
        //第一个:CompletionStage
        CompletableFuture f = CompletableFuture.supplyAsync(() -> {
            log.info("子线程f1线程start。。。。。。。。。");
            //模拟异常
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("第一个CompletionStage子线程future线程end。。。。。。。。。");
            return i;
        }).handle((t,u)->{
            int res = -1;
            if (u!=null){
                log.info("业务执行失败:{}", u.getMessage());
            }else {
                res = t*5;
            }

            return res;
        });

        //调用方法
        Integer integer = f.get();
        log.info("最终计算结果:{}",integer);

        log.info("主线程end..............");


    }
}
四、异步开启

CompletableFuture 提供了 4 个静态的方法,来创建一个异步操作(异步开启: 从这 4 个静态的方法开发即可)

方式一:runAsync: 没有返回值的方法,不关注返回值
RunAsync : 没有使用自定义线程池,默认使用的线程池 ForkJoinPool.commonPool

public static CompletableFuture runAsync(Runnable runnable);

public static CompletableFuture runAsync(Runnable runnable,Executor executor);

方式二: supplyAsync : 有返回值,关注返回值的。

public static  CompletableFuture supplyAsync(Supplier supplier);

public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor

代码示例:runAsync

@Slf4j
public class RunAsyncFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //第一种方法:runAsync:实现异步编排,没有返回值
        //没有使用自定义线程池,默认使用的线程池 ForkJoinPool.commonPool
        //public static completableFuture runAsync(Runnable runnable)
        CompletableFuture future = CompletableFuture.runAsync(() -> {
            log.info("子线程future线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
        });

        log.info("主线程end..............");

        //调用异步任务
        future.get();
    }
}

代码示例:supplyAsync

@Slf4j
public class SupplyAsyncFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("主线程start........");

        //第二种方法:SupplyAsync:实现异步编排,有返回值
        //public static completableFuture supplyAsync(supplier supplier)
        CompletableFuture uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            log.info("子线程future线程start。。。。。。。。。");
            int i = 10 / 2;
            log.info("线程名称:{},线程执行结果:{},", Thread.currentThread().getName(), i);
            log.info("子线程future线程end。。。。。。。。。");
            return i;
        });

        //调用异步任务
        Integer integer = uCompletableFuture.get();

        log.info("supplyAsync异步编排的返回值结果:{},", integer);

        log.info("主线程end..............");

    }
}

五、 案例演练

现有商品信息页面,为应对高并发,使用线程池进行异步处理。
代码仅供参考,下方的各个实体类可以依据自己的场景demo自由更换,动动小手,实操一下吧(狗头)。

public SkuItemVo skuItem(Long skuId) {
        //新建一个包装类对象
        SkuItemVo skuItemVo = new SkuItemVo();

        //开启异步编排实现,提升服务性能
        //1.根据skuId 查询 sku基本信息
        CompletableFuture infoFuture = CompletableFuture.supplyAsync(() -> {
            SkuInfoEntity skuInfoEntity = this.getById(skuId);
            skuItemVo.setInfo(skuInfoEntity);
            return skuInfoEntity;
        }, executor);

        //2.根据skuId 查询 sku图片信息(多个图片)
        CompletableFuture imagesFuture = CompletableFuture.runAsync(() -> {
            List imagesList = skuImagesService.list(new QueryWrapper().eq("sku_id", skuId));
            skuItemVo.setImages(imagesList);
        }, executor);


        //3.根据skuId 查询 spu销售属性
        CompletableFuture salesFutrue = infoFuture.thenAcceptAsync((res) -> {
            //获取sku与之对应的spuId
            Long spuId = res.getSpuId();
            List saleAttrVos = skuSaleAttrValueService.getSaleAttrs(spuId);
            if (!CollectionUtils.isEmpty(saleAttrVos)){
                skuItemVo.setAttrSales(saleAttrVos);
            }
        }, executor);


        //4.根据skuId 查询 spu的描述信息
        CompletableFuture desFutrue = infoFuture.thenAcceptAsync((res) -> {
            //获取sku与之对应的spuId
            Long spuId = res.getSpuId();
            SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getOne(new QueryWrapper().eq("spu_id",spuId));
            if (null != spuInfoDescEntity){
                skuItemVo.setDesc(spuInfoDescEntity);
            }
        }, executor);


        //5.根据skuId,categoryId 查询 sku分组规格参数属性值
        CompletableFuture groupFutrue = infoFuture.thenAcceptAsync((res) -> {
            //获取sku与之对应的spuId
            Long spuId = res.getSpuId();
            //获取商品分类id
            Long categoryId = res.getCategoryId();
            List attrGroupVos = attrGroupService.getGroupAttr(spuId,categoryId);
            skuItemVo.setAttrGroups(attrGroupVos);
            if (CollectionUtils.isEmpty(attrGroupVos)){
                skuItemVo.setAttrGroups(attrGroupVos);
            }
        }, executor);

        //allOf: 等待所有的任务完成后,才返回结果
        try {
            CompletableFuture.allOf(infoFuture,imagesFuture,salesFutrue,desFutrue,groupFutrue).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return skuItemVo;
    }

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号