- 前言
- 一、juc
- 1.并发容器
- 1.1 BlockingQueue
- 1.2 CopyOnWrite
- 1.2.1 CopyOnWriteArrayList
- 1.2.2 CopyOnWriteArraySet
- 1.3 concurrentHashMap
- 1.4 concurrentSkipListMap
- 2.同步工具类
- 2.1semaphore
- 2.2 countDownLatch
- 2.3CyclicBarrier
- 二、线程池与future
- 1.线程池实现原理
- 2.completableFuture
- 三、ForkJoinPool
- 总结
前言
并发编程不仅是考验基本功的核心之一,也是面试必问考点,一起来深入一下吧嵐
一、juc 1.并发容器 1.1 BlockingQueue
BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,阻塞调用者;当出队列时,若队列为空,阻塞调用者。
在Concurrent包中,BlockingQueue是一个接口,有许多个不同的实现类:
从上图可以看出,这个接口的实现类有很多,感兴趣的可以深入一下。
CopyOnWrite指的是"写"的时候,不直接写数据源,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式回写。
why? 为了读的时候不加锁
既然它的读方法没有加锁,那么写方法如何加锁的呢,一起来看下:
1.2.2 CopyonWriteArraySetpublic class CopyonWriteArrayList implements List,
RandomAccess, Cloneable, java.io.Serializable { // 锁对象
final transient Object lock = new Object();
public boolean add(E e) {
synchronized (lock) { // 同步锁对象
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1); // CopyOnWrite,写的时候,先拷贝一 份之前的数组。
es[len] = e;
setArray(es);
return true;
} }
public void add(int index, E element) {
synchronized (lock) { // 同步锁对象
Object[] es = getArray();
int len = es.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException(outOfBounds(index, len)); Object[] newElements;
int numMoved = len - index;
if (numMoved == 0)
newElements = Arrays.copyOf(es, len + 1);
else {newElements = new Object[len + 1];
System.arraycopy(es, 0, newElements, 0, index);
// CopyOnWrite,写的时候,先拷贝一份之前的数组。 System.arraycopy(es, index, newElements, index + 1, numMoved);
}newElements[index] = element;
setArray(newElements); // 把新数组赋值给老数组 } } }
CopyonWriteArraySet 就是用 Array 实现的一个 Set,保证所有元素都不重复。其内部是封装的一个CopyOnWriteArrayList
1.3 concurrentHashMappublic class CopyonWriteArraySet extends AbstractSet implements
java.io.Serializable { // 新封装的CopyonWriteArrayList private final
CopyonWriteArrayList al; public CopyonWriteArraySet() { al = new
CopyonWriteArrayList(); }public boolean add(E e) { return
al.addIfAbsent(e); // 不重复的加进去 }}
这个是重点中的重点,一定会被问到的,如果还不清楚一定移步大神这里:https://crossoverjie.top/2018/07/23/java-senior/ConcurrentHashMap/
1.4 concurrentSkipListMapConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。
为什么要SkipList实现Map?
在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。
这里为什么不用红黑树,而用跳查表来实现呢?
官方的回答是这样的:目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。所以大神 Doug Lea人狠话不多,直接实现了一个。
Semaphore也就是信号量,提供了资源数量的并发访问控制。
如下图所示,假设有n个线程来获取Semaphore里面的10份资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。
当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。
首先来看下怎么用:
假设一个主线程要等待5个 Worker 线程执行完才能退出,可以使用CountDownLatch来实现。
使用场景?
10个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。把10个人看作10个线程,10个线程之间的同步过程如下图所示:
实现原理?
CyclicBarrier基于ReentrantLock+Condition实现。
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型。
要熟悉它的运行原理,就要先来看下它的参数:
- corePoolSize:在线程池中始终维护的线程个数。
- maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
- keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
- blockingQueue:线程池所用的队列类型。
- threadFactory:线程创建工厂,可以自定义,有默认值
Executors.defaultThreadFactory() 。 - RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略。
下面来看这6个配置参数在任务的提交过程中是怎么运作的。在每次往线程池中提交任务的时候,有如下的处理流程:
步骤一:判断当前线程数是否大于或等于corePoolSize。如果小于,则新建线程执行;如果大于,
则进入步骤二。
步骤二:判断队列是否已满。如未满,则放入;如已满,则进入步骤三。
步骤三:判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果大于,
则进入步骤四。
步骤四:根据拒绝策略,拒绝任务。
总结一下:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。
很显然,基于这种流程,如果队列是无界的,将永远没有机会走到步骤三,也即maxPoolSize没有使用,也一定不会走到步骤四。
在JDK8之前,异步编程可以通过线程池和Future来实现,但功能还不够强大,所以从1.8开始引入了CompletableFuture。
先来看个:
package com.lagou.concurrent.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future = new CompletableFuture<>();
new Thread(() -> {
try {Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
future.complete("hello world"); }).start();
System.out.println("获取结果中。。。");
String result = future.get();
System.out.println("获取的结果:" + result); } }
CompletableFuture实现了Future接口,所以它也具有Future的特性:调用get()方法会阻塞在那,直到结果返回。
另外1个线程调用complete方法完成该Future,则所有阻塞在get()方法的线程都将获得返回结果。
completableFuture实现原理?
CompletableFuture的构造:ForkJoinPool,CompletableFuture中任务的执行依靠ForkJoinPool。
ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,多个线程并行计算。
相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。
总结
对于多线程的编程还是需要多练习,才能做到熟能生巧。还是那句话,如果不想吃生活的苦,就跟我来吃学习的苦,我是油条~下期见



