- 京东零售多线程并行框架 - AsyncTool
- 一、基本入门
- 1 首发案例(并发执行)
- 第一步:获取源码
- 第二步:构建任务类
- 第三步:执行
- 总结
- 2 案例二:获取上一个任务的执行结果作为自身的入参
- 3 案例三:如果上一个任务出现了异常
- 4 案例四:完全异步
- 玩法一
- 玩法二
- 二、源码解析
- 持续更新中
- 万事如意,阖家安康
| 文章目录 |
|---|
| QuickStart.md · 京东零售/asyncTool - Gitee.com |
| / |
| / |
在平常工作中经常会习惯使用传统的CompleteableFuture完成任务编排工作,但是如果出现了异常或者其他无法预估的情况都是无法捕捉的,在码云上看了这位老师写的并行框架,还挺有趣,拉过来学习以下
一、基本入门在Java的并行框架中,已经有CompleteableFuture为我们解决基本的异步并行问题,而京东零售的AsyncTool也是封装了上面这个框架,我觉得很强的地方在于,他能够捕捉一些异常,不过学习内部原理之前,我先学会怎么玩
1 首发案例(并发执行) 第一步:获取源码git clone https://gitee.com/jd-platform-opensource/asyncTool.git第二步:构建任务类
任务A
public class TestA implements IWorker, ICallback { @Override public void begin() { System.out.println(">>>>>>> TestA任务开始begin方法执行中..."); } @Override public void result(boolean success, String param, WorkResult workResult) { System.out.println("TestA - result方法执行中————————————————————————"); System.out.println("success=" + success); System.out.println("param=" + param); System.out.println("workResult=" + workResult); System.out.println("result方法执行结束————————————————————————"); System.out.println(); System.out.println(); } @Override public String action(String object, Map allWrappers) { System.out.println("TestA - 需要执行的任务...action方法..."); System.out.println(object + ":这个是object入参"); return "TestA"; } }
任务C
public class TestC implements IWorker第三步:执行, ICallback { @Override public void begin() { System.out.println(">>>>>>> TestC任务开始begin方法执行中..."); } @Override public void result(boolean success, String param, WorkResult workResult) { System.out.println("TestC - result方法执行中————————————————————————"); System.out.println("success=" + success); System.out.println("param=" + param); System.out.println("workResult=" + workResult); System.out.println("result方法执行结束————————————————————————"); System.out.println(); System.out.println(); } @Override public String action(String object, Map allWrappers) { System.out.println("TestC - 需要执行的任务...action方法..."); System.out.println(object + ":这个是object入参"); return "TestC"; } }
public class DemoApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 线程池
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(3
, 5
, 30
, TimeUnit.SEConDS
, new ArrayBlockingQueue(5));
// 任务类
TestA testA = new TestA();
TestC testC = new TestC();
// 任务流程构建
WorkerWrapper stringStringWorkerWrapperC =
new WorkerWrapper.Builder()
.worker(testC)
.callback(testC)
.param("我是paramC")
.build();
WorkerWrapper stringStringWorkerWrapperA =
new WorkerWrapper.Builder()
.worker(testA)
.callback(testA)
.param("我是paramA")
.build();
// 任务执行 并发执行两个任务
Async.beginWork(1, threadPoolExecutor, stringStringWorkerWrapperA, stringStringWorkerWrapperC);
// 任务关闭
Async.shutDown();
}
}
执行效果
总结-
构建任务类,继承接口IWorker和ICallback
IWorker
T,V两个泛型,分别是入参和出参类型。一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。
ICallback
对每个worker的回调。worker执行完毕后,会回调该接口,带着执行成功、失败、原始入参、和详细的结果。
-
任务执行顺序编排,可以通过next指定下一个跑起来的任务
主要有两个关键的地方
-
每个任务自己要有一个标识id
-
通过在任务类中的形参allWrappers获取返回参数
最后的结果就是
并且可以在*Async.shutDown();*后获取到前面任务的返回结果
String result = stringStringWorkerWrapperA.getWorkResult().getResult();3 案例三:如果上一个任务出现了异常
可以通过重写defaultValue()方法来
你学会废了吗
4 案例四:完全异步两种玩法,第一种是我写的很蠢的用线程池实现异步,第二种是用框架里面的beginWorkAsync()方法实现纯异步
玩法一利用线程池抓出一个线程出来跑异步
public class DemoApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 线程池
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(3
, 5
, 30
, TimeUnit.SEConDS
, new ArrayBlockingQueue(5));
// 任务类
TestC testC = new TestC();
// 任务流程构建
WorkerWrapper stringStringWorkerWrapperC =
new WorkerWrapper.Builder()
.worker(testC)
.callback(testC)
.param("我是paramC")
.id("C")
.build();
// 异步线程执行
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Async.beginWork(1500, threadPoolExecutor, stringStringWorkerWrapperC);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
Async.shutDown();
}
}
});
System.out.println("让我看看是不是全异步...............");
}
}
玩法二
构建两个类
-
基本任务类
public class TestB implements IWorker
, ICallback { @Override public String action(String object, Map allWrappers) { System.out.println("...action..."); return "action..."; } @Override public void result(boolean success, String param, WorkResult workResult) { System.out.println("...result..."); } } -
异步回调结果类
public class TestAsyncCallback implements IGroupCallback { @Override public void success(ListworkerWrappers) { System.out.println("回调执行 ----- "); for (WorkerWrapper workerWrapper : workerWrappers) { System.out.println(workerWrapper.getWorkResult()); } System.out.println("回调执行 ----- success "); Async.shutDown(); } @Override public void failure(List workerWrappers, Exception e) { System.out.println("回调执行 ----- "); for (WorkerWrapper workerWrapper : workerWrappers) { System.out.println(workerWrapper.getWorkResult()); } System.out.println("回调执行 ----- failure "); Async.shutDown(); } } -
执行,查看结果
public class RunDemo { public static void main(String[] args) { // 线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3 , 5 , 30 , TimeUnit.SEConDS , new ArrayBlockingQueue(5)); // 任务构建 TestB testB = new TestB(); WorkerWrapper workerWrapper = new WorkerWrapper.Builder () .worker(testB) .callback(testB) .param("我是参数") .build(); // 异步执行的回调类 TestAsyncCallback testAsyncCallback = new TestAsyncCallback(); // 任务执行 Async.beginWorkAsync(1500, threadPoolExecutor, testAsyncCallback, workerWrapper); System.out.println("让我来验证一下是不是真的异步"); } }
结果图如下
你学废了吗
二、源码解析 我感觉学到这里已经可以去基本的使用了,但是我突然有个构思,我们是不是可以去把这些任务执行的结果放在缓存里面,通过定时任务框架每天或者每周把任务执行的结果存到数据库进行可视化的查阅呢?
| 文章目录 |
|---|
| 手写中间件之——并发框架_tianyaleixiaowu的专栏-CSDN博客 |
代码的原作者在CSDN上也写了文章对这个框架进行解释,但是我想自己先学习琢磨琢磨
1
持续更新中1
万事如意,阖家安康


