栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Juc并发编程11——一网打尽常用并发容器、阻塞队列使用与原理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Juc并发编程11——一网打尽常用并发容器、阻塞队列使用与原理

前言

本文将介绍常用的并发容器,比较传统容器与并发容器的区别,介绍并发容器的基本原理。是面试常考、工作常用的热门知识点。

1.传统容器安全吗?

运行如下代码。

    public static void main(String[] args) throws InterruptedException {
        ArrayList arr = new ArrayList<>();
        Runnable r = () -> {
            for (int i = 0; i < 100; i++) {
                arr.add("hello");
            }
        };
        for (int i = 0; i < 100; i++) {
            new Thread(r).start();
        }
        TimeUnit.SECONDS.sleep(1);
        System.out.println(arr.size());

    }

其结果如下。
按照堆栈信息查看ArrayList第663行。

    public boolean add(E e) {
        ensureCapacityInternal(size + 1);  // 确保容量
        elementData[size++] = e; // 这一行报错
        return true;
    }

我们看源码发现,ArrayList是有确保容量的操作的,为何会在多线程环境下报错呢?

试想ArrayList刚好占满的情况,线程1执行完ensureCapacityInternal后,线程2获得了cpu,进行ensureCapacityInternal发现容量足够,没有扩容,并且执行了元素增加操作。此时线程1重新获取到cpu资源,又执行元素增加操作,就出现了树组下标越界了。

总的来说,就是由于ArrayList的扩容与元素增加操作为非原子性操作,导致出现了并发安全的问题。

再来看看HashMap。

public class Demo02 {
    public static void main(String[] args) throws InterruptedException {
        HashMap map = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            int tmp = i;
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    map.put(1000 * tmp + j, "a");
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(2);
        System.out.println(map.size());

    }
}

其输出如下。不是预期值10000.实际上,实际上,它还可能引起Entry对象出现环状的数据结构,导致死循环。

2.常用并发容器介绍

如何才能够解决容器遇到的并发问题呢?我们首先想到的是使用Synchoronized进行加锁的操作。早期的一些容器比如Vector或者Hashtable就是这么做的。不过众所周知,它们的效率实在是太底了。因此现在我们很少使用它们了。

JUC为我们提供了专门用于并发场景的容器。

2.1 CopyOnWriteArrayList
    public static void main(String[] args) throws InterruptedException {
        List arr = new CopyOnWriteArrayList<>();
        Runnable r = () -> {
            for (int i = 0; i < 100; i++) {
                arr.add("hello");
            }
        };
        for (int i = 0; i < 100; i++) {
            new Thread(r).start();
        }
        TimeUnit.SECONDS.sleep(1);
        System.out.println(arr.size());

    }

其结果如下。

看看它add方法的源码吧。

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

原来是使用了ReentrantLock锁呀。可以看到,它实际上是在加锁以后,获取并复制树组进行了操作,最后再一次性setArray。

因为读一般是线程安全的,并且在add时也是先操作复制的树组,再一次性进行setArray数组替换操作,get方法并没有加锁。

  public E get(int index) {
        return get(getArray(), index);
    }
2.2 ConcurrentHashMap
 public static void main(String[] args) throws InterruptedException {
        Map map = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++) {
            int tmp = i;
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    map.put(1000 * tmp + j, "a");
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(2);
        System.out.println(map.size());

    }

其运行结果如下。
因为多个线程的操作可能会争抢同一把锁,在之前介绍LongAdder时,提到过,既然线程间都竞争同一把锁,我不防多搞几把锁,进行压力分散。在jdk7及之前,ConcurrentHashMap采用了类似的策略,它将数据进行分段,每一个小段加一把锁,这样当一个线程获取锁时,仅仅锁了一小段,不会影响其它段的数据被其它线程访问。

在jdk8以后,ConcurrentHashMap采用CAS配合锁机制进行实现。

我们先来回顾下HashMap的实现原理吧。


Hash表本质上是一个用于存放后续节点头节点的数组,数组中的每一个元素都是一个头节点(或者说是一个链表)。当我们添加一个数据时,会先通过Hash算法计算得到它的Hash值,对应找到它的数组下标,在该位置的链表后插入数据。当链表的长度到达8时,会将链表自动转换为红黑树,这会使其查询时可以使用二分法进行查询,效率大幅度的提升。

