栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据高级开发工程师——工作流调度器Azkaban(2)

大数据高级开发工程师——工作流调度器Azkaban(2)

文章目录

工作流调度器Azkaban

Azkaban使用

Flow 2.0

1. 入门案例 HelloWorld2. 单job有多个command3. 包含多个有依赖关系job的flow4. 自动失败重试5. 手动失败重试

方案一方案二 6. 操作HDFS7. MR任务8. Hive任务

解决方案:指定executor Flow 2.0高级

1. javaprocess 类型2. 条件工作流

运行时参数预定于宏运行时参数混合预定义宏 3. 定时执行4. 邮件告警

注册邮箱邮件告警案例

工作流调度器Azkaban Azkaban使用

azkaban 4.x目前同时支持flow1.0与flow 2.0,但官网说 flow 1.0将来会被淘汰,所以这里使用flow 2.0对flow 1.0感兴趣的同学,可以参考文章自行学习体验Azkaba内置的任务类型支持command、java Flow 2.0 1. 入门案例 HelloWorld

创建文件 flow20.project,内容如下

azkaban-flow-version: 2.0

再创建文件 hello.flow,内容如下(yaml格式)

nodes:
  - name: jobA
    type: command
    config:
      command: echo "Hello World! This is an flow 2.0 example."

文件中要有nodes的部分,包含所有你想要运行的job;需要为每个job指定name、type;大多的job需要config

将这两个文件压缩成一个 zip 包,并起名为 01-hello.zip使用账号/密码 abcrwe/abc123 登录 Azkaban,并创建工程

上传项目zip文件:

执行 flow

下图Execution queued successfully with exec id 1表示,web server选择id是1的exec server执行此流

执行成功

查看日志

2. 单job有多个command

同样需要两个文件 flow20.project 和 .flow 文件,新建 multipleCommand.flow,内容如下

nodes:
  - name: jobA
    type: command
    config:
      command: mkdir /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/executions/test1
      command.1: mkdir /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/executions/test2

第一个command用command表示,第二个用command.1表示,第三个用command.2表示,以此类推

将文件件 flow20.project 和 multipleCommand.flow 压缩成 zip 文件 02-multipleCommand.zip,并通过 web ui 界面上传,执行 flow,查看日志输出

执行成功


3. 包含多个有依赖关系job的flow

job间可以相互依赖,创建flow文件 dependon.flow,内容如下

nodes:
  - name: jobC
    type: noop
    # jobC depends on jobA and jobB
    dependsOn:
      - jobA
      - jobB

  - name: jobA
    type: command
    config:
      command: echo "This is echoed by jobA."

  - name: jobB
    type: command
    config:
      command: pwd

jobC依赖jobA、jobB

Noop: A job that takes no parameters and is essentially a null operation. Used for organizing your graph

将 dependon.flow 与 flow20.project 压缩生成 zip 文件 03-dependon.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及job日志
执行成功

4. 自动失败重试

如果job执行失败,可以配置成自动重试若干次,每次重试时间间隔一定时长创建flow文件 retry.flow,内容如下

nodes: 
  - name: jobA
    type: command
    config:
      command: sh /a_non_exists.sh
      retries: 3
      retry.backoff: 3000

/a_non_exists.sh是一个不存在的sh脚本;retries是重试次数;retry.backoff每次重试的时间间隔,单位毫秒

将 retry.flow 与 flow20.project 压缩生成 zip 文件04-retry.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及job日志

执行失败并重试

也可以在flow文件中,加入全局重试次数,此重试配置对flow文件中的所有job都生效。内容如下

config:
  retries: 3
  retry.backoff: 3000
nodes:
  - name: JobA
    type: command
    config:
      command: sh /a_non_exists.sh
5. 手动失败重试

手动失败重试场景:

对于某些 flow 中的失败 job,不能通过自动重试解决的,比如并非一些系统短时的问题,比如暂时的网络故障导致的超时、暂时的资源不足导致的执行失败此时需要手动的做些处理后,然后再进行重新执行 flow 中 job

跳过成功的 job从失败的 job 开始执行 假设在一个 flow 中有 5 个 job,有依赖关系如下:

jobE 依赖 jobDjobD 依赖 jobCjobC 依赖 jobBjobB 依赖 jobA 创建flow文件 manulretry.flow,内容如下

