/**
 * Example simple job that branches on the sharding item reported by the
 * {@code ShardingContext} it is executed with.
 */
public class MyElasticJob implements SimpleJob {

    /**
     * Performs the work that belongs to the sharding item carried by {@code context}.
     * Sharding items without a branch below are ignored.
     *
     * @param context sharding context of the current execution
     */
    @Override
    public void execute(ShardingContext context) {
        final int shardingItem = context.getShardingItem();
        if (shardingItem == 0) {
            // handle the slice of work owned by sharding item 0
        } else if (shardingItem == 1) {
            // handle the slice of work owned by sharding item 1
        } else if (shardingItem == 2) {
            // handle the slice of work owned by sharding item 2
        }
        // add one more branch per additional sharding item
    }
}
public class MyElasticJob implements DataflowJob {
@Override
public List fetchData(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
List data = // get data from database by sharding item 0
return data;
case 1:
List data = // get data from database by sharding item 1
return data;
case 2:
List data = // get data from database by sharding item 2
return data;
// case n: ...
}
}
@Override
public void processData(ShardingContext shardingContext, List data) {
// process data
// ...
}
}
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
// 先循环8/3
private Map> shardingAliquot(final List shardingUnits, final int shardingTotalCount) {
Map> result = new linkedHashMap<>(shardingTotalCount, 1);
int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
int count = 0;
for (JobInstance each : shardingUnits) {
List shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
// 先循环8%3
private void addAliquant(final List shardingUnits, final int shardingTotalCount, final Map> shardingResults) {
int aliquant = shardingTotalCount % shardingUnits.size();
int count = 0;
for (Map.Entry> entry : shardingResults.entrySet()) {
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
}
count++;
}
}
}
(4) 弹性扩容+失效转移 (demo演示)
(5) 作业监听器
可通过配置多个任务监听器,在任务执行前和执行后执行监听的方法。
// Example ElasticJobListener with one hook before and one hook after job execution.
@Component
public class MyElasticJobListener implements ElasticJobListener {
// Hook called before the job is executed; put pre-execution logic here.
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
// do something ...
}
// Hook called after the job has been executed; put post-execution logic here.
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
// do something ...
}
}
(6) 定制化处理
若作业运行时发生异常,可通过自定义异常处理器(JobExceptionHandler)对异常进行干预处理。
@Slf4j
public final class DefaultJobExceptionHandler implements JobExceptionHandler {
@Override
public void handleException(final String jobName, final Throwable cause) {
log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
}
}
// Root configuration of a Lite job: the job-type configuration plus sharding-strategy, monitoring, reconcile and enable/overwrite settings.
public final class LiteJobConfiguration implements JobRootConfiguration {
private final JobTypeConfiguration typeConfig; // job type configuration
private final String jobShardingStrategyClass; // fully-qualified class name of the job sharding strategy implementation
private final boolean monitorExecution; // whether the job's runtime state is monitored; the original note states the default is false (TODO confirm)
private final int monitorPort; // job monitoring port
private final int maxTimeDiffSeconds; // maximum tolerated time difference, in seconds, between this host and the registry center; per the original note the default is -1 (no check) and the setting is optional
private final int reconcileIntervalMinutes; // scheduling interval, in minutes, of the service that repairs inconsistent job-server state; any value below 1 disables the repair; per the original note the default is 10
private final boolean disabled; // whether the job is disabled from executing
private final boolean overwrite; // whether the local configuration overwrites the job configuration in the registry center
}
JobCoreConfiguration
// Core job configuration: name, cron schedule, sharding settings, custom parameter, failover/misfire switches, description and extension properties.
public final class JobCoreConfiguration {
private final String jobName; // job name
private final String cron; // cron expression used to schedule the job
private final int shardingTotalCount; // total number of sharding items of the job
private final String shardingItemParameters; // sharding item numbers and their parameters, e.g. 0=a,1=b,2=c
private final String jobParameter; // user-defined job parameter
private final boolean failover; // whether failover of job execution is enabled
private final boolean misfire; // whether re-execution of misfired jobs is enabled
private final String description; // job description
private final JobProperties jobProperties; // job properties configuration
}
JobProperties
public final class JobProperties {
private EnumMap map = new EnumMap<>(JobPropertiesEnum.class);
public enum JobPropertiesEnum {
JOB_EXCEPTION_HANDLER("job_exception_handler", JobExceptionHandler.class, DefaultJobExceptionHandler.class.getCanonicalName()),
EXECUTOR_SERVICE_HANDLER("executor_service_handler", ExecutorServiceHandler.class, DefaultExecutorServiceHandler.class.getCanonicalName());
private final String key;
private final Class> classType;
private final String defaultValue;
}
}
(2) 作业初始化
关系图
JobRegistry (作业注册表) 单例
public final class JobRegistry {
private static volatile JobRegistry instance;
private Map schedulerMap = new ConcurrentHashMap<>();
private Map regCenterMap = new ConcurrentHashMap<>();
private Map jobInstanceMap = new ConcurrentHashMap<>();
private Map jobRunningMap = new ConcurrentHashMap<>();
private Map currentShardingTotalCountMap = new ConcurrentHashMap<>();
public static JobRegistry getInstance() {
if (null == instance) {
synchronized (JobRegistry.class) {
if (null == instance) {
instance = new JobRegistry();
}
}
}
return instance;
}
// .... 省略方法
}
JobScheduler (作业调度器)
/**
 * Job scheduler: binds a Lite job configuration to a registry center and creates the scheduler
 * and job facades that work with it.
 */
