在 springBoot 项目中,要使用定时任务十分容易,我们只需使用 @EnableScheduling 开启定时任务支持,再配合 @Scheduled(cron = "cron表达式"),即可完成定时任务的集成,简单方便的同时,此种方式却也存在着硬编码问题,当我们需要动态的开启或关闭一个定时任务时,就需要修改源码重启项目,才能生效,达不到动态效果,下文教大家如何实现一个定时任务的动态增删改功能。
首先看下我们要使用的三张表:
scheduled_job :在定时任务中执行的工作 表
表记录:
理论上这张表是跑完初始化脚本就不会再修改的,这个表的每一条记录对应代码里的一个处理任务的Bean,这种硬编码是必然存在的,应该没人会想要在数据库添一条记录就要在代码里动态添加一个类型的Bean的需求吧?如果有,那什么动态字节码通过远程网络加载一个Bean,自己去搞吧,哈哈。。。
scheduled_corn : 存 corn 表达式的表
表记录:
这个表,理论上存一些常用的corn表达式,基本也不会去修改了,我们主要是修改下面的关系表。
scheduled_corn_job :中间关系表
这个表先空着,我们在测试时再插入数据。
下面开始环境搭建。
POM依赖:
org.springframework.boot spring-boot-starter-weborg.mybatis.spring.boot mybatis-spring-boot-startercom.baomidou mybatis-plus-boot-starter... 其他略... org.projectlombok lombok
yaml配置,几乎没什么内容:
server:
port: 8080
# Spring相关
spring:
# 数据源相关
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true
username: xxx
password: xxx
VO类:
@Data
@TableName("scheduled_corn_job")
public class ScheduledCornJobVO {
// 主键
@TableId(type = IdType.AUTO)
private Long id;
// 在内存存储时的key值
private String storeKey;
// 外键: corn表id
private Long cornId;
// 外键: job表id
private Long jobId;
// 是否开启(0:关闭,1:开启)
private String status;
}
@Data
@TableName("scheduled_corn")
public class ScheduledCornVO extends baseVO {
// 主键
@TableId(type = IdType.AUTO)
private Long id;
// 任务表达式
private String corn;
// 表达式描述
private String description;
// 提供转换为CronTrigger的工具方法
public CronTrigger toCronTrigger() {
return new CronTrigger(this.corn);
}
}
@Data
@TableName("scheduled_job")
public class ScheduledJobVO extends baseVO {
// 主键
@TableId(type = IdType.AUTO)
private Long id;
// 执行具体工作的bean名称
private String beanName;
// 工作描述
private String description;
}
Mapper 和 Service没什么可看的,就简单的贴一起了:
public interface ScheduledCornJobMapper extends baseMapper{ } public interface ScheduledCornMapper extends baseMapper { } public interface ScheduledJobMapper extends baseMapper { } public interface ScheduledCornJobService { List list(); List list(String status); ScheduledCornJobVO one(Long id); Integer add(ScheduledCornJobVO scheduledCornJob); Integer update(ScheduledCornJobVO scheduledCornJob); } public interface ScheduledCornService { List list(); } public interface ScheduledJobService { List list(); } -------------------------------- 实现类 -------------------------------- @AllArgsConstructor @Service public class ScheduledCornJobServiceImpl implements ScheduledCornJobService { private final ScheduledCornJobMapper scheduledCornJobMapper; @Override public List list() { return new LambdaQueryChainWrapper<>(scheduledCornJobMapper).list(); } @Override public List list(String status) { return new LambdaQueryChainWrapper<>(scheduledCornJobMapper).eq(ScheduledCornJobVO::getStatus, status).list(); } @Override public ScheduledCornJobVO one(Long id) { return new LambdaQueryChainWrapper<>(scheduledCornJobMapper).eq(ScheduledCornJobVO::getId, id).one(); } @Override public Integer add(ScheduledCornJobVO scheduledCornJob) { return scheduledCornJobMapper.insert(scheduledCornJob); } @Override public Integer update(ScheduledCornJobVO scheduledCornJob) { return scheduledCornJobMapper.update( scheduledCornJob, new LambdaUpdateWrapper ().eq( ScheduledCornJobVO::getId, scheduledCornJob.getId() )); } } @AllArgsConstructor @Service public class ScheduledCornServiceImpl implements ScheduledCornService { private final ScheduledCornMapper scheduledCornMapper; @Override public List list() { return new LambdaQueryChainWrapper<>(scheduledCornMapper).list(); } } @AllArgsConstructor @Service public class ScheduledJobServiceImpl implements ScheduledJobService { private final ScheduledJobMapper scheduledJobMapper; @Override public List list() { return new LambdaQueryChainWrapper<>(scheduledJobMapper).list(); } }
config类 和 controller类 是重点,下面开始介绍:
配置类一共有两个,先说简单的,这个配置类仅仅是用来简单的注入一些我们的定时任务工作的实现类:
@SpringBootConfiguration
public class JobConfig {
@Bean(name = "clean")
public Worker cleanJob() {
return () -> System.out.println(Thread.currentThread().getName() + "正在执行清理数据的工作...");
}
@Bean(name = "count")
public Worker countJob() {
return () -> System.out.println(Thread.currentThread().getName() + "正在执行统计报表的工作...");
}
@Bean(name = "printA")
public Worker printA() {
return () -> System.out.println(Thread.currentThread().getName() + "正在执行打印a的工作...");
}
@Bean(name = "printB")
public Worker printB() {
return () -> System.out.println(Thread.currentThread().getName() + "正在执行打印b的工作...");
}
}
@FunctionalInterface
public interface Worker extends Runnable {
void doWork();
default void run() {
doWork();
}
}
@AllArgsConstructor
@SpringBootConfiguration
public class TaskConfig {
// 走构造注入 使用lombok生成全参数构造
private final ScheduledCornService scheduledCornService;
private final ScheduledJobService scheduledJobService;
private final ScheduledCornJobService scheduledCornJobService;
// 依赖搜索:拿到spring容器中的所有实现类,即上面配置类中配置的bean
private final Map allWorkerMap;
@Bean(name = "workerMap")
public Map workerMap() {
return new ConcurrentHashMap<>();
}
@Bean(name = "triggerMap")
public Map triggerMap() {
return new ConcurrentHashMap<>();
}
@Bean(name = "scheduledFutureMap")
public Map scheduledFutureMap() {
return new ConcurrentHashMap<>();
}
@Bean(name = "threadPoolTaskScheduler")
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
threadPoolTaskScheduler.setThreadNamePrefix("WorKerThread:");
threadPoolTaskScheduler.setWaitForTasksToCompleteonShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(30);
return threadPoolTaskScheduler;
}
@PostConstruct
public void initMap() {
// 初始化工作注册表
initWorkerMap();
// 初始化触发器注册表
initTriggerMap();
// 初始化ScheduledFuture注册表
initScheduledFutureMap();
}
private void initWorkerMap() {
// 查询数据库配置的所有job记录
List jobList = scheduledJobService.list();
// 拿到我们注册的工作类 Worker 和数据库配置的 scheduled_job 工作来匹配
Map effectiveWorkerMap = jobList.stream().collect(Collectors.toMap(ScheduledJobVO::getId, scheduledJob -> allWorkerMap.get(scheduledJob.getBeanName())));
// 保存起来,方便后续操作
workerMap().putAll(effectiveWorkerMap);
}
private void initTriggerMap() {
// 查询数据库配置的所有corn记录
List cornList = scheduledCornService.list();
// 将我们数据库表记录 scheduled_corn 存起来,不过要先转化为CronTrigger对象
// 参看 {@link ScheduledCornVO#toCronTrigger}
Map cronTriggerMap = cornList.stream().collect(Collectors.toMap(ScheduledCornVO::getId, ScheduledCornVO::toCronTrigger));
// 匹配到的保存起来,方便后续操作
triggerMap().putAll(cronTriggerMap);
}
private void initScheduledFutureMap() {
// 查询数据库配置的corn-job关系表的有效记录(已经配置开启的status=1)
List cornJobList = scheduledCornJobService.list("1");
// 线程池
ThreadPoolTaskScheduler threadPoolTaskScheduler = threadPoolTaskScheduler();
// scheduledFuture注册表
Map scheduledFutureMap = scheduledFutureMap();
// 已经初始化完成的工作类注册表
Map workerMap = workerMap();
// 已经初始化完成的触发器注册表
Map triggerMap = triggerMap();
// 遍历进行定时任务的批量开启
cornJobList.forEach(cornJob -> {
// 每开启一个定时任务所产生的 schedule对象,用于后续的关闭操作,所以要保存起来
ScheduledFuture> schedule = threadPoolTaskScheduler.schedule((workerMap.get(cornJob.getJobId())), triggerMap.get(cornJob.getCornId()));
// 用我们配置的指定key值来保存至注册表中
scheduledFutureMap.put(cornJob.getStoreKey(), schedule);
});
}
}
@RequiredArgsConstructor
@RestController
@RequestMapping("/task")
public class TaskController {
private final ScheduledCornJobService scheduledCornJobService;
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
private final Map scheduledFutureMap;
private final Map workerMap;
private final Map triggerMap;
private Map> operationMap = new HashMap<>();
// 初始化operationMap 的两个操作
@PostConstruct
private void initOperationMap() {
// 定时任务:打开操作
Consumer open = cornJob -> {
// 动态开启一个定时任务
ScheduledFuture> schedule = threadPoolTaskScheduler.schedule(workerMap.get(cornJob.getJobId()), triggerMap.get(cornJob.getCornId()));
// 将任务添加至注册表中进行维护
scheduledFutureMap.put(cornJob.getStoreKey(), schedule);
};
operationMap.put("1", open);
// 定时任务:关闭操作
Consumer close = cornJob -> {
String key = cornJob.getStoreKey();
// 取消此定时任务
scheduledFutureMap.get(key).cancel(true);
// 从注册表中移除
scheduledFutureMap.remove(key);
};
operationMap.put("0", close);
}
// 向scheduled_corn_job中插入记录,并刷新定时任务
@PostMapping
public String add(@RequestBody ScheduledCornJobVO scheduledCornJob) {
// 操作关系表
scheduledCornJobService.add(scheduledCornJob);
// 刷新内存中定时任务
refresh(scheduledCornJob.getId());
return "操作成功";
}
// 开启一个定时任务或者关闭一个定时任务
@PutMapping
public String update(@RequestBody ScheduledCornJobVO scheduledCornJob) {
// 操作关系表
scheduledCornJobService.update(scheduledCornJob);
// 刷新内存中定时任务
refresh(scheduledCornJob.getId());
return "操作成功";
}
// 刷新所有,我们为了测试方便,直接改表记录,调用此方法就行
@GetMapping("/refreshAll")
public void refreshAll() {
List cornJobList = scheduledCornJobService.list();
cornJobList.stream().forEach(this::refresh);
}
// 通过id去查询数据库完成单个定时任务的刷新,调用下面重载方法,传入查询的实时结果
private void refresh(Long id) {
refresh(scheduledCornJobService.one(id));
}
// 重载方法:根据传入的查询的实时结果,拿到定时任务的状态(开启/关闭),去operationMap中取对应操作来执行
private void refresh(ScheduledCornJobVO cornJob) {
operationMap.get(cornJob.getStatus()).accept(cornJob);
}
}
至此以上核心代码讲解完成,下面进入测试阶段:
先启动项目,此时控制台并无任何输出。。
我们向scheduled_corn_job表插入一条如下记录:表示每10s调用一次清理数据工作...
我们打开浏览器调用 http://localhost:8080/task/refreshAll 接口,10s后控制台开始输出结果,说明定时任务的动态添加功能已经生效。
我们来修改一下工作内容以及执行频率,修改为如下:每一秒执行一次打印a工作..
调用 http://localhost:8080/task/refreshAll 接口进行刷新,输出如下,成功切换工作内容及执行频率。
我们再将status修改为0
调用 http://localhost:8080/task/refreshAll 接口进行刷新
控制台不再输出,定时任务停止成功,至此,所有定时任务的动态增删改完成。
值得说明的是:增加定时任务,我们只能在已有的任务实现类中来选择动态的做某件事,而添加一件之前就不存在实现类的事,我认为属于奇葩需求,这里就不去实现了。
至此,全文完结,笔者技术、文笔有限,感谢大家的耐心观看。



