In previous posts, I covered the Simple, Dataflow, Script, and HTTP job types:
- ElasticJob-Lite: Simple & Dataflow jobs
- ElasticJob-Lite: Script & HTTP jobs
ElasticJob classifies jobs into two categories: class-based and type-based. Class-based jobs require the developer to weave in business logic by implementing an interface; type-based jobs need no coding, only the corresponding configuration. The shardingContext parameter of a class-based job's interface method carries the job configuration, sharding, and runtime information; methods such as getShardingTotalCount() and getShardingItem() return the total shard count and the sharding item number running on the current job server, respectively.
ElasticJob currently ships two class-based job types, Simple and Dataflow, and two type-based job types, Script and HTTP. Users can extend job types themselves by implementing the SPI interface.
This post shows how to extend job types by implementing the SPI interface, and how to run one-off scheduling.
Extending the job type
Extending a class-based job type by implementing the SPI interface takes three steps; a type-based job type only needs the last two:
- Define the execution interface for the job logic (type-based job types skip this step: a Script job's logic is executed by the script program, and an HTTP job's logic is executed by the requested server, so type-based job types need no job-logic execution interface).
- Implement a JobItemExecutor for the job-logic execution interface.
- Declare the implemented JobItemExecutor via Java SPI.
The KavenJob interface (extends the ElasticJob interface; this is the execution interface for the job logic):
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface KavenJob extends ElasticJob {
    void work(ShardingContext shardingContext, String jobExecutorName);
}
MyKavenJob, an implementation of the KavenJob interface (this implements the job logic):
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import java.text.SimpleDateFormat;
import java.util.Date;

public class MyKavenJob implements KavenJob {

    private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void work(ShardingContext shardingContext, String jobExecutorName) {
        String job = shardingContext.getShardingParameter();
        if (job == null || job.trim().equals("")) {
            System.out.println("Please specify the name of the task to run for [Kaven]!");
            throw new RuntimeException("missing sharding parameter");
        }
        System.out.printf("%s [%s]: running task [%s] for [Kaven]!%n", jobExecutorName, formatter.format(new Date()), job);
    }
}
The built-in Simple and Dataflow jobs define their job-logic execution interfaces the same way:
package org.apache.shardingsphere.elasticjob.simple.job;

import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface SimpleJob extends ElasticJob {
    void execute(ShardingContext shardingContext);
}
package org.apache.shardingsphere.elasticjob.dataflow.job;

import java.util.List;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;

public interface DataflowJob<T> extends ElasticJob {
    List<T> fetchData(ShardingContext shardingContext);
    void processData(ShardingContext shardingContext, List<T> data);
}
So to define a Simple or Dataflow job, you only need to implement the SimpleJob or DataflowJob interface, respectively.
JobItemExecutor
ElasticJob classifies jobs into class-based and type-based, so a custom JobItemExecutor must extend or implement either the ClassedJobItemExecutor or the TypedJobItemExecutor interface.
The KavenJobItemExecutor interface (extends ClassedJobItemExecutor; extending TypedJobItemExecutor to build a custom JobItemExecutor for a type-based job works similarly):
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;

public interface KavenJobItemExecutor extends ClassedJobItemExecutor<KavenJob> {
    String getJobExecutorName();
}
MyKavenJobExecutor, an implementation of the KavenJobItemExecutor interface:
package com.kaven.job.my;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;

public class MyKavenJobExecutor implements KavenJobItemExecutor {

    @Override
    public Class<KavenJob> getElasticJobClass() {
        return KavenJob.class;
    }

    @Override
    public void process(KavenJob kavenJob, JobConfiguration jobConfiguration, JobFacade jobFacade, ShardingContext shardingContext) {
        kavenJob.work(shardingContext, getJobExecutorName());
    }

    @Override
    public String getJobExecutorName() {
        return this.getClass().getName();
    }
}
Clearly, the work method of MyKavenJob is invoked inside the process method of MyKavenJobExecutor, and ElasticJob drives this. The same holds for Simple, Dataflow, Script, and HTTP jobs: the job's sharding items are executed in the JobItemExecutor (taking the Simple and Script jobs as examples):
package org.apache.shardingsphere.elasticjob.simple.executor;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {

    public void process(SimpleJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {
        elasticJob.execute(shardingContext);
    }

    public Class<SimpleJob> getElasticJobClass() {
        return SimpleJob.class;
    }
}
package org.apache.shardingsphere.elasticjob.script.executor;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.executor.JobFacade;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
import org.apache.shardingsphere.elasticjob.infra.json.GsonFactory;
public final class ScriptJobExecutor implements TypedJobItemExecutor {

    public void process(ElasticJob elasticJob, JobConfiguration jobConfig, JobFacade jobFacade, ShardingContext shardingContext) {
        CommandLine commandLine = CommandLine.parse(getScriptCommandLine(jobConfig.getProps()));
        commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);
        try {
            new DefaultExecutor().execute(commandLine);
        } catch (final IOException ex) {
            throw new JobSystemException("Execute script failure.", ex);
        }
    }

    private String getScriptCommandLine(final Properties props) {
        String result = props.getProperty("script.command.line");
        if (Strings.isNullOrEmpty(result)) {
            throw new JobConfigurationException("Cannot find script command line, job is not executed.");
        }
        return result;
    }

    public String getType() {
        return "SCRIPT";
    }
}
Declaring the JobItemExecutor
The implemented JobItemExecutor is declared via Java SPI. Why does it need to be declared? Because ElasticJob has to know which JobItemExecutor corresponds to a job, so it can use it to execute that job's sharding items. ElasticJob performs cron-based scheduling through a ScheduleJobBootstrap instance.
The Application class (startup class):
package com.kaven.job;
import com.kaven.job.my.MyKavenJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
public class Application {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MyKavenJob(),
createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9999", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
String jobs = "0=read papers,1=run experiments,2=join competitions,3=group meeting,4=read books,5=misc";
return JobConfiguration.newBuilder("KavenJob", 6)
.cron("30 * * * * ?")
.shardingItemParameters(jobs)
.overwrite(true)
.failover(true)
.build();
}
}
Creating a ScheduleJobBootstrap instance also creates a JobScheduler instance.
public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {
this.jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig);
}
public ScheduleJobBootstrap(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {
this.jobScheduler = new JobScheduler(regCenter, elasticJobType, jobConfig);
}
And creating the JobScheduler instance in turn creates an ElasticJobExecutor instance.
public JobScheduler(CoordinatorRegistryCenter regCenter, ElasticJob elasticJob, JobConfiguration jobConfig) {
...
this.jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, this.jobFacade);
...
}
public JobScheduler(CoordinatorRegistryCenter regCenter, String elasticJobType, JobConfiguration jobConfig) {
...
this.jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig, this.jobFacade);
...
}
When the ElasticJobExecutor instance is created, the JobItemExecutor corresponding to the job is obtained through the JobItemExecutorFactory class.
public ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade) {
this(elasticJob, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(elasticJob.getClass()));
}
public ElasticJobExecutor(final String type, final JobConfiguration jobConfig, final JobFacade jobFacade) {
this(null, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(type));
}
The JobItemExecutorFactory class:
package org.apache.shardingsphere.elasticjob.executor.item;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
@SuppressWarnings("rawtypes")
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobItemExecutorFactory {
private static final Map<Class<? extends ElasticJob>, ClassedJobItemExecutor> CLASSED_EXECUTORS = new HashMap<>();
static {
ElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);
ServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));
}
@SuppressWarnings("unchecked")
public static JobItemExecutor getExecutor(final Class<? extends ElasticJob> elasticJobClass) {
for (Entry<Class<? extends ElasticJob>, ClassedJobItemExecutor> entry : CLASSED_EXECUTORS.entrySet()) {
if (entry.getKey().isAssignableFrom(elasticJobClass)) {
return entry.getValue();
}
}
throw new JobConfigurationException("Can not find executor for elastic job class `%s`", elasticJobClass.getName());
}
public static JobItemExecutor getExecutor(final String elasticJobType) {
return ElasticJobServiceLoader.getCachedTypedServiceInstance(TypedJobItemExecutor.class, elasticJobType)
.orElseThrow(() -> new JobConfigurationException("Cannot find executor for elastic job type `%s`", elasticJobType));
}
}
The ServiceLoader class is Java's built-in SPI. SPI (Service Provider Interface) is a service-provider discovery mechanism built into the JDK, used to enable framework extension and component replacement; it is mainly used by framework developers. Different vendors can supply different implementations of the same interface; for example, both MySQL and PostgreSQL ship implementations of the java.sql.Driver interface, and Java's SPI mechanism locates the service implementations for an interface. The main idea of SPI is to move assembly control out of the program, which matters especially in modular design; its core idea is decoupling.
The only requirement for the ServiceLoader class to work is that service-provider classes have a no-argument constructor, so they can be instantiated during loading. Service providers are identified by placing a provider-configuration file in the resource directory META-INF/services. The file's name is the fully qualified name of the service type, and the file contains a list of fully qualified names of concrete provider classes, one per line. Whitespace and tab characters around each name, as well as blank lines, are ignored, and the file must be encoded in UTF-8.
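As a concrete sketch (assuming a standard Maven resource layout; the file and class names match the ones used in this post), the declaration for our class-based executor would be a one-line file:

```
# file: src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.executor.item.impl.ClassedJobItemExecutor
com.kaven.job.my.MyKavenJobExecutor
```

Note that lines starting with `#` are treated as comments by ServiceLoader, so only the class name line is significant.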
Once these three steps are complete, the JobItemExecutor we implemented can be discovered by ElasticJob and used to execute the sharding items of the corresponding job.
static {
    // load the JobItemExecutors of type-based jobs
    ElasticJobServiceLoader.registerTypedService(TypedJobItemExecutor.class);
    // load the JobItemExecutors of class-based jobs
    ServiceLoader.load(ClassedJobItemExecutor.class).forEach(each -> CLASSED_EXECUTORS.put(each.getElasticJobClass(), each));
}
The static block of the JobItemExecutorFactory class loads the JobItemExecutors of both class-based and type-based jobs. When loading a class-based job's JobItemExecutor, each.getElasticJobClass() is the key and each is the value. Since the getElasticJobClass method of the MyKavenJobExecutor class returns KavenJob.class, the job and its JobItemExecutor are linked.
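To make this mapping concrete, here is a minimal, self-contained sketch of the lookup rule (the interfaces and the executor name below are stand-ins, not the real ElasticJob classes): the registry is keyed by the job interface, and isAssignableFrom matches any concrete class that implements it.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins for ElasticJob's interfaces, for illustration only.
interface ElasticJob {}
interface KavenJob extends ElasticJob {}
class MyKavenJob implements KavenJob {}

class ExecutorLookupSketch {
    // Mirrors CLASSED_EXECUTORS: job interface -> executor (just a name here, for brevity).
    static final Map<Class<? extends ElasticJob>, String> CLASSED_EXECUTORS = new HashMap<>();

    static {
        // What the static block's ServiceLoader loop effectively does for our executor:
        CLASSED_EXECUTORS.put(KavenJob.class, "com.kaven.job.my.MyKavenJobExecutor");
    }

    // Same rule as JobItemExecutorFactory.getExecutor(Class): a registered key
    // matches if it is assignable from the concrete job class.
    static String getExecutor(Class<? extends ElasticJob> elasticJobClass) {
        for (Map.Entry<Class<? extends ElasticJob>, String> entry : CLASSED_EXECUTORS.entrySet()) {
            if (entry.getKey().isAssignableFrom(elasticJobClass)) {
                return entry.getValue();
            }
        }
        throw new IllegalArgumentException("Can not find executor for elastic job class " + elasticJobClass.getName());
    }

    public static void main(String[] args) {
        // MyKavenJob implements KavenJob, so the executor registered under KavenJob is resolved.
        System.out.println(getExecutor(MyKavenJob.class));
    }
}
```

This is why passing the concrete MyKavenJob instance at startup still finds the executor registered under the KavenJob interface.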
@Override
public Class<KavenJob> getElasticJobClass() {
    return KavenJob.class;
}
Loading the JobItemExecutors of type-based jobs happens in the ElasticJobServiceLoader class (also via the ServiceLoader class), with instance.getType() as the key and instance as the value.
public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
    if (TYPED_SERVICES.containsKey(typedService)) {
        return;
    }
    ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
}

private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
    TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
    TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
}
This is also why the elasticJobType argument of the ScheduleJobBootstrap constructor must be all uppercase (for example "SCRIPT" and "HTTP"):
new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT",
        createJobConfiguration()).schedule();
because the JobItemExecutors of these two job types are stored with the return value of getType() as the key.
public String getType() {
    return "SCRIPT";
}
public String getType() {
    return "HTTP";
}
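A minimal sketch of that string-keyed registry (the map and names below are illustrative, not ElasticJob's real code) shows why the case matters: the key is the exact getType() string, so the lookup is case-sensitive.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the typed-service registry in ElasticJobServiceLoader.
class TypedExecutorRegistry {
    private static final Map<String, String> TYPED_EXECUTORS = new ConcurrentHashMap<>();

    static {
        // Registered with the getType() return values as keys.
        TYPED_EXECUTORS.put("SCRIPT", "ScriptJobExecutor");
        TYPED_EXECUTORS.put("HTTP", "HttpJobExecutor");
    }

    // Lookup is an exact, case-sensitive map get: "SCRIPT" resolves, "script" does not.
    static Optional<String> getExecutor(String elasticJobType) {
        return Optional.ofNullable(TYPED_EXECUTORS.get(elasticJobType));
    }
}
```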
With that, we have extended the job type by implementing the SPI interface; the output is shown in the figure below.
Extending a type-based job works similarly; the differences have already been analyzed above, so I won't repeat them here.
So far all scheduling has been cron-based (through the ScheduleJobBootstrap class); now let's look at one-off scheduling. It is very simple to implement: just swap the ScheduleJobBootstrap class for the OneOffJobBootstrap class, and do not set a cron expression in the job configuration.
package com.kaven.job;
import com.kaven.job.my.MyKavenJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
public class Application {
public static void main(String[] args) {
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new MyKavenJob(),
createJobConfiguration());
oneOffJobBootstrap.execute();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9999", "my-job");
zc.setConnectionTimeoutMilliseconds(40000);
zc.setMaxRetries(5);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
String jobs = "0=read papers,1=run experiments,2=join competitions,3=group meeting,4=read books,5=misc";
return JobConfiguration.newBuilder("MyOneOffJob", 6)
.shardingItemParameters(jobs)
.overwrite(true)
.build();
}
}
The output is shown in the figure below:
To schedule the job again, call oneOffJobBootstrap.execute() again; each call triggers one execution. That wraps up ElasticJob's custom job types and one-off scheduling. If I've gotten anything wrong, or you see things differently, feel free to comment.