栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java高并发之设计模式,设计思想

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java高并发之设计模式,设计思想

本文主要讲解几种常见并行模式, 具体目录结构如下图.

单例

单例是最常见的一种设计模式, 一般用于全局对象管理, 比如xml配置读写之类的.
一般分为懒汉式, 饿汉式.

懒汉式: 方法上加synchronized

public static synchronized Singleton getInstance() {  
  if (single == null) {    
      single = new Singleton();  
  }    
 return single;  
}  
这种方式, 由于每次获取示例都要获取锁, 不推荐使用, 性能较差
懒汉式: 使用双检锁 + volatile
private volatile Singleton singleton = null;
    public static Singleton getInstance() {
 if (singleton == null) {
     synchronized (Singleton.class) {
  if (singleton == null) {
      singleton = new Singleton();
  }
     }
 }
 return singleton;
    }

本方式是对直接在方法上加锁的一个优化, 好处在于只有第一次初始化获取了锁.
后续调用getInstance已经是无锁状态. 只是写法上稍微繁琐点.
至于为什么要volatile关键字, 主要涉及到jdk指令重排, 详见之前的博文: Java内存模型与指令重排

懒汉式: 使用静态内部类

public class Singleton {
    private static class LazyHolder {
private static final Singleton INSTANCE = new Singleton();
    }
    private Singleton (){}
    public static final Singleton getInstance() {
return LazyHolder.INSTANCE;
    }
}

该方式既解决了同步问题, 也解决了写法繁琐问题. 推荐使用改写法.
缺点在于无法响应事件来重新初始化INSTANCE.

饿汉式

public class Singleton1 {
    private Singleton1() {}
    private static final Singleton1 single = new Singleton1();
    public static Singleton1 getInstance() {
 return single;
    }
}

缺点在于对象在一开始就直接初始化了.

Future模式

该模式的核心思想是异步调用. 有点类似于异步的ajax请求.
当调用某个方法时, 可能该方法耗时较久, 而在主函数中也不急于立刻获取结果.
因此可以让调用者立刻返回一个凭证, 该方法放到另外线程执行,后续主函数拿凭证再去获取方法的执行结果即可, 其结构图如下

jdk中内置了Future模式的支持, 其接口如下:

通过FutureTask实现

注意其中两个耗时操作.

  • 如果doOtherThing耗时2s, 则整个函数耗时2s左右.
  • 如果doOtherThing耗时0.2s, 则整个函数耗时取决于RealData.costTime, 即1s左右结束.
public class FutureDemo1 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
 FutureTask future = new FutureTask(new Callable() {
     @Override
     public String call() throws Exception {
  return new RealData().costTime();
     }
 });
 ExecutorService service = Executors.newCachedThreadPool();
 service.submit(future);

 System.out.println("RealData方法调用完毕");
 // 模拟主函数中其他耗时操作
 doOtherThing();
 // 获取RealData方法的结果
 System.out.println(future.get());
    }

    private static void doOtherThing() throws InterruptedException {
 Thread.sleep(2000L);
    }
}

class RealData {

    public String costTime() {
 try {
     // 模拟RealData耗时操作
     Thread.sleep(1000L);
     return "result";
 } catch (InterruptedException e) {
     e.printStackTrace();
 }
 return "exception";
    }

}

通过Future实现

与上述FutureTask不同的是, RealData需要实现Callable接口.

public class FutureDemo2 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
 ExecutorService service = Executors.newCachedThreadPool();
 Future future = service.submit(new RealData2());

 System.out.println("RealData2方法调用完毕");
 // 模拟主函数中其他耗时操作
 doOtherThing();
 // 获取RealData2方法的结果
 System.out.println(future.get());
    }

    private static void doOtherThing() throws InterruptedException {
 Thread.sleep(2000L);
    }
}

class RealData2 implements Callable{

    public String costTime() {
 try {
     // 模拟RealData耗时操作
     Thread.sleep(1000L);
     return "result";
 } catch (InterruptedException e) {
     e.printStackTrace();
 }
 return "exception";
    }