public class JobScheduler {

    private final LiteJobConfiguration liteJobConfig;

    private final CoordinatorRegistryCenter regCenter;

    @Getter
    private final SchedulerFacade schedulerFacade;

    private final JobFacade jobFacade;

    /**
     * Creates a scheduler that uses a default {@code JobEventBus}.
     *
     * @param regCenter registry center
     * @param liteJobConfig Lite job configuration
     * @param elasticJobListeners job listeners, may be empty
     */
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);
    }

    /**
     * Creates a scheduler whose {@code JobEventBus} is built from {@code jobEventConfig}.
     *
     * @param regCenter registry center
     * @param liteJobConfig Lite job configuration
     * @param jobEventConfig job event configuration handed to the event bus
     * @param elasticJobListeners job listeners, may be empty
     */
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
                        final ElasticJobListener... elasticJobListeners) {
        this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
    }

    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // Register the running instance of this job.
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        // Keep the Lite job configuration and the registry center.
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        // Prepare the job listeners. The list is typed and created once, then shared by both
        // facades; the helper called here is defined outside this excerpt.
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        // Create the scheduler facade.
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // Create the job facade.
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList, jobEventBus);
    }
}
/**
 * Initializes the job: synchronizes the configuration through the scheduler facade, records the
 * sharding total count, creates and registers the schedule controller, registers the start-up
 * information and finally schedules the job with its cron expression.
 */
public void init() {
    // Work with the configuration returned by the scheduler facade from here on.
    final LiteJobConfiguration regCenterConfig = schedulerFacade.updateJobConfiguration(liteJobConfig);
    final String jobName = regCenterConfig.getJobName();

    // Record the current total number of sharding items.
    final int shardingTotalCount = regCenterConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, shardingTotalCount);

    // Create the schedule controller and add it to the registry.
    final JobScheduleController controller = new JobScheduleController(
            createScheduler(), createJobDetail(regCenterConfig.getTypeConfig().getJobClass()), jobName);
    JobRegistry.getInstance().registerJob(jobName, controller, regCenter);

    // Register the start-up information; the flag passed is "enabled", i.e. not disabled.
    final boolean enabled = !regCenterConfig.isDisabled();
    schedulerFacade.registerStartUpInfo(enabled);

    // Schedule the job.
    final String cron = regCenterConfig.getTypeConfig().getCoreConfig().getCron();
    controller.scheduleJob(cron);
}
(3) 选举主节点
为什么要选举主节点
Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。无中心化意味着集群中不存在一个固定的中心节点来执行协调操作(例如分配作业分片项),因此需要从作业节点中选举出一个主节点,由主节点负责这类操作。
另外,主节点的选举是以作业为维度。例如:有一个 Elastic-Job-Lite 集群有三个作业节点 A、B、C,存在两个作业 a、b,可能 a 作业的主节点是 C,b 作业的主节点是 A。
关系图
选举
// LeaderService.java
/**
 * Runs a leader election: executes a {@code LeaderElectionExecutionCallback} through
 * {@code jobNodeStorage.executeInLeader} on the leader latch node, logging before and after.
 */
