栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JUC并发编程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JUC并发编程

文章目录

前言一、基础

1.进程、线程怎么理解2.并发、并行和串行3.线程的四种创建方式4.常用方法

1.start和run2.sleep、yieId和setPriority3.join4.wait、notify和notifyAll 5.优雅的停止一个线程、守护线程6.synchronized互斥锁7.变量的线程安全分析 二、进阶

1.park、unpark2.死锁

定位死锁 3.活锁3.ReentrantLock4.原子整数

为什么无锁效率高CAS 的特点 5.原子引用

ABA问题 6.原子数组7.字段更新器8.字段累加器9.Unsafe9.线程池

1.线程池状态2.线程池构造方法3.固定大小线程池4.缓冲线程池5.单线程线程池6.具有任务调度功能的线程池7.线程池常用方法8.创建多大线程池合适9.如何处理线程池异常 10. ReentrantReadWriteLock11. StampedLock12. Semaphore13. CountdownLatch14. CyclicBarrier 拔高


前言 随着互联网的蓬勃发展,越来越多的人开始使用网络产品,流量剧增,导致在应用开发时不得不考虑并发安全性问题,本次向大家介绍java的并发技术——JUC。
一、基础 1.进程、线程怎么理解

拿生活中的例子来讲,大家对360安全卫士肯定有些了解,360安全卫士就可以理解是一个进程,它里面的每条任务调度流程都可以理解是一条线程(如每执行一次清理系统垃圾就是一条线程)。

2.并发、并行和串行

假如一个人同一时刻只能做一件事是情况下:
并发:一个人以很快的速度去做三件事情,让人以为这三件事情都是同时进行的,其实这种情况并不能达到三件事同时进行的效果。
并行:三个人去做三件事情,那么就可以给每个人分一件事情,这样就可以达到三件事同时进行了。
串行:一个人按序就班的去做三件事情,A事情做完才能去做B事情,B事情做完了才能去做C事情。
这三种情况,并行的执行效率最好,串行效率最差,在多核CPU的机器上可以达到并行的效果,但在单核CPU加上最多只能达到并发的效果。

3.线程的四种创建方式
    继承Thread类实现Runable接口Callable搭配FutureTask线程池
    第三种创建方式代码如下:
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@Slf4j
public class CreataThread{
 public static void main(String[] args) throws ExecutionException, InterruptedException {
 // Callable是个函数式接口,我这里用lambda表达式简化了
  FutureTask integerFutureTask = new FutureTask<>(()->{
   {
    log.debug("runing......");
    Thread.sleep(2000);
    return 1;
   }
  });
  new Thread(integerFutureTask,"t1").start();

  // 哪个线程想要返回的结果就让哪条线程去调用get方法,get方法会阻塞当前线程(即调用线程),直至t1线程执行完毕
  log.debug("结果{}",integerFutureTask.get());
 }
}


如下结果如下:

4.常用方法 1.start和run

start方法:该方法是Thread类的方法,它的作用是启动线程,使线程进入就绪状态,供CPU调用。
run方法:该方法是Runable接口中的方法,多被子类重写,若直接调用该方法,就仅仅只是一次普通的方法调用,达不到启动线程的作用。

2.sleep、yieId和setPriority

