Learning gpload, the Greenplum Python utility: do

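def do_method_insert(self):
    # create (or reuse) the external table, then load the target table from it
    self.create_external_table()
    self.do_insert(self.get_qualified_tablename())

def get_qualified_tablename(self):
    # schema-qualified name of the target table, e.g. "public.sales"
    tblname = "%s.%s" % (self.schema, self.table)
    return tblname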
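do_insert itself is not shown in this excerpt. As a hedged illustration, assuming it loads the target by selecting from the external table, the statement could be assembled like this. qualified_name and build_insert_sql are hypothetical names, not gpload functions.

```python
from typing import Sequence


def qualified_name(schema: str, table: str) -> str:
    """Same formatting as get_qualified_tablename: 'schema.table'."""
    return "%s.%s" % (schema, table)


def build_insert_sql(target: str, ext_table: str, columns: Sequence[str]) -> str:
    """Hypothetical INSERT ... SELECT copying rows from the external table into the target."""
    cols = ", ".join(columns)
    return "INSERT INTO %s (%s) SELECT %s FROM %s" % (target, cols, cols, ext_table)


if __name__ == "__main__":
    target = qualified_name("public", "sales")
    print(build_insert_sql(target, "public.ext_gpload_reusable_1", ["id", "amount"]))
    # INSERT INTO public.sales (id, amount) SELECT id, amount FROM public.ext_gpload_reusable_1
```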

The options that matter most here are reuse_tables, fast_match, staging_table, and log_errors.
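The code reads the control file through self.getconfig with colon-separated paths such as 'gpload:input:format'. A minimal sketch of that kind of lookup over an already-parsed control file follows. get_option is a hypothetical helper, not gpload's getconfig, and placing reuse_tables, fast_match, and staging_table under a preload section is an assumption to check against the gpload reference; log_errors and error_limit sit under input, as the error message in create_external_table indicates.

```python
from typing import Any

# Assumed shape of a parsed control file (lower-cased keys, nested dicts).
CONTROL = {
    "gpload": {
        "input": {"format": "csv", "error_limit": 25, "log_errors": True},
        "external": {"schema": "staging"},
        "preload": {"reuse_tables": True, "fast_match": False, "staging_table": "ext_sales"},
    }
}


def get_option(config: dict, path: str, default: Any = None) -> Any:
    """Walk a path like 'gpload:input:format'; return default if any key is missing."""
    node: Any = config
    for key in path.split(":"):
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


if __name__ == "__main__":
    print(get_option(CONTROL, "gpload:input:format", "text"))         # csv
    print(get_option(CONTROL, "gpload:input:null_as", False))         # False (not set)
    print(get_option(CONTROL, "gpload:preload:reuse_tables", False))  # True
```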

Creating the external table

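The SQL that creates the external table has the form: create external table self.get_ext_schematable(self.extSchemaName, self.extTableName) (from_cols) location(locationStr) format formatType (self.formatOpts) encoding encodingStr log errors segment reject limit limitStr. The format options, encoding, log errors, and segment reject limit clauses are only appended when they are set.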
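To make the clause order concrete, the following sketch assembles the statement in the same order create_external_table does. build_create_external_table and sql_literal are hypothetical names; sql_literal only doubles single quotes and is not gpload's quote(), and the gpfdist URL in the usage example is made up.

```python
from typing import Optional, Sequence, Tuple


def sql_literal(value: str) -> str:
    """Hypothetical helper: wrap in single quotes, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_create_external_table(
    ext_schema_table: str,
    columns: Sequence[Tuple[str, str]],
    locations: Sequence[str],
    format_type: str = "text",
    format_opts: str = "",
    encoding: Optional[str] = None,
    log_errors: bool = False,
    error_limit: Optional[int] = None,
) -> str:
    """Assemble the clauses in the order create_external_table emits them."""
    sql = "create external table %s" % ext_schema_table
    sql += "(%s) " % ",".join("%s %s" % (name, typ) for name, typ in columns)
    sql += "location(%s) " % ",".join(sql_literal(loc) for loc in locations)
    sql += "format %s " % sql_literal(format_type)
    if format_opts:                      # optional clauses, only when configured
        sql += "(%s) " % format_opts
    if encoding:
        sql += "encoding %s " % sql_literal(encoding)
    if log_errors:
        sql += "log errors "
    if error_limit:
        sql += "segment reject limit %d " % error_limit
    return sql.strip()


if __name__ == "__main__":
    print(build_create_external_table(
        "staging.ext_sales", [("id", "int"), ("amount", "numeric")],
        ["gpfdist://etl1:8081/sales.csv"], "csv", "delimiter ','", "UTF8", True, 100))
```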

    def create_external_table(self):
        # extract all control file information and transform it accordingly in order to construct a CREATE EXTERNAL TABLE statement if it is needed later on
        formatType = self.getconfig('gpload:input:format', unicode, 'text').lower()
        locationStr = ','.join(map(quote,self.locations))
        self.get_external_table_formatOpts('delimiter')
        nullas = self.getconfig('gpload:input:null_as', unicode, False)
        self.log(self.DEBUG, "null " + unicode(nullas))
        if nullas != False: # could be empty string 
            self.formatOpts += "null %s " % quote_no_slash(nullas)
        elif formatType=='csv':
            self.formatOpts += "null '' "
        else:
            self.formatOpts += "null %s " % quote_no_slash("\\N")  # \N is the text-format default

        esc = self.getconfig('gpload:input:escape', None, None)
        if esc:
            if type(esc) != unicode and type(esc) != str:
                self.control_file_error("gpload:input:escape must be a string")
            if esc.lower() == 'off':
                if formatType == 'csv':
                    self.control_file_error("ESCAPE cannot be set to OFF in CSV mode")
                self.formatOpts += "escape 'off' "
            else:
                self.get_external_table_formatOpts('escape')
        else:
            if formatType=='csv':
                self.get_external_table_formatOpts('quote','escape')
            else:
                self.formatOpts += "escape '\\' "  # backslash is the text-format default

        if formatType=='csv':
            self.get_external_table_formatOpts('quote')

        if self.getconfig('gpload:input:header',bool,False):
            self.formatOpts += "header "

        force_not_null_columns = self.getconfig('gpload:input:force_not_null',list,[])
        if force_not_null_columns:
            for i in force_not_null_columns:
                if type(i) != unicode and type(i) != str:
                    self.control_file_error("gpload:input:force_not_null must be a YAML sequence of strings")
            self.formatOpts += "force not null %s " % ','.join(force_not_null_columns)

        encodingCode = None
        encodingStr = self.getconfig('gpload:input:encoding', unicode, None)
        if encodingStr is None:
            result = self.db.query("SHOW SERVER_ENCODING".encode('utf-8')).getresult()
            if len(result) > 0:
                encodingStr = result[0][0]

        if encodingStr:
            sql = "SELECT pg_char_to_encoding('%s')" % encodingStr
            result = self.db.query(sql.encode('utf-8')).getresult()
            if len(result) > 0:
                encodingCode = result[0][0]

        limitStr = self.getconfig('gpload:input:error_limit',int, None)
        if self.log_errors and not limitStr:
            self.control_file_error("gpload:input:log_errors requires gpload:input:error_limit to be specified")

        self.extSchemaName = self.getconfig('gpload:external:schema', unicode, None)
        if self.extSchemaName == '%': self.extSchemaName = self.schema

        # get the list of columns to use in the external table
        if not self.from_cols_from_user:
            # don't supply values for serial columns
            from_cols = filter(lambda a: a[3] != True, self.from_columns)
        else: from_cols = self.from_columns
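The NULL and ESCAPE branches above reduce to a small decision table. Below is a hedged sketch of just that part of formatOpts; null_and_escape_opts and single_quote are hypothetical names, the delimiter and quote options appended by get_external_table_formatOpts are left out, and an explicit escape value is simply quoted here as a simplification. For text input it uses \N and a backslash, the standard text-format defaults in Greenplum/PostgreSQL.

```python
from typing import Optional


def single_quote(value: str) -> str:
    """Hypothetical stand-in for quote_no_slash: wrap in quotes, double embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def null_and_escape_opts(format_type: str, null_as: Optional[str] = None,
                         escape: Optional[str] = None) -> str:
    """Sketch of the NULL/ESCAPE part of formatOpts for 'text' and 'csv' inputs."""
    fmt = format_type.lower()
    opts = ""
    # NULL: an explicit value wins, even an empty string
    if null_as is not None:
        opts += "null %s " % single_quote(null_as)
    elif fmt == "csv":
        opts += "null '' "
    else:
        opts += "null %s " % single_quote("\\N")   # text default: \N
    # ESCAPE: 'off' is rejected in CSV mode
    if escape:
        if escape.lower() == "off":
            if fmt == "csv":
                raise ValueError("ESCAPE cannot be set to OFF in CSV mode")
            opts += "escape 'off' "
        else:
            opts += "escape %s " % single_quote(escape)
    elif fmt != "csv":
        opts += "escape '\\' "                       # text default: backslash
    # csv without an explicit escape: gpload derives it from the quote option (not modeled)
    return opts


if __name__ == "__main__":
    print(null_and_escape_opts("text"))   # null '\N' escape '\'
    print(null_and_escape_opts("csv"))    # null ''
```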

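If reuse_tables is True, gpload searches the system catalog for an existing external table and reuses it.

If staging_table is set, the table being looked up is the staging_table itself. When no external schema (gpload:external:schema) is configured, the first query is used:

    SELECT n.nspname as Schema, c.relname as Name FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind IN ('r','v','S','') AND c.relstorage IN ('h', 'a', 'c','x','v','') AND n.nspname <> 'pg_catalog' AND n.nspname <> 'information_schema' AND n.nspname !~ '^pg_toast' AND c.relname = '%s' AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1,2;

When a schema is configured, the second query is used:

    select * from pg_catalog.pg_tables where schemaname = '%s' and tablename = '%s'

If the query returns a row, that table is reused; otherwise the code falls through and creates a new external table under the staging_table name.

If staging_table is not set, the lookup query comes from self.get_fast_match_exttable_query when fast_match is enabled, and from self.get_reuse_exttable_query otherwise. If no reusable table is found, a new one named "ext_gpload_reusable_" followed by self.unique_suffix is created.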
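The branching just described can be summarized as two pure functions: one reporting which lookup runs, one giving the name used when nothing is found. choose_reuse_lookup, fallback_table_name, and the returned labels are hypothetical, and identifier quoting (quote_unident) is ignored for brevity.

```python
from typing import Optional


def choose_reuse_lookup(reuse_tables: bool, staging_table: Optional[str],
                        ext_schema: Optional[str], fast_match: bool) -> str:
    """Label the catalog lookup that the reuse_tables branch would run."""
    if not reuse_tables:
        return "create"                  # no lookup: always create a new table
    if staging_table:
        if "." in staging_table:
            # gpload logs an ERROR here; the schema belongs in EXTERNAL->SCHEMA
            raise ValueError("'.' is not allowed in staging_table")
        # without an external schema, search visible relations in pg_class
        return "staging:pg_class" if ext_schema is None else "staging:pg_tables"
    return "fast_match" if fast_match else "reuse_query"


def fallback_table_name(staging_table: Optional[str], unique_suffix: str) -> str:
    """Name created when reuse_tables is on but nothing reusable was found."""
    if staging_table:
        return staging_table
    return "ext_gpload_reusable_%s" % unique_suffix


if __name__ == "__main__":
    print(choose_reuse_lookup(True, "ext_sales", None, False))  # staging:pg_class
    print(fallback_table_name(None, "1"))                       # ext_gpload_reusable_1
```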

        # If the 'reuse tables' option was specified we now try to find an
        # already existing external table in the catalog which will match
        # the one that we need to use. It must have identical attributes,
        # external location, format, and encoding specifications.
        if self.reuse_tables == True:
            if self.staging_table:
                if '.' in self.staging_table:
                    self.log(self.ERROR, "Character '.' is not allowed in staging_table parameter. Please use EXTERNAL->SCHEMA to set the schema of external table")
                self.extTableName = quote_unident(self.staging_table) 
                if self.extSchemaName is None:
                    sql = """SELECT n.nspname as Schema,
                            c.relname as Name
                        FROM pg_catalog.pg_class c
                            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                        WHERE c.relkind IN ('r','v','S','')
                            AND c.relstorage IN ('h', 'a', 'c','x','v','')
                            AND n.nspname <> 'pg_catalog'
                            AND n.nspname <> 'information_schema'
                            AND n.nspname !~ '^pg_toast'
                            AND c.relname = '%s'
                            AND pg_catalog.pg_table_is_visible(c.oid)
                        ORDER BY 1,2;""" % self.extTableName
                else:
                    sql = "select * from pg_catalog.pg_tables where schemaname = '%s' and tablename = '%s'" % (quote_unident(self.extSchemaName),  self.extTableName)
                result = self.db.query(sql.encode('utf-8')).getresult()
                if len(result) > 0:
                    self.extSchemaTable = self.get_ext_schematable(quote_unident(self.extSchemaName), self.extTableName)
                    self.log(self.INFO, "reusing external staging table %s" % self.extSchemaTable)
                    return
            else:
                # process the single quotes in order to successfully find an existing external table to reuse.
                self.formatOpts = self.formatOpts.replace("E'\''","'''")
                if self.fast_match:
                    sql = self.get_fast_match_exttable_query(formatType, self.formatOpts,
                        limitStr, self.extSchemaName, self.log_errors, encodingCode)
                else:
                    sql = self.get_reuse_exttable_query(formatType, self.formatOpts,
                        limitStr, from_cols, self.extSchemaName, self.log_errors, encodingCode)

                resultList = self.db.query(sql.encode('utf-8')).getresult()
                if len(resultList) > 0:
                    # found an external table to reuse. no need to create one. we're done here.
                    self.extTableName = (resultList[0])[0]
                    # fast match result is only table name, so we need add schema info
                    if self.fast_match:
                        self.extSchemaTable = self.get_ext_schematable(quote_unident(self.extSchemaName), self.extTableName)
                    else:
                        self.extSchemaTable = self.extTableName
                    self.log(self.INFO, "reusing external table %s" % self.extSchemaTable)
                    return

                # didn't find an existing external table suitable for reuse. Format a reusable
                # name and issue a CREATE EXTERNAL TABLE on it. Hopefully we can use it next time
                # around

                self.extTableName = "ext_gpload_reusable_%s" % self.unique_suffix
                self.log(self.INFO, "did not find an external table to reuse. creating %s" % self.get_ext_schematable(self.extSchemaName, self.extTableName))
        # process the single quotes in order to successfully create an external table.
        self.formatOpts = self.formatOpts.replace("'''","E'\''")

        # construct a CREATE EXTERNAL TABLE statement and execute it
        self.extSchemaTable = self.get_ext_schematable(self.extSchemaName, self.extTableName)
        sql = "create external table %s" % self.extSchemaTable
        sql += "(%s)" % ','.join(map(lambda a:'%s %s' % (a[0], a[1]), from_cols))

        sql += "location(%s) "%locationStr
        sql += "format%s "% quote(formatType)
        if len(self.formatOpts) > 0:
            sql += "(%s) "% self.formatOpts
        if encodingStr:
            sql += "encoding%s "%quote(encodingStr)
        if self.log_errors:
            sql += "log errors "

        if limitStr:
            if limitStr < 2:
                self.control_file_error("error_limit must be 2 or higher")
            sql += "segment reject limit %s "%limitStr

        try:
            self.db.query(sql.encode('utf-8'))
        except Exception, e:
            self.log(self.ERROR, 'could not run SQL "%s": %s' % (sql, unicode(e)))

        # set up to drop the external table at the end of operation, unless user
        # specified the 'reuse_tables' option, in which case we don't drop
        if self.reuse_tables == False:
            self.cleanupSql.append('drop external table if exists %s'%self.extSchemaTable)
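The error-handling rules spread across create_external_table (log_errors requires error_limit, the limit must be at least 2, and the external table is dropped at the end unless reuse_tables is on) can be gathered into one small sketch. validate_error_options and cleanup_statements are hypothetical names, not part of gpload.

```python
from typing import List, Optional


def validate_error_options(log_errors: bool, error_limit: Optional[int]) -> str:
    """Return the error-handling clause, raising on the combinations gpload rejects."""
    if log_errors and not error_limit:
        raise ValueError("log_errors requires error_limit to be specified")
    clause = ""
    if log_errors:
        clause += "log errors "
    if error_limit:
        if error_limit < 2:
            raise ValueError("error_limit must be 2 or higher")
        clause += "segment reject limit %d " % error_limit
    return clause


def cleanup_statements(reuse_tables: bool, ext_schema_table: str) -> List[str]:
    """Statements run after the load: drop the table unless it is kept for reuse."""
    if reuse_tables:
        return []
    return ["drop external table if exists %s" % ext_schema_table]


if __name__ == "__main__":
    print(validate_error_options(True, 10))    # log errors segment reject limit 10
    print(cleanup_statements(False, "s.ext"))  # ['drop external table if exists s.ext']
```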
Please credit when reposting: article reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/423176.html