栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

京东零售多线程并行框架 - AsyncTool

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

京东零售多线程并行框架 - AsyncTool

文章目录
  • 京东零售多线程并行框架 - AsyncTool
    • 一、基本入门
      • 1 首发案例(并发执行)
        • 第一步:获取源码
        • 第二步:构建任务类
        • 第三步:执行
        • 总结
      • 2 案例二:获取上一个任务的执行结果作为自身的入参
      • 3 案例三:如果上一个任务出现了异常
      • 4 案例四:完全异步
        • 玩法一
        • 玩法二
    • 二、源码解析
    • 持续更新中
  • 万事如意,阖家安康

京东零售多线程并行框架 - AsyncTool
文章目录
QuickStart.md · 京东零售/asyncTool - Gitee.com
/
/

在平常工作中经常会习惯使用传统的CompleteableFuture完成任务编排工作,但是如果出现了异常或者其他无法预估的情况都是无法捕捉的,在码云上看了这位老师写的并行框架,还挺有趣,拉过来学习以下

一、基本入门

在Java的并行框架中,已经有CompleteableFuture为我们解决基本的异步并行问题,而京东零售的AsyncTool也是封装了上面这个框架,我觉得很强的地方在于,他能够捕捉一些异常,不过学习内部原理之前,我先学会怎么玩

1 首发案例(并发执行) 第一步:获取源码
git clone https://gitee.com/jd-platform-opensource/asyncTool.git
第二步:构建任务类

任务A

public class TestA implements IWorker, ICallback {
    @Override
    public void begin() {
        System.out.println(">>>>>>> TestA任务开始begin方法执行中...");
    }

    @Override
    public void result(boolean success, String param, WorkResult workResult) {
        System.out.println("TestA - result方法执行中————————————————————————");
        System.out.println("success=" + success);
        System.out.println("param=" + param);
        System.out.println("workResult=" + workResult);
        System.out.println("result方法执行结束————————————————————————");
        System.out.println();
        System.out.println();
    }

    @Override
    public String action(String object, Map allWrappers) {
        System.out.println("TestA - 需要执行的任务...action方法...");
        System.out.println(object + ":这个是object入参");
        return "TestA";
    }
}

任务C

public class TestC implements IWorker, ICallback {

    @Override
    public void begin() {
        System.out.println(">>>>>>> TestC任务开始begin方法执行中...");
    }

    @Override
    public void result(boolean success, String param, WorkResult workResult) {
        System.out.println("TestC - result方法执行中————————————————————————");
        System.out.println("success=" + success);
        System.out.println("param=" + param);
        System.out.println("workResult=" + workResult);
        System.out.println("result方法执行结束————————————————————————");
        System.out.println();
        System.out.println();
    }

    @Override
    public String action(String object, Map allWrappers) {
        System.out.println("TestC - 需要执行的任务...action方法...");
        System.out.println(object + ":这个是object入参");
        return "TestC";
    }
}
第三步:执行
public class DemoApplication {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 线程池
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(3
                        , 5
                        , 30
                        , TimeUnit.SEConDS
                        , new ArrayBlockingQueue(5));

        // 任务类
        TestA testA = new TestA();
        TestC testC = new TestC();

        // 任务流程构建
        WorkerWrapper stringStringWorkerWrapperC =
                new WorkerWrapper.Builder()
                        .worker(testC)
                        .callback(testC)
                        .param("我是paramC")
                        .build();

        WorkerWrapper stringStringWorkerWrapperA =
                new WorkerWrapper.Builder()
                        .worker(testA)
                        .callback(testA)
                        .param("我是paramA")
                        .build();

        // 任务执行 并发执行两个任务
        Async.beginWork(1, threadPoolExecutor, stringStringWorkerWrapperA, stringStringWorkerWrapperC);

        // 任务关闭
        Async.shutDown();
    }
}

执行效果

总结
  1. 构建任务类,继承接口IWorker和ICallback

    IWorker
    T,V两个泛型,分别是入参和出参类型。

    一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。

    ICallback

    对每个worker的回调。worker执行完毕后,会回调该接口,带着执行成功、失败、原始入参、和详细的结果。

  2. 任务执行顺序编排,可以通过next指定下一个跑起来的任务

