线程池的核心类- ThreadPoolExecutor
线程池重要的方法 创建线程池
ThreadPoolExecutor自定义Executor创建 多线程异常处理
多线程异常处理
try-catch Callable接口Executor改写UncaughtExceptionHandler接口 多线程定时任务
java定时器Timer类ScheduledExecutorService运行多任务Spring-Task任务调度工具Quartz大数据任务调度框架
线程池的核心类- ThreadPoolExecutorThreadPoolExecutor extends AbstractExecutorService
Executors
ExecutorService
构造方法
ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲时间
TimeUnit unit, // 时间单位
BlockingQueue workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
任务队列
BlockingQueue接口的实现类
1. SynchronousQueue类
2. linkedBlockingQueue类
3. ArrayBlockingQueue类
4. linkedTransferQueue类
线程池重要的方法
execute
execute(Runnable command) 1. 充当核心线程运行 2. 放入任务队列,需要double-check 3. 如果放不进任务队列,尝试运行新线程,失败了就拒绝
addWorker
addWorker(Runnable firstTask, boolean core) 根据核心线程数和最大线程数判断是否添加新的线程 第一个参数是execute提交的任务 第二个参数是核心线程或者最大线程
submit
submit(Runnable task) 未来能得到返回值创建线程池 ThreadPoolExecutor自定义
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(10),
new ThreadPoolExecutor.CallerRunsPolicy());
Executor创建
ExecutorService p1 = Executors.newFixedThreadPool(3); ScheduledExecutorService p2 = Executors.newSingleThreadScheduledExecutor(); ExecutorService p3 = Executors.newSingleThreadExecutor(); ExecutorService p4 = Executors.newCachedThreadPool(); ScheduledExecutorService p5 = Executors.newScheduledThreadPool(5); ExecutorService p6 = Executors.newWorkStealingPool(2);多线程异常处理
例子 – 无法捕获异常
public class MultiThreadsException {
public static void main(String[] args) {
Thread thread = new Thread(new ThrowsExceptionRun());
try {
thread.start();
}catch (Exception e){
System.out.println("捕获到异常。。。");
}
}
static class ThrowsExceptionRun implements Runnable {
@Override
public void run() {
int dividend = 25;
int divisor = 3;
while (divisor >=0){
System.out.println(dividend + " 和 " + divisor + "相除结果是: " + dividend / divisor);
divisor --;
}
System.out.println("结束运算。。。");
}
}
}
// 不能捕获异常
原因:
Thread所创建的线程需要在自己的逻辑中,即run方法中捕获异常
不能抛出异常给外界的线程
底层原因是Thread默认使用uncaughtException()方法空处理,虚拟机会忽略该方法之后的抛出异常
多线程异常处理
1. 在Thread及Runnable的run方法中try-catch 2. 使用Callable接口创建线程,通过get() 方法获取异常 3. 使用线程池Executor设置改写的ExceptionHandler来处理线程异常try-catch
public class MultiThreadsException {
public static void main(String[] args) {
Thread thread = new Thread(new ThrowsExceptionRun());
try {
thread.start();
}catch (Exception e){
System.out.println("捕获到异常。。。");
}
}
static class ThrowsExceptionRun implements Runnable {
@Override
public void run() {
int dividend = 25;
int divisor = 3;
while (divisor >=0){
try {
System.out.println(dividend + " 和 " + divisor + "相除结果是: " + dividend / divisor);
}catch (Exception e){
System.out.println("哟,有异常了");
throw e;
}
divisor --;
}
System.out.println("结束运算。。。");
}
}
}
Callable接口
public class MultiThreadsException {
public static void main(String[] args) {
Callable getCallThread = new ThrowsExceptionCall();
FutureTask futureTask = new FutureTask<>(getCallThread);
Thread thread = new Thread(futureTask);
thread.start();
try {
TimeUnit.SECONDS.sleep(3);
String s = futureTask.get();
System.out.println(s);
} catch (Exception e) {
System.out.println("异常捕获到了。。。");
e.printStackTrace();
}
}
static class ThrowsExceptionCall implements Callable {
@Override
public String call() throws Exception {
String fullString = "I'm OK,do not worry about me";
return fullString.substring(3, 60); // 下标越界异常
}
}
Executor改写UncaughtExceptionHandler接口
public class ExecutorCatchException {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool(new SuccessCatchThreadExceptionFactory());
service.execute(new ThrowsExceptionRun());
service.awaitTermination(3, TimeUnit.SECONDS);
service.shutdown();
}
}
// 线程工厂 设置了异常处理器
class SuccessCatchThreadExceptionFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler(new SuccessCatchThreadExceptionHandler());
return thread;
}
}
// 异常处理器
class SuccessCatchThreadExceptionHandler implements Thread.UncaughtExceptionHandler{
@Override
public void uncaughtException(Thread thread, Throwable e) {
System.out.println(thread.getName() + "抛出了异常,请看: " + e.getMessage());
}
}
// 任务
class ThrowsExceptionRun implements Runnable {
@Override
public void run() {
int dividend = 25;
int divisor = 3;
while (divisor >= 0) {
System.out.println(dividend + " 和 " + divisor + "相除结果是: " + dividend / divisor);
divisor--;
}
System.out.println("结束运算。。。");
}
}
多线程定时任务
java定时器Timer类
package com.chauncy.timers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
public class TestMultiTimerTask {
static volatile AtomicInteger decreaseInteger = new AtomicInteger(8);
public static void main(String[] args) {
Timer timer = new Timer();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("我是第一个TimerTask线程,当前时间为: " + dateFormat.format(new Date()));
}
}, 2000, 5000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("我是第二个TimerTask线程,当前时间为: " + dateFormat.format(new Date()));
int de = TestMultiTimerTask.decreaseInteger.decrementAndGet();
int i = (100 / de);
}
}, 3001, 1000);
}
}
原理
优先级任务队列 TaskQueue queue = new TaskQueue()
线程 TimerThread thread = new TimerThread(queue);
启动的线程会等待任务 -- queue.wait();
放任务到优先级队列 queue.add(task);
取任务 task = queue.getMin();
执行 task.run();
schedule(TimerTask task, long delay, long period) ->
sched(task, System.currentTimeMillis()+delay, -period) ->
sched(TimerTask task, long time, long period) ->
task.nextExecutionTime = time
task.period = period;
task.state = TimerTask.SCHEDULED;
queue.add(task); -> queue[++size] = task; fixUp(size);
loop线程
队列为空时等待 queue.wait();
取任务 task = queue.getMin();
是否执行 taskFired = (executionTime<=currentTime)
queue.rescheduleMin(加上period时间) -> queue[1].nextExecutionTime = newTime;
执行任务 task.run();
ScheduledExecutorService运行多任务
package com.chauncy.timers;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ScheduleExecutorServiceTest {
static volatile AtomicInteger decreaseInteger = new AtomicInteger(8);
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4, new BasicThreadFactory.Builder()
.namingPattern("example-schedule-pool-%d").daemon(true).build());
//ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
executor.scheduleAtFixedRate(() -> System.out.println( Thread.currentThread().getName() + "第一次延迟2s执行,以后每隔5s执行一次"),
2000,
5000,
TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println(Thread.currentThread().getName()+ " 每隔3s执行");
},3,TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(() -> System.out.println(Thread.currentThread().getName() + " 延迟1s执行。。。"),1,1,TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 第一次延迟3s执行,以后每隔1s执行一次");
int de = ScheduleExecutorServiceTest.decreaseInteger.decrementAndGet();
try {
int i = (100 / de);
}catch (Exception e){
e.printStackTrace();
}
}
},3001,1000, TimeUnit.MILLISECONDS);
while (true){
// main线程活着
}
}
}
Spring-Task任务调度工具
Quartz大数据任务调度框架


