正如我在评论中所说,csv文件需要在行(或行)边界上拆分。您的代码无法执行此操作,并且有可能将它们拆分成一个代码的中间位置-
我怀疑是您的原因
_csv.Error。
下面通过将输入文件处理为一系列行来避免这样做。我已经对其进行了测试,并且似乎可以将示例文件分成大小 大致
相等的块,因此它可以独立工作,因为不太可能将全部行都完全适合一个块。
更新资料
这是一个 基本
的代码的速度更快的版本比我最初发布。改进之处在于,它现在使用临时文件自己的
tell()方法来确定正在写入的文件的不断变化的长度,而不是调用
os.path.getsize(),从而消除了
flush()对文件的需要,并
os.fsync()在每一行写入后对其进行调用。
import csvimport multiprocessingimport osimport tempfiledef split(infilename, num_chunks=multiprocessing.cpu_count()): READ_BUFFER = 2**13 in_file_size = os.path.getsize(infilename) print 'in_file_size:', in_file_size chunk_size = in_file_size // num_chunks print 'target chunk_size:', chunk_size files = [] with open(infilename, 'rb', READ_BUFFER) as infile: for _ in xrange(num_chunks): temp_file = tempfile.TemporaryFile() while temp_file.tell() < chunk_size: try: temp_file.write(infile.next()) except StopIteration: # end of infile break temp_file.seek(0) # rewind files.append(temp_file) return filesfiles = split("sample_simple.csv", num_chunks=4)print 'number of files created: {}'.format(len(files))for i, ifile in enumerate(files, start=1): print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name)) print 'contents of file {}:'.format(i) reader = csv.reader(ifile) for row in reader: print row print ''


