栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot使用ThreadPoolTaskScheduler实现动态定时任务管理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot使用ThreadPoolTaskScheduler实现动态定时任务管理

1. 需求说明

1.1 定时任务的动态管理;

1.2 单个执行器可配置多个cron表达式,即同一个执行器可以按多种不同的规则分别执行;

1.3 创建定时任务时,可将业务参数传入到执行器中进行业务关联。

        以上需求,在网上搜索,基本都是只实现了1.1,一个执行器对应一个cron表达式,千篇一律的代码复制、粘贴。又不想花太多的时间去了解xxl-job,因为一来项目上用不到定时任务分布式管理,二来也大概搜索过,没发现能完全满足以上需求的文章。所以,只能自己想办法解决了。

        实现思路:利用反射找到类及有参构造函数。

2. 实现过程

PS:代码内有相关业务类和代码,实际使用时删除,只需要关心定时任务的动态管理代码就行了

2.1 创建定时任务配置表和java对象
-- Configuration table for dynamically managed scheduled tasks.
-- Each row pairs one executor class with one cron expression; the same
-- task_class may appear in several rows to run under different schedules.
create table `cj_scheduled_task` (
  `id` varchar(50) primary key comment '主键id',
  -- Mapped by the ScheduledTask entity (@TableField("task_name")); without it ORM queries fail.
  `task_name` varchar(100) default null comment '定时任务名称',
  `task_class` varchar(100) not null comment '定时任务完整类名',
  -- 20 characters is too short for expressions such as "0 0/30 9-17 ? * MON-FRI".
  `cron_expression` varchar(100) not null comment 'cron表达式',
  `task_explain` varchar(200) default null comment '任务描述',
  `status` tinyint(1) not null comment '状态:1.启用;2.停用',
  `create_by` varchar(50) comment '创建人',
  `create_time` datetime comment '创建时间',
  `update_by` varchar(50) comment '修改人',
  `update_time` datetime comment '修改时间'
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4
COMMENT = '定时任务';
package com.xxx.modules.chunjun.desreport.entity.scheduled;// 包路径,需要自己修改,下同

import java.io.Serializable;
import java.util.Date;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

import io.swagger.annotations.ApiModelProperty;
import lombok.Data;


/**
 * Persistent configuration of one scheduled task, mapped to table {@code cj_scheduled_task}.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok
 * {@code @Data}.
 */
@TableName("cj_scheduled_task")
@Data
public class ScheduledTask implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Primary key; generated with the {@code ASSIGN_UUID} id strategy. */
    @TableId(type = IdType.ASSIGN_UUID)
    @ApiModelProperty(value = "主键id")
    private String id;

    /** Display name of the task (column {@code task_name}). */
    @TableField("task_name")
    @ApiModelProperty(value = "定时任务名称")
    private String taskName;

    /** Fully qualified class name of the executor that runs this task (column {@code task_class}). */
    @TableField("task_class")
    @ApiModelProperty(value = "定时任务完整类名")
    private String taskClass;

    /**
     * Cron expression that drives the schedule (column {@code cron_expression}).
     * The Lombok accessor is {@code getCronexpression()}; the field name is kept as-is because
     * callers depend on that accessor name.
     */
    @TableField("cron_expression")
    @ApiModelProperty(value = "cron表达式")
    private String cronexpression;

    /** Free-text description of the task (column {@code task_explain}). */
    @TableField("task_explain")
    @ApiModelProperty(value = "任务描述")
    private String taskExplain;

    /** Task status: 1 = enabled, 2 = disabled (column {@code status}). */
    @TableField("status")
    @ApiModelProperty(value = "状态:1.启用;2.停用")
    private int status;

    /** Identifier of the user who created the row (column {@code create_by}). */
    @TableField("create_by")
    @ApiModelProperty(value = "创建人")
    private String createBy;

    /** Creation timestamp (column {@code create_time}). */
    @TableField("create_time")
    @ApiModelProperty(value = "创建时间")
    private Date createTime;

    /** Identifier of the user who last modified the row (column {@code update_by}). */
    @TableField("update_by")
    @ApiModelProperty(value = "修改人")
    private String updateBy;

    /** Last-modification timestamp (column {@code update_time}). */
    @TableField("update_time")
    @ApiModelProperty(value = "修改时间")
    private Date updateTime;
}
package com.xxx.modules.chunjun.desreport.model;

import java.io.Serializable;
import java.util.List;

import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;

import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;


/**
 * View object used when creating or updating a scheduled task: the task row itself plus the
 * report associations that are saved together with it.
 */
@Getter
@Setter
public class ScheduledTaskVO extends ScheduledTask implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Task/report associations to save with the task. The element type is restored from the
     * service code, which iterates this list as {@link ScheduledTaskReport} and sets each task id.
     */
    @ApiModelProperty(value = "任务报表关联集")
    private List<ScheduledTaskReport> strList;

    /**
     * Ids of associations to delete in the same batch operation.
     * NOTE(review): the element type is not visible in this file (presumably {@code String} ids);
     * left raw until confirmed against IScheduledTaskReportService#saveBatch.
     */
    @ApiModelProperty(value = "批量删除id集")
    @SuppressWarnings("rawtypes")
    private List ids;
}
2.3 ThreadPoolTaskScheduler配置类
package com.xxx.modules.chunjun.config.scheduled;// 包路径,需要自己修改

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import lombok.extern.slf4j.Slf4j;


/**
 * Declares the {@link ThreadPoolTaskScheduler} bean used to run dynamically managed cron tasks.
 */
@Configuration
@Slf4j
public class ScheduledConfig {

    /**
     * Builds the scheduler: 20 threads named {@code taskExecutor-N}. On shutdown the scheduler
     * lets running tasks finish and waits up to 60 seconds for them before the container
     * continues closing.
     *
     * @return the configured scheduler; Spring initializes it as part of bean creation
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        log.info("创建定时任务调度线程池 start");
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(20);
        threadPoolTaskScheduler.setThreadNamePrefix("taskExecutor-");
        // Correct setter name is ...CompleteOnShutdown (capital "On"); the lowercase variant does not exist.
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
        log.info("创建定时任务调度线程池 end");
        return threadPoolTaskScheduler;
    }
}
2.4 定时任务管理服务接口
package com.xxx.modules.chunjun.desreport.service;

import java.util.List;

import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;


/**
 * Management operations for dynamically scheduled tasks: CRUD on the task configuration and
 * starting/stopping the corresponding schedules at runtime.
 */
public interface IScheduledTaskService {

    /**
     * Lists every configured task, regardless of status.
     *
     * @return all task rows
     */
    List<ScheduledTask> listAll();

    /**
     * Pages through configured tasks, optionally filtered by executor class name.
     *
     * @param taskClass fragment of the executor class name to match; ignored when null or empty
     * @return the page wrapped in a {@link Result}
     */
    Result pageList(String taskClass);

    /**
     * Loads one task configuration.
     *
     * @param id task id
     * @return the task row for {@code id}
     */
    ScheduledTask getById(String id);

    /**
     * Schedules the task with the given id.
     *
     * @param id task id
     * @return success, or an error result when the task is not enabled or is already running
     */
    Result start(String id);

    /**
     * Cancels the running schedule of the task with the given id.
     *
     * @param id task id
     * @return the outcome wrapped in a {@link Result}
     */
    Result stop(String id);

    /**
     * Stops the task and then starts it again.
     *
     * @param id task id
     * @return the result of the start step
     */
    Result restart(String id);

    /**
     * Updates a task configuration together with its report associations.
     *
     * @param st task data, associations to save and association ids to delete
     * @return success, or an error result when a persistence step fails
     */
    Result update(ScheduledTaskVO st);

    /**
     * Creates a task configuration together with its report associations.
     *
     * @param st task data and associations to save
     * @return success, or an error result when a persistence step fails
     */
    Result insert(ScheduledTaskVO st);

