栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【JAVA多线程】DelayQueue 源码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【JAVA多线程】DelayQueue 源码分析


目录
    • DelayQueue
      • 原理
      • 实例
      • 成员变量
      • 生产元素
      • 消费元素

DelayQueue

非常适合指定时间之后,才能让消费者获取到的场景。

队列 的头部Delayed是过去延迟过期最远的元素。如果没有延迟过期,则没有 headpoll 并将返回null。当元素的 getDelay(TimeUnit.NANOSECONDS)方法返回的值小于或等于零时,就会发生过期。即使无法使用takeorpoll删除未过期的元素,它们也会被视为普通元素。例如,该size方法返回过期和未过期元素的计数。此队列不允许空元素。

原理

DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口。

当生产者线程调用put之类的方法加入元素时,会触发元素中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。

消费者线程查看队列头部的元素,然后调用元素的getDelay方法,如果此方法返回的值小0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果getDelay方法返回的值大于0,则消费者线程wait返回的时间值后,再从队列头部取出元素,此时元素应该已经到期。

DelayQueue是Leader-Followr模式的变种,消费者线程处于等待状态时,总是等待最先到期的元素,而不是长时间的等待。消费者线程尽量把时间花在处理任务上,最小化空等的时间,以提高线程的利用效率。

实例

最优的消费者线程的个数: 任务数/单个任务处理时间的最大值

生产者同一时间2任务,单个任务处理时间的最大值2, 消费者2

生产者同一时间3任务,单个任务处理时间的最大值2, 消费者3

生产者同一时间4任务,单个任务处理时间的最大值2, 消费者4

规律:消费者线程数与生产者同一时间生产的任务数和消费者任务处理时间成正比。

如果元素进入队列的速度很快,元素之间的到期时间相对集中,而处理每个到期元素的速度又比较慢的话,则队列会越来越大,队列后边的元素延期处理的时间会越来越长。

import java.util.concurrent.*;

public class Test {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue delayQueue = new DelayQueue<>();
        new ProductThread(delayQueue).start();
        for (int i = 0; i < 10; i++) {
            new ConsumerThread(delayQueue).start();
        }

    }

    private static class DelayElem implements Delayed {
        
        private final long delay;

        public long getExpire() {
            return expire;
        }

        
        private final long expire;
        
        private final String msg;

        private DelayElem(long delay, String msg) {
            this.delay = delay;
            this.msg = msg;
            //到期时间 = 当前时间+延迟时间
            this.expire = System.currentTimeMillis() + this.delay;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
        @Override
        public String toString() {
            return "DelayElem{" +
                    "delay=" + delay +
                    ", expire=" + expire +
                    ", msg='" + msg + ''' +
                    '}';
        }
    }

    private static class ProductThread extends Thread {
        private final DelayQueue delayQueue;
        private ProductThread(DelayQueue delayQueue) {
            this.delayQueue = delayQueue;
        }
        @Override
        public void run() {
            for(int i = 0; i < 10; i++) {
                DelayElem element = new DelayElem(1000+ (long) (Math.random() * 1000),i+"test");
                delayQueue.offer(element);
                System.out.println(System.currentTimeMillis() + " 放入元素 " + i);
            }
        }
    }

    private static class ConsumerThread extends Thread {
        private final DelayQueue delayQueue;
        private ConsumerThread(DelayQueue delayQueue) {
            this.delayQueue = delayQueue;
        }
        @Override
        public void run() {
            while (true){
                try {
                    DelayElem element =  delayQueue.take();
                    long startTime = System.currentTimeMillis();
                    System.out.println(startTime +" 获取元素:" + element+"延迟:"+(startTime - element.getExpire()));
                    try {
                        Thread.sleep(2000);//执行任务
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
成员变量
private final transient ReentrantLock lock = new ReentrantLock();


private final Condition available = lock.newCondition();


private final PriorityQueue q = new PriorityQueue();



private Thread leader = null;
生产元素
public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e); // 优先级队列入队
            if (q.peek() == e) {
                leader = null;
                available.signal();//通知等待锁的线程
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
消费元素
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//可中断
        try {
            for (;;) {
                E first = q.peek();// 获取第一个元素
                if (first == null)
                    available.await();//阻塞进行等待
                else {
                    long delay = first.getDelay(NANOSECONDS);// 获取第一个元素的延迟时间
                    if (delay <= 0)
                        return q.poll();// 如果时间已经到了,直接返回元素。
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();// 当前有其他线程再处理,则进入等待。
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;// 设置 leader 为当前线程
                        try {
                            available.awaitNanos(delay);// 执行等待
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();// 释放锁并且通知其他等待线程
            lock.unlock();
        }
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/866084.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号