栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ElasticJob‐Lite:作业分片策略介绍与源码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ElasticJob‐Lite:作业分片策略介绍与源码分析

分片

弹性调度是ElasticJob最重要的功能,也是这款产品名称的由来。它是一款能够让任务通过分片进行水
平扩展的任务处理系统。

ElasticJob中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

举例说明,如果作业分为4片,用两台服务器执行,则每个服务器分到2片,分别负责作业的50%的负载,如下图所示。

ElasticJob并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。分片项为数字,始于0而终于分片总数减1。

以上是ElasticJob的官方文档对分片的描述,而文档对作业分片策略的介绍非常简单,只给了作业分片策略的SPI名称,如下图所示:

作业分片策略

博主目前使用的是3.0.1版本的ElasticJob‐Lite(目前最新版本)。

        
            org.apache.shardingsphere.elasticjob
            elasticjob-lite-core
            3.0.1
        


作业分片策略的SPI名称是JobShardingStrategy,是作业分片策略的顶层设计。

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;

import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;

import java.util.List;
import java.util.Map;


public interface JobShardingStrategy extends TypedSPI {
    
    
    Map> sharding(List jobInstances, String jobName, int shardingTotalCount);
}

JobShardingStrategy接口的sharding方法就是用来定义作业分片的逻辑,供子类实现,目前有三个实现类:AverageAllocationJobShardingStrategy、OdevitySortByNameJobShardingStrategy以及RoundRobinByNameJobShardingStrategy。

AverageAllocationJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.linkedHashMap;
import java.util.List;
import java.util.Map;

public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        Map> result = shardingAliquot(jobInstances, shardingTotalCount);
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }
    
    private Map> shardingAliquot(final List shardingUnits, final int shardingTotalCount) {
        Map> result = new linkedHashMap<>(shardingUnits.size(), 1);
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            List shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    
    private void addAliquant(final List shardingUnits, final int shardingTotalCount, final Map> shardingResults) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        for (Map.Entry> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            count++;
        }
    }
    
    @Override
    public String getType() {
        return "AVG_ALLOCATION";
    }
}

这是一种尽量平均分配的分片策略,如果作业的分片项无法平均分配给所有的作业服务器,即作业的分片项数%作业服务器数不为零,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。 例如:

  • 如果有3个作业服务器,总分片数为9,每个作业服务器的分片项为:1=[0,1,2],2=[3,4,5],3=[6,7,8]。
  • 如果有3个作业服务器,总分片数为8,每个作业服务器的分片项为:1=[0,1,6],2=[2,3,7],3=[4,5]。
  • 如果有3个作业服务器,总分片数为10,每个作业服务器的分片项为:1=[0,1,2,9],2=[3,4,5],3=[6,7,8]。

先给每个作业服务器分配相同数量的作业分片项(数量为:作业的分片项数/作业服务器数)。

    private Map> shardingAliquot(final List shardingUnits, final int shardingTotalCount) {
        Map> result = new linkedHashMap<>(shardingUnits.size(), 1);
        // 每个作业服务器最少应该分配的作业分片项数
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            // 每个作业服务器申请的作业分片项列表(容量为itemCountPerSharding + 1)
            // itemCountPerSharding + 1为每个作业服务器最多应该分配的作业分片项数
            List shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                // 给作业分片项列表添加第i个作业分片项
                shardingItems.add(i);
            }
            // 将作业服务器与它执行的作业分片项列表进行关联
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }

如果作业的分片项无法平均分配给所有的作业服务器,则将无法平均分配的冗余分片项依次添加到序号较小的服务器中。

    private void addAliquant(final List shardingUnits, final int shardingTotalCount, final Map> shardingResults) {
        // 无法平均分配的分片项数
        int aliquant = shardingTotalCount % shardingUnits.size();
        // 已分配的无法平均分配的分片项数
        int count = 0;
        for (Map.Entry> entry : shardingResults.entrySet()) {
            // 是否还有无法平均分配的分片项
            if (count < aliquant) {
                // 分配给序号较小的作业服务器
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            // 已分配数更新
            count++;
        }
    }
OdevitySortByNameJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
    
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) {
        long jobNameHash = jobName.hashCode();
        if (0 == jobNameHash % 2) {
            Collections.reverse(jobInstances);
        }
        return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
    }
    
    @Override
    public String getType() {
        return "ODEVITY";
    }
}

