栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

(大厂必备)厂长熬夜爆肝万字之多线程高并发JUC编程(一)⭐学妹已收藏

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

(大厂必备)厂长熬夜爆肝万字之多线程高并发JUC编程(一)⭐学妹已收藏

(大厂必备)厂长熬夜爆肝万字之多线程高并发JUC编程(一)⭐学妹已收藏 ❤️‍ 大家好,我是java厂长,今天带你们体验一把多线程高并发的面试高频!❤️‍ 关于作者
  • 作者介绍

 博客主页:作者主页

 简介:JAVA领域优质创作者磊、一名在校大三学生、在校期间参加各种省赛、国赛,斩获一系列荣誉。

 关注我:关注我学习资料、文档下载统统都有,每日定时更新文章,励志做一名JAVA资深程序猿‍。

JUC学习

文章目录
  • (大厂必备)厂长熬夜爆肝万字之多线程高并发JUC编程(一)⭐学妹已收藏
    • 关于作者
    • JUC学习
      • 1、什么是JUC
      • 2、线程和进程
        • 面试题:谈一谈wait和sleep区别?
      • 3、Lock锁(重点)
      • 4、生产者消费者问题
      • 5、8锁现象
      • 6、集合不安全
      • 7、Callable
      • 8、常用的辅助类
        • 1) CountDownLatch—减法计数器
        • 2) CyclicBarrier—加法计数器
        • 3) Semaphore
      • 9、读写锁ReadwriteLock
      • 10、堵塞队列
        • 1) 四组API
        • 2) SynchronizedQueue 同步队列
      • 11、线程池
        • 1) 线程池的好处:
        • 2) 线程池: 三大方法
        • 3) 七大参数
        • 4) 拒绝策略
      • 12、四大函数式接口
        • 1) Function函数型接口
        • 2) Predicate 断定型接口
        • 3) Suppier 供给型接口
        • 4) Consummer 消费型接口
      • 13、Stream 流式计算
      • 14、ForkJoin—多线并发处理框架
        • 1)ForkJoin 特点: 工作窃取!
      • 2)如果使用ForkJoin
      • ❤️‍❤️‍❤️‍最后(学妹照片)

1、什么是JUC


源码+官方文档

JUC是 java util concurrent

面试高频问JUC~!

java.util 是Java的一个工具包

业务:普通的线程代码 Thread

Runnable: 没有返回值、效率相比于Callable 相对较低!

2、线程和进程

进程:一个程序,允许一个java程序会进程里面会出现一个java.exe;数据+代码+pcb

一个进程可以包含多个线程,至少包含一个线程!

Java默认有几个线程?2个线程! main线程、GC线程

线程:开了一个进程qq,聊天打字,消息提示(线程负责的)

对于Java而言:Thread、Runable、Callable进行开启线程的。

JAVA真的可以开启线程吗? 开不了的!
原因Java没有权限去开启线程、操作硬件的,这是一个native的一个本地方法,它调用的底层的C++代码。

并发、并行

并发: 多线程操作同一个资源。

  • CPU 只有一核,模拟出来多条线程,那么我们就可以使用CPU快速交替,来模拟多线程。

并行: 多个人并排行走。

  • CPU多核,多个线程可以同时执行。
public class Test {
    public static void main(String[] args) {
        //获取cpu的核数
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}

并发编程的本质:充分利用CPU的资源!

线程的6个状态

public enum State {
    	//创建
        NEW,

    	//运行
        RUNNABLE,
    
    	//阻塞
        BLOCKED,
    
    	//等待
        WAITING,
    
    	//超时等待
        TIMED_WAITING,

