阿里巴巴任务调度Schedulerx2.0自研轻量级分布式模型MapReduce,可以进行大数据的实时/离线跑批。通过一个map方法就能将海量数据分布式到多台机器上执行,通过process方法处理子任务的业务,最后通过reduce方法可以获取所有子任务执行的状态和结果。常见场景,比如
- 电商领域:通过MapReduce模型不停轮询订单,进行订单确认。如果有订单超时未支付,则关闭订单。如果订单支付完成,并且各参数没问题,则更新订单状态为已完成。
- 物流领域:通过MapReduce模型不停扫描入仓的订单,通过订单的收件地址,进行拣货出库。
- IoT领域:通过MapReduce模型不停轮询所有设备的状态,如果发现有设备有故障,则汇报给主机,更新设备状态。
同时,MapReduce模型的任务,也可以结合工作流一起使用,通过reduce方法可以返回这次跑批的结果,进行工作流上下游数据传递。
对比大数据跑批的优势
速度快
大数据跑批,需要配合导入导出工具,先将传统数据库中的数据导入到大数据平台中,跑批结束后,再把结果导回数据库中,导入导出增加了很多时间开销。同时大部分大数据跑批(比如Hadoop的MapReduce模型)过程也比较慢,涉及到数据的拆分和中间结果的数据传输,比较耗时间。不适合用来做实时跑批。
Schedulerx2.0的轻量级MapReduce模型,可以直接操作用户的原始数据库数据,不涉及到数据的导入导出和中间结果的数据传输,可以作为实时业务的跑批。
数据安全
大数据跑批,需要首先将数据上传到大数据平台中,如果使用云厂商的大数据平台,用户往往担心数据安全问题。
Schedulerx2.0的跑批,不需要上传数据,计算节点也是用户自己的,没有任何安全问题。
成本低
大数据跑批,需要将数据上传到大数据平台,跑批过程消耗大数据的计算资源,需要为存储和计算成本买单。
Schedulerx2.0的跑批,不需要任何额外的存储和计算成本,只需要应用依赖一个jar包,即可以将应用自己的机器自建成一个分布式计算引擎,进行MapReduce模型的跑批。
编程简单
大数据跑批,需要学习大数据的知识,学习成本比较高。如果涉及到非常复杂的业务逻辑(比如需要比较多的条件判断和循环),无法通过大数据跑批解决。虽然大数据系统一般都有提供UDF,但是使用起来还是比较麻烦。
Schedulerx2.0的跑批,直接编写业务代码,兼容Spring原生语法,可以直接调用已经封装好的各种service代码,开发迅速,可读性高。
接口说明
使用MapReduce模型,只需要继承com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor,该抽象类有如下接口须实现:
| 接口 | 是否必选 | 描述 |
| public ProcessResult map(List extends Object> taskList, String taskName) | 是 | 通过map方法分发子任务列表 |
| public ProcessResult process(JobContext context) throws Exception | 是 | 执行子任务的具体业务实现,通过JobContext可以拿到子任务的信息 |
| public ProcessResult reduce(JobContext context) | 是 | 所有子任务执行完,会回调reduce方法,可以在JobContext中拿到所有子任务的执行状态和结果 |
| public void kill(JobContext context) | 否 | 实现该方法,可以主动停止正在执行的子任务 |
| public boolean runReduceIfFail(JobContext context) | 否 | 如果有子任务失败,是否执行reduce方法,默认是 |
原理
Schedulerx2.0中,MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,原理如下图所示:
可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。
Reduce方法也会通过ProcessResult返回任务状态,只有所有子任务和reduce都返回true,才算这次实例成功。
Demo
订单跑批
1. 构建OrderInfo数据结构
package com.alibaba.schedulerx.test.info;
public class OrderInfo {
private String id;
private int value;
public OrderInfo(String id, int value) {
this.id = id;
this.value = value;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
@Override
public String toString() {
return "OrderInfo [id=" + id + ", value=" + value + "]";
}
}
2. 继承MapReduceProcessor,创建订单处理任务
package com.alibaba.schedulerx.test.processor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.test.processor.TestMapReduceJobProcessor.OrderInfo;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
public class TradeOrderJob extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
if (isRootTask(context)) {
System.out.println("start root task");
List orderInfos = getOrderInfos();
return map(orderInfos, "OrderInfo");
} else if (taskName.equals("OrderInfo")) {
OrderInfo orderInfo = (OrderInfo)context.getTask();
System.out.print(orderInfo);
return new ProcessResult(true, String.valueOf(orderInfo.getValue()));
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
Map allTaskResults = context.getTaskResults();
Map allTaskStatuses = context.getTaskStatuses();
long sum = 0;
for (Entry entry : allTaskResults.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
// 过滤根任务
if (entry.getKey() == 0) {
continue;
}
if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
sum += Integer.valueOf(entry.getValue());
}
}
System.out.print("reduce: count=" + sum);
return new ProcessResult(true, String.valueOf(sum));
}
private List getOrderInfos() {
List orderList = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
OrderInfo orderInfo = new OrderInfo("id_" + i, i);
orderList.add(orderInfo);
}
return orderList;
}
}
3. 注册2个worker实例,执行任务如下
可以看到如程序描述,构造了20个订单,分别由两个worker分布式执行,最后reduce汇总结果为1+2+...+20=210
在工作流中的集成使用
举个例子,比如一个工作流JobA->JobB->JobC。JobA和JobC是java任务单机执行,JobB是网格计算MapReduce任务。代码如下:
public class TestJobA extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
System.out.println("hello JobA");
return new ProcessResult(true, String.valueOf(10));
}
}
public class TestJobB extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) {
String executorName = context.getTaskName();
if (executorName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
System.out.println("start root task");
String upstreamData = context.getUpstreamData().get(0).getData();
int dispatchNum = Integer.valueOf(upstreamData);
List msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (executorName.equals("Level1Dispatch")) {
String executor = (String)context.getTask();
System.out.println(executor);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
public ProcessResult reduce(JobContext context) throws Exception {
return new ProcessResult(true, "520");
}
}
public class TestJobC extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
System.out.println("hello JobC");
String upstreamData = context.getUpstreamData().get(0).getData();
System.out.print(upstreamData);
return new ProcessResult(true);
}
}
执行结果如下:
jobA输出了10,jobB产生了0~10个msg并通过reduce输出520,jobC打印520。
如何快速定位任务失败的原因
如果通过MapReduce跑批的业务有问题,如何快速在分布式应用中分析问题(比如订单跑批业务,如何快速分析出有问题的订单处)?
阿里任务调度SchedulerX提供了日志服务解决方案,可以搜集每次跑批的日志,通过控制台可以快速检索出问题的原因。以Demo-订单跑批为例,进行简单修改
package com.alibaba.schedulerx.test.processor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.test.processor.TestMapReduceJobProcessor.OrderInfo;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
public class TradeOrderJob extends MapReduceJobProcessor {
private static final Logger LOGGER = LogManager.getLogger("schedulerx");
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
if (isRootTask(context)) {
LOGGER.info("start root task");
List orderInfos = getOrderInfos();
return map(orderInfos, "OrderInfo");
} else if (taskName.equals("OrderInfo")) {
OrderInfo orderInfo = (OrderInfo)context.getTask();
//id_10这个订单,构造一个异常(1/0会抛异常)
if (orderInfo.getId().equals("id_10")) {
int a = 1/0;
}
LOGGER.info(orderInfo);
return new ProcessResult(true, String.valueOf(orderInfo.getValue()));
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
Map allTaskResults = context.getTaskResults();
Map allTaskStatuses = context.getTaskStatuses();
long sum = 0;
for (Entry entry : allTaskResults.entrySet()) {
// 过滤根任务
if (entry.getKey() == 0) {
continue;
}
if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
sum += Integer.valueOf(entry.getValue());
}
}
LOGGER.info("reduce: count=" + sum);
return new ProcessResult(true, String.valueOf(sum));
}
private List getOrderInfos() {
List orderList = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
OrderInfo orderInfo = new OrderInfo("id_" + i, i);
orderList.add(orderInfo);
}
return orderList;
}
}
通过任务管理->历史记录,可以看到最近执行的历史,发现任务执行失败了
点击日志,可以看到这次跑批的所有日志
通过关键字搜索"ERROR",可以快速定位到失败的原因
原文链接:阿里任务调度Schedulerx2.0之MapReduce模型-阿里云开发者社区



