栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

并发编程之线程池

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

并发编程之线程池

目录

前言

ThreadPoolExecutor

构造方法

提交方法

Executors

newFixedThreadPool

newCachedThreadPool

newSingleThreadExecutor

线程池大小

CPU密集型运算

I/O密集型运算

ScheduledThreadPoolExecutor

Fork/join

三个模块

自定义线程池


前言

线程池,说通俗一点就是一个阻塞队列;生产者不停的提交新的任务,消费者不停的执行任务,而这两者之间是会有时差的,那么这个时候就需要一个消息队列,来存放这些任务,以平衡消费者生产者之间的资源,然他们都能专注于自己的事情

线程池主要解决两个问题:一 是当执行大量异步任务时线程池能够提供较好的性能 在不使用线程池时,每当需要执行异步任务时直接 new 个线程来运行,而线程的创建和销毁是 要开销的 线程池里面的线程是可复用的 ,不 需要每次 执行 异步任务时都重新创建和销毁线程 。 二 是线程池 也提供了一 种资源限制和管理的手段,比如可以限制线程的个数, 动态新增线程等每个 ThreadPoo!Executor 也保留一 些基本 的统计 数据, 比如当前线 池完成的任务数目 等

ThreadPoolExecutor
  • 线程池的基本接口是ExecutorService,ThreadPoolExecutor继承了 AbstractExecutorService而AES又实现了ExecutorService继承了Executor

  • ThreadPoolExecutor使用一个AtomicInteger(如果用两个int,为了保证原子性那么就需要两次cas耗性能)的高三位表示线程状态,其余29位表示线程数量;默认是Running线程数是0

  • 线程池的状态TERMINATED>TIDYING>STOP>SHUTDOWN>RUNNING

线程状态高三位是否接受新任务是否处理阻塞任务说明
Running111YY运行中接受新任务并且处理阻塞队列任务
SHUTDOWN000NY一种较为温柔的停止会把目前的任务处理掉但是不接受新任务了
STOP001NN一种暴力的停止不管是当前任务还是阻塞任务全部抛弃
TIDYING010--任务全部执行完,活动线程为0即将进入终结状态
TERMINATED011--终结状态

构造方法
  • 也就是常问的线程池有哪些核心参数

    • 在jdk提供的线程池中有两类线程一种是核心线程一种是救急线程(最大线程减核心线程就是救急线程数)

    • 在任务执行完毕后救急任务就会结束,直到下一次任务高峰;而核心线程是没有周期的即使没有任务也会存在

 public ThreadPoolExecutor(int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大线程数目
                              long keepAliveTime,//等待时间-->指的是救急线程的时间
                              TimeUnit unit,//时间单位--》针对救急线程
                              BlockingQueue workQueue,//阻塞池类
                              ThreadFactory threadFactory,//线程工厂
                              RejectedExecutionHandler handler) {}//拒绝策略
  • 四种拒绝策略

    • ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。(默认)

    • ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。

    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后本任务取代

    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

提交方法

execute

  • execute 方法的作 用是 command 线程池 进行执 行。