    	//终止
        TERMINATED;
    }
面试题:谈一谈wait和sleep区别?
区别waitsleep
操作的类ObjectThread
锁的释放会释放锁抱着锁睡觉
范围同步代码块中任何地方
异常捕获不需要捕获异常需要捕获异常
3、Lock锁(重点)

synchronized锁问题

package com.zmz.day01;

//资源类 属性 + 方法 oop
class Ticket{
    private int num = 50;
    //卖票方式  synchronized 本质:队列 锁
    public synchronized void sale(){
        if(num > 0){
            System.out.println(Thread.currentThread().getName()+ " 卖出了第"+ num +" 张票,剩余:"+ --num +" 张票");
        }
    }
}


public class TicketTest {
    public static void main(String[] args) {
        //多线陈操作
        //并发:多个线程操作同一个资源ticket
        Ticket ticket = new Ticket();
        //@FunctionalInterface 函数式接口 jdk1.8之后 lambda表达式
        new Thread(()->{
            for (int i = 0; i < 60; i++) {
                ticket.sale();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 60; i++) {
                ticket.sale();
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 60; i++) {
                ticket.sale();
            }
        },"C").start();
    }
}

Lock接口

公平锁: 公平,必须先来后到~;

非公平锁: 不公平,可以插队;(默认为非公平锁)

使用Lock进行操作

package com.zmz.day01;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;



class Ticket2{
    
