栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Spring Cloud Data Flow 中的 Spring Batch 作业

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring Cloud Data Flow 中的 Spring Batch 作业

我遇到了与 Elad 提出的第 3 点相同的问题。最终我借助此处演示的基本框架解决了它，只是改用了 DeployerPartitionHandler 和 DeployerStepExecutionHandler 的修改版本。

我起初尝试了一种比较朴素的做法：建立两级分区，即每个 worker 执行的步骤本身再被划分为子分区。但框架似乎并不支持这种用法——它会把步骤的状态弄混。

因此，我退回到单层（扁平）的分区结构，但让每个 worker 处理多个步骤执行 ID。为此我编写了 DeployerMultiPartitionHandler：它启动配置数量的 worker，并把分配给每个 worker 的一组步骤执行 ID 以逗号分隔的形式传给该 worker。注意，现在有两个可以独立调整的参数：worker 数量（maxWorkers）和 gridSize。gridSize 是分区总数，这些分区会尽可能均匀地分配给各个 worker。遗憾的是，我不得不在这里复制 DeployerPartitionHandler 的大量代码。

/**
 * {@link PartitionHandler} that launches a configurable number of worker tasks and hands each
 * worker several partition step executions.
 *
 * <p>The step is split into {@code gridSize} partitions, which are distributed as evenly as
 * possible (via {@code partitionOffset}) across at most {@code maxWorkers} workers; {@code -1}
 * means one worker per partition. Each worker receives the comma-separated ids of its step
 * executions under {@link #SPRING_CLOUD_TASK_STEP_EXECUTION_IDS}, either as command-line
 * arguments or, when {@code defaultArgsAsEnvironmentVars} is {@code true}, as environment
 * variables. The handler then polls the {@link JobExplorer} until every partition has finished.
 */
@Slf4j
@Getter
@Setter
public class DeployerMultiPartitionHandler implements PartitionHandler, EnvironmentAware, InitializingBean {

    /** Key under which each worker receives the comma-separated ids of its step executions. */
    public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_IDS = "spring.cloud.task.step-execution-ids";
    public static final String SPRING_CLOUD_TASK_JOB_EXECUTION_ID = "spring.cloud.task.job-execution-id";
    /** Single-id key; this handler itself passes ids under {@link #SPRING_CLOUD_TASK_STEP_EXECUTION_IDS}. */
    public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_ID = "spring.cloud.task.step-execution-id";
    public static final String SPRING_CLOUD_TASK_STEP_NAME = "spring.cloud.task.step-name";
    public static final String SPRING_CLOUD_TASK_PARENT_EXECUTION_ID = "spring.cloud.task.parentExecutionId";
    public static final String SPRING_CLOUD_TASK_NAME = "spring.cloud.task.name";

    /** Upper bound on launched workers; -1 means one worker per partition. */
    private int maxWorkers = -1;
    /** Number of partitions requested from the step splitter. */
    private int gridSize = 1;
    /** Number of launched workers that still have at least one unfinished partition. */
    private int currentWorkers = 0;
    private TaskLauncher taskLauncher;
    private JobExplorer jobExplorer;
    private TaskExecution taskExecution;
    private Resource resource;
    private String stepName;
    /** Interval handed to the {@link DirectPoller} while waiting for partitions (presumably ms). */
    private long pollInterval = 10000;
    /** Maximum time to wait for all partitions, in milliseconds; negative waits indefinitely. */
    private long timeout = -1;
    private Environment environment;
    private Map<String, String> deploymentProperties;
    private EnvironmentVariablesProvider environmentVariablesProvider;
    /** Application name for launched workers; falls back to the task name when blank. */
    private String applicationName;
    private CommandLineArgsProvider commandLineArgsProvider;
    /** When true, the spring.cloud.task.* values are passed as environment variables instead of arguments. */
    private boolean defaultArgsAsEnvironmentVars = false;

    public DeployerMultiPartitionHandler(TaskLauncher taskLauncher,
                                         JobExplorer jobExplorer,
                                         Resource resource,
                                         String stepName) {
        Assert.notNull(taskLauncher, "A taskLauncher is required");
        Assert.notNull(jobExplorer, "A jobExplorer is required");
        Assert.notNull(resource, "A resource is required");
        Assert.hasText(stepName, "A step name is required");
        this.taskLauncher = taskLauncher;
        this.jobExplorer = jobExplorer;
        this.resource = resource;
        this.stepName = stepName;
    }

