前言一、基础
1.进程、线程怎么理解2.并发、并行和串行3.线程的四种创建方式4.常用方法
1.start和run2.sleep、yieId和setPriority3.join4.wait、notify和notifyAll 5.优雅的停止一个线程、守护线程6.synchronized互斥锁7.变量的线程安全分析 二、进阶
1.park、unpark2.死锁
定位死锁 3.活锁3.ReentrantLock4.原子整数
为什么无锁效率高CAS 的特点 5.原子引用
ABA问题 6.原子数组7.字段更新器8.字段累加器9.Unsafe9.线程池
1.线程池状态2.线程池构造方法3.固定大小线程池4.缓冲线程池5.单线程线程池6.具有任务调度功能的线程池7.线程池常用方法8.创建多大线程池合适9.如何处理线程池异常 10. ReentrantReadWriteLock11. StampedLock12. Semaphore13. CountdownLatch14. CyclicBarrier 拔高
前言 随着互联网的蓬勃发展,越来越多的人开始使用网络产品,流量剧增,导致在应用开发时不得不考虑并发安全性问题,本次向大家介绍java的并发技术——JUC。
一、基础 1.进程、线程怎么理解
拿生活中的例子来讲,大家对360安全卫士肯定有些了解,360安全卫士就可以理解是一个进程,它里面的每条任务调度流程都可以理解是一条线程(如每执行一次清理系统垃圾就是一条线程)。
2.并发、并行和串行假如一个人同一时刻只能做一件事是情况下:
并发:一个人以很快的速度去做三件事情,让人以为这三件事情都是同时进行的,其实这种情况并不能达到三件事同时进行的效果。
并行:三个人去做三件事情,那么就可以给每个人分一件事情,这样就可以达到三件事同时进行了。
串行:一个人按序就班的去做三件事情,A事情做完才能去做B事情,B事情做完了才能去做C事情。
这三种情况,并行的执行效率最好,串行效率最差,在多核CPU的机器上可以达到并行的效果,但在单核CPU加上最多只能达到并发的效果。
- 继承Thread类实现Runable接口Callable搭配FutureTask线程池
第三种创建方式代码如下:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@Slf4j
public class CreataThread{
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Callable是个函数式接口,我这里用lambda表达式简化了
FutureTask integerFutureTask = new FutureTask<>(()->{
{
log.debug("runing......");
Thread.sleep(2000);
return 1;
}
});
new Thread(integerFutureTask,"t1").start();
// 哪个线程想要返回的结果就让哪条线程去调用get方法,get方法会阻塞当前线程(即调用线程),直至t1线程执行完毕
log.debug("结果{}",integerFutureTask.get());
}
}
如下结果如下:
start方法:该方法是Thread类的方法,它的作用是启动线程,使线程进入就绪状态,供CPU调用。
run方法:该方法是Runable接口中的方法,多被子类重写,若直接调用该方法,就仅仅只是一次普通的方法调用,达不到启动线程的作用。
sleep方法:Thread的静态方法,会使当前线程进入阻塞状态并放弃CPU使用权,其他线程可以通过调用当前线程的interrupt方法打断其睡眠,下面有详细代码示例。
yieId方法:Thread的静态方法,使当前线程从运行状态进入就绪状态,主动让出CPU使用权。
setPriority方法:Thread的方法,用来设置线程的优先级,线程优先级可设置成1到10,1优先级最低,10优先级最高,默认优先级为5,原则上优先级高的线程比优先级低的线程获得更多的CPU使用权,但事实上还是得看自己的运气,起不到决定性作用。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread{
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("runing...");
try {
// 睡上2秒
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
log.debug("被打断睡眠.....");
e.printStackTrace();
}
}, "t1");
t1.start();
TimeUnit.SECONDS.sleep((long) 0.5);
// 主线程调用interrupt方法打断t1线程睡眠
t1.interrupt();
// t1.isInterrupted()可以查看t1线程有没有被打断过,如果t1是在sleep、wait、join时被打断的话会返回false
//反之返回true,可以通过此机制来优雅的停止一个线程,下面有代码示例
}
}
如下结果如下:
join方法是Thread的方法,调用哪个线程的join方法那么就得等这个线程执行完才能继续往下执行。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread{
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("runing...");
try {
// 睡2秒
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
t1.start();
log.debug("要开始等着t1了");
t1.join();
log.debug("终于等完了,可以往下执行了");
}
}
如下结果如下:
这一组方法是搭配使用的,可达到线程通信的目的
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread {
static boolean yan = false;
static boolean waiMai = false;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
synchronized (lock) {
// 这样用while是为了让线程醒来之后重新判断执行条件,防止虚假唤醒
while (!yan) {
log.debug("没烟干不了活啊!");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟了,开始干活!");
}
}, "t1");
Thread t2 = new Thread(() -> {
log.debug("好饿啊,我想吃外卖,不吃饱干不了活!");
synchronized (lock) {
while (!waiMai) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("吃饱了,开始干活!");
}
}, "t2");
t1.start();
t2.start();
// 主线程睡一秒,让t1、t2线程都wait,进而主线程可以拿到锁执行notifyAll唤醒操作
TimeUnit.SECONDS.sleep(1);
synchronized (lock){
log.debug("外卖到咯!");
waiMai = true;
lock.notifyAll();
}
}
}
执行结果如下:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread{
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
while (true){
// 检验有没有被打断过,被打断过就退出循环结束线程
if (Thread.currentThread().isInterrupted()){
log.debug("完蛋,被开除了,o(╥﹏╥)o");
break;
}
log.debug("摸鱼玩手机。。。。");
}
}, "t1");
// 守护线程只有在所有非守护线程都运行结束之后才会停止运行,就算它的任务没执行完成也会停止,例如垃圾回收线程
// 把t1设置成守护线程
// t1.setDaemon(true);
t1.start();
TimeUnit.SECONDS.sleep(3);
log.debug("这t1老是摸鱼,开除他");
// 执行打断t1操作
t1.interrupt();
}
}
6.synchronized互斥锁
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CreataThread{
static int i= 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int j = 0;j < 5000;j++){
i++;
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int j = 0;j < 5000;j++){
i--;
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",count);
}
}
如下结果如下:
一个线程做5000次加1操作另一个线程做5000次减一操作,最终的结果应该是0才对,那为什么最终结果是-1366呢?
i++时:
i- -时:
只要一来就会出现线程安全问题
所以我们得保证i++和i- - 是原子的,我们可以通过synchronized关键字来实现,代码修改之后如下:
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CreataThread{
static int count = 0;
static Object obj = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int j = 0;j < 5000;j++){
synchronized (obj){
count++;
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int j = 0;j < 5000;j++){
synchronized (obj){
count--;
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",count);
}
}
值得注意的是,synchronized加在普通方法上锁的是this对象,加在静态方法上锁的是类的字节码对象,同步块的锁需要手动指定。
7.变量的线程安全分析
局部变量引用的对象的例子如下:
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
@Slf4j
public class CreataThread{
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;
public static void main(String[] args) {
ThreadSafe test = new exThreadSafe();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
test.method1(LOOP_NUMBER);
}, "Thread" + i).start();
}
}
}
class ThreadSafe {
public void method1(int loopNumber) {
// 局部的引用变量,method1方法被final修饰了,method2、method3方法的访问修饰符都是private它们都不能被子类重写
// 每条线程都会在堆里新建一个list对象然后作为参数传给method2、method3方法,所以线程和线程之间用的list对象是隔离的
ArrayList list = new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
method2(list);
method3(list);
}
}
private void method2(ArrayList list) {
list.add("1");
}
public void method3(ArrayList list) {
list.remove(0);
}
}
// 当把method3方法的访问修饰符改成public,那么该方法就会被子类重写,这样就会有线程安全问题了
class exThreadSafe extends ThreadSafe{
@Override
public void method3(ArrayList list) {
// 这时候下面这个线程用的list是前一个线程传递的,也就是这个list被多条线程共享了(list被暴露给其他线程了),那么就会有线程安全问题
new Thread(()->{
list.remove(0);
}).start();
}
}
二、进阶
1.park、unpark
代码如下(示例):
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
@Slf4j
public class CreataThread {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("runing...");
// park会使当前线程进入阻塞
LockSupport.park();
log.debug("end...");
}, "t1");
t1.start();
TimeUnit.SECONDS.sleep(1);
// unpark可以精准唤醒park住的线程
// 不一定非得先park再unpark才能唤醒线程,先unpark再park也是可以唤醒线程的
LockSupport.unpark(t1);
}
}
执行结果如下:
与wait、notify和notifyAll相比,其有一下特点:
- 无须作用在synchronized下。可精准唤醒线程。无须先park再unpark,它们的顺序随意,但在wait、notify和notifyAll里就必须先wait再notify或notifyAll。
多线程情况下,线程在获得了一把锁时还相互争夺对方的锁就会产生死锁(吃着碗里瞧着锅里)。
代码如下(示例):
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread {
static Object A = new Object();
static Object B = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("获得A锁。。。。");
try {
TimeUnit.SECONDS.sleep(1);
log.debug("尝试获取B锁。。。。");
synchronized (B) {
log.debug("获取到了B锁。。。。");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("获得B锁。。。。");
try {
TimeUnit.SECONDS.sleep(1);
log.debug("尝试获取A锁。。。。");
synchronized (A) {
log.debug("获取到了A锁。。。。");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2");
t1.start();
t2.start();
}
}
执行结果如下:
可以使用JPS定位进程ID查看其内部线程情况。在idea的terminal中输入jps之后就会显示一些正在运行的进程,在用jstack命令查看具体进程信息,例如想看ID为123的进程,那命令就为jstack 123。我的查看结果如下:
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如下面的代码:
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CreataThread {
static volatile int count = 10;
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
3.ReentrantLock
- 可重入
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class CreataThread {
static volatile int count = 10;
static ReentrantLock lock = new ReentrantLock();
private static void method(){
try {
lock.lock();
log.debug("获得第二把锁");
}finally {
lock.unlock();
log.debug("释放第二把锁");
}
}
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
log.debug("获得第一把锁");
method();
}finally {
lock.unlock();
log.debug("释放第一把锁");
}
}, "t1").start();
}
}
- 可设置成公平锁
ReentrantLock lock = new ReentrantLock(true); // 默认是非公平锁,非公平锁性能好一些
- 可设置超时时间
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class CreataThread {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
// 尝试2秒去获取锁
if (!lock.tryLock(2,TimeUnit.SEConDS )){
log.debug("没获得锁!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获取到锁了");
}finally {
lock.unlock();
}
}, "t1");
// 主线程先获得锁
lock.lock();
t1.start();
TimeUnit.SECONDS.sleep(1);
lock.unlock();
}
}
- 可打断
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class CreataThread {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
// 可打断式的加锁
log.debug("尝试加锁!");
lock.lockInterruptibly();
} catch (InterruptedException e) {
log.debug("没有获得锁!");
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
t1.start();
// 打断t1线程的等待
t1.interrupt();
}
}
- 支持多个条件变量
synchronized 中也有条件变量 waitSet (后面会讲到),当条件不满足时进入 waitSet 等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class CreataThread {
static ReentrantLock lock = new ReentrantLock();
// 新建两个环境变量
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
TimeUnit.SECONDS.sleep(1);
sendBreakfast();
TimeUnit.SECONDS.sleep(1);
sendCigarette();
}
}
4.原子整数
JUC 并发包提供了AtomicInteger、AtomicLong等原子整数,基于CAS实现无锁并发,下面以AtomicInteger为例。
AtomicInteger i = new AtomicInteger(0); // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++ System.out.println(i.getAndIncrement()); // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i System.out.println(i.incrementAndGet()); // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i System.out.println(i.decrementAndGet()); // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i-- System.out.println(i.getAndDecrement()); // 获取并加值(i = 0, 结果 i = 5, 返回 0) System.out.println(i.getAndAdd(5)); // 加值并获取(i = 5, 结果 i = 0, 返回 0) System.out.println(i.addAndGet(-5)); // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.getAndUpdate(p -> p - 2)); // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.updateAndGet(p -> p + 2)); // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的 // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final System.out.println(i.getAndAccumulate(10, (p, x) -> p + x)); // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));为什么无锁效率高
无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大,但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还
是会导致上下文切换。
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再
重试呗。
synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想
改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
由于我们并不都是保护基本类型,也可能保护的是引用类型,所以JUC 并发包提供了AtomicReference、AtomicMarkableReference、AtomicStampedReference等原子引用类型。
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class CreataThread {
AtomicReference ref;
public CreataThread(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
public BigDecimal getBalance() {
return ref.get();
}
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
// cas操作,比较并赋值
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
public static void main(String[] args) throws InterruptedException {
CreataThread creataThread = new CreataThread(new BigDecimal("10000"));
List ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
creataThread.withdraw(BigDecimal.TEN);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(creataThread.getBalance());
}
}
ABA问题
看下面代码:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class CreataThread {
static AtomicReference ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
// 这个共享变量被它线程修改过?
String prev = ref.get();
other();
Thread.sleep(1000);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}
private static void other() throws InterruptedException {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
}, "t2").start();
}
}
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号,AtomicStampedReference可解决ABA问题。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicStampedReference;
@Slf4j
public class CreataThread {
static AtomicStampedReference ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
log.debug("版本 {}", stamp);
// 如果中间有其它线程干扰,发生了 ABA 现象
other();
Thread.sleep(1000);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() throws InterruptedException {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t2").start();
}
}
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过
AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicMarkableReference;
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
@Slf4j
public class CreataThread {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,表示垃圾袋满了
AtomicMarkableReference ref = new AtomicMarkableReference<>(bag, true);
log.debug("老王 start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
// 保洁阿姨把垃圾袋里的垃圾清理掉了,现在垃圾袋里是空的,标识就变成false
new Thread(() -> {
log.debug("保洁阿姨 start...");
bag.setDesc("空垃圾袋");
while (!ref.compareAndSet(bag, bag, true, false)) {}
log.debug(bag.toString());
}).start();
// 老王今天很勤快,他装备去把垃圾袋清理一下,但在这之前,保洁阿姨就已经把垃圾清理完了(标识变成false了),显然老王最后是没倒成垃圾
Thread.sleep(1000);
log.debug("老王想倒垃圾?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("倒成了么?" + success);
log.debug(ref.getReference().toString());
}
}
6.原子数组
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
7.字段更新器AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater、AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现
异常。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@Slf4j
public class CreataThread {
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(CreataThread.class, "field");
CreataThread test5 = new CreataThread();
fieldUpdater.compareAndSet(test5, 0, 10);
// 修改成功 field = 10
System.out.println(test5.field);
// 修改成功 field = 20
fieldUpdater.compareAndSet(test5, 10, 20);
System.out.println(test5.field);
// 修改失败 field = 20
fieldUpdater.compareAndSet(test5, 10, 30);
System.out.println(test5.field);
}
}
8.字段累加器
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;
@Slf4j
public class CreataThread {
private static void demo(Supplier adderSupplier, Consumer action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 40; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start)/1000_000);
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
}
}
执行结果如下:
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能,但拆分的单元数不会大于CPU数。
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得,java.util.concurrent.atomic包下的原子类的方法很多都是调用的Unsafe 的方法。
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
@Data
class Student {
volatile int id;
volatile String name; }
@Slf4j
public class CreataThread {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
public static void main(String[] args) throws NoSuchFieldException {
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
long idOffset = CreataThread.unsafe.objectFieldOffset(id);
long nameOffset = CreataThread.unsafe.objectFieldOffset(name);
Student student = new Student();
// 使用 cas 方法替换成员变量的值
CreataThread.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
CreataThread.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
System.out.println(student);
}
}
9.线程池
1.线程池状态
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
线程池的工作流程如下:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现:AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略、CallerRunsPolicy 让调用者运行任务、DiscardPolicy 放弃本次任务、DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue());
}
4.缓冲线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
// 核心线程数为0,最大线程数为Integer.MAX_VALUE,所以此线程池的所有线程都是急救线程,存活时间是60
//秒,队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手
//交货)
5.单线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue()));
}
6.具有任务调度功能的线程池
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread {
public static void main(String[] args) {
// 任务调度功能的线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
// 参数一:任务 参数二:延迟时间 参数三:执行的间隔时间 参数四:时间单位
// 延迟一秒之后,每个一秒就打印一次runing...
scheduledExecutorService.scheduleAtFixedRate(()->{
log.debug("runing...");
}, 1,1 ,TimeUnit.SEConDS );
}
}
7.线程池常用方法
// 执行任务 void execute(Runnable command); // 提交任务 task,用返回值 Future 获得任务执行结果8.创建多大线程池合适Future submit(Callable task); // 提交 tasks 中所有任务 List > invokeAll(Collection extends Callable > tasks) throws InterruptedException; // 提交 tasks 中所有任务,带超时时间 List > invokeAll(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 T invokeAny(Collection extends Callable > tasks) throws InterruptedException, ExecutionException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间 T invokeAny(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; void shutdown(); List shutdownNow();
- CPU密集型:通常采用CPU核数 + 1能够实现最优的CPU利用率,+1是保证当线程由于某种原因导致暂停时有额外的这个线程顶上去。I/O密集型:CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但在执行IO操作额远程RPC调用的时候,CPU就闲下来了,你可以利用多线程提高它的利用率
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class CreataThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(1);
// 1.主动try/catch,在catch里记录异常信息(推荐使用这种方法)
// threadPool.execute(()->{
// try {
// log.debug("runing...");
// int i = 1/0;
// }catch (Exception e){
// log.error(e.getMessage());
// e.printStackTrace();
// }
// });
// 2.submit配合callable得到Future,通过Future的get方法获取异常信息
Future submit = threadPool.submit(() -> {
log.debug("runing...");
int i = 1 / 0;
return true;
});
log.debug("{}",submit.get());
}
}
10. ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能,读写和写写都是互斥的,重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待,重入时降级支持:即持有写锁的情况下去获取读锁。
11. StampedLock是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全,与ReentrantReadWriteLock不同的是ReentrantReadWriteLock的写锁可以降到读锁的,但StampedLock的读写锁无法升、降级。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.StampedLock;
import static java.lang.Thread.sleep;
@Slf4j
public class CreataThread {
int data;
private final StampedLock lock = new StampedLock();
public CreataThread(int data) {
this.data = data;
}
public int read(int readTime) throws InterruptedException {
// 进行一次乐观读,返回一个戳
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
// 验证戳
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 戳验证失败锁升级 - 读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CreataThread creataThread = new CreataThread(234);
new Thread(() -> {
try {
creataThread.read(creataThread.data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
creataThread.write(creataThread.data);
}, "t2").start();
}
}
12. Semaphore
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Semaphore可简单的实现限流的功能,可设置最多有几个线程同时运行
Semaphore semaphore = new Semaphore(3);
for (int i = 0;i < 10;i++){
new Thread(()->{
try {
// 获取许可,只有获取到许可的线程才可以往下运行
semaphore.acquire();
log.debug("runing...");
TimeUnit.SECONDS.sleep(1);
log.debug("释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},"Thread-"+i).start();
}
}
}
13. CountdownLatch
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CreataThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread t1 = new Thread(() -> {
log.debug("runing...");
// 执行CountDownLatch-1操作
countDownLatch.countDown();
}, "t1");
Thread t2 = new Thread(() -> {
log.debug("runing...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 执行CountDownLatch-1操作
countDownLatch.countDown();
}, "t2");
t1.start();
t2.start();
countDownLatch.await();
log.debug("等待完毕,可以往下执行了!");
}
}
14. CyclicBarrier
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CreataThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// CyclicBarrier可重复利用,当参数减为0之后会自动填充为2
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
log.debug("最后任务完成");
});
threadPool.submit(()->{
log.debug("runing...");
try {
// CyclicBarrier执行减1,不为0就等待
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
threadPool.submit(()->{
log.debug("runing...");
try {
TimeUnit.SECONDS.sleep(2);
// CyclicBarrier执行减1,减为0之后,所有在CyclicBarrier上等待的线程都恢复运行
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
拔高
提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。