    Lock l = new ReentrantLock();
    private int num = 50;
    //卖票方式  synchronized 本质:队列 锁
    public  void sale(){
        //加锁
        l.lock();
        try {
            //业务代码
            if(num > 0){
                System.out.println(Thread.currentThread().getName()+ " 卖出了第"+ num +" 张票,剩余:"+ --num +" 张票");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            l.unlock();
        }
    }
}


public class TicketTest2 {
    public static void main(String[] args) {
        //多线陈操作
        //并发:多个线程操作同一个资源ticket
        Ticket ticket = new Ticket();
        //@FunctionalInterface 函数式接口 jdk1.8之后 lambda表达式
        new Thread(()->{
            for (int i = 0; i < 60; i++) {
                ticket.sale();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 60; i++) {
                ticket.sale();
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 60; i++) {
                ticket.sale();
            }
        },"C").start();
    }
}
区别synchronizedlock
名称属于关键字属于对象
状态不可以获取锁的状态可以获取锁的状态
锁的管理自动释放锁需要手动加锁以及释放锁
线程自己抱着锁等待
可重入锁,不可以中断的,非公平的可重入的,可以判断锁,可以自己设置公平锁和非公平锁
代码同步适合少量的代码同步适合大量的代码同步
4、生产者消费者问题

synchronized版

package com.zmz.day01;



public class TicketTest3 {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }
}


//判断等待  业务  唤醒
class Data{
    private int number = 0;
    //      +1操作
    public synchronized void increment() throws InterruptedException {
        if(number != 0 ){
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        this.notifyAll();
    }
    //      -1操作
    public synchronized void decrement() throws InterruptedException{
        if (number == 0){
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        this.notifyAll();
    }
}

问题存在,A线程B线程,现在如果我有四个线程A B C D!该怎么去解决问题

if判断改为While判断就可以解决虚假唤醒的问题。

package com.zmz.day01;



//线程之间的通讯问题:生产者和消费者的问题!  等待唤醒,通知唤醒
//线程交替执行 A   B操作同一个资源
public class TicketTest3 {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}


//判断等待  业务  唤醒
class Data{
    private int number = 0;
    //      +1操作
    public synchronized void increment() throws InterruptedException {
        while(number != 0 ){
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        this.notifyAll();
    }
    //      -1操作
    public synchronized void decrement() throws InterruptedException{
        while (number == 0){
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        this.notifyAll();
    }
}

JUC版本的解决A B C D多线程的问题

package com.zmz.day01;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class JucTest1 {
    public static void main(String[] args) {
        Data2 data = new Data2();

        new Thread(()->{for(int i=0;i<10;i++) {
            data.increment();
        }
        },"A").start();
        new Thread(()->{for(int i=0;i<10;i++) {
            data.decrement();
        }},"B").start();
        new Thread(()->{for(int i=0;i<10;i++) {
            data.increment();
        }
        },"C").start();
        new Thread(()->{for(int i=0;i<10;i++) {
            data.decrement();
        }
        },"D").start();
    }
}


class Data2{
    private int number = 0;
    //lock锁
    Lock l = new ReentrantLock();
    Condition condition = l.newCondition();

    public void increment() {
        l.lock();
        try {
            //业务
            while (number!=0){
                //等待操作
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            //通知其他线程 我+1完毕了
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            l.unlock();
        }
    }

    public void decrement()  {
        l.lock();
        try {
            //业务
            while (number==0){
                //等待操作
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            //通知其他线程 我-1完毕了
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            l.unlock();
        }
    }
}

Condition的优势:精准通知、唤醒的线程

package com.zmz.day01;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class JucTest2 {
    public static void main(String[] args) {
        Data3 data3 = new Data3();
        new Thread(()->{
            for(int i=0;i<10;i++){
                data3.printA();
            }
        },"A").start();
        new Thread(()->{
            for(int i=0;i<10;i++){
                data3.printB();
            }
        },"B").start();
        new Thread(()->{
            for(int i=0;i<10;i++){
                data3.printC();
            }
        },"C").start();
    }
}


class Data3{
    private Lock l = new ReentrantLock();
    Condition condition1 = l.newCondition();
    Condition condition2 = l.newCondition();
    Condition condition3 = l.newCondition();

    private int flag = 1;
    public void printA(){
        l.lock();
        //判断 -> 执行 -> 通知
        try {
            while(flag != 1){
                //等待
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName() + "->A" );
            flag = 2;
            //唤醒指定线程
            condition2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            l.unlock();
        }
    }

    public void printB(){
        l.lock();
        //判断 -> 执行 -> 通知
        try {
            while(flag != 2){
                //等待
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName() + "->BB" );
            flag = 3;
            //唤醒指定线程
            condition3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            l.unlock();
        }
    }

    public void printC(){
        l.lock();
        //判断 -> 执行 -> 通知
        try {
            while(flag != 3){
                //等待
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName() + "->CCC" );
            flag = 1;
            //唤醒指定线程
            condition1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            l.unlock();
        }
    }
}
5、8锁现象

1-2锁

package com.zmz.lock8;

import java.util.concurrent.TimeUnit;



public class Test1 {
    public static void main(String[] args) {
        Phone phone = new Phone();

        //锁存在
        new Thread(()->{
            phone.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone.call();
        },"B").start();
    }
}


class Phone{

    //synchronized锁的对象是方法的调用者!
    //两个方法用的都是phone对象的锁!
    //谁先拿到谁执行!
    public synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }
    public synchronized void call(){
        System.out.println("打电话");
    }
}

3-4锁

package com.zmz.lock8;

import java.util.concurrent.TimeUnit;


//3、增加一个普通方法后! 发短信还是Hello,发短信       hello
//4、两个对象,两个同步方法,发短信还是打电话      打电话
public class Test2 {
    public static void main(String[] args) {
        //两个对象,两把锁
        Phone2 phone = new Phone2();
        Phone2 phone2 = new Phone2();

        //锁存在
        new Thread(()->{
            phone.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone2.call();
        },"B").start();


    }
}
class Phone2{

    //synchronized锁的对象是方法的调用者!

    public synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }
    public synchronized void call(){
        System.out.println("打电话");
    }

    //这里没有锁!bubu不是同步方法,不受锁的影响
  public void hello(){
        System.out.println("hello");
    }
}

5-6锁

package com.zmz.lock8;

import java.util.concurrent.TimeUnit;



public class Test3 {
    public static void main(String[] args) {
        //两个对象,两个调用者,两把锁!
        //两个对象的class类模板只有一个,static,锁的是Class
        Phone3 phone = new Phone3();
        Phone3 phone2 = new Phone3();
        //锁存在
        new Thread(()->{
            phone.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone2.call();
        },"B").start();


    }
}
//Phone3唯一的一个Class对象
class Phone3{

    //synchronized锁的对象是方法的调用者!
    //static 静态方法
    //类一加载就有了!Class 模板 锁的是Class
    public static synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }
    public static synchronized void call(){
        System.out.println("打电话");
    }
}

7-8锁

package com.zmz.lock8;

import java.util.concurrent.TimeUnit;



public class Test4 {
    public static void main(String[] args) {
        //两个对象,两个调用者,两把锁!
        //两个对象的class类模板只有一个,static,锁的是Class
        Phone4 phone = new Phone4();
        //锁存在
        new Thread(()->{
            phone.sendSms();
        },"A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone.call();
        },"B").start();


    }
}
//Phone4唯一的一个Class对象
class Phone4{

    //synchronized锁的对象是方法的调用者!
    //锁的是Class类模板
    public static synchronized void sendSms(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }
    //锁的是调用者
    public synchronized void call(){
        System.out.println("打电话");
    }


}

6、集合不安全

1、ArrayList集合不安全

package com;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;


public class unsafe {
    public static void main(String[] args) {
        //高并发下的ArrayList真的安全么?
        
        List list = new CopyOnWriteArrayList<>();
        //启动10个多线程
        for (int i = 1; i < 10; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}

CopyOnWriteArrayList源码分析

2、Set不安全

package com.zmz.unsafe;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;


public class SetSafe {
    public static void main(String[] args) {
        //Set set = new HashSet<>();
        //Set set = Collections.synchronizedSet(new HashSet<>());
        Set set = new CopyOnWriteArraySet<>();
        for (int i = 1; i < 60; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(set);
            }, String.valueOf(i)).start();
        }
    }
}

HashSet源码

    public HashSet() {
        map = new HashMap<>();
    }

  //HashSet本质就是Map集合
  public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }

private static final Object PRESENT = new Object();//不变的值

3、HashMap不安全

package com.zmz.unsafe;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;


public class MapSafe {
    public static void main(String[] args) {
        //Map map = new HashMap<>();
        //Map map = Collections.synchronizedMap(new HashMap<>());
        Map map = new ConcurrentHashMap<>();

        for (int i = 1; i <= 30; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
                System.out.println(map);
            },String.valueOf(i)).start();
        }
    }
}
7、Callable

  1. 可以有返回值
  2. 可以抛出异常
  3. 方法不同,run()/call()

