注意:python的版本必须是python3+,我用的是python3.7这个版本。
pip install apache-airflow
通过该命令即可安装成功。
这样安装好后,我的airflow的执行文件在目录下:
/Users/xhz/opt/anaconda3/envs/py37/bin
注意:由于我是使用anaconda3来管理python环境的,这里可以自己通过find命令来搜一下airflow这个命令的位置。把airflow的可执行程序的路径添加到PATH路径中,这样就可以直接使用airflow命令了。
airflow的配置文件目录是在:
/Users/xhz/airflow/
修改airflow的配置,就是修改以下文件:
/Users/xhz/airflow/airflow.cfg初始化 初始化数据库
若不修改配置,默认airflow使用的是SQLite文本数据,可以在airflow.cfg配置文件中配置该数据库的位置。也就是修改以下这一行的配置:
sql_alchemy_conn = sqlite:Users/reyun/airflow/airflow.db
注意:我这里不配置,而是使用默认的配置,这里可以修改成mysql数据库。
执行以下命令来初始化airflow运行时需要使用的数据库配置数据:
airflow db init添加一个用户
添加一个用户才能登录webserver页面,所以需要添加一个新的管理用户。
airflow users create
--username admin
--firstname admin
--lastname admin
--role Admin
--email admin@example.org
启动webserver和scheduler
- 启动webserver
airflow webserver -p 20001 -D
说明:-p是指定端口; -D是作为后台执行。
- 启动scheduler
airflow scheduler -D
说明:-D是把schduler放到后台执行。
现在就可以在浏览器中使用输入:http://127.0.0.1:20001查看airflow的管理页面。
使用编辑以下python文件,保存为:hello_world.py。
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
#-------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'xhz',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['xhz@qq.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
#-------------------------------------------------------------------------------
# dag
dag = DAG(
'xhz_hello_world_dag',
default_args=default_args,
description='xhz first DAG',
schedule_interval=timedelta(days=1))
#-------------------------------------------------------------------------------
# first operator
date_operator = BashOperator(
task_id='date_task',
bash_command='date',
dag=dag)
#-------------------------------------------------------------------------------
# second operator
sleep_operator = BashOperator(
task_id='sleep_task',
depends_on_past=False,
bash_command='sleep 5',
dag=dag)
#-------------------------------------------------------------------------------
# third operator
def print_hello():
print("Hello world xhz!")
return 'Hello world xhz!'
hello_operator = PythonOperator(
task_id='xhz_hello_task',
python_callable=print_hello,
dag=dag)
#-------------------------------------------------------------------------------
# dependencies
# t1
# /
# t2 t3
#
sleep_operator.set_upstream(date_operator)
hello_operator.set_upstream(date_operator)
执行
以上代码写好后,通过python命令来提交和执行任务。
python hello_world.py总结
本文介绍了airflow的安装和基本的使用。以后的文章会针对任务调度比较关注的问题对其进行介绍。



