栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

elastic-job-lite简单入门

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

elastic-job-lite简单入门

简介
Elastic-Job分为两个独立的子项目:
Elastic-Job-Lite和Elastic-Job-Cloud
Elastic-Job-Lite
简单的说Elastic-Job-Lite就是一个分布式定时任务。
Quick Start

在SpringBoot中实现一个DEMO.

因为项目中使用了很多ZK的特性,首先需要安装一个ZK.
创建一个SpringBoot项目.
快速使用的话首先要引入相关的依赖包:

 
 
     com.dangdang
     elastic-job-lite-core
     2.1.5
 

 
     com.dangdang
     elastic-job-lite-spring
     2.1.5
 

在使用的时候 首先要配置ZK注册中心,然后配置我们需要调度的任务信息


@Configuration
@ConditionalOnexpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {


    @Value("${registry.serverList}")
    private String serverList;

    @Value("${registry.namespace}")
    private String namespace;


    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter zookeeperRegistryCenter() {


 return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}

实现一个有业务逻辑的Job,实现SimpleJob接口

public class SpringBootSimpleJob implements SimpleJob{

    public void execute(ShardingContext shardingContext) {

 // do something

    }
}

@Configuration
public class SimpleJobConfig {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    
    @Bean
    public SimpleJob simpleJob() {
 return new SpringBootSimpleJob();
    }


    
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron,
 @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
 @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {

 return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter,
  getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
    }

    
    private LiteJobConfiguration getLiteJobConfiguration(final Class jobClass, final String cron,
 final int shardingTotalCount, final String shardingItemParameters) {

 return LiteJobConfiguration.newBuilder(
  new SimpleJobConfiguration(
   JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
    .shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();

    }

}

上面就完成了一个简单Job的配置,可以进行测试使用。

源码学习
通过上面创建的demo,整体熟悉一下elastic-job-lite的代码流。

入口:SpringJobScheduler
参数:(ElasticJob, CoordinatorRegistryCenter, LiteJobConfiguration)
可以看到分别传入了:
自己实现的SpringBootSimpleJob
zookeeperRegistryCenter
getLiteJobConfiguration()


getLiteJobConfiguration()方法对我们传入的job信息进行了配置,最后返回了LiteJobConfiguration。
上面的DEMO只传入了少量的参数cron,shardingTotalCount,shardingItemParameters
还有更多的参数可以配置,可以参考JobCoreConfiguration这个类里面的字段信息。


SpringJobScheduler extends JobScheduler:
直接看 JobScheduler 里面

     
    public void init() {

 //去ZK 保存或者更新 job的信息  (ZookeeperRegistryCenter)
 LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);

 //单列模式实现的一个缓存 (当前分片总数)
 JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());

 //初始化 JobScheduleController
 JobScheduleController jobScheduleController = new JobScheduleController(
  createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());

 JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);

 //注册作业的启动信息
 schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());

 //启动Quartz调度
 jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }


JobScheduleController初始化:
创建Quartz的调度器
指定Quartz调度时候执行任务的具体实现类LiteJob


registerStartUpInfo注册作业的启动信息:
利用ZK选举当前作业的主节点
持久化作业服务器上线信息
持久化作业运行实例上线相关信息


scheduleJob()加入Quartz调度并且进行调度


上面已经知道了具体在调度中执行的类是LiteJob,继续看LiteJob中的execute()

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
 JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }

getJobExecutor:
获取作业执行器,我们使用的是SimpleJobExecutor

execute()方法会走到AbstractElasticJobExecutor中,进行作业的执行。

暂时只关注三个流程
1-获取当前作业服务器的分片上下文.

ShardingContexts shardingContexts = jobFacade.getShardingContexts();

getShardingContexts()方法中做的事情:

判断是否需要分片,如果需要就进行分片(调用了shardingService.shardingIfNecessary()方法进行判断)
在shardingIfNecessary()中会判断当前节点是否是主节点,如果是子节点 在这里 等待分片的完成!!!  不然不能执行后续代码。
进行具体分片:调用JobShardingStrategy.sharding()方法进行分片,有几种实现类,先不管。
分片结束会进程将分片信息保存到ZK:
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
entry.getKey().getJobInstanceId()可以简单理解为一个服务器
大致保存的内容可以理解为:
服务器1 中  有 [0,1,2]分片
服务器2 中  有 [3,4,5]分片