callable源码

代码测试

package com.zmz.callable;

import com.sun.org.apache.bcel.internal.generic.NEW;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;


public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // new Thread(new Runnable()).start();
        // new Thread(new FutureTask()).start();
        // new Thread(new FutureTask( Callable )).start();

        new Thread().start();

        MyThread myThread = new MyThread();
        FutureTask futureTask = new FutureTask(myThread);//适配类

        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start(); //结果存在缓存提交效率
        Integer o = (Integer) futureTask.get();//获取返回的结果
        //这个get方法可以会产生阻塞! 解决办法 放到最后一行或者异步通信
        System.out.println(o);
    }
}
class MyThread implements Callable{

    @Override
    public Integer call() throws Exception {
        System.out.println("call()方法");
        //耗时操作
        return 1024;
    }
}

8、常用的辅助类 1) CountDownLatch—减法计数器
package com.zmz.assist;

import java.util.concurrent.CountDownLatch;


public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(10);
        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "Go out");
                count.countDown();//数量-1
            },String.valueOf(i)).start();

        }
        count.await();//等计数器归零,然后再往下执行
        System.out.println("Close Door");
    }
}

原理:

  • count.countDown();//数量-1
  • count.await();//等待计数器归零。然后再向下执行

每次有线程用countDown()数量-1,如果计算器变为0了,然后count.await()就被唤醒,继续下面的执行!

2) CyclicBarrier—加法计数器
package com.zmz.assist;

import com.sun.org.apache.bcel.internal.generic.NEW;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;


public class CycilcBarrierDemo {
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(7,()->{
            System.out.println("你已经凑齐了七颗龙珠!可以变身了");
        });

        for (int i = 1; i <= 7; i++) {
            final int temp = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+ "收集" +(temp) +"颗龙珠");
                try {
                    barrier.await();//等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }

    }
}

3) Semaphore

Semaphore:信号量,很多时候用来处理高并发。

package com.zmz.assist;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;


public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();//获得
                    System.out.println(Thread.currentThread().getName()+"得到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }   finally {
                    semaphore.release();//释放
                }

            },String.valueOf(i)).start();
        }

    }
}

原理:

  • acquire():获得,假设已经满了组需要等待,直到被释放为止

  • release():释放,会将当前的信号量释放+1,然后唤醒等待的线程!

    作用:多个共享的资源互斥使用!并发限流控制最大的线程数量!

9、读写锁ReadwriteLock

package com.zmz.lock;

import java.util.HashMap;
import java.util.Map;


