弹性调度是ElasticJob最重要的功能,也是这款产品名称的由来。它是一款能够让任务通过分片进行水
平扩展的任务处理系统。
ElasticJob中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
举例说明,如果作业分为4片,用两台服务器执行,则每个服务器分到2片,分别负责作业的50%的负载,如下图所示。
ElasticJob并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。分片项为数字,始于0而终于分片总数减1。
以上是ElasticJob的官方文档对分片的描述,而文档对作业分片策略的介绍非常简单,只给了作业分片策略的SPI名称,如下图所示:
博主目前使用的是3.0.1版本的ElasticJob‐Lite(目前最新版本)。
org.apache.shardingsphere.elasticjob elasticjob-lite-core 3.0.1
作业分片策略的SPI名称是JobShardingStrategy,是作业分片策略的顶层设计。
package org.apache.shardingsphere.elasticjob.infra.handler.sharding;
import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;
import java.util.List;
import java.util.Map;
public interface JobShardingStrategy extends TypedSPI {
Map> sharding(List jobInstances, String jobName, int shardingTotalCount);
}
JobShardingStrategy接口的sharding方法就是用来定义作业分片的逻辑,供子类实现,目前有三个实现类:AverageAllocationJobShardingStrategy、OdevitySortByNameJobShardingStrategy以及RoundRobinByNameJobShardingStrategy。
源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.linkedHashMap;
import java.util.List;
import java.util.Map;
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
@Override
public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();
}
Map> result = shardingAliquot(jobInstances, shardingTotalCount);
addAliquant(jobInstances, shardingTotalCount, result);
return result;
}
private Map> shardingAliquot(final List shardingUnits, final int shardingTotalCount) {
Map> result = new linkedHashMap<>(shardingUnits.size(), 1);
int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
int count = 0;
for (JobInstance each : shardingUnits) {
List shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
private void addAliquant(final List shardingUnits, final int shardingTotalCount, final Map> shardingResults) {
int aliquant = shardingTotalCount % shardingUnits.size();
int count = 0;
for (Map.Entry> entry : shardingResults.entrySet()) {
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
}
count++;
}
}
@Override
public String getType() {
return "AVG_ALLOCATION";
}
}
这是一种尽量平均分配的分片策略,如果作业的分片项无法平均分配给所有的作业服务器,即作业的分片项数%作业服务器数不为零,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。 例如:
- 如果有3个作业服务器,总分片数为9,每个作业服务器的分片项为:1=[0,1,2],2=[3,4,5],3=[6,7,8]。
- 如果有3个作业服务器,总分片数为8,每个作业服务器的分片项为:1=[0,1,6],2=[2,3,7],3=[4,5]。
- 如果有3个作业服务器,总分片数为10,每个作业服务器的分片项为:1=[0,1,2,9],2=[3,4,5],3=[6,7,8]。
先给每个作业服务器分配相同数量的作业分片项(数量为:作业的分片项数/作业服务器数)。
private Map> shardingAliquot(final List shardingUnits, final int shardingTotalCount) { Map > result = new linkedHashMap<>(shardingUnits.size(), 1); // 每个作业服务器最少应该分配的作业分片项数 int itemCountPerSharding = shardingTotalCount / shardingUnits.size(); int count = 0; for (JobInstance each : shardingUnits) { // 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1) // itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数 List shardingItems = new ArrayList<>(itemCountPerSharding + 1); for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) { // 给作业分片项列表添加第i个作业分片项 shardingItems.add(i); } // 将作业服务器与它执行的作业分片项列表进行关联 result.put(each, shardingItems); count++; } return result; }
如果作业的分片项无法平均分配给所有的作业服务器,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。
private void addAliquant(final ListOdevitySortByNameJobShardingStrategyshardingUnits, final int shardingTotalCount, final Map > shardingResults) { // 无法平均分配的分片项数 int aliquant = shardingTotalCount % shardingUnits.size(); // 已分配的无法平均分配的分片项数 int count = 0; for (Map.Entry > entry : shardingResults.entrySet()) { // 是否还有无法平均分配的分片项 if (count < aliquant) { // 分配给序号较小的作业服务器 entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count); } // 已分配数更新 count++; } }
源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
@Override
public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) {
long jobNameHash = jobName.hashCode();
if (0 == jobNameHash % 2) {
Collections.reverse(jobInstances);
}
return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
}
@Override
public String getType() {
return "ODEVITY";
}
}
其实还是使用AverageAllocationJobShardingStrategy作业分片策略进行分配,只是会先根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse操作。例如:
- 如果有3个作业服务器,总分片数为2,作业名称的哈希码为奇数(对作业服务器列表不进行reverse操作),每个作业服务器的分片项为:1=[0],2=[1],3=[]。
- 如果有3个作业服务器,总分片数为2,作业名的哈希码是偶数(对作业服务器列表进行reverse操作),每个作业服务器的分片项为:3=[0],2=[1],1=[]。
源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public final class RoundRobinByNameJobShardingStrategy implements JobShardingStrategy {
private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
@Override
public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) {
return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
}
private List rotateServerList(final List shardingUnits, final String jobName) {
int shardingUnitsSize = shardingUnits.size();
int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;
if (0 == offset) {
return shardingUnits;
}
List result = new ArrayList<>(shardingUnitsSize);
for (int i = 0; i < shardingUnitsSize; i++) {
int index = (i + offset) % shardingUnitsSize;
result.add(shardingUnits.get(index));
}
return result;
}
@Override
public String getType() {
return "ROUND_ROBIN";
}
}
其实跟OdevitySortByNameJobShardingStrategy作业分片策略类似,都是使用AverageAllocationJobShardingStrategy作业分片策略进行分配,并且在分配前都会根据作业名称的哈希码将作业服务器列表中的作业服务器项改变顺序,只是变序规则不一样而已,OdevitySortByNameJobShardingStrategy作业分片策略根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse操作,而RoundRobinByNameJobShardingStrategy作业分片策略根据作业名称的哈希码的绝对值%作业服务器数的值对作业服务器列表进行rotate操作。例如:
- 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为0,每个作业服务器的分片项为:1=[0],2=[1],3=[]。
- 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为1,每个作业服务器的分片项为:2=[0],3=[1],1=[]。
- 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为2,每个作业服务器的分片项为:3=[0],1=[1],2=[]。
作业的分片策略通过JobShardingStrategyFactory类(作业分片策略工厂类)的getStrategy方法获取,源码如下:
package org.apache.shardingsphere.elasticjob.infra.handler.sharding;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobShardingStrategyFactory {
private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";
static {
ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
}
public static JobShardingStrategy getStrategy(final String type) {
if (Strings.isNullOrEmpty(type)) {
return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, DEFAULT_STRATEGY).get();
}
return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, type)
.orElseThrow(() -> new JobConfigurationException("Cannot find sharding strategy using type '%s'.", type));
}
}
在JobShardingStrategyFactory类的静态块中使用ElasticJobServiceLoader类的registerTypedService方法加载所有作业分片策略。
static {
ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
}
ElasticJobServiceLoader类的相关代码如下所示,通过Java提供的SPI机制(ServiceLoader类)加载所有作业分片策略。
private static final ConcurrentMap, ConcurrentMap > TYPED_SERVICES = new ConcurrentHashMap<>(); private static final ConcurrentMap , ConcurrentMap >> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>(); public static void registerTypedService(final Class typedService) { if (TYPED_SERVICES.containsKey(typedService)) { return; } ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each)); } private static void registerTypedServiceClass(final Class typedService, final TypedSPI instance) { TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance); TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass()); }
默认为AverageAllocationJobShardingStrategy作业分片策略,和官方文档给的示意图是对应的。
private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";
AverageAllocationJobShardingStrategy类的getType方法(ElasticJobServiceLoader类加载所有作业分片策略会将getType方法的返回值作为存储每个作业分片策略实例的第二个key值)。
@Override
public String getType() {
return "AVG_ALLOCATION";
}
到这里就结束了,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



