栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

将二进制COPY表FROM与psycopg2一起使用

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

将二进制COPY表FROM与psycopg2一起使用

这是Python 3的COPY FROM的二进制等效文件:

from io import BytesIOfrom struct import packimport psycopg2# Two rows of data; "id" is not in the upstream data source# Columns: node, ts, val1, val2data = [(23253, 342, -15.336734, 2494627.949375),        (23256, 348, 43.23524, 2494827.949375)]conn = psycopg2.connect("dbname=mydb user=postgres")curs = conn.cursor()# Determine starting value for sequencecurs.execute("SELECt nextval('num_data_id_seq')")id_seq = curs.fetchone()[0]# Make a binary file object for COPY FROMcpy = BytesIO()# 11-byte signature, no flags, no header extensioncpy.write(pack('!11sii', b'PGCOPYn377rn', 0, 0))# Columns: id, node, ts, val1, val2# Zip: (column position, format, size)row_format = list(zip(range(-1, 4),('i', 'i', 'h', 'f', 'd'),( 4,   4,   2,   4,   8 )))for row in data:    # Number of columns/fields (always 5)    cpy.write(pack('!h', 5))    for col, fmt, size in row_format:        value = (id_seq if col == -1 else row[col])        cpy.write(pack('!i' + fmt, size, value))    id_seq += 1  # manually increment sequence outside of database# File trailercpy.write(pack('!h', -1))# Copy data to databasecpy.seek(0)curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)# Update sequence on databasecurs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))conn.commit()

更新资料

我改写了上面的方法来为COPY编写文件。我在Python中的数据位于NumPy数组中,因此使用它们很有意义。这是一个

data
具有1M行,7列的示例:

import psycopg2import numpy as npfrom struct import packfrom io import BytesIOfrom datetime import datetimeconn = psycopg2.connect("dbname=mydb user=postgres")curs = conn.cursor()# NumPy record arrayshape = (7, 2000, 500)print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +         [('s' + str(x), 'f4') for x in range(shape[0])])data = np.empty(shape[1]*shape[2], dtype)data['id'] = np.arange(shape[1]*shape[2]) + 1data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])data['s0'] = np.random.rand(shape[1]*shape[2]) * 100prv = 's0'for nxt in data.dtype.names[4:]:    data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10    prv = nxt

在我的数据库中,我有两个看起来像的表:

CREATE TABLE num_data_binary(  id integer PRIMARY KEY,  node integer NOT NULL,  ts smallint NOT NULL,  s0 real,  s1 real,  s2 real,  s3 real,  s4 real,  s5 real,  s6 real) WITH (OIDS=FALSE);

另一个类似的表名为

num_data_text

以下是一些简单的辅助函数,它们通过使用NumPy记录数组中的信息为COPY(文本和二进制格式)准备数据:

def prepare_text(dat):    cpy = BytesIO()    for row in dat:        cpy.write('t'.join([repr(x) for x in row]) + 'n')    return(cpy)def prepare_binary(dat):    pgcopy_dtype = [('num_fields','>i2')]    for field, dtype in dat.dtype.descr:        pgcopy_dtype += [(field + '_length', '>i4'),   (field, dtype.replace('<', '>'))]    pgcopy = np.empty(dat.shape, pgcopy_dtype)    pgcopy['num_fields'] = len(dat.dtype)    for i in range(len(dat.dtype)):        field = dat.dtype.names[i]        pgcopy[field + '_length'] = dat.dtype[i].alignment        pgcopy[field] = dat[field]    cpy = BytesIO()    cpy.write(pack('!11sii', b'PGCOPYn377rn', 0, 0))    cpy.write(pgcopy.tostring())  # all rows    cpy.write(pack('!h', -1))  # file trailer    return(cpy)

这就是我使用帮助程序函数对两种COPY格式方法进行基准测试的方式:

def time_pgcopy(dat, table, binary):    print('Processing copy object for ' + table)    tstart = datetime.now()    if binary:        cpy = prepare_binary(dat)    else:  # text        cpy = prepare_text(dat)    tendw = datetime.now()    print('Copy object prepared in ' + str(tendw - tstart) + '; ' +          str(cpy.tell()) + ' bytes; transfering to database')    cpy.seek(0)    if binary:        curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)    else:  # text        curs.copy_from(cpy, table)    conn.commit()    tend = datetime.now()    print('Database copy time: ' + str(tend - tendw))    print('        Total time: ' + str(tend - tstart))    returntime_pgcopy(data, 'num_data_text', binary=False)time_pgcopy(data, 'num_data_binary', binary=True)

这是最后两个

time_pgcopy
命令的输出:

Processing copy object for num_data_textCopy object prepared in 0:01:15.288695; 84355016 bytes; transfering to databaseDatabase copy time: 0:00:37.929166        Total time: 0:01:53.217861Processing copy object for num_data_binaryCopy object prepared in 0:00:01.296143; 80000021 bytes; transfering to databaseDatabase copy time: 0:00:23.325952        Total time: 0:00:24.622095

因此,使用二进制方法,NumPy→文件和File→数据库步骤都更快。明显的区别是Python如何准备COPY文件,这对于文本来说确实很慢。通常,二进制格式会以这种格式的文本格式在2/3的时间内将其加载到数据库中。

最后,我比较了数据库中两个表中的值,以查看数字是否不同。大约1.46%的行的column值不同

s0
,并且该比例的值增加到6.17%
s6
(可能与我使用的随机方法有关)。所有70M
32位浮点值之间的非零绝对差值介于9.3132257e-010和7.6293945e-006之间。文本和二进制加载方法之间的这些细微差别是由于文本格式方法所需的float→text→float转换而导致精度损失。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/382630.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号