栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JUC之阻塞队列

JUC之阻塞队列

目录

1. 什么是阻塞队列:2.为什么需要阻塞队列:3.阻塞队列的种类分析:4.代码演示阻塞队列的核心方法:SynchronousQueue介绍:使用阻塞队列实现生产者消费者模式:

自定义一个阻塞队列(数组实现):

1. 什么是阻塞队列:

阻塞队列(BlockingQueue)是这样的一种数据结构,它是一个队列(类似于一个List), 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

1.当阻塞队列为空时,从队列中获取元素的操作将会被阻塞。
2.当阻塞队列为满时,从队列里添加元素的操作将会被阻塞。

1.当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
2.当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒


如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

注意:
添加:只有当阻塞队列满了的时候才阻塞等待,其他时候都不阻塞,一直添加,但是每次添加完都会立即通知等待的消费者来消费

移除:只有当阻塞队列为空的时候才阻塞等待,其他时候都不阻塞,一直移除,但是每次移除消费完会立即通知等待的生产者生产

JDK 文档提到的几个特点:

BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容-量。BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll,这些方法尽可能地少使用)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll© 有可能失败(抛出一个异常)。 2.为什么需要阻塞队列:

为何需要阻塞队列:因为在生活中,我们的一些需求是不得不阻塞,如去海底捞吃火锅,里面坐满人的话,只能在外面阻塞等待,因为我们不能丢失顾客,只能阻塞等待。

