栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Google Cloud Dataflow Python,检索作业ID

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Google Cloud Dataflow Python,检索作业ID

您可以通过

dataflow.projects().locations().jobs().list
在管道中进行调用来实现(请参见下面的完整代码)。一种可能性是始终使用相同的作业名称来调用模板,这很有意义,否则可以将作业前缀作为运行时参数传递。使用正则表达式解析作业列表,以查看该作业是否包含名称前缀,如果包含名称前缀,则返回该作业ID。如果有多个,它将​​仅返回最新的一个(当前正在运行的一个)。

在定义

PROJECT
BUCKET
变量之后,使用以下命令暂存该模板:

python script.py     --runner DataflowRunner     --project $PROJECT     --staging_location gs://$BUCKET/staging     --temp_location gs://$BUCKET/temp     --template_location gs://$BUCKET/templates/retrieve_job_id

然后,

myjobprefix
在执行模板化作业时指定所需的作业名称(在我的情况下):

gcloud dataflow jobs run myjobprefix    --gcs-location gs://$BUCKET/templates/retrieve_job_id

retrieve_job_id
函数将从作业中返回作业ID,将更
job_prefix
改为与给定名称匹配。

import argparse, logging, refrom googleapiclient.discovery import buildfrom oauth2client.client import GoogleCredentialsimport apache_beam as beamfrom apache_beam.options.pipeline_options import PipelineOptionsfrom apache_beam.options.pipeline_options import SetupOptionsdef retrieve_job_id(element):  project = 'PROJECT_ID'  job_prefix = "myjobprefix"  location = 'us-central1'  logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))  try:    credentials = GoogleCredentials.get_application_default()    dataflow = build('dataflow', 'v1b3', credentials=credentials)    result = dataflow.projects().locations().jobs().list(      projectId=project,      location=location,    ).execute()    job_id = "none"    for job in result['jobs']:      if re.findall(r'' + re.escape(job_prefix) + '', job['name']):        job_id = job['id']        break    logging.info("Job ID: {}".format(job_id))    return job_id  except Exception as e:    logging.info("Error retrieving Job ID")    raise KeyError(e)def run(argv=None):  parser = argparse.ArgumentParser()  known_args, pipeline_args = parser.parse_known_args(argv)  pipeline_options = PipelineOptions(pipeline_args)  pipeline_options.view_as(SetupOptions).save_main_session = True  p = beam.Pipeline(options=pipeline_options)  init_data = (p    | 'Start' >> beam.Create(["Init pipeline"])    | 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id))  p.run()if __name__ == '__main__':  run()


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/660220.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号