当当网开源的分布式定时任务框架,后被apache基金会支持;
官方文档:https://shardingsphere.apache.org/elasticjob/current/cn/overview/
问:ElasticJob是什么?
答:定时任务框架;
优势:
-
支持分布式部署;不同节点上执行的是不一样的任务(代码是同一套);对于一个大任务,可以用分片策略,让他在多节点上执行;
-
能够保证高可用;
-
利用zk实现分布式环境管理;
-
水平扩展(核心)
例如: 定义了10个分片(对应的片名是0-9),假设我们的定时任务是每1分钟执行一次,定时方法是execute。 当我们只有一台服务器的时候,那么每1分钟会调用十次execute(每次调用的时候分片名(0-9)都不一样)。 当我们有两台服务器的时候,那么每1分钟A、B服务器各自调用五次execute(每次调用的时候分片名(A 0-4,B 5-9) 当有三台服务器的时候A(3个),B(3个),C(4个),这样水平扩展就很容易了。快速入门(Demo)
1.建库建表想实际操作,可以copy下面代码练手
-- 建库 CREATE DATAbase `elastic_job_demo` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; -- 建表 DROp TABLE IF EXISTS `t_file`; CREATE TABLE `t_file` ( `id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `type` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `backedUp` tinyint(1) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;2.pom依赖
3.核心配置+任务调度4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.5.RELEASE com.example.elasticjob elasticjobdemo 0.0.1-SNAPSHOT elasticjobdemo Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-jdbc org.springframework.boot spring-boot-starter-web org.apache.shardingsphere.elasticjob elasticjob-lite-core 3.0.0-RC1 org.apache.shardingsphere.elasticjob elasticjob-lite-spring-boot-starter 3.0.0-RC1 org.apache.shardingsphere.elasticjob elasticjob-error-handler-email 3.0.0-RC1 mysql mysql-connector-java 8.0.15 org.springframework.boot spring-boot-maven-plugin 2.1.5.RELEASE org.projectlombok lombok
步骤:
1.配置zookeeper调动中心
2.配置ElasticJob核心配置
3.调度定时任务
- 1.zookeeper配置类(创建zk客户端,且调用init方法)
package com.example.elasticjob.quickStart.config;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZookepeerConfig {
//zookeeper链接字符串 localhost:2181
private String ZOOKEEPER_CONNECTION_STRING = "localhost:2181" ;
//定时任务命名空间
private String JOB_NAMESPACE = "elastic-job-boot-java";
//zk的配置及创建注册中心
@Bean(initMethod = "init")
public CoordinatorRegistryCenter setUpRegistryCenter(){
//zk的配置
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, "elastic-job-boot-java");
//设置zk超时时间
zookeeperConfiguration.setSessionTimeoutMilliseconds(1000);
//创建注册中心
CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
return zookeeperRegistryCenter;
}
}
- 2.配置esjob+任务调度
package com.example.elasticjob.quickStart.config;
import com.example.elasticjob.quickStart.job.FileBackupJobDb;
import com.example.elasticjob.quickStart.service.FileService;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticJobConfig {
@Autowired
CoordinatorRegistryCenter registryCenter;
@Autowired
FileService fileService;
@Bean
public JobConfiguration createJobConfiguration() {
// 定义作业核心配置
JobConfiguration jobConfig = JobConfiguration.newBuilder("myJob", 3)
.cron("0/5 * * * * ?")
.shardingItemParameters("0=text,1=image,2=radio")
.failover(true)
.overwrite(true)
.monitorExecution(true)
.misfire(true).build();
//启动分布式定时任务
new ScheduleJobBootstrap(registryCenter, new FileBackupJobDb(fileService), jobConfig).schedule();
return jobConfig;
}
}
4.其他
- FileService类
package com.example.elasticjob.quickStart.service;
import com.example.elasticjob.quickStart.pojo.FileCustom;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.List;
@Component
public class FileService {
@Autowired
JdbcTemplate jdbcTemplate;
public List fetchUnBackupFiles(String fileType, Integer count){
String sql="select * from t_file where type = ? and backedUp = 0 limit 0,?";
List files = jdbcTemplate.query(sql, new Object[]{fileType, count}, new BeanPropertyRowMapper(FileCustom.class));
return files;
}
public void backupFiles(List files){
for(FileCustom file:files){
String sql="update t_file set backedUp = 1 where id = ?";
jdbcTemplate.update(sql,new Object[]{file.getId()});
System.out.println(String.format("线程 %d | 已备份文件:%s 文件类型:%s"
,Thread.currentThread().getId()
,file.getName()
,file.getType()));
}
}
}
- FileCustom实体
package com.example.elasticjob.quickStart.pojo;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class FileCustom {
private String id;
private String name;
private String type;
private String content;
private Boolean backedUp = false;
public FileCustom(String id, String name, String type, String content){
this.id = id;
this.name = name;
this.type = type;
this.content = content;
}
}
- 数据源
package com.example.elasticjob.quickStart.config;
import org.apache.commons.dbcp.BasicDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
@Value("${spring.datasource.driver}")
private String driverClassName;
@Value("${spring.datasource.url}")
private String url;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
@Bean(name = "myDataSource")
public DataSource getMyDataSource() {
BasicDataSource result = new BasicDataSource();
result.setDriverClassName(driverClassName);
result.setUrl(url);
result.setUsername(username);
result.setPassword(password);
return result;
}
}
- yml
server:
port: ${port:8081}
esjob:
zkServerlists: localhost:2181
zkNamespace: es-job-cupid
startedTimeoutMilliseconds: 500
completedTimeoutMilliseconds: 500
spring:
datasource:
url: jdbc:mysql://localhost:3306/elastic_job_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
username: root
password: 1234
driver: com.mysql.jdbc.Driver
基于yml的方式配置ElasticJob
如果修改yml文件不生效,可有如下两个办法
无需进行 JobConfiguration配置 + zookeeper配置
- 改一下zookeeper的名命空间
- 配置文件job下面添加 overwrite: true
org.springframework.boot spring-boot-starter 2.2.0.RELEASE org.springframework.boot spring-boot-starter-web 2.2.0.RELEASE org.apache.shardingsphere.elasticjob elasticjob-lite-spring-boot-starter 3.0.0-RC1
这个starter里面有数据库相关的连接,我们只是简单测试,不想配置数据源的话,改一下启动类注解即可
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
2.编写作业
- 普通作业
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;
@Component
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println(context.getShardingTotalCount() + " " + context.getShardingItem());
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
- 数据流作业
import com.xdx97.elasticjob.bean.XdxBean; import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; @Component public class MyDataflowJob implements DataflowJob{ @Override public List fetchData(ShardingContext shardingContext) { List foos = new ArrayList<>(); double random = Math.random(); System.out.println("fetchData------ " + random); if (random > 0.5){ XdxBean foo = new XdxBean(); foo.setName("aaa"); foos.add(foo); } return foos; } @Override public void processData(ShardingContext shardingContext, List list) { for (XdxBean xdxBean : list) { System.out.println("processData方法开始处理! " + xdxBean.getNum() + " " + "当前分片:" + shardingContext.getShardingParameter() + " " + "当前分片项:" + shardingContext.getShardingItem()); } } }
@Data
@NoArgsConstructor
public class XdxBean {
private String name;
}
注:Math.random() 产生的数据在0-1之间。
从上面运行的结果,我们可以得出结论,所谓的数据流作业其实也是一个定时任务,只不过当这个定时任务产生数据的时候,就会携带数据去调用processData()方法
3.yml配置文件server:
port: 8085
elasticjob:
regCenter:
#zookeeper 的ip:port
serverLists: 127.0.0.1:2181
#名命空间,自己定义就好了
namespace: my-job4
jobs:
#你的这个定时任务名称,自定义名称
myElasticJob:
#定时任务的全路径名
elasticJobClass: com.elastic.job.MyElasticJob
#定时任务执行的cron表达式
cron: 0/5 * * * * ?
#分片数量
shardingTotalCount: 10
作业配置yml参数
配置前缀:elasticjob.jobs
可配置属性:
| 属性名 | 是否必填 |
|---|---|
| elasticJobClass / elasticJobType | 是 |
| cron | 否 |
| timeZone | 否 |
| jobBootstrapBeanName | 否 |
| sharding-total-count | 是 |
| sharding-item-parameters | 否 |
| job-parameter | 否 |
| monitor-execution | 否 |
| failover | 否 |
| misfire | 否 |
| max-time-diff-seconds | 否 |
| reconcile-interval-minutes | 否 |
| job-sharding-strategy-type | 否 |
| job-executor-service-handler-type | 否 |
| job-error-handler-type | 否 |
| job-listener-types | 否 |
| description | 否 |
| props | 否 |
| disabled | 否 |
| overwrite | 否 |
配置前缀:elasticjob.reg-center
可配置属性:
| 属性名 | 是否必填 |
|---|---|
| server-lists | 是 |
| namespace | 是 |
| base-sleep-time-milliseconds | 否 |
| max-sleep-time-milliseconds | 否 |
| max-retries | 否 |
| session-timeout-milliseconds | 否 |
| connection-timeout-milliseconds | 否 |
| digest |
- 下载ElasticJob-Lite-UI
链接:https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zpafnxpE-1640855259419)(https://gitee.com/li_hewei/img/raw/master/images/1640855010(1)].jpg)
-
解压后在bin目录启动 start.bat
-
启动后游览器访问(默认端口是8088):http://127.0.0.1:8088/#/login 用户名/密码 root/root
-
登录成功后,链接上注册中心,链接成功后便可以进行任务的管理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-r60QcxL8-1640855259420)(https://gitee.com/li_hewei/img/raw/master/images/1640855122(1)].jpg)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-URQAijSD-1640855259420)(https://gitee.com/li_hewei/img/raw/master/images/1640855142(1)].jpg)
链接成功后即可在UI中修改定时任务相关配置。



