栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java 线程 2(线程池)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java 线程 2(线程池)

文章目录
  • 写在前面
    • 1、线程池的创建
      • 1.1、workQueue 超出后保存任务的队列
      • 1.2、饱和策略
    • 2、继承自ThreadPoolExecutor的几个线程池
      • 2.1、newCachedThreadPool 无限扩大线程池
      • 2.2、newFixedThreadPool 定长线程池
      • 2.3、newScheduledThreadPool 定长线程池,支持定时及周期性任务
      • 2.4、 newSingleThreadExecutor 单线程线程池
      • 2.5、newWorkStealingPool 多个任务队列线程池
    • 3、在spring里手动创建一个继承自ThreadPoolExecutor的线程池

写在前面 1、线程池的创建

线程池的基本参数:线程池维护最少数量、线程池维护最大数量、缓冲队列数量、空闲时间、线程工厂、饱和策略

public ThreadPoolExecutor(int corePoolSize,                      // 线程池维护最少数量
                          int maximumPoolSize,                   // 线程池维护最大数量
                          long keepAliveTime,                    // 多于的空闲线程最多存活时间
                          TimeUnit unit,                         // keepAliveTime 参数的时间单位
                          BlockingQueue workQueue,     // 超出最大值,队列保存的方式
                          ThreadFactory threadFactory,           // 执行程序创建新线程时使用的工厂
                          RejectedExecutionHandler handler       // 饱和策略
                          ) {
       ...
    }
1.1、workQueue 超出后保存任务的队列

默认是linkedBlockingQueue,可以设置一个

队列说明
无界队列无界linkedBlockingQueue,队列无大小限制,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。
有界队列常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue与有界的linkedBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。
同步移交如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
1.2、饱和策略

AbortPolicy中止策略(默认策略)

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

DiscardPolicy抛弃策略

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

DiscardOldestPolicy抛弃旧任务策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

CallerRunsPolicy调用者运行

   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。

2、继承自ThreadPoolExecutor的几个线程池

强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)它们均为大多数使用场景预定义了设置。

2.1、newCachedThreadPool 无限扩大线程池

在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
2.2、newFixedThreadPool 定长线程池

定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new linkedBlockingQueue());
    }
2.3、newScheduledThreadPool 定长线程池,支持定时及周期性任务
   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
2.4、 newSingleThreadExecutor 单线程线程池
   public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
2.5、newWorkStealingPool 多个任务队列线程池
   public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
3、在spring里手动创建一个继承自ThreadPoolExecutor的线程池

AsyncTaskExecutePool.java

import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import lombok.extern.slf4j.Slf4j;

@Slf4j  
@Configuration  
@EnableAsync 
public class AsyncTaskExecutePool {
	
	
	  
    private int corePoolSize = 5;  
      
    private int maxPoolSize = 50;  
      
    private int queueCapacity = 2; 
    
    
	@Bean(name = "asyncExecutor")
	public ThreadPoolTaskExecutor asyncExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(corePoolSize);    
        executor.setMaxPoolSize(maxPoolSize);    
        executor.setQueueCapacity(queueCapacity);    
        executor.setKeepAliveSeconds(3600);    
        executor.setThreadNamePrefix("asyncExecutor----");    
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务    
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行    
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();    
        log.info("------------------asyncExecutor start -------------------");
        return executor;
	}
	
	@Bean(name = "delAsyncExecutor")
	public ThreadPoolTaskExecutor delAsyncExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(corePoolSize);    
        executor.setMaxPoolSize(maxPoolSize);    
        executor.setQueueCapacity(queueCapacity);    
        executor.setKeepAliveSeconds(3600);    
        executor.setThreadNamePrefix("delAsyncExecutor----");    
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务    
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行    
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();    
        log.info("------------------delAsyncExecutor start -------------------");
        return executor;
	}

}

线程执行任务 AsyncTask.java

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;


@Component 
@Slf4j
public class AsyncTask {
	@Async("asyncExecutor")
	public void newThread(String key,RedisDao redis)  {
		redis.delete(key);
	}
}

调用线程池实例

.
.
.

	@Autowired
	private AsyncTask asyncTask;
	@Autowired
	@Qualifier("asyncExecutor")
	private ThreadPoolTaskExecutor executor;

.
.
.

		int activeCount = executor.getActiveCount();
		while (activeCount >= 49) {
			Thread.sleep(10000);
			activeCount = executor.getActiveCount();
			log.info("executor-ActiveCount " +activeCount);
		}
		asyncTask.newThread(key,redisDao);
.
.
.
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/340268.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号