    /**
     * Splits the step into partitions, launches the workers and waits for all partitions.
     *
     * @param stepSplitter  splitter that creates the partition step executions
     * @param stepExecution the master step execution being partitioned
     * @return the finished partition step executions, or {@code null} when the splitter
     *         produced no partitions
     * @throws IllegalArgumentException if {@code maxWorkers} is neither -1 nor positive
     * @throws Exception                if launching or polling fails, including a
     *                                  {@code TimeoutException} when {@code timeout} elapses
     */
    @Override
    public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
                                            StepExecution stepExecution) throws Exception {
        // Validate before splitting: 0 or a negative value other than -1 would launch no workers,
        // and pollReplies would then wait (indefinitely unless a timeout is set) for partitions
        // that never run.
        Assert.isTrue(this.maxWorkers == -1 || this.maxWorkers > 0,
                "maxWorkers must be -1 (one worker per partition) or greater than 0");

        final Set<StepExecution> tempCandidates = stepSplitter.split(stepExecution, this.gridSize);

        // Following two lines due to https://jira.spring.io/browse/BATCH-2490
        final List<StepExecution> candidates = new ArrayList<>(tempCandidates.size());
        candidates.addAll(tempCandidates);

        int partitions = candidates.size();
        log.debug("{} partitions were returned", partitions);

        if (CollectionUtils.isEmpty(candidates)) {
            return null;
        }

        final Set<StepExecution> executed = new HashSet<>(partitions);
        List<List<StepExecution>> workerSlices = launchWorkers(candidates, executed);
        return pollReplies(stepExecution, executed, workerSlices, partitions);
    }

    /**
     * Launches one worker per non-empty slice of {@code candidates}.
     *
     * @return the slice of step executions assigned to each launched worker
     */
    private List<List<StepExecution>> launchWorkers(List<StepExecution> candidates, Set<StepExecution> executed) {
        int partitions = candidates.size();
        int numWorkers = this.maxWorkers != -1 ? Math.min(this.maxWorkers, partitions) : partitions;
        List<List<StepExecution>> workerSlices = new ArrayList<>(numWorkers);
        for (int worker = 0; worker < numWorkers; worker++) {
            // Copy the slice so it does not depend on the backing list staying unmodified.
            List<StepExecution> slice = new ArrayList<>(candidates.subList(
                    partitionOffset(partitions, numWorkers, worker),
                    partitionOffset(partitions, numWorkers, worker + 1)));
            if (!slice.isEmpty()) {
                processStepExecutions(slice, executed);
                workerSlices.add(slice);
            }
        }
        return workerSlices;
    }

    /** Launches a worker for {@code stepExecutions} and records them as executed. */
    private void processStepExecutions(List<StepExecution> stepExecutions, Set<StepExecution> executed) {
        launchWorker(stepExecutions);
        this.currentWorkers++;
        executed.addAll(stepExecutions);
    }

    /**
     * Builds and submits the deployment request for one worker.
     *
     * <p>The execution context of the first step execution in the slice is used for the
     * argument and environment-variable providers.
     */
    private void launchWorker(List<StepExecution> workerStepExecutions) {
        StepExecution firstWorkerStepExecution = workerStepExecutions.get(0);

        // Each provider receives its own copy of the context rather than the original.
        ExecutionContext copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());
        List<String> arguments = new ArrayList<>(this.commandLineArgsProvider.getCommandLineArgs(copyContext));

        String jobExecutionId = String.valueOf(firstWorkerStepExecution.getJobExecution().getId());
        String stepExecutionIds = workerStepExecutions.stream()
                .map(workerStepExecution -> String.valueOf(workerStepExecution.getId()))
                .collect(joining(","));
        String taskName = String.format("%s_%s_%s",
                taskExecution.getTaskName(),
                firstWorkerStepExecution.getJobExecution().getJobInstance().getJobName(),
                firstWorkerStepExecution.getStepName());
        String parentExecutionId = String.valueOf(taskExecution.getExecutionId());

        if (!this.defaultArgsAsEnvironmentVars) {
            arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID, jobExecutionId));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS, stepExecutionIds));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_NAME, taskName));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID, parentExecutionId));
        }

        copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());
        log.info("launchWorker context={}", copyContext);
        Map<String, String> environmentVariables = this.environmentVariablesProvider.getEnvironmentVariables(copyContext);
        if (this.defaultArgsAsEnvironmentVars) {
            environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID, jobExecutionId);
            // Mirror the argument branch: pass the full id list. Sending only the first id would
            // leave the rest of this worker's partitions unexecuted, and pollReplies would wait
            // for them.
            environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS, stepExecutionIds);
            environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
            environmentVariables.put(SPRING_CLOUD_TASK_NAME, taskName);
            environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID, parentExecutionId);
        }

        AppDefinition definition = new AppDefinition(resolveApplicationName(), environmentVariables);
        AppDeploymentRequest request =
                new AppDeploymentRequest(definition, this.resource, this.deploymentProperties, arguments);
        taskLauncher.launch(request);
    }

    /** Returns {@code applicationName} if it has text, otherwise the current task's name. */
    private String resolveApplicationName() {
        if (StringUtils.hasText(this.applicationName)) {
            return this.applicationName;
        }
        return this.taskExecution.getTaskName();
    }

    /** Formats a {@code --key=value} command-line argument. */
    private String formatArgument(String key, String value) {
        return String.format("--%s=%s", key, value);
    }

    /**
     * Polls the job repository until all {@code size} partitions have completed.
     *
     * @param masterStepExecution the partitioned master step execution
     * @param executed            the partition step executions that were launched
     * @param workerSlices        the partitions assigned to each launched worker
     * @param size                the number of partitions to wait for
     * @return the completed partition step executions as read from the {@link JobExplorer}
     */
    private Collection<StepExecution> pollReplies(final StepExecution masterStepExecution,
                                                  final Set<StepExecution> executed,
                                                  final List<List<StepExecution>> workerSlices,
                                                  final int size) throws Exception {
        final Collection<StepExecution> result = new ArrayList<>(executed.size());
        final List<List<StepExecution>> runningSlices = new ArrayList<>(workerSlices);

        Callable<Collection<StepExecution>> callback = () -> {
            for (StepExecution curStepExecution : executed) {
                if (!result.contains(curStepExecution)) {
                    StepExecution partitionStepExecution = jobExplorer.getStepExecution(
                            masterStepExecution.getJobExecutionId(), curStepExecution.getId());
                    if (isComplete(partitionStepExecution.getStatus())) {
                        result.add(partitionStepExecution);
                    }
                }
            }
            // A worker is finished only once every partition in its slice has completed.
            int runningBefore = runningSlices.size();
            runningSlices.removeIf(result::containsAll);
            currentWorkers -= runningBefore - runningSlices.size();

            // A null result makes the poller try again after the poll interval.
            return result.size() == size ? result : null;
        };

        Poller<Collection<StepExecution>> poller = new DirectPoller<>(this.pollInterval);
        Future<Collection<StepExecution>> resultsFuture = poller.poll(callback);

        if (timeout >= 0) {
            return resultsFuture.get(timeout, TimeUnit.MILLISECONDS);
        }
        return resultsFuture.get();
    }

    /** A partition counts as finished when it is COMPLETED or in any status above STARTED. */
    private boolean isComplete(BatchStatus status) {
        return status.equals(BatchStatus.COMPLETED) || status.isGreaterThan(BatchStatus.STARTED);
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    /** Requires a task execution and installs default argument/environment providers if unset. */
    @Override
    public void afterPropertiesSet() {
        Assert.notNull(taskExecution, "A taskExecution is required");

        if (this.environmentVariablesProvider == null) {
            this.environmentVariablesProvider = new CloudEnvironmentVariablesProvider(this.environment);
        }

        if (this.commandLineArgsProvider == null) {
            SimpleCommandLineArgsProvider simpleCommandLineArgsProvider = new SimpleCommandLineArgsProvider();
            simpleCommandLineArgsProvider.onTaskStartup(taskExecution);
            this.commandLineArgsProvider = simpleCommandLineArgsProvider;
        }
    }
}