public class ReadwritelockDemo {
    public static void main(String[] args) {
        Mycache mycache = new Mycache();
        //写入
        for (int i = 1; i < 6; i++) {
            final int temp = i;
            new Thread(()->{
                mycache.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //读取
        for (int i = 1; i < 6; i++) {
            final int temp = i;
            new Thread(()->{
                mycache.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}

class Mycache{
    private volatile Map map = new HashMap<>();
    public void put(String key, Object value){
        System.out.println(Thread.currentThread().getName() + "写入" + key);
        map.put(key,value);
        System.out.println(Thread.currentThread().getName() + "写入完成");
    }

    public void get(String key){
        System.out.println(Thread.currentThread().getName() + "读取" + key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName() + "读取完成");
    }
}


我们可以看到出现了严重的插队问题!该如何去解决囊?我们使用读写锁来解决插队的问题。

修改后的操作

package com.zmz.lock;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;




public class ReadwritelockDemo {
    public static void main(String[] args) {
        MycacheLock mycache = new MycacheLock();
        //写入
        for (int i = 1; i < 6; i++) {
            final int temp = i;
            new Thread(() -> {
                mycache.put(temp + "", temp + "");
            }, String.valueOf(i)).start();
        }
        //读取
        for (int i = 1; i < 6; i++) {
            final int temp = i;
            new Thread(() -> {
                mycache.get(temp + "");
            }, String.valueOf(i)).start();
        }
    }
}
//加锁
class MycacheLock {
    private volatile Map map = new HashMap<>();
    //读写锁:更加细粒度的控制
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    //存、写的时候,只希望同时只有一个线程写
    public void put(String key, Object value) {
        readWriteLock.writeLock().lock();

        try {
            System.out.println(Thread.currentThread().getName() + "写入" + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "写入完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }

    }

    //取、读所有人都可以进行操作
    public void get(String key) {
        readWriteLock.readLock().lock();

        try {
            System.out.println(Thread.currentThread().getName() + "读取" + key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName() + "读取完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }

    }
}

class Mycache {
    private volatile Map map = new HashMap<>();

    public void put(String key, Object value) {
        System.out.println(Thread.currentThread().getName() + "写入" + key);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "写入完成");
    }

    public void get(String key) {
        System.out.println(Thread.currentThread().getName() + "读取" + key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName() + "读取完成");
    }
}


我们可以看到输出的结果在写入的时候有序的进行,读操作的时候可以无序的进行,可以看到已经到达我们预期的效果

10、堵塞队列

堵塞

什么情况下我们使用阻塞队列:多线程并发处理,线程池!

10.1 学会使用队列

添加、移除元素,现在有四组API

1) 四组API
方法抛出异常不会抛出异常,有返回值阻塞等待超时等待
添加add()offer()put()offer(E e, long timeout, TimeUnit unit)
移除remove()poll()take()poll(long timeout, TimeUnit unit)
判断首部element()peek()--
    public static void test1(){
        //队列大小3
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        //IllegalState ExceptionQueue full  抛出异常
        //System.out.println(blockingQueue.add("d"));

        //检测队首元素
        System.out.println(blockingQueue.element());								                                 System.out.println("================================================");
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //java.util.NoSuchElementException  抛出异常
        //System.out.println(blockingQueue.remove());

    }
    public static void test2(){
        //队列大小3
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        //false
        System.out.println(blockingQueue.offer("d"));

        //检测队首元素
        System.out.println(blockingQueue.peek());
        System.out.println("================================================");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        //null
        System.out.println(blockingQueue.poll());

    }
    public static void test3() throws InterruptedException {
        //队列大小3
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        //blockingQueue.put("d");

        //无检测队首元素
        System.out.println("================================================");
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
      //System.out.println(blockingQueue.take());

    }
    public static void test4() throws InterruptedException {
        //队列大小3
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        blockingQueue.offer("a");
        blockingQueue.offer("b");
        blockingQueue.offer("c");
        blockingQueue.offer("d",2, TimeUnit.SECONDS);

        //无检测队首元素
        System.out.println("================================================");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        blockingQueue.poll(2,TimeUnit.SECONDS);
    }
2) SynchronizedQueue 同步队列

没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素

put take

package com.zmz.queue;



import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;


public class SyncQueue {

    public static void main(String[] args) {
        SynchronousQueue synchronousQueue = new SynchronousQueue<>(); //同步队列

        new Thread(()->{

            try {
                System.out.println(Thread.currentThread().getName() + "put 1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName() + "put 2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName() + "put 3");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread1").start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "=>" + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "=>" + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "=>" + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
            }
        },"Thread2").start();
    }
}


11、线程池

线程池有三大方法,七大参数,四种拒绝策略

程序的运行,本质: 占用系统的资源 ! 优化CPU资源的使用 ===>池化技术

线程池, 连接池, 内存池, 对象池///…

池化技术: 实现准备好一些资源, 有人要用,就来我这里拿,用完之后还给我

1) 线程池的好处:
  1. 降低资源消耗
  2. 提高响应速度
  3. 方便管理

线程复用,可以控制最大并发数,管理线程

2) 线程池: 三大方法
  • ExecutorService service = Executors.newSingleThreadExecutor();//单个线程
  • ExecutorService service = Executors.newFixedThreadPool(5);//创建一个固定的线程池的大小
  • ExecutorService service = Executors.newCachedThreadPool();//可伸缩的,
package com.zmz.Pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ThreadPool {
    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
        ExecutorService threadPool2 = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
        ExecutorService threadPool3 = Executors.newCachedThreadPool(); //可伸缩的

        //线程池用完必须要关闭线程池
        try {

            for (int i = 1; i <=100 ; i++) {
                //通过线程池创建线程
                threadPool3.execute(()->{
                    System.out.println(Thread.currentThread().getName()+ " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool3.shutdown();
        }
    }
}
3) 七大参数
public ThreadPoolExecutor(int corePoolSize,  //核心线程池大小
                          int maximumPoolSize, //最大的线程池大小
                          long keepAliveTime,  //超时了没有人调用就会释放
                          TimeUnit unit, //超时单位
                          BlockingQueue workQueue, //阻塞队列
                          ThreadFactory threadFactory, //线程工厂 创建线程的 一般不用动
                          RejectedExecutionHandler handler //拒绝策略
                         ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

package com.zmz.Pool;

import java.util.concurrent.*;


public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        // 获取cpu 的核数
        int max = Runtime.getRuntime().availableProcessors();
        ExecutorService service =new ThreadPoolExecutor(
                2,//核心线程池大小
                max,//最大的线程池大小
                3,//超时了没有人调用就会释放
                TimeUnit.SECONDS,//超时单位
                new linkedBlockingDeque<>(3),//阻塞队列
                Executors.defaultThreadFactory(),//线程工厂 创建线程的
                new ThreadPoolExecutor.AbortPolicy()//拒绝策略
        );
        try {
            for (int i = 1; i <= 5; i++) {
                service.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "运行成功");
                });
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            service.shutdown();
        }
    }
}
4) 拒绝策略

  • new ThreadPoolExecutor.AbortPolicy() 超出最大处理线程抛出异常
  • new ThreadPoolExecutor.CallerRunsPolicy() 从哪个线程创建就由那个线程执行
  • new ThreadPoolExecutor.DiscardPolicy() 队列满了不会抛出异常
  • new ThreadPoolExecutor.DiscardOldestPolicy() 尝试去和第一个竞争,也不会抛出异常
12、四大函数式接口

新时代的程序员‍:lambda表达式、链式编程、函数式接口、Stream流式计算

1) Function函数型接口

package com.zmz.FourFunction;

import java.util.function.Function;


public class functionDemo {
    public static void main(String[] args) {
        Function function = (str) -> {
            return str;
        };
        System.out.println(function.apply("Hello,zmz!"));
    }
}
2) Predicate 断定型接口

package com.zmz.FourFunction;

import java.util.function.Predicate;


public class PredicateDemo {
    public static void main(String[] args) {
        Predicate predicate = (str) -> {return str.isEmpty();};
        // false
        System.out.println(predicate.test("zmz"));
        // true
        System.out.println(predicate.test(""));
    }
}
3) Suppier 供给型接口

package com.zmz.FourFunction;

import java.util.function.Supplier;


public class SuppierDemo {
    public static void main(String[] args) {
        Supplier supplier = ()->{return "1024";};
        System.out.println(supplier.get());
    }
}
4) Consummer 消费型接口

