python链接不同数据库会使用到不同的第三方库,SQLAlchemy可以很好的整合大部分常用数据库,利用SQLAlchemy可以很好的完成自用的数据库链接模块。因python中数据存储常用到Dataframe,所以在查询中增加了统一Dataframe格式输出,方便后续的操作。
因为工作原因,目前只完成了Oracle、MySQL、sqlserver三种数据库的查询部分,后续会陆续完善其余部分以及增加pg数据库的链接和相关操作。
import cx_Oracle
import pandas as pd
from sqlalchemy import create_engine
import sqlalchemy
from urllib import parse
# 暂时只考虑了简单的增删查改
SQL_TYPE = ['select', 'insert', 'update', 'truncate']
def conn_oracle(database, sql_type, sql):
if sql_type not in SQL_TYPE:
result = 'sql_type值不符合规定'
return result
else:
try:
engine = create_engine(
'oracle+cx_oracle://%s:%s@%s:%s/%s' % (
database['username'], parse.quote(database['password']), database['ip'], database['port'],
database['db_name']))
conn = engine.connect()
except sqlalchemy.exc.DatabaseError as e:
if e.orig.args[0].code == 12545:
result = 'ip不正确,数据库连接失败'
return result
if e.orig.args[0].code == 12541:
result = '端口号不正确,数据库连接失败'
return result
elif e.orig.args[0].code == 12505:
# 经实测,Oracle 19c用create_engine方式会报错,说SID无效,经过多次尝试,改用cx_Oracle的传参形式可以成功
try:
conn = cx_Oracle.connect(user=database['username'], password=parse.quote(database['password']),
dsn="%s:%s/%s" % (database['ip'], database['port'], database['db_name']))
except sqlalchemy.exc.DatabaseError as e:
result = str(e)
return result
else:
result = str(e)
return result
cursor = conn.cursor()
if sql_type == 'select':
# Oracle没使用通用的__select_data_to_df函数,因为列名获取稍有差异
data = cursor.execute(sql)
list_data = []
list_column = [i[0] for i in data.description]
for item in data:
list_data.append(item)
df_data = pd.Dataframe(data=list_data, columns=list_column)
elif sql_type == 'insert':
cursor.execute(sql)
df_data = 0
elif sql_type == 'update':
pass
elif sql_type == 'truncate':
cursor.execute(sql)
df_data = 0
conn.commit()
conn.close()
return df_data
def conn_mysql(database, sql_type, sql):
if sql_type not in SQL_TYPE:
result = 'sql_type值不符合规定'
return result
else:
engine = create_engine(
'mysql+pymysql://%s:%s@%s:%s/%s' % (
database['username'], parse.quote(database['password']), database['ip'], database['port'],
database['db_name']))
conn = engine.connect()
if sql_type == 'select':
data = conn.execute(sql)
df_data = __select_data_to_df(data)
elif sql_type == 'insert':
pass
elif sql_type == 'update':
df_data = conn.execute(sql)
conn.close()
return df_data
def conn_sqlserver(database, sql_type, sql):
if sql_type not in SQL_TYPE:
result = 'sql_type值不符合规定'
return result
else:
username = database['username']
pwd = parse.quote(database['password'])
ip = database['ip']
if 'port' in database:
port = ':%s' % database['port']
else:
port = ''
db_name = database['db_name']
if 'charset' in database:
charset = '?charset=%s' % database['charset']
else:
charset = ''
engine = create_engine('mssql+pymssql://%s:%s@%s%s/%s%s' % (username, pwd, ip, port, db_name, charset))
conn = engine.connect()
if sql_type == 'select':
data = conn.execute(sql)
df_data = __select_data_to_df(data)
elif sql_type == 'insert':
pass
elif sql_type == 'update':
df_data = conn.execute(sql)
conn.close()
return df_data
def __select_data_to_df(data):
'''
函数内容:数据库select所得数据转存为Dataframe
参数:data 数据库select所得数据
返回值:df_data Dataframe格式的数据库select数据,包含列名(数据库中对应列名)
'''
list_data = []
list_column = [i[0] for i in data.cursor.description]
for item in data:
list_data.append(item)
df_data = pd.Dataframe(data=list_data, columns=list_column)
return df_data
第三方库的导入问题
除了py文件中明确import的第三方库,不同数据库还需要各自的第三方库:
oracle==>cx_Oracle
mysql==>PyMySQL
sqlserver==>pymssql
数据库连接问题
1、Oracle 19c链接和其他版本的链接方式稍有不同,详见代码中conn_oracle函数部分
2、密码中若有特殊符号,例如@时,若直接传参,会令程序识别出错,需要转义传参,如下:
engine = create_engine( 'mysql+pymysql://%s:%s@%s:%s/%s' % (
database['username'], parse.quote(database['password']), database['ip'],database['port'],database['db_name']))
最后
该代码是自用,适用场景有限,代码格式也不算太标准,若有任何建议欢迎反馈。