    /**
     * Schedules every enabled task; intended to be called once at application startup.
     */
    void initTask();

    /**
     * Lists the executors declared in configuration, optionally filtered by keyword.
     *
     * @param keywords text matched against executor name and class name; no filter when empty
     * @return the matching executors wrapped in a {@link Result}
     */
    Result pageListActuator(String keywords);
}
2.5 定时任务管理服务实现

属性说明:

taskbaseTaskList:执行器列表,写到配置文件中。多个执行器之间用英文逗号“,”分隔,每一项的格式为“执行器名称:执行器完整类名”,如:

task:
  baseTaskList: 生成excel:com.trasen.modules.chunjun.desreport.task.GenerateExcelTask
package com.xxx.modules.chunjun.desreport.service.impl;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONObject;
import com.xxx.modules.chunjun.common.base.query.CriteriaQuery;
import com.xxx.modules.chunjun.common.base.service.baseServiceImpl;
import com.xxx.modules.chunjun.common.util.LoginUserUtil;
import com.xxx.modules.chunjun.common.vo.Result;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTaskReport;
import com.xxx.modules.chunjun.desreport.mapper.ScheduledTaskMapper;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskActuator;
import com.xxx.modules.chunjun.desreport.model.ScheduledTaskVO;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskReportService;
import com.xxx.modules.chunjun.desreport.service.IScheduledTaskService;
import com.xxx.modules.chunjun.desreport.task.ScheduledOfTask;

import lombok.extern.slf4j.Slf4j;


/**
 * Manages cron-driven tasks at runtime on top of {@link ThreadPoolTaskScheduler}.
 *
 * <p>Each {@link ScheduledTask} row names an executor class: a {@link ScheduledOfTask} subclass
 * with a public constructor taking the row. Starting a task instantiates that class reflectively,
 * schedules it with the row's cron expression, and records the resulting future under the task
 * id so the schedule can be cancelled later.
 *
 * <p>NOTE(review): the type arguments of the project types used here ({@code baseServiceImpl},
 * {@code CriteriaQuery}, {@code Result}) are not visible in this file and appear to have been
 * lost; restore them from the project sources.
 */
@Slf4j
@Service
public class ScheduledTaskServiceImpl extends baseServiceImpl
    implements IScheduledTaskService {

    /** Value of {@code status} for an enabled task (1 = enabled, 2 = disabled). */
    private static final int STATUS_ENABLED = 1;

    /** Comma-separated executor list from configuration; each entry is {@code name:className}. */
    @Value("${task.baseTaskList}")
    private String taskbaseTaskList;

    /** Serializes start requests so the same task cannot be scheduled twice concurrently. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Scheduler that actually runs the tasks. */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    @Autowired
    private IScheduledTaskReportService strService;

    /** Futures of the currently scheduled tasks, keyed by task id; used to look up and cancel. */
    public Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

    /** {@inheritDoc} */
    @Override
    public List<ScheduledTask> listAll() {
        return this.list();
    }

    /** {@inheritDoc} */
    @Override
    public Result pageList(String taskClass) {
        CriteriaQuery cq = new CriteriaQuery(this.request);
        cq.like(StringUtils.hasLength(taskClass), ScheduledTask::getTaskClass, taskClass);
        return this.pageList(cq);
    }

    /** {@inheritDoc} */
    @Override
    public ScheduledTask getById(String id) {
        return super.getById(id);
    }

    /**
     * Schedules the task with the given id.
     *
     * @param id task id
     * @return success; or an error result when the task does not exist, is not enabled, is
     *         already running, or could not be scheduled
     */
    @Override
    public Result start(String id) {
        ScheduledTask task = super.getById(id);
        // An unknown id previously caused a NullPointerException on task.getStatus().
        if (null == task) {
            String msg = "定时任务不存在!";
            log.warn(msg + id);
            return Result.error(msg);
        }
        if (STATUS_ENABLED != task.getStatus()) {
            return Result.error("定时任务未启用,无法执行!");
        }
        log.info("启动定时任务:" + task.getTaskClass());
        // Only one thread may start tasks at a time, so concurrent requests cannot double-start.
        lock.lock();
        log.info("加锁完成");
        try {
            if (this.isStart(id)) {
                String msg = "当前任务在启动状态中";
                log.info(msg);
                return Result.error(msg);
            }
            // Report scheduling failures to the caller instead of answering OK unconditionally.
            if (!this.doStartTask(task)) {
                return Result.error("定时任务启动失败!");
            }
        } finally {
            lock.unlock();
            log.info("解锁完毕");
        }
        return Result.OK();
    }

    /**
     * Cancels the running schedule of the given task and forgets its future.
     *
     * @param id task id
     * @return success, or an error result when no schedule is registered for {@code id}
     */
    @Override
    public Result stop(String id) {
        log.info("停止任务: " + id);
        // remove() is atomic, so two concurrent stops cannot both act on the same future.
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.remove(id);
        boolean flag = null != scheduledFuture;
        log.info("当前实例是否存在 " + flag);
        if (!flag) {
            String msg = "当前任务不存在!";
            log.info(msg + id);
            // The error result used to be created and discarded (missing return).
            return Result.error(msg);
        }
        scheduledFuture.cancel(true);
        return Result.OK();
    }

    /**
     * Stops the task (ignoring whether it was running) and starts it again.
     *
     * @param id task id
     * @return the result of the start step
     */
    @Override
    public Result restart(String id) {
        log.info("重启定时任务:" + id);
        this.stop(id);
        return this.start(id);
    }

    /**
     * Updates the task row, then saves its report associations and deletes the ones listed in
     * {@code ids} through {@code strService.saveBatch}.
     *
     * <p>NOTE(review): as in {@link #insert}, returning an error result does not roll the
     * transaction back; only an exception does.
     *
     * @param st task data, associations to save and association ids to delete
     * @return success, or an error result when a persistence step reports failure
     */
    @Override
    @Transactional
    public Result update(ScheduledTaskVO st) {
        st.setUpdateBy(LoginUserUtil.getCurrentUserId());
        st.setUpdateTime(new Date());
        if (!super.updateById(st)) {
            // This is the update path; the message previously said "新增失败!" (insert failed).
            String msg = "修改失败!";
            log.error(msg);
            return Result.error(msg);
        }
        List<ScheduledTaskReport> strList = st.getStrList();
        this.bindTaskId(strList, st.getId());
        // Always call saveBatch here: even with no associations, "ids" may request deletions.
        JSONObject params = new JSONObject();
        params.put("strList", strList);
        params.put("ids", st.getIds());
        if (!strService.saveBatch(params).isSuccess()) {
            String msg = "批量保存失败!";
            log.error(msg);
            return Result.error(msg);
        }
        return Result.OK();
    }

    /**
     * Inserts the task row and, when present, its report associations.
     *
     * @param st task data and associations to save
     * @return success, or an error result when a persistence step reports failure
     */
    @Override
    @Transactional
    public Result insert(ScheduledTaskVO st) {
        st.setCreateBy(LoginUserUtil.getCurrentUserId());
        st.setCreateTime(new Date());
        if (!super.save(st)) {
            String msg = "新增失败!";
            log.error(msg);
            return Result.error(msg);
        }
        List<ScheduledTaskReport> strList = st.getStrList();
        if (null != strList && !strList.isEmpty()) {
            this.bindTaskId(strList, st.getId());
            JSONObject params = new JSONObject();
            params.put("strList", strList);
            if (!strService.saveBatch(params).isSuccess()) {
                String msg = "批量新增失败!";
                log.error(msg);
                return Result.error(msg);
            }
        }
        return Result.OK();
    }

    /**
     * Schedules every enabled task. A task that fails to start is logged and skipped so the
     * remaining tasks are still scheduled.
     */
    @Override
    public void initTask() {
        List<ScheduledTask> stList = super.list();
        if (null == stList || stList.isEmpty()) {
            return;
        }
        for (ScheduledTask st : stList) {
            if (STATUS_ENABLED == st.getStatus()) {
                doStartTask(st);
            }
        }
    }

    /** Points every association in {@code strList} at the given task; no-op for null or empty. */
    private void bindTaskId(List<ScheduledTaskReport> strList, String taskId) {
        if (null == strList || strList.isEmpty()) {
            return;
        }
        for (ScheduledTaskReport str : strList) {
            str.setTaskId(taskId);
        }
    }

    /**
     * Instantiates the task's executor class and schedules it with the task's cron expression.
     *
     * @param scheduledTask task configuration; also passed to the executor's constructor so the
     *                      executor can relate its work to the task
     * @return {@code true} when the task was scheduled and registered; {@code false} when the
     *         task is not enabled or scheduling failed (the failure is logged)
     */
    private boolean doStartTask(ScheduledTask scheduledTask) {
        String taskClass = scheduledTask.getTaskClass();
        log.info(taskClass);
        if (STATUS_ENABLED != scheduledTask.getStatus()) {
            return false;
        }
        try {
            Class<?> clazz = Class.forName(taskClass);
            // Check the type before instantiating. The check used to run after the cast, where
            // it could never fail because the cast had already thrown.
            Assert.isAssignable(ScheduledOfTask.class, clazz, "定时任务业务类必须继承ScheduledOfTask父类");
            Constructor<?> cons = clazz.getConstructor(ScheduledTask.class);
            ScheduledOfTask task = (ScheduledOfTask) cons.newInstance(scheduledTask);
            CronTrigger cronTrigger = new CronTrigger(scheduledTask.getCronexpression());
            // CronTrigger is itself a Trigger; no wrapping lambda is needed.
            ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(task, cronTrigger);
            if (null == scheduledFuture) {
                // schedule() returns null when the trigger never fires; ConcurrentHashMap
                // rejects null values, so do not register it.
                log.warn("定时任务【{}】的cron表达式不会被触发,未启动。", taskClass);
                return false;
            }
            scheduledFutureMap.put(scheduledTask.getId(), scheduledFuture);
            return true;
        } catch (ReflectiveOperationException | RuntimeException e) {
            log.error("定时任务【" + taskClass + "】启动异常。", e);
            return false;
        }
    }

    /**
     * Tells whether a non-cancelled schedule is registered for the given task id.
     *
     * @param id task id
     * @return {@code true} when a future exists for {@code id} and has not been cancelled
     */
    private boolean isStart(String id) {
        ScheduledFuture<?> future = scheduledFutureMap.get(id);
        return null != future && !future.isCancelled();
    }

    /**
     * Lists the executors declared in {@code task.baseTaskList}, optionally filtered by keyword.
     *
     * <p>The keyword is matched as a plain substring of the executor name or class name. It is
     * deliberately not compiled as a regular expression: user input such as {@code "("} would
     * throw, and {@code "."} would match everything.
     *
     * @param keywords text to search for; when null or empty every executor is returned
     * @return the matching executors wrapped in a {@link Result}
     */
    @Override
    public Result pageListActuator(String keywords) {
        List<ScheduledTaskActuator> bList = new ArrayList<>();
        boolean filtering = StringUtils.hasLength(keywords);
        for (String entry : taskbaseTaskList.split(",")) {
            String[] taskbaseTaskAttr = entry.split(":");
            if (taskbaseTaskAttr.length < 2) {
                // A malformed entry used to raise ArrayIndexOutOfBoundsException.
                log.warn("忽略格式错误的执行器配置:{}", entry);
                continue;
            }
            String tempName = taskbaseTaskAttr[0].trim();
            String tempClass = taskbaseTaskAttr[1].trim();
            if (filtering && !tempName.contains(keywords) && !tempClass.contains(keywords)) {
                continue;
            }
            ScheduledTaskActuator temp = new ScheduledTaskActuator();
            temp.setActuatorName(tempName);
            temp.setTaskClass(tempClass);
            bList.add(temp);
        }
        return Result.OK(bList);
    }
}
2.6 执行器父类
package com.xxx.modules.chunjun.desreport.task;

import com.xxx.modules.chunjun.desreport.entity.scheduled.ScheduledTask;


/**
 * Base class for task executors. Subclasses override {@link #run()} with the business logic and
 * must expose a public constructor taking a {@link ScheduledTask}, because executors are
 * instantiated reflectively through that constructor.
 */
public class ScheduledOfTask implements Runnable {