    @Override
    public String call() throws Exception {
 return costTime();
    }
}

另外Future本身还提供了一些额外的简单控制功能, 其API如下

// 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 是否已经取消
    boolean isCancelled();
    // 是否已经完成
    boolean isDone();
    // 取得返回对象
    V get() throws InterruptedException, ExecutionException;
    // 取得返回对象, 并可以设置超时时间
    V get(long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;

生产消费者模式

生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。
在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。
生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。
生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下

PCData为我们需要处理的元数据模型, 生产者构建PCData, 并放入缓冲队列.
消费者从缓冲队列中获取数据, 并执行计算.

生产者核心代码

while(isRunning) {
     Thread.sleep(r.nextInt(SLEEP_TIME));
     data = new PCData(count.incrementAndGet);
     // 构造任务数据
     System.out.println(data + " is put into queue");
     if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
  // 将数据放入队列缓冲区中
  System.out.println("faild to put data: " + data);
     }
 }

消费者核心代码

while (true) {
     PCData data = queue.take();
     // 提取任务
     if (data != null) {
  // 获取数据, 执行计算操作
  int re = data.getData() * 10;
  System.out.println("after cal, value is : " + re);
  Thread.sleep(r.nextInt(SLEEP_TIME));
     }
 }

生产消费者模式可以有效对数据解耦, 优化系统结构.
降低生产者和消费者线程相互之间的依赖与性能要求.
一般使用BlockingQueue作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,
如果对缓冲队列有性能要求, 则可以使用基于CAS无锁设计的ConcurrentlinkedQueue.

分而治之

严格来讲, 分而治之不算一种模式, 而是一种思想.
它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.
我们主要讲两个场景, Master-Worker模式, ForkJoin线程池.

Master-Worker模式

该模式核心思想是系统由两类进行协助工作: Master进程, Worker进程.
Master负责接收与分配任务, Worker负责处理任务. 当各个Worker处理完成后,
将结果返回给Master进行归纳与总结.
假设一个场景, 需要计算100个任务, 并对结果求和, Master持有10个子进程.

Master代码

public class MasterDemo {
    // 盛装任务的集合
    private ConcurrentlinkedQueue workQueue = new ConcurrentlinkedQueue();
    // 所有worker
    private HashMap workers = new HashMap<>();
    // 每一个worker并行执行任务的结果
    private ConcurrentHashMap resultMap = new ConcurrentHashMap<>();

    public MasterDemo(WorkerDemo worker, int workerCount) {
 // 每个worker对象都需要持有queue的引用, 用于领任务与提交结果
 worker.setResultMap(resultMap);
 worker.setWorkQueue(workQueue);
 for (int i = 0; i < workerCount; i++) {
     workers.put("子节点: " + i, new Thread(worker));
 }
    }

    // 提交任务
    public void submit(TaskDemo task) {
 workQueue.add(task);
    }

    // 启动所有的子任务
    public void execute(){
 for (Map.Entry entry : workers.entrySet()) {
     entry.getValue().start();
 }
    }

    // 判断所有的任务是否执行结束
    public boolean isComplete() {
 for (Map.Entry entry : workers.entrySet()) {
     if (entry.getValue().getState() != Thread.State.TERMINATED) {
  return false;
     }
 }

 return true;
    }

    // 获取最终汇总的结果
    public int getResult() {
 int result = 0;
 for (Map.Entry entry : resultMap.entrySet()) {
     result += Integer.parseInt(entry.getValue().toString());
 }

 return result;
    }

}

Worker代码

public class WorkerDemo implements Runnable{

    private ConcurrentlinkedQueue workQueue;
    private ConcurrentHashMap resultMap;

    @Override
    public void run() {
 while (true) {
     TaskDemo input = this.workQueue.poll();
     // 所有任务已经执行完毕
     if (input == null) {
  break;
     }
     // 模拟对task进行处理, 返回结果
     int result = input.getPrice();
     this.resultMap.put(input.getId() + "", result);
     System.out.println("任务执行完毕, 当前线程: " + Thread.currentThread().getName());
 }
    }
    public ConcurrentlinkedQueue getWorkQueue() {
 return workQueue;
    }