sleep方法:Thread的静态方法,会使当前线程进入阻塞状态并放弃CPU使用权,其他线程可以通过调用当前线程的interrupt方法打断其睡眠,下面有详细代码示例。
yieId方法:Thread的静态方法,使当前线程从运行状态进入就绪状态,主动让出CPU使用权。
setPriority方法:Thread的方法,用来设置线程的优先级,线程优先级可设置成1到10,1优先级最低,10优先级最高,默认优先级为5,原则上优先级高的线程比优先级低的线程获得更多的CPU使用权,但事实上还是得看自己的运气,起不到决定性作用。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CreataThread{
 public static void main(String[] args) throws ExecutionException, InterruptedException {
  Thread t1 = new Thread(() -> {
   log.debug("runing...");
   try {
    // 睡上2秒
    TimeUnit.SECONDS.sleep(2);
   } catch (InterruptedException e) {
    log.debug("被打断睡眠.....");
    e.printStackTrace();
   }
  }, "t1");
  t1.start();

  TimeUnit.SECONDS.sleep((long) 0.5);
  // 主线程调用interrupt方法打断t1线程睡眠
  t1.interrupt();
  // t1.isInterrupted()可以查看t1线程有没有被打断过,如果t1是在sleep、wait、join时被打断的话会返回false
  //反之返回true,可以通过此机制来优雅的停止一个线程,下面有代码示例
 }
}

如下结果如下:

3.join

join方法是Thread的方法,调用哪个线程的join方法那么就得等这个线程执行完才能继续往下执行。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class CreataThread{
 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   log.debug("runing...");
   try {
    // 睡2秒
    TimeUnit.SECONDS.sleep(2);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }, "t1");
  t1.start();

  log.debug("要开始等着t1了");
  t1.join();
  log.debug("终于等完了,可以往下执行了");
 }
}

如下结果如下:

4.wait、notify和notifyAll

这一组方法是搭配使用的,可达到线程通信的目的

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class CreataThread {
 static boolean yan = false;
 static boolean waiMai = false;
 static Object lock = new Object();

 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   synchronized (lock) {
    // 这样用while是为了让线程醒来之后重新判断执行条件,防止虚假唤醒
    while (!yan) {
     log.debug("没烟干不了活啊!");
     try {
      lock.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    log.debug("有烟了,开始干活!");
   }
  }, "t1");

  Thread t2 = new Thread(() -> {
   log.debug("好饿啊,我想吃外卖,不吃饱干不了活!");
   synchronized (lock) {
    while (!waiMai) {
     try {
      lock.wait();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    log.debug("吃饱了,开始干活!");
   }
  }, "t2");

  t1.start();
  t2.start();

  // 主线程睡一秒,让t1、t2线程都wait,进而主线程可以拿到锁执行notifyAll唤醒操作
  TimeUnit.SECONDS.sleep(1);
  synchronized (lock){
   log.debug("外卖到咯!");
   waiMai = true;
   lock.notifyAll();
  }
 }
}

执行结果如下:

5.优雅的停止一个线程、守护线程
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class CreataThread{
 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
  while (true){
  // 检验有没有被打断过,被打断过就退出循环结束线程
   if (Thread.currentThread().isInterrupted()){
    log.debug("完蛋,被开除了,o(╥﹏╥)o");
    break;
   }
   log.debug("摸鱼玩手机。。。。");
  }
  }, "t1");
// 守护线程只有在所有非守护线程都运行结束之后才会停止运行,就算它的任务没执行完成也会停止,例如垃圾回收线程
// 把t1设置成守护线程
// t1.setDaemon(true);
  t1.start();

  TimeUnit.SECONDS.sleep(3);
  log.debug("这t1老是摸鱼,开除他");
  // 执行打断t1操作
  t1.interrupt();
 }
}
6.synchronized互斥锁
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CreataThread{
 static int i= 0;
 public static void main(String[] args) throws InterruptedException {

  Thread t1 = new Thread(() -> {
  for (int j = 0;j < 5000;j++){
    i++;
  }
  }, "t1");
  Thread t2 = new Thread(() -> {
   for (int j = 0;j < 5000;j++){
     i--;
   }
  }, "t2");

  t1.start();
  t2.start();
  t1.join();
  t2.join();

  log.debug("{}",count);
 }
}

如下结果如下:

一个线程做5000次加1操作另一个线程做5000次减一操作,最终的结果应该是0才对,那为什么最终结果是-1366呢?
i++时:

i- -时:

只要一来就会出现线程安全问题

