栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java多线程】线程安全、生产者消费者模式、定时器、线程池、其他常识

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java多线程】线程安全、生产者消费者模式、定时器、线程池、其他常识

目录

1. 线程安全

2. synchronized锁

3. volatile和单例模式

4. 阻塞队列及生产者消费者模式

5. 定时器

6. 线程池

6.1 使用线程池的意义

6.2 基本使用

6.3 手动实现一个线程池

7. 其他相关知识

7.1 关于锁

7.2 synchronized的优化

总结:


1. 线程安全

什么是线程安全问题?

        举个栗子。一个班上的班费允许给学习委员和体育委员进行学习用品和体育用品的采购,两个人只能采购不超出班费的用品,有一天学习委员查看了一下班费还剩100,于是打算去超市采购了一批学习资料花费50元,但是还未付款。此时体育委员并不知道,而且体育委员查看班费此时还剩100元,于是打算花100购买一个篮球。于是大家发现,明明大家都查看了余额足够,但是购买的时候就会出现问题。

发生线程安全问题的条件是什么?

        线程安全问题产生的条件有3个,1.多线程并发,2.存在共享的数据,3.共享的数据存在修改的动作(写操作)。

        也就是说,多线程的破坏了操作的原子性(然而并不只是原子性破环就引发线程安全问题)。我们指出两个最常见的违反原子性的操作1.read-write操作,2.cheek-update操作。

2. synchronized锁

        为了解决以上的问题,我们让体育委员和学习委员定下一个约定,就是每次花班费之前,要拿着记录班费信息的账本才可以进行消费。也就是说,只有拿着账本的人才可以使用班费。虽然实际上的情况和以上情况有一定区别,但是我们可以通过该例子来理解这种过程。

        以上的操作,我们可以看作一个加锁的过程。也就是说我们要通过这种方式来保证操作的原子性。

        Java中常见的方法是使用synchronized来实现。称为同步锁,内部使用monitor实现,又称monitor锁。让我们来看Java中的具体用法。

public class SomeClass {
    synchronized void m1(){}
    synchronized static void m2(){}
    void m3(){}
    void m4(){
        synchronized (this){ }
    }
    void m5(){
        synchronized (SomeClass.class){ }
    }
    Object o1=new Object();
    void m6(){
        synchronized (o1){}
    }
    static Object o2=new Object();
    void m7(){
        synchronized (o2){}
    }

    public static void main(String[] args) {
        SomeClass s1 = new SomeClass();
        SomeClass s2 = new SomeClass();
        SomeClass s3 = s1;
        //如果t1线程t2线程分别是如下的情况,请问是否互斥?
                

    }
}

        加锁的作用就是:当多个线程1.都有加锁操作2.并且申请的是同一把锁时,只会有一个线程可以加锁成功,其他加锁失败的线程都会1.进入该锁的阻塞队列2.放弃CPU。(造成临界区代码块互斥,所谓临界区代码就是加锁和解锁间的代码) 。

        所以对于以上的问题:

        *  s1.m1()           s1.m3()        s1.m3()没加锁不会互斥
        *  s1.m1()           s1.m4()        都加锁且锁是同一个所以会互斥
        *  s1.m1()           s3.m4()        s1和s3指向同一个对象,和上面一样会互斥
        *  s1.m1()           s1.m5()        s1.m5()加的是类锁,对象不同不会互斥
        *  s1.m2()           s1.m5()        都加的是类锁,会互斥
        *  s1.m1()           s1.m6()        对象不同不会互斥
        *  s1.m6()           s2.m6()        锁的对象不同不会互斥
        *  s1.m6()           s3.m6()        同一个对象会互斥
        *  s1.m2()           s1.m7()        对象不同不会互斥
        *  s1.m7()           s3.m7()        对象相同会互斥
        *  s1.m7()           s2.m7()        对象相同会互斥

        对于synchronized锁。主要是保证了原子性,在一定程度上保证了内存可见以及重排序的一定约束。

        synchronized锁是较为早期使用的一种方式。Java后来以类和对象的形式重新进行了设计。我们可以在 java.util.concurrent.locks.Lock 查看。其使用方法和synchronized基本相同。此处不再过多介绍。

3. volatile和单例模式

        volatile的作用是什么?volatile在Java中可以用来修饰变量,被其修饰的变量就是让该变量无法缓存在线程的工作内存中。也就是该变量的读取一定是从主内存中读取,写的时候立即写回主内存。(volatile可以保护这些变量的内存可见性问题)我们可以看以下的例子

