栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

十一、自定义线程池

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

十一、自定义线程池

一、简介 1. 为什么引入
  • 一个任务过来,一个线程去负责做。如果每次过来都去创建新的线程,性能低且比较耗费内存
  • 线程并不是越多越好,如果线程数多于CPU的核心数,由于每次线程切换,要保存原来线程饿的状态,开启现在的线程,势必会更加耗费资源
  • 充分利用已有线程,去处理原来的任务
2. 自定义线程池
  • 线程池(消费者):用来保存一定数量的线程来处理任务
  • 生产者:用来不断的产生任务
  • 阻塞队列:用来平衡消费者和生产者之间任务处理的一个等待队列

 1. 生产者生产任务的速度较快,多余的任务需要存储在blocking queue中
 2. 生产者生产任务速度慢,那么线程池中也需要在blocking queue中等

 3. blocking queue:用来平衡生产者和消费者之间的一个桥梁 
二、自定义线程池 1. 阻塞队列
package com.dreamer.multithread.day03;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class BlockingQueue {

    
    private Deque queue = new ArrayDeque();

    
    private ReentrantLock lock = new ReentrantLock();

    
    private Condition producerRoom = lock.newCondition();

    
    private Condition consumerRoom = lock.newCondition();

    
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    
    public T getTaskWithTimes(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            // 统一时间管理为nano,将s,h,d转换为纳秒
            long nanos = timeUnit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    
                    nanos = consumerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T t = queue.removeFirst();
            producerRoom.signalAll();
            return t;
        } finally {
            lock.unlock();
        }
    }

    
    public T getTask() {
        lock.lock();
        try {
            // while 防止虚假唤醒,不能用if
            while (queue.isEmpty()) {
                try {
                    System.out.println(" no task,consumer waiting....");
                    consumerRoom.await();
                    // 被唤醒后,再次进入while循环,先判断是否为空,因为可能刚添加的任务已经被消费了
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 唤醒后执行返回,并唤醒等待添加的线程
            T t = queue.removeFirst();
            producerRoom.signalAll();
            return t;
        } finally {
            lock.unlock();
        }
    }

    
    public void addTask(T t) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    System.out.println("full task, producer waiting");
                    producerRoom.await();
                    // 唤醒后先再次检查,不为空,执行添加, 并唤醒在等着消费的线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            queue.addLast(t);
            consumerRoom.signalAll();
        } finally {
            lock.unlock();
        }
    }

    
    public int getSize() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}
2. 线程池
package com.dreamer.multithread.day03;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    
    private BlockingQueue blockingQueue;

    
    private Set workers = new HashSet();

    
    private int threadNumber;

    
    private long timeout;
    private TimeUnit timeUnit;

    
    public ThreadPool(int blockingQueueCapacity, int threadNumber, long timeout, TimeUnit timeUnit) {
        blockingQueue = new BlockingQueue<>(100);
        this.threadNumber = threadNumber;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }

    
    public synchronized void execute(Runnable task) {
        

        
        if (workers.size() < threadNumber) {
            Worker workerThread = new Worker(task);
            System.out.println("新增worker对象:" + workerThread);
            workerThread.start(); // 开启线程执行任务
            workers.add(workerThread);
        } else {
            System.out.println("线程池已满,加入等待队列中");
            blockingQueue.addTask(task);
        }
    }

    
    private class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        
        @Override
        public void run() {

            
            // 检查当前的任务,检查阻塞队列中是否有任务
            // 如果没有任务,线程会尝试一直去阻塞等待:程序不会结束,只需要换一个超时方法即可
            // task = blockingQueue.getTaskWithTimes(2, TimeUnit.SECONDS)) != null) {
            while (task != null || (task = blockingQueue.getTask()) != null) {
                // 执行任务时候可能有异常
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;//本次执行完毕,置为null,继续进入下一个循环
                }
            }

            // 一旦执行完当前任务,并且阻塞队列中没其他任务,则将该线程关闭,并挪出线程池
            synchronized (workers) {
                System.out.println("任务执行完毕,线程已经被移除池子");
                workers.remove(this);
            }
        }
    }
}
3. 测试
package com.dreamer.multithread.day03;

import java.util.concurrent.TimeUnit;

public class TestPool {
    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(100, 3, 5, TimeUnit.SECONDS);
        for (int i = 0; i < 5; i++) {

            int round = i;
            
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("本次任务是打印:" + round);
                }
            });
        }
    }
}
三、拒绝策略
  • 如果多个任务都在等待线程池中的线程来执行,但是当前所有线程都很忙
  • 多个任务就会被加入到blocking queue中,但是如果超过了blocking queue的最大值
  • 除非blocking queue中的任务被其他干完活的线程开始处理了,否则blocking queue就一直添加不进去任务
# 对于添加不进阻塞队列的任务,这个任务是由调用方来发起的
1. 一直等待
2. 超时等待,过时不加
3. 让调用者自己放弃添加
4. 让调用者抛出异常
5. 让调用者自己执行任务
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/309565.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号