所以我们得保证i++和i- - 是原子的,我们可以通过synchronized关键字来实现,代码修改之后如下:

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CreataThread{
 static int count = 0;
 static Object obj = new Object();
 public static void main(String[] args) throws InterruptedException {

  Thread t1 = new Thread(() -> {
  for (int j = 0;j < 5000;j++){
   synchronized (obj){
    count++;
   }
  }
  }, "t1");
  Thread t2 = new Thread(() -> {
   for (int j = 0;j < 5000;j++){
    synchronized (obj){
     count--;
    }
   }
  }, "t2");

  t1.start();
  t2.start();
  t1.join();
  t2.join();
  
  log.debug("{}",count);
 }
}

值得注意的是,synchronized加在普通方法上锁的是this对象,加在静态方法上锁的是类的字节码对象,同步块的锁需要手动指定。

7.变量的线程安全分析


局部变量引用的对象的例子如下:

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;

@Slf4j
public class CreataThread{
 static final int THREAD_NUMBER = 2;
 static final int LOOP_NUMBER = 200;

 public static void main(String[] args) {
  ThreadSafe test = new exThreadSafe();
  for (int i = 0; i < THREAD_NUMBER; i++) {
   new Thread(() -> {
    test.method1(LOOP_NUMBER);
   }, "Thread" + i).start();
  }
 }
}

 class ThreadSafe {
 public void method1(int loopNumber) {
  // 局部的引用变量,method1方法被final修饰了,method2、method3方法的访问修饰符都是private它们都不能被子类重写
  // 每条线程都会在堆里新建一个list对象然后作为参数传给method2、method3方法,所以线程和线程之间用的list对象是隔离的
  ArrayList list = new ArrayList<>();
  for (int i = 0; i < loopNumber; i++) {
   method2(list);
   method3(list);
  }
 }
 private void method2(ArrayList list) {
  list.add("1");
 }
 public void method3(ArrayList list) {
  list.remove(0);
 }
}

// 当把method3方法的访问修饰符改成public,那么该方法就会被子类重写,这样就会有线程安全问题了
class exThreadSafe extends ThreadSafe{
 @Override
 public void method3(ArrayList list) {
  // 这时候下面这个线程用的list是前一个线程传递的,也就是这个list被多条线程共享了(list被暴露给其他线程了),那么就会有线程安全问题
  new Thread(()->{
   list.remove(0);
  }).start();
 }
}


二、进阶 1.park、unpark

代码如下(示例):

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

@Slf4j
public class CreataThread {

 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   log.debug("runing...");
   // park会使当前线程进入阻塞
   LockSupport.park();
   log.debug("end...");
  }, "t1");

  t1.start();

  TimeUnit.SECONDS.sleep(1);
  // unpark可以精准唤醒park住的线程
  // 不一定非得先park再unpark才能唤醒线程,先unpark再park也是可以唤醒线程的
  LockSupport.unpark(t1);
 }
}

执行结果如下:

与wait、notify和notifyAll相比,其有一下特点:

    无须作用在synchronized下。可精准唤醒线程。无须先park再unpark,它们的顺序随意,但在wait、notify和notifyAll里就必须先wait再notify或notifyAll。
2.死锁

