栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

任务调度Schedulerx2.0分布式计算之MapReduce模型

任务调度Schedulerx2.0分布式计算之MapReduce模型

简介

阿里巴巴任务调度Schedulerx2.0自研轻量级分布式模型MapReduce,可以进行大数据的实时/离线跑批。通过一个map方法就能将海量数据分布式到多台机器上执行,通过process方法处理子任务的业务,最后通过reduce方法可以获取所有子任务执行的状态和结果。常见场景,比如

  1. 电商领域:通过MapReduce模型不停轮询订单,进行订单确认。如果有订单超时未支付,则关闭订单。如果订单支付完成,并且各参数没问题,则更新订单状态为已完成。
  2. 物流领域:通过MapReduce模型不停扫描入仓的订单,通过订单的收件地址,进行拣货出库。
  3. IoT领域:通过MapReduce模型不停轮询所有设备的状态,如果发现有设备有故障,则汇报给主机,更新设备状态。

同时,MapReduce模型的任务,也可以结合工作流一起使用,通过reduce方法可以返回这次跑批的结果,进行工作流上下游数据传递。

对比大数据跑批的优势

速度快

大数据跑批,需要配合导入导出工具,先将传统数据库中的数据导入到大数据平台中,跑批结束后,再把结果导回数据库中,导入导出增加了很多时间开销。同时大部分大数据跑批(比如Hadoop的MapReduce模型)过程也比较慢,涉及到数据的拆分和中间结果的数据传输,比较耗时间。不适合用来做实时跑批。

Schedulerx2.0的轻量级MapReduce模型,可以直接操作用户的原始数据库数据,不涉及到数据的导入导出和中间结果的数据传输,可以作为实时业务的跑批。

数据安全

大数据跑批,需要首先将数据上传到大数据平台中,如果使用云厂商的大数据平台,用户往往担心数据安全问题。

Schedulerx2.0的跑批,不需要上传数据,计算节点也是用户自己的,没有任何安全问题。

成本低

大数据跑批,需要将数据上传到大数据平台,跑批过程消耗大数据的计算资源,需要为存储和计算成本买单。

Schedulerx2.0的跑批,不需要任何额外的存储和计算成本,只需要应用依赖一个jar包,即可以将应用自己的机器自建成一个分布式计算引擎,进行MapReduce模型的跑批。

编程简单

大数据跑批,需要学习大数据的知识,学习成本比较高。如果涉及到非常复杂的业务逻辑(比如需要比较多的条件判断和循环),无法通过大数据跑批解决。虽然大数据系统一般都有提供UDF,但是使用起来还是比较麻烦。

Schedulerx2.0的跑批,直接编写业务代码,兼容Spring原生语法,可以直接调用已经封装好的各种service代码,开发迅速,可读性高。

接口说明

使用MapReduce模型,只需要继承com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor,该抽象类有如下接口须实现:

接口

是否必选

描述

public ProcessResult map(List taskList, String taskName)

通过map方法分发子任务列表

public ProcessResult process(JobContext context) throws Exception

执行子任务的具体业务实现,通过JobContext可以拿到子任务的信息

public ProcessResult reduce(JobContext context)

所有子任务执行完,会回调reduce方法,可以在JobContext中拿到所有子任务的执行状态和结果

public void kill(JobContext context)

实现该方法,可以主动停止正在执行的子任务

public boolean runReduceIfFail(JobContext context)

如果有子任务失败,是否执行reduce方法,默认是

原理

Schedulerx2.0中,MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,原理如下图所示:

可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。

Reduce方法也会通过ProcessResult返回任务状态,只有所有子任务和reduce都返回true,才算这次实例成功。

Demo

订单跑批

1. 构建OrderInfo数据结构

package com.alibaba.schedulerx.test.info;

public class OrderInfo {
    private String id;
    private int value;
    
    public OrderInfo(String id, int value) {
        this.id = id;
        this.value = value;
    }
    
    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }
    
    public int getValue() {
        return value;
    }
    
    public void setValue(int value) {
        this.value = value;
    }
    
    @Override
    public String toString() {
        return "OrderInfo [id=" + id + ", value=" + value + "]";
    }
    
}

2. 继承MapReduceProcessor,创建订单处理任务

package com.alibaba.schedulerx.test.processor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.test.processor.TestMapReduceJobProcessor.OrderInfo;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;


