栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据学习教程SD版第十七篇【Airflow】

大数据学习教程SD版第十七篇【Airflow】

Airflow 任务调度工具+ETL工具(不常用)

Apache开源 ,编程(Python)、调度、监控 (UI) 动态 可扩展 优雅 可伸缩 上手简单

1. Airflow 安装

本地安装

# 最好使用conda创建一个虚拟Python环境,防止本机Python环境不兼容
# 0.config
export AIRFLOW_HOME=/opt/module/airflow
AIRFLOW_VERSION=2.2.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# 1. install
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# 2.init db
airflow db init

# 3.create admin
airflow users create 
    --username airflow 
    --firstname jin 
    --lastname suo 
    --role Admin 
    --email jinsuo@eamil.com
    
# 4. run
airflow webserver --port 8080 -D
airflow scheduler -D

# 5.stop
ps -ef |grep airflow|grep master|cut -d ' ' -f 7|xargs kill -9
ps -ef |grep airflow|grep scheduler|cut -d ' ' -f 7|xargs kill -9
2. Airflow 架构及组件

1个工作流(Workflow)= 1个 DAG = n 个有依赖的 Task

4大核心组件

    Scheduler 调度器 包含(Executor)WebServer 浏览器UImetadata database 元数据存储Worker 执行具体的DAG Dag里包含一个个Task(内部重要的是Operator)
3. Aifflow UI基本操作

一个任务流程 = 一个DAG

以第一个官方实例为例:

    查看DAG

    运行DAG

    删除旧的DAG(旧的DAG:已经在服务器里不存在的)

4. Airflow 简单DAG

自定义编写一个工作流(DAG),并 启动运行

    查看DAG的存放目录(ariflow.cfg)

    在此文件夹下编写dag

打印Helloworld,Start -> 休眠 5s -> 打印 Helloworld,stop

dag01.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'dag01',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 1, 8),
    catchup=False,
    tags=['dag01'],
) as dag:

    t1 = BashOperator(
        task_id='print1',
        bash_command='echo Hellowrld,Start',
    )
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t3 = BashOperator(
        task_id='print2',
        depends_on_past=False,
        bash_command="echo Helloworld,Stop"
    )

    t1 >> t2 >> t3
    运行dag01

关于外部传参问题:

    自动跑时可以使用Variable全局参数在代码中接收手动跑传入参数可以使用context在代码中接收
5. Airflow metadata DB

切换 Airflow 默认元数据存储数据库从 SQLite -> MySQL

    在mysql中创建airflow数据库

    修改配置文件(airflow.cfg)

executor = LocalExecutor
sql_alchemy_conn = mysql+pymysql://root:000000@localhost:3306/airflow
    修改my.cnf
explicit_defaults_for_timestamp=1
    安装一个额外依赖
conda install mysql-connector-python
    重新初始化airflow
airflow db init

airflow users create 
    --username airflow 
    --firstname jin 
    --lastname suo 
    --role Admin 
    --email jinsuo@eamil.com
 
# run: scheduler不能再用上面的单线程启动方式来启动了
airflow webserver -p 8080 -D
nohup airflow scheduler &
6. Airflow 使用核心

AIrflow 编写核心代码是各种operator,通过operator来执行各项操作:比如执行shell脚本、执行python函数等,然后通过 >> 来串联任务,工作中经常把任务封装成shell脚本,通过SSHoperator 来调用集群(conn_id)中Shell所在位置,最终执行Shell脚本任务!

Operators

    BashOperator 执行shell 命令PythonOperator 执行python函数MysqlOperator … 连接各种数据库,执行SQLSSHOperator 调用远程Shell脚本
依赖调度
op1 >> op2 >> op3

其他重要组成

    Hooks 外部平台和数据库的接口Connections 外部系统的连接信息 conn_idVariables 源代码外部变量Jija Templating 与宏结合使用,作为环境变量

为什么用Shell脚本:常用的Spark任务、Hive任务、简单Shell任务都可以封装在Shell里


后续将分享一些公司中编写的一些DAG的Python脚本……

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700191.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号