nodes: 
  - name: JobA
    type: command
    config:
      command: echo "This is JobA."
  - name: JobB
    type: command
    dependsOn:
      - JobA
    config:
      command: echo "This is JobB."
  - name: JobC
    type: command
    dependsOn:
      - JobB
    config:
      command: sh /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/tmp.sh
  - name: JobD
    type: command
    dependsOn:
      - JobC
    config:
      command: echo "This is JobD."
  - name: JobE
    type: command
    dependsOn:
      - JobD
    config:
      command: echo "This is JobE."

将 manulretry.flow 与 flow20.project 压缩生成 zip 文件05-manulretry.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及job日志

执行结果:JobA、JobB 成功,JobC、JobD、JobE 都失败

失败原因是 /bigdata/install/azkaban-4.0.0/azkaban-exec-server-4.0.0/tmp.sh: 没有那个文件或目录,这个无论你自动重试多少次,没有这个脚本就都是失败。那么在对应的 exec server 的对应目录下创建此 sh 脚本文件

由于不确定 flow 重试时,web 选择哪个 exec 执行 flow,所以保险起见,有两个方法方法一:在3个exec节点中都创建 tmp.sh 脚本

#!/bin/bash
echo 'this is echoed by tmp sh script'
方法二:重试flow时,指定执行的executor 方案一

重试成功:JobC、JobD、JobE 重新执行成功,JobA、JobB不重新执行

方案二

进入Projects,找到要重试的项目,执行Flow Execute Flow

重试执行成功:

Enable 和 Disable 下面都分别有如下参数:

Parents:该作业的上一个jobAncestors:该作业前的所有jobChildren:该作业后的一个jobDescendents:该作业后的所有jobEnable All: 所有的job 可以根据实际情况选择enable方案 6. 操作HDFS

首先需要启动 hadoop 集群编写 flow 文件 operateHdfs.flow,内容如下:

nodes:
  - name: jobA
    type: command
    config:
      command: echo "start execute"
      command.1: hdfs dfs -mkdir /azkaban
      command.2: hdfs dfs -put /bigdata/install/hadoop-3.1.4/NOTICE.txt  /azkaban

将 operateHdfs.flow 与 flow20.project 压缩生成 zip 文件06-operateHdfs.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并去 hdfs web 界面查看上传的文件

7. MR任务

编写 flow 文件 mr.flow,内容如下:

nodes:
  - name: jobMR
    type: command
    config:
      command: hadoop jar /bigdata/install/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar pi 3 3

为了避免执行mr过程中,对hdfs操作的一些权限问题

[hadoop@centos128 azkaban-exec-server-4.0.0]$ hdfs dfs -chmod -R 777 /tmp/

将 mr.flow 与 flow20.project 压缩生成 zip 文件07-mr.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行,并查看Job List及 job 日志
去yarn界面看看此job的执行情况:

8. Hive任务

编写 hive sql脚本文件 hive.sql,内容如下:

create database if not exists azhive;
use azhive;
create table if not exists aztest(id string,name string) row format delimited fields terminated by 't';

编写 flow 文件 hive.flow,内容如下:

nodes:
  - name: jobHive
    type: command
    config:
      command: hive -f 'hive.sql'

将 hive.sql、hive.flow 与 flow20.project 压缩成 zip 文件 08-hive.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行

执行失败

解决方案:指定executor

指定节点运行失败:

官网提供说明:要指定执行flow的executor的话,azkaban用户必须拥有admin权限

我们在安装 azkaban web 服务时,在文件中指定创建了拥有 ADMIN权限的用户 abcadmin

接下来我们想用 abcadmin登录 azkaban,并指定执行executor是node03(安装了hive的机器)那么,在此之前,得在 node03 创建用户abcadmin,并且此用户属于 node03 的linux用户组myazkaban(参考创建abcrwe的做法即可)

$ sudo groupadd abcadmin
[sudo] hadoop 的密码:
$ sudo useradd -g myazkaban abcadmin
$ sudo passwd abcadmin
更改用户 abcadmin 的密码 。
新的 密码:abc123
无效的密码: 密码未通过字典检查 - 它基于字典单词
重新输入新的 密码:
passwd:所有的身份验证令牌已经成功更新。
# 将kkbadmin添加附加用户组
$ sudo usermod -a -G hadoop abcadmin
# 查看用户 abcadmin
$ sudo id abcadmin
uid=1002(abcadmin) gid=1001(myazkaban) 组=1001(myazkaban),1000(hadoop)