分区通过静态函数 partitionOffset 分配给各个 worker，以确保每个 worker 分到的分区数量最多相差一个：

/**
 * Returns the start offset of slice {@code partitionIndex} when {@code length} items are split
 * into {@code numberOfPartitions} contiguous slices whose sizes differ by at most one.
 *
 * <p>The first {@code length % numberOfPartitions} slices each receive one extra item. Slice
 * {@code i} covers {@code [partitionOffset(length, n, i), partitionOffset(length, n, i + 1))}, and
 * {@code partitionOffset(length, n, n) == length}.
 *
 * @param length             total number of items to split; must be {@code >= 0}
 * @param numberOfPartitions number of slices; must be {@code > 0}
 * @param partitionIndex     slice index in {@code [0, numberOfPartitions]}; the upper bound yields
 *                           the end offset of the last slice
 * @return the offset of the first item of the requested slice
 * @throws IllegalArgumentException if any argument is out of range (otherwise an out-of-range
 *                                  index would silently produce offsets past {@code length})
 */
static int partitionOffset(int length, int numberOfPartitions, int partitionIndex) {
    if (length < 0) {
        throw new IllegalArgumentException("length must be >= 0 but was " + length);
    }
    if (numberOfPartitions <= 0) {
        throw new IllegalArgumentException("numberOfPartitions must be > 0 but was " + numberOfPartitions);
    }
    if (partitionIndex < 0 || partitionIndex > numberOfPartitions) {
        throw new IllegalArgumentException("partitionIndex must be in [0, " + numberOfPartitions
                + "] but was " + partitionIndex);
    }
    int baseSize = length / numberOfPartitions;
    int remainder = length % numberOfPartitions;
    // partitionIndex * baseSize <= numberOfPartitions * baseSize <= length, so no overflow.
    return partitionIndex * baseSize + Math.min(partitionIndex, remainder);
}