package com.zmz.FourFunction;

import java.util.function.Consumer;


public class ConsummerDemo {
    public static void main(String[] args) {
        Consumer consumer = (str)->{
            System.out.println(str);
        };
        consumer.accept("zmz");
    }
}
13、Stream 流式计算
package com.zmz.Stream;


public class User {
    private int id;
    private String name;
    private int age;

    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
package com.zmz.Stream;



import java.util.Arrays;
import java.util.List;


public class StreamDemo {
    public static void main(String[] args) {
        User u1 = new User(1, "a", 23);
        User u2 = new User(2, "b", 23);
        User u3 = new User(3, "c", 23);
        User u4 = new User(6, "d", 24);
        User u5 = new User(4, "e", 25);

        List list = Arrays.asList(u1, u2, u3, u4, u5);
        // lambda、链式编程、函数式接口、流式计算
        list.stream()
                .filter(user -> {return user.getId()%2 == 0;})
                .filter(user -> {return user.getAge() > 20;})
                .map(user -> {return user.getName().toUpperCase();})
                .sorted((user1, user2) -> {return user2.compareTo(user1);})
                .limit(1)
                .forEach(System.out::println);
    }
}


14、ForkJoin—多线并发处理框架

什么是ForkJoin?

ava.util.concurrent.ForkJoinPool由Java大师Doug Lea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。本文中对Fork/Join框架的讲解,基于JDK1.8+中的Fork/Join框架实现,参考的Fork/Join框架主要源代码也基于JDK1.8+。

这几篇文章将试图解释Fork/Join框架的知识点,以便对自己、对各位读者在并发程序的设计思路上进行一些启发。文章将首先讲解Fork/Join框架的基本使用,以及其中需要注意的使用要点;接着使用Fork/Join框架解决一些实际问题;最后再讲解Fork/Join框架的工作原理。

1)ForkJoin 特点: 工作窃取!