在多线程领域:所谓阻塞,在某些情况下会挂起线程(阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。

好处是我们作为BlockingQueue的使用者,再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。

3.阻塞队列的种类分析:

Java里的阻塞队列:

JDK7提供了7个阻塞队列。分别是:

ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。
linkedBlockingQueue : 由链表结构组成的有界(但是默认值大小为Integer,MAX_VALUE)阻塞队列。
PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。
DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue: 不存储元素的队列,也即单个元素的队列。没有容量,每一个put操作必须等待一个take操作,否则不能添加元素,反之亦然。
linkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
linkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。

BlockingQueue接口 与 BlockingDeque 接口:

注意,BlockingQueue,BlockingDeque 这两个都是接口

JDK提供的阻塞队列中,linkedBlockingDeque 是一个 Deque(双向的队列),其实现的接口是 BlockingDeque;其余6个阻塞队列则是 Queue(单向队列),实现的接口是 BlockingQueue。

对于 BlockingQueue 的阻塞队列提供了四种处理方法:

抛出异常: 是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementEx·ception异常 。

返回特殊值: 插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null

一直阻塞: 当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

超时退出: 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

抛出异常 与 返回特殊值 方法的实现是一样的,只不过对失败的操作的处理不一样!通过 AbstractQueue 的源码可以发现,add(e),remove(),element() 都是分别基于 offer(),poll(),peek() 实现的:

public boolean add(E arg0) {
   	if (this.offer(arg0)) {//add方法本质调用this.offer(arg0)
   		return true;
   	} else {
   	//----这里就是add方法当添加满是抛出的异常------
   		throw new IllegalStateException("Queue full");
   	}
   }

   public E remove() {
   	Object arg0 = this.poll();//remove方法本质调用this.poll()
   	if (arg0 != null) {
   		return arg0;
   	} else {
   		throw new NoSuchElementException();
   	}
   }

   public E element() {
   	Object arg0 = this.peek();
   	if (arg0 != null) {
   		return arg0;
   	} else {
   		throw new NoSuchElementException();
   	}
   }
4.代码演示阻塞队列的核心方法:

1.抛出异常的方法:add/remove:
add方法源码:向阻塞队列增加新元素,返回是否增加成功:不成功抛异常

public boolean add(E e) {
        return super.add(e);
}
//上面的add方法调用下面的add方法
public boolean add(E e) {
        if (offer(e))
            return true;
        else
        //----这里就是add方法当添加满是抛出的异常------
            throw new IllegalStateException("Queue full");
}

//调用的offer方法
    public boolean offer(E e) {
        Objects.requireNonNull(e);
        //此处证明阻塞队列是一个线程安全的队列
        final ReentrantLock lock = this.lock;//这里是可重入锁
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
//调用入队的方法enqueue
    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        //this.items即final Object[] items,即声明的阻塞队列的数组
        final Object[] items = this.items;
        //putIndex表示最后插入元素的index
        items[putIndex] = e;//将元素存入相应的下标位置,putIndex开始为0
        //添加元素后,下标加一,加一后的下标如果等于数组长度,下标重置为0
        // 如果队列满了即++putIndex == items.length, 那么回归回队首,赶紧让消费者来取元素
        if (++putIndex == items.length) putIndex = 0;
        count++;//数组中实际存储的元素个数加一
        //notEmpty = lock.newCondition();在前面构造方法中进行声明了
        //Condition for waiting takes(消费者的条件队列:notEmpty:通知消费者队列取出数据
        notEmpty.signal();//没有满的条件队列被唤醒
    }
//注意---- if (++takeIndex == items.length) takeIndex = 0;
//这行代码体现了这个数组的可重用,为了下一次还可以重新添加或者移除元素

队列思想:

注意: if (++takeIndex == items.length) takeIndex = 0;这行代码体现了这个数组的可重用,为了下一次还可以重新添加或者移除元素;

remove方法源码:从阻塞队列移除元素,返回的是被移除的那个元素,如果队列为空,抛异常

//此方法在抽象类AbstractQueue中
 public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
        //----这就是移除不成功抛出异常的代码,即当队列为空移除不成功---
            throw new NoSuchElementException();
}
//调用poll方法
    public E poll() {
        final ReentrantLock lock = this.lock;//这里是可重入锁ReentrantLock 
        lock.lock();
        try {
        //---如果队列中元素个数为0,返回null,否则调用出队方法dequeue----
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    //出队的方法
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        //获取当前对象的存储元素的数组items
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //获取出队的元素e,在队头
        E e = (E) items[takeIndex];
        //然后将队头元素置为空
        items[takeIndex] = null;
        //出队的下标到了数组的末尾了,即移除全部元素了/队列为空,则takeIndex = 0回到队首
        //然后赶紧通知生产者线程赶紧 存数据
        if (++takeIndex == items.length) takeIndex = 0;
        count--;//数组实际元素数量减一
        if (itrs != null)
            itrs.elementDequeued();
        //通知生产者线程 干活 了,notFull是生产者的条件队列
        //notFull:Condition for waiting puts
        notFull.signal();
        return e;
    }

代码演示add/remove/element方法:

package com.fan.blockqueue;
import java.util.concurrent.ArrayBlockingQueue;

public class BlockQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue blockingQueue =
                new ArrayBlockingQueue<>(3);//有界,即容量是3
        //1.向阻塞队列中添加元素
        System.out.println(blockingQueue.add("a"));//true
        System.out.println(blockingQueue.add("b"));//true
        System.out.println(blockingQueue.add("c"));//true
        //超出设定的容量则add方法抛出异常java.lang.IllegalStateException: Queue full
        //System.out.println(blockingQueue.add("4"));

        //2.查看队首元素:返回队首元素
        System.out.println("查看队首元素:"+blockingQueue.element());

        //3.从阻塞队列中移除元素
        System.out.println("------从阻塞队列中移除元素-----");
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //当队列中没有元素的时候还移除元素:抛异常java.util.NoSuchElementException
        //System.out.println(blockingQueue.remove());
    }
}

2.特殊值的方法:offer/poll:


向阻塞队列中添加元素:offer方法源码:

public boolean offer(E e) {//添加的方法
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;//使用ReentrantLock进行了加锁
        lock.lock();
        try {
            if (count == items.length)//当元素实际个数等于 数组容量时,即队列满了
                return false;//添加失败返回false
            else {
                enqueue(e);//入队的方法
                return true;//添加成功返回true
            }
        } finally {
            lock.unlock();
        }
}

private void enqueue(E e) {//入队的方法
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = e;//将元素放入数组中
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
}

阻塞队列中移除元素:poll()方法源码:

public E poll() {//队列中移除元素方法
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();//调用出队方法
        } finally {
            lock.unlock();
        }
 }
    