因为ConncurrentHashMap的源码比较复杂,我们从简单的地方入手。其构造方法的结构与HashMap大体相似:维护一个哈希表,哈希表中存放的要么是链表要么是红黑树。这里就不去看了,先看看put方法吧。

	public V put(K key, V value) {
    return putVal(key, value, false);
    }

    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException(); // 赋值不能为空,基本操作
        int hash = spread(key.hashCode()); // 计算键的哈希值,用于锁定在哈希表中的位置
        int binCount = 0; // 用于记录链表长度
        for (Node[] tab = table;;) { // 无限循环,CAS自旋锁;table:哈希表
            Node f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0) //1.如果哈希表为空
                tab = initTable(); // 初始化哈希表,之后进入下一轮循环
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 2.如果该位置为空,说明哈希表中该位置还没有头节点(注意这里f指向了头节点所在的位置)
                if (casTabAt(tab, i, null,
                             new Node(hash, key, value, null))) // 进行CAS操作插入节点作为头节点
                    break;                   // CAS成功则直接break跳出put方法,否则说明有其它线程在操作,跳出当前循环,进入下一轮循环
            }
            else if ((fh = f.hash) == MOVED) //3.头节点的哈希值为-1,说明正在进行扩容
                tab = helpTransfer(tab, f); //帮助其进行数据迁移,完事后进入下一轮循环
            else { //4.除了以上特殊情况以外的正常插入情况
                V oldVal = null;
                synchronized (f) { //将头节点进行加锁,防止同一时间其它线程也在操作哈希表中该位置的链表或者红黑树
                    if (tabAt(tab, i) == f) { 
                        if (fh >= 0) { // 4.1 如果头节点的哈希值>=0,说明是链表,针对链表进行操作
                            binCount = 1;
                            for (Node e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { // 4.2 如果f是TreeBin,说明是红黑树
                            Node p;
                            binCount = 2;
                            if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 根据链表长度决定链表是否要进化成为红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i); //如果当前哈希表的长度小于64,不会进化成红黑树,优先考虑扩容,这是因为其长度约长,对于并发场景处理支持越好(结合下面的图片理解)
                    if (oldVal != null)
                        return oldVal;
                    break;
                } 
            }
        }  // for end
        addCount(1L, binCount);
        return null;
    }

下图总结了put插入节点的过程:对hash表中的某个节点进行单独加锁。这样是不是也可以在保证线程安全的情况下降低由于竞争所带来的性能损耗呀。因此ConcurrentHashMao在进行扩容时的机制是,如果当前哈希表的长度小于64,不会进化成红黑树,优先考虑扩容,这是因为其长度约长,对于并发场景处理支持越好(结合下面的图片理解)
接着来看看get方法吧。

 public V get(Object key) {
        Node[] tab; Node e, p; int n, eh; K ek;
        int h = spread(key.hashCode()); //计算哈希值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { 
            // 1.如果头节点就是我们要找的,直接返回
            if ((eh = e.hash) == h) { 
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 2.哈希值为负数,说明要么在扩容,要么就是红黑树
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
           // 3.链表的情形,进行遍历查找
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        // 4.没找到只能返回null了
        return null;
    }
3.阻塞队列 3.1 阻塞队列的介绍

除了常用的并发容器以外,juc还提供了各种阻塞队列,适用于不同的工作场景。先看看接口BlockingQueue。

public interface BlockingQueue extends Queue {
    
    boolean add(E e);

   //入队,如果成功则返回true,否则如果队列已满,插入失败返回false(非阻塞)
    boolean offer(E e);

    // 入队(阻塞,直到队列有容量可以插入元素为止)
    void put(E e) throws InterruptedException;

   //入队,阻塞,直到入队成功、超时或者中断为止
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

   // 出队 ,阻塞,如果队列为空阻塞线程直到能够出队为止  
    E take() throws InterruptedException;

  //出队,阻塞,直到入队成功、超时或者中断为止
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

   //在理想情况下(无内存或者资源限制),可以不阻塞的入队的容量,如无限制则返回Integer.MAX_VALUE
    int remainingCapacity();

    
    boolean remove(Object o);

    public boolean contains(Object o);

   // 一次性从BlockingQueue获取所有可用的对象
    int drainTo(Collection c);

   
    int drainTo(Collection c, int maxElements);
}

阻塞队列会阻塞线程,不废话,上图。
还记得消费者与生产者模式吗?利用阻塞队列可以很轻松的进行实现。
下面我们来实战下。假设有2个厨师,3个顾客,一个厨师炒一个菜的时间是3s,一个顾客吃掉一个菜的时间是4s。窗口一次只能够放一个菜。

public class Demo03 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1);
        Runnable supplier = () -> {
            while (true) {
                String name = Thread.currentThread().getName();
                System.out.println(currentTimeInFormat() + "Thread " + name + " to prepare food..." );
                try {
                    TimeUnit.SECONDS.sleep(3);
                    queue.put(new Object());
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                System.out.println(currentTimeInFormat()  + "Thread " + name + "prepare  food finished..");
            }
         
        };

        Runnable comsumer = () -> {
            while (true) {
                String name = Thread.currentThread().getName();
                System.out.println(currentTimeInFormat() + "Thread " + name + " wait ..." );
                try {
                    queue.take();
                    System.out.println(currentTimeInFormat() + "Thread " + name + " eat food..." );
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                System.out.println(currentTimeInFormat() + "Thread " + name + "eat  food finished..");
            }
        };

        for (int i = 0; i < 2; i++) {
            new Thread(supplier, "s[" + i + "]").start();
        }

        for (int i = 0; i < 3; i++) {
            new Thread(comsumer, "c[" + i + "]").start();
        }
        TimeUnit.SECONDS.sleep(1);
    }

    public static String currentTimeInFormat() {
        SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
        return "[" + format.format( new Date()) + "]" ;
    }
}

 

输出结果如下哟。

常见的阻塞队列有:
ArrayBlockingQueue:带缓冲的阻塞队列(即添加容量限制)
SynchonousBlockingQueue:无缓冲的阻塞队列(相当于容量为1的ArrayBlockingQueue)
LinkedBlockingQueue:无界的阻塞队列,基于链表实现,可以限制容量,可以实现阻塞

3.2 ArrayBlockingQueue源码分析

先看看它的构造方法。

   final ReentrantLock lock;

   private final Condition notEmpty;
    
   private final Condition notFull;

   public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

   public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair); // 采用指定的公平/非公平锁实现并发安全
        notEmpty = lock.newCondition();  //两个condition,之后用于线程入队和出队的阻塞控制
        notFull =  lock.newCondition(); 
    }

接下来看看put方法与offer方法是如何实现的。

     public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock; //加锁
        lock.lock();
        try {
            // 非阻塞,直接判断队列是不是满的,如果满返回false
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock; //加锁
        lock.lockInterruptibly(); //lock锁可以响应中断
        try {
            while (count == items.length)
                notFull.await(); //如果队列满,将当前线程挂起,直到其它线程出队时将线程唤醒
            enqueue(e); // 被唤醒后入队
        } finally {
            lock.unlock();
        }
    }

接下来看看出队操作deQueue.

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

(未完待续)

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号