2)如果使用ForkJoin

第一步,通过ForkJoinPool来执行

第二步,计算任务 execute(ForkJoinTask task)

第三步,计算类要去继承ForkJoinTask

ForkJoin 的计算类

ForkJoinComputer.java

package com.zmz.ForkJoin;
import java.util.concurrent.RecursiveTask;




public class ForkJoinComputer extends RecursiveTask {
    private long start;
    private long end;
    
    private long temp = 1000000L;

    public ForkJoinComputer(long start, long end) {
        this.start = start;
        this.end = end;
    }

    
    @Override
    protected Long compute() {
        if ((end - start) < temp) {
            Long sum = 0L;
            for (Long i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        }else {
            // 使用ForkJoin 分而治之 计算
            //1 . 计算平均值
            long middle = (start + end) / 2;
            ForkJoinComputer forkJoinDemo1 = new ForkJoinComputer(start, middle);
            // 拆分任务,把线程压入线程队列
            forkJoinDemo1.fork();
            ForkJoinComputer forkJoinDemo2 = new ForkJoinComputer(middle, end);
            forkJoinDemo2.fork();

            long taskSum = forkJoinDemo1.join() + forkJoinDemo2.join();
            return taskSum;
        }
    }
}

测试类 ForkJoinTest.java

package com.zmz.ForkJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;


public class ForkJoinTest {
    private static final long SUM = 20_0000_0000;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
        test2();
        test3();
    }

    
    public static void test1() {
        long star = System.currentTimeMillis();
        long sum = 0L;
        for (long i = 1; i < SUM ; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println("普通程序猿——时间:" + (end - star));
        System.out.println("============================");
    }
    
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask task = new ForkJoinComputer(0L, SUM);
        ForkJoinTask submit = forkJoinPool.submit(task);
        Long along = submit.get();

        System.out.println(along);
        long end = System.currentTimeMillis();
        System.out.println("中级程序猿——时间:" + (end - start));
        System.out.println("--------------");
    }
    
    public static void test3() {
        long start = System.currentTimeMillis();

        long sum = LongStream.range(0L, 20_0000_0000L).parallel().reduce(0, Long::sum);
        System.out.println(sum);
        long end = System.currentTimeMillis();
        System.out.println("高级程序猿——时间:" + (end - start));
        System.out.println("--------------");
        System.out.println("============================");
    }
}

分析一下高级程序猿的处理:

.parallel().reduce(0, Long::sum)使用一个并行流去计算整个计算,提高效率。

❤️‍❤️‍❤️‍最后(学妹照片)

附一张学妹的照片

⭐学妹都已经收藏了!!!⭐

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/305143.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号