private E dequeue() {//出队方法
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        //出队的下标到了数组的末尾了,即移除全部元素了,则takeIndex = 0从头开始
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//唤醒生产者线程 干活了
        return e;
}

代码演示offer/poll方法:

package com.fan.blockqueue;
import java.util.concurrent.ArrayBlockingQueue;
public class BlockQueueTest2 {
    public static void main(String[] args) {
        ArrayBlockingQueue blockingQueue =
                new ArrayBlockingQueue<>(3);//3个有界容量
        //1.向阻塞队列中增加元素
        System.out.println(blockingQueue.offer("a"));//true
        System.out.println(blockingQueue.offer("b"));//true
        System.out.println(blockingQueue.offer("c"));//true
        //当阻塞队列满的时候,再添加元素,会返回false,不再抛异常:
        //System.out.println(blockingQueue.offer("4"));//false

        //2.查看阻塞队列队首元素:
        System.out.println("查看队首元素:"+blockingQueue.peek());

        //3.从阻塞队列的队首移除元素
        System.out.println("从阻塞队列的队首移除元素-----");
        System.out.println(blockingQueue.poll());//a
        System.out.println(blockingQueue.poll());//b
        System.out.println(blockingQueue.poll());//c
        System.out.println(blockingQueue.poll());//null
    }
}

3.阻塞的方法:put/take:

put方法源码:

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
        //当阻塞队列满的时候要阻塞等待
            while (count == items.length)
                //---注意:这里没有抛异常,也没有返回特殊值,而是调用等待方法----
                notFull.await();//生产者线程要阻塞等待
            enqueue(e);//调用入队方法,和前面的方法一样
        } finally {
            lock.unlock();
        }
    }

take方法源码:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
        //当阻塞队列为空的时候消费者线程阻塞等待,不能取数据
        //注意:这里没有抛异常,也没有返回特殊值,而是调用等待方法
            while (count == 0)
                notEmpty.await();
            return dequeue();//调用出队方法
        } finally {
            lock.unlock();
        }
    }
//

    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return e;
    }

代码演示put/take:

package com.fan.blockqueue;
import java.util.concurrent.ArrayBlockingQueue;

public class BlockQueueTest3 {
    public static void main(String[] args) throws InterruptedException {
        //创建阻塞队列:
        ArrayBlockingQueue blockingQueue =
                new ArrayBlockingQueue<>(3);
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        //队列满时put此方法一直阻塞,使得程序不能停止
        //blockingQueue.put("4");

        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        //当队列为空时,take方法一直阻塞等待---
        //System.out.println(blockingQueue.take());
    }
}

4.超时退出的方法:offer(e,time,unit)/poll(time,unit)

代码演示超时方法:

		//创建阻塞队列:
        ArrayBlockingQueue blockingQueue =
                new ArrayBlockingQueue<>(3);
		//增加元素
        System.out.println(blockingQueue.offer("a", 1L, TimeUnit.SECONDS));//true
        System.out.println(blockingQueue.offer("b", 1L, TimeUnit.SECONDS));//true
        System.out.println(blockingQueue.offer("c", 1L, TimeUnit.SECONDS));//true
        System.out.println(blockingQueue.offer("4", 2L, TimeUnit.SECONDS));//false

        //取元素
        System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2L,TimeUnit.SECONDS));
SynchronousQueue介绍:

SynchronousQueue:0库存的阻塞队列:

一种阻塞队列,没有容量,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。

SynchronousQueue 的几个特点:

同步队列没有任何内部容量,甚至连一个队列的容量都没有。 所以很多继承的方法就没有用了,(如 isEmpty()始终返回true,size()为0,包含contain、移除remove 都始终为false 等等)。或者说,真正有意义的只有以下几个方法:获取并移除(poll()、poll(timeout,timeunit)、take())、插入(offer()、offer(timeout,timeunit)、put());

适合于传递性设计,在这种设计中, 每一个put操作必须等待一个take操作,反之亦然 。(当然,如果用的是offer、poll的话,那么就不会阻塞等待)。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。

支持可选的公平排序策略。 默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。

