我之前是从LocalExecutor换成了CeleryExecutor,用的是官方的demo docker-compose文件,在此基础上进行的修改。
docker-compose文件的代码如下
---
version: '3.8'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.0-python3.8}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Shanghai
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: Asia/Shanghai
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
AIRFLOW__EMAIL__EMAIL_BACKEND: 'airflow.utils.email.send_email_smtp'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'true'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'true'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
# user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
user: "root"
privileged: true
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
container_name: airflow-postgres
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
TZ: Asia/Shanghai
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: [ "CMD", "pg_isready", "-U", "airflow" ]
interval: 5s
retries: 5
restart: always
privileged: true
redis:
container_name: airflow-redis
image: redis:latest
environment:
TZ: Asia/Shanghai
ports:
- 6379:6379
healthcheck:
test: [ "CMD", "redis-cli", "ping" ]
interval: 5s
timeout: 30s
retries: 50
restart: always
privileged: true
airflow-webserver:
<<: *airflow-common
container_name: airflow-webserver
command: webserver
# build: ./airflow-webserver
ports:
- 8080:8080
healthcheck:
test: [ "CMD", "curl", "--fail", "http://localhost:8080/health" ]
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
airflow-scheduler:
<<: *airflow-common
container_name: airflow-scheduler
command: scheduler
build: ./airflow-scheduler
healthcheck:
test: [ "CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"' ]
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
airflow-worker:
<<: *airflow-common
container_name: airflow-worker
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
airflow-init:
<<: *airflow-common
container_name: airflow-init
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
privileged: true
flower:
container_name: airflow-flower
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: [ "CMD", "curl", "--fail", "http://localhost:5555/" ]
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
volumes:
postgres-db-volume:
看得出我是在airflow-scheduler中build的,但是我最近遇到了一下小问题(具体问题另开一篇文章记录),让我思考这个build是否是有问题的。
之前在使用LocalExecutor时,只对scheduler进行build,目的是在scheduler的容器中安装业务需要的python环境,然后airflow在调度、执行task时用到的都是scheduler中的环境,区别只是调度用的是容器中自带python环境,执行task用到的是我自己安装的业务python环境。
到了CeleryExecutor中有些不太一样了,调度用的是scheduler,执行task用到的是worker。可我依旧只是对scheduler进行build,奇怪的是scheduler和worker中都有一份业务环境???但既然work中有业务环境,那也能继续用。
直到最近,我尝试了将build的对象改为worker,且丝毫不影响airflow的运行,我想这才是正确的用法吧,现在的代码如下
---
version: '3.8'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3-python3.8}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Shanghai
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: Asia/Shanghai
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
AIRFLOW__EMAIL__EMAIL_BACKEND: 'airflow.utils.email.send_email_smtp'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'true'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'true'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
# user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
user: "root"
privileged: true
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
container_name: airflow-postgres
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
TZ: Asia/Shanghai
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: [ "CMD", "pg_isready", "-U", "airflow" ]
interval: 5s
retries: 5
restart: always
privileged: true
redis:
container_name: airflow-redis
image: redis:latest
environment:
TZ: Asia/Shanghai
ports:
- 6379:6379
healthcheck:
test: [ "CMD", "redis-cli", "ping" ]
interval: 5s
timeout: 30s
retries: 50
restart: always
privileged: true
airflow-webserver:
<<: *airflow-common
container_name: airflow-webserver
command: webserver
# build: ./airflow-webserver
ports:
- 8080:8080
healthcheck:
test: [ "CMD", "curl", "--fail", "http://localhost:8080/health" ]
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
airflow-scheduler:
<<: *airflow-common
container_name: airflow-scheduler
command: scheduler
# build: ./airflow-scheduler
healthcheck:
test: [ "CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"' ]
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
airflow-worker:
<<: *airflow-common
image: core.harbor.techfin.ai/library/apache/airflow-worker:2.2.0-python3.8
build: ./airflow-scheduler
container_name: airflow-worker
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
airflow-init:
<<: *airflow-common
container_name: airflow-init
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
privileged: true
flower:
container_name: airflow-flower
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: [ "CMD", "curl", "--fail", "http://localhost:5555/" ]
interval: 10s
timeout: 10s
retries: 5
restart: always
privileged: true
volumes:
postgres-db-volume:



