CompletableFuture是一个异步编程工具类
简单使用
CompletableFuturecompletableFuture = new CompletableFuture<>(); new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //保存结果 completableFuture.complete("hello world"); }).start(); //获取结果 String result = completableFuture.get(); System.out.println("输出结果:" + result);
CompletableFuture实现了Future接口,所以调用它的get()方法时,当前线程会一直阻塞,直到结果返回。
runAsync方法
//runAsync传入一个Runnable,返回一个CompletableFuture<Void>
// runAsync runs the Runnable asynchronously; the resulting future carries no value (Void).
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(2000);
        System.out.println("线程执行完毕");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
// Wait for the task to finish; a Void future yields null, so this prints "null"
System.out.println(voidCompletableFuture.get());
System.out.println("程序运行结束");
runAsync()方法的实现原理
public static CompletableFuturerunAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //判断使用ForkJoinPool还是每过来一个任务创建一个线程执行,这里asyncPool是一个ForkJoinPool private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); //common在ForkJoinPool静态代码块中被初始化 public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
// Creates the future returned to the caller and schedules the Runnable on the executor.
// In this walkthrough e is a ForkJoinPool.
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // Wrap future + runnable in an AsyncRun (per the original note, AsyncRun extends
    // ForkJoinTask and implements Runnable), then hand it to ForkJoinPool.execute.
    // For supplyAsync the wrapper is an AsyncSupply instead.
    // Original note: thenRun wraps its action as UniRun, thenAccept as UniAccept and
    // thenApply as UniApply; the wrapper is pushed onto the stack of the preceding
    // CompletableFuture, i.e. the later task is stored in the earlier task's stack:
    // unipush(new UniRun...)
    e.execute(new AsyncRun(d, f));
    return d;
}
// Stores the future to complete (dep) and the user's Runnable (fn) for later execution.
AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
    this.dep = dep; this.fn = fn;
}
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask> job;
//传入的是一个AsyncRun,AsyncRun继承了ForkJoinTask
if (task instanceof ForkJoinTask>) // avoid re-wrap
job = (ForkJoinTask>) task;
else
job = new ForkJoinTask.RunnableExecuteAction(task);
//job被转换成ForkJoinTask
externalPush(job);
}
final void externalPush(ForkJoinTask> task) {
WorkQueue[] ws; WorkQueue q; int m;
//获得当前线程的探针
int r = ThreadLocalRandom.getProbe();
int rs = runState;
//不会进if
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + Abase;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//task入队列并执行
externalSubmit(task);
}
private void externalSubmit(ForkJoinTask> task) {
int r; // initialize caller's probe
//如果probe没有初始化,则初始化
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
//判断ForkJoinPool的状态
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
//初始化workQueues
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
else if ((q = ws[k = r & m & SQMASK]) != null) {
//加锁
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
//获得队列中的ForkJoinTask数组
ForkJoinTask>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + Abase;
//cas把task加入到ForkJoinTask数组中
U.putOrderedObject(a, j, task);
//更新下一个加入的task的数组索引
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
//解锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
//通知执行task
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
// At this point the task has already been added to a work queue.
// signalWork: while ctl is negative, either requests a new worker (when the low 32 bits
// of ctl are zero) or activates the worker queue indexed by those bits and unparks its
// parker thread.
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
// Add a worker thread
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
// Tries to add one worker. Computes a new ctl value with the AC and TC fields each
// advanced by one unit, CASes it in while holding the runState lock (skipped when the
// STOP bit is set), and calls createWorker() once the CAS succeeds. Retries while ctl
// still has ADD_WORKER set and its low 32 bits are zero.
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
if (add) {
// Create the worker
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
// Creates and starts a worker thread through the pool's thread factory.
// Returns true when the thread was started; otherwise calls deregisterWorker with the
// thread (possibly null) and any Throwable that was caught, and returns false.
private boolean createWorker() {
// Original note: factory is initialized in a static initializer (not shown here)
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// wt is a ForkJoinWorkerThread; per the original note, ForkJoinWorkerThread extends Thread
if (fac != null && (wt = fac.newThread(this)) != null) {
// Starting the ForkJoinWorkerThread causes its run() method to execute
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);
return false;
}
// Thread body of the worker (per the walkthrough, ForkJoinWorkerThread.run()).
// Runs only when workQueue.array is still null: calls onStart(), then pool.runWorker,
// and always finishes with onTermination(exception) followed by pool.deregisterWorker.
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
// Execute tasks
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
// Keep the first failure; only record this one if nothing failed earlier
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask> t;;) {
//获得task
if ((t = scan(w, r)) != null)
//执行task
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
final void runTask(ForkJoinTask> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
//执行task
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
// Runs the task unless its status is already negative. Calls exec(), records NORMAL
// completion when exec() returns true, and routes any Throwable to
// setExceptionalCompletion. Returns the resulting status.
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
// The ForkJoinTask passed in at the start was its subclass AsyncRun, so this call
// dispatches to AsyncRun's exec method
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
// Per the walkthrough, this is AsyncRun's exec(): it delegates to run() and always
// returns true.
public final boolean exec() {
run();
return true;
}
// Runs the user's Runnable once and completes the dependent future. dep and fn are
// cleared before running, so a second call finds them null and does nothing.
public void run() {
    CompletableFuture<Void> d; Runnable f;
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        // Skip the action if the future already holds a result
        if (d.result == null) {
            try {
                // Execute the Runnable the user originally supplied
                f.run();
                d.completeNull();
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        // Original note: if follow-up stages such as thenRun/thenAccept/thenApply were
        // registered, the tasks pushed onto the stack are popped and executed here
        d.postComplete();
    }
}
supplyAsync方法
// supplyAsync takes a Supplier, which means the task has a return value
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "返回结果";
    }
});
// Fetch the return value
String result = future.get();
System.out.println("任务执行结果:" + result);
supplyAsync的原理和runAsync类似,区别只是在调用ForkJoinPool.execute的时候传入的是AsyncSupply,所以后面执行的是AsyncSupply的exec方法
// Per the walkthrough, this is AsyncSupply's exec(): it delegates to run() and always returns true.
public final boolean exec() { run(); return true; }
// Runs the user's Supplier once and stores its value in the dependent future.
// NOTE(review): T is assumed to be the type parameter of the enclosing AsyncSupply
// class, which is not shown here — confirm.
public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    // Pick up the CompletableFuture that was created at the start
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                // Put the Supplier's value into the CompletableFuture; callers read it via get()
                d.completeValue(f.get());
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        d.postComplete();
    }
}
thenRun
// thenRun registers a Runnable that runs after the previous stage completes; it neither
// receives that stage's result nor produces one, hence CompletableFuture<Void>.
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        System.out.println("supplyAsync run");
        Thread.sleep(2000);
        System.out.println("supplyAsync run end");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "返回结果";
}).thenRun(() -> {
    try {
        System.out.println("thenRun run");
        Thread.sleep(3000);
        System.out.println("thenRun run end");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("任务执行结束之后执行的语句");
});
// Block until the task has finished
System.out.println("阻塞");
voidCompletableFuture.get();
System.out.println("任务执行结束");
thenAccept(Consumer)
CompletableFuturethenApply(Function)future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("supplyAsync执行完毕"); return "返回的结果"; }).thenAccept(new Consumer () { //参数为前面supplyAsync的结果 @Override public void accept(String param) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("输出结果:" + param); } }); // 等待任务执行完成 future.get(); System.out.println("任务执行完毕");
CompletableFuturethenComposefuture = CompletableFuture.supplyAsync(()-> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回supplyAsync结果; return "返回supplyAsync的结果"; }).thenApply(new Function () { //参数为前面supplyAsync的结果 @Override public String apply(String middle) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //获取supplyAsync结果,返回"); return middle+"再执行thenApply后返回"; } }); String str = future.get(); System.out.println("最终的结果为:" + str);
CompletableFuturethenCombinefuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { return "hello world"; } }).thenCompose(new Function >() { @Override public CompletionStage apply(String s) { return CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { String finalStr = s + "执行thenCompose中的supplyAsync然后返回"; return finalStr; } }); } }); String str = future.get(); System.out.println(str);
CompletableFutureallOf 和anyOffuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { return "hello"; } //thenCombine的第二个参数是一个BiFunction,对CompletableFuture的结果进行统一处理 }).thenCombine(CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { return "word"; } //获得上面两个supplyAsync的返回结果并处理 }), new BiFunction () { @Override public String apply(String s1, String s2) { return s1 +" "+ s2; } }); String result = future.get(); System.out.println(result);
thenCompose和thenCombine方法只能组合2个CompletableFuture,allOf和anyOf可以组合多个CompletableFuture:allOf在所有任务都完成后才完成,anyOf在任意一个任务完成后就完成
Random RANDOM = new Random();
CompletableFuture<?>[] futures = new CompletableFuture<?>[10];
int[] ints = new int[10];
for (int i = 0; i < 10; i++) {
    final int finalI = i;
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
            // Pick a random duration and sleep for that long
            int intNext = RANDOM.nextInt(5000);
            Thread.sleep(intNext);
            // Record the random number in this task's own array slot
            ints[finalI] = intNext;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    // Collect the future into the array
    futures[i] = future;
}
// allOf completes only after every future in the array has completed, so all slots that
// will be written have been written by the time the array is printed.
// To run the callback as soon as any single future completes, replace allOf with anyOf:
// CompletableFuture.anyOf(futures).thenRun(...)
CompletableFuture.allOf(futures)
    .thenRun(() -> System.out.println(Arrays.toString(ints)))
    .get();



