无法在DAG执行期间修改DAG(无需进行更多工作)。
该
dag =DAG(...是由调度循环回升。它将具有任务实例
'python_operator'。该任务实例在dag运行中进行调度,并由工作程序或执行程序执行。由于仅通过调度程序更新Airflow
DB中的DAG模型,因此这些添加的虚拟任务将不会保留到DAG中,也不会计划运行。他们在工人离开时将被遗忘。除非您从调度程序中复制有关持久性和更新模型的所有代码…否则下次调度程序访问DAG文件进行解析时,该代码将被撤消,这可能是每分钟一次,每秒一次或更快,具体取决于其他多少次。有DAG文件要解析。
气流实际上希望每个DAG在运行之间大致保持相同的布局。它还希望不断重新加载/解析DAG文件。因此,尽管您可以制作一个DAG文件,该文件在每次运行时根据某些外部数据动态地确定任务(最好是缓存在文件或pyc模块中,而不是像数据库查找一样缓存在网络I
/ O中),但您会减慢整个调度循环的速度对于 所有 DAG),这不是一个好计划,因为您的图形和树形视图将使所有混乱,并且该查找将使调度程序解析更加繁重。
您可以使可调用对象运行每个任务…
def make_tasks(context): du1 = DummyOperator(task_id='dummy1', dag=dag) du2 = DummyOperator(task_id='dummy2', dag=dag) du3 = DummyOperator(task_id='dummy3', dag=dag) du1.execute(context) du2.execute(context) du3.execute(context)p = PythonOperator( provides_context=true,
但这是顺序的,您必须弄清楚如何使用python使它们并行(使用Future?),并且如果有任何异常,则整个任务将失败。而且它被绑定到一个执行者或工人,因此不使用气流的任务分配(kubernetes,mesos,芹菜)。
使用此方法的另一种方法是添加固定数量的任务(最大数量),并使用可调用对象将不需要的任务短路或为每个任务使用xcom推送参数,从而在运行时更改其行为但不更改DAG。