2-分片完成后,获取当前机器需要执行的分片任务!!! (当前作业的主节点会进行分片,子节点会等待主节点分片完成, 最后都会走分片后的逻辑。)

List shardingItems = shardingService.getLocalShardingItems();
 
 public List getShardingItems(final String jobInstanceId) {
 JobInstance jobInstance = new JobInstance(jobInstanceId);
 if (!serverService.isAvailableServer(jobInstance.getIp())) {
     return Collections.emptyList();
 }
 List result = new linkedList<>();
 int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
 for (int i = 0; i < shardingTotalCount; i++) {
     if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
  result.add(i);
     }
 }
 return result;
    }

可以看到同样是根据getJobInstanceId  来获取 分片任务的,和之前呼应。


3-真正的执行分片的作业
具体代码不展示了
最后会调用抽象方法protected abstract void process(ShardingContext shardingContext);

之前已经知道了具体的实现类:SimpleJobExecutor:

    @Override
    protected void process(final ShardingContext shardingContext) {
 simpleJob.execute(shardingContext);
    }
    
    
 可以看到这里真正调用了我们的 simpleJob (具体实现类SpringBootSimpleJob)  的  execute方法


常用类
对源码中常用的类进行分析

JobRegistry类:作业注册表

JobRegistry是一个单列的类,在项目启动后只有一个实例。在这里面存储了作业的相关信息

     // 作业名称 ---> 作业调度控制器
     Map schedulerMap = new ConcurrentHashMap<>();
    
     // 作业名称 ---> 注册中心
     Map regCenterMap = new ConcurrentHashMap<>();
    
     // 作业名称 ---> 作业实例
     Map jobInstanceMap = new ConcurrentHashMap<>();
    
     //作业名称 ---> 作业是否运行     获取作业是否在运行
     Map jobRunningMap = new ConcurrentHashMap<>();
    
      //作业名称 ---> 作业的分片数量
     Map currentShardingTotalCountMap = new ConcurrentHashMap<>();



ZookeeperRegistryCenter类:注册中心
ZookeeperRegistryCenter依赖  curator  来实现
初始化的时候回调用init()方法进行启动客户的(client.start();)

注册中心具体提供了Zk操作的一些工具方法
比如创建持久节点,创建临时节点。。

TreeCache:
可以监听指定节点下的节点的变化(所有节点的变化,多级目录)
可以监听的事件包括:节点创建,节点数据的变化,节点的删除
 TreeCache cache = new TreeCache(client, cachePath);
 try {
     cache.start();
 //CHECKSTYLE:OFF
 } catch (final Exception ex) {
 //CHECKSTYLE:ON
     RegExceptionHandler.handleException(ex);
 }
 caches.put(cachePath + "/", cache);

Map caches = new HashMap<>();  //保存了每个监听节点的cache


JobNodePath类:作业节点路径类
基本上所有获取的节点都会加上作业名称(jobName) 



JobNodeStorage类:作业节点数据访问类


LeaderService类:主节点的服务类
先通过LeaderLatch 设置 主节点,如果leader节点选举成功,会保存作业实例节点到ZK


ServerService类:作业服务器服务
ServerService提供了注册上线的服务器的信息(jobName/server/ip)
提供了hasAvailableServers获取可用服务器信息(jobName/server)

和xxl-job的对比

之了解过xxl-job,感觉整体流程不一样
xxl-job会有一个Master节点,利用RPC分发任务到worker节点。
worker节点启动后会注册到zk或者mysql.  Master节点分配任务会根据任务配置 和 负载策略选择一个worker进行任务的执行。
xxl-job整体调度的实现也是使用Quartz.



elastic-job整体感觉是启动节点后,会先选择一个主节点,也会将信息保存到zk中,(主节点和其他节点都是调度的工作节点,只不过主节点需要完成分片的功能),然后添加作业到调度
在调度进行的时候(主节点)进行分片的配置,从节点会等待分片完成,配置完成后,主节点和从节点会进行获取当前节点分片的任务,执行具体的job.
更多的使用了ZK.

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号