在接收端，我编写了 DeployerMultiStepExecutionHandler：它继承 TaskExecutorPartitionHandler，从而获得并行执行多个分区的能力，同时实现了与 DeployerMultiPartitionHandler 相匹配的命令行接口：

@Slf4jpublic class DeployerMultiStepExecutionHandler extends TaskExecutorPartitionHandler implements CommandLineRunner {    private JobExplorer jobExplorer;    private JobRepository jobRepository;    private Log logger = LogFactory.getLog(org.springframework.cloud.task.batch.partition.DeployerStepExecutionHandler.class);    @Autowired    private Environment environment;    private StepLocator stepLocator;    public DeployerMultiStepExecutionHandler(BeanFactory beanFactory, JobExplorer jobExplorer, JobRepository jobRepository) {        Assert.notNull(beanFactory, "A beanFactory is required");        Assert.notNull(jobExplorer, "A jobExplorer is required");        Assert.notNull(jobRepository, "A jobRepository is required");        this.stepLocator = new BeanFactoryStepLocator();        ((BeanFactoryStepLocator) this.stepLocator).setBeanFactory(beanFactory);        this.jobExplorer = jobExplorer;        this.jobRepository = jobRepository;    }    @Override    public void run(String... args) throws Exception {        validateRequest();        Long jobExecutionId = Long.parseLong(environment.getProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID));        Stream<Long> stepExecutionIds = Stream.of(environment.getProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS).split(",")).map(Long::parseLong);        Set<StepExecution> stepExecutions = stepExecutionIds.map(stepExecutionId -> jobExplorer.getStepExecution(jobExecutionId, stepExecutionId)).collect(Collectors.toSet());        log.info("found stepExecutions:n{}", stepExecutions.stream().map(stepExecution -> stepExecution.getId() + ":" + stepExecution.getExecutionContext()).collect(joining("n")));        if (stepExecutions.isEmpty()) { throw new NoSuchStepException(String.format("No StepExecution could be located for step execution id %s within job execution %s", stepExecutionIds, jobExecutionId));        }        String stepName = environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME);        setStep(stepLocator.getStep(stepName));      
  doHandle(null, stepExecutions);    }    private void validateRequest() {        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID), "A job execution id is required");        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS), "A step execution id is required");        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_NAME), "A step name is required");        Assert.isTrue(this.stepLocator.getStepNames().contains(environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME)), "The step requested cannot be found in the provided BeanFactory");    }}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/380302.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号