public class Test01 {
    static boolean quit=false;
    //static volatile boolean quit=false;
    static class MyThread extends Thread{
        @Override
        public void run() {
            int r=0;
            while (quit==false){
                try {
                    Thread.sleep(1000);
                    r++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
            System.out.println(r);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyThread myThread = new MyThread();
        myThread.start();

        Thread.sleep(1000*5);
        quit=true;
    }
}

        以上的方法不会打印,因为我们即使修改了quit的值,但是由于quit的值已经被我们提前缓存到工作内存中,也就是说,该改动并不会被该线程读到。

        如果我们想让该改动生效,我们可以在quit前面加上volatile。这样该线程每次都会从主内存中加载该值,也就是说该改动就生效了。

接下来我们来看一下有关线程安全的一个应用——单例模式

什么是单例模式?单例模式,属于创建类型的一种常用的软件设计模式。通过单例模式的方法创建的类在当前进程中只有一个实例(根据需要,也有可能一个线程中属于单例,如:仅线程上下文内使用同一个实例)单例模式有两种实现方式,第一种是饿汉模式(即在类的加载时就创建好对象)第二种是懒汉模式(在使用的时候再进行初始化)

public class StarvingMode {
    //将构造方法私有化
    private StarvingMode(){}

    //在类加载的时候就执行(jvm保证在类加载的时候是线程安全的,所以饿汉模式天生线程安全)
    private static final StarvingMode instance= new StarvingMode();

    public static StarvingMode getInstance() {
        return instance;
    }
}

以上为饿汉模式,因为类加载的时候是线程安全的,所以饿汉模式天生是线程安全的。

public class LazyModeV3 {
    //此处必须这样,因为重排序可能会造成错误。(一个线程将引用指向不为空,但是没有真正的初始化。然后另外一个线程就会直接return)
    private static volatile LazyModeV3 instance =null;

    //在使用的时候再初始化
    public static  LazyModeV3 getInstance(){
        if(instance==null){
            synchronized (LazyModeV3.class){
                if(instance==null){     //进行二次判断,避免重复new对象
                    instance=new LazyModeV3();
                }
            }
        }
        return instance;
    }
}

        以上为懒汉模式,为什么synchronized要加在判断为空后面?因为只有第一次为null的情况下才需要加锁,后买不为空的情况都是直接return。我们为什么要加上volatile呢?因为重排序问题可能会产生这样的结果(一个线程将引用指向不为空,但是斌没有真正的初始化。然后另外一个线程就会直接return)当我们加上就不会产生这样的问题。为什么要进行二次判断instance不为空?因为如果不在判断一次,就破坏了判断和修改的原子性(一个线程进入判断,另外一个线程也进入判断,因为不为空new了一个实例,然后原线程也new了一个实例。这样就new了两个实例)

4. 阻塞队列及生产者消费者模式

        阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

        Java提供了一系列有关阻塞队列的接口和实现类。具体使用方法如下:

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue=new LinkedBlockingQueue<>();
        String take= queue.take();
        System.out.println("此方法永远不会执行!");
    }

        BlockingQueue的一个重要的用途就是可以实现生产者消费者模式。其实生产者与消费者模式就是一个多线程并发协作的模式,在这个模式中呢,一部分线程被用于去生产数据,另一部分线程去处理数据,于是便有了形象的生产者与消费者了。而为了更好的优化生产者与消费者的关系,便设立一个缓冲区,也就相当于一个数据仓库,当生产者生产数据时锁住仓库,不让消费者访问,当消费者消费时锁住仓库,不让生产者访问仓库。

        于是我们发现,为了实现这个效果,我们需要让线程等待某个条件的发生(wait)以及线程需要唤醒另外一个线程(notify)。这两个方法都是属于Object类的方法。注意,以下两个方法必须在synchronized加锁后才能使用。

        1. wait():调用了wait()方法的线程进入等待池进行等待,等待池中的线程不去竞争对象锁,直到其它的线程通知,才会进入锁池。注意:wait()会先释放占有的锁(顺带一提,join()底层使用了wait()来实现)

        2. notify():随机唤醒一个在该对象上等待的线程,被唤醒的线程进行锁池,开始竞争该对锁上的锁。注意:notify()不会释放锁。wait()等待中的线程被notify唤醒之后不会立马执行,被唤醒的对象需要重新竞争锁对象,获得锁的线程可以从wait处继续执行。