运行前,因为 abcrwe 没有ADMIN权限,所以先退出登录web ui界面,使用abcadmin登录web ui界面。创建项目、上传zip、执行flow并指定executor服务器是node03节点(安装了hive的节点)
Flow 2.0高级 1. javaprocess 类型

type 类型为 javaprocess 的 job,可以运行一个自定义 Java 类的 main 方法,可用的配置如下:

Xms:最小堆Xmx:最大堆classpath:类路径java.class:要运行的 Java 对象,其中必须包含 Main 方法main.args: main 方法的参数 新建一个 azkaban 的 maven 工程,包名:com.yw.azkaban,包中创建类

public class JavaProcessMain {
    public static void main(String[] args) {
        System.out.println("This is " + args[0] + " javaprocess job type test!");
    }
}

将项目打成 jar 包,编写 flow 文件 javaProcess.flow,内容如下:

nodes:
  - name: javaProcessJob
  	type: javaprocess
  	config:
  	  Xms: 96M
  	  Xmx: 200M
  	  java.class: com.yw.azkaban.JavaProcessMain
  	  main.args: MyAzkaban

将 jar 包、flow 文件和 project 文件一起压缩成 zip 文件 09-testJavaProcessweb server ui界面创建项目,然后上传项目zip文件,然后执行

运行失败解决办法:一是在 node01、node02 上都创建 abcadmin 这个用户,二是退出 azkaban 使用 abcrwe 这个用户登录(三个节点上都有该用户)我们先将该项目授权给 abcrwe 这个用户,然后使用 abcrwe 重新登录

再次执行,成功!

2. 条件工作流

官网文档条件工作流功能允许用户自定义条件,决定是否运行某些Job。分两种情况:

运行时参数:可以根据一个job之前的 job 的输出,决定此 job 是执行还是不执行预定义宏:也可以使用基于之前的 job 的 status 预定义宏,决定此 job 是执行还是不执行 在这些条件下,用户可以在确定 Job执行逻辑时获得更大的灵活性

例如,只要父 Job 之一成功,就可以运行当前 Job 运行时参数

原理:

父 Job 将参数写入$JOB_OUTPUT_PROP_FILE环境变量所指向的文件子 Job 使用 ${jobName:param}来获取父 Job 输出的参数,参数值与一个字符串或数字进行比较,来定义执行条件 支持的比较、逻辑运算符有:

先定义一个flow文件,包含两个job

jobA执行一个sh脚本,脚本将当前是星期几的值,写入文件$JOB_OUTPUT_PROP_FILEjobB执行一个脚本,判断如果jobA输出的是星期一或星期三,则jobB执行 flow 文件 runtimeParam.flow 内容如下:

nodes:
  - name: JobA
    type: command
    config:
      command: sh JobA.sh
  - name: JobB
    type: command
    dependsOn:
      - JobA
    config:
      command: sh JobB.sh
    condition: ${JobA:dayOfTheWeek} == 1 || ${JobA:dayOfTheWeek} == 3

JobA.sh 内容如下:

#!/bin/bash
echo "do JobA"
dayOfTheWeek=`date +%w`
echo "{"dayOfTheWeek":$dayOfTheWeek}" > $JOB_OUTPUT_PROP_FILE

JobB.sh 内容如下:

#!/bin/bash
echo "do JobB"

将 JobA.sh、JobB.sh、 runtimeParam.flow 和 flow20.project 打包成10-runtimeParam.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行

由于不是周一或周三,所以 jobB 执行失败,修改 flow 文件,将条件改为:如果jobA输出的是星期二或星期四,则jobB执行,再次上传 zip 包,并运行,成功!

预定于宏

Azkaban 中预置了几个预定义宏,预定义宏会根据当前 job 的所有父 Job 的完成情况进行判断,再决定是否执行当前job。支持的预定义宏

每个预定义宏的相应作业状态

案例:flow 中有3个job

jobA 执行一个sh脚本jobB 执行一个sh脚本jobC 执行一个sh命令,只要jobA或jobB任一个成功,jobC才执行 编写 flow 文件 macro.flow,内容如下:

