JUC并发编程 进阶
什么是JUC?线程和进程Lock锁(重点)生产者和消费者问题如何判断锁的是谁? 8锁现象集合类不安全Callable常用的辅助类(必会)
CountDownLatchCyclicBarrierSemaphore 读写锁阻塞队列线程池(重点)四大函数式接口(必须掌握)Stream 流式计算ForkJoin异步回调JMMVolatile深入理解单例模式深入理解CAS原子引用各种锁的理解
公平锁、非公平锁可重入锁自旋锁死锁
JUC并发编程 进阶jdk环境
保证项目JDK至少有1.8 Project和Moudle都要设置好
什么是JUC?源码+官方文档 面试高频
java.util工具包
业务:普通的线程代码 Thread
Runnable 没有返回值,效率相比Callable低!
线程和进程线程,进程,如果不能用一句话说出来的技术说明不扎实!
进程:一个进程,QQ.exe Music.exe 程序的集合,运行中的程序
一个进程往往可以包含多个线程,至少包含一个!
java默认有几个线程呢?2个 main+GC
线程:开启一个进程 Typora,写字,自动保存(线程负责)
Thread、Runnable、Callable
Java真的可以开启线程嘛? 不可以!
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
//本地方法 底层的C++ Java 无法直接操作硬件
private native void start0();
并发 并行
并发编程:并发、并行
并发(多线程操作同一个资源)
CPU一核,模拟出来的多线程,天下武功,唯快不破,快速交替
并行(多个线程同一时刻执行)
CPU多核,多个线程可以同时执行[线程池]
public static void main(String[] args) {
// 获取CPU的核数
// CPU密集型,IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
并发编程的本质:充分利用CPU资源
线程有几个状态?
public enum State {
// 新生
NEW,
// 运行
RUNNABLE,
// 阻塞
BLOCKED,
//等待(一直等)
WAITING,
// 超时等待(等待一定时间,没有等到直接结束)
TIMED_WAITING,
//终止
TERMINATED;
}
wait/sleep 区别
- 来自不同的类
wait => Object
sleep => Thread关于锁的释放
wait 会释放锁,sleep不会释放锁使用的范围不同
wait 必须在同步代码块中
sleep 可以在任何地方执行sleep不需要被唤醒(休眠之后推出阻塞),但是wait需要(不指定时间需要被别人中断)。
传统的synchronized
public class SaleTicketDemo01 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类,把资源丢进线程
Ticket ticket = new Ticket();
// @FunctionalInterface 函数式接口 jdk1.8 兰八大表达式 ( 参数 )->{ 代码 }
for (String s : Arrays.asList("A", "B", "C")) {
new Thread(()->{
for (int i = 0; i < 60; i++) {
ticket.sale();
}
}, s).start();
}
}
}
class Ticket{
// 属性 方法
private int number = 50;
// 买票的方式
public synchronized void sale(){
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余:" + number);
}
}
}
Lock 接口
Lock lock = …; lock.lock();//加锁 lock.unlock();//解锁
可重入锁(常用)
实现类:ReentrantLock,ReentrantReadWriteLock.ReadLock(读锁),ReentrantReadWriteLock.WriteLock(写锁)
public class SaleTicketDemo02 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类,把资源丢进线程
Ticket2 ticket = new Ticket2();
// @FunctionalInterface 函数式接口 jdk1.8 兰八大表达式 ( 参数 )->{ 代码 }
for (String s : Arrays.asList("A", "B", "C")) {
new Thread(()->{
for (int i = 0; i < 60; i++) {
ticket.sale();
}
}, s).start();
}
}
}
class Ticket2{
// 属性 方法
private int number = 30;
Lock lock = new ReentrantLock();
// 买票的方式
public void sale(){
lock.lock(); // 加锁
try {
// 业务代码
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余:" + number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 解锁
}
}
}
Synchronized 和 Lock 区别
- Sycnchronized 内置的java关键字,Lock 是一个java类Synchronized 无法判断取锁的状态,而 Lock 可以判断是否获取到了锁Synchronized 会自动释放锁, Lock 锁必须要手动释放锁!如果不释放锁,死锁!Synchronized 线程1 (获得锁,阻塞)、线程2(一直等);Lock 不会一直等待,会尝试获取锁Synchronized 可重入锁, 不可以中断的,非公平;Lock ,可重入锁,可以判断锁,非公平(可以自己设置)Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码!
生产者和消费者问题锁是什么?如何判断锁的是谁?
面试:单例模式、排序算法、生产者消费者、死锁
生产者和消费者问题 Synchronized 版
public class A {
public static void main(String[] args) {
final Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(()-> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
class Data{
private int number = 0;
//+1
public synchronized void increment() throws InterruptedException {
if (number!=0){
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程 我加一完毕了;
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number==0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程,我减一完毕了
this.notifyAll();
}
}
问题 如果同时存在多个线程 虚假唤醒
JUC 版,生产者与消费者问题
必须把if改成while!!!
代码实现:
public class B {
public static void main(String[] args) {
final Data2 data = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "A").start();
new Thread(()-> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "C").start();
new Thread(()-> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "D").start();
}
}
class Data2{
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//condition.await(); // 等待
//condition.signalAll(); // 唤醒全部
//+1
public void increment() {
lock.lock();
try {
//业务代码
while (number!=0){
//等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程 我加一完毕了;
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//-1
public void decrement() {
lock.lock();
try {
while (number==0){
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知其他线程,我减一完毕了
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
任何一个新的技术,绝对不是仅仅只是覆盖了原来的技术,优势和补充!
Condition 精准通知和唤醒线程
代码测试:
package kuang.juc.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class C {
public static void main(String[] args) {
final Data3 data = new Data3();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printA();
}
}, "A").start();
new Thread(()-> {
for (int i = 0; i < 10; i++) {
data.printB();
}
}, "B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.printC();
}
}, "C").start();
}
}
class Data3{
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int number = 1;//A 1 B 2 C 3
public void printA(){
lock.lock();
try {
// 业务 判断->执行->通知
while (number != 1) {
//等待
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "=> A");
// 唤醒 B
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
// 业务 判断->执行->通知
while (number != 2) {
//等待
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "=> B");
// 唤醒 C
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
// 业务 判断->执行->通知
while (number != 3) {
//等待
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "=> C");
// 唤醒 A
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
如何判断锁的是谁? 8锁现象
深刻理解我们的锁
- 锁创建的对象 对象方法类对象的锁 类方法
List不安全
public class ListTest {
public static void main(String[] args) {
List list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
Set不安全
public class SetTest {
public static void main(String[] args) {
// Set set = new HashSet<>();
// Set set = Collections.synchronizedSet(new HashSet<>());
Set set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
hashSet底层是什么?
public HashSet(){
map = new HashMap<>();
}
public boolean add(E e){
return map.add(e, PRESENT)==null;
}
private static final Object PRESENT = new Object();//常量
Map 不安全
//ConcurrentModificationException
public class MapTest {
public static void main(String[] args) {
// map 是这样的嘛? 不是 , 工作中不使用 HashMap
// 默认等价于什么? new HashMap<>(16,0.75)
// Map map = new HashMap<>();
// Map map = Collections.synchronizedMap(new HashMap<>());
// 加载因子 初始化容量
Map map = new ConcurrentHashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
}, String.valueOf(i)).start();
}
}
}
Callable
public interface Callable
Callable 接口类似于 Runnable,因为他们都是为其实例,可能由另一个线程执行的类设计的。然而,Runnable 不返回结果,也不能抛出被检查异常。
- 可以有返回值可以抛出异常方法不同,run()/call()
代码测试
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread = new MyThread();
FutureTask stringFutureTask = new FutureTask<>(myThread);
// 只打印了一个call!只会执行一次!!
new Thread(stringFutureTask,"A").start();
new Thread(stringFutureTask,"B").start(); // 结果会被缓存 效率高!
String s = stringFutureTask.get();//get方法可能会产生阻塞!放到最后!
//或者通过异步通信来处理!
System.out.println(s);
}
}
class MyThread implements Callable {
@Override
public String call() throws Exception {
System.out.println("call()");
// 耗时的操作
return UUID.randomUUID().toString().substring(0,5);
}
}
- 有缓存结果可能需要等待,会阻塞!
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
final int COUNT = 6;
final CountDownLatch countDownLatch = new CountDownLatch(COUNT);
for (int i = 0; i < COUNT; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" Go out");
countDownLatch.countDown();//数量减一
},String.valueOf(i)).start();
}
//等待计数器归零 然后再向下执行
countDownLatch.await();
System.out.println("Close Door");
}
}
原理:
countDownLatch.countDown(); // 数量减一
countDownLatch.await(); // 等待计数器归零,然后再向下执行
每次有线程调用couontDown()数量-1,假设计数器变成0,countDownLatch.await()就会被唤醒,继续执行!
CyclicBarrier加法计数器
public class CyclicBarrierDemo {
public static void main(String[] args) {
final int COUNT = 7;
//召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(COUNT,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <= COUNT; i++) {
//lambda 不能拿到非常量i 可以通过定义一个常量来获取值
final int I = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "收集" + I + "颗龙珠");
try {
cyclicBarrier.await();// 等待
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
原理:
new CyclicBarrier( COUNT , ()->{Task} );// 创建加法计数器 指定数值COUNT和目标任务Task
cyclicBarrier.await();// 计数器加一 若达到计数器指定数值 则执行目标任务
SemaphoreSemaphore:信号量
抢车位!6车抢3个停车位 限流!
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量 : 停车位 (6辆车抢3个车位) 限流!
final int PERMITS = 3;
final Semaphore semaphore = new Semaphore(PERMITS);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
// acquire() 得到
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// release() 释放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
原理:
new Semaphore( PERMITS ); // 创建限流器,设置上限为 PERMITS
semaphore.acquire();// 获得资源 , 如果资源用完了,等待资源释放为止。
semaphore.release();// 释放资源,信号量+1,然后唤醒等待的资源。
作用:多个共享资源互斥的使用!并发限流,控制最大的线程数。
读写锁ReadWriteLock (ReentrantReadWriteLock)
ReadWriteLock维护一对关联的locks,一个用于只读操作,一个用于写入。read lock可以由多个阅读器线程同时进行,只要没有作者。write lock 是只能一个线程单独使用的。
可以多线程读,只能单线程写
public class ReadWriteLockDemo {
public static void main(String[] args) {
final MyCache myCache = new MyCache();
//写入
for (int i = 1; i <= 5; i++) {
final int I = i;
new Thread(()->{myCache.put(String.valueOf(I),I);},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
final int I = i;
new Thread(()->{myCache.get(String.valueOf(I));},String.valueOf(i)).start();
}
}
}
class MyCache {
private volatile Map map = new HashMap<>();
//存 (写)
public void put(String key, Object value){
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key,value);
System.out.println(Thread.currentThread().getName() + "写入完毕");
}
//取 (读)
public void get(String key){
System.out.println(Thread.currentThread().getName() + "读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完毕");
}
}
class MyCacheLock {
private volatile Map map = new HashMap<>();
//读写锁 更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存 (写) 希望写的时候只有一个线程
public void put(String key, Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key,value);
System.out.println(Thread.currentThread().getName() + "写入完毕");
} catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
//取 (读)可以多个线程同时读
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完毕");
} catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
阻塞队列
BlockingQueue 不是新的东西,与Set、List同级的Queue的实现类
什么情况下我们会使用 阻塞队列? 多线程并发处理,线程池!
学会使用队列
添加,移除
四组API
- 抛出异常不会抛出异常阻塞 等待超时等待
| 方式 | 抛出异常 | 有返回值(不抛出异常) | 阻塞 等待 | 超时等待 |
|---|---|---|---|---|
| 添加 | add() | offer() | put() | offer(e,t,Unit) |
| 移除 | remove() | poll() | take() | poll(t,Unit) |
| 查看队首元素 | element() | peek() | - | - |
public class Test {
public static void main(String[] args) throws InterruptedException {
// test1();
// test2();
// test3();
test4();
}
public static void test1(){
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// //当队列满了 写入元素add 抛出异常!IllegalStateException : Queue full
// blockingQueue.add("d");
System.out.println("====================");
//查看队首元素 element()
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// //当队列为空 读取元素remove 抛出异常! NoSuchElementException
// System.out.println(blockingQueue.remove());
}
public static void test2(){
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//当队列满了 offer写入元素 无异常
System.out.println(blockingQueue.offer("d"));//false
System.out.println("===================");
//查看队首元素 peek()
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//当队列为空 poll 返回null
System.out.println(blockingQueue.poll());//null
}
public static void test3() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// //当队列满了 put写入元素 阻塞
// blockingQueue.put("d");
System.out.println("===================");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// //当队列为空 add读取元素 阻塞
// System.out.println(blockingQueue.take());
}
public static void test4() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//当队列满了 offer写入元素 设置了阻塞最大时间 超过 则返回false
System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));
System.out.println("===================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//当队列为空 add读取元素 返回null
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}
}
SynchronousQueue 同步队列
没有容量,放进去一个元素后,必须等待取出之后,才可以再放入一个元素!
package kuang.juc.bq;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
final BlockingQueue blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + " put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
blockingQueue.put("3");
} catch (Exception e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " get " + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " get " + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " get " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
线程池(重点)
线程池:三大方法,七大参数,四种拒绝策略
池化技术
程序运行的本质:占用系统的资源!优化系统资源的使用 => 池化技术
线程池、连接池、内存池、对象池、、、(创建、销毁 十分浪费资源)
池化技术:事先准备好一些资源,有需要用则到这里拿,用完之后归还。
设置默认大小,最大资源值等。
线程池的好处:
- 降低资源的消耗提高响应的速度方便管理
线程复用,可以控制最大并发数,管理线程
线程池 三大方法
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池大小
// ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,自动调整的线程池
try {
for (int i = 0; i < 10; i++) {
// 使用了线程池后 使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} finally {
// 线程池用完后,在程序结束前需要 关闭线程池
threadPool.shutdown();
}
}
}
七大参数
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue());
}
//本质:ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大核心线程池大小
long keepAliveTime, //超时了没有人调用就会释放
TimeUnit unit, //超时时间的时间单位
BlockingQueue workQueue,//阻塞队列
ThreadFactory threadFactory, //线程工厂(创建线程的)不用动
RejectedExecutionHandler handler //拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手动创建一个线程池
public class Demo02 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2, // 默认核心线程数(正常情况下银行窗口数)
5, // 最大核心线程数(银行的最大窗口数)
3, // 如果指定时间内没有超过默认线程数的需求则关闭非默认线程
TimeUnit.SECONDS, // 指定的超时时间单位
new linkedBlockingQueue<>(3), // 阻塞队列(银行的候客区)
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略(满了还有人来,不处理并抛出异常)
);
try {
// 最大承载:D额queue + max
for (int i = 0; i < 10; i++) {
// 使用了线程池后 使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} finally {
// 线程池用完后,在程序结束前需要 关闭线程池
threadPool.shutdown();
}
}
}
四种拒绝策略
小结和拓展
最大线程该如何定义?
- CPU密集型 几核CPU就设置多少,效率最高
通过Runtime.getRuntime().availiableProcessors()获取CPU核数IO密集型 大于 判断程序中十分耗IO的线程
新时代程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
函数式接口:只有一个方法的接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//简化编程模型,在新版本的框架底层大量使用
//foreach(消费者式函数式接口)
代码测试:
Function 函数式接口
public class Demo01 {
public static void main(String[] args) {
//输出输入的字符串
// Function function = new Function() {
// @Override
// public String apply(String str) {
// return str;
// }
// };
Function function = str -> str;
System.out.println(function.apply("FUNCTION"));
}
}
Predicate 断定型接口
public class Demo02 {
public static void main(String[] args) {
//判断字符串是否为空
Predicate predicate = str -> str == null || str.isEmpty();
System.out.println(predicate.test(""));
}
}
Counsumer 消费型接口
public class Demo03_Consumer {
public static void main(String[] args) {
Consumer consumer = str -> System.out.println(str);
consumer.accept("CONSUMER");
}
}
Supplier 供给型接口
public class Demo04_Supplier {
public static void main(String[] args) {
//返回随机UUID的前五个字符的字符串
Supplier supplier = () -> UUID.randomUUID().toString().substring(0,5);
System.out.println(supplier.get());
}
}
Stream 流式计算
什么是Stream流式计算
大数据:存储+计算
集合、MySQL本质就是存储东西的
计算都应该交给流来计算!
public class Test {
public static void main(String[] args) {
List list = Arrays.asList(
new User(1,"a",21),
new User(2,"b",22),
new User(3,"c",23),
new User(4,"d",24),
new User(6,"e",25)
);
//计算交给流
//lambda表达式 链式编程 函数式接口 流式计算
list.stream()
.filter(u->(u.getId()&1)==0)
.filter(u->u.getAge()>23)
.map(u->u.getName().toUpperCase())
.sorted((u1,u2)->u2.compareTo(u1))
.limit(1)
.forEach(System.out::println);
}
}
ForkJoin
什么是 ForkJoin
ForkJoin 在 JDK 1.7,并行执行任务!提高效率,大数据量!
大数据:Map Reduce(大任务拆分成小任务)
ForkJoin特点:工作窃取
这里面维护的是双端队列,已经完成工作的线程窃取未完成工作的线程的工作来做,提高效率!
forkJoinPool.execute(ForkJoinTask> task)为异步执行给定任务的排列
测试类
public class ForkJoinDemo extends RecursiveTask{ private long start; private long end; //临界值 private long temp = 10000L; private ForkJoinDemo forkJoinRight; public ForkJoinDemo(long start,long end){ this.start = start; this.end = end; } public static void main(String[] args) { } @Override protected Long compute() { if ((end-start) > 1; // 中间值 ForkJoinDemo forkJoinLeft = new ForkJoinDemo(start, middle); forkJoinLeft.fork();// 拆分任务 把任务压入线程队列 ForkJoinDemo forkJoinRight = new ForkJoinDemo(middle + 1, end); forkJoinRight.fork();// 拆分任务 把任务压入线程队列 return forkJoinLeft.join() + forkJoinRight.join(); } } }
测试:
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1();
// test2();
test3();
}
private static long END = 10_10000_0000L;
public static void test1(){
long start = System.currentTimeMillis();
long sum = 0L;
for (long i = 1L; i <= END; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + "时间:"+(end-start));
//sum=-4335232216078654848时间:2827
}
//ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo task = new ForkJoinDemo(1L,END);
// forkJoinPool.execute(task);
final ForkJoinTask submit = forkJoinPool.submit(task);//提交任务
long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + "时间:"+(end-start));
//sum=-4335232216078654848时间:2020
}
//Stream 并行流
public static void test3(){
long start = System.currentTimeMillis();
//rangeClosed (] range ()
long sum = LongStream.rangeClosed(0L, END).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + "时间:"+(end-start));
//sum=-4335232216078654848时间:1433
}
}
异步回调
Future 设计的初衷:对将来的某个事件的结果进行建模
CompletableFuture
runAsync() 无返回值Void cf.get() -> null
supplyAsync() 有返回值 cf.whenComplete((t, u)->{return …}).exceptionally(e->{return …}).get()
public class Demo01{
// 发起一个请求
public static void main(String[] args) throws ExecutionException, InterruptedException {
// voidTest();
supplyTest();
}
public static void voidTest() throws ExecutionException, InterruptedException {
// 没有返回值的 runAsync 异步回调
CompletableFuture completableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" runAsync => Void");
});
System.out.println("=====================");
System.out.println(completableFuture.get());//null
}
public static void supplyTest() throws ExecutionException, InterruptedException {
// 有返回值的 supplyAsync 异步回调
CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
int i = 1/0;//制造错误!
System.out.println(Thread.currentThread().getName()+" runAsync => Integer");
return 200;
});
System.out.println("=====================");
//成功与失败的回调
System.out.println(completableFuture.whenComplete((t, u) -> {
System.out.println("t:" + t + ",u:" + u);
//t:返回值 如果错误这里为null
//u:错误信息,如果没有报错返回null
}).exceptionally(e -> {
System.out.println(e.getMessage());//可以获取 异常对象
return 500;//错误时将返回值设置为此处的值 (get)
}).get());
}
}
JMM
什么是JMM?
JMM:Java内存模型,不存在的东西,概念!约定!
关于JMM的一些同步的约定:
- 线程解锁前,必须把共享变量立刻刷回主存。线程加锁前,必须读取主存中的最新值到工作内存中。加锁和解锁是同一把锁。
线程:工作内存、主内存
八种操作:
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
对八种操作的规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存不允许一个线程将没有assign的数据从工作内存同步回主内存一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
问题
当两个线程共用一个主存变量时,都加载到自己的工作线程中,但是当其中一个线程修改了主存值,另一个线程是不可见的!
Volatile请你谈谈对Volatile的理解
Volatile是Java虚拟机提供 轻量级的同步机制
- 保证可见性不保证原子性禁止指令重排
保证可见性
public class JMMDemo {
// 不加 volatile 程序就会死循环!
public volatile static int num = 0;
public static void main(String[] args) {
new Thread(()->{// 线程1
while(num==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}
不保证原子性
原子性:不可分割
一个线程在执行任务的时候,不能被打扰也不能被分割。要么操作同时成功要么操作同时失败。
public class VolatileDemo {
private volatile static int num = 0;
public static void add(){
num++;// 不是原子性操作
}
public static void main(String[] args) {
// 理论上 num结果为 20*100 = 20000
for (int i = 0; i < 20; i++) {
new Thread((()->{
for (int j = 0; j < 1000; j++) {
add();
}
})).start();
}
//等待以上线程执行完
while (Thread.activeCount() > 2) {
// 两个线程指 main 和 gc
Thread.yield();// 让出时间片
}
System.out.println(num);//小于20000
}
}
如果不加 lock 和 Synchronized , 如果保证原子性?
通过javap -c demo.java查看反编译字节码文件可知,num++是多个操作!
使用原子类 atomic包
package kuang.juc.jmm;
import java.util.concurrent.atomic.AtomicInteger;
public class VolatileDemo {
private volatile static AtomicInteger num = new AtomicInteger(0);
public static void add(){
// num++;// 非原子操作
num.getAndIncrement(); // AtomicInteger +1 方法 (CAS)
}
public static void main(String[] args) {
// 理论上 num结果为 20*100 = 20000
for (int i = 0; i < 20; i++) {
new Thread((()->{
for (int j = 0; j < 1000; j++) {
add();
}
})).start();
}
//等待以上线程执行完
while (Thread.activeCount() > 2) {
// 两个线程指 main 和 gc
Thread.yield();// 让出时间片
}
System.out.println(num);
}
}
这些类的底层都直接和操作系统挂钩!在内存中修改值。Unsafe类是一个很特殊的存在。
深入理解单例模式饿汉式单例
public class Hungry {
// 一开始就创建了对象,可能会浪费空间
private byte[] data = new byte[1024*1024];
private Hungry(){}
private final static Hungry HUNGRY = new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}
懒汉式单例
public class LazyMan {
private LazyMan() {
System.out.println(Thread.currentThread().getName() + " 创建了lazyMan!");
}
private volatile static LazyMan lazyMan;
// 双重检查锁模式的 懒汉式单例 DCL懒汉式
private static LazyMan getInstance(){
if (lazyMan == null) {
// 避免并发条件下创建多次!
synchronized (LazyMan.class){
if (lazyMan == null) {
lazyMan = new LazyMan(); // 不是原子性操作
}
}
}
return lazyMan;
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(LazyMan::getInstance).start();
}
}
}
静态内部类
public class Holder {
private Holder(){
}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
单例不安全,反射!
枚举类型
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
EnumSingle instance1 = EnumSingle.INSTANCE;
final Constructor con = EnumSingle.class.getDeclaredConstructor(String.class,int.class);//最后是一个两参构造器!
con.setAccessible(true);
EnumSingle instance2 = con.newInstance(null);
}
}
枚举类型反编译的代码:
深入理解CAS什么是CAS
public class CASDemo {
// CAS Compare and Swap 比较并交换
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
// 期望,更新
// public final boolean compareAndSet(int expect, int update);
// 如果达到期望的值,则更新,否则不更新! CAS 是 CPU的并发原语!
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger);
System.out.println(atomicInteger.compareAndSet(2020, 2022));
System.out.println(atomicInteger);
}
}
Unsafe类
CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!(自旋锁)
缺点:
- 循环耗时一次性只能保证一个共享变量的原子性ABA问题
原子引用CAS:ABA 问题(狸猫换太子)
解决ABA问题,引入原子引用!对应的思想是:乐观锁
带版本号的原子操作!
package kuang.juc.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
public class ABADemo {
public static void main(String[] args) {
// AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题!
// 正常业务不使用包装类
AtomicStampedReference atomicStampedReference = new AtomicStampedReference<>(1,1);
System.out.println(atomicStampedReference.getReference());
//乐观锁原理
new Thread(()->{
System.out.println("A1:" + atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A2:" + atomicStampedReference.getStamp());
System.out.println(atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println("A3:" + atomicStampedReference.getStamp());
},"A").start();
new Thread(()->{
System.out.println("B1:" + atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B2:" + atomicStampedReference.getStamp());
System.out.println(atomicStampedReference.compareAndSet(1, 3, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println("B3:" + atomicStampedReference.getStamp());
},"B").start();
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(atomicStampedReference.getReference());
}
}
注意!
Integer使用了对象缓存机制,默认范围是 -128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间!
各种锁的理解 公平锁、非公平锁公平锁:非常公平,不允许插队,必须先来后到!
非公平锁:非常不公平,可以插队(默认都是非公平锁)
public ReentrantLock() {
sync = new NofairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁
可重入锁(递归锁)
public class Demo02 {
public static void main(String[] args) {
final Phone phone = new Phone();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone2{
Lock lock = new ReentrantLock();
public void sms(){
lock.lock(); // 细节问题:
// 锁必须配对!否则就会死锁 上了两次锁就需要解锁两次!
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "发短信");
call();// 这里也有锁
} finally {
lock.unlock();
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "打电话");
} finally {
lock.unlock();
}
}
}
自旋锁
spinlock
public class SpinLockDemo {
//Thread 默认为 null
AtomicReference atomicReference = new AtomicReference<>();
// 加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==> myLock");
//自旋锁
while (!atomicReference.compareAndSet(null,thread)) {
}
}
// 解锁
public void myUnLock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==> myUnLock");
atomicReference.compareAndSet(thread,null);
}
}
测试
public class TestSpinlock {
public static void main(String[] args) throws InterruptedException {
// Lock lock = new ReentrantLock();
// lock.lock();
// lock.unlock();
// 底层使用的自旋锁
final SpinLockDemo myLock = new SpinLockDemo();
new Thread(()->{
myLock.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.myUnLock();
}
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
myLock.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.myUnLock();
}
},"B").start();
}
}
死锁
死锁是什么?
两个线程相互持有对方需要的锁,且不释放
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "LOCK_A";
String lockB = "LOCK_B";
new Thread(new MyThread(lockA,lockB),"A").start();
new Thread(new MyThread(lockB,lockA),"B").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String A,String B){
lockA = A;
lockB = B;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockA + " => get " + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockB + " => get " + lockA);
}
}
}
}
怎么排除死锁?
使用jps -l定位进程号
使用jstack 进程号排查



