工作流调度器Azkaban
Azkaban使用
Flow 2.0
1. 入门案例 HelloWorld2. 单job有多个command3. 包含多个有依赖关系job的flow4. 自动失败重试5. 手动失败重试
方案一方案二 6. 操作HDFS7. MR任务8. Hive任务
解决方案:指定executor Flow 2.0高级
1. javaprocess 类型2. 条件工作流
运行时参数预定于宏运行时参数混合预定义宏 3. 定时执行4. 邮件告警
注册邮箱邮件告警案例
工作流调度器Azkaban Azkaban使用azkaban 4.x目前同时支持flow1.0与flow 2.0,但官网说 flow 1.0将来会被淘汰,所以这里使用flow 2.0对flow 1.0感兴趣的同学,可以参考文章自行学习体验Azkaba内置的任务类型支持command、java Flow 2.0 1. 入门案例 HelloWorld
创建文件 flow20.project,内容如下
azkaban-flow-version: 2.0
再创建文件 hello.flow,内容如下(yaml格式)
nodes:
- name: jobA
type: command
config:
command: echo "Hello World! This is an flow 2.0 example."
文件中要有nodes的部分,包含所有你想要运行的job;需要为每个job指定name、type;大多的job需要config
将这两个文件压缩成一个 zip 包,并起名为 01-hello.zip使用账号/密码 abcrwe/abc123 登录 Azkaban,并创建工程
上传项目zip文件:
执行 flow
下图Execution queued successfully with exec id 1表示,web server选择id是1的exec server执行此流
执行成功
查看日志
2. 单job有多个command同样需要两个文件 flow20.project 和 .flow 文件,新建 multipleCommand.flow,内容如下
nodes:
- name: jobA
type: command
config:
command: mkdir /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/executions/test1
command.1: mkdir /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/executions/test2
第一个command用command表示,第二个用command.1表示,第三个用command.2表示,以此类推
将文件件 flow20.project 和 multipleCommand.flow 压缩成 zip 文件 02-multipleCommand.zip,并通过 web ui 界面上传,执行 flow,查看日志输出
执行成功
job间可以相互依赖,创建flow文件 dependon.flow,内容如下
nodes:
- name: jobC
type: noop
# jobC depends on jobA and jobB
dependsOn:
- jobA
- jobB
- name: jobA
type: command
config:
command: echo "This is echoed by jobA."
- name: jobB
type: command
config:
command: pwd
jobC依赖jobA、jobB
Noop: A job that takes no parameters and is essentially a null operation. Used for organizing your graph
将 dependon.flow 与 flow20.project 压缩生成 zip 文件 03-dependon.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及job日志
执行成功
如果job执行失败,可以配置成自动重试若干次,每次重试时间间隔一定时长创建flow文件 retry.flow,内容如下
nodes:
- name: jobA
type: command
config:
command: sh /a_non_exists.sh
retries: 3
retry.backoff: 3000
/a_non_exists.sh是一个不存在的sh脚本;retries是重试次数;retry.backoff每次重试的时间间隔,单位毫秒
将 retry.flow 与 flow20.project 压缩生成 zip 文件04-retry.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及job日志
执行失败并重试
也可以在flow文件中,加入全局重试次数,此重试配置对flow文件中的所有job都生效。内容如下
config:
retries: 3
retry.backoff: 3000
nodes:
- name: JobA
type: command
config:
command: sh /a_non_exists.sh
5. 手动失败重试
手动失败重试场景:
对于某些 flow 中的失败 job,不能通过自动重试解决的,比如并非一些系统短时的问题,比如暂时的网络故障导致的超时、暂时的资源不足导致的执行失败此时需要手动的做些处理后,然后再进行重新执行 flow 中 job
跳过成功的 job从失败的 job 开始执行 假设在一个 flow 中有 5 个 job,有依赖关系如下:
jobE 依赖 jobDjobD 依赖 jobCjobC 依赖 jobBjobB 依赖 jobA 创建flow文件 manulretry.flow,内容如下
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: sh /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/tmp.sh
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
将 manulretry.flow 与 flow20.project 压缩生成 zip 文件05-manulretry.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及job日志
执行结果:JobA、JobB 成功,JobC、JobD、JobE 都失败
失败原因是 /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/tmp.sh: 没有那个文件或目录,这个无论你自动重试多少次,没有这个脚本就都是失败。那么在对应的 exec server 的对应目录下创建此 sh 脚本文件
由于不确定 flow 重试时,web 选择哪个 exec 执行 flow,所以保险起见,有两个方法方法一:在3个exec节点中都创建 tmp.sh 脚本
#!/bin/bash echo 'this is echoed by tmp sh script'方法二:重试flow时,指定执行的executor 方案一
重试成功:JobC、JobD、JobE 重新执行成功,JobA、JobB不重新执行
方案二进入Projects,找到要重试的项目,执行Flow Execute Flow
重试执行成功:
Enable 和 Disable 下面都分别有如下参数:
Parents:该作业的上一个jobAncestors:该作业前的所有jobChildren:该作业后的一个jobDescendents:该作业后的所有jobEnable All: 所有的job 可以根据实际情况选择enable方案 6. 操作HDFS
首先需要启动 hadoop 集群编写 flow 文件 operateHdfs.flow,内容如下:
nodes:
- name: jobA
type: command
config:
command: echo "start execute"
command.1: hdfs dfs -mkdir /azkaban
command.2: hdfs dfs -put /bigdata/install/hadoop-3.1.4/NOTICE.txt /azkaban
将 operateHdfs.flow 与 flow20.project 压缩生成 zip 文件06-operateHdfs.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并去 hdfs web 界面查看上传的文件
7. MR任务编写 flow 文件 mr.flow,内容如下:
nodes:
- name: jobMR
type: command
config:
command: hadoop jar /bigdata/install/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar pi 3 3
为了避免执行mr过程中,对hdfs操作的一些权限问题
[hadoop@centos128 azkaban-exec-server-4.0.0]$ hdfs dfs -chmod -R 777 /tmp/
将 mr.flow 与 flow20.project 压缩生成 zip 文件07-mr.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及 job 日志
去yarn界面看看此job的执行情况:
编写 hive sql脚本文件 hive.sql,内容如下:
create database if not exists azhive; use azhive; create table if not exists aztest(id string,name string) row format delimited fields terminated by 't';
编写 flow 文件 hive.flow,内容如下:
nodes:
- name: jobHive
type: command
config:
command: hive -f 'hive.sql'
将 hive.sql、hive.flow 与 flow20.project 压缩成 zip 文件 08-hive.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行
执行失败
解决方案:指定executor
指定节点运行失败:
官网提供说明:要指定执行flow的executor的话,azkaban用户必须拥有admin权限
我们在安装 azkaban web 服务时,在文件中指定创建了拥有 ADMIN权限的用户 abcadmin
接下来我们想用 abcadmin登录 azkaban,并指定执行executor是node03(安装了hive的机器)那么,在此之前,得在 node03 创建用户abcadmin,并且此用户属于 node03 的linux用户组myazkaban(参考创建abcrwe的做法即可)
$ sudo groupadd abcadmin [sudo] hadoop 的密码: $ sudo useradd -g myazkaban abcadmin $ sudo passwd abcadmin 更改用户 abcadmin 的密码 。 新的 密码:abc123 无效的密码: 密码未通过字典检查 - 它基于字典单词 重新输入新的 密码: passwd:所有的身份验证令牌已经成功更新。 # 将kkbadmin添加附加用户组 $ sudo usermod -a -G hadoop abcadmin # 查看用户 abcadmin $ sudo id abcadmin uid=1002(abcadmin) gid=1001(myazkaban) 组=1001(myazkaban),1000(hadoop)
运行前,因为 abcrwe 没有ADMIN权限,所以先退出登录web ui界面,使用abcadmin登录web ui界面。创建项目、上传zip、执行flow并指定executor服务器是node03节点(安装了hive的节点)
Flow 2.0高级
1. javaprocess 类型
type 类型为 javaprocess 的 job,可以运行一个自定义 Java 类的 main 方法,可用的配置如下:
Xms:最小堆Xmx:最大堆classpath:类路径java.class:要运行的 Java 对象,其中必须包含 Main 方法main.args: main 方法的参数 新建一个 azkaban 的 maven 工程,包名:com.yw.azkaban,包中创建类
public class JavaProcessMain {
public static void main(String[] args) {
System.out.println("This is " + args[0] + " javaprocess job type test!");
}
}
将项目打成 jar 包,编写 flow 文件 javaProcess.flow,内容如下:
nodes: - name: javaProcessJob type: javaprocess config: Xms: 96M Xmx: 200M java.class: com.yw.azkaban.JavaProcessMain main.args: MyAzkaban
将 jar 包、flow 文件和 project 文件一起压缩成 zip 文件 09-testJavaProcessweb server ui界面创建项目,然后上传项目zip文件,然后执行
运行失败解决办法:一是在 node01、node02 上都创建 abcadmin 这个用户,二是退出 azkaban 使用 abcrwe 这个用户登录(三个节点上都有该用户)我们先将该项目授权给 abcrwe 这个用户,然后使用 abcrwe 重新登录
再次执行,成功!
2. 条件工作流官网文档条件工作流功能允许用户自定义条件,决定是否运行某些Job。分两种情况:
运行时参数:可以根据一个job之前的 job 的输出,决定此 job 是执行还是不执行预定义宏:也可以使用基于之前的 job 的 status 预定义宏,决定此 job 是执行还是不执行 在这些条件下,用户可以在确定 Job执行逻辑时获得更大的灵活性
例如,只要父 Job 之一成功,就可以运行当前 Job 运行时参数
原理:
父 Job 将参数写入$JOB_OUTPUT_PROP_FILE环境变量所指向的文件子 Job 使用 ${jobName:param}来获取父 Job 输出的参数,参数值与一个字符串或数字进行比较,来定义执行条件 支持的比较、逻辑运算符有:
先定义一个flow文件,包含两个job
jobA执行一个sh脚本,脚本将当前是星期几的值,写入文件$JOB_OUTPUT_PROP_FILEjobB执行一个脚本,判断如果jobA输出的是星期一或星期三,则jobB执行 flow 文件 runtimeParam.flow 内容如下:
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${JobA:dayOfTheWeek} == 1 || ${JobA:dayOfTheWeek} == 3
JobA.sh 内容如下:
#!/bin/bash
echo "do JobA"
dayOfTheWeek=`date +%w`
echo "{"dayOfTheWeek":$dayOfTheWeek}" > $JOB_OUTPUT_PROP_FILE
JobB.sh 内容如下:
#!/bin/bash echo "do JobB"
将 JobA.sh、JobB.sh、 runtimeParam.flow 和 flow20.project 打包成10-runtimeParam.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行
由于不是周一或周三,所以 jobB 执行失败,修改 flow 文件,将条件改为:如果jobA输出的是星期二或星期四,则jobB执行,再次上传 zip 包,并运行,成功!
预定于宏Azkaban 中预置了几个预定义宏,预定义宏会根据当前 job 的所有父 Job 的完成情况进行判断,再决定是否执行当前job。支持的预定义宏
每个预定义宏的相应作业状态
案例:flow 中有3个job
jobA 执行一个sh脚本jobB 执行一个sh脚本jobC 执行一个sh命令,只要jobA或jobB任一个成功,jobC才执行 编写 flow 文件 macro.flow,内容如下:
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: echo 'This is jobC'
condition: one_success
将 JobB.sh、 macro.flow、flow20.project 打包成 11-macro.zip,故意没有将JobA.sh打包到zip文件;这样JobA肯定执行失败web server ui界面创建项目,然后上传项目zip文件,然后执行
运行时参数混合预定义宏指定job是否执行的条件,除了单独由一个“运行时参数”或一个“预定义宏”指定外,还可以用两类的组合结果指定条件
即若干个“运行时参数”再加上一个“预定义宏”通过逻辑运算,根据最终结果是true还是false,决定job是否执行 举例:
# JobA 的输出参数 param1 的值等于 1,并且 JobB 的输出参数 param2 大于 5
# 两个条件都成立,当前 Job 才执行
condition: ${JobA:param1} == 1 && ${JobB:param2} > 5
# 当前job的父 Job 至少一个成功才执行,当前job才被执行
condition: one_success
# 当前job的所有父 Job 全部完成,并且 JobC 的输出参数 param3 的值不等于 foo
# 两个条件都成立,当前 Job 才执行
condition: all_done && ${JobC:param3} != "foo"
# 根据{JobD:param4}、{JobE:parm5}、all_success、${JobF:parm6} == "bar"进行复杂的逻辑运算后
# 如果最终结果是true,那么当前job才执行
condition: (!{JobD:param4} || !{JobE:parm5}) && all_success || ${JobF:parm6} == "bar"
编写 flow 文件 mixtureCondition.flow,内容如下:
nodes:
- name: JobA
type: command
config:
command: sh write_to_props.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo “This is JobB.”
condition: ${JobA:jobAParam1} == 1
- name: JobC
type: command
dependsOn:
- JobA
config:
command: echo “This is JobC.”
condition: ${JobA:jobAParam1} == 2
- name: JobD
type: command
dependsOn:
- JobB
- JobC
config:
command: pwd
condition: one_success
脚本 write_to_props.sh,内容如下
#!/bin/bash
echo '{"jobAParam1":"1"}' > $JOB_OUTPUT_PROP_FILE
将 mixtureCondition.flow、flow20.project、write_to_props.sh 打包生成 12-macro.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行
3. 定时执行将入门案例 01-hello.zip 重命名为 13-scheduler.zip,web server ui界面创建项目,然后上传项目zip文件,然后设置调度执行
选择 Schedule 调度执行
参考crontab语法
删除定时调度:
4. 邮件告警azkaban邮件报警机制,需要发件人邮箱、收件人邮箱 注册邮箱
注册一个126邮箱,此处邮箱地址是bigdata0111@126.com,用于处理发件人邮箱登录邮箱,设置开启SMTP服务
收集微信扫码发送短信
获取到授权码:
SMTP 服务开启成功
邮件告警案例
在 node03 的 web 服务器,配置发送告警的发件人邮箱
$ pwd /bigdata/install/azkaban-4.0.0/azkaban-web-server-4.0.0 $ vim conf/azkaban.properties
mail.sender指定邮件发件人的邮箱
mail.host指定所用邮件的smtp服务器
mail.user指定用户
mail.password为注册邮箱时,给的授权密码;根据自己的实际的“授权密码”进行修改
保存,退出文件,重启web server
cd ../azkaban-web-server-4.0.0/ $ bin/shutdown-web.sh Killing web-server. [pid: 8142], attempt: 1 shutdown succeeded $ bin/start-web.sh $ jps 8082 AzkabanExecutorServer 7508 DataNode 11495 Jps 7644 NodeManager 11484 AzkabanWebServer
编写flow文件 email.flow,内容如下:
nodes:
- name: jobA
type: command
config:
command: echo "inform user when job sucessful or failed by sending email."
将 email.flow、flow20.project 打包生成 14-email.zip 文件web server ui界面创建项目,然后上传项目 zip 文件
设置任务执行成功或失败后通知接收人的邮箱地址
执行成功,接收到邮件:



