栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java 线程池的使用【二】【ThreadPoolExecutor 】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java 线程池的使用【二】【ThreadPoolExecutor 】

跳转链接:Java 线程池的使用【一】【Executor】

跳转链接:Java 线程池的使用【三】【Executors】

一、ThreadPoolExecutor类的介绍:

线程池实现类ThreadPoolExecutor是Executor框架最核心的类,继承自Executor接口,里面有一个execute方法,用来执行线程,线程池主要提供一个队列,队列中保存着所有等待状态的线程。

ThreadPoolExecutor类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法基础上产生的。

public ThreadPoolExecutor(int corePoolSize,//核心线程数
                          int maximumPoolSize,//最大线程数
                          long keepAliveTime,//空闲存活时间
                          TimeUnit unit,//时间单位
                          BlockingQueue workQueue,//线程池任务队列
                          ThreadFactory threadFactory,//创建线程的工厂
                          RejectedExecutionHandler handler) {//拒绝策略
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}

参数介绍:

1、corePoolSize 核心线程数: 是指线程池中长期存活的线程数;

2、maximumPoolSize 最大线程数: 线程池允许创建的最大线程数量,当线程数池的任务队列满了之后,可以创建的最大线程数;

注意:最大线程数 maximumPoolSize 的值不能小于核心线程数 corePoolSize,否则在程序运行时会报 IllegalArgumentException 非法参数异常。

3、keepAliveTime,空闲线程存活时间: 当线程池中没有任务时,会销毁一些线程,销毁的线程数=maximumPoolSize(最大线程数)-corePoolSize(核心线程数);

4、TimeUtil,时间单位: 空闲线程存活时间的描述单位,此参数配合参数3使用;

TimeUtil有7个参数:

  • TimeUnit.DAYS:天
  • TimeUnit.HOURS:小时
  • TimeUnit.MINUTES:分
  • TimeUnit.SECONDS:秒
  • TimeUnit.MILLISECONDS:毫秒
  • TimeUnit.MICROSECONDS:微妙
  • TimeUnit.NANOSECONDS:纳秒

5、BlockingQueue 阻塞队列: 线程池存放任务的队列,用来存储线程池的所有待执行任务。

它可以设置以下几个值:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

比较常用的是LinkedBlockingQueue,线程池的排队策略和BlockingQueue息息相关。

6、ThreadFactory 线程工厂: 线程池创建线程调用的工厂方法,通过此方法可以设置线程的优先级、线程命名规则以及线程类型(用户线程还是守护线程)等。

线程工厂的使用实例如下:

public static void main(String[] args) {
    //创建线程工厂    
    ThreadFactory threadFactory = new ThreadFactory() {        
        @Override
        public Thread newThread(Runnable r) {
            //创建线程池中的线程            
            Thread thread = new Thread(r);            
            //设置线程名称            
            thread.setName("Thread-" + r.hashCode());            
            //设置线程优先级(最大值:10)
            thread.setPriority(Thread.MAX_PRIORITY);            
            //......            
            return thread;        
        }    
    };    
    //创建线程池    
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 
            0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
    //使用自定义的线程工厂
    threadPoolExecutor.submit(new Runnable() {        
        @Override        
        public void run() {
            Thread thread = Thread.currentThread();
            System.out.println(String.format("线程:%s,线程优先级:%d", thread.getName(), thread.getPriority()));
        }    
    });
} 

以上程序运行结果:

线程:Thread-495623552,线程优先级:10
Process finished with exit code -1

从上述执行结果可以看出,自定义线程工厂起作用了,线程的名称和线程优先级都是通过线程工厂设置的。

7、RejectedExecutionHandler 拒绝策略: 当线程池任务超出线程池队列可以存储的最大值后,执行的策略。

默认有以下四种策略:

  • AbortPolicy:拒绝并抛出异常;
  • CallerRunsPolicy:使用当前调用的线程来执行此任务;
  • DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务;
  • DiscardPolicy:忽略并抛弃当前任务。

线程池的默认策略是:AbortPolicy(拒绝并抛出异常)。

二、工作过程:


1、通过execute方法添加任务时,如果工作线程数小于corePoolSize,则创建一个新线程并执行该任务。注:新创建的线程会通过while循环不断从阻塞队列获取任务执行。

2、如果工作线程数大于等于corePoolSize,则将任务添加到阻塞队列。

3、如果阻塞队列满了,则判断工作线程是否小于maximumPoolSize,如果小于则创建新线程并执行该任务,否则调用handler拒绝策略。

三、执行任务逻辑:(原码分析)

线程调用runWork方法,通过while循环从getTask方法获取任务并执行。只要getTask方法不返回null,线程就不会退出。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

四、获取任务过程:

getTask方法实际上是从workQueue阻塞队列取任务。如果工作线程数大于corePoolSize,则会调用workQueue的poll方法获取任务,超时时间是keepAliveTime,如果超过了keepAliveTime时长,poll返回了null,上面提到的while循环就会退出,线程也就退出了。如果工作线程数小于等于corePoolSize,则会调用workQueue的take方法阻塞在当前直到有新的任务进来。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

五、使用方法:

1、直接new一个对象:

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                8, 8, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue(30), 
                Executors.defaultThreadFactory(), 
                new ThreadPoolExecutor.CallerRunsPolicy());

2、通过Spring配置:


        
        
        
         
        
            
        
        
    

六、总结:

线程池大小最好是corePoolSize和maximumPoolSize保持一致,不要超过CPU核心数的3倍,如果纯计算任务(包括同机房的redis服务),只有很少的网络或者磁盘IO,那么线程池的大小设置为CPU的2倍比较合理。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/855269.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号