    /** Configuration row this executor instance was created for; never reassigned. */
    private final ScheduledTask scheduledTask;

    /**
     * Creates an executor bound to one task configuration.
     *
     * @param scheduledTask configuration the executor can use to relate its work to the task
     */
    public ScheduledOfTask(ScheduledTask scheduledTask) {
        this.scheduledTask = scheduledTask;
    }

    /**
     * Returns the task configuration passed at construction.
     *
     * @return the bound task configuration
     */
    public ScheduledTask getScheduledTask() {
        return this.scheduledTask;
    }

    /**
     * Default behavior: prints the task id to standard output. Subclasses are expected to
     * override this with real work.
     */
    @Override
    public void run() {
        System.out.println(scheduledTask.getId() + ":开始执行!");
    }
}
2.7 执行器业务类
package com.trasen.modules.chunjun.desreport.task;

import com.alibaba.fastjson.JSONObject;
import com.trasen.modules.chunjun.desreport.entity.scheduled.ScheduledTask;
import com.trasen.modules.chunjun.desreport.service.IReportExportExcel;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;


/**
 * Sample business executor. The real export logic is project-specific and left commented out;
 * as shown, each run only logs the task it belongs to.
 */
@Slf4j
public class GenerateExcelTask extends ScheduledOfTask {

