1 public class FutureDemo1 {
2
3 public static void main(String[] args) throws InterruptedException, ExecutionException {
4 FutureTask future = new FutureTask(new Callable() {
5 @Override
6 public String call() throws Exception {
7 return new RealData().costTime();
8 }
9 });
10 ExecutorService service = Executors.newCachedThreadPool();
11 service.submit(future);
12
13 System.out.println(“RealData方法调用完毕”);
14 // 模拟主函数中其他耗时操作
15 doOtherThing();
16 // 获取RealData方法的结果
17 System.out.println(future.get());
18 }
19
20 private static void doOtherThing() throws InterruptedException {
21 Thread.sleep(2000L);
22 }
23 }
24
25 class RealData {
26
27 public String costTime() {
28 try {
29 // 模拟RealData耗时操作
30 Thread.sleep(1000L);
31 return “result”;
32 } catch (InterruptedException e) {
33 e.printStackTrace();
34 }
35 return “exception”;
36 }
37
38 }
通过Future实现
与上述FutureTask不同的是, RealData需要实现Callable接口
1 public class FutureDemo2 {
2
3 public static void main(String[] args) throws InterruptedException, ExecutionException {
4 ExecutorService service = Executors.newCachedThreadPool();
5 Future future = service.submit(new RealData2());
6
7 System.out.println(“RealData2方法调用完毕”);
8 // 模拟主函数中其他耗时操作
9 doOtherThing();
10 // 获取RealData2方法的结果
11 System.out.println(future.get());
12 }
13
14 private static void doOtherThing() throws InterruptedException {
15 Thread.sleep(2000L);
16 }
17 }
18
19 class RealData2 implements Callable{
20
21 public String costTime() {
22 try {
23 // 模拟RealData耗时操作
24 Thread.sleep(1000L);
25 return “result”;
26 } catch (InterruptedException e) {
27 e.printStackTrace();
28 }
29 return “exception”;
30 }
31
32 @Override
33 public String call() throws Exception {
34 return costTime();
35 }
36 }
另外Future本身还提供了一些额外的简单控制功能, 其API如下
1 // 取消任务
2 boolean cancel(boolean mayInterruptIfRunning);
3 // 是否已经取消
4 boolean isCancelled();
5 // 是否已经完成
6 boolean isDone();
7 // 取得返回对象
8 V get() throws InterruptedException, ExecutionException;
9 // 取得返回对象, 并可以设置超时时间
10 V get(long timeout, TimeUnit unit)
11 throws InterruptedException, ExecutionException, TimeoutException;
生产消费者模式
生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。
在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。
生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。
生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下
PCData为我们需要处理的元数据模型, 生产者构建PCData, 并放入缓冲队列.
消费者从缓冲队列中获取数据, 并执行计算.
生产者核心代码
1 while(isRunning) {
2 Thread.sleep(r.nextInt(SLEEP_TIME));
3 data = new PCData(count.incrementAndGet);
4 // 构造任务数据
5 System.out.println(data + " is put into queue");
6 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
7 // 将数据放入队列缓冲区中
8 System.out.println("faild to put data: " + data);
9 }
10 }
消费者核心代码
1 while (true) {
2 PCData data = queue.take();
3 // 提取任务
4 if (data != null) {
5 // 获取数据, 执行计算操作
6 int re = data.getData() * 10;
7 System.out.println("after cal, value is : " + re);
8 Thread.sleep(r.nextInt(SLEEP_TIME));
9 }
10 }
生产消费者模式可以有效对数据解耦, 优化系统结构.
降低生产者和消费者线程相互之间的依赖与性能要求.
一般使用BlockingQueue作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,
如果对缓冲队列有性能要求, 则可以使用基于CAS无锁设计的ConcurrentlinkedQueue.
分而治之严格来讲, 分而治之不算一种模式, 而是一种思想.
它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.
我们主要讲两个场景, Master-Worker模式, ForkJoin线程池.
Master-Worker模式
该模式核心思想是系统由两类进行协助工作: Master进程, Worker进程.
Master负责接收与分配任务, Worker负责处理任务. 当各个Worker处理完成后,
将结果返回给Master进行归纳与总结.
假设一个场景, 需要计算100个任务, 并对结果求和, Master持有10个子进程.
Master代码
1 public class MasterDemo {
2 // 盛装任务的集合
3 private ConcurrentlinkedQueue workQueue = new ConcurrentlinkedQueue();
4 // 所有worker
5 private HashMap
6 // 每一个worker并行执行任务的结果
7 private ConcurrentHashMap
8
9 public MasterDemo(WorkerDemo worker, int workerCount) {
10 // 每个worker对象都需要持有queue的引用, 用于领任务与提交结果
11 worker.setResultMap(resultMap);
12 worker.setWorkQueue(workQueue);
13 for (int i = 0; i < workerCount; i++) {
14 workers.put("子节点: " + i, new Thread(worker));
15 }
16 }
17
18 // 提交任务
19 public void submit(TaskDemo task) {
20 workQueue.add(task);
21 }
22
23 // 启动所有的子任务
24 public void execute(){
25 for (Map.Entry
26 entry.getValue().start();
27 }
28 }
29
30 // 判断所有的任务是否执行结束
31 public boolean isComplete() {
32 for (Map.Entry
33 if (entry.getValue().getState() != Thread.State.TERMINATED) {
34 return false;
35 }
36 }
37
38 return true;
39 }
40
41 // 获取最终汇总的结果
42 public int getResult() {
43 int result = 0;
44 for (Map.Entry
45 result += Integer.parseInt(entry.getValue().toString());
46 }
47
48 return result;
49 }
50
51 }
Worker代码
1 public class WorkerDemo implements Runnable{
2
3 private ConcurrentlinkedQueue workQueue;
4 private ConcurrentHashMap
5
6 @Override
7 public void run() {
8 while (true) {
9 TaskDemo input = this.workQueue.poll();
10 // 所有任务已经执行完毕
11 if (input == null) {
12 break;
13 }
14 // 模拟对task进行处理, 返回结果
15 int result = input.getPrice();
16 this.resultMap.put(input.getId() + “”, result);
17 System.out.println("任务执行完毕, 当前线程: " + Thread.currentThread().getName());
18 }
19 }
20
21 public ConcurrentlinkedQueue getWorkQueue() {
22 return workQueue;
23 }
24
25 public void setWorkQueue(ConcurrentlinkedQueue workQueue) {
26 this.workQueue = workQueue;
27 }
28
29 public ConcurrentHashMap
30 retu
【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】 浏览器打开:qq.cn.hn/FTf 免费领取
rn resultMap;
31 }
32
33 public void setResultMap(ConcurrentHashMap
34 this.resultMap = resultMap;
35 }
36 }
1 public class TaskDemo {
2
3 private int id;
4 private String name;
5 private int price;
6
7 public int getId() {
8 return id;
9 }
10
11 public void setId(int id) {
12 this.id = id;
13 }
14
15 public String getName() {
16 return name;
17 }
18
19 public void setName(String name) {
20 this.name = name;
21 }
22
23 public int getPrice() {
24 return price;
25 }
26
27 public void setPrice(int price) {



