JUC是java.util.concurrent的简写
在jdk官方手册中可以看到JUC相关的jar包有三个。
JUC的意思就是java并发编程工具包
并发编程的本质就是充分利用CPU资源
进程、线程 进程进程是指在系统中正在运行的一个应用程序,程序一旦运行就是进程。
进程是系统进行资源分配的独立实体, 且每个进程拥有独立的地址空间。
一个进程可以拥有多个线程,每个线程使用其所属进程的栈空间。
线程线程是进程的一个实体,是进程的一条执行路径。
线程是CPU独立运行和独立调度的基本单位。
进程:一个程序,QQ.exe Music.exe 程序的集合;
一个进程往往可以包含多个线程,至少包含一个!
java默认有几个线程?
2个线程 main线程、GC线程
线程:开了一个进程Typora,写字,自动保存(线程负责的)
对于Java创建线程方式:Thread、Runnable、Callable
Java真的可以开启线程吗?
开不了,用native本地方法调用底层的C++开启的!java无法操作硬件!
这里我们可以看下一原码,在源码中发现java调用的是 private native void start0();
native修饰,所以可以知道java调用的是本地方法接口,就是底层的c写的开启方法。
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
private native void start0();
并发编程:并发、并行
并发(多线程操作同一个资源)
CPU一核,模拟出来多条线程
并行(多个人一起行走)
CPU多核,多个线程可以同时执行;线程池
线程有几种状态?六种
可以通过Thread.State来看一下其中的状态。中间的注解是解释文档看不懂的可以去百度翻译
public enum State {
//创建
NEW,
//运行
RUNNABLE,
//阻塞
BLOCKED,
//等待,一直等待
WAITING,
//超时等待,
TIMED_WAITING,
//终止
TERMINATED;
}
wait/sleep的区别
来自不同的类
wait来自object,sleep来自Thread
企业中一般不会使用这两个方法来使线程等待
import java.util.concurrent.TimeUnit;
//没有截全图,只是想说明这个类来自java.util.concurrent.TimeUnit; JUC
TimeUnit.DAYS.sleep(1);//等待一天
TimeUnit.HOURS.sleep(1);//等待一小时
TimeUnit.SECONDS.sleep(1);//等待一秒
TimeUnit.MINUTES.sleep(1);//等待一分钟
TimeUnit.MICROSECONDS.sleep(1);//等待一微秒
TimeUnit.MILLISECONDS.sleep(1);//等待一毫秒
TimeUnit.NANOSECONDS.sleep(1);//等待一纳秒
是否释放锁
wait会释放锁,sleep睡觉了 ,抱着锁,不会释放,而且不会释放一切资源一直占有。
使用的范围是不同的
wait:必须在同步代码块中
sleep:可以在任何地方睡
是否需要捕获异常
wait:不需要捕获异常,会有一个中断异常,可以抛出
try {
new ThreadDemo().wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
sleep:需要捕获异常
Lock锁*传统Synchronized
//传统的线程开发
package sang.threads;
//线程就是一个单独的资源类,没有任何附属的操作!
//里面只包含1、属性,2、方法
public class ThreadDemo {
public static void main(String[] args) {
Thread thread = new Thread(new threadTest());
thread.start();
}
}
class threadTest implements Runnable{
@Override
public void run() {
}
}
举例瞅瞅:
package sang.threads;
public class ThreadDemo {
public static void main(String[] args) {
threadTest test = new threadTest();
new Thread(() -> {
for (int i = 0; i < 52; i++) {
test.sale();
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 52; i++) {
test.sale();
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 52; i++) {
test.sale();
}
},"C").start();
}
}
//资源类 OOP思想
class threadTest {
//属性、方法
private int number = 50;
//synchronized本质上就是排队
public synchronized void sale(){
if (number>0) {
System.out.println(Thread.currentThread().getName() + "卖出了第:" + (number--) + "张票,还剩:" + number + "张票");
}
}
}
OOP:面向对象程序设计(Object Oriented Programming)
Synchronized的作用在JDK1.5之前都是使用synchronized关键字保证同步的
它可以把任意一个非NULL的对象当作锁。
- 作用于方法时,锁住的是对象的实例(this);
- 当作用于静态方法时,锁住的是Class实例,又因为Class的相关数据存储在永久带PermGen(jdk1.8则是metaspace),永久带是全局共享的,因此静态方法锁相当于类的一个全局锁,会锁所有调用该方法的线程;
- synchronized作用于一个对象实例时,锁住的是所有以该对象为锁的代码块。
它有多个队列,当多个线程一起访问某个对象监视器的时候,对象监视器会将这些线程存储在不同的容器中。
Contention List:竞争队列,所有请求锁的线程首先被放在这个竞争队列中;
Entry List:Contention List中那些有资格成为候选资源的线程被移动到Entry List中;
Wait Set:哪些调用wait方法被阻塞的线程被放置在这里;
OnDeck:任意时刻,最多只有一个线程正在竞争锁资源,该线程被成为OnDeck;
Owner:当前已经获取到所资源的线程被称为Owner;
Owner:当前释放锁的线程。
JVM每次从队列的尾部取出一个数据用于锁竞争候选者(OnDeck),但是并发情况下,ContentionList会被大量的并发线程进行CAS访问,为了降低对尾部元素的竞争,JVM会将一部分线程移动到EntryList中作为候选竞争线程。Owner线程会在unlock时,将ContentionList中的部分线程迁移到EntryList中,并指定EntryList中的某个线程为OnDeck线程(一般是最先进去的那个线程)。Owner线程并不直接把锁传递给OnDeck线程,而是把锁竞争的权利交给OnDeck,OnDeck需要重新竞争锁。这样虽然牺牲了一些公平性,但是能极大的提升系统的吞吐量,在JVM中,也把这种选择行为称之为“竞争切换”。
Synchronized是非公平锁。 Synchronized在线程进入ContentionList时,等待的线程会先尝试自旋获取锁,如果获取不到就进入ContentionList,这明显对于已经进入队列的线程是不公平的,还有一个不公平的事情就是自旋获取锁的线程还可能直接抢占OnDeck线程的锁资源。
Lock锁
synchronized是java底层支持的,而concurrent包则是jdk实现。
公平锁:十分公平,可以先来后到
非公平锁:十分不公平,可以插队(默认)
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
package sang.threads;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadDemo1 {
public static void main(String[] args) {
threadTest1 test = new threadTest1();
new Thread(() -> { for (int i = 0; i < 52; i++) test.sale(); },"A").start();
new Thread(() -> { for (int i = 0; i < 52; i++) test.sale(); },"B").start();
new Thread(() -> { for (int i = 0; i < 52; i++) test.sale(); },"C").start();
}
}
class threadTest1 {
Lock lock = new ReentrantLock();
private int number = 50;
public void sale(){
lock.lock();
try {
if (number>0) {
System.out.println(Thread.currentThread().getName() + "卖出了第:" + (number--) + "张票,还剩:" + number + "张票");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
//找了半天快捷键,结果发现我把方法包括进去了,快捷键: ctrl+alt + t
Synchronized和Lock的区别
- Synchronized内置的Java关键字,Lock是一个Java类
- Synchronized无法判断获取锁的状态,Lock可以判断是否获取到了锁
- Synchronized会自动释放锁,Lock必须要手动释放锁!如果不释放锁,死锁!
- Synchronized线程1(获得锁)、线程2(等待,傻傻的等);Lock就不一定会等待下去
- Synchronized可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,非公平(可以自己设置)
- Synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码
- Lock只有代码块锁,synchronized有代码块锁和方法锁
- 使用Lock锁,JVM将花费较少的时间来调度线程,性能更好。并且具有更好的扩展性(提供更多的子类)
- synchronized 关键字锁定代码库
- 可重入锁 java.util.concurrent.lock.ReentrantLock
- 可重复读写锁 java.util.concurrent.lock.ReentrantReadWriteLock
- 指在同一个线程在外层方法获取锁的时候,进入内层方法会自动获取锁。JDK 中基本都是可重入锁,避免死锁的发生。上面提到的常见的锁都是可重入锁。
- 公平锁,指多个线程按照申请锁的顺序来获取锁。如 java.util.concurrent.lock.ReentrantLock.FairSync
- 非公平锁,指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程先获得锁。如 synchronized、java.util.concurrent.lock.ReentrantLock.NonfairSync
独享锁 / 共享锁- 独享锁,指锁一次只能被一个线程所持有。synchronized、java.util.concurrent.locks.ReentrantLock 都是独享锁
- 共享锁,指锁可被多个线程所持有。ReadWriteLock 返回的 ReadLock 就是共享锁
悲观锁 / 乐观锁- 悲观锁,一律会对代码块进行加锁,如 synchronized、java.util.concurrent.locks.ReentrantLock
- 乐观锁,默认不会进行并发修改,通常采用 CAS 算法不断尝试更新
- 悲观锁适合写操作较多的场景,乐观锁适合读操作较多的场景
粗粒度锁 / 细粒度锁- 粗粒度锁,就是把执行的代码块都锁定
- 细粒度锁,就是锁住尽可能小的代码块,java.util.concurrent.ConcurrentHashMap 中的分段锁就是一种细粒度锁
- 粗粒度锁和细粒度锁是相对的,没有什么标准
偏向锁 / 轻量级锁 / 重量级锁- JDK 1.5 之后新增锁的升级机制,提升性能。
- 通过 synchronized 加锁后,一段同步代码一直被同一个线程所访问,那么该线程获取的就是偏向锁
- 偏向锁被一个其他线程访问时,Java 对象的偏向锁就会升级为轻量级锁
- 再有其他线程会以自旋的形式尝试获取锁,不会阻塞,自旋一定次数仍然未获取到锁,就会膨胀为重量级锁
自旋锁- 自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环占有、浪费 CPU 资源
生产者和消费者问题 Synchronized版 wait notify
package sang.threads;
public class ATest {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
//判断、等待、通知
class Data{
private int number = 0;
public synchronized void increment() throws InterruptedException {
if (number != 0) {//判断
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知
this.notify();
}
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notify();
}
}
如果再多两个线程的话就会出现一点问题,可以自己写一下代码测试一下就是虚假唤醒问题
什么是虚假唤醒?举个例子,我们现在有一个生产者-消费者队列和三个线程。
1) 1号线程从队列中获取了一个元素,此时队列变为空。
2) 2号线程也想从队列中获取一个元素,但此时队列为空,2号线程便只能进入阻塞(cond.wait()),等待队列非空。
3) 这时,3号线程将一个元素入队,并调用cond.notify()唤醒条件变量。
4) 处于等待状态的2号线程接收到3号线程的唤醒信号,便准备解除阻塞状态,执行接下来的任务(获取队列中的元素)。
5) 然而可能出现这样的情况:当2号线程准备获得队列的锁,去获取队列中的元素时,此时1号线程刚好执行完之前的元素操作,返回再去请求队列中的元素,1号线程便获得队列的锁,检查到队列非空,就获取到了3号线程刚刚入队的元素,然后释放队列锁。
6) 等到2号线程获得队列锁,判断发现队列仍为空,1号线程“偷走了”这个元素,所以对于2号线程而言,这次唤醒就是“虚假”的,它需要再次等待队列非空。
如果用if判断,多个等待线程在满足if条件时都会被唤醒(虚假的),但实际上条件并不满足,生产者生产出来的消费品已经被第一个线程消费了。
这就是我们使用while去做判断而不是使用if的原因:因为等待在条件变量上的线程被唤醒有可能不是因为条件满足而是由于虚假唤醒。所以,我们需要对条件变量的状态进行不断检查直到其满足条件,不仅要在线程等待前检查条件是否成立,在线程等待之后也要检查。
解决办法将if改为while
package sang.threads;
public class ATest {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
//判断、等待、通知
class Data{
private int number = 0;
public synchronized void increment() throws InterruptedException {
while (number != 0) {//判断
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知
this.notify();
}
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notify();
}
}
因为if只会执行一次,执行完会接着向下执行if()外边的 而while不会,直到条件满足才会向下执行while()外边的
JUC版的生产者和消费者问题
package sang.threads;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BTest {
public static void main(String[] args) {
Data1 data = new Data1();
new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); },"A").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement();},"C").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data.increment(); },"B").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data.decrement(); },"D").start();
}
}
//判断、等待、通知
class Data1{
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment(){
lock.lock();
try {
while (number != 0) {//判断
//等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
//通知
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
任何一个新的技术,绝对不是仅仅只是覆盖原来的技术,优势和补充!
Condition 精准的通知和唤醒线程
package sang.threads;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CTest {
public static void main(String[] args) {
Data2 data2 = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data2.print1();
}
}).start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data2.print2();
}
}).start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data2.print3();
}
}).start();
}
}
class Data2{
private int number = 2;
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
public void print1(){
lock.lock();
try {
while(number != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "AAAAAAAA");
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print2(){
lock.lock();
try {
while(number != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "BBBBBBBB");
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print3(){
lock.lock();
try {
while(number != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "CCCCCCCC");
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
八锁现象
package sang.threads;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {phone.sendInfo();},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {phone.call();},"B").start();
}
}
class Phone {
public synchronized void sendInfo(){
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
//标准情况下是先执行发短信还是先打印打电话?
//结果:打电话,发短信
//为什么? 因为有锁的存在
//synchronized 锁的对象是方法的调用者
//两个方法用的是同一个类,谁先拿到谁执行
package sang.threads;
import java.util.concurrent.TimeUnit;
//2.sendInfo延迟4秒,两个线程先发短信还是打电话?1/发短信 2/打电话
//结果:先打短信,在打电话
//被synchronized修饰的方法 锁的对象是方法的调用者
//两个方法用的是同一个类,谁先拿到谁执行
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {phone.sendInfo();},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {phone.call();},"B").start();
}
}
class Phone {
public synchronized void sendInfo(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
现在我们再加一个普通方法不加锁的hello方法,我们再来看一下谁先执行
package sang.threads;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone1 phone = new Phone1();
new Thread(() -> {phone.sendInfo();},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {phone.call();},"B").start();
new Thread(() -> {phone.hello();},"C").start();
}
}
class Phone1 {
public synchronized void sendInfo(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
//这里没有锁不是同步方法,不受锁的影响
public void hello(){
System.out.println("hello");
}
}
//我们发现,先执行的是hello然后是发短信,在然后是打电话
如果在new一个对象出来,会怎么样呢?
package sang.threads;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone1 phone = new Phone1();
Phone1 phone1 = new Phone1();
new Thread(() -> {phone.sendInfo();},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {phone1.call();},"B").start();
new Thread(() -> {phone.hello();},"C").start();
}
}
class Phone1 {
public synchronized void sendInfo(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("hello");
}
}
//我们发现会先执行打电话,然后是hello最后是发短信
//因为有两个对象,所以有两个调用者,所以就是有两个锁,锁不一样
如果将上面的hello线程和打电话线程调换位置调用的话,我们发现先执行hello然后是打电话,再然后是发短信
可以得出结论,因为hello没有锁,所以不受锁的影响,而又有两个对象,是两把不同的锁,所以互相之间也不会受影响,因此,就变成了谁先调用谁先执行。
//我们现在增加两个static方法,还是一样的问题
package sang.threads;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) throws InterruptedException {
Phone2 phone = new Phone2();
//Phone2 phone1 = new Phone2();
new Thread(() -> {phone.sendInfo();},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {phone.call();},"B").start();
new Thread(() -> {phone.hello();},"C").start();
}
}
class Phone2 {
public static synchronized void sendInfo(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("hello");
}
}
//结果就是先执行hello,然后是发短信,最后是打电话
//因为有一个static修饰,static是静态方法,在类一加载就会装载,所以锁的对象是Class,是模板
//Phone2只有唯一的一个Class对象,就是我们发射而得来的那个Class对象,
//锁的是Class,所以才会谁先调用谁先执行
如果现在再增加一个调用者,也就是在增加一个对象,会不会有所不同呢?
答案是不会
package sang.threads;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) throws InterruptedException {
Phone2 phone1 = new Phone2();
Phone2 phone2 = new Phone2();
//Phone2 phone1 = new Phone2();
new Thread(() -> {phone1.sendInfo();},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {phone2.call();},"B").start();
new Thread(() -> {phone1.hello();},"C").start();
}
}
class Phone2 {
public static synchronized void sendInfo(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("hello");
}
}
//经过测试我们知道无论增加多少对象都不会对同步代码有影响,因为锁的是Class对象,而Class对象全局唯一
现在我们将方法中的其中一个static去掉,那又会是什么结果呢?
package sang.threads;
import java.util.concurrent.TimeUnit;
public class Test3 {
public static void main(String[] args) throws InterruptedException {
Phone3 phone1 = new Phone3();
new Thread(() -> {phone1.sendInfo();},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {phone1.call();},"B").start();
new Thread(() -> {phone1.hello();},"C").start();
}
}
class Phone3 {
public static synchronized void sendInfo(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("hello");
}
}
//结果是:打电话,hello,发短信
//call()和hello就不用说了
//这是因为,sendInfo锁的是Class模板,而call锁的是Phone3这个对象,所以有两把锁,互不影响
最后,如果再new一个对象会怎么样呢?
结果是不会有变化的
集合类不安全 ListList 不安全
package sang.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class ListTest {
public static void main(String[] args) {
//并发下ArrayList是不安全的
List list = new ArrayList();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
ArrayList()在单线程下是线程安全的,但是在多线程下则是线程不安全的,通过上述代码我们可以发现他会报一个错误java.util.ConcurrentModificationException 并发修改异常
有可能你第一次跑是正常的,你在多跑几次就会发现报这个异常
所以得出并发下ArrayList是不安全的,那怎么解决呢?
先看一下ArrayList的原码
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}//底层就是数组
那我们要不要使用synchronized来解决呢?
你以为jdk官方想不到吗?
我们看一下List的版本在看一下Vector的版本
public class Vectorextends AbstractList implements List , RandomAccess, Cloneable, java.io.Serializable ========================================================================= public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; }
我们发现在jdk1.0的时候jdk官方就已经给出了Vector的add方法添加了synchronized的方法,而list是在1.2的时候出的
但这也是一种解决方案
还有一种解决方法就是使用集合的工具类,
package sang.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
public class ListTest {
public static void main(String[] args) {
List list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
今天学的是JUC所有亿以JUC也有一种解决方法
package sang.Lists;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
public class ListTest {
public static void main(String[] args) {
List list =new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
我们来看一下CopyOnWriteArrayList的底层是什么
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}
===========================================
final void setArray(Object[] a) {
array = a;
}
=============================================
private transient volatile Object[] array;
我们发现,new出来的CopyOnWriteArrayList在初始化的时候会去调用构造方法,然后调用setArray方法,这个array是由 transient 和volatile 修饰的,所以我们可以猜测,解决的根源就在这两个关键字上,大家可以百度一下这两个关键字,这里不予讲解。
CopyonWritexxx 这个是写入时复制 ,是一种COW思想,是计算机程序设计的一种优化策略。是提高效率的。
现在存在多个线程调用的时候,资源是相同的,读取到时候是固定的,写入到时候可能会出现覆盖操作,在写入到时候避免覆盖造成数据问题。这就是一种读写分离的思想。
所以:
方案一 :List list =new Vector<>();
方案二: List list = Collections.synchronizedList(new ArrayList<>());
方案三:List list =new CopyOnWriteArrayList<>();//JUC包下的工具类就是解决并发问题的
为什么不使用Vector而要使用CopyOnWriteArrayList呢?
看一下Vector的底层
public Vector() {
this(10);
}
public Vector(int initialCapacity) {
this(initialCapacity, 0);
}
public Vector(int initialCapacity, int capacityIncrement) {
super();
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal Capacity: "+
initialCapacity);
this.elementData = new Object[initialCapacity];
this.capacityIncrement = capacityIncrement;
}
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
private void ensureCapacityHelper(int minCapacity) {
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + ((capacityIncrement > 0) ? capacityIncrement : oldCapacity);
if (newCapacity - minCapacity < 0) newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity);
elementData = Arrays.copyOf(elementData, newCapacity);
}
我们可以看到底层是new了一个Object;类型的数组,而且Object初始化长度为10,而add方法则是将数组的的最大索引加一然后又调用ensureCapacityHelper方法进行了一个判断,使用grow方法进行了一系列判断最终使用Arrays.copyOf创建并赋值了一个新数组。
再来看一下CopyOnWriteArrayList的底层代码
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
使用的是lock锁而上面的使用的是synchronized锁,所以要比他效率高一些最终实现的也是Arrays.copyOf方法只是少了许多的判断条件
Setset和List没多大区别
还是java.util.ConcurrentModificationException线程修改异常这个错
package sang.Lists;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
public class SetTest {
public static void main(String[] args) {
Set set = new HashSet();
for (int i = 0; i < 11; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
解决方案:
1、集合工具类:Set set = Collections.synchronizedSet(new HashSet<>());
2、JUC工具类:Set set = new CopyonWriteArraySet();
HashSet的底层是什么?就是HashMep
public HashSet() {
map = new HashMap<>();
}
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}//add方法就是HashMap的put方法
private static final Object PRESENT = new Object();
//PRESENT就是个常量
Map
也是不安全的
package sang.Lists;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class MapTest {
public static void main(String[] args) {
Map map = new ConcurrentHashMap();
for (int i = 0; i < 11; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
解决方案:
1、集合工具类:Map map = Collections.synchronizedMap(new HashMap<>());
2、JUC工具类:Map map = new ConcurrentHashMap();
ConcurrentHashMap是采用分段锁的技术!线程安全!
HashMap的数据结构HashMap的底层主要是基于数组和链表来实现的,它之所以有相当快的查询速度主要是因为它是通过计算散列码来决定存储的位置。
HashMap是基于哈希表的Map接口的非同步实现。此实现提供所有可选的映射操作,并允许使用null值和null键。此类不保证映射的顺序,特别是它不保证该顺序恒久不变。
HashMap中主要是通过key的hashCode来计算hash值的,只要hashCode相同,计算出来的hash值就一样。如果存储的对象对多了,就有可能不同的对象所算出来的hash值是相同的,这就出现了所谓的hash冲突。
hashMap结构:哈希表是由数组+链表组成的,数组的默认长度为16(可以自动变长。在构造HashMap的时候也可以指定一个长度),数组里每个元素存储的是一个链表的头结点。而组成链表的结点其实就是hashmap内部定义的一个类:Entity。Entity包含三个元素:key,value和指向下一个Entity(实体类)的next
本来想把ConcurrentHashMap的底层逻辑写一下的,但是自己百度了一下,好多概念都不是特别清除,所以不敢写了,里面有红黑树这种数据结构,还有分段锁技术。没几篇文章介绍不清楚。
CallableCallable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而,A Runnable不返回结果,也不能抛出被检查的异常。
1、有返回值
2、可以抛出异常
3、方法不同run() -》 call()
现在实现一下Callable这个接口
由于Thread()里面只能放Runnable类型的对象,所以我们Callable要和Runnable挂上钩,所以我们找到了public class FutureTask implements RunnableFuture 他是RunnableFuture的实现类,而RunnableFuture又是public interface RunnableFuture extends Runnable, Future Runnablle的实现类,并且
public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
FutureTask的类构造器又可以写入Callable类型的参数,所以我们可以通过FutureTask来开启线程。
package sang.Lists;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask,"A").start();
String s = futureTask.get();//获取Callable的返回值
System.out.println(s);
}
}
class MyThread implements Callable {
@Override
public String call() throws Exception {
System.out.println("call()");
return "12121211";
}
}//get有可能会产生阻塞,因为它需要获取callable的返回值,所以需要等call运行完成,因此我们将产生阻塞的代码放到最后
//或者使用异步通信来处理
如果在添加一个线程会怎么样呢?
package sang.Lists;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
String s = futureTask.get();//获取Callable的返回值
System.out.println(s);
}
}
class MyThread implements Callable {
@Override
public String call() throws Exception {
System.out.println("call()");
return "12121211";
}
}
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
这里的runAndReset,运行方法进行了判断
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
所以第二次的是没有执行的
常用的辅助类 CountDownLatch-
public class CountDownLatch extends Object
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。
A CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。
A CountDownLatch是一种通用的同步工具,可用于多种用途。 一个CountDownLatch为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开。 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。
CountDownLatch一个有用的属性是,它不要求调用countDown线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程可以通过。
(不知道为什么这里截图不好使了1,这是在jdk文档上复制的)
//CountDownLatch源码中的构造1器,需要填一个int类型的参数 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); } protected final void setState(int newState) { state = newState; } private volatile int state; private final Sync sync;
一般用在某些必须执行完成的线程内,保证线程的安全
package sang.Lists;
import java.util.concurrent.CountDownLatch;
//计数器
public class CountDownLatchDome {
public static void main(String[] args) throws InterruptedException {
//倒计时,总数是6
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "Get out!");
countDownLatch.countDown();//倒计时,-1
},String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归零
System.out.println("Close the Door!");
}
}
CyclicBarrier
加法计数器
-
public class CyclicBarrier extends Object
允许一组线程全部等待彼此达到共同屏障点的同步辅助。循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
A CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。
可以简单理解为加法计数器
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
//CyclicBarrier的构造方法有两个,只放一个参数就是简单地加法倒计时,第二个参数可以放置一个Runnable类型的类
//所以我们在后面添加一个Lamda表达式
package sang.Lists;
import java.sql.Time;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
//集齐七科龙珠召唤神龙
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("召唤神龙成功!!!");
});
for (int i = 1; i < 8; i++) {
final int j = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "集齐了" + j + "颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println("谁先执行?");
}
}
有点意思,自己探索吧!
Semaphore(信号量)-
一个计数信号量
-
在概念上,信号量维持一组许可证。
-
如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。
-
每个release()添加许可证,潜在地释放阻塞获取方。
-
但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。
-
信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源
package sang.Lists;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 7; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "我进去了!");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "我又出去了!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
原理
semaphore.acquire();获取,假设已经满了,等待被释放为止!
semaphore.release();释放,会将当前的信号量释放+1,然后唤醒等待的线程
作用:多个共享资源互斥的使用!并发限流,控制最大的线程数!
- A ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入。 read lock可以由多个阅读器线程同时进行,只要没有作者。 write lock是独家的。
也就是说,读可以由多个线程去读,写只能有一个线程去写
package sang.Lists;
import java.util.HashMap;
import java.util.Map;
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(String.valueOf(temp),String.valueOf(temp));
},String.valueOf(i)).start();
}
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(String.valueOf(temp));
},String.valueOf(i)).start();
}
}
}
class MyCache{
private Map map = new HashMap<>();
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key,value);
System.out.println(Thread.currentThread().getName() + "写入完成!");
}
public void get(String key){
System.out.println(Thread.currentThread().getName() + "开始读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完成");
}
}
加一下锁
package sang.Lists;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(String.valueOf(temp),String.valueOf(temp));
},String.valueOf(i)).start();
}
for (int i = 0; i < 7; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(String.valueOf(temp));
},String.valueOf(i)).start();
}
}
}
class MyCache{
private Map map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key,value);
System.out.println(Thread.currentThread().getName() + "写入完成!");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "开始读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}//如果不加读锁的话会产生一点小问题,大家可以自己测试一下
独占锁(写锁)一次只能有一个线程占有
共享锁(读锁)一次可以有多个线程占有
阻塞队列BlockingQueue阻塞队列不是一个新东西,我们可以从下图可看出了
什么情况下我们会使用阻塞队列:多线程并发处理,线程池!
package sang.Lists;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
public class BlockQueueDemo {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("d"));
}
}
=======================================================
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at sang.Lists.BlockQueueDemo.main(BlockQueueDemo.java:15)
Process finished with exit code 1
四组API
1、抛出异常
//抛出异常
public static void test1(){
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.add("A"));
System.out.println(arrayBlockingQueue.add("B"));
System.out.println(arrayBlockingQueue.add("C"));
//IllegalStateException: Queue full 。
//System.out.println(arrayBlockingQueue.add("D"));
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.element());//检查队首元素
System.out.println(arrayBlockingQueue.remove());
//.NoSuchElementException ,抛出异常
//System.out.println(arrayBlockingQueue.remove());
}
2、不会抛出异常
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//有返回值不会抛出异常
public static void test2(){
ArrayBlockingQueue
3、阻塞等待
public static void test3() throws InterruptedException {
ArrayBlockingQueue
4、超时等待
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
//等待,阻塞超时等待
public static void test4() throws InterruptedException {
ArrayBlockingQueue
SynchronousQueue同步队列
没有容量
进去一个元素,必须等待取出来之后,才能向里面再放一个元素,存put,取tack。
//synchronized同步队列, 和其他的BlockingQueue不一样, SynchronousQueue不存储元素
public static void test5(){
SynchronousQueue
可能出现线程抢占问题,有可能你第一次看不到这个小问题。多测几次或者多加几个线程,会发现线程T1的打印线程会抢占线程T2的打印线程,上述问题。因为多线程下不能保证原子性操作,put是原子性操作但是打印线程不是,会去抢占CPU
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
阅读原码知道,这里使用了一个for循环进无限循环,只有指定条件满足才会继续向下执行,我猜测阻塞就是在这里
这里可以先put,然后再打印语句,然后把T2线程注释掉。就会发现put之后的输出语句是没办法执行的,也就是说put操作时就进入到阻塞了,只有调用了take方法后,原有的put才会继续执行。
线程池线程池:三大方法,7大参数,4种拒绝策略
池化技术程序运行本质:占用系统资源,所以我们要优化资源的使用 ====》所以就引进了一种技术叫,池化技术
池化技术 (Pool) 是一种很常见的编程技巧,在请求量大时能明显优化应用性能,降低系统频繁建连的资源开销。我们日常工作中常见的有数据库连接池、线程池、对象池等,它们的特点都是将 “昂贵的”、“费时的” 的资源维护在一个特定的 “池子” 中,规定其最小连接数、最大连接数、阻塞队列等配置,方便进行统一管理和复用,通常还会附带一些探活机制、强制回收、监控一类的配套功能。
池化技术能够减少资源对象的创建次数,提高程序的性能,特别是在高并发下这种提高更加明显。使用池化技术缓存的资源对象有如下共同特点:
1,对象创建时间长;
2,对象创建需要大量资源;
3,对象创建后可被重复使用。
一个资源池具备如下功能:租用资源对象、归还资源对象、清除过期资源对象,接下来我们就从这几个功能点出发分别进行分析。
白话文就是:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我
线程池的好处:
-
降低资源的消耗
-
提高响应的速度
-
方便管理
线程复用,可以控制最大并发数、管理线程
就是说线程池有三个方法
Executors.newCachedThreadPool(); //可伸缩的,遇强则强,遇弱则弱
Executors.newSingleThreadExecutor(); //单个线程
Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
package sang.Lists;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();//可伸缩的,遇强则强,遇弱则弱
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();//单个线程
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);//创建一个固定的线程池的大小
for (int i = 0; i < 10; i++) {
fixedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
fixedThreadPool.shutdown();
}
}
7大参数
我们来看一下三大方法的原码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue());
}
//我们发现他们都创建了一个叫ThreadPoolExecutor的对象所以我们来看一下ThreadPoolExecutor的原码
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
//我们发现这里都使用了方法重载,都调用了下面这个方法,而这个方法正好有七个参数,所以这就是我们所说的七大参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
解释一下:
int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大核心线程大小
long keepAliveTime, //超时了没有人调用就会释放
TimeUnit unit, //超时单位
BlockingQueue workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂,创建线程的的,一般不用动
RejectedExecutionHandler handler //拒绝策略)
核心线程数就是最基本的线程数 最大线程数就是当核心线程数不够时 多生产出线程的最大数 还有个等待时间 我猜是 当超出核心线程数的那一部分线程在这个时间内没有任务 就会销毁
SingleThreadExecutor被FinalizableDelegaledExcutorService包装后,无法向下转型,它被确定以后,无法修改,做到了真正的Single!
分析线程池的工作原理
比如说银行业务办理,大家都去过银行,可能都知道银行的窗口并不是全部开启,只是开启2-3个,假设今天开启了两个窗口,进来了2个人,那么这两个窗口不就已经排满了吗,这时又进来了三个人,是不是就没有窗口去办理业务了,那么他们就会先去叫号然后去等待区坐着等待,假设候客区只有三个位置(这里是假设,也可以有多个,但一定是有限的,因为大厅就那么大),那么,如果这时候再进来一个人,那么这时候候客区已经满了,而窗口正在办理的柜台也已经满了。那么这时候银行的经理是不是就会采取一种措施,叫那三个窗口的人赶紧回来干活,现在就变成了五个窗口在办理业务,如果这时候又进来一个人,发现候客区满了,窗口满了那么这时候就会采取一种拒绝策略。那么假设我们最大效率的运转,这时候1只有1-2个人还在办理业务,那么4,5,6三个窗口就没有事干了,在等待了1小时候还是没有大量的人来,那么银行经理为了节省工资就会将3,4,5开回家,到人多的时候在将他们叫回来,这就是超时等待。
我们现在将上面那种思想办成java代码来实现他,那么这就是我们自己写一个线程池了
手动创建线程池package sang.Lists;
import java.util.concurrent.Executors;
import java.util.concurrent.linkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new linkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 1; i <=7; i++) {//这里随着i的变化会出现不同的线程去执行打印方法,所以变化判断条件多试几次
poolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK ");
});
}
poolExecutor.shutdown();
}
}
四种默认的拒绝策略
new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人的,抛出异常 new ThreadPoolExecutor.CallerRunsPolicy()//哪来的去哪里,main ok(这里是在main线程中执行的程序,所以会让main线程执行) new ThreadPoolExecutor.DiscardPolicy()//队列满了,丢掉任务,不会抛出异常 new ThreadPoolExecutor.DiscardOldestPolicy()//也不会抛出异常,将任务队列最老的任务丢弃,并尝试再次提交新的任务
DiscardOldestPolicy():一个被拒绝任务的处理程序,尝试丢弃最旧的未处理请求,然后重试{@code execute},除非执行程序关闭,在这种情况下任务被丢弃。
如果你电脑速度慢点,你可以多尝试几次9循环,用第一个策略(AbortPolicy),你会发现没报错,最后一个走的线程5
最大线程数如何定义:
1、cpu密集型
获取电脑最大线程数,也就是CPU能够运行的虚拟机数
System.out.println(Runtime.getRuntime().availableProcessors());//获取CPU的核数
2、IO密集型
判断你程序中耗费io的线程,一般设置耗费线程的两倍大,
四大函数式接口Consumer,Function,Predicate,Supplier四大函数式接口
为了简化编程模型,在新版本的框架底层大量应用!
Function@FunctionalInterface public interface Function{ R apply(T t); //返回值R,传入参数T
package sang.Lists;
import java.util.function.Function;
public class FunctionDemo {
//自定义工具类,输出输入的值
public static void main(String[] args) {
Function function = (str) ->{
return Integer.parseInt(str);
};
System.out.println(function.apply("123"));
}
}
Predicate 断定型接口
有一个输入参数,返回值只能是布尔值
@FunctionalInterface public interface Predicate{ boolean test(T t);
package sang.Lists;
import java.util.function.Predicate;
public class PredicateDemo {
public static void main(String[] args) {
// Predicate predicate = new Predicate() {
// @Override
// public boolean test(String s) {
// if (s.isEmpty())
// return false;
// else return true;
// }
// };
Predicate predicate = (str) -> {
if (str.isEmpty())
return false;
else return true;
};
System.out.println(predicate.test("1"));
}
}
Predicate 断定型接口:有一个输入参数,返回值只能是布尔值
Consumer消费型接口@FunctionalInterface public interface ConsumerSupplier供给型接口{ void accept(T t); ================================= package sang.Lists; import java.util.function.Consumer; public class ConsumerDemo { public static void main(String[] args) { Consumer consumer = new Consumer () { @Override public void accept(String s) { System.out.println(s); } }; consumer.accept("sadafsa"); } }
@FunctionalInterface public interface Supplier{ T get(); }//没有参数只有返回值,供给型接口
package sang.Lists;
import java.util.function.Supplier;
public class SupplierDemo {
public static void main(String[] args) {
// Supplier supplier = new Supplier() {
// @Override
// public Object get() {
// return 1024;
// }
// };
Supplier supplier = () -> {
return 1024;
};
System.out.println(supplier.get());
}
}
Stream流式计算
什么是Stream流式计算?
大数据:存储+计算
存储:集合、MySQL本质就是存储东西的;
计算都应该交给流来操作!
package sang.steam;
import java.util.Arrays;
import java.util.List;
public class SteamDemo {
public static void main(String[] args) {
User user1 = new User("a",12,"男");
User user2 = new User("b",14,"女");
User user3 = new User("c",16,"男");
User user4 = new User("d",18,"女");
User user5 = new User("e",17,"男");
User user6 = new User("f",13,"女");
//集合就是存储
List list = Arrays.asList(user1, user2, user3, user4, user5, user6);
//计算交给Stream流
//链式编程
list.stream()
.filter((user) -> {return user.getAge() > 14;})过滤
.filter((user) -> {return user.getSex().equals("男");})
.map((user) -> {return user.getName().toUpperCase();})转大写
.sorted((uu1,uu2) -> {return uu1.compareTo(uu2);})//排序
.limit(2)
.forEach(System.out::println);
}
}
//lambda表达式,链式编程,函数式接口,Stream流式计算
//Stream filter(Predicate super T> predicate);断言式接口,返回boolean值
// Stream map(Function super T, ? extends R> mapper);函数式接口,返回值R,传入参数T
// void forEach(Consumer super T> action);消费式接口,没有返回值
//Stream limit(long maxSize);最大数量
======================================================
package sang.steam;
public class User {
private String name;
private int age;
private String sex;
public User() {
}
public User(String name, int age, String sex) {
this.name = name;
this.age = age;
this.sex = sex;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
}
ForkJoin
分支合并
并行任务时提高效率的·就是大数据量的时候提高效率的。
Map Reduce 就是将大任务转换成许多的小任务,然后将每个小任务的结果合并起来,变成最后的结果。
特点:工作窃取
双端队列所以能够窃取
package sang.steam; import java.util.concurrent.RecursiveTask; public class ForkJoinDemo extends RecursiveTask{ private long start; private long end; private long temp = 10000L; public ForkJoinDemo(long start, long end) { this.start = start; this.end = end; } //计算方法 @Override protected Long compute() { if ((end - start) < temp) { long sum = 0L; for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long l = (start + end) / 2; ForkJoinDemo forkJoinDemo = new ForkJoinDemo(start,l); forkJoinDemo.fork(); ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(l + 1, end); forkJoinDemo1.fork(); //这里会递归调用这个分支方法,也就是会一直循环if——else这一部分代码块,start出现问题不要惊讶,因为下面的那一条分支会将Start //赋值为l + 1 ,也就是说if的判断条件最终会变成true,然后一直执行结束并将分支的结果累加,可以想象成二叉树,一直分支,直到最后的叶子满足if中的条件就不在分了,然后将叶子的执行结果累加。这就会有可能出现oom异常,因为如果数再大一些,就会一直分支,而每一个分支就是一个ForkJoinDemo对象,当对象足够多就会出现oom异常了,想要看到的话可以设置一下堆得内存大小,尽量小一点。 //这里我将long类型都设置了一遍,防止重复拆装箱造成时间浪费。 return forkJoinDemo.join() + forkJoinDemo1.join(); } } }
package sang.steam;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test3();
}
//普通程序员
public static void test1(){
long sum = 0L;
Long start = System.currentTimeMillis();
for (long i = 1L; i < 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + "时间" + (end - start));
//sum = 499999999500000000时间950
}
//会ForkJoin的程序员
public static void test2() throws ExecutionException, InterruptedException {
Long start = System.currentTimeMillis();
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(1L,10_0000_0000L);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask submit = forkJoinPool.submit(forkJoinDemo);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + "时间" + (end - start));
//sum = 500000000500000000时间842
}
//Stream并行流
public static void test3(){
Long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(0,Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + "时间" + (end - start));
//sum = 500000000500000000时间931
}
}
//可能是我电脑比较垃圾,所以看起来没有多大差别
异步回调
Future设计的初衷就是对将来的某个事件的结果进行建模
异步调用:一个可以无需等待被调用函数的返回值就让操作继续进行的方法
public final
class Void {
//void的空参返回值泛型类
@SuppressWarnings("unchecked")
public static final Class TYPE = (Class) Class.getPrimitiveClass("void");
private Void() {}
}
package sang.steam;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的CompletableFuture.runAsync方法
// CompletableFuture runAsync = CompletableFuture.runAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// });
// System.out.println("111111111111");
// runAsync.get();
//有返回值的1方法
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("111111111111");
TimeUnit.SECONDS.sleep(2);
System.out.println("121212");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
});
System.out.println("=============");//这里会先输出try里的1111打印然后停住,之后打印===这个,
// 有点神奇,如果先执行过下面的get()方法然后注释掉get方法会发现只打印了====之后就结束了,在执行一遍会发现1111又出现了
//System.out.println(supplyAsync.get());//如果调用了get()方法则会先打印===然后111然后121212然后是1024
}
}
package sang.steam;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "supplyAsync => Integer");
int i = 10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u) -> {
System.out.println("t => " + t);//正常的返回结果
System.out.println("u => " + u);//错误信息
}).exceptionally((e) -> {
return 233;
}));
// ForkJoinPool.commonPool-worker-1supplyAsync => Integer
// t => null
// u => java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
// java.util.concurrent.CompletableFuture@378bf509[Completed normally]
//没有返回值的CompletableFuture.runAsync方法
// CompletableFuture runAsync = CompletableFuture.runAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// });
// System.out.println("111111111111");
// runAsync.get();
//有返回值的1方法
// CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
// try {
// System.out.println("111111111111");
// TimeUnit.SECONDS.sleep(2);
// System.out.println("121212");
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return 1024;
// });
//System.out.println("=============");//这里会先输出try里的1111打印然后停住,之后打印===这个,
// 有点神奇,如果先执行过下面的get()方法然后注释掉get方法会发现只打印了====之后就结束了,在执行一遍会发现1111又出现了
//System.out.println(supplyAsync.get());//如果调用了get()方法则会先打印===然后111然后121212然后是1024
}
}
JMM
什么是JMM?
JMM:java内存模型,不存在的东西,概念!约定!
关于JMM的一些同步的约定:
- 线程解锁前,必须把共享变量立刻刷回主存!
- 线程加锁前,必须读取主存中的最新值到工作内存中!
- 加锁和解锁是同一把锁!
- JMM 三大特征分别是:原子性,可见性,有序性。
lock(锁定):作用于主内存中的变量,一个变量在同一时间只能被一个线程锁定,即把变量标识为线程独占状态。
read(读取):作用于主内存变量,表示把一个变量值从主内存传输到线程的工作内存中,以便下一步的 load 操作使用。
load(载入):作用于线程的工作内存的变量,表示把 read 操作从主内存中读取的变量值放到工作内存的变量副本中(副本是相对于主内存的变量而言的)。
use(使用):作用于线程的工作内存中的变量,表示把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时就会执行该操作。
assign(赋值):作用于线程的工作内存的变量,表示把执行引擎返回的值赋值给工作内存中的变量,每当虚拟机遇到一个给变量赋值的字节码指令时就会执行该操作。
store(存储):作用于线程的工作内存中的变量,把工作内存中的一个变量的值传递给主内存,以便下一步的 write 操作使用。
write(写入):作用于主内存的变量,表示把 store 操作从工作内存中得到的变量的值放入主内存的变量中。
unlock(解锁):作用于主内存的变量,表示把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
JMM 还规定了以上八种操作需按照如下规则进行:
不允许read 和 load、store 和 write 操作之一单独出现,也就是 read 操作后必须 load,store 操作后必须 write。即不允许一个变量从主内存读取了但工作内存不接受,或者从工作内存发起回写了但主内存不接受的情况出现。
不允许线程丢弃它最近的 assign 操作,即变量在工作内存中改变了之后必须把该变化同步回主内存。
不允许线程将没有 assign 的数据从工作内存同步到主内存。
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。也就是对变量实施 use 和 store 操作之前,必须经过 load 和 assign 操作。
一个变量同一时间只能有一个线程对其进行 lock 操作。但 lock 操作可以被同一条线程重复执行多次,多次 lock 之后,必须执行相同次数 unlock 才可以解锁。
如果对一个变量进行 lock 操作,会清空所有工作内存中此变量的值。在执行引擎使用这个变量前,必须重新 load 或 assign 操作初始化变量的值。
如果一个变量没有被 lock,就不能对其进行 unlock 操作。也不能 unlock 一个被其他线程锁住的变量。
一个线程对一个变量进行 unlock 操作之前,必须先把此变量同步回主内存。
package sang.steam;
import java.util.concurrent.TimeUnit;
public class JMMDemo {
public static int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (num == 0){
}
}).start();
TimeUnit.SECONDS.sleep(2);
num = 1;
System.out.println(num);
//所以我们需要thread线程知道main线程中的变量改变了
//问题:程序不知道主内存中的值被修改过了,所以引出Volatile
}
}
Volatile
volatile是一个特征修饰符(type specifier).volatile的作用是作为指令关键字,确保本条指令不会因编译器的优化而省略,且要求每次直接读值。
请你谈谈你对Volatile?
Volatile是java虚拟机提供的轻量级的同步机制!
保证可见性(如果问到是怎样保证可见性的,那么就要和他谈一谈上面的JMM)
//将上面的代码加上volatile会发现死循环解决了
package sang.steam;
import java.util.concurrent.TimeUnit;
public class JMMDemo {
//线程一对主内存中的变化不知道
//不加volatile程序就会死循环,加了volatile可以保证可见性
public static volatile int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (num == 0){
}
}).start();
TimeUnit.SECONDS.sleep(2);
num = 1;
System.out.println(num);
}
}
//这就说明了volatile保证了可见性
不保证原子性
原子性是指一个操作是不可分割、不可中断的,要么全部执行成功要么全部执行失败。
package sang.steam;
public class VolatileDemo {
private static int num = 0;
//Volatile不保证原子性
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
//理论上num的结果应该是2万
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
},String.valueOf(i)).start();
}
while (Thread.activeCount() > 2)//java默认有两个线程在运行,一个main,一个gc
Thread.yield();
System.out.println(num);
}
public static void add(){
num++;//num++不是一个原子性操作
}
}
//引出问题,num++为什么不是一个原子性操作?
现在我们通过java的反编译命令,看一下底层的字节码javap -c VolatileDemo
那现在不用synchronized和lock锁怎么解决这个问题呢?
使用JUC中的原子类来解决这个问题AtomicInteger类
package sang.steam;
import java.util.concurrent.atomic.AtomicInteger;
public class VolatileDemo {
private static AtomicInteger num = new AtomicInteger();
//Volatile不保证原子性
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
//理论上num的结果应该是2万
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
},String.valueOf(i)).start();
}
while (Thread.activeCount() > 2)//java默认有两个线程在运行,一个main,一个gc
Thread.yield();
System.out.println(num);
}
public static void add(){
num.getAndIncrement();
// num++;//num++不是一个原子性操作
}
}
是不是有点疑惑,new一个类不是比加一把锁更耗费资源吗?可以看一下原码
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
public native int getInt(java.lang.Object o, long l);
public native void putInt(java.lang.Object o, long l, int i);
public native java.lang.Object getObject(java.lang.Object o, long l);
public native void putObject(java.lang.Object o, long l, java.lang.Object o1);
public native boolean getBoolean(java.lang.Object o, long l);
public native void putBoolean(java.lang.Object o, long l, boolean b);
//然后发现底层调用的都是native修饰的方法,这就不是java写的了,而是jvm虚拟机调用的本地方法栈,所以运行的比较快
//这些类的底层都直接和操作系统挂钩,直接在内存中修改值Unsafe是一个很特殊的类!
禁止指令重排
Java并发编程:volatile关键字解析:原文详解:https://www.cnblogs.com/dolphin0520/p/3920373.html
什么是指令重排?什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的!
源代码—>编译器优化的重排–>指令并行也可能会重排—>内存系统也会重排—>执行
一般情况下,CPU和编译器为了提升程序执行的效率,会按照一定的规则允许进行指令优化,在某些情况下,这种优化会带来一些执行的逻辑问题,主要的原因是代码逻辑之间是存在一定的先后顺序,在并发执行情况下,会按照不同的执行逻辑,会得到不同的结果信息。
编译器不会对存在数据依赖性的程序指令进行重排的,这里的依赖性仅仅指单个线程下的数据依赖性;多线程并发情况下,此规则将失去作用。这样就会造成一些比较匪夷所思的问题,比如一个线程已经赋值而另一个线程并不知道,虽然在实际中只有千万分之一的概率会出现但是在逻辑上确实存在。
冯·诺依曼计算机的特点(机器以运算器为中心)
1、计算机由
控制器**(分析和执行机器指令并控制各部件的协同工作)**
运算器**(根据控制信号对数据进行算术运算和逻辑运算)**
存储器**(内存存储中间结果,外存存储需要长期保存的信息)**
输入设备**(接收外界信息) 和 输出设备(向外界输送信息)**
五大部件组成
2、指令(程序)和数据以二进制不加区别地存储在存储器中
3、程序自动运行
现代计算机由三大部分组成(已经转化为以存储器为中心)
-
CPU(Central Processing Unit) 中央处理器,核心部件为ALU(Arithmetic Logic Unit,算术逻辑单元)和CU(Control Unit,控制单元)
-
I/O设备(受CU控制)
-
主存储器(Main Memory,MM),分为RAM(随机存储器)和ROM(只读存储器)
CPU与MM合成主机,I/O设备可称为外部设备
几乎所有的冯·诺伊曼型计算机的CPU,其工作都可以分为5个阶段:
1.取指令阶段 2.指令译码阶段 3.访存取数阶段 4.执行指令阶段 5.结果写回阶段 6.循环阶段
原文;链接:https://blog.csdn.net/fuhanghang/article/details/83421254
volatile可以避免指令重排内存屏障(可以想象成CPU指令)。作用:
1.保证特定的操作的执行顺序
2.可以保证某些变量的内存可见性(利用这些特性volatile实现了可见性)
一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:
1)保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
2)禁止进行指令重排序。
volatile关键字禁止指令重排序有两层意思:
1)当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见;在其后面的操作肯定还没有进行;
2)在进行指令优化时,不能将在对volatile变量访问的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。
观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令
----摘自《深入理解Java虚拟机》
lock前缀指令实际上相当于一个内存屏障(也称为内存栅栏),内存屏障会提供3个功能:
1)它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
2)它会强制将对缓存的修改操作立即写入主存;
3)如果是写操作,它会导致其他CPU中对应的缓存行无效。
摘自:https://www.cnblogs.com/dolphin0520/p/3920373.html
所以,Volatile可以保持可见性,不能保证原子性,由于内存屏障, 可以保证避免指令重排的现象产生,在单例模式中使用最多。
单例模式可以先看一下这个:https://blog.csdn.net/m0_57698935/article/details/120297308
CAS 什么是CAS?CAS是英文单词Compare and Swap的缩写,翻译过来就是比较并替换。
CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//比较并交换,
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
//可以得到调用者的类 Reflection.getCallerClass();
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环
缺点:
- 由于底层是自旋锁,它的旋转会浪费时间!
- 一次只能保证一个共享变量的原子性
- ABA问题
java无法操作内存,所以java会通过native关键字调用本地方法区的本地方法一般为c++所写,而c++运行不通过虚拟机可以直接操作内存,所以Unsafe类可以看成是java的后门,可以通过这个课来操作计算机的内存。
package sang.steam;
import java.lang.reflect.ParameterizedType;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
public static void main(String[] args) {
//原子类底层都运用的CAS
AtomicInteger atomicInteger = new AtomicInteger(2021);
//compareAndSet 比较并设置,
//expect 期望,update 更新
boolean b = atomicInteger.compareAndSet(2021, 2022);
//如果我期望的值达到了就更新,否则就不更新CAS是计算机的并发原语
System.out.println(b);
System.out.println(atomicInteger.get());
}
}
什么是ABA问题
package sang.steam;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ABAProblem {
static int num = 1;
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(num);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicInteger.compareAndSet(num,2);
System.out.println();
},"A").start();
System.out.println(num);
//假设先执行的B线程,B将num的值先改成了3然后又改成了1
//之后又让A线程获取了。
new Thread(() -> {
atomicInteger.compareAndSet(num,3);
atomicInteger.compareAndSet(3,num);
},"B").start();
System.out.println(num);
}
}
原子引用
解决ABA问题,引入原子引用!对应的思想:乐观锁!
就是带版本号的原子操作
public class AtomicReferenceimplements java.io.Serializable public AtomicStampedReference(V initialRef, int initialStamp) { pair = Pair.of(initialRef, initialStamp); }
-
看compareAndSet的源码,里面是使用 == 进行比较的。
-
由于new的时候声明泛型肯定是装箱类,这个时候传入值类型将会自动装箱
-
自动装箱的后果就是地址不一致,使用==判断的结果就为false
-
总结:最好不使用原子类型,使用原子类型得保证比较时候传入的为同一个装箱类
package sang.steam;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
public class ABADome {
static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(2,1);
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "=>" + stamp);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(2,3,
atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "=>" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(3,2,
atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "=>" + atomicStampedReference.getStamp());
},"A").start();
new Thread(() -> {
System.out.println(atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(2,3,
atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + atomicStampedReference.getStamp());
},"B").start();
}
}
多种锁的理解
公平锁、非公平锁
公平锁:非常公平,不能插队,必须先来后到!
非平锁:非常不公平,可以插队(默认都是非公平的)
// ReentrantLock默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}//方法重载,默认都是非公平锁,如果传入true就会新建一个公平锁
//static final class NonfairSync extends Sync
//static final class FairSync extends Sync
//abstract static class Sync extends AbstractQueuedSynchronizer
//public abstract class AbstractOwnableSynchronizer implements java.io.Serializable
可重入锁
可重入锁(递归锁)
可重入锁也就是某个线程已经获得某个锁,可以再次获取锁而不会出现死锁
代码测试:
synchronized为非公平锁
package sang.steam;
import java.util.concurrent.locks.ReentrantLock;
//synchronized
public class LockDome {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.call();
},"A").start();
new Thread(() -> {
phone.send();
phone.call();
},"B").start();
}
}
class Phone{
public synchronized void call(){
System.out.println(Thread.currentThread().getName() + "=>" + "call");
send();
}
public synchronized void send(){
System.out.println(Thread.currentThread().getName() + "=> send");
}
}
//正常情况下,线程A执行成功之后会执行线程B,但是无论怎么运行都是线程B最后执行,这就是可重入锁
lock
package sang.steam;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class UnLockDome {
public static void main(String[] args) {
Phone1 phone = new Phone1();
new Thread(() -> {
phone.call();
},"A").start();
new Thread(() -> {
phone.send();
phone.call();
},"B").start();
}
}
class Phone1{
Lock reentrantLock = new ReentrantLock();
public void call(){
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=>" + "call");
send();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void send(){
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=> send");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
}
}
//问题,lock和unlock必须配对,否则会出现死锁
自旋锁
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
//标准的自旋锁
不断的尝试,直到成功为止
自定义自旋锁
CAS是自旋锁的核心
package sang.steam;
import java.util.concurrent.atomic.AtomicReference;
public class SpinlockDemo {
//Thread null
AtomicReference atomicReference=new AtomicReference<>();
//加锁
public void myLock(){
Thread thread=Thread.currentThread();
System.out.println(thread.getName()+"==>"+"myLock");
//自旋锁
while (!atomicReference.compareAndSet(null,thread)){
}
}
//解锁
public void myUnLock(){
Thread thread=Thread.currentThread();
System.out.println(thread.getName()+"==>"+"myUnLock");
atomicReference.compareAndSet(thread,null);
}
}
测试
package sang.steam;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class TestSpinLock {
public static void main(String[] args) {
// ReentrantLock reentrantLock = new ReentrantLock();
// reentrantLock.lock();
// reentrantLock.unlock();
//底层使用的自旋锁
SpinlockDemo spinlockDemo=new SpinlockDemo();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnLock();
}
},"T1").start();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnLock();
}
},"T2").start();
}
}
死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
https://www.jianshu.com/p/c4276da1d440这里讲的比较详细
am;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class UnLockDome {
public static void main(String[] args) {
Phone1 phone = new Phone1();
new Thread(() -> {
phone.call();
},“A”).start();
new Thread(() -> {
phone.send();
phone.call();
},"B").start();
}
}
class Phone1{
Lock reentrantLock = new ReentrantLock();
public void call(){
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=>" + "call");
send();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void send(){
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=> send");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
}
}
//问题,lock和unlock必须配对,否则会出现死锁
### 自旋锁
```java
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
//标准的自旋锁
不断的尝试,直到成功为止
自定义自旋锁
CAS是自旋锁的核心
package sang.steam;
import java.util.concurrent.atomic.AtomicReference;
public class SpinlockDemo {
//Thread null
AtomicReference atomicReference=new AtomicReference<>();
//加锁
public void myLock(){
Thread thread=Thread.currentThread();
System.out.println(thread.getName()+"==>"+"myLock");
//自旋锁
while (!atomicReference.compareAndSet(null,thread)){
}
}
//解锁
public void myUnLock(){
Thread thread=Thread.currentThread();
System.out.println(thread.getName()+"==>"+"myUnLock");
atomicReference.compareAndSet(thread,null);
}
}
测试
package sang.steam;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class TestSpinLock {
public static void main(String[] args) {
// ReentrantLock reentrantLock = new ReentrantLock();
// reentrantLock.lock();
// reentrantLock.unlock();
//底层使用的自旋锁
SpinlockDemo spinlockDemo=new SpinlockDemo();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnLock();
}
},"T1").start();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnLock();
}
},"T2").start();
}
}
死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
https://www.jianshu.com/p/c4276da1d440这里讲的比较详细



