栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

那就手写一个线程池玩一下吧

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

那就手写一个线程池玩一下吧

        最近在学JUC,学到线程池,感觉简简单单调用API对线程池的理解不够透彻,于是我决定,手写一个线程池。

        首先,要手写一个线程池,就需要了解线程池具体的概念,它实现了什么功能,具体的组件是什么。以大哥李写的Executors为参考,下面先对线程池做一个简单的介绍。

        线程池顾名思义,是一个装线程的一个池子。它可以创建,销毁线程,并且给这些线程安排任务。线程池被设计出来肯定是为了解决一些技术问题。技术问题就是比如创建一个线程需要T1的时间,销毁一个线程需要T2的时间,执行这个线程的任务需要T3的时间。一般来说,创建和销毁线程所需要的时间都很多,为了节省T1和T2的时间,使用线程池这个技术,可以避免多次创建,多次销毁线程,实现线程的复用。

        线程池的具体组件:

  1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  2. 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

        先写任务队列

//使用泛型
class BlockingQueue{
    //1.任务队列
    private Deque queue = new ArrayDeque<>();

    //2.锁
    private ReentrantLock lock = new ReentrantLock();

    //3.生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    //4.消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    //5.容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    public T pull(long timeout, TimeUnit unit){
        lock.lock();
        try{
            //转换为纳秒
            long nanos =unit.tonanos(timeout);
            while(queue.isEmpty()){
                try{
                    if(nanos<=0){
                        return null;
                    }
                    //返回的是剩余的等待时间
                   nanos = emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }

    }

    //阻塞获取
    public T take(){
        lock.lock();
        try{
            while(queue.isEmpty()){
                try{
                emptyWaitSet.await();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    //阻塞添加
    public void put(T element){
        lock.lock();
        try{
            while(queue.size() == capcity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                queue.addLast(element);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }

    }
    //获取大小
    public int size(){
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

然后写工作线程

 class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }
        @Override
        public void run(){
            //执行任务
            //1)当task不为空,则执行任务
            //2)当task执行完毕,接着从任务队列获取任务并执行
            while(task != null || (task = taskQueue.pull(timeout,timeUnit)) != null){
                try{
                    log.debug("正在执行。。。{}",task);
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            synchronized (workers){
                log.debug("worker被移除{}",this);
                workers.remove(this);
            }
        }
    }
}

最后写线程池

class ThreadPool{
    //任务队列
    private BlockingQueue taskQueue;

    //线程集合
    private HashSet workers = new HashSet<>();

    //线程核心数
    private int coreSize;

    //获取任务超时时间
    private long timeout;

    //时间单位
    private TimeUnit timeUnit;

    public ThreadPool(int queueCapcity,int coreSize, long timeout, TimeUnit timeUnit) {
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }
    //执行任务
    public void execute(Runnable task){
        //当任务数没有超过coreSize时,直接交给worker对象执行
        //如果任务超过coreSize,加入任务队列暂存
        synchronized (workers){
            if(coreSize>workers.size()){
                Worker worker = new Worker(task);
                log.debug("新增worker{}",worker);
                workers.add(worker);
                worker.start();
            }else{
                log.debug("加入任务队列{}",task);
                taskQueue.put(task);
            }
        }
    }

测试代码:

@Slf4j(topic = "c.TestPool")
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(10,3,1000,TimeUnit.MILLISECONDS);
        for(int i = 0; i < 10;i++){
            int j = i;
            log.debug("增加任务{}",j);
            threadPool.execute(()->{
                log.debug("{}",j);
            });
        }
    }
}

测试结果:

 

出现问题,线程池中的线程执行完一个任务后,不会再接着去执行任务,而是等到时间结束之后自动被移出,代码问题不知道出在那里,有会的大佬希望能指点一下。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/658242.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号