其实还是使用AverageAllocationJobShardingStrategy作业分片策略进行分配,只是会先根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse操作。例如:

  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码为奇数(对作业服务器列表不进行reverse操作),每个作业服务器的分片项为:1=[0],2=[1],3=[]。
  • 如果有3个作业服务器,总分片数为2,作业名的哈希码是偶数(对作业服务器列表进行reverse操作),每个作业服务器的分片项为:3=[0],2=[1],1=[]。
RoundRobinByNameJobShardingStrategy

源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class RoundRobinByNameJobShardingStrategy implements JobShardingStrategy {
    
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) {
        return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
    }
    
    private List rotateServerList(final List shardingUnits, final String jobName) {
        int shardingUnitsSize = shardingUnits.size();
        int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;
        if (0 == offset) {
            return shardingUnits;
        }
        List result = new ArrayList<>(shardingUnitsSize);
        for (int i = 0; i < shardingUnitsSize; i++) {
            int index = (i + offset) % shardingUnitsSize;
            result.add(shardingUnits.get(index));
        }
        return result;
    }
    
    @Override
    public String getType() {
        return "ROUND_ROBIN";
    }
}

其实跟OdevitySortByNameJobShardingStrategy作业分片策略类似,都是使用AverageAllocationJobShardingStrategy作业分片策略进行分配,并且在分配前都会根据作业名称的哈希码将作业服务器列表中的作业服务器项改变顺序,只是变序规则不一样而已,OdevitySortByNameJobShardingStrategy作业分片策略根据作业名称的哈希码的奇偶性来决定是否对作业服务器列表进行reverse操作,而RoundRobinByNameJobShardingStrategy作业分片策略根据作业名称的哈希码的绝对值%作业服务器数的值对作业服务器列表进行rotate操作。例如:

  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为0,每个作业服务器的分片项为:1=[0],2=[1],3=[]。
  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为1,每个作业服务器的分片项为:2=[0],3=[1],1=[]。
  • 如果有3个作业服务器,总分片数为2,作业名称的哈希码的绝对值%作业服务器数的值为2,每个作业服务器的分片项为:3=[0],1=[1],2=[]。
JobShardingStrategyFactory

作业的分片策略通过JobShardingStrategyFactory类(作业分片策略工厂类)的getStrategy方法获取,源码如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;

import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobShardingStrategyFactory {
    
    private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";
    
    static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }
    
    public static JobShardingStrategy getStrategy(final String type) {
        if (Strings.isNullOrEmpty(type)) {
            return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, DEFAULT_STRATEGY).get();
        }
        return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, type)
                .orElseThrow(() -> new JobConfigurationException("Cannot find sharding strategy using type '%s'.", type));
    }
}

在JobShardingStrategyFactory类的静态块中使用ElasticJobServiceLoader类的registerTypedService方法加载所有作业分片策略。

    static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }

ElasticJobServiceLoader类的相关代码如下所示,通过Java提供的SPI机制(ServiceLoader类)加载所有作业分片策略。

    private static final ConcurrentMap, ConcurrentMap> TYPED_SERVICES = new ConcurrentHashMap<>();
    
    private static final ConcurrentMap, ConcurrentMap>> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>();
 
    public static  void registerTypedService(final Class typedService) {
        if (TYPED_SERVICES.containsKey(typedService)) {
            return;
        }
        ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
    }
    
    private static  void registerTypedServiceClass(final Class typedService, final TypedSPI instance) {
        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
    }

默认为AverageAllocationJobShardingStrategy作业分片策略,和官方文档给的示意图是对应的。

private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";

AverageAllocationJobShardingStrategy类的getType方法(ElasticJobServiceLoader类加载所有作业分片策略会将getType方法的返回值作为存储每个作业分片策略实例的第二个key值)。

    @Override
    public String getType() {
        return "AVG_ALLOCATION";
    }

到这里就结束了,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/692600.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号