        有了以上的基础之后,接下来我们手动实现一个简单的BlockingQueue(简单版本的1V1的阻塞队列)。代码如下:注意该版本不能用于多个生产者和消费者情况下。

public class MyArrayBlockingQueue {
    private long[] array;
    private int frontIndex;
    private int rearIndex;
    private int size;
    public MyArrayBlockingQueue(int capacity){
        array=new long[capacity];
        frontIndex=0;
        rearIndex=0;
        size=0;
    }
    public synchronized void put(long e) throws InterruptedException {
        while (size==array.length){
            this.wait();
        }
        array[rearIndex]=e;
        rearIndex++;
        if (rearIndex==array.length){
            rearIndex=0;
        }
        size++;
        //走到此处,一定可以消费了,于是唤醒消费者
        this.notify();
    }
    public synchronized long take() throws InterruptedException {
        while (size==0){
            this.wait();
        }
        long e=array[frontIndex];
        frontIndex++;
        if(frontIndex==array.length){
            frontIndex=0;
        }
        size--;
        //此处一定可以生产了
        this.notify();
        return e;
    }

5. 定时器

        Java 定时器就是在给定的间隔时间执行自己的任务。主要介绍以下的形式:java.util.Timer和java.util.TimerTask两个类可以实现一些定时器的任务,简单的使用方法如下:

public class TimerTest {
    public static void main(String[] args) {
        //创建一个定时器对象
        Timer timer=new Timer();

        //指定定时任务
        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date parse = null;
        try {
            parse = sdf.parse("2022-05-13 16:49:00");
        } catch (ParseException e) {
            e.printStackTrace();
        }

        Date firstTime=parse;

        
        timer.schedule(new LogTimerTask(),firstTime,1000 * 10);
        Timer timer1 = new Timer();
    }
}
//编写一个定时任务类
class LogTimerTask extends TimerTask {

    @Override
    public void run() {
        //此处编写需要完成的任务
        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String format = sdf.format(new Date());
        System.out.println(format+":完成备份!!");
    }
}

        定时器的官方实现是通过阻塞队列来实现的,同样是采用了生产者消费者模式。接下来我们来实现一个简单的定时器:

        我们将任务放入阻塞队列之后,可以使用一个线程来专门执行任务(消费者)。对于队列中元素的取出需要根据任务的执行时间来决定,所以会用到一个优先级队列。

MyTimer类

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class MyTimer {
    private final BlockingQueue queue = new PriorityBlockingQueue<>();
    private final Object newTaskComing =new Object();
    public MyTimer() {
        MyTimerWorker worker = new MyTimerWorker(newTaskComing,queue);
        worker.start();
    }
    public void scheduleAtFixedRate(MyTimerTask task, long delay,long period) throws InterruptedException{
        task.period=period;
        // 通过 now 和 delay,计算出 task 应该于何时允许
        long runAt = System.currentTimeMillis() + delay;
        task.runAt = runAt;

        // 把任务放到优先级阻塞队列中
        // 我们是生产者的角色
        queue.put(task);
        synchronized (newTaskComing){       //
            newTaskComing.notify();
        }
    }

    public void schedule(MyTimerTask task, long delay) throws InterruptedException {
        // 通过 now 和 delay,计算出 task 应该于何时允许
        long runAt = System.currentTimeMillis() + delay;
        task.runAt = runAt;

        // 把任务放到优先级阻塞队列中
        // 我们是生产者的角色
        queue.put(task);
        synchronized (newTaskComing){       //
            newTaskComing.notify();
        }
    }
}

MyTimerTask类

public abstract class MyTimerTask implements Comparable {
    public long runAt;  // 这个任务时候应该执行
    public Long period=null;    //一举两得,如果是null正好说明是非周期任务,如果不是null那就是周期

    public abstract void run();

    @Override
    public int compareTo(MyTimerTask o) {
        if (runAt < o.runAt) {
            return -1;
        } else if (runAt > o.runAt) {
            return 1;
        } else {
            return 0;
        }
    }
}

MyTimerWorker类

import java.util.concurrent.BlockingQueue;

// 定时器中的线程要执行的代码
public class MyTimerWorker extends Thread {
    private final BlockingQueue queue;
    private final Object newTaskComing;
    public MyTimerWorker(Object newTaskComing,BlockingQueue queue) {
        this.queue = queue;
        this.newTaskComing = newTaskComing;
    }