package com.fan.blockqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class BlockQueueTest4 {
    public static void main(String[] args) throws InterruptedException{
        BlockingQueue synchronousQueue =
                new SynchronousQueue<>();
        System.out.println("synchronousQueue容量:"+synchronousQueue.size());
        new Thread(()->{
            try {
                //注意,此线程没有加锁
                System.out.println(Thread.currentThread().getName()+
                        "t put  1");
                synchronousQueue.put("1");//在此处会阻塞的---只有等这个消费完了才继续生产

                System.out.println(Thread.currentThread().getName()+
                        "t put  2");
                synchronousQueue.put("2");

                System.out.println(Thread.currentThread().getName()+
                        "t put  3");
                synchronousQueue.put("3");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"AA").start();

        new Thread(()->{
            //暂定一会线程,让AA先运行
            try {
                //每次等待AA先运行,然后BB取出元素
                try { TimeUnit.SECONDS.sleep(3);}
                catch (InterruptedException e) {e.printStackTrace();}
                //3秒后,BB线程去取元素
                System.out.println(Thread.currentThread().getName()+
                        "t 取出元素:"+synchronousQueue.take());

                //每次等待AA先运行存,然后BB取出元素
                try { TimeUnit.SECONDS.sleep(3);}
                catch (InterruptedException e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName()+
                        "t 取出元素:"+synchronousQueue.take());
                //每次等待AA先运行存,然后BB取出元素
                try { TimeUnit.SECONDS.sleep(3);}
                catch (InterruptedException e) {e.printStackTrace();}
                System.out.println(Thread.currentThread().getName()+
                        "t 取出元素:"+synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"BB").start();

    }
}

应用范围:

多线程编程的思路:

    线程 操作(方法) 资源类;高内聚,低耦合;(资源类高内聚低耦合的一些方法,自身操作变量的同步方法)判断并等待–>干活–>通知唤醒;防止虚假唤醒机制;
package com.fan.blockqueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//定义资源类,声明一个加一同步方法和减一的同步方法
class ShareNum{
    int num = 0;
    Lock lock = new ReentrantLock();
    //获取等待队列,用于等待唤醒
    Condition condition = lock.newCondition();
    //定义操作方法,这个方法是高内聚的
    public void increment(){//+1的方法
        lock.lock();
        try {
            //1.判断等待
            while(num != 0){
                condition.await();
            }
            //干活,等于0时干活
            num++;
            //打印线程名字的方法要写在资源类的方法中,不能写在main方法中
            System.out.println(Thread.currentThread().getName()+
                    ":"+num);
            //3.通知剩余的线程
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    //减一的方法
    public void decrement(){
        lock.lock();
        try {
            //1.判断什么时候等待
            while(num != 1){
                condition.await();
            }
            //2.什么时候干活,==1时
            num--;
            System.out.println(Thread.currentThread().getName()+
                    ":"+num);
            //3.通知其他线程
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

}

public class ProConsumerTest {
    public static void main(String[] args) {
        ShareNum shareNum = new ShareNum();
        new Thread(()->{
            for (int i = 1; i <=5 ; i++) {
                shareNum.increment();
            }
        },"AA").start();

        new Thread(()->{
            for (int i = 1; i <=5 ; i++) {
                shareNum.decrement();
            }
        },"BB").start();
    }
}

conditon参考这个:https://juejin.cn/post/6871976482726477838

使用阻塞队列实现生产者消费者模式:

在分析阻塞队列之前我们先看生产者消费者模式,这是一个很常见的模式,生产者负责数据的生产,而消费者则负数据的消费。一般来说生产者与消费者的数量比例是m:n,该模式最大的好处就是将数据生产方与消费方进行了解耦,使得它们之间不会互相影响。为了将生产者和消费者连接起来,我们需要一个特殊的容器,该容器能存储生产者生产的数据,而消费者则能从该容器中取出数据。

如图解释生产者和消费者:

我们可以通过厨师、桌子、顾客来说明生产者消费者模式,厨师就好比生产者,他们制作生产出来美食将放到桌子这个容器中,顾客则好比是消费者,他们从桌子上取出美食进行消费享用。

阻塞队列:

生产者消费者模式的核心部分就是生产者和消费者之间的那个特殊容器,我们通过实现一个线程安全且具有一定策略的容器便连接起两端的生产者和消费者。这个容器可以具有队列性质,也可以具有栈性质,亦或是其它数据结构。最常见的就是阻塞队列,队列保证了先进的数据先出,而阻塞则是队列已满时和队列为空时的处理策略,即队列已满时的入队操作和队列为空时的出队操作都会引起阻塞。

下图是阻塞队列工作示意图,线程一、线程二、线程三生产的数据通过put操作进行入队,线程四、线程五通过take操作进行出队,当队列满时put操作会阻塞等待消费者将队列的元素拿走,当队列为空时take操作会阻塞等待生产者将数据入队。

模拟实现
根据前面对阻塞队列的介绍,我们试着来模拟实现一个简单的阻塞队列。先看数据结构的设计,可以使用一个数组来存放队列的元素,并通过head/takeIndex(出队从头部出)和tail/putIndex(入队从尾部入)指针来约束先进先出规则。入队操作使用tail指向的位置,而出队则使用head指向的位置,一旦到达数组尾部就重新从头开始。

下面看看具体的实现,Object数组用于保存元素,size表示队列的大小,此外还有head和tail指针。通过构造函数来指定阻塞队列大小,生产者生产的数据调put方法进行入队,如果size等于队列最大长度时则调用wait阻塞(此时队列已经满了),否则将元素保存到队列中,同时维护size和tail,最后如果size等于1时要调用notifyAll方法通知消费者可以消费了。消费者通过调用take方法进行数据消费,如果size等于0时则调用wait阻塞(队列为空需要等待),否则通过head获取队列头的元素,同时维护size和head,最后如果size等于queue.length-1时调用notifyAll方法通知生产者可以生产了。

自定义一个阻塞队列(数组实现):

(尽量靠近源码):https://cloud.tencent.com/developer/article/1773744

package com.fan.blockqueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

//手写一个ArrayBlockingQueue
public class MyArrayBlockingQueue {
    //使用数组模拟阻塞队列,其实也是就模拟队列,加锁阻塞
    final Object [] items;//元素数组
    //声明头指针和尾指针,int数组下标相当于头尾指针,开始都在0
    int takeIndex = 0;//头指针,从头拿取出队,头指针移动
    int putIndex = 0;//尾指针,从尾部添加入队,尾指针移动
    int count;//数组中实际元素的个数
    //声明可重入锁
    ReentrantLock lock = null;
    //绑定两个条件队列,一个用于拿取通知,一个用于生产的通知
    Condition notEmpty = null;//消费者的条件队列,没有空就可以通知拿取
    Condition notFull = null;//生产着的条件队列,没有满就可以通知生产

    //声明有参数的构造方法,参数是数组容量
    public MyArrayBlockingQueue(int capacity) {
        if (capacity <= 0)//容量<0,抛异常
            throw new IllegalArgumentException();
        this.items = new Object[capacity];//初始化数组元素
        //可以将lock,和Condition的初始化放在构造方法中
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    //队列,就是先进先出原则,从队尾增加,从队头删除
    //定义非阻塞的增加方法,返回特殊值//offer/poll/peek
    //1.队列尾部增加元素
    public boolean offer(Object o){
        //声明可重入锁,构造中已初始化
        ReentrantLock lock = this.lock;
        lock.lock();//加锁,入队的操作是线程安全的
        try {
            //入队操作,这里我们写一个入队的方法enqueue
            //阻塞队列的一个要求就是,当队列满的时候是不能再入队的
            if(count == items.length){
                return false;//当队列满了直接返回false,
            }else{//队列没满,直接入队操作,并返回true
                enqueue(o);
                return true;
            }
        }finally {
            lock.unlock();//解锁
        }
    }

    //2.队列头删除元素
    public Object poll() {
        //引用锁
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        }  finally {
            lock.unlock();
        }
    }
    //3.定义一个阻塞的入队方法put
    public void put(Object o) throws InterruptedException {
        Object[] items = this.items;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //当队列满的时候,生产者条件队列等待
            //1.判断什么时候等待
            while(count == items.length){
                notFull.await();
            }
            //2.干活
            enqueue(o);//3.通知在enqueue方法内
        } finally {
            lock.unlock();
        }
    }
    //4.定义一个出队的阻塞方法take,取数据的方法返回值一直是元素
    public Object take() throws InterruptedException {
        //先引用数组和lock
        Object[] items = this.items;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //1.判断什么时候等待
            while(count == 0){
                notEmpty.await();//消费者线程等待
            }
            //2.干活
            return dequeue();//注意,通知在dequeue出队方法内部的最后
            //3.通知
        } finally {
            lock.unlock();
        }
    }

    //定义入队的方法
    private void enqueue(Object o){//这里建议用泛型
        Object[] items = this.items;//定义成局部的数组
        //1.当数组没有满的时候,直接存,即count != items.length
        items[putIndex] = o;//直接入队,前面已经通过count判断这里没有满的
        putIndex++;//尾指针 后移,头指针不动,
        //2.判断尾指针  有没有到末尾,到末尾了,直接从头又开始
        if(putIndex == items.length ){//
            putIndex = 0;//从新从头开始,
        }
        //3.
        count++;//元素数量+1
        //4.通知唤醒消费者可以消费,notEmpty为消费者条件队列
        notEmpty.signal();
    }


    public Object dequeue(){//无条件出队,因为前面已经判断过count了
        //先引用当前对象数组
        Object[] items = this.items;
        //1.1拿到要出队的第一个队首元素,一会返回
        Object o = items[takeIndex];
        //1.2然后直接将第一个位置置为null,即移除
        items[takeIndex] = null;
        takeIndex++;//指针后移
        //2.判断是否到队尾,如果到队尾,则从头开始再来
        if(takeIndex == items.length){
            takeIndex = 0;
        }
        count--;//元素个数-1
        //3.通知唤醒生产者线程
        notFull.signal();
        return o;
    }
    //显示队列元素
    public void showArray(){
        for (Object item : items) {
            System.out.println(item);
        }
    }
    
	public int size(){
        return count;
    }

}

测试类:

package com.fan.blockqueue;

public class MyArrayBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        MyArrayBlockingQueue arrayBlockingQueue =
                new MyArrayBlockingQueue(3);
        

        arrayBlockingQueue.put(1);
        arrayBlockingQueue.put(2);
        arrayBlockingQueue.put(3);
        //arrayBlockingQueue.put(4);
        arrayBlockingQueue.showArray();
        System.out.println("出队====");
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
    }
}

使用两个线程测试:

package com.fan.blockqueue;
import lombok.SneakyThrows;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MyArrayBlockingQueueTest {
    MyArrayBlockingQueue arrayBlockingQueue =
            new MyArrayBlockingQueue(3);
    
    public static void main(String[] args) throws InterruptedException {

        MyArrayBlockingQueueTest test1 = new MyArrayBlockingQueueTest();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
    }
    //内部类:
    class Producer implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
           while(true){
               try { TimeUnit.SECONDS.sleep(1);}
               catch (InterruptedException e) {e.printStackTrace();}
               arrayBlockingQueue.put(UUID.randomUUID().toString().substring(0,4));
               System.out.println("生产商品-----总共有:"+arrayBlockingQueue.size());
           }
        }
    }

    class Consumer implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            while(true){
                try { TimeUnit.MILLISECONDS.sleep(500);}
                catch (InterruptedException e) {e.printStackTrace();}
                arrayBlockingQueue.take();
                System.out.println("消费商品:剩余总数:"+arrayBlockingQueue.size());
            }
        }
    }

}

打印:
消费商品:剩余总数:0
生产商品-----总共有:1
生产商品-----总共有:1
消费商品:剩余总数:0
生产商品-----总共有:1
消费商品:剩余总数:0
生产商品-----总共有:1
消费商品:剩余总数:0

阻塞队列实现生产者消费者模式超级简单,它提供开箱即用支持阻塞(和唤醒)的方法put()和take(),开发者不须要写困惑的wait-nofity代码去实现通讯。BlockingQueue 一个接口,Java5提供了不一样的现实,如ArrayBlockingQueue和linkedBlockingQueue,二者都是先进先出(FIFO)顺序。而ArraylinkedQueue是天然有界的,linkedBlockingQueue可选的边界。下面这是一个完整的生产者消费者代码例子,对比传统的wait、nofity代码,它更易于理解。

案例一:

package com.fan.blockqueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource{
    private volatile boolean FLAG = true;//默认开启,进行生产和消费
    private AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue blockingQueue = null;

    public MyResource(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }
    //生产方法
    public void myProd() throws InterruptedException {
        String data = null;
        boolean returnValue;
        while(FLAG){
            data = atomicInteger.incrementAndGet()+"";
            returnValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS);
            if(returnValue){
                System.out.println(Thread.currentThread().getName()+
                        "t 插入队列"+data+"成功");
            }else{
                System.out.println(Thread.currentThread().getName()+
                        "t"+"插入队列"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+
                "t 老板叫停,表示flag=false,生产停止");
    }

    public void myConsumer() throws InterruptedException {
        String result = null;
        while(FLAG){
           result = blockingQueue.poll(2L,TimeUnit.SECONDS);
            if(null == result || result.equalsIgnoreCase("")){
                FLAG = false;
                System.out.println(Thread.currentThread().getName()+
                        "t 超过2秒没有取到蛋糕,消费退出");
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName()+
                    "t消费队列蛋糕"+result+ "成功");
        }
    }
    public void stop() throws Exception{
        this.FLAG = false;
    }
}
public class BlockQueueDemo1 {
    public static void main(String[] args) throws Exception {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"t生产者线程启动--");
            try {
                myResource.myProd();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"prod").start();

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"t消费者线程启动--");
            try {
                myResource.myConsumer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"consumer").start();

        //暂停一会
        try { TimeUnit.SECONDS.sleep(6);}
        catch (InterruptedException e) {e.printStackTrace();}
        System.out.println();
        System.out.println();
        System.out.println("5秒钟到了,老板main线程叫停,活动结束");
        myResource.stop();
    }
}

