栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

运行时气流动态任务

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

运行时气流动态任务

无法在DAG执行期间修改DAG(无需进行更多工作)。

dag =DAG(...
是由调度循环回升。它将具有任务实例
'python_operator'
。该任务实例在dag运行中进行调度,并由工作程序或执行程序执行。由于仅通过调度程序更新Airflow
DB中的DAG模型,因此这些添加的虚拟任务将不会保留到DAG中,也不会计划运行。他们在工人离开时将被遗忘。除非您从调度程序中复制有关持久性和更新模型的所有代码…否则下次调度程序访问DAG文件进行解析时,该代码将被撤消,这可能是每分钟一次,每秒一次或更快,具体取决于其他多少次。有DAG文件要解析。

气流实际上希望每个DAG在运行之间大致保持相同的布局。它还希望不断重新加载/解析DAG文件。因此,尽管您可以制作一个DAG文件,该文件在每次运行时根据某些外部数据动态地确定任务(最好是缓存在文件或pyc模块中,而不是像数据库查找一样缓存在网络I
/ O中),但您会减慢整个调度循环的速度对于 所有 DAG),这不是一个好计划,因为您的图形和树形视图将使所有混乱,并且该查找将使调度程序解析更加繁重。

您可以使可调用对象运行每个任务…

def make_tasks(context):    du1 = DummyOperator(task_id='dummy1', dag=dag)    du2 = DummyOperator(task_id='dummy2', dag=dag)    du3 = DummyOperator(task_id='dummy3', dag=dag)    du1.execute(context)    du2.execute(context)    du3.execute(context)p = PythonOperator(    provides_context=true,

但这是顺序的,您必须弄清楚如何使用python使它们并行(使用Future?),并且如果有任何异常,则整个任务将失败。而且它被绑定到一个执行者或工人,因此不使用气流的任务分配(kubernetes,mesos,芹菜)。

使用此方法的另一种方法是添加固定数量的任务(最大数量),并使用可调用对象将不需要的任务短路或为每个任务使用xcom推送参数,从而在运行时更改其行为但不更改DAG。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/653206.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号