1. Airflow 安装Airflow 任务调度工具+ETL工具(不常用)
Apache开源 ,编程(Python)、调度、监控 (UI) 动态 可扩展 优雅 可伸缩 上手简单
本地安装
# 最好使用conda创建一个虚拟Python环境,防止本机Python环境不兼容
# 0.config
export AIRFLOW_HOME=/opt/module/airflow
AIRFLOW_VERSION=2.2.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# 1. install
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# 2.init db
airflow db init
# 3.create admin
airflow users create
--username airflow
--firstname jin
--lastname suo
--role Admin
--email jinsuo@eamil.com
# 4. run
airflow webserver --port 8080 -D
airflow scheduler -D
# 5.stop
ps -ef |grep airflow|grep master|cut -d ' ' -f 7|xargs kill -9
ps -ef |grep airflow|grep scheduler|cut -d ' ' -f 7|xargs kill -9
2. Airflow 架构及组件
1个工作流(Workflow)= 1个 DAG = n 个有依赖的 Task
4大核心组件
- Scheduler 调度器 包含(Executor)WebServer 浏览器UImetadata database 元数据存储Worker 执行具体的DAG Dag里包含一个个Task(内部重要的是Operator)
一个任务流程 = 一个DAG
以第一个官方实例为例:
- 查看DAG
- 运行DAG
- 删除旧的DAG(旧的DAG:已经在服务器里不存在的)
自定义编写一个工作流(DAG),并 启动运行
- 查看DAG的存放目录(ariflow.cfg)
- 在此文件夹下编写dag
打印Helloworld,Start -> 休眠 5s -> 打印 Helloworld,stop
dag01.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'dag01',
default_args=default_args,
schedule_interval=timedelta(days=1),
start_date=datetime(2022, 1, 8),
catchup=False,
tags=['dag01'],
) as dag:
t1 = BashOperator(
task_id='print1',
bash_command='echo Hellowrld,Start',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
t3 = BashOperator(
task_id='print2',
depends_on_past=False,
bash_command="echo Helloworld,Stop"
)
t1 >> t2 >> t3
- 运行dag01
5. Airflow metadata DB关于外部传参问题:
自动跑时可以使用Variable全局参数在代码中接收手动跑传入参数可以使用context在代码中接收
切换 Airflow 默认元数据存储数据库从 SQLite -> MySQL
在mysql中创建airflow数据库
修改配置文件(airflow.cfg)
executor = LocalExecutor sql_alchemy_conn = mysql+pymysql://root:000000@localhost:3306/airflow
- 修改my.cnf
explicit_defaults_for_timestamp=1
- 安装一个额外依赖
conda install mysql-connector-python
- 重新初始化airflow
airflow db init
airflow users create
--username airflow
--firstname jin
--lastname suo
--role Admin
--email jinsuo@eamil.com
# run: scheduler不能再用上面的单线程启动方式来启动了
airflow webserver -p 8080 -D
nohup airflow scheduler &
6. Airflow 使用核心
AIrflow 编写核心代码是各种operator,通过operator来执行各项操作:比如执行shell脚本、执行python函数等,然后通过 >> 来串联任务,工作中经常把任务封装成shell脚本,通过SSHoperator 来调用集群(conn_id)中Shell所在位置,最终执行Shell脚本任务!
Operators
- BashOperator 执行shell 命令PythonOperator 执行python函数MysqlOperator … 连接各种数据库,执行SQLSSHOperator 调用远程Shell脚本
op1 >> op2 >> op3
其他重要组成
- Hooks 外部平台和数据库的接口Connections 外部系统的连接信息 conn_idVariables 源代码外部变量Jija Templating 与宏结合使用,作为环境变量
为什么用Shell脚本:常用的Spark任务、Hive任务、简单Shell任务都可以封装在Shell里
后续将分享一些公司中编写的一些DAG的Python脚本……



