1.1 定时任务的动态管理;
1.2 单个执行器可配置多个cron表达式,即同一个执行器可按多种不同规则多次执行;
1.3 创建定时任务时,可将业务参数传入到执行器中进行业务关联。
以上需求,在网上搜索,基本都是只实现了1.1,一个执行器对应一个cron表达式,千篇一律的代码复制、粘贴。又不想花太多的时间去了解xxl-job,因为一来项目上用不到定时任务分布式管理,二来也大概搜索过,没发现能完全满足以上需求的文章。所以,只能自己想办法解决了。
实现思路:利用反射找到类及有参构造函数。
2. 实现过程
PS:代码内包含相关业务类和业务代码,实际使用时请删除,只需要关心定时任务动态管理部分的代码即可。
2.1 创建定时任务配置表和java对象
create table `cj_scheduled_task` (
  `id` varchar(50) primary key comment '主键id',
  `task_name` varchar(100) default null comment '定时任务名称',
  `task_class` varchar(100) not null comment '定时任务完整类名',
  `cron_expression` varchar(20) not null comment 'cron表达式',
  `task_explain` varchar(200) default null comment '任务描述',
  `status` tinyint(1) not null comment '状态:1.启用;2.停用',
  `create_by` varchar(50) comment '创建人',
  `create_time` datetime comment '创建时间',
  `update_by` varchar(50) comment '修改人',
  `update_time` datetime comment '修改时间'
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '定时任务';
package com.xxx.modules.chunjun.desreport.entity.scheduled;// 包路径,需要自己修改,下同
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
 * Persistent configuration of one scheduled task, mapped to table {@code cj_scheduled_task}.
 *
 * <p>Each row binds an executor class ({@link #taskClass}) to one cron expression, so the same
 * executor class may appear in several rows with different schedules. Accessors, {@code equals},
 * {@code hashCode} and {@code toString} are generated by Lombok's {@code @Data}.
 */
@Data
@TableName(value = "cj_scheduled_task")
public class ScheduledTask implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key; generated as a UUID when the row is inserted. */
    @ApiModelProperty("主键id")
    @TableId(type = IdType.ASSIGN_UUID)
    private String id;

    /** Human-readable task name. */
    @ApiModelProperty("定时任务名称")
    @TableField(value = "task_name")
    private String taskName;

    /** Fully qualified class name of the executor that runs this task. */
    @ApiModelProperty("定时任务完整类名")
    @TableField(value = "task_class")
    private String taskClass;

    /** Cron expression that drives the schedule. */
    @ApiModelProperty("cron表达式")
    @TableField(value = "cron_expression")
    private String cronexpression;

    /** Free-text description of the task. */
    @ApiModelProperty("任务描述")
    @TableField(value = "task_explain")
    private String taskExplain;

    /** Status flag: 1 = enabled, 2 = disabled. */
    @ApiModelProperty("状态:1.启用;2.停用")
    @TableField(value = "status")
    private int status;

    /** Id of the user who created the row. */
    @ApiModelProperty("创建人")
    @TableField(value = "create_by")
    private String createBy;

    /** Creation timestamp. */
    @ApiModelProperty("创建时间")
    @TableField(value = "create_time")
    private Date createTime;

    /** Id of the user who last modified the row. */
    @ApiModelProperty("修改人")
    @TableField(value = "update_by")
    private String updateBy;

    /** Last-modification timestamp. */
    @ApiModelProperty("修改时间")
    @TableField(value = "update_time")
    private Date updateTime;
}
package com.xxx.modules.chunjun.desreport.model;
import java.io.Serializable;
import java.util.List;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
/**
 * View object used by the create/update endpoints: a {@link ScheduledTask} plus the
 * business rows associated with it.
 *
 * <p>The collections are declared with element types so that callers can iterate
 * {@link #getStrList()} as {@code ScheduledTaskReport} without raw-type casts.
 */
@Getter
@Setter
public class ScheduledTaskVO extends ScheduledTask implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Task-to-report association rows to be saved together with the task. */
    @ApiModelProperty(value = "任务报表关联集")
    private List<ScheduledTaskReport> strList;

    /**
     * Ids of association rows to delete in the same batch.
     * NOTE(review): element type assumed to be String because primary keys are varchar — confirm.
     */
    @ApiModelProperty(value = "批量删除id集")
    private List<String> ids;
}
2.3 ThreadPoolTaskScheduler配置类
package com.xxx.modules.chunjun.config.scheduled;// 包路径,需要自己修改
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring configuration that provides the {@link ThreadPoolTaskScheduler} used to run
 * dynamically registered scheduled tasks.
 */
@Configuration
@Slf4j
public class ScheduledConfig {

    /**
     * Builds the scheduler bean.
     *
     * <p>The pool holds 20 threads named {@code taskExecutor-N}. On shutdown the scheduler
     * waits for running tasks to complete, for at most 60 seconds.
     *
     * @return the configured scheduler
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        log.info("创建定时任务调度线程池 start");
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(20);
        threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");
        // The setter is spelled "...CompleteOnShutdown" (capital O); the lower-case variant does not exist.
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
        log.info("创建定时任务调度线程池 end");
        return threadPoolTaskScheduler;
    }
}
2.4 定时任务管理服务接口
package com.xxx.modules.chunjun.desreport.service;
import java.util.List;
import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;
public interface IScheduledTaskService {
List listAll();
Result> pageList(String taskClass);
ScheduledTask getById(String id);
Result> start(String id);
Result> stop(String id);
Result> restart(String id);
Result> update(ScheduledTaskVO st);
Result> insert(ScheduledTaskVO st);
void initTask();
Result> pageListActuator(String keywords);
}
2.5 定时任务管理服务实现
属性说明:
taskbaseTaskList:执行器列表,写到配置文件中。每个执行器的格式为"名称:完整类名",多个执行器之间用英文逗号分隔,如:
task:
  baseTaskList: 生成excel:com.trasen.modules.chunjun.desreport.task.GenerateExcelTask
package com.xxx.modules.chunjun.desreport.service.impl;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONObject;
import com.xxx.modules.chunjun.common.base.query.CriteriaQuery;
import com.xxx.modules.chunjun.common.base.service.baseServiceImpl;
import com.xxx.modules.chunjun.common.util.LoginUserUtil;
import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;
import com.xxx.modules.chunjun.desreport.mapper.ScheduledTaskMapper;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskActuator;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;
import com.xxx.modules.chunjun.desreport.service.IReportExportExcel;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskReportService;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskService;
import com.xxx.modules.chunjun.desreport.task.ScheduledOfTask;

import lombok.extern.slf4j.Slf4j;

/**
 * Default {@link IScheduledTaskService}: persists task configuration and keeps an in-memory
 * registry of the {@link ScheduledFuture} created for each started task so it can be cancelled.
 *
 * <p>Executors are located by class name via reflection and must expose a public constructor
 * taking a single {@link ScheduledTask}, which is how business parameters reach the executor.
 *
 * <p>NOTE(review): the generic arguments of {@code baseServiceImpl} were lost in the published
 * source; {@code <ScheduledTaskMapper, ScheduledTask>} is assumed from the imports — confirm.
 */
@Slf4j
@Service
public class ScheduledTaskServiceImpl extends baseServiceImpl<ScheduledTaskMapper, ScheduledTask>
        implements IScheduledTaskService {

    /** Value of {@code ScheduledTask.status} that marks a task as enabled. */
    private static final int STATUS_ENABLED = 1;

    /** Comma-separated {@code name:className} pairs describing the available executors. */
    @Value("${task.baseTaskList}")
    private String taskbaseTaskList;

    /** Serialises {@link #start(String)} so concurrent requests cannot schedule a task twice. */
    private final ReentrantLock lock = new ReentrantLock();

    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Autowired
    private IScheduledTaskReportService strService;

    /** Running schedules keyed by task id; used to look up and cancel a task. */
    public Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

    /** {@inheritDoc} */
    @Override
    public List<ScheduledTask> listAll() {
        return this.list();
    }

    /** {@inheritDoc} */
    @Override
    public Result<?> pageList(String taskClass) {
        CriteriaQuery<ScheduledTask> cq = new CriteriaQuery<>(this.request);
        cq.like(null != taskClass && !taskClass.isEmpty(), ScheduledTask::getTaskClass, taskClass);
        return this.pageList(cq);
    }

    /** {@inheritDoc} */
    @Override
    public ScheduledTask getById(String id) {
        return super.getById(id);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Returns an error when the task does not exist, is not enabled, is already running,
     * or could not be scheduled.
     */
    @Override
    public Result<?> start(String id) {
        ScheduledTask task = super.getById(id);
        // An unknown id previously caused a NullPointerException on task.getStatus().
        if (null == task) {
            String msg = "当前任务不存在!";
            log.info(msg + id);
            return Result.error(msg);
        }
        if (STATUS_ENABLED != task.getStatus()) {
            return Result.error("定时任务未启用,无法执行!");
        }
        log.info("启动定时任务:{}", task.getTaskClass());
        // Only one thread may start tasks at a time, so the same task is never scheduled twice.
        lock.lock();
        log.info("加锁完成");
        try {
            if (this.isStart(id)) {
                String msg = "当前任务在启动状态中";
                log.info(msg);
                return Result.error(msg);
            }
            if (!this.doStartTask(task)) {
                // Surface the failure instead of reporting success for a task that is not running.
                return Result.error("定时任务启动失败!");
            }
        } finally {
            lock.unlock();
            log.info("解锁完毕");
        }
        return Result.OK();
    }

    /**
     * {@inheritDoc}
     *
     * <p>Returns an error when no schedule is registered for the id.
     */
    @Override
    public Result<?> stop(String id) {
        log.info("停止任务: {}", id);
        // remove() both looks up and unregisters the future in one atomic step.
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.remove(id);
        log.info("当前实例是否存在 {}", null != scheduledFuture);
        if (null == scheduledFuture) {
            String msg = "当前任务不存在!";
            log.info(msg + id);
            return Result.error(msg);
        }
        scheduledFuture.cancel(true);
        return Result.OK();
    }

    /**
     * {@inheritDoc}
     *
     * <p>The outcome of the stop step is ignored so that a task which is not currently running
     * can still be (re)started.
     */
    @Override
    public Result<?> restart(String id) {
        log.info("重启定时任务:{}", id);
        this.stop(id);
        return this.start(id);
    }

    /** {@inheritDoc} */
    @Override
    public Result<?> update(ScheduledTaskVO st) {
        // Stamp the modifier and modification time.
        st.setUpdateBy(LoginUserUtil.getCurrentUserId());
        st.setUpdateTime(new Date());
        if (!super.updateById(st)) {
            String msg = "修改失败!";
            log.error(msg);
            return Result.error(msg);
        }
        // Save the association rows and apply the requested deletions in one batch call.
        JSONObject params = new JSONObject();
        List<ScheduledTaskReport> strList = st.getStrList();
        if (null != strList && !strList.isEmpty()) {
            for (ScheduledTaskReport str : strList) {
                str.setTaskId(st.getId());
            }
        }
        params.put("strList", strList);
        params.put("ids", st.getIds());
        if (!strService.saveBatch(params).isSuccess()) {
            String msg = "批量保存失败!";
            log.error(msg);
            return Result.error(msg);
        }
        return Result.OK();
    }

    /** {@inheritDoc} */
    @Override
    @Transactional
    public Result<?> insert(ScheduledTaskVO st) {
        // Stamp the creator and creation time.
        st.setCreateBy(LoginUserUtil.getCurrentUserId());
        st.setCreateTime(new Date());
        if (!super.save(st)) {
            String msg = "新增失败!";
            log.error(msg);
            return Result.error(msg);
        }
        List<ScheduledTaskReport> strList = st.getStrList();
        if (null != strList && !strList.isEmpty()) {
            // Link every association row to the task that was just saved.
            for (ScheduledTaskReport str : strList) {
                str.setTaskId(st.getId());
            }
            JSONObject params = new JSONObject();
            params.put("strList", strList);
            if (!strService.saveBatch(params).isSuccess()) {
                String msg = "批量新增失败!";
                log.error(msg);
                return Result.error(msg);
            }
        }
        return Result.OK();
    }

    /** {@inheritDoc} */
    @Override
    public void initTask() {
        List<ScheduledTask> stList = super.list();
        if (null == stList || stList.isEmpty()) {
            return;
        }
        for (ScheduledTask st : stList) {
            if (STATUS_ENABLED == st.getStatus()) {
                // Failures are logged inside doStartTask; one bad task must not block the others.
                doStartTask(st);
            }
        }
    }

    /**
     * Instantiates the executor named by the task and registers it with the scheduler.
     *
     * @param scheduledTask task configuration; passed to the executor's constructor
     * @return {@code true} when the task was scheduled, {@code false} when it is disabled or
     *         scheduling failed (the cause is logged)
     */
    private boolean doStartTask(ScheduledTask scheduledTask) {
        String taskClass = scheduledTask.getTaskClass();
        log.info(taskClass);
        if (STATUS_ENABLED != scheduledTask.getStatus()) {
            return false;
        }
        try {
            Class<?> clazz = Class.forName(taskClass);
            // Verify the type before instantiating; checking after the cast could never fail.
            Assert.isAssignable(ScheduledOfTask.class, clazz, "定时任务业务类必须继承ScheduledOfTask父类");
            Constructor<?> cons = clazz.getConstructor(ScheduledTask.class);
            ScheduledOfTask task = (ScheduledOfTask) cons.newInstance(scheduledTask);
            CronTrigger cronTrigger = new CronTrigger(scheduledTask.getCronexpression());
            ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(task, cronTrigger);
            if (null == scheduledFuture) {
                // The scheduler returns null when the trigger will never fire.
                log.error("定时任务【" + taskClass + "】启动异常。");
                return false;
            }
            ScheduledFuture<?> previous = scheduledFutureMap.put(scheduledTask.getId(), scheduledFuture);
            if (null != previous) {
                // Do not leave an orphaned schedule running under the same id.
                previous.cancel(false);
            }
            return true;
        } catch (ReflectiveOperationException | RuntimeException e) {
            log.error("定时任务【" + taskClass + "】启动异常。", e);
            return false;
        }
    }

    /**
     * @param id task primary key
     * @return {@code true} when a non-cancelled schedule is registered for the id
     */
    private boolean isStart(String id) {
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(id);
        return null != scheduledFuture && !scheduledFuture.isCancelled();
    }

    /**
     * {@inheritDoc}
     *
     * <p>The keyword is matched as a literal substring of the executor name or class name;
     * it is not interpreted as a regular expression. Malformed configuration entries
     * (without a {@code :} separator) are skipped with a warning.
     */
    @Override
    public Result<?> pageListActuator(String keywords) {
        List<ScheduledTaskActuator> bList = new ArrayList<>();
        if (!StringUtils.hasText(taskbaseTaskList)) {
            return Result.OK(bList);
        }
        boolean filter = StringUtils.hasLength(keywords);
        for (String entry : taskbaseTaskList.split(",")) {
            // Limit 2 keeps everything after the first ':' as the class name.
            String[] taskbaseTaskAttr = entry.split(":", 2);
            if (taskbaseTaskAttr.length < 2) {
                log.warn("task.baseTaskList entry ignored, expected name:className but got: {}", entry);
                continue;
            }
            String tempName = taskbaseTaskAttr[0].trim();
            String tempClass = taskbaseTaskAttr[1].trim();
            if (filter && !tempName.contains(keywords) && !tempClass.contains(keywords)) {
                continue;
            }
            ScheduledTaskActuator temp = new ScheduledTaskActuator();
            temp.setActuatorName(tempName);
            temp.setTaskClass(tempClass);
            bList.add(temp);
        }
        return Result.OK(bList);
    }
}
2.6 执行器父类
package com.xxx.modules.chunjun.desreport.task;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
/**
 * Base class for every scheduled-task executor.
 *
 * <p>An executor is built with the {@link ScheduledTask} configuration that triggered it, which
 * lets subclasses look up the business data tied to that configuration. Subclasses override
 * {@link #run()} with the real work.
 */
public class ScheduledOfTask implements Runnable {

    /** Configuration row this executor instance was created for. */
    private final ScheduledTask scheduledTask;

    /**
     * @param scheduledTask configuration row this executor instance belongs to
     */
    public ScheduledOfTask(ScheduledTask scheduledTask) {
        this.scheduledTask = scheduledTask;
    }

    /**
     * @return the configuration row passed at construction time
     */
    public ScheduledTask getScheduledTask() {
        return scheduledTask;
    }

    /** Default behaviour: print the task id to standard output. */
    @Override
    public void run() {
        final String taskId = this.scheduledTask.getId();
        System.out.println(taskId + ":开始执行!");
    }
}
2.7 执行器业务类
package com.trasen.modules.chunjun.desreport.task;
import com.alibaba.fastjson.JSONObject;
import com.trasen.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.trasen.modules.chunjun.desreport.service.IReportExportExcel;
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
/**
 * Sample executor: logs the configuration of the task that triggered it.
 *
 * <p>The commented-out lines show how a real implementation could use the task id to drive
 * business logic; they are illustrative only.
 */
@Slf4j
public class GenerateExcelTask extends ScheduledOfTask {

    /**
     * @param scheduledTask configuration row this executor instance belongs to
     */
    public GenerateExcelTask(ScheduledTask scheduledTask) {
        super(scheduledTask);
    }

    /** Logs the task id, description and cron expression of the triggering configuration. */
    @Override
    public void run() {
        ScheduledTask scheduledTask = super.getScheduledTask();
        // The id is read here because the log statement below needs it; previously the only
        // declaration of "id" was inside the commented-out sample, which did not compile.
        String id = scheduledTask.getId();
        // Business-specific example (not required for the scheduling mechanism):
        // JSONObject jsonObject = new JSONObject();
        // jsonObject.put("taskId", id);
        // jsonObject.put("pageNo", 1);
        // jsonObject.put("pageSize", 10000);
        // IReportExportExcel reportExportExcel = SpringUtil.getBean(IReportExportExcel.class);
        // reportExportExcel.generateExcelUploadMinIO(jsonObject);
        log.info("任务id:{},任务说明:{},cron表达式:{}", id, scheduledTask.getTaskExplain(), scheduledTask.getCronexpression());
    }
}
2.8 项目启动时初始化定时任务
package com.xxx.modules.chunjun.desreport.task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.trasen.modules.chunjun.desreport.service.IScheduledTaskService;
import lombok.extern.slf4j.Slf4j;
/**
 * Start-up hook: once the application has started, asks the task service to schedule the
 * configured tasks.
 */
@Component
@Slf4j
public class ScheduledTaskRunner implements ApplicationRunner {

    /** Service that owns the scheduling logic. */
    @Autowired
    private IScheduledTaskService scheduledTaskService;

    /**
     * Delegates to {@link IScheduledTaskService#initTask()}, logging before and after.
     *
     * @param args application arguments (unused)
     * @throws Exception declared by {@link ApplicationRunner}
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("----初始化定时任务开始----");
        this.scheduledTaskService.initTask();
        log.info("----初始化定时任务完成----");
    }
}



