def do_method_insert(self):
    """Run the INSERT load method.

    Prepares the external table first (``self.create_external_table``) and
    then hands the qualified name of the destination table to
    ``self.do_insert``.
    """
    self.create_external_table()
    destination = self.get_qualified_tablename()
    self.do_insert(destination)
def get_qualified_tablename(self):
    """Return the destination table as ``"<schema>.<table>"``.

    The two parts are taken verbatim from ``self.schema`` and
    ``self.table``; no quoting is applied here.
    """
    return "%s.%s" % (self.schema, self.table)
关注reuse_tables、fast_match、staging_table和log_errors选项。
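创建外部表
创建外部表的SQL形如:`create external table <self.get_ext_schematable(self.extSchemaName, self.extTableName)> (<from_cols>) location(<locationStr>) format <formatType> (<self.formatOpts>) encoding <encodingStr> log errors segment reject limit <limitStr>`。其中尖括号内为运行时代入的值;`(<self.formatOpts>)`、`encoding`、`log errors` 和 `segment reject limit` 这几个子句只有在对应的选项有值时才会被拼接进SQL。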
def create_external_table(self):
    """Locate a reusable external table or create a new one for this load.

    Reads the ``gpload:input:*`` and ``gpload:external:schema`` settings via
    ``self.getconfig``, appends the resulting format options to
    ``self.formatOpts`` and then does one of the following:

    * ``reuse_tables`` with ``staging_table``: if a table with that name is
      found in the catalog, store it in ``self.extSchemaTable`` and return.
    * ``reuse_tables`` without ``staging_table``: run the fast-match or the
      full reuse query; if it returns a row, store that table in
      ``self.extSchemaTable`` and return.
    * otherwise: build and execute a ``create external table`` statement
      and store the new name in ``self.extSchemaTable``.

    When ``self.reuse_tables == False`` a ``drop external table if exists``
    statement is appended to ``self.cleanupSql``.

    Invalid control-file values are reported through
    ``self.control_file_error``; a failing CREATE statement is reported
    through ``self.log(self.ERROR, ...)``.
    """
    # Extract all control file information and transform it so that a
    # CREATE EXTERNAL TABLE statement can be constructed later if needed.
    formatType = self.getconfig('gpload:input:format', unicode, 'text').lower()
    locationStr = ','.join(map(quote, self.locations))

    self.get_external_table_formatOpts('delimiter')

    # --- NULL marker ------------------------------------------------------
    nullas = self.getconfig('gpload:input:null_as', unicode, False)
    self.log(self.DEBUG, "null " + unicode(nullas))
    if nullas is not False:  # could be an empty string, which is valid
        self.formatOpts += "null %s " % quote_no_slash(nullas)
    elif formatType == 'csv':
        self.formatOpts += "null '' "
    else:
        # TEXT format: the default NULL marker is backslash-N, not a bare
        # "N" (which would turn every literal "N" field into NULL).
        self.formatOpts += "null %s " % quote_no_slash("\\N")

    # --- ESCAPE character -------------------------------------------------
    esc = self.getconfig('gpload:input:escape', None, None)
    if esc:
        if not isinstance(esc, (str, unicode)):
            self.control_file_error("gpload:input:escape must be a string")
        if esc.lower() == 'off':
            if formatType == 'csv':
                self.control_file_error("ESCAPE cannot be set to OFF in CSV mode")
            self.formatOpts += "escape 'off' "
        else:
            self.get_external_table_formatOpts('escape')
    else:
        if formatType == 'csv':
            self.get_external_table_formatOpts('quote', 'escape')
        else:
            # TEXT format default escape character is a single backslash.
            self.formatOpts += "escape '\\'"

    if formatType == 'csv':
        self.get_external_table_formatOpts('quote')

    if self.getconfig('gpload:input:header', bool, False):
        self.formatOpts += "header "

    # --- FORCE NOT NULL ---------------------------------------------------
    force_not_null_columns = self.getconfig('gpload:input:force_not_null', list, [])
    if force_not_null_columns:
        for column in force_not_null_columns:
            if not isinstance(column, (str, unicode)):
                self.control_file_error("gpload:input:force_not_null must be a YAML sequence of strings")
        self.formatOpts += "force not null %s " % ','.join(force_not_null_columns)

    # --- encoding ---------------------------------------------------------
    # Fall back to the server encoding when the control file names none,
    # then translate the name to its numeric code for the reuse queries.
    encodingCode = None
    encodingStr = self.getconfig('gpload:input:encoding', unicode, None)
    if encodingStr is None:
        result = self.db.query("SHOW SERVER_ENCODING".encode('utf-8')).getresult()
        if result:
            encodingStr = result[0][0]
    if encodingStr:
        sql = "SELECt pg_char_to_encoding('%s')" % encodingStr
        result = self.db.query(sql.encode('utf-8')).getresult()
        if result:
            encodingCode = result[0][0]

    # --- error limit ------------------------------------------------------
    limitStr = self.getconfig('gpload:input:error_limit', int, None)
    if self.log_errors and not limitStr:
        self.control_file_error("gpload:input:log_errors requires gpload:input:error_limit to be specified")

    # --- schema of the external table --------------------------------------
    self.extSchemaName = self.getconfig('gpload:external:schema', unicode, None)
    if self.extSchemaName == '%':
        # '%' means "same schema as the target table"
        self.extSchemaName = self.schema

    # Get the list of columns to use in the external table.
    if not self.from_cols_from_user:
        # Don't put values into serial columns. A real list (not a lazy
        # filter object) is needed because from_cols is consumed twice.
        from_cols = [col for col in self.from_columns if col[3] != True]
    else:
        from_cols = self.from_columns

    # If the 'reuse tables' option was specified we now try to find an
    # already existing external table in the catalog which will match
    # the one that we need to use. It must have identical attributes,
    # external location, format, and encoding specifications.
    if self.reuse_tables == True:
        if self.staging_table:
            # staging_table is set: the table looked up below is the staging
            # table itself. Without a configured external schema the
            # pg_class/pg_namespace query is used; with one, pg_tables is
            # queried by schema name and table name.
            if '.' in self.staging_table:
                self.log(self.ERROR, "Character '.' is not allowed in staging_table parameter. Please use EXTERNAL->SCHEMA to set the schema of external table")
            self.extTableName = quote_unident(self.staging_table)
            if self.extSchemaName is None:
                sql = (
                    "SELECt n.nspname as Schema,\n"
                    "c.relname as Name\n"
                    "FROM pg_catalog.pg_class c\n"
                    "LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace\n"
                    "WHERe c.relkind IN ('r','v','S','')\n"
                    "AND c.relstorage IN ('h', 'a', 'c','x','v','')\n"
                    "AND n.nspname <> 'pg_catalog'\n"
                    "AND n.nspname <> 'information_schema'\n"
                    "AND n.nspname !~ '^pg_toast'\n"
                    "AND c.relname = '%s'\n"
                    "AND pg_catalog.pg_table_is_visible(c.oid)\n"
                    "ORDER BY 1,2;"
                ) % self.extTableName
            else:
                sql = "select * from pg_catalog.pg_tables where schemaname = '%s' and tablename = '%s'" % (quote_unident(self.extSchemaName), self.extTableName)
            result = self.db.query(sql.encode('utf-8')).getresult()
            if result:
                self.extSchemaTable = self.get_ext_schematable(quote_unident(self.extSchemaName), self.extTableName)
                self.log(self.INFO, "reusing external staging table %s" % self.extSchemaTable)
                return
        else:
            # staging_table is not set: search the catalog for a matching
            # external table, using the fast-match query when fast_match is
            # configured and the full reuse query otherwise.
            #
            # Process the single quotes in order to successfully find an
            # existing external table to reuse: the escape-string form
            # E'\'' is rewritten to ''' for the lookup (presumably the form
            # the catalog stores -- verify against the query helpers).
            self.formatOpts = self.formatOpts.replace("E'\\''", "'''")
            if self.fast_match:
                sql = self.get_fast_match_exttable_query(
                    formatType, self.formatOpts, limitStr,
                    self.extSchemaName, self.log_errors, encodingCode)
            else:
                sql = self.get_reuse_exttable_query(
                    formatType, self.formatOpts, limitStr, from_cols,
                    self.extSchemaName, self.log_errors, encodingCode)
            resultList = self.db.query(sql.encode('utf-8')).getresult()
            if resultList:
                # Found an external table to reuse. No need to create one.
                self.extTableName = (resultList[0])[0]
                # Fast match result is only the table name, so we need to
                # add the schema info ourselves.
                if self.fast_match:
                    self.extSchemaTable = self.get_ext_schematable(quote_unident(self.extSchemaName), self.extTableName)
                else:
                    self.extSchemaTable = self.extTableName
                self.log(self.INFO, "reusing external table %s" % self.extSchemaTable)
                return

            # Didn't find an existing external table suitable for reuse.
            # Format a reusable name and issue a CREATE EXTERNAL TABLE on
            # it. Hopefully we can use it next time around.
            self.extTableName = "ext_gpload_reusable_%s" % self.unique_suffix
            self.log(self.INFO, "did not find an external table to reuse. creating %s" % self.get_ext_schematable(self.extSchemaName, self.extTableName))

    # Process the single quotes in order to successfully create an external
    # table: turn ''' back into the escape-string literal E'\''.
    self.formatOpts = self.formatOpts.replace("'''", "E'\\''")

    # Construct a CREATE EXTERNAL TABLE statement and execute it.
    # NOTE(review): when reuse_tables is not True nothing in this method
    # assigns self.extTableName; it is presumably initialised elsewhere --
    # confirm before relying on this path.
    self.extSchemaTable = self.get_ext_schematable(self.extSchemaName, self.extTableName)
    sql = "create external table %s" % self.extSchemaTable
    sql += "(%s)" % ','.join('%s %s' % (col[0], col[1]) for col in from_cols)
    sql += "location(%s) " % locationStr
    sql += "format%s " % quote(formatType)
    if self.formatOpts:
        sql += "(%s) " % self.formatOpts
    if encodingStr:
        sql += "encoding%s " % quote(encodingStr)
    if self.log_errors:
        sql += "log errors "
    if limitStr:
        if limitStr < 2:
            self.control_file_error("error_limit must be 2 or higher")
        sql += "segment reject limit %s " % limitStr

    try:
        self.db.query(sql.encode('utf-8'))
    except Exception as e:
        self.log(self.ERROR, 'could not run SQL "%s": %s' % (sql, unicode(e)))

    # Set up to drop the external table at the end of operation, unless the
    # user specified the 'reuse_tables' option, in which case we don't drop.
    if self.reuse_tables == False:
        self.cleanupSql.append('drop external table if exists %s' % self.extSchemaTable)