    /**
     * Creates the executor for one task configuration.
     *
     * @param scheduledTask configuration row this executor runs for
     */
    public GenerateExcelTask(ScheduledTask scheduledTask) {
        super(scheduledTask);
    }

    /** Logs the id, description and cron expression of the bound task. */
    @Override
    public void run() {
        ScheduledTask scheduledTask = super.getScheduledTask();
        // Project-specific business logic, kept only as a usage sketch:
        // String id = scheduledTask.getId();
        // JSONObject jsonObject = new JSONObject();
        // jsonObject.put("taskId", id);
        // jsonObject.put("pageNo", 1);
        // jsonObject.put("pageSize", 10000);
        // IReportExportExcel reportExportExcel = SpringUtil.getBean(IReportExportExcel.class);
        // reportExportExcel.generateExcelUploadMinIO(jsonObject);
        // Read the id from the task directly: the local "id" above is commented out, so
        // referencing it here did not compile.
        log.info("任务id:{},任务说明:{},cron表达式:{}", scheduledTask.getId(), scheduledTask.getTaskExplain(),
            scheduledTask.getCronexpression());
    }
}
2.8 项目启动时初始化定时任务
package com.xxx.modules.chunjun.desreport.task;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.trasen.modules.chunjun.desreport.service.IScheduledTaskService;

import lombok.extern.slf4j.Slf4j;


/**
 * Application runner that kicks off scheduled-task initialization when the application starts.
 */
@Component
@Slf4j
public class ScheduledTaskRunner implements ApplicationRunner {

    /** Service whose {@code initTask()} schedules the configured tasks. */
    @Autowired
    private IScheduledTaskService scheduledTaskService;

    /**
     * Delegates to {@code scheduledTaskService.initTask()}, logging before and after the call.
     *
     * @param args application arguments; not used
     * @throws Exception declared by {@link ApplicationRunner#run}; anything thrown by
     *                   {@code initTask()} propagates to the caller
     */
    @Override
    public void run(final ApplicationArguments args) throws Exception {
        log.info("----初始化定时任务开始----");
        this.scheduledTaskService.initTask();
        log.info("----初始化定时任务完成----");
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/686490.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号