案例二:

package com.fan.blockqueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

//资源类
class MyResource3{
    private BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
    //线程操作资源类
    //向资源池中添加资源
    public  void add() throws InterruptedException {
        try {
            //put自带锁和通知唤醒方法
            try { TimeUnit.SECONDS.sleep(1);}//模拟生产耗时1秒
            catch (InterruptedException e) {e.printStackTrace();}
            //put方法是自带锁的阻塞唤醒方法,不需要我们写锁,通知和唤醒
            blockingQueue.put(1);
            System.out.println("生产者"+Thread.currentThread().getName()+
                    "生产一件资源,当前资源池有"+blockingQueue.size()+"个资源");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //向资源池中移除资源
    public  void remove(){
        try {
            try { TimeUnit.SECONDS.sleep(1);}//模拟消费耗时1秒
            catch (InterruptedException e) {e.printStackTrace();}
            Object take = blockingQueue.take();//自带锁和通知唤醒方法
            System.out.println("消费者" + Thread.currentThread().getName() +
                    "消耗一件资源," + "当前资源池有" + blockingQueue.size()
                    + "个资源");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//使用阻塞队列BlockingQueue解决生产者消费者
public class BlockQueueDemo2 {
    public static void main(String[] args) {
        MyResource3 myResource3 = new MyResource3();
        //这里可以通过for循环的次数控制生产者和消费者的比例,来模拟缓存区的缓存剩余情况
        for (int i = 1; i <= 5 ; i++) {//请变换生产者和消费者数量进行测试
            //模拟两个生产者线程
            new Thread(()->{
                while(true){//循环生产
                    try {
                        myResource3.add();//生产数据
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },String.valueOf(i)).start();
        }

        for (int i = 1; i <= 2 ; i++) {//5个消费者
            new Thread(()->{
                while (true){//循环消费
                    myResource3.remove();
                }
            },String.valueOf(i)).start();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780722.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号