-
涉及到的包
-
java.util.concurrent
-
java.util.concurrent.atomic
-
java.util.concurrent.locks
-
java.util.function
-
-
并发编程的本质:充分利用 CPU 的资源
普通的线程代码
- Thread
- Runnable 没有返回值,效率相比 Callable 相对较低
公司一般用 Callable
并发和并行并发:一核 CPU 下,模拟出多条线程,多个线程操作同一个资源,交替执行
并行:多核 CPU 下,多个线程一起执行
System.out.println(Runtime.getRuntime().availableProcessors()); //获取CPU的核数线程和进程
- 程序的一次执行叫做进程
- 线程是进程的实际运作单位
- 线程就是一个单独的资源类,没有任何附属的操作
- 一个进程往往包含多个线程,至少包含一个
- Java 默认有两个线程:main,GC
- Java 不能开启线程,是通过本地方法调用底层的 C 开启线程
- Java 无法直接操作硬件
- NEW 新生
- RUNNABLE 运行
- BLOCKED 阻塞
- WAITING 无限等待
- TIMED_WAITING 超时等待,超过一段时间停止等待
- TERMINATED 终止
-
wait 来自 Object 类
-
sleep 来自 Thread 类
-
企业中使用 TimeUnit 类(java.util.concurrent 包下)下的方法
-
wait 会释放锁
-
sleep 不会释放锁
- wait 必须在同步代码块中使用
- sleep 范围无限制
Synchronized
package BXBF;
public class CPU {
public static void main(String[] args) {
//实际开发需要降低耦合性
//并发:多线程操作同一个资源类,把资源类丢入线程
Ticket ticket = new Ticket();
//lambda表达式
new Thread(()-> {
for (int i = 0; i < 101; i++) {
ticket.sale();
}
},"灰").start();
new Thread(()-> {
for (int i = 0; i < 101; i++) {
ticket.sale();
}
},"黑").start();
new Thread(()-> {
for (int i = 0; i < 101; i++) {
ticket.sale();
}
},"白").start();
}
}
//资源类 OOP
class Ticket{
//属性,方法
private int number=100;
public synchronized void sale(){
if (number>0){
System.out.println(Thread.currentThread().getName()+"买到了第"+number--+"张票");
}
}
}
Lock
实现类
- ReentrantLock(可重用锁(常用))
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公平锁:NonfairSync:不公平,可以插队,默认使用非公平锁
公平锁:FairSync:十分公平,先来后到
-
ReentrantReadWriteLock.ReadLock(读锁)
-
ReentrantReadWriteLock.WriteLock(写锁)
- lock(),加锁
- unlock(),解锁
- tryLock(),尝试获取锁
package BXBF;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Ticketer {
public static void main(String[] args) {
Ticket1 ticket = new Ticket1();
new Thread(()-> {
for (int i = 0; i < 101; i++) {
ticket.sale();
}
},"灰").start();
new Thread(()-> {
for (int i = 0; i < 101; i++) {
ticket.sale();
}
},"黑").start();
new Thread(()-> {
for (int i = 0; i < 101; i++) {
ticket.sale();
}
},"白").start();
}
}
class Ticket1{
private int number=100;
Lock lock=new ReentrantLock(); //创建锁
public void sale(){
lock.lock(); //加锁
try {
if (number>0){
System.out.println(Thread.currentThread().getName()+"买到了第"+number--+"张票");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解锁
}
}
}
Synchronized 和 Lock 的区别
- Synchronized 是内置的 java 关键字;Lock 是一个 java 类
- Synchronized 无法判断获取锁的状态;Lock 可以判断是否获取到锁
- Synchronized 会自动释放锁;Lock 必须手动释放锁。如果不释放,可能会发生死锁
- Synchronized 线程 1 获得锁,线程 2 一直等待,容易阻塞;Lock 线程 1 获得锁,线程 2 不会一直等待
- Synchronized 是可重入锁,不可以中断,非公平;Lock 可重入,可以判断锁,非公平(可以设置)
- Synchronized 适合锁少量的代码同步问题;Lock 适合锁大量的代码同步问题
- 读锁( writeLock() ),共享锁
- 写锁( read() ),独占锁
package BXBF;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteText {
public static void main(String[] args) {
MyRW myRW = new MyRW();
for (int i = 0; i < 5; i++) {
final int i1=i;
new Thread(()->{
myRW.Write(i1+"",i1+"");
},String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int i2=i;
new Thread(()->{
myRW.Read(i2+"");
},String.valueOf(i)).start();
}
}
}
class MyRW{
private volatile Map map=new HashMap<>();
private ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
public void Write(String key,String value){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"开始写");
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写完了");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
//不加读锁可能会产生脏读
public void Read(String key){
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"开始读");
String s = map.get(key);
System.out.println(Thread.currentThread().getName()+"读完了");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
}
}
生产者和消费者问题
Synchronized 解决 方法
- wait() 等待
- notifyAll() 唤醒全部线程
package BXBF;
public class PC {
public static void main(String[] args) {
Product product = new Product();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
product.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
product.subtract();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
class Product{
private int number=0;
public synchronized void add() throws InterruptedException {
if (number!=0){
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"---->"+number);
this.notifyAll();
}
public synchronized void subtract() throws InterruptedException {
if (number==0){
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"---->"+number);
this.notifyAll();
}
}
当生产者消费者各为一个时,线程安全
当生产者消费者不为一时,为了防止虚假唤醒,等待应该总是出现在循环中
package BXBF;
public class PC {
public static void main(String[] args) {
Product product = new Product();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
product.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
product.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
product.subtract();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
product.subtract();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
class Product{
private int number=0;
public synchronized void add() throws InterruptedException {
while (number!=0){
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"---->"+number);
this.notifyAll();
}
public synchronized void subtract() throws InterruptedException {
while (number==0){
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"---->"+number);
this.notifyAll();
}
}
问题:线程是无序的,随机执行的
Lock 解决Condition 取代了对象监视器方法的使用
作用:精准的通知和唤醒线程,解决 Synchronized 线程无序,随机执行的问题
方法- await() 等待
- signalAll() 唤醒全部线程
package BXBF;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditPCL {
public static void main(String[] args) {
Productions product = new Productions();
new Thread(()->{
for (int i = 0; i < 10; i++) {
product.A();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
product.B();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
product.C();
}
},"C").start();
}
}
class Productions{
private int number=1;
Lock lock=new ReentrantLock();
Condition condition1=lock.newCondition();
Condition condition2=lock.newCondition();
Condition condition3=lock.newCondition();
public void A(){
lock.lock();
try {
while (number!=1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"---->"+number);
number=2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void B(){
lock.lock();
try {
while (number!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"---->"+number);
number=3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void C(){
lock.lock();
try {
while (number!=3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"---->"+number);
number=1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
6 个关于锁执行的问题
问题1
下方代码执行后,先输出 1 还是先出输出 2 ?
package BXBF;import java.util.concurrent.TimeUnit;public class EightS { public static void main(String[] args){ Cards cards = new Cards(); new Thread(()->{ cards.C1(); },"1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ cards.C2(); },"2").start(); }}class Cards{ public synchronized void C1(){ System.out.println("1"); } public synchronized void C2(){ System.out.println("2"); }}
答:先输出 1
问题2原因:synchronized 锁的对象是方法的调用者(对象)
两个方法用的是同一把锁,谁先拿到谁执行,在代码中线程 2 等待了一秒,故而线程 1 先获得锁,先输出 1
下方代码执行后,先输出 1 还是先出输出 2 ?
package BXBF;import java.util.concurrent.TimeUnit;public class EightS { public static void main(String[] args){ Cards cards = new Cards(); new Thread(()->{ cards.C1(); },"1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ cards.C2(); },"2").start(); }}class Cards{ public synchronized void C1(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1"); } public synchronized void C2(){ System.out.println("2"); }}
答:先输出 1
问题3原因:方法 C1 中虽然休息了 4 秒,但是在休息之前,线程 1 已经获得了锁,sleep 不会释放锁,故而先输出 1
下方代码执行后,先输出 1 还是先出输出 3 ?
package BXBF;import java.util.concurrent.TimeUnit;public class EightS { public static void main(String[] args){ Cards cards = new Cards(); new Thread(()->{ cards.C1(); },"1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ cards.C3(); },"2").start(); }}class Cards{ public synchronized void C1(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1"); } public synchronized void C2(){ System.out.println("2"); } public void C3(){ System.out.println("3"); }}
答:先输出 3
问题4原因:虽然线程 1 已经获得了锁,但 C3 没有锁,不是同步方法,不受锁的影响,在休息 1 秒后,线程 2 执行,输出 3 ,此时线程 1 还在抱锁休息
下方代码执行后,先输出 1 还是先出输出 2 ?
package BXBF;
import java.util.concurrent.TimeUnit;
public class EightS {
public static void main(String[] args){
Cards cards = new Cards();
Cards cards1 = new Cards();
new Thread(()->{
cards.C1();
},"1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
cards1.C2();
},"2").start();
}
}
class Cards{
public synchronized void C1(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1");
}
public synchronized void C2(){
System.out.println("2");
}
}
答:先输出 2
问题5原因:两个不同的对象,有两把锁,线程 1 抱住对象 1 的锁休息 4 s,线程 2 休息 1 秒后获得对象 2 的锁,故而线程 2 先执行,先输出 2
下方代码执行后,先输出 1 还是先出输出 2 ?
package BXBF;
import java.util.concurrent.TimeUnit;
public class EightS {
public static void main(String[] args){
Cards cards = new Cards();
Cards cards1 = new Cards();
new Thread(()->{
cards.C1();
},"1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
cards1.C2();
},"2").start();
}
}
class Cards{
public static synchronized void C1(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1");
}
public static synchronized void C2(){
System.out.println("2");
}
}
答:先输出 1
问题6原因:static 静态方法,类一加载就有了,这里锁的是 Class 模板(唯一的 Class 对象),无论创建多少个该类的对象,锁只有一个,线程 1 先抢到锁,所以先执行线程 1,先输出 1
下方代码执行后,先输出 1 还是先出输出 2 ?
package BXBF;
import java.util.concurrent.TimeUnit;
public class EightS {
public static void main(String[] args){
Cards cards = new Cards();
new Thread(()->{
cards.C1();
},"1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
cards.C2();
},"2").start();
}
}
class Cards{
public static synchronized void C1(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1");
}
public synchronized void C2(){
System.out.println("2");
}
}
答:先输出 2
集合不安全原因:代码中的两个方法,一个是静态同步方法,锁的是 Class 类模板;一个是普通同步方法,锁的是类的调用者(对象)。两个方法的锁不同,不需要争抢,线程 1 抱住 Class 类模板的锁休息 4 s,线程 2 休息 1 秒后获得对象的锁,故而线程 2 先执行,先输出 2
List
package BXBF;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
public class ListText {
public static void main(String[] args) {
List list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
- 会报出 ConcurrentModificationException 并发修改异常
- 并发下 ArrrayList 是不安全的
-
List
list = new Vector<>(); 不推荐,效率低
-
List
list = Collections.synchronizedList(new ArrayList<>()); -
List
list = new CopyOnWriteArrayList<>(); 效率高
CopyonWrite 写入时复制,COW 思想,计算机设计领域的一种优化策略
Set写入时复制(CopyOnWrite,简称COW)思想是计算机程序设计领域中的一种通用优化策略。其核心思想是,如果有多个调用者(Callers)同时访问相同的资源(如内存或者是磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者修改资源内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的(transparently)。此做法主要的优点是如果调用者没有修改资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。当调用者往资源中添加元素时,不直接操作当前资源,而是对系统复制出的专用副本进行修改,修改后,再将原资源的引用指向新的资源,即完成了整个修改操作;
package BXBF;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
public class ListText {
public static void main(String[] args) {
Set set = new HashSet<>();
for (int i = 1; i <= 20; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
- 会报出 ConcurrentModificationException 并发修改异常
- 并发下 HashSet 是不安全的
-
Set
set = Collections.synchronizedSet(new HashSet<>()); -
Set
set = new CopyOnWriteArraySet<>();
- HashMap
public HashSet() { map = new HashMap<>();}
- HashSet 的 add 方法
public boolean add(E e) { return map.put(e, PRESENT)==null; //PRESENT是一个常量}
- Set 的本质就是 Map 的 key,key 是无法重复的,所以 Set 无序
- 工作中不适用 HashMap
- 默认等价于:new HashMap<>(16,0.75)
package BXBF;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
//ConcurrentModificationException 并发修改异常
public class ListText {
public static void main(String[] args) {
Map map = new HashMap<>();
for (int i = 1; i <= 20; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
- 会报出 ConcurrentModificationException 并发修改异常
- 并发下 HashMap 是不安全的
-
Map
map = Collections.synchronizedMap(new HashMap<>()); -
Map
map = new ConcurrentHashMap<>();
- 多线程的第三种创建方式
- 可以有返回值
- 可以抛出异常
- 需要重写的方法不同( call() )
@FunctionalInterface public interface Callable{ V call() throws Exception; }
- 泛型的参数等于方法的返回值
package BXBF;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class callable {
//可以抛出异常
public static void main(String[] args) throws ExecutionException, InterruptedException {
Mycallable mycallable = new Mycallable();
FutureTask futureTask = new FutureTask(mycallable);
new Thread(futureTask).start();
String str = (String) futureTask.get(); //可以获得返回值,这个方法可能会产生阻塞,一般把它放在最后,或者使用异步通信来处理
System.out.println(str);
}
}
class Mycallable implements Callable {
@Override
public String call() {
System.out.println("call");
return "145236";
}
}
-
线程的开启只能通过 new Thread().start();
-
但 Callable 不能通过 new Thread().start(); 直接开启
Callable 线程类开启的方法
- 已知只有 Runnable 接口可以通过 new Thread().start(); 直接开启线程,即 new Thread(new Runnable()).start();
- 使用 Runnable 接口的实现了开启线程,即 new Thread(new FutureTask()).start();
- FutureTask 类有一个带 Callable 类型参数的构造器,可以调用 Callable,即new Thread(new FutureTask(Callable)).start();
- 最终完成线程开启
-
多个 FutureTask 类 对象开启线程时,只返回一次结果
当 JVM 再调用 FutureTask 对象锁持有的线程时,FutureTask 的 start 此时已非 new 状态,会直接结束对应线程,任务不被执行,只有第一次调用时返回结果保存,故而只返回一次结果
CountDownLatch
- 减法计数器
-
countDown(),计数器减 1
-
await(),计数器归 0 后,程序继续向下执行
package BXBF;
import java.util.concurrent.CountDownLatch;
public class Count {
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(5);
for (int i = 1; i <=5; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"号走了");
cdl.countDown();
},String.valueOf(i)).start();
}
cdl.await();
System.out.println("关门");
}
}
CyclicBarrier
- 加法计数器
- await(),当计数器数量到达指定值时,阻塞当前线程,执行构造器中的线程
package BXBF;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Cl {
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(5, () -> {
System.out.println("开门");
});
for (int i = 0; i < 5; i++) {
//当lambda表达式中想使用for循环中的i时,需要将i经过如下转换才可以
// final int ii=i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"号同学来了");
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
Semaphore
- 计数信号量
- acquire(),获得位置,位置已满的话等待位置被释放
- release(),将当前的信号量释放,唤醒等待线程
package BXBF;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Sp {
public static void main(String[] args) {
//限流
Semaphore sp = new Semaphore(4); //同一时间只能有四条线程并发执行
for (int i = 1; i <= 8; i++) {
new Thread(()->{
try {
sp.acquire(); //等到位置
System.out.println(Thread.currentThread().getName()+"号进来了");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"号走了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sp.release(); //释放位置
}
},String.valueOf(i)).start();
}
}
}
作用:多个共享资源互斥的使用,并发限流,控制最大的线程数
阻塞队列(BlockingQueue)- 如果队列满了,需要阻塞等待数据取出
- 如果队列为空,需要阻塞等待数据写入
- 四组 API
- 添加方法:add()
- 移除方法:remove()
- 获得队首数据:element()
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
public class BQ {
public static void main(String[] args) {
test();
}
public static void test(){
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
System.out.println(abq.add("145"));
System.out.println(abq.add("258"));
System.out.println(abq.add("198"));
System.out.println(abq.add("198"));
}
}
阻塞队列大小被定义为 3,当写入数据数量大于 3 时,报错:IllegalStateException: Queue full 非法状态异常,队列已满
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
public class BQ {
public static void main(String[] args) {
test();
}
public static void test(){
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
System.out.println(abq.add("145"));
System.out.println(abq.add("258"));
System.out.println(abq.add("198"));
System.out.println(abq.remove());
System.out.println(abq.remove());
System.out.println(abq.remove());
System.out.println(abq.remove());
}
}
阻塞队列大小为 3,写入 3 个数据后,取的数据个数大于写入数据个数时,报错:NoSuchElementException 找不到元素异常
2.有返回值,不抛出异常- 添加方法:offer()
- 移除方法:poll()
- 获得队首数据:peek()
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
public class BQ {
public static void main(String[] args) {
test();
}
public static void test(){
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
System.out.println(abq.offer("145"));
System.out.println(abq.offer("258"));
System.out.println(abq.offer("198"));
System.out.println(abq.offer("123"));
}
}
阻塞队列大小被定义为 3,当写入数据数量大于 3 时,不会报错报错,而是返回 false
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
public class BQ {
public static void main(String[] args) {
test();
}
public static void test(){
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
System.out.println(abq.offer("145"));
System.out.println(abq.offer("258"));
System.out.println(abq.offer("198"));
System.out.println(abq.poll());
System.out.println(abq.poll());
System.out.println(abq.poll());
System.out.println(abq.poll());
}
}
阻塞队列大小为 3,写入 3 个数据后,取的数据个数大于写入数据个数时,不会报错,而是返回 null
3.阻塞等待(一直等待)- 添加方法:put()
- 移除方法:take()
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
public class BQ {
public static void main(String[] args) throws InterruptedException {
test();
}
public static void test() throws InterruptedException {
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
abq.put("145");
abq.put("258");
abq.put("198");
abq.put("18");
}
}
阻塞队列大小被定义为 3,当写入数据数量大于 3 时,程序阻塞,直到队列中出现空位可以将数据写入
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
public class BQ {
public static void main(String[] args) throws InterruptedException {
test();
}
public static void test() throws InterruptedException {
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
abq.put("145");
abq.put("258");
abq.put("198");
System.out.println(abq.take());
System.out.println(abq.take());
System.out.println(abq.take());
System.out.println(abq.take());
}
}
阻塞队列大小为 3,写入 3 个数据后,取的数据个数大于写入数据个数时,程序阻塞,直到队列中有新的数据写入
4.超时等待(超时退出)- 添加方法:offer(),带参
- 移除方法:poll(),带参
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BQ {
public static void main(String[] args) throws InterruptedException {
test();
}
public static void test() throws InterruptedException {
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
System.out.println(abq.offer("145"));
System.out.println(abq.offer("258"));
System.out.println(abq.offer("198"));
System.out.println(abq.offer("11", 1, TimeUnit.SECONDS));
}
}
阻塞队列大小被定义为 3,当写入数据数量大于 3 时,程序阻塞,等待指定时间后,获得位置则写入,否则返回 false
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BQ {
public static void main(String[] args) throws InterruptedException {
test();
}
public static void test() throws InterruptedException {
ArrayBlockingQueue abq = new ArrayBlockingQueue<>(3);
System.out.println(abq.offer("145"));
System.out.println(abq.offer("258"));
System.out.println(abq.offer("198"));
System.out.println(abq.poll());
System.out.println(abq.poll());
System.out.println(abq.poll());
System.out.println(abq.poll(1, TimeUnit.SECONDS));
}
}
阻塞队列大小为 3,写入 3 个数据后,取的数据个数大于写入数据个数时,程序阻塞,等待指定时间后,有新数据写入则移除,否则返回 null
同步队列(SynchronousQueue)-
没有容量
进去一个元素,必须等待取出来之后,才能再往里面放另一个元素
-
添加方法:put()
-
移除方法:take()
package BXBF;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class BQ {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue s = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"写入了数据"); //打印会优先抢占cpu资源
s.put("123");
System.out.println(Thread.currentThread().getName()+"写入了数据");
s.put("123");
System.out.println(Thread.currentThread().getName()+"写入了数据");
s.put("123");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"移除了数据"+s.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"移除了数据"+s.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"移除了数据"+s.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"2").start();
}
}
线程池(重点)
- 一般使用 ThreadPoolExecutor() 自定义线程池
- Executors 不安全
- 优化资源的使用
- 事先准备好一些资源,需要用时使用,用完归还,避免多次创建,销毁
- 好处:降低资源消耗,提高响应速度,方便管理 —> 线程复用,可以控制最大并发数,管理线程
- 线程池,连接池,内存池,对象池
- 创建一个大小为一的线程池
package BXBF;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Ex {
public static void main(String[] args) {
ExecutorService es = Executors.newSingleThreadExecutor(); //创建一个大小为一的线程池
try {
for (int i = 0; i < 10; i++) {
es.execute(()->{ //开启线程池开启线程
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
es.shutdown(); //关闭线程池
}
}
}
newFixedThreadPoo()
- 创建一个大小固定的线程池
package BXBF;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Ex {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5); //创建一个大小固定的线程池
try {
for (int i = 0; i < 10; i++) {
es.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
es.shutdown();
}
}
}
newCachedThreadPoo()
- 创建一个大小可变的线程池
package BXBF;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Ex {
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool(); //创建一个大小可变的线程池
try {
for (int i = 0; i < 10; i++) {
es.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
es.shutdown();
}
}
}
7 大参数
- 通过源码得知 newSingleThreadExecutor(),newFixedThreadPoo(),newCachedThreadPoo() 三个方法本质上都是调用 ThreadPoolExecutor() 方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor() 方法中的七个参数
int corePoolSize
- 核心线程池大小,线程池的初始容量
- 最大核心线程池大小,当 BlockingQueue workQueue 满时,线程池可以开启的最大容量
- 超过指定时间没有被调用就会将线程池容量初始化
- 超过指定时间的单位
- 阻塞队列,需要设置阻塞队列的大小,到超过上限时触发线程池扩容
- 线程工厂,创建线程,一般不用动
- 解决策略
- 当线程池达到最大容量且阻塞队列达到上限后,还有数据进入,拒绝处理并抛出异常
package BXBF;
import java.util.concurrent.*;
public class Ex {
public static void main(String[] args) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new linkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 1; i <= 9; i++) {
tpe.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
tpe.shutdown();
}
}
}
CallerRunsPolicy
- 当线程池达到最大容量且阻塞队列达到上限后,还有数据进入,将数据交给调用该数据的线程执行
package BXBF;
import java.util.concurrent.*;
public class Ex {
public static void main(String[] args) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new linkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 1; i <= 9; i++) {
tpe.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
tpe.shutdown();
}
}
}
DiscardOldestPolicy
- 当线程池达到最大容量且阻塞队列达到上限后,还有新数据进入,移除最早的数据,新数据尝试再次进入
package BXBF;
import java.util.concurrent.*;
public class Ex {
public static void main(String[] args) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new linkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
try {
for (int i = 1; i <= 9; i++) {
tpe.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
tpe.shutdown();
}
}
}
DiscardPolicy
- 当线程池达到最大容量且阻塞队列达到上限后,还有数据进入,拒绝处理但不抛出异常
package BXBF;
import java.util.concurrent.*;
public class Ex {
public static void main(String[] args) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new linkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
try {
for (int i = 1; i <= 9; i++) {
tpe.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
tpe.shutdown();
}
}
}
线程池的最大容量如何定义
CPU 密集型
- 电脑为几条线程就为几,可以保证 CPU 效率最高
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(), //获得当前电脑的线程数目
3,
TimeUnit.SECONDS,
new linkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
IO 密集型
- 判断程序中十分耗 IO 的线程数,将线程池的最大容量定义为此线程数的两倍
- 只有一个方法的接口
@FunctionalInterface //函数式接口
public interface Runnable {
public abstract void run();
}}
- 可以简化编程模型,在新版本的框架底层大量应用
- 函数型接口
public interface Function{ //传入参数T,返回类型R R apply(T t); }
package BXBF;
import java.util.function.Function;
public class Funct {
public static void main(String[] args) {
Function f=new Function() {
@Override
public String apply(String s) {
return s;
}
};
System.out.println(f.apply("147"));
}
}
lambda 表达式简化
package BXBF;
import java.util.function.Function;
public class Funct {
public static void main(String[] args) {
Function f=(s)->{return s;};
System.out.println(f.apply("147"));
}
}
Predicate
- 断定型接口
public interface Predicate{ //传入参数T,返回boolean类型 boolean test(T t); }
package BXBF;
import java.util.function.Predicate;
public class Pr {
public static void main(String[] args) {
Predicate p=new Predicate() {
@Override
public boolean test(String s) {
return s.isEmpty();
}
};
System.out.println(p.test(""));
}
}
lambda 表达式简化
package BXBF;
import java.util.function.Predicate;
public class Pr {
public static void main(String[] args) {
Predicate p=(s)-> {return s.isEmpty();};
System.out.println(p.test(""));
}
}
Consumer
- 消费型接口
public interface Consumer{ //传入参数T,没有返回值 void accept(T t); }
package BXBF;
import java.util.function.Consumer;
public class Co {
public static void main(String[] args) {
Consumer c= new Consumer() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
c.accept("147");
}
}
lambda 表达式简化
package BXBF;
import java.util.function.Consumer;
public class Co {
public static void main(String[] args) {
Consumer c=(s)-> {System.out.println(s);};
c.accept("147");
}
}
Supplier
- 供给型接口
public interface Supplier{ //没有参数只有返回值 T get(); }
package BXBF;
import java.util.function.Supplier;
public class Spl {
public static void main(String[] args) {
Supplier s=new Supplier() {
@Override
public String get() {
return "14526";
}
};
System.out.println(s.get());
}
}
lambda 表达式简化
package BXBF;
import java.util.function.Supplier;
public class Spl {
public static void main(String[] args) {
Supplier s=()-> {return "14526";};
System.out.println(s.get());
}
}
Stream 流式计算
- 大数据:存储 + 计算
- 集合,数据库用来存储
- 流用来计算
- 筛选:filter()
- 转化:map()
- 比较:sorted()
- 输出指定个数:limit()
- 打印:forEach()
- 添加此区间的数据(开区间):range()
- 添加此区间的数据(前开后闭):rangeClosed()
- 并行计算:parallel()
小例子
对四个用户进行以下操作
- 只保留 ID 为偶数的
- 只保留年龄大于 17 的
- 将用户名转换为大写
- 将用户名倒着排序
- 只输出一个用户
package BXBF;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
public class UUSER {
public static void main(String[] args) {
User user = new User("a",1,12);
User user1 = new User("b",2,18);
User user2 = new User("c",3,1);
User user3 = new User("d",4,28);
List list = Arrays.asList(user, user1, user2, user3);
list.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>17;})
.map(u->{u.setName(u.getName().toUpperCase());return u;})
.sorted((u1,u2)->{return u2.getName().compareTo(u1.getName());})
.limit(1)
.forEach(System.out::println);
}
}
ForkJoin
- 并行执行任务,在数据量特别大的时候使用以提高效率
- 原理,将大任务不断的拆分成一个个小任务,再将小任务执行,得到结果,将结果一次向上合并,最终得到大任务的结果
- 里面维护的都是双端队列
- 特点:工作窃取,当一个线程完成自己的工作后,会帮住别的线程完成未完成的工作,从队尾开始
-
通过 ForkJoinPool 来执行
相关方法:执行:execute()
提交任务:submit()
-
新建一个计算任务,ForkJoinPool.execute(ForkJoinTask task)
ForkJoinTask 有两个实现类 RecursiveAction(递归事件,没有返回值),Resursivetask(递归任务,有返回值)
-
计算类继承 Resursivetask 实现类
- 拆分任务,把任务压入线程队列:fork()
- 获取执行结果:join()
小例子:大量数据的加法实现
package BXBF; import java.util.concurrent.RecursiveTask; public class ForkJ extends RecursiveTask{ private Long start; private Long end; private Long middle=10000L; public ForkJ(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { if ((end-start) package BXBF; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; public class FJtext { public static void main(String[] args) throws ExecutionException, InterruptedException { test1(); test2(); test3(); } //直接计算 public static void test1(){ Long sum=0L; long start = System.currentTimeMillis(); for (Long i = 0L; i <= 1_0000_0000L; i++) { sum+=i; } long end = System.currentTimeMillis(); System.out.println("结果为:"+sum+",用时:"+(end-start)); } //ForkJoin计算 public static void test2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool fjp = new ForkJoinPool(); ForkJoinTaskfj = new ForkJ(0L, 1_0000_0000L); ForkJoinTask submit = fjp.submit(fj); Long sum=submit.get(); long end = System.currentTimeMillis(); System.out.println("结果为:"+sum+",用时:"+(end-start)); } //Stream流式计算 public static void test3(){ long start = System.currentTimeMillis(); Long sum = LongStream.rangeClosed(0L, 1_0000_0000L).parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("结果为:"+sum+",用时:"+(end-start)); } } 效率:Stream 流式计算 > ForkJoin 计算 > 直接计算
ForkJoin 可以通过改变临界值提高效率
JMM
关于 JMM 的一些同步的约定
- Java 内存模型,不存在,是一种约定
8 种操作
- 线程加锁前
- 必须读取主存中的最新值到线程的工作内存中
- 线程解锁前
- 当线程需要使用主存中的变量时,不会对主存变量进行直接操作,而是会将其复制一份到线程自己的工作内存中,线程实际操作的是自己工作内存中的复制体,当解锁前,必须把修改后的复制体立刻刷新回主存,替换掉主存变量
- 加锁和解锁必须是同一把锁
- JMM 规范的内存与工作内存之间进行数据交换的八种操作,每个操作都是原子性的
lock(加锁):作用于主内存的变量,一个变量在同一时间只能一个线程锁定。该操作表示该线程独占锁定的变量
unlock(解锁):作用于主内存的变量,表示这个变量的状态由处于锁定状态被释放,这样其他线程才能对该变量进行锁定
read(读取):作用于主内存变量,表示把一个主内存变量的值传输到线程的工作内存,以便随后的 load 操作使用
load(载入):作用于线程的工作内存的变量,表示把 read 操作从主内存中读取的变量的值放到工作内存的变量副本中
use(使用):作用于线程的工作内存中的变量,表示把工作内存中的一个变量的值传递给执行引擎
assign(赋值):作用于线程的工作内存的变量,表示把执行引擎返回的结果赋值给工作内存中的变量
store(存储):作用于线程的工作内存中的变量,把工作内存中的一个变量的值传递给主内存,以便随后的 write 操作使用
write(写入):作用于主内存的变量,把 store 操作从工作内存中得到的变量的值放入主内存的变量中
Volatile
三大作用 保证可见性
Java 一个内置的关键字
Java 虚拟机提供的轻量级的同步机制
package BXBF; import java.util.concurrent.TimeUnit; public class TextJ { private static int i=0; public static void main(String[] args) throws InterruptedException { new Thread(()->{ while (i==0){ } },"1").start(); TimeUnit.SECONDS.sleep(1); i=1; System.out.println(i); } }上述情况中主存中的 i 值已经变化,然而线程 1 对主存中的变化是不可见的,程序一直循环
在变量前用 Volatile 修饰,让线程 1 对主存的变化可见
package BXBF; import java.util.concurrent.TimeUnit; public class TextJ { private volatile static int i=0; public static void main(String[] args) throws InterruptedException { new Thread(()->{ while (i==0){ } },"1").start(); TimeUnit.SECONDS.sleep(1); i=1; System.out.println(i); } }不保证原子性
- 原子性:不可分割
- 线程在执行任务时,不能被打扰,也不能被分割,要么同时成功,要么同时失败
在不使用 synchronized 和 Lock 的情况下如何保证原子性
使用 Atomic 原子类,解决原子性问题
package BXBF; import java.util.concurrent.atomic.AtomicInteger; public class At { // private static int i=0; private static AtomicInteger i=new AtomicInteger(); public static void add(){ // i++; //非原子性操作 i.getAndIncrement(); //AtomicInteger类中的+1方法,用的是CAS,效率极高 } public static void main(String[] args) { for (int i1 = 0; i1 < 10; i1++) { new Thread(()->{ for (int i2 = 0; i2 < 1000; i2++) { add(); } }).start(); } while (Thread.activeCount()>2){ Thread.yield(); } System.out.println(i); } }避免指令重排单例模式
- 指令重排:计算机并不是按照你写的程序去执行的
- 代码执行的过程
- 源代码 -> 编译器优化重排 -> 指令并行也可能会重排 -> 内存系统也会重排 -> 执行
- 处理器在进行指令重排时,会考虑数据之间的依赖性
- 内存屏障(CPU指令),在单例模式中使用的最多
- 作用
- 保证特定的操作的执行顺序
- 可以保证某些变量的内存可见性(利用这些特性 volatile 实现了可见性)
- 当一个线程中操作的目标加了 volatile ,在 CPU 执行时,会在该线程执行前后加内存屏障
饿汉式单例
- 构造器私有,保证对象唯一
- 反射可以破坏单例模式
package BXBF; public class Hungry { //会在程序开始时将该类的内容全部加载,可能会造成空间浪费 private byte[] a1=new byte[1024*1024]; private byte[] a2=new byte[1024*1024]; private byte[] a3=new byte[1024*1024]; private byte[] a4=new byte[1024*1024]; private Hungry() { } private final static Hungry Hungry=new Hungry(); //保证对象唯一 public static Hungry getInstance(){ return Hungry; } }懒汉式单例package BXBF; public class Lazy { private Lazy() { System.out.println(Thread.currentThread().getName()); } private volatile static Lazy lazy; //双重检测锁模式,DCL懒汉式 private static Lazy getInstance(){ if (lazy==null){ synchronized (Lazy.class){ if (lazy == null) { lazy = new Lazy(); //不是一个原子性操作 } } } return lazy; } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ lazy.getInstance(); }).start(); } } }匿名内部类package BXBF; public class ClassR { private ClassR(){ } public static ClassR getInstance(){ return ClassROO.Classr; } public static class ClassROO{ private static final ClassR Classr=new ClassR(); } }单例模式都不安全可以通过反射破坏
枚举CAS
- 本身也是一个 Class 类
- 没有无参构造,只有有参构造(包含两个参数)
- 安全的,无法通过发射破坏
- 是单例模式的最佳方法
- compareAndSet(比较并交换)
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }当第一个参数等于初始值时,将初始值更新为第二个参数,否则不更新,一直循环
- Java 无法操作内存
- Java 可以通过 native方法调用 C++
- C++ 可以操作内存
- Java 可以通过 unsafe 类操作内存
- 缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- 会存在 ABA 问题



