栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

聊聊juc常用的并发流程控制工具

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

聊聊juc常用的并发流程控制工具

文章目录

并发控制流程概览CountDownLatch

简介示例

工厂中,质检,5个工人检查,所有人都认为通过,才通过以运动员听枪比赛示例介绍countDownLatch确保多线程同时开始的示例 信号量

简介使用方式

使用示例 注意事项 条件对象

简介示例

基础使用示例基于条件对象完成生产者、消费者模式 循环栅栏对象

简介示例与countdownlatch区别 参考文献

并发控制流程概览

CountDownLatch 简介

倒计时门闩,只有倒计时门闩值变为0时,阻塞状态才会结束。
常用api

countDownLatch(int count)//构造函数,参数count为需要倒数的值。


awiait()//调用这个方法的线程会被挂起,直到count为0才继续执行。


countDown()//调用一次这个方法,会将count减1。

示例 工厂中,质检,5个工人检查,所有人都认为通过,才通过
package flowcontrol.countdownlatch;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchbaseUse {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            int no = i + 1;

                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep((long) (Math.random() * 10000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally {
                            latch.countDown();
                        }
                        System.out.println("No" + no + "完成了检查");
                    }
                };
            threadPool.execute(runnable);

        }
        System.out.println("等待所有检查完成");
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("所有人都完成了工作,进入下一个环节。");
        threadPool.shutdown();

    }
}

以运动员听枪比赛示例介绍countDownLatch确保多线程同时开始的示例
package flowcontrol.countdownlatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CountDownLatchbaseUse2 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch=new CountDownLatch(1);
        ExecutorService threadPool= Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            int finalI = i+1;
            Runnable runnable=new Runnable() {
                @Override
                public void run() {
                    System.out.println("no"+ finalI +"准备完成,等待枪响");
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("no"+ finalI +"冲向终点");
                }
            };

            threadPool.execute(runnable);
        }
        Thread.sleep(5000);
        System.out.println("开枪,比赛开始。。。。。");
        countDownLatch.countDown();
        threadPool.shutdown();

    }
}

信号量 简介

信号量常用于控制多线程使用有限资源的场景

使用方式
    初始化Semaphore并指定许可证的数量。需要信号量使用acquire()来获取,只要有剩余就会分配。当使用完毕后,使用release()释放信号量。
使用示例
public class SemaphorebaseUse {
    //加true则实现公平策略线程会按照顺序来
    static Semaphore semaphore = new Semaphore(5,true);

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 50; i++) {
            threadPool.submit(new Task());
        }
        threadPool.shutdown();
    }

    static class Task implements Runnable {


        @Override
        public void run() {

            try {
                semaphore.acquire(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + "拿到了许可证");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            semaphore.release(3);
            System.out.println(Thread.currentThread().getName() + "释放了许可证");
        }
    }
}
注意事项
    获取和释放的时候都可以指定数量,但是要保持一致。公平性设置为true会更加合理并不必须由获取许可证的线程释放许可证。可以是A获取,B释放。
条件对象 简介

当A线程需要等待某个条件的时候,它就去执行condition.await()方法,一旦执行了await()方法,线程就会进入阻塞状态。

如果线程B执行condition.signal()方法,则JVM就会从被阻塞线程中找到等待该condition的线程。当线程A收到可执行信号的时候,他的线程状态就会变成Runnable可执行状态。

示例 基础使用示例
public class ConditionbaseUse1 {
    private ReentrantLock reentrantLock = new ReentrantLock();
    private Condition condition = reentrantLock.newCondition();

    public static void main(String[] args) {
        ConditionbaseUse1 conditionbaseUse1=new ConditionbaseUse1();
        //注意这里必须先让唤醒线程开启,并且将其设置为休眠,保证它开启,且能够让出cpu时间片给等待线程运行
        new Thread(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            conditionbaseUse1.notifyCondition();
        }).start();

        conditionbaseUse1.waitCondition();
    }


    void waitCondition() {
        reentrantLock.lock();
        System.out.println("等待条件完成中。。。。。");
        try {
            condition.await();
            System.out.println("条件完成,开始执行业务逻辑。。。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }

    void notifyCondition() {
        reentrantLock.lock();
        try {
            System.out.println("条件完成,通知其他线程");
            condition.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

}
基于条件对象完成生产者、消费者模式
package flowcontrol.condition;

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionbaseUse2 {
    private PriorityQueue queue = new PriorityQueue<>(10);
    private int queueSize = 10;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionbaseUse2 conditionbaseUse2=new ConditionbaseUse2();
        Consumer consumer=conditionbaseUse2.new Consumer();
        Producer producer=conditionbaseUse2.new Producer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {
        @Override
        public void run() {
            consumer();
        }

        void consumer() {
            while (true) {
                lock.lock();
                try {
                    if (queue.size() == 0) {
                        System.out.println("队列已空,等待生产");
                        notEmpty.await();
                    }

                    queue.poll();
                    notFull.signal();
                    System.out.println("从队列消费一个数据,当前剩余 " +  queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }

            }
        }



    }

    class Producer extends Thread {
        @Override
        public void run() {
            producer();
        }

        void producer() {
            while (true) {
                lock.lock();
                try {
                    if (queue.size() == queueSize) {
                        System.out.println("队列已满,等待消费");
                        notFull.await();
                    }

                    queue.offer(1);
                    notEmpty.signal();
                    System.out.println("向队列插入一个元素,当前剩余空间 " + (queueSize - queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }

            }
        }
    }
}
循环栅栏对象 简介

直到指定数量的线程都到达同一个点,然后才一起继续执行。

示例
package flowcontrol.cyclicbarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierbaseUse1 {


    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            System.out.println("人已到齐,准备出发");
        });

        //循环栅栏可以复用
        for (int i = 0; i <10 ; i++) {
            new Thread(new Task(i,cyclicBarrier)).start();
        }
    }


    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + id + "前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "到达集合地点,等待其他人到达");
                cyclicBarrier.await();
                System.out.println("线程" + id + "准备出发");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}

与countdownlatch区别
    countdownlatch用户事件,循环栅栏作用于线程循环栅栏可重复使用,countdownlatch则不能
参考文献

控制并发流程

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/732321.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号