public void execute(Runnable command) {
//如果任务为null抛出异常
        if (command == null)
            throw new NullPointerException();
      //获取当前线程池状态及线程个数的组合值
        int c = ctl.get();
//如果当前线程池线程少于核心线程那么创建新的线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
//如果线程池处于RUNNING状态,则添加任务到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            //二次检查
            int recheck = ctl.get();
//如果不是运行状态就从队列中删掉,不接受这个新任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
//否则如果当前线程池为空则添加一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
//此时任务队列也满了,那么尝试添加一个新的线程也就是救急线程,如果失败那就说明全部都满了,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

submit

  • submit提交一个callbale接口的参数,并使用Future类接受结果使用get方法获取返回的结果

  • 有返回值时callable接口没返回就是runnable

invokeAll

  • 接受一个集合,集合中是多个callable对象,返回的也是一个future的集合

invokeAny

  • 提交tasks中所有的task,会返回一个最先执行完的结果;其他的线程就不执行了(包括阻塞的线程也不执行了)

shutdown

  • 会将线程状态从Running变为shutdown;不会接收新线程;正在运行的会继续完成,阻塞的任务也会完成;主线程不会等shutdown方法,会继续执行其他代码;最后会尝试终结,如果全部线程都结束了就直接终结,如果还有其他线程就不管了

    • 如果在shutdown后面还想处理,即主线程等待shutdown执行完,需要调用awaitTermination方法

  • shutdownNow:会让线程从Running变为Stop;不会接受先任务,也不会执行阻塞任务;正在执行的直接interrupt方法打断,阻塞的任务返回List集合;最后尝试终结,肯定会成功之前所有的线程都被打断了

Executors
  • Executors其实是个工具类,有许多静态方法,更加方便的创建线程池

  • 提供了不同的实现的线程池的方法;但其实并不推荐使用这个方式

  • Executors 返回的线程池对象的弊端如下:

        1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

        2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

newFixedThreadPool
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
  • 设为固定大小的线程池;由于核心线程数和最大线程数一样所以没有救急线程,并使用LinkedBlockingQueue创建阻塞池时没有指定大小所以是一个无界的阻塞池

  • 适用于任务量已知,并且任务执行较长的任务


newCachedThreadPool
  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
  • 带缓冲的线程池;核心线程数是0,所以所有的线程都是救急线程,存活时间是60s;由于也没有设置阻塞池的大小,也是无界阻塞池

  • SynchronousQueue是没有容量的,如果存放时没有来取的是放不进去的

  • 适用于线程数密集,并且任务时间较短


newSingleThreadExecutor
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
  • 单线程线程池;希望多线程排队执行,线程数只有一个,当任务多于时会进入阻塞池,是无界队列;任务执行完这个线程并不会结束

  • 和自己创建一个线程的区别:如果是自己创建的一个线程执行多个任务,那么如果其中一个发生了异常并且没有得到及时处理就会导致其他线程无法得到执行;而这个单线程线程池会保证永远有一个线程在执行,即其中一个异常并不影响其他线程的执行

  • 和固定大小的线程池设为1时的区别:固定大小的线程池返回的时线程池对象,可以通过暴露在外面的ThreadPoolExecutor接口来调用内部方法修改线程池大小等;而单线程线程池被包装过时无法修改的要永远都是1

线程池大小
  • 如果太小,会导致饥饿问题(两个线程有两个阶段,一点餐二做菜,两个线程两个阶段都能做,那么同时来了两个客人,两个人都点餐没人做菜就饥饿了)-->解决办法:创建两个线程池一个之点餐一个只做菜

  • 如果太大,更多的线程上下文切换,占用更多内存

CPU密集型运算
  • CPU核数加一个就可以充分发挥,加一是备胎

I/O密集型运算
  • 线程数=CPU核数*期望CPU利用率*总时间(CPU计算时间+等待时间)/CPU计算时间

    • 例如4核CPU计算时间50%,其他等待时间50%,期望CPU100%利用

      4*100%*100%/50%=8

ScheduledThreadPoolExecutor
  • 可以定时调用线程

  • schedule(Runnable command, long delay, TimeUnit unit) // 无返回值的延迟任务

  • schedule(Callable callable, long delay, TimeUnit unit) // 有返回值的延迟任务

  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 固定频率周期任务,若超时直接下一次

  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 固定延迟周期任务,若超时等任务结束再等delay

Fork/join
  • 1.7之后的高级线程池,将大任务拆分成很多小任务来异步执行的工具;采用分治算法

三个模块
  • 创建任务对象:ForkJoinTask

  • 执行任务对象:ForkJoinWorkerThread

  • 线程池:ForkJoinPool

  • 这三者的关系就是ForkJoinPool可以通过ForkJoinWorkerThread来执行ForkJoinTask任务

  • 核心就是将任务分成不可分割的小任务然后将这些小任务的执行分配给线程池中的线程去执行,这样提高效率;当然任务拆分还是个技术活

自定义线程池
package com.threads;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


public class TestPool {

    public static void main(String[] args) {
        ThreadPools threadPools = new ThreadPools(2, 1000, TimeUnit.MILLISECONDS, 2,(queue,task)->{
            //等待超时;如果什么都不写就是放弃任务;也可以直接task.run()让调用者自己调用
            queue.offer(task,1000,TimeUnit.MILLISECONDS);
        });
        for (int i = 0; i < 5; i++) {
            int j = i;
            threadPools.execute(()->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"第"+j+"号");
            });
        }
    }
}

class ThreadPools{
    //任务队列
    private BlockingQueue taskQueue;

    //线程集合
    private HashSet workers = new HashSet<>();

    //核心线程数
    private int coreSize;

    //超时时间
    private long timout;

    //时间单位
    private TimeUnit timeUnit;

    //拒绝策略
    private RejectionPolicy reject;

    
    public ThreadPools(int coreSize, long timout, TimeUnit timeUnit,int size,RejectionPolicy reject) {
        this.coreSize = coreSize;
        this.timout = timout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(size);
        this.reject = reject;
    }

    
    public void execute(Runnable task){
       synchronized (workers){
           if (workers.size(){
    void reject(BlockingQueue queue,T task);
}
//阻塞池子 存放任务,用来平衡消费者和生产者的速率
class BlockingQueue{
    //一个双向链表存放阻塞的任务
    private Deque deque = new ArrayDeque<>();

    
    private ReentrantLock lock = new ReentrantLock();

    //消费者等待池
    private Condition xWait = lock.newCondition();

    //生产者等待池
    private Condition sWait = lock.newCondition();

    //容量,阻塞池的大小
    private int size;

    public BlockingQueue(int size) {
        this.size = size;
    }

    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try {
            //统一单位为纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()){
                try {
                    if (nanos<=0){
                        return null;
                    }
                    nanos = xWait.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //将第一个获取并删除
            T t = deque.removeFirst();
            sWait.signal();
            return t;
        }finally {
            lock.unlock();
        }

    }
    //获取,并且设置超时时间
    public T take(){
        lock.lock();
        try {
            while (deque.isEmpty()){
                try {
                    xWait.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //将第一个获取并删除
            T t = deque.removeFirst();
            sWait.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }
    //添加
    public void put(T task){
        lock.lock();
        try {
            while (deque.size()==size){
                try {
                    System.out.println(task +"等待加入队列");
                    sWait.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(task +"加入队列");
            deque.addLast(task);
            xWait.signal();
        }finally {
            lock.unlock();
        }
    }

    //代超时的添加方法
    public boolean offer(T task,long timeOut,TimeUnit unit){
        lock.lock();
        try {
            long nanos = unit.toNanos(timeOut);
            while (deque.size()==size){
                try {
                    if (nanos<=0){
                        return false;
                    }
                    System.out.println(task +"等待加入队列");
                    nanos = sWait.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(task +"加入队列");
            deque.addLast(task);
            xWait.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    //获取大小,查看阻塞池还有多少任务
    public int getSize() {
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectionPolicy rejection,T task) {
        lock.lock();
        try {
            //队列已满
            if (deque.size() == size){
                //也就时这个当前的队列要进行上面操作
                rejection.reject(this,task);
            }else {
                System.out.println(task +"加入队列");
                deque.addLast(task);
                xWait.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/837026.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号