栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python链接各类数据库汇总【持续更新】

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python链接各类数据库汇总【持续更新】

python链接不同数据库会使用到不同的第三方库,SQLAlchemy可以很好的整合大部分常用数据库,利用SQLAlchemy可以很好的完成自用的数据库链接模块。因python中数据存储常用到Dataframe,所以在查询中增加了统一Dataframe格式输出,方便后续的操作。
因为工作原因,目前只完成了Oracle、MySQL、sqlserver三种数据库的查询部分,后续会陆续完善其余部分以及增加pg数据库的链接和相关操作。

import cx_Oracle
import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy
from urllib import parse

# 暂时只考虑了简单的增删查改
SQL_TYPE = ['select', 'insert', 'update', 'truncate']


def conn_oracle(database, sql_type, sql):
    if sql_type not in SQL_TYPE:
        result = 'sql_type值不符合规定'
        return result
    else:
        try:
            engine = create_engine(
                'oracle+cx_oracle://%s:%s@%s:%s/%s' % (
                    database['username'], parse.quote(database['password']), database['ip'], database['port'],
                    database['db_name']))
            conn = engine.connect()
        except sqlalchemy.exc.DatabaseError as e:
            if e.orig.args[0].code == 12545:
                result = 'ip不正确,数据库连接失败'
                return result
            if e.orig.args[0].code == 12541:
                result = '端口号不正确,数据库连接失败'
                return result
            elif e.orig.args[0].code == 12505:
                # 经实测,Oracle 19c用create_engine方式会报错,说SID无效,经过多次尝试,改用cx_Oracle的传参形式可以成功
                try:
                    conn = cx_Oracle.connect(user=database['username'], password=parse.quote(database['password']),
                                             dsn="%s:%s/%s" % (database['ip'], database['port'], database['db_name']))
                except sqlalchemy.exc.DatabaseError as e:
                    result = str(e)
                    return result
            else:
                result = str(e)
                return result
        cursor = conn.cursor()
        if sql_type == 'select':
            # Oracle没使用通用的__select_data_to_df函数,因为列名获取稍有差异
            data = cursor.execute(sql)
            list_data = []
            list_column = [i[0] for i in data.description]
            for item in data:
                list_data.append(item)
            df_data = pd.Dataframe(data=list_data, columns=list_column)
        elif sql_type == 'insert':
            cursor.execute(sql)
            df_data = 0
        elif sql_type == 'update':
            pass
        elif sql_type == 'truncate':
            cursor.execute(sql)
            df_data = 0
        conn.commit()
        conn.close()
        return df_data


def conn_mysql(database, sql_type, sql):
    if sql_type not in SQL_TYPE:
        result = 'sql_type值不符合规定'
        return result
    else:
        engine = create_engine(
            'mysql+pymysql://%s:%s@%s:%s/%s' % (
                database['username'], parse.quote(database['password']), database['ip'], database['port'],
                database['db_name']))
        conn = engine.connect()
        if sql_type == 'select':
            data = conn.execute(sql)
            df_data = __select_data_to_df(data)
        elif sql_type == 'insert':
            pass
        elif sql_type == 'update':
            df_data = conn.execute(sql)
        conn.close()
        return df_data


def conn_sqlserver(database, sql_type, sql):
    if sql_type not in SQL_TYPE:
        result = 'sql_type值不符合规定'
        return result
    else:
        username = database['username']
        pwd = parse.quote(database['password'])
        ip = database['ip']
        if 'port' in database:
            port = ':%s' % database['port']
        else:
            port = ''
        db_name = database['db_name']
        if 'charset' in database:
            charset = '?charset=%s' % database['charset']
        else:
            charset = ''
        engine = create_engine('mssql+pymssql://%s:%s@%s%s/%s%s' % (username, pwd, ip, port, db_name, charset))
        conn = engine.connect()
        if sql_type == 'select':
            data = conn.execute(sql)
            df_data = __select_data_to_df(data)
        elif sql_type == 'insert':
            pass
        elif sql_type == 'update':
            df_data = conn.execute(sql)
        conn.close()
    return df_data


def __select_data_to_df(data):
    '''
    函数内容:数据库select所得数据转存为Dataframe
    参数:data 数据库select所得数据
    返回值:df_data Dataframe格式的数据库select数据,包含列名(数据库中对应列名)
    '''
    list_data = []
    list_column = [i[0] for i in data.cursor.description]
    for item in data:
        list_data.append(item)
    df_data = pd.Dataframe(data=list_data, columns=list_column)
    return df_data

第三方库的导入问题
除了py文件中明确import的第三方库,不同数据库还需要各自的第三方库:
oracle==>cx_Oracle
mysql==>PyMySQL
sqlserver==>pymssql
数据库连接问题
1、Oracle 19c链接和其他版本的链接方式稍有不同,详见代码中conn_oracle函数部分
2、密码中若有特殊符号,例如@时,若直接传参,会令程序识别出错,需要转义传参,如下:

engine = create_engine( 'mysql+pymysql://%s:%s@%s:%s/%s' % (
         database['username'], parse.quote(database['password']), database['ip'],database['port'],database['db_name']))

最后
该代码是自用,适用场景有限,代码格式也不算太标准,若有任何建议欢迎反馈。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/321851.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号