nodes:
  - name: JobA
    type: command
    config:
      command: sh JobA.sh
  - name: JobB
    type: command
    config:
      command: sh JobB.sh
  - name: JobC
    type: command
    dependsOn:
      - JobA
      - JobB
    config:
      command: echo 'This is jobC'
    condition: one_success

将 JobB.sh、 macro.flow、flow20.project 打包成 11-macro.zip,故意没有将JobA.sh打包到zip文件;这样JobA肯定执行失败web server ui界面创建项目,然后上传项目zip文件,然后执行

运行时参数混合预定义宏

指定job是否执行的条件,除了单独由一个“运行时参数”或一个“预定义宏”指定外,还可以用两类的组合结果指定条件

即若干个“运行时参数”再加上一个“预定义宏”通过逻辑运算,根据最终结果是true还是false,决定job是否执行 举例:

# JobA 的输出参数 param1 的值等于 1,并且 JobB 的输出参数 param2 大于 5
# 两个条件都成立,当前 Job 才执行
condition: ${JobA:param1} == 1 && ${JobB:param2} > 5

# 当前job的父 Job 至少一个成功才执行,当前job才被执行
condition: one_success

# 当前job的所有父 Job 全部完成,并且 JobC 的输出参数 param3 的值不等于 foo
# 两个条件都成立,当前 Job 才执行
condition: all_done && ${JobC:param3} != "foo"

# 根据{JobD:param4}、{JobE:parm5}、all_success、${JobF:parm6} == "bar"进行复杂的逻辑运算后
# 如果最终结果是true,那么当前job才执行
condition: (!{JobD:param4} || !{JobE:parm5}) && all_success || ${JobF:parm6} == "bar"

编写 flow 文件 mixtureCondition.flow,内容如下:

nodes:
 - name: JobA
   type: command
   config:
     command: sh write_to_props.sh

 - name: JobB
   type: command
   dependsOn:
     - JobA
   config:
     command: echo “This is JobB.”
   condition: ${JobA:jobAParam1} == 1

 - name: JobC
   type: command
   dependsOn:
     - JobA
   config:
     command: echo “This is JobC.”
   condition: ${JobA:jobAParam1} == 2

 - name: JobD
   type: command
   dependsOn:
     - JobB
     - JobC
   config:
     command: pwd
   condition: one_success

脚本 write_to_props.sh,内容如下

#!/bin/bash
echo '{"jobAParam1":"1"}' > $JOB_OUTPUT_PROP_FILE

将 mixtureCondition.flow、flow20.project、write_to_props.sh 打包生成 12-macro.zipweb server ui界面创建项目,然后上传项目zip文件,然后执行

3. 定时执行

将入门案例 01-hello.zip 重命名为 13-scheduler.zip,web server ui界面创建项目,然后上传项目zip文件,然后设置调度执行

选择 Schedule 调度执行

参考crontab语法

删除定时调度:

4. 邮件告警

azkaban邮件报警机制,需要发件人邮箱、收件人邮箱 注册邮箱

注册一个126邮箱,此处邮箱地址是bigdata0111@126.com,用于处理发件人邮箱登录邮箱,设置开启SMTP服务

收集微信扫码发送短信
获取到授权码:

SMTP 服务开启成功
邮件告警案例

在 node03 的 web 服务器,配置发送告警的发件人邮箱

$ pwd
/bigdata/install/azkaban-4.0.0/azkaban-web-server-4.0.0
$ vim conf/azkaban.properties

mail.sender指定邮件发件人的邮箱

mail.host指定所用邮件的smtp服务器

mail.user指定用户

mail.password为注册邮箱时,给的授权密码;根据自己的实际的“授权密码”进行修改

保存,退出文件,重启web server

cd ../azkaban-web-server-4.0.0/
$ bin/shutdown-web.sh
Killing web-server. [pid: 8142], attempt: 1
shutdown succeeded
$ bin/start-web.sh
$ jps
8082 AzkabanExecutorServer
7508 DataNode
11495 Jps
7644 NodeManager
11484 AzkabanWebServer

编写flow文件 email.flow,内容如下:

nodes:
  - name: jobA
    type: command
    config:
      command: echo "inform user when job sucessful or failed by sending email."

将 email.flow、flow20.project 打包生成 14-email.zip 文件web server ui界面创建项目,然后上传项目 zip 文件

设置任务执行成功或失败后通知接收人的邮箱地址

执行成功,接收到邮件:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707997.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号