public class TradeOrderJob extends MapReduceJobProcessor {
     
    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        if (isRootTask(context)) {
            System.out.println("start root task");
            List orderInfos = getOrderInfos();
            return map(orderInfos, "OrderInfo");
        } else if (taskName.equals("OrderInfo")) {
            OrderInfo orderInfo = (OrderInfo)context.getTask();
            System.out.print(orderInfo);
            return new ProcessResult(true, String.valueOf(orderInfo.getValue()));
        }
        return new ProcessResult(false);
    }
    
    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        Map allTaskResults = context.getTaskResults();
        Map allTaskStatuses = context.getTaskStatuses();
        long sum = 0;
        for (Entry entry : allTaskResults.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
            // 过滤根任务
            if (entry.getKey() == 0) {
                continue;
            }
            if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
                sum += Integer.valueOf(entry.getValue());
            }
        }
        
        System.out.print("reduce: count=" + sum);
        return new ProcessResult(true, String.valueOf(sum));
    }
    
    
    private List getOrderInfos() {
        List orderList = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            OrderInfo orderInfo = new OrderInfo("id_" + i, i);
            orderList.add(orderInfo);
        }
        return orderList;
    }
    
}

3. 注册2个worker实例,执行任务如下

可以看到如程序描述,构造了20个订单,分别由两个worker分布式执行,最后reduce汇总结果为1+2+...+20=210

在工作流中的集成使用

举个例子,比如一个工作流JobA->JobB->JobC。JobA和JobC是java任务单机执行,JobB是网格计算MapReduce任务。代码如下:

public class TestJobA extends JavaProcessor {
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("hello JobA");
        return new ProcessResult(true, String.valueOf(10));
    }
}
public class TestJobB extends MapReduceJobProcessor {
    @Override
    public ProcessResult process(JobContext context) {
        String executorName = context.getTaskName();
        if (executorName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
            System.out.println("start root task");
            String upstreamData = context.getUpstreamData().get(0).getData();
            int dispatchNum = Integer.valueOf(upstreamData);
            List msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (executorName.equals("Level1Dispatch")) {
            String executor = (String)context.getTask();
            System.out.println(executor);
            return new ProcessResult(true);
        }
        
        return new ProcessResult(false);
    }
    
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "520");
    }
}
public class TestJobC extends JavaProcessor {
    
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("hello JobC");
        String upstreamData = context.getUpstreamData().get(0).getData();
        System.out.print(upstreamData);
        return new ProcessResult(true);
    }
}

执行结果如下:

jobA输出了10,jobB产生了0~10个msg并通过reduce输出520,jobC打印520。

如何快速定位任务失败的原因

如果通过MapReduce跑批的业务有问题,如何快速在分布式应用中分析问题(比如订单跑批业务,如何快速分析出有问题的订单处)?

阿里任务调度SchedulerX提供了日志服务解决方案,可以搜集每次跑批的日志,通过控制台可以快速检索出问题的原因。以Demo-订单跑批为例,进行简单修改

package com.alibaba.schedulerx.test.processor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.test.processor.TestMapReduceJobProcessor.OrderInfo;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;


public class TradeOrderJob extends MapReduceJobProcessor {
    private static final Logger LOGGER = LogManager.getLogger("schedulerx");
     
    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        if (isRootTask(context)) {
            LOGGER.info("start root task");
            List orderInfos = getOrderInfos();
            return map(orderInfos, "OrderInfo");
        } else if (taskName.equals("OrderInfo")) {
            OrderInfo orderInfo = (OrderInfo)context.getTask();
            //id_10这个订单,构造一个异常(1/0会抛异常)
            if (orderInfo.getId().equals("id_10")) {
                int a = 1/0;
            }
            LOGGER.info(orderInfo);
            return new ProcessResult(true, String.valueOf(orderInfo.getValue()));
        }
        return new ProcessResult(false);
    }
    
    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        Map allTaskResults = context.getTaskResults();
        Map allTaskStatuses = context.getTaskStatuses();
        long sum = 0;
        for (Entry entry : allTaskResults.entrySet()) {
            // 过滤根任务
            if (entry.getKey() == 0) {
                continue;
            }
            if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
                sum += Integer.valueOf(entry.getValue());
            }
        }
        
        LOGGER.info("reduce: count=" + sum);
        return new ProcessResult(true, String.valueOf(sum));
    }
    
    
    private List getOrderInfos() {
        List orderList = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            OrderInfo orderInfo = new OrderInfo("id_" + i, i);
            orderList.add(orderInfo);
        }
        return orderList;
    }
    
}

通过任务管理->历史记录,可以看到最近执行的历史,发现任务执行失败了

点击日志,可以看到这次跑批的所有日志

通过关键字搜索"ERROR",可以快速定位到失败的原因


原文链接:阿里任务调度Schedulerx2.0之MapReduce模型-阿里云开发者社区 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762309.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号