栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Python Airflow-从PythonOperator返回结果

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python Airflow-从PythonOperator返回结果

您可能想查看Airflow的XCOM:https :
//airflow.apache.org/concepts.html#xcoms

如果从函数返回值,则此值存储在xcom中。就您而言,您可以像从其他Python代码一样访问它:

task_instance = kwargs['task_instance']task_instance.xcom_pull(task_ids='Task1')

或像这样的模板中:

{{ task_instance.xcom_pull(task_ids='Task1') }}

如果要指定键,可以将其推入XCOM(在任务内):

task_instance = kwargs['task_instance']task_instance.xcom_push(key='the_key', value=my_str)

然后,您可以像下面这样访问它:

task_instance.xcom_pull(task_ids='my_task', key='the_key')

编辑1

后续问题: 我不能在其他函数中使用该值,而是将其传递给另一个PythonOperator,例如-“ t2 =” BashOperator(task_id
=’Moving_bucket’,bash_command =’python /home/raw.py“%s” ‘%file_name,dag =
dag)“-我想访问“ Task1”返回的file_name。如何实现?

首先,在我看来,该值实际上 不是 传递给另一个,

PythonOperator
而是传递给
BashOperator

其次,这已经在我上面的回答中涵盖了。该字段

bash_command
是模板化的(请参阅
template_fields
源代码:https :
//github.com/apache/incubator-
airflow/blob/master/airflow/operators/bash_operator.py)。因此,我们可以使用模板版本:

BashOperator(  task_id='Moving_bucket',   bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',  dag=dag,)

编辑2

说明:Airflow的工作方式如下:它将执行Task1,然后填充xcom,然后执行下一个任务。因此,为使您的示例正常工作,您需要先执行Task1,然后在Task1的下游执行Moving_bucket。

由于您使用的是返回函数,因此您也可以省略

key='file'
from
xcom_pull
而不是在函数中手动设置它。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/611022.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号