    @Override
    public void run() {
        try {
            while (true) {
                MyTimerTask task = queue.take();
                // 怎么判断这个任务是否应该执行了?
                long delay = task.runAt - System.currentTimeMillis();
                if (delay > 0) {
                    //Thread.sleep(delay);
                    synchronized (newTaskComing) {
                        //1.没有新任务或者超时2.有新任务
                        newTaskComing.wait(delay);          //那么是谁在唤醒呢?应该是有新任务的时候,唤醒后如果到时间了就继续执行,否则放回去重新比较
                    }
                    if (task.runAt<=System.currentTimeMillis()) {
                        task.run();
                        if(task.period!=null){
                            task.runAt=System.currentTimeMillis()+task.period;
                            queue.put(task);
                        }
                    }else {
                        queue.put(task);
                    }
                }else {
                    task.run();
                    if(task.period!=null){
                        task.runAt=System.currentTimeMillis()+task.period;
                        queue.put(task);
                    }
                }
            }
        } catch (InterruptedException ignore) {}
    }
}

6. 线程池

6.1 使用线程池的意义

        我们为什么要使用线程池?因为创建和销毁线程是有一定的成本的,频繁的创建和消耗线程的成本很高。于是我们使用一个叫做线程池的东西来减少创建和销毁线程的成本消耗。

6.2 基本使用

        java.util.concurrent包下的Executor接口、ExecutorService接口、ThreadPoolExecutor实现类等。以上的类帮助我们来使用线程池。基本的使用方法如下:

import java.util.Scanner;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {
    public static void main(String[] args) {

        BlockingQueue queue=new ArrayBlockingQueue<>(2);

        AtomicInteger threadId=new AtomicInteger(1);

        ThreadFactory factory=new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                int id=threadId.getAndIncrement();   //类似i++
                String name=String.format("线程池中的工人-%d",id);
                return new Thread(r,name);
            }
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3,
                7,
                60, TimeUnit.SECONDS,
                queue,
                factory,
                new ThreadPoolExecutor.AbortPolicy()
        );

        //总员工个数为7个,队列容量为2,所以最多提交9个任务。
        Scanner scanner = new Scanner(System.in);
        for (int i = 1; i <1000 ; i++) {
            final int id=i;         //匿名类使用外部的变量需要该变量是final的
            Runnable commad=new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.DAYS.sleep(365);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public String toString() {
                    return String.format("{任务%d}",id);
                }
            };
            scanner.nextLine();
            System.out.println("提交"+commad);
            executor.execute(commad);
        }
    }
}

我们可以梳理一下创建线程的流程:

0. 一开始什么都没有

1. 当要执行任务的时候,如果当前正式工的数量小于corePoolSize,那么我们先将正式员工招满。

2. 当正式工满了之后,还来任务就往阻塞队列(workQueue)里面放。

3. 当阻塞队列满了,还来任务就招临时工。

4. 当员工数达到上限,还有任务,此时我们执行拒绝策略(handler)。

自带的拒绝策略有4种都是静态属性:①.AbortPolicy(抛异常),这个方法是Java默认的。

②. CallerRunsPolicy(交给调用者处理)③. DiscardOldestPolicy(丢弃最老的任务(队首))

④. DiscardPolicy(丢弃当前提交的任务)

注意:这里的正式员工和临时员工是线程的抽象说法。只是为了方便理解而这么说,本质上这些线程没有什么区别。

我们重点来了解一下实现类ThreadPoolExecutord的构造方法。

参数含义
int corePoolSize正式员工数
int maximumPoolSize最大员工数(正式+临时)
long keepAliveTime临时工允许的摸鱼时间
TimerUnit unit时间的单位
BlockingQueue workQueue传递任务的阻塞队列
ThreadFactory threadFactory,创建线程的工厂(方便我们创建线程)
RejectedExecutionHandler handler拒绝策略

6.3 手动实现一个线程池

        其实线程池也是利用了生产者消费者模式。我们可以简单的实现一下,只需要逻辑相同。有些东西就直接用Java现成的了。

        MyThreadPoolExecutor

import java.util.concurrent.*;

public class MyThreadPoolExecutor implements Executor {
    // 创建线程的工厂对象
    private final ThreadFactory threadFactory;
    // 临时工摸鱼的时间上限
    private final long keepAliveTime;
    private final TimeUnit unit;

    //当前正式员工的数量
    private int CurrentCoreSize;
    //正式员工数量上限
    private final int CorePoolSize;
    //临时员工的数量
    private int CurrentTemporarySize;
    //临时员工的数量上限
    private final int TmporaryPoolSize;

    private final BlockingQueue workQueue;
    //向线程池中提交任务
    public MyThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler){
        this.CorePoolSize=corePoolSize;
        this.TmporaryPoolSize=maximumPoolSize-corePoolSize;
        this.workQueue=workQueue;
        this.threadFactory = threadFactory;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;

    }
    @Override
    public void execute(Runnable command) {
        //优先创建正式员工
        if(CurrentCoreSize 

CoreJob

import java.util.concurrent.BlockingQueue;

//职责就是取任务完成任务
public class CoreJob implements Runnable{
    private final BlockingQueue workQueue;
    private Runnable firstCommand;

    public CoreJob(BlockingQueue workQueue,Runnable firstCommand) {
        this.workQueue = workQueue;
        this.firstCommand=firstCommand;
    }

    @Override
    public void run() {
        try {
            firstCommand.run();     //优先做刚提交的任务
            firstCommand=null;      //为了不影响GC回收

            while (true){
                Runnable command = workQueue.take();
                command.run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

TemporaryJob

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;


public class TemporaryJob implements Runnable {
    // 需要阻塞队列
    private final BlockingQueue workQueue;
    private final long keepAliveTime;
    private final TimeUnit unit;
    private Runnable firstCommand;

    TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, Runnable firstCommand) {
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        this.workQueue = workQueue;
        this.firstCommand = firstCommand;
    }

    @Override
    public void run() {
        try {
            firstCommand.run();     // 优先先把刚提交的任务先做掉了
            firstCommand = null;    // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收

            // 一旦超过一定时间没有任务,临时工是需要退出的
            // 1. keepAliveTime + unit 记录起来
            // 2. 怎么就知道超过多久没有任务了?如果一定时间内都无法从队列中取出来任务,则认为摸鱼时间够了
            while (!Thread.interrupted()) {
//                Runnable command = workQueue.take();
                Runnable command = workQueue.poll(keepAliveTime, unit);
                if (command == null) {
                    // 说明,没有取到任务
                    // 说明超时时间已到
                    // 说明该线程已经 keepAliveTime + unit 时间没有工作了
                    // 所以,可以退出了
                    break;
                }
                command.run();
            }
        } catch (InterruptedException ignored) {}
    }
}

7. 其他相关知识

7.1 关于锁

        读锁(共享锁、S锁)写锁(独占锁、X锁):我们之前讨论的锁都是写锁,也就是在这种情况下,即使两个线程的操作都是读,这两个线程也都是互斥的。而读锁也就是共享锁,允许同时读取一个对象,在均为读操作时不互斥。

        重入锁(ReentrantLock)和不可重入锁:我们之前见过可重入锁,可重入锁允许同一个线程重复加同一把锁。而不可重入锁就不可以。

        公平锁和不公平锁:一个线程处于锁着的状态,其他请求锁失败的线程都进入了等待的队列,如果这个线程的锁解开了,然后有个后来的线程也来请求锁,如果这个线程运气好取得了锁,说明这个锁是不公平锁。如果这个线程主动让出然后进入等待队列,则称这个锁为公平锁。

        乐观锁和悲观锁:严格来说,这两个不能称之为锁,只是实现并发控制的两种方案。乐观锁经过评估后,多个线程同时修改一个共享资源的情况较为少见,可以采用轻量级方式(无锁)进行并发控制。悲观锁则是认为多个线程同时修改一个共享资源的情况较为频繁,必须使用互斥(锁)的方式来进行并发控制。

        互斥锁与自旋锁:我们之前讨论的都是互斥锁,对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,"自旋"一词就是因此而得名。自旋锁比较适用于目前的计算机多核的情况。

        synchronized锁属于可重入、不公平、独占锁。

7.2 synchronized的优化

        锁消除优化:有时候为了保证线程安全,我们每个方法都加了synchronized锁,但是当我们是单线程的时候,这些加锁、释放锁的操作都是无用功。所以编译器+JVM在判断出只有一个线程的时候,就会消除所有的锁,来提升性能。

        锁粗化优化:已经没有办法进行锁消除优化的时候,如果操作过程类似于

加锁 操作 解锁 加锁 操作 解锁 加锁 操作 解锁。。。这样的情况时,会将锁的粒度进行粗化,让加锁解锁不要那么频繁。

        锁升级(膨胀):无锁、偏向锁、轻量级锁、重量级锁。锁的级别按照上述先后顺序依次升级,我们把这个升级的过程称之为“锁膨胀”。

总结:

Java多线程比较多比较杂,一定要注意理解,并且一定要以线程的角度来看待。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/880668.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号