栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

使用Apache Beam以CSV格式将BigQuery结果写入GCS

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用Apache Beam以CSV格式将BigQuery结果写入GCS

您可以通过

WriteToText
添加
.csv
后缀和来实现
headers
。考虑到您需要将查询结果解析为CSV格式。作为示例,我使用了莎士比亚公共数据集和以下查询:

bigquery-public-data.samples.shakespeare
中选择单词,单词数,语料库,其中CHAR_LENGTH(word)> 3按单词数计数排序限制10

现在,我们通过以下方式读取查询结果:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA
现在包含键值对:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

我们可以应用一个

beam.Map
函数只产生值:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

摘录

BQ_VALUES

[u'hamlet', u'HAMLET', 407][u'kingrichardiii', u'that', 319][u'othello', u'OTHELLO', 313]

最后再次映射,使所有列值用逗号而不是列表分开(考虑到如果双引号可以出现在字段中,则需要转义双引号):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))

现在,我们将结果后缀和标头写入GCS:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')

书面结果:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csvword, word count, corpus"hamlet", "HAMLET", "407""kingrichardiii", "that", "319""othello", "OTHELLO", "313""merrywivesofwindsor", "MISTRESS", "310""othello", "IAGO", "299""antonyandcleopatra", "ANTONY", "284""asyoulikeit", "that", "281""antonyandcleopatra", "CLEOPATRA", "274""measureforemeasure", "your", "274""romeoandjuliet", "that", "270"


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/646577.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号