    public void setWorkQueue(ConcurrentlinkedQueue workQueue) {
 this.workQueue = workQueue;
    }

    public ConcurrentHashMap getResultMap() {
 return resultMap;
    }

    public void setResultMap(ConcurrentHashMap resultMap) {
 this.resultMap = resultMap;
    }
}

public class TaskDemo {

    private int id;
    private String name;
    private int price;

    public int getId() {
 return id;
    }

    public void setId(int id) {
 this.id = id;
    }

    public String getName() {
 return name;
    }

    public void setName(String name) {
 this.name = name;
    }

    public int getPrice() {
 return price;
    }

    public void setPrice(int price) {
 this.price = price;
    }
}

主函数测试

MasterDemo master = new MasterDemo(new WorkerDemo(), 10);
 for (int i = 0; i < 100; i++) {
     TaskDemo task = new TaskDemo();
     task.setId(i);
     task.setName("任务" + i);
     task.setPrice(new Random().nextInt(10000));
     master.submit(task);
 }

 master.execute();

 while (true) {
     if (master.isComplete()) {
  System.out.println("执行的结果为: " + master.getResult());
  break;
     }
 }

ForkJoin线程池

该线程池是jdk7之后引入的一个并行执行任务的框架, 其核心思想也是将任务分割为子任务,
有可能子任务还是很大, 还需要进一步拆解, 最终得到足够小的任务.
将分割出来的子任务放入双端队列中, 然后几个启动线程从双端队列中获取任务执行.
子任务执行的结果放到一个队列里, 另起线程从队列中获取数据, 合并结果.

假设我们的场景需要计算从0到20000000L的累加求和. CountTask继承自RecursiveTask, 可以携带返回值.
每次分解大任务, 简单的将任务划分为100个等规模的小任务, 并使用fork()提交子任务.
在子任务中通过THRESHOLD设置子任务分解的阈值, 如果当前需要求和的总数大于THRESHOLD, 则子任务需要再次分解,如果子任务可以直接执行, 则进行求和操作, 返回结果. 最终等待所有的子任务执行完毕, 对所有结果求和.

public class CountTask extends RecursiveTask{
    // 任务分解的阈值
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;


    public CountTask(long start, long end) {
 this.start = start;
 this.end = end;
    }

    public Long compute() {
 long sum = 0;
 boolean canCompute = (end - start) < THRESHOLD;
 if (canCompute) {
     for (long i = start; i <= end; i++) {
  sum += i;
     }
 } else {
     // 分成100个小任务
     long step = (start + end) / 100;
     ArrayList subTasks = new ArrayList();
     long pos = start;
     for (int i = 0; i < 100; i++) {
  long lastOne = pos + step;
  if (lastOne > end) {
      lastOne = end;
  }
  CountTask subTask = new CountTask(pos, lastOne);
  pos += step + 1;
  // 将子任务推向线程池
  subTasks.add(subTask);
  subTask.fork();
     }

     for (CountTask task : subTasks) {
  // 对结果进行join
  sum += task.join();
     }
 }
 return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
 ForkJoinPool pool = new ForkJoinPool();
 // 累加求和 0 -> 20000000L
 CountTask task = new CountTask(0, 20000000L);
 ForkJoinTask result = pool.submit(task);
 System.out.println("sum result : " + result.get());
    }
}

ForkJoin线程池使用一个无锁的栈来管理空闲线程, 如果一个工作线程暂时取不到可用的任务, 则可能被挂起.
挂起的线程将被压入由线程池维护的栈中, 待将来有任务可用时, 再从栈中唤醒这些线程.

作者:大道方圆
cnblogs.com/xdecode/p/9137793.html

欢迎关注我的微信公众号「码农突围」,分享Python、Java、大数据、机器学习、人工智能等技术,关注码农技术提升•职场突围•思维跃迁,20万+码农成长充电第一站,陪有梦想的你一起成长

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/239645.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号