之前遇到过定时任务异常终止的问题,此次对 jdk 的 ScheduledThreadPoolExecutor 与 spring 的 @Scheduled 进行了测试以及源码的分析。
测试每秒执行一次,当 count == 3 时抛出异常。
JdkTestpublic class JdkTest {
private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(1);
private static final AtomicInteger COUNT = new AtomicInteger();
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledFuture> scheduledFuture = EXECUTOR.scheduleAtFixedRate(task(), 0, 1, TimeUnit.SECONDS);
scheduledFuture.get();
}
public static Runnable task() {
return () -> {
System.out.println(COUNT.get());
if (COUNT.get() == 3) {
throw new RuntimeException();
}
COUNT.incrementAndGet();
};
}
}
SpringTest
@Component
public class SpringTest {
private final AtomicInteger count = new AtomicInteger();
@Scheduled(fixedRate = 1000)
public void test() {
System.out.println(count.get());
if (count.get() == 3) {
throw new RuntimeException();
}
count.incrementAndGet();
}
}
测试结果
- JdkTest:在抛出异常后任务就终止了
- SpringTest:抛出异常后会继续执行,并且是立即执行重试,而不是 1 秒后
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 用于表示任务
ScheduledFutureTask sft =
new ScheduledFutureTask(command,
null,
// 计算首次的执行时间
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 装饰 task,默认实现为空
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
// 执行
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture> task) {
// 如果线程池正在关闭或已经关闭,则拒绝任务
if (isShutdown())
reject(task);
else {
// 将任务放入队列
super.getQueue().add(task);
// 若加入队列后线程池关闭了,根据设置删除并取消任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
// 启动线程
else
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 若线程数小于 corePoolSize 则新建核心线程
if (wc < corePoolSize)
addWorker(null, true);
// 当线程数为 0 时,新建普通线程
else if (wc == 0)
addWorker(null, false);
}
工作线程从队列中获取任务,进而执行任务
执行任务便是调用 ScheduledFutureTask#run 方法。
// ScheduledFutureTask 继承了 FutureTask 并实现了 RunnableScheduledFuture 接口 private class ScheduledFutureTaskextends FutureTask implements RunnableScheduledFuture {...} // 继承了接口 RunnableFuture、ScheduledFuture,并添加了方法 isPeriodic public interface RunnableScheduledFuture extends RunnableFuture , ScheduledFuture { // 是否是周期性任务,周期性任务会根据某个计划多次运行。非周期性任务只能运行一次。 boolean isPeriodic(); } public interface ScheduledFuture extends Delayed, Future { } // 以延迟相关的核心接口,实现此接口必须同时实现 Comparable,且 compareTo 方法提供与其 getDelay 方法一致的顺序。 // ScheduledThreadPoolExecutor 的延迟队列是一个最小堆,需要依赖于 compareTo 方法进行比较 public interface Delayed extends Comparable { // 以给定的时间单位返回与此对象关联的剩余延迟。 long getDelay(TimeUnit unit); }
private class ScheduledFutureTask任务异常终止的罪魁祸首:FutureTask#runAndResetextends FutureTask implements RunnableScheduledFuture { private final long sequenceNumber; // 下次执行的时间 private long time; // 重复任务的周期(纳秒)。正值表示固定频率执行。负值表示固定延迟执行。0表示为非重复任务。 private final long period; // 被 reExecutePeriodic 方法重新入队的实际任务 RunnableScheduledFuture outerTask = this; // 当前任务在延迟队列中的索引,能够更加方便的取消当前任务 int heapIndex; ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // 用下次执行时间减去当前时间,计算出剩余的延迟时间 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 用于延迟队列的入队、出队,将最小的元素(下一个要执行的任务)放在前面 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; // 如果是 ScheduledFutureTask 的实例,则比较 time,即下次执行时间,若 time 相同再比较序列号 if (other instanceof ScheduledFutureTask) { ScheduledFutureTask> x = (ScheduledFutureTask>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } // 是否是周期性任务,即固定频率的,或固定延迟的 // 只有通过 scheduleAtFixedRate、scheduleWithFixedDelay 方法提交的任务 period 才不是 0 // 两个 schedule 方法提交的都是单次执行的任务 public boolean isPeriodic() { return period != 0; } // 设置周期性任务的下次执行时间 private void setNextRunTime() { long p = period; // 固定频率,上次任务的开始时间加上任务的执行周期 if (p > 0) time += p; // 固定延迟,上次任务的结束时间加上任务的执行周期 else time = triggerTime(-p); } // 取消执行 public boolean cancel(boolean mayInterruptIfRunning) { // 先取消任务 boolean cancelled = super.cancel(mayInterruptIfRunning); // 从队列中删除 if (cancelled && removeonCancel && heapIndex >= 0) remove(this); return cancelled; } // 重写了 FutureTask,如果是周期性任务会重新入队 public void run() { boolean periodic = isPeriodic(); // 若当前线程池状态不能运行任务,则取消 if (!canRunInCurrentRunState(periodic)) cancel(false); // 不是周期性任务,直接执行 FutureTask#run else if (!periodic) ScheduledFutureTask.super.run(); // 周期性任务执行 FutureTask#runAndReset // 且如果 runAndReset 返回 true 的话才设置任务的下次执行时间,并重新入队 // 若返回 false 则什么都不做 else if (ScheduledFutureTask.super.runAndReset()) { // 设置任务的下次执行时间 setNextRunTime(); // 重新入队 reExecutePeriodic(outerTask); } } } // 重新入队 void reExecutePeriodic(RunnableScheduledFuture> task) { // 若当前线程池状态可以运行周期性任务,则重新入队 if (canRunInCurrentRunState(true)) { super.getQueue().add(task); // 入队后再次检测状态,若状态不符合,则删除并取消任务 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
执行任务单不设置结果,然后将 Future 重置为初始状态,若任务异常或被取消,则不会重置。本方法被设计为实际执行不止一次的任务。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
// 默认为 false
boolean ran = false;
int s = state;
try {
Callable c = callable;
if (c != null && s == NEW) {
try {
// 执行任务
c.call(); // don't set result
// 状态改为已执行
ran = true;
} catch (Throwable ex) {
// 若抛出异常则设置异常
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 任务是否已执行 && 当前状态为 NEW
return ran && s == NEW;
}
从上面可以看出,若抛出异常,那 ran 最终为 false,runAndReset 方法的返回值也是 false,所以不会将任务重新入队,即任务异常终止了。
为什么 SpringTest 抛出异常后没有终止?其实原因很简单,spring 将任务进行了包装,添加了错误处理器,从日志中可以看出是最终日志是由 TaskUtils$LoggingErrorHandler 打印的,从异常堆栈可以找到抛异常的地方是 DelegatingErrorHandlingRunnable#run 方法:
// 类很简单,只有两个字段,delegate 是任务本身,errorHandler 是错误处理器,重写了 run 方法进行错误处理
public class DelegatingErrorHandlingRunnable implements Runnable {
private final Runnable delegate;
private final ErrorHandler errorHandler;
public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) {
Assert.notNull(delegate, "Delegate must not be null");
Assert.notNull(errorHandler, "ErrorHandler must not be null");
this.delegate = delegate;
this.errorHandler = errorHandler;
}
@Override
public void run() {
// 执行任务
try {
this.delegate.run();
}
// 若抛出异常由 ErrorHandler 处理
catch (UndeclaredThrowableException ex) {
this.errorHandler.handleError(ex.getUndeclaredThrowable());
}
catch (Throwable ex) {
this.errorHandler.handleError(ex);
}
}
@Override
public String toString() {
return "DelegatingErrorHandlingRunnable for " + this.delegate;
}
}
TaskUtils 中 ErrorHandler 有两种实现:LoggingErrorHandler、PropagatingErrorHandler,重复执行的任务使用 LoggingErrorHandler,只执行一次的任务使用 PropagatingErrorHandler:
public abstract class TaskUtils {
// 只打印 error 日志,但不执行进一步的处理。这将抑制错误,从而不会阻止任务的后续执行。
public static final ErrorHandler LOG_AND_SUPPRESS_ERROR_HANDLER = new LoggingErrorHandler();
// 打印 error 日志,然后重新抛出异常。注意:这通常会阻止计划任务的后续执行。
public static final ErrorHandler LOG_AND_PROPAGATE_ERROR_HANDLER = new PropagatingErrorHandler();
private static class LoggingErrorHandler implements ErrorHandler {
private final Log logger = LogFactory.getLog(LoggingErrorHandler.class);
@Override
public void handleError(Throwable t) {
// 只打印日志
logger.error("Unexpected error occurred in scheduled task", t);
}
}
private static class PropagatingErrorHandler extends LoggingErrorHandler {
@Override
public void handleError(Throwable t) {
// 打印日志
super.handleError(t);
// 重写抛出异常
ReflectionUtils.rethrowRuntimeException(t);
}
}
// 使用 ErrorHandler 装饰 Runnable
public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler(
Runnable task, @Nullable ErrorHandler errorHandler, boolean isRepeatingTask) {
if (task instanceof DelegatingErrorHandlingRunnable) {
return (DelegatingErrorHandlingRunnable) task;
}
// 根据是否是重复性任务获取 ErrorHandler
ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask));
return new DelegatingErrorHandlingRunnable(task, eh);
}
// 根据是否是重复性任务获取 ErrorHandler,对于重复任务,它将抑制错误,对于一次性任务,它将传播。在这两种情况下,都记录 error 日志。
public static ErrorHandler getDefaultErrorHandler(boolean isRepeatingTask) {
return (isRepeatingTask ? LOG_AND_SUPPRESS_ERROR_HANDLER : LOG_AND_PROPAGATE_ERROR_HANDLER);
}
}



