目录
一、线程回顾
1.1、初始化线程
二、线程池详解
2.1、线程池的 7 大参数
2.2、运行流程
2.3、常见的 4 种线程池
2.4、线程池优点
三、异步编排
3.1、业务场景
3.2、创建异步对象
3.3、计算完成时回调方法
3.4、线程串行方法
3.5、 两任务组合 - 都要完成
3.6、两任务组合 - 一个完成
3.7、多任务组合
一、线程回顾
1.1、初始化线程
初始化线程的 4 种方式
1、继承 Thread 2、实现 Runnable 3、实现 Callable 接口 + FutureTask(可以拿到返回结果,可以处理异常) 4、线程池
方式一:继承 Thread
主进程无法获取线程的运算结果,不适合当前场景
public class ThreadTest {
public static void main(String[] args) {
System.out.println("start++++++++");
Thread thread_01 = new Thread_01();
thread_01.start();
System.out.println("end--------");
}
public static class Thread_01 extends Thread {
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getId());
for (int i = 0; i < 5; i++) {
System.out.println("运行结果" + i);
}
}
}
}
方式二:实现 Runnable
主进程无法获取线程的运算结果,不适合当前场景
public class ThreadTest {
public static void main(String[] args) {
System.out.println("start++++++++");
Runnable runnable_01 = new Runnable_01();
new Thread(runnable_01).start();
System.out.println("end--------");
}
public static class Runnable_01 implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("运行结果" + i);
}
}
}
}
方式三:实现 Callable 接口 + FutureTask(可以拿到返回结果,可以处理异常)
主进程可以获取当前线程的运算结果,但是不利于控制服务器种的线程资源,可以导致服务器资源耗尽
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start++++++++");
FutureTask futureTask = new FutureTask<>(new Callable_01());
//阻塞等待整个线程执行完成
new Thread(futureTask).start();
Object o = futureTask.get();//获取返回结果
System.out.println("end--------");
System.out.println(o);//打印返回结果
}
public static class Callable_01 implements Callable {
@Override
public Object call() throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("运行结果" + i);
}
return "返回+结果";
}
}
}
方式四:使用线程池
可以控制资源,性能稳定。
Executors.newFixedThreadPool(3);
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start++++++++");
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(new Runnable_01());
System.out.println("end--------");
}
public static class Runnable_01 implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Runnable-运行结果:" + i);
}
}
}
}
new ThreadPollExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit,unit,workQueue,threadFactory,handler);
通过线程池性能稳定,也可以获取执行结果,并捕获异常,但是,在业务复杂情况下,一个异步调用可能会依赖另一个异步调用的执行结果。
二、线程池详解
2.1、线程池的 7 大参数
1.corePoolSize:核心线程数
corePoolSize the number of threads to keep in the pool, eve if they are idle, unless is set {@code allowCoreThreadTimeOut}
corePoolSize 保留在池中的线程数,即使它们是空闲的,除非设置 {@code allowCoreThreadTimeOut}
2.maximumPoolSize:最大线程数,控制资源
maximumPoolSize the maximum number of threads to allow in the pool
maximumPoolSize 池中允许的最大线程数
3.keepAliveTime:存活时间
keepAliveTime when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
keepAliveTime 当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间。
4.unit:单位时间
unit the time unit for the {@code keepAliveTime} argument
unit {@code keepAliveTime} 参数的时间单位
5.workQueue:阻塞队列
workQueue the queue to use for holding tasks before they are executed. This queue will hold only the {@code Runnable} asks submitted by the {@code execute} method.
workQueue 用于在执行任务之前保存任务的队列,只要有空闲线程,就会去队列中取出新任务。
此队列将仅保存由 {@code execute} 方法提交的 {@code Runnable} 请求。
6.threadFactory:线程的创建工厂
threadFactory the factory to use when the executor creates a new thread
threadFactory 执行器创建新线程时使用的工厂
7.handler:拒绝策略
handler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
处理程序执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量
2.2、运行流程
1、线程池创建,准备好 core 数量 的核心线程,准备接受任务
2、新的任务进来,用 core 准备好的空闲线程执行
core 满了,就将再进来的任务放入阻塞队列中,空闲的 core 就会自己去阻塞队列获取任务执行阻塞队列也满了,就直接开新线程去执行,最大只能开到 max 指定的数量max 都执行好了,Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁,终保持到 core 大小如果线程数开到了 max 数量,还有新的任务进来,就会使用 reject 指定的拒绝策略进行处理
3、所有的线程创建都是由指定的 factory 创建的
案例:一个线程池 core 7、max 20 ,queue 50 ,100个并发进来怎么分配的 ? 1.先有 7 个能直接得到运行,接下来 50 个进入队列排队; 2.再多开 13 个继续执行,(线程队列50+线程20) 70个被安排上了; 3.剩下30个默认拒绝策略(直接抛弃)。
2.3、常见的 4 种线程池
1.newCacheThreadPool
创建一个可缓存的线程池,如果线程池长度超过需要,可灵活回收空闲线程,若无可回收,则新建线程
2.newFixedThreadPool
创建一个指定长度的线程池,可控制线程最大并发数,超出的线程会再队列中等待
3.newScheduleThreadPool
创建一个定长线程池,支持定时及周期性任务执行
4.newSingleThreadExecutor
创建一个单线程化的线程池,她只会用唯一的工作线程来执行任务,保证所有任务
2.4、线程池优点
降低资源的消耗
通过重复利用已创建好的线程降低线程的创建和销毁带来的损耗。提高响应速度
因为线程池中的线程没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行。提高线程的客观理性
线程池会根据当前系统的特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销,无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配。
三、异步编排
3.1、业务场景
CompletableFuture 异步编排的使用场景:查询商品详情页逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。
假如商品详情页的每个查询,需要如下标注时间才能完成 那么,用户需要5.5s后才能看到商品相详情页的内容,很显然是不能接受的 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应
3.2、创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuturerunAsync(Runnable runnable); public static CompletableFuture runAsync(Runnable runnable, Executor executor); public static CompletableFuture supplyAsync(Supplier supplier); public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
1、runXxx 都是没有返回结果的,supplyXxxx都是可以获取返回结果的2、可以传入自定义的线程池,否则就是用默认的线程池3、根据方法的返回类型来判断该方法是否有返回类型
3.3、计算完成时回调方法
public CompletableFuture whenComplete(BiConsumer super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor);
public CompletableFuture exceptionally(Function fn);
whenComplete 和 whenCompleteAsync 的区别:
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况 whenComplete :是执行当前任务的线程继续执行 whencomplete 的任务 whenCompleteAsync: 是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
handle 方法和complete 一样,可以对结果做最后的处理(可处理异常),可改变返回值
package com.firefly.fireflymall.elasticsearch.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("10 / 2运行结果:" + i);
return i;
}).whenComplete((res, ex) -> {
//虽然能感知异常,但是无法修改返回数据
System.out.println("whenComplete---异步执行结果:" + res + "异常是" + ex);
}).exceptionally(throwable -> {
System.out.println("exceptionally---");
//可以感知异常,可以修改返回数据
return 10;
}).handle((res, thr) -> {
//可以感知异常,可以修改返回数据
System.out.println("handle---");
if (res != null) {
return res * 5;
}
if (thr != null) {
return 10;
}
return 5;
});
Integer integer = future.get();
System.out.println("main....end....."+integer);
}
}
3.4、线程串行方法
public CompletableFuture thenApply(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)
public CompletionStage thenAccept(Consumer super T> action);
public CompletionStage thenAcceptAsync(Consumer super T> action);
public CompletionStage thenAcceptAsync(Consumer super T> action,Executor executor);
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任物的返回值thenAccept方法:消费处理结果,接受任务处理结果,并消费处理,无返回结果thenRun 方法:只要上面任务执行完成,就开始执行 thenRun ,只是处理完任务后,执行 thenRun的后续操作带有 Async 默认是异步执行的,同之前,以上都要前置任务完成
package com.firefly.fireflymall.elasticsearch.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenApplyAsyncTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("10 / 2运行结果:" + i);
return i;
}).thenApplyAsync((res) -> {
System.out.println("res:" + res);
return "hello+" + res;
});
String hellos = future.get();
System.out.println("hello+res:"+ hellos);
System.out.println("main....end.....");
}
}
3.5、 两任务组合 - 都要完成
两个任务必须都完成,触发该任务
thenCombine: 组合两个 future,获取两个 future的返回结果,并返回当前任务的返回值thenAccpetBoth: 组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值runAfterBoth:组合 两个 future,不需要获取 future 的结果,只需要两个 future处理完成任务后,处理该任务,
f1 和 f2 单独定义返回结果
package com.firefly.fireflymall.elasticsearch.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TwoAllTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1当前线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1结束:" + i);
return i;
});
CompletableFuture future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2当前线程:" + Thread.currentThread().getId());
System.out.println("任务2结束:");
return "Hello";
});
// f1 和 f2 单独定义返回结果
CompletableFuture future = future01.runAfterBothAsync(future02, () -> {
System.out.println("执行任务三");
});
System.out.println("main....end....." );
}
}
返回f1 和 f2 的运行结果
package com.firefly.fireflymall.elasticsearch.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TwoAllTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1当前线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1结束:" + i);
return i;
});
CompletableFuture future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2当前线程:" + Thread.currentThread().getId());
System.out.println("任务2结束:");
return "Hello";
});
// 返回f1 和 f2 的运行结果
future01.thenAcceptBothAsync(future02, (f1, f2) -> {
System.out.println("任务3开始....之前的结果....f1:" + f1 + "==>f2:" + f2);
});
System.out.println("main....end.....");
}
}
f1 和 f2 单独定义返回结果
package com.firefly.fireflymall.elasticsearch.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TwoAllTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1当前线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1结束:" + i);
return i;
});
CompletableFuture future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2当前线程:" + Thread.currentThread().getId());
System.out.println("任务2结束:");
return "Hello";
});
// f1 和 f2 单独定义返回结果
CompletableFuture future = future01.thenCombineAsync(future02, (f1, f2) -> {
return f1 + ":" + f2 + "-> Haha";
});
System.out.println("f1 和 f2 单独定义返回结果:"+ future.get());
System.out.println("main....end....." );
}
}
3.6、两任务组合 - 一个完成
当两个任务中,任意一个future 任务完成时,执行任务
applyToEither;两个任务有一个执行完成,感知结果,处理任务,并有新的返回值acceptEither: 两个任务有一个执行完成,感知结果,处理任务,没有新的返回值runAfterEither:两个任务有一个执行完成,不感知结果,处理任务,也没有返回值
package com.firefly.fireflymall.elasticsearch.thread;
import java.util.concurrent.CompletableFuture;
public class TwooneTest {
public static void main(String[] args) {
CompletableFuture future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1当前线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1结束:" + i);
return i;
});
CompletableFuture future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2当前线程:" + Thread.currentThread().getId());
System.out.println("任务2结束:");
return "Hello";
});
//future01.runAfterEitherAsync(future02,() ->{
// System.out.println("任务3开始...之前的结果:");
//});
//future01.acceptEitherAsync(future02,(res) -> {
// System.out.println("任务3开始...之前的结果:" + res);
//});
//future01.applyToEitherAsync(future02, res -> {
// System.out.println("任务3开始...之前的结果:" + res);
// return res.toString() + "->哈哈";
//});
}
}
3.7、多任务组合
allOf:等待所有任务完成anyOf:只要有一个任务完成
package com.firefly.fireflymall.elasticsearch.thread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class AllOfAnyOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的图片信息");
return "hello.jpg";
});
CompletableFuture futureAttr = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的属性");
return "黑色+256G";
});
CompletableFuture futureDesc = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("查询商品介绍");
return "华为";
});
// 等待全部执行完
CompletableFuture allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
allOf.get();
System.out.println("main....end.....");
}
}
// 只需要有一个执行完 CompletableFuture