public void electLeader() {
    log.debug("Elect a new leader now.");
    final LeaderElectionExecutionCallback electionCallback = new LeaderElectionExecutionCallback();
    jobNodeStorage.executeInLeader(LeaderNode.LATCH, electionCallback);
    log.debug("Leader election completed.");
}
// JobNodeStorage.java
/**
 * Executes {@code callback} after acquiring the leader latch at {@code latchNode}.
 *
 * <p>The latch is started, awaited, and closed automatically when the block exits. Any
 * exception, including one raised while creating the latch, is passed to
 * {@code handleException}.
 *
 * @param latchNode job node used for the latch; resolved to a full path first
 * @param callback logic to run once the latch has been acquired
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch leaderLatch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        leaderLatch.start();
        // Wait here until await() returns, then run the callback.
        leaderLatch.await();
        callback.execute();
    } catch (final Exception cause) {
        handleException(cause);
    }
}
// LeaderElectionExecutionCallback.java
/**
 * Callback run during leader election: writes this job instance as the leader unless a leader
 * already exists.
 */
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {

    @Override
    public void execute() {
        // A leader is already present: leave it untouched.
        if (hasLeader()) {
            return;
        }
        // No leader yet: store this instance's id in the ephemeral leader-instance node.
        jobNodeStorage.fillEphemeralJobNode(
                LeaderNode.INSTANCE,
                JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    }
}
注:
使用 Curator LeaderLatch 分布式锁,保证同一时间有且仅有一个工作节点能够调用 LeaderElectionExecutionCallback#execute() 方法执行主节点设置
在 LeaderElectionExecutionCallback#execute() 中为什么要调用 #hasLeader() 呢?LeaderLatch 只保证同一时间有且仅有一个工作节点持有分布式锁并执行回调;当获得锁的工作节点执行完逻辑并释放锁后,下一个工作节点会获得锁并开始执行同样的逻辑。如果不先判断当前是否已有主节点,先前选出的主节点就会被覆盖。
选举主节点时机 节点数据发生变化时
(4) 作业执行
关系图
LiteJob
// Builds a JobDetail whose job class is LiteJob and whose identity is the configured job name.
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
=====================================================================
/**
 * {@code Job} implementation that delegates each execution to the executor obtained from
 * {@code JobExecutorFactory} for the injected elastic job.
 */
public final class LiteJob implements Job {

    // The elastic job to run; supplied through the generated setter.
    @Setter
    private ElasticJob elasticJob;

    // Facade handed to the executor; supplied through the generated setter.
    @Setter
    private JobFacade jobFacade;

    /**
     * Looks up the executor for the current {@code elasticJob} and {@code jobFacade} and runs it.
     *
     * @param context execution context passed by the scheduler; not used here
     * @throws JobExecutionException declared by the overridden method
     */
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory
                .getJobExecutor(elasticJob, jobFacade)
                .execute();
    }
}
// SimpleJobExecutor.java
// Executor bound to a single SimpleJob instance; the job facade is passed on to AbstractElasticJobExecutor.
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
// The simple job this executor was created for.
private final SimpleJob simpleJob;
// Hands jobFacade to the superclass constructor and stores the job.
public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
super(jobFacade);
this.simpleJob = simpleJob;
}
}
// DataflowJobExecutor.java
public final class DataflowJobExecutor extends AbstractElasticJobExecutor {
private final DataflowJob
/**
 * Marks sharding item {@code item} for failover by creating its failover items node, unless
 * failover has already been assigned for that item.
 *
 * @param item sharding item number
 */
public void setCrashedFailoverFlag(final int item) {
    if (isFailoverAssigned(item)) {
        // Already assigned: nothing to mark.
        return;
    }
    jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
}
/**
 * Runs a {@code FailoverLeaderExecutionCallback} under the failover latch when
 * {@code needFailover()} reports that failover is required; otherwise does nothing.
 */
public void failoverIfNecessary() {
    if (!needFailover()) {
        return;
    }
    jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}