2 案例二:获取上一个任务的执行结果作为自身的入参

主要有两个关键的地方

  1. 每个任务自己要有一个标识id

  2. 通过在任务类中的形参allWrappers获取返回参数

最后的结果就是

并且可以在*Async.shutDown();*后获取到前面任务的返回结果

String result = stringStringWorkerWrapperA.getWorkResult().getResult();
3 案例三:如果上一个任务出现了异常

可以通过重写defaultValue()方法来

你学会废了吗

4 案例四:完全异步

两种玩法,第一种是我写的很蠢的用线程池实现异步,第二种是用框架里面的beginWorkAsync()方法实现纯异步

玩法一

利用线程池抓出一个线程出来跑异步

public class DemoApplication {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 线程池
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(3
                        , 5
                        , 30
                        , TimeUnit.SEConDS
                        , new ArrayBlockingQueue(5));

        // 任务类
        TestC testC = new TestC();

        // 任务流程构建
        WorkerWrapper stringStringWorkerWrapperC =
                new WorkerWrapper.Builder()
                        .worker(testC)
                        .callback(testC)
                        .param("我是paramC")
                        .id("C")
                        .build();

        // 异步线程执行
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Async.beginWork(1500, threadPoolExecutor, stringStringWorkerWrapperC);
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    Async.shutDown();
                }
            }
        });
        System.out.println("让我看看是不是全异步...............");
    }
}
玩法二

构建两个类

  1. 基本任务类

    public class TestB implements IWorker, ICallback {
        @Override
        public String action(String object, Map allWrappers) {
            System.out.println("...action...");
            return "action...";
        }
    
        @Override
        public void result(boolean success, String param, WorkResult workResult) {
            System.out.println("...result...");
        }
    }
    
  2. 异步回调结果类

    public class TestAsyncCallback implements IGroupCallback {
        @Override
        public void success(List workerWrappers) {
            System.out.println("回调执行 -----  ");
            for (WorkerWrapper workerWrapper : workerWrappers) {
                System.out.println(workerWrapper.getWorkResult());
            }
            System.out.println("回调执行 ----- success ");
            Async.shutDown();
        }
    
        @Override
        public void failure(List workerWrappers, Exception e) {
            System.out.println("回调执行 -----  ");
            for (WorkerWrapper workerWrapper : workerWrappers) {
                System.out.println(workerWrapper.getWorkResult());
            }
            System.out.println("回调执行 ----- failure ");
            Async.shutDown();
        }
    }
    
  3. 执行,查看结果

    public class RunDemo {
        public static void main(String[] args) {
    
            // 线程池
            ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(3
                            , 5
                            , 30
                            , TimeUnit.SEConDS
                            , new ArrayBlockingQueue(5));
    
            // 任务构建
            TestB testB = new TestB();
            WorkerWrapper workerWrapper = new WorkerWrapper.Builder()
                    .worker(testB)
                    .callback(testB)
                    .param("我是参数")
                    .build();
    
            // 异步执行的回调类
            TestAsyncCallback testAsyncCallback = new TestAsyncCallback();
    
            // 任务执行
            Async.beginWorkAsync(1500, threadPoolExecutor, testAsyncCallback, workerWrapper);
    
            System.out.println("让我来验证一下是不是真的异步");
        }
    }
    

结果图如下

你学废了吗

​ 我感觉学到这里已经可以去基本的使用了,但是我突然有个构思,我们是不是可以去把这些任务执行的结果放在缓存里面,通过定时任务框架每天或者每周把任务执行的结果存到数据库进行可视化的查阅呢?

二、源码解析
文章目录
手写中间件之——并发框架_tianyaleixiaowu的专栏-CSDN博客

代码的原作者在CSDN上也写了文章对这个框架进行解释,但是我想自己先学习琢磨琢磨

1

持续更新中

1

万事如意,阖家安康
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/691621.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号