多线程情况下,线程在获得了一把锁时还相互争夺对方的锁就会产生死锁(吃着碗里瞧着锅里)。
代码如下(示例):

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class CreataThread {
 static Object A = new Object();
 static Object B = new Object();

 public static void main(String[] args)  {
  Thread t1 = new Thread(() -> {
   synchronized (A) {
    log.debug("获得A锁。。。。");
    try {
     TimeUnit.SECONDS.sleep(1);
     log.debug("尝试获取B锁。。。。");
     synchronized (B) {
      log.debug("获取到了B锁。。。。");
     }
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }, "t1");

  Thread t2 = new Thread(() -> {
   synchronized (B) {
    log.debug("获得B锁。。。。");
    try {
     TimeUnit.SECONDS.sleep(1);
     log.debug("尝试获取A锁。。。。");
     synchronized (A) {
      log.debug("获取到了A锁。。。。");
     }
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }, "t2");

  t1.start();
  t2.start();
 }
}

执行结果如下:

定位死锁

可以使用JPS定位进程ID查看其内部线程情况。在idea的terminal中输入jps之后就会显示一些正在运行的进程,在用jstack命令查看具体进程信息,例如想看ID为123的进程,那命令就为jstack 123。我的查看结果如下:

3.活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如下面的代码:

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CreataThread {
 static volatile int count = 10;

 public static void main(String[] args)  {
  new Thread(() -> {
   // 期望减到 0 退出循环
   while (count > 0) {
    try {
     Thread.sleep(200);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    count--;
    log.debug("count: {}", count);
   }
  }, "t1").start();
  new Thread(() -> {
   // 期望超过 20 退出循环
   while (count < 20) {
    try {
     Thread.sleep(200);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    count++;
    log.debug("count: {}", count);
   }
  }, "t2").start();
 }
}
3.ReentrantLock
    可重入
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class CreataThread {
 static volatile int count = 10;
 static ReentrantLock lock = new ReentrantLock();

 private static void method(){
  try {
   lock.lock();
   log.debug("获得第二把锁");
  }finally {
   lock.unlock();
   log.debug("释放第二把锁");
  }

 }

 public static void main(String[] args)  {
  new Thread(() -> {
   try {
    lock.lock();
    log.debug("获得第一把锁");
    method();
   }finally {
    lock.unlock();
    log.debug("释放第一把锁");
   }
  }, "t1").start();
 }
}
    可设置成公平锁
ReentrantLock lock = new ReentrantLock(true); // 默认是非公平锁,非公平锁性能好一些
    可设置超时时间
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class CreataThread {
 static ReentrantLock lock = new ReentrantLock();

 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   try {
   // 尝试2秒去获取锁
    if (!lock.tryLock(2,TimeUnit.SEConDS )){
     log.debug("没获得锁!");
    }
   } catch (InterruptedException e) {
    e.printStackTrace();
   }

   try {
    log.debug("获取到锁了");
   }finally {
    lock.unlock();
   }
  }, "t1");

  // 主线程先获得锁
  lock.lock();
  t1.start();
  TimeUnit.SECONDS.sleep(1);
  lock.unlock();
 }
}

    可打断
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class CreataThread {
 static ReentrantLock lock = new ReentrantLock();

 public static void main(String[] args)  {
  Thread t1 = new Thread(() -> {
   try {
    // 可打断式的加锁
    log.debug("尝试加锁!");
    lock.lockInterruptibly();
   } catch (InterruptedException e) {
    log.debug("没有获得锁!");
    e.printStackTrace();
   } finally {
    lock.unlock();
   }
  }, "t1");

  lock.lock();
  t1.start();
  // 打断t1线程的等待
  t1.interrupt();
 }
}
    支持多个条件变量
    synchronized 中也有条件变量 waitSet (后面会讲到),当条件不满足时进入 waitSet 等待
    ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;



@Slf4j
public class CreataThread {
 static ReentrantLock lock = new ReentrantLock();
 // 新建两个环境变量
 static Condition waitCigaretteQueue = lock.newCondition();
 static Condition waitbreakfastQueue = lock.newCondition();
 static volatile boolean hasCigrette = false;
 static volatile boolean hasBreakfast = false;

 private static void sendCigarette() {
  lock.lock();
  try {
   log.debug("送烟来了");
   hasCigrette = true;
   waitCigaretteQueue.signal();
  } finally {
   lock.unlock();
  }
 }
 private static void sendBreakfast() {
  lock.lock();
  try {
   log.debug("送早餐来了");
   hasBreakfast = true;
   waitbreakfastQueue.signal();
  } finally {
   lock.unlock();
  }
 }

 public static void main(String[] args) throws InterruptedException {
   new Thread(() -> {
    try {
     lock.lock();
     while (!hasCigrette) {
      try {
       waitCigaretteQueue.await();
      } catch (InterruptedException e) {
       e.printStackTrace();
      }
     }
     log.debug("等到了它的烟");
    } finally {
     lock.unlock();
    }
   }).start();

  new Thread(() -> {
   try {
    lock.lock();
    while (!hasBreakfast) {
     try {
      waitbreakfastQueue.await();
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
    log.debug("等到了它的早餐");
   } finally {
    lock.unlock();
   }
  }).start();
  TimeUnit.SECONDS.sleep(1);
  sendBreakfast();
  TimeUnit.SECONDS.sleep(1);
  sendCigarette();
 }
}
4.原子整数

JUC 并发包提供了AtomicInteger、AtomicLong等原子整数,基于CAS实现无锁并发,下面以AtomicInteger为例。

AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
为什么无锁效率高

无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大,但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还
是会导致上下文切换。

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再
重试呗。
synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想
改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

5.原子引用

由于我们并不都是保护基本类型,也可能保护的是引用类型,所以JUC 并发包提供了AtomicReference、AtomicMarkableReference、AtomicStampedReference等原子引用类型。

import lombok.extern.slf4j.Slf4j;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;


@Slf4j
public class CreataThread {

 AtomicReference ref;
 public CreataThread(BigDecimal balance) {
  ref = new AtomicReference<>(balance);
 }

 public BigDecimal getBalance() {
  return ref.get();
 }

 public void withdraw(BigDecimal amount) {
  while (true) {
   BigDecimal prev = ref.get();
   BigDecimal next = prev.subtract(amount);
   // cas操作,比较并赋值
   if (ref.compareAndSet(prev, next)) {
    break;
   }
  }
 }

 public static void main(String[] args) throws InterruptedException {
  CreataThread creataThread = new CreataThread(new BigDecimal("10000"));

  List ts = new ArrayList<>();
  for (int i = 0; i < 1000; i++) {
   ts.add(new Thread(() -> {
    creataThread.withdraw(BigDecimal.TEN);
   }));
  }
  ts.forEach(Thread::start);
  ts.forEach(t -> {
   try {
    t.join();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  });
  System.out.println(creataThread.getBalance());
 }
}
ABA问题

看下面代码:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicReference;


@Slf4j
public class CreataThread {

 static AtomicReference ref = new AtomicReference<>("A");
 public static void main(String[] args) throws InterruptedException {
  log.debug("main start...");
  // 获取值 A
  // 这个共享变量被它线程修改过?
  String prev = ref.get();
  other();
  Thread.sleep(1000);
  // 尝试改为 C
  log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
 }
 private static void other() throws InterruptedException {
  new Thread(() -> {
   log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
  }, "t1").start();
  Thread.sleep(500);

  new Thread(() -> {
   log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
  }, "t2").start();
 }
}

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号,AtomicStampedReference可解决ABA问题。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicStampedReference;


@Slf4j
public class CreataThread {

 static AtomicStampedReference ref = new AtomicStampedReference<>("A", 0);
 public static void main(String[] args) throws InterruptedException {
  log.debug("main start...");
  // 获取值 A
  String prev = ref.getReference();
  // 获取版本号
  int stamp = ref.getStamp();
  log.debug("版本 {}", stamp);
  // 如果中间有其它线程干扰,发生了 ABA 现象
  other();
  Thread.sleep(1000);
  // 尝试改为 C
  log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
 }
 private static void other() throws InterruptedException {
  new Thread(() -> {
   log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
    ref.getStamp(), ref.getStamp() + 1));
   log.debug("更新版本为 {}", ref.getStamp());
  }, "t1").start();
  Thread.sleep(500);
  new Thread(() -> {
   log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
    ref.getStamp(), ref.getStamp() + 1));
   log.debug("更新版本为 {}", ref.getStamp());
  }, "t2").start();
 }
}

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过
AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicMarkableReference;


class GarbageBag {
 String desc;
 public GarbageBag(String desc) {
  this.desc = desc;
 }
 public void setDesc(String desc) {
  this.desc = desc;
 }
 @Override
 public String toString() {
  return super.toString() + " " + desc;
 }
}

@Slf4j
public class CreataThread {
 public static void main(String[] args) throws InterruptedException {
  GarbageBag bag = new GarbageBag("装满了垃圾");
  // 参数2 mark 可以看作一个标记,表示垃圾袋满了
  AtomicMarkableReference ref = new AtomicMarkableReference<>(bag, true);
  log.debug("老王 start...");
  GarbageBag prev = ref.getReference();
  log.debug(prev.toString());

  // 保洁阿姨把垃圾袋里的垃圾清理掉了,现在垃圾袋里是空的,标识就变成false
  new Thread(() -> {
   log.debug("保洁阿姨 start...");
   bag.setDesc("空垃圾袋");
   while (!ref.compareAndSet(bag, bag, true, false)) {}
   log.debug(bag.toString());
  }).start();

  // 老王今天很勤快,他装备去把垃圾袋清理一下,但在这之前,保洁阿姨就已经把垃圾清理完了(标识变成false了),显然老王最后是没倒成垃圾
  Thread.sleep(1000);
  log.debug("老王想倒垃圾?");
  boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
  log.debug("倒成了么?" + success);
  log.debug(ref.getReference().toString());
 }
}

6.原子数组

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

7.字段更新器

AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater、AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现
异常。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;


@Slf4j
public class CreataThread {
 private volatile int field;
 public static void main(String[] args) {
  AtomicIntegerFieldUpdater fieldUpdater =
  AtomicIntegerFieldUpdater.newUpdater(CreataThread.class, "field");

  CreataThread test5 = new CreataThread();
  fieldUpdater.compareAndSet(test5, 0, 10);
  // 修改成功 field = 10
  System.out.println(test5.field);
  // 修改成功 field = 20
  fieldUpdater.compareAndSet(test5, 10, 20);
  System.out.println(test5.field);
  // 修改失败 field = 20
  fieldUpdater.compareAndSet(test5, 10, 30);
  System.out.println(test5.field);
 }
}
8.字段累加器
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;


@Slf4j
public class CreataThread {

 private static  void demo(Supplier adderSupplier, Consumer action) {
  T adder = adderSupplier.get();
  long start = System.nanoTime();
  List ts = new ArrayList<>();
  // 4 个线程,每人累加 50 万
  for (int i = 0; i < 40; i++) {
   ts.add(new Thread(() -> {
    for (int j = 0; j < 500000; j++) {
     action.accept(adder);
    }
   }));
  }
  ts.forEach(t -> t.start());
  ts.forEach(t -> {
   try {
    t.join();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  });
  long end = System.nanoTime();
  System.out.println(adder + " cost:" + (end - start)/1000_000);
 }

 public static void main(String[] args) {
  for (int i = 0; i < 5; i++) {
   demo(() -> new LongAdder(), adder -> adder.increment());
  }
  for (int i = 0; i < 5; i++) {
   demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
  }
 }
}

执行结果如下:

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能,但拆分的单元数不会大于CPU数。

9.Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得,java.util.concurrent.atomic包下的原子类的方法很多都是调用的Unsafe 的方法。

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import sun.misc.Unsafe;

import java.lang.reflect.Field;

@Data
class Student {
 volatile int id;
 volatile String name; }

@Slf4j
public class CreataThread {

 static Unsafe unsafe;
 static {
  try {
   Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
   theUnsafe.setAccessible(true);
   unsafe = (Unsafe) theUnsafe.get(null);
  } catch (NoSuchFieldException | IllegalAccessException e) {
   throw new Error(e);
  }
 }


 public static void main(String[] args) throws NoSuchFieldException {
  Field id = Student.class.getDeclaredField("id");
  Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
  long idOffset = CreataThread.unsafe.objectFieldOffset(id);
  long nameOffset = CreataThread.unsafe.objectFieldOffset(name);
  Student student = new Student();
// 使用 cas 方法替换成员变量的值
  CreataThread.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
  CreataThread.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
  System.out.println(student);
 }
}
9.线程池 1.线程池状态


2.线程池构造方法
public ThreadPoolExecutor(int corePoolSize,
 						  int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)


线程池的工作流程如下:

    线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现:AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略、CallerRunsPolicy 让调用者运行任务、DiscardPolicy 放弃本次任务、DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之。
3.固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new linkedBlockingQueue());
}

4.缓冲线程池
public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                               60L, TimeUnit.SECONDS,
                               new SynchronousQueue());
}
// 核心线程数为0,最大线程数为Integer.MAX_VALUE,所以此线程池的所有线程都是急救线程,存活时间是60
//秒,队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手
//交货)
5.单线程线程池
public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
                         0L, TimeUnit.MILLISECONDS,
                         new linkedBlockingQueue()));
}

6.具有任务调度功能的线程池
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


@Slf4j
public class CreataThread {
 public static void main(String[] args) {
  // 任务调度功能的线程池
  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

  // 参数一:任务 参数二:延迟时间 参数三:执行的间隔时间  参数四:时间单位
  // 延迟一秒之后,每个一秒就打印一次runing...
  scheduledExecutorService.scheduleAtFixedRate(()->{
   log.debug("runing...");
  }, 1,1 ,TimeUnit.SEConDS );
 }
}
7.线程池常用方法
// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
 Future submit(Callable task);

// 提交 tasks 中所有任务
 List> invokeAll(Collection> tasks)
 throws InterruptedException;
 
// 提交 tasks 中所有任务,带超时时间
 List> invokeAll(Collection> tasks,
                              long timeout, TimeUnit unit)
 throws InterruptedException;
 
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
 T invokeAny(Collection> tasks)
 throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
 T invokeAny(Collection> tasks,
                long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;


void shutdown();


List shutdownNow();
8.创建多大线程池合适
    CPU密集型:通常采用CPU核数 + 1能够实现最优的CPU利用率,+1是保证当线程由于某种原因导致暂停时有额外的这个线程顶上去。I/O密集型:CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但在执行IO操作额远程RPC调用的时候,CPU就闲下来了,你可以利用多线程提高它的利用率
9.如何处理线程池异常
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


@Slf4j
public class CreataThread {
 public static void main(String[] args) throws ExecutionException, InterruptedException {
  ExecutorService threadPool = Executors.newFixedThreadPool(1);

  // 1.主动try/catch,在catch里记录异常信息(推荐使用这种方法)
//  threadPool.execute(()->{
//   try {
//    log.debug("runing...");
//    int i = 1/0;
//   }catch (Exception e){
//    log.error(e.getMessage());
//    e.printStackTrace();
//   }
//  });

  // 2.submit配合callable得到Future,通过Future的get方法获取异常信息
  Future submit = threadPool.submit(() -> {
   log.debug("runing...");
   int i = 1 / 0;
   return true;
  });
  log.debug("{}",submit.get());
 }
}
10. ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能,读写和写写都是互斥的,重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待,重入时降级支持:即持有写锁的情况下去获取读锁。

11. StampedLock

是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全,与ReentrantReadWriteLock不同的是ReentrantReadWriteLock的写锁可以降到读锁的,但StampedLock的读写锁无法升、降级。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.StampedLock;

import static java.lang.Thread.sleep;


@Slf4j
public class CreataThread {
 int data;
 private final StampedLock lock = new StampedLock();
 public CreataThread(int data) {
  this.data = data;
 }

 public int read(int readTime) throws InterruptedException {
  // 进行一次乐观读,返回一个戳
  long stamp = lock.tryOptimisticRead();
  log.debug("optimistic read locking...{}", stamp);
  sleep(readTime);
  // 验证戳
  if (lock.validate(stamp)) {
   log.debug("read finish...{}, data:{}", stamp, data);
   return data;
  }
  // 戳验证失败锁升级 - 读锁
  log.debug("updating to read lock... {}", stamp);
  try {
   stamp = lock.readLock();
   log.debug("read lock {}", stamp);
   sleep(readTime);
   log.debug("read finish...{}, data:{}", stamp, data);
   return data;
  } finally {
   log.debug("read unlock {}", stamp);
   lock.unlockRead(stamp);
  }
 }
 public void write(int newData) {
  long stamp = lock.writeLock();
  log.debug("write lock {}", stamp);
  try {
   sleep(2);
   this.data = newData;
  } catch (InterruptedException e) {
   e.printStackTrace();
  } finally {
   log.debug("write unlock {}", stamp);
   lock.unlockWrite(stamp);
  }
 }

 public static void main(String[] args) throws ExecutionException, InterruptedException {
  CreataThread creataThread = new CreataThread(234);
  new Thread(() -> {
   try {
    creataThread.read(creataThread.data);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }, "t1").start();
  new Thread(() -> {
   creataThread.write(creataThread.data);
  }, "t2").start();
 }
}
12. Semaphore
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;


@Slf4j
public class CreataThread {

 public static void main(String[] args) throws ExecutionException, InterruptedException {
  // Semaphore可简单的实现限流的功能,可设置最多有几个线程同时运行
  Semaphore semaphore = new Semaphore(3);

  for (int i = 0;i < 10;i++){
   new Thread(()->{
    try {
     // 获取许可,只有获取到许可的线程才可以往下运行
     semaphore.acquire();
     log.debug("runing...");
     TimeUnit.SECONDS.sleep(1);
     log.debug("释放许可");
    } catch (InterruptedException e) {
     e.printStackTrace();
    }finally {
     semaphore.release();

    }
   },"Thread-"+i).start();
  }
 }
}

13. CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


@Slf4j
public class CreataThread {

 public static void main(String[] args) throws ExecutionException, InterruptedException {
  CountDownLatch countDownLatch = new CountDownLatch(2);

  Thread t1 = new Thread(() -> {
   log.debug("runing...");
   // 执行CountDownLatch-1操作
   countDownLatch.countDown();
  }, "t1");

  Thread t2 = new Thread(() -> {
   log.debug("runing...");

   try {
    TimeUnit.SECONDS.sleep(2);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   // 执行CountDownLatch-1操作
   countDownLatch.countDown();
  }, "t2");

  t1.start();
  t2.start();

  countDownLatch.await();
  log.debug("等待完毕,可以往下执行了!");
 }
}
14. CyclicBarrier
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;


@Slf4j
public class CreataThread {

 public static void main(String[] args) throws ExecutionException, InterruptedException {
  ExecutorService threadPool = Executors.newFixedThreadPool(2);
  // CyclicBarrier可重复利用,当参数减为0之后会自动填充为2
  CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
   log.debug("最后任务完成");
  });

  threadPool.submit(()->{
   log.debug("runing...");
   try {
    // CyclicBarrier执行减1,不为0就等待
    cyclicBarrier.await();
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (BrokenBarrierException e) {
    e.printStackTrace();
   }
  });

  threadPool.submit(()->{
   log.debug("runing...");
   try {
    TimeUnit.SECONDS.sleep(2);
    // CyclicBarrier执行减1,减为0之后,所有在CyclicBarrier上等待的线程都恢复运行
    cyclicBarrier.await();
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (BrokenBarrierException e) {
    e.printStackTrace();
   }
  });
 }
}
拔高

提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/731871.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号