Edit: Since version 2.12.0, Beam ships with new fileio transforms that let you read CSV data without having to reimplement a source. You can do it like this:

    import csv
    import io
    import sys

    import apache_beam as beam
    from apache_beam.io import fileio


    def get_csv_reader(readable_file):
        # You can return whichever kind of reader you want here:
        # a DictReader, or a normal csv.reader.
        if sys.version_info >= (3, 0):
            # open() returns a binary stream; csv needs text in Python 3.
            return csv.reader(io.TextIOWrapper(readable_file.open(), newline=""))
        else:
            return csv.reader(readable_file.open())


    with beam.Pipeline(...) as p:
        content_pc = (p
                      | fileio.MatchFiles("/my/file/name")
                      | fileio.ReadMatches()
                      | beam.Reshuffle()  # Useful if you expect many matches
                      | beam.FlatMap(get_csv_reader))

I recently wrote a test for this for Apache Beam. You can take a look at it in the GitHub repository.
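The comment in get_csv_reader mentions that a DictReader works too. Below is a minimal sketch of that variant, assuming UTF-8 files whose first line is a header row; it would plug into the same pipeline as `beam.FlatMap(get_csv_dict_reader)`, and each matched file gets its own header. Because it only calls `readable_file.open()`, it can be tried without Beam using a stand-in object (`FakeReadableFile` is hypothetical, not part of Beam).

```python
import csv
import io


def get_csv_dict_reader(readable_file):
    # readable_file is assumed to behave like Beam's fileio.ReadableFile:
    # its open() method returns a binary stream.
    text = io.TextIOWrapper(readable_file.open(), encoding="utf-8", newline="")
    # DictReader takes the first row of the file as field names and
    # yields one dict per remaining row.
    return csv.DictReader(text)


class FakeReadableFile:
    """Hypothetical stand-in for fileio.ReadableFile, for local testing only."""

    def __init__(self, data):
        self._data = data

    def open(self):
        return io.BytesIO(self._data)


rows = list(get_csv_dict_reader(FakeReadableFile(b"name,age\nalice,30\nbob,25\n")))
print(rows)
```

Values come back as strings, so any type conversion would go in a following `beam.Map` step.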
Old answer: this relied on reimplementing a source. It is no longer the main recommended approach :)
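The idea is to have a source that returns parsed CSV rows. You can do this by subclassing the FilebasedSource class to include CSV parsing. In particular, the read_records function would look like this: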
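
    import csv
    import io

    from apache_beam.io.filebasedsource import FilebasedSource


    class MyCsvFileSource(FilebasedSource):
        # range_tracker is ignored, so create the source with splittable=False.
        def read_records(self, file_name, range_tracker):
            # open_file returns a binary stream; csv.reader needs text.
            with io.TextIOWrapper(self.open_file(file_name), newline="") as f:
                for rec in csv.reader(f):
                    yield rec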



