栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Python 交互数据库总结

Python 交互数据库总结

1. 入库

大数据量且快速,使用IO。

import io
from sqlalchemy import create_engine

def write_to_table(df, table_name, schema, if_exists='append'):
    """
    Bulk-load a DataFrame into PostgreSQL via COPY (fast path for large data).

    Creates the target table through pandas' SQLTable helper, serialises the
    frame to an in-memory CSV, then streams it with ``COPY ... FROM STDIN``.

    :param df: DataFrame to load
    :param table_name: target table name (trusted identifier, not user input)
    :param schema: target schema name (trusted identifier, not user input)
    :param if_exists: passed through to SQLTable ('append' by default)
    :return: None
    """
    # Fill in a real URL; the original string had no dialect prefix and
    # would not be accepted by SQLAlchemy.
    db_engine = create_engine('postgresql://user:password@host/dbname')
    string_data_io = io.StringIO()
    df.to_csv(string_data_io, sep='|', index=False)
    pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine)
    # BUG FIX: schema was passed as the literal string '{schema}' instead of
    # the `schema` argument.
    table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                               index=False, if_exists=if_exists, schema=schema)
    table.create()
    string_data_io.seek(0)  # rewind so COPY reads from the start of the buffer
    with db_engine.connect() as connection:
        with connection.connection.cursor() as cursor:
            # BUG FIX: '{schema}' was a literal in the command, never
            # interpolated. CSV HEADER makes COPY skip the header row that
            # to_csv wrote above.
            copy_cmd = (f"COPY {schema}.{table_name} "
                        f"FROM STDIN DELIMITER '|' CSV HEADER")
            cursor.copy_expert(copy_cmd, string_data_io)
        connection.connection.commit()

普通方式(速度较慢):使用 DataFrame.to_sql

def putsimilarData(df, name, schema=None):
    """
    Append a DataFrame to a table with ``DataFrame.to_sql`` (simple but slow).

    :param df: DataFrame to append
    :param name: target table name
    :param schema: optional target schema; defaults to the engine's default
                   search path (the original passed the broken literal
                   '{schema}')
    :return: None
    """
    # Fill in a real URL; the original `create_engine(.....)` was a
    # SyntaxError placeholder.
    engine = create_engine('postgresql://user:password@host/dbname')
    try:
        df.to_sql(name, engine, index=False, if_exists='append', schema=schema)
    except Exception as e:
        # Best-effort write: the original swallowed errors with a print;
        # keep that contract rather than raising.
        print(e)
2. 更新(update)
# Usage example (fragment): `df_predict`, `logger`, `chnl_code`,
# `predict_sales` and `dates` are assumed to be defined by surrounding
# code not shown here — TODO confirm against the real caller.
# Select the three columns update_sql expects, in positional order.
df_predict = df_predict[['model_predict', 'upset_type', 'chnl_code']]
update_sql(df_predict)
logger.info('入库成功:{0},{1},{2}'.format(chnl_code,predict_sales,dates))


def update_sql(df):
    """
    Update ``geohash_daily_sales`` and ``upset_type`` keyed on ``chnl_code``.

    :param df: DataFrame whose rows are, in order,
               (geohash_daily_sales, upset_type, chnl_code)
    :return: None
    """
    # Fill in real credentials: the original had empty keyword placeholders
    # (`dbname=, user=, ...`), which is a SyntaxError.
    client = psycopg2.connect(dbname='dbname', user='user',
                              password='password', host='host', port=5432)
    try:
        cursor = client.cursor()
        # "table" is a placeholder table name — replace with the real one.
        # BUG FIX: values are now bound server-side (%s parameters) instead
        # of Python %-formatting, avoiding SQL injection and quoting bugs.
        init_sql = """ update table
                       set    geohash_daily_sales = %s,
                              upset_type = %s
                       where  chnl_code = %s """
        for value in df.values.tolist():
            cursor.execute(init_sql, tuple(value))
        client.commit()  # one transaction for all rows, not one per row
    finally:
        client.close()  # release the connection even if an update fails
## 可以用update代替,不再复用

def update_importance_sql(df, chnl_code, table='familymart_feature_importance'):
    """
    Write feature-importance rows for one channel.

    Incremental case (channel not yet in ``table``): append directly.
    Replace case (channel already present): delete its rows, then re-append.

    :param df: rows to write for this channel
    :param chnl_code: channel identifier used as the replace key
    :param table: target table name (trusted identifier, not user input)
    :return: None
    """
    # BUG FIX: `client` was used below but never created — fill in real
    # credentials for the actual environment.
    client = psycopg2.connect(dbname='dbname', user='user',
                              password='password', host='host', port=5432)
    try:
        # Bind chnl_code as a query parameter instead of f-string
        # interpolation (safer; table name stays interpolated because
        # identifiers cannot be bound).
        test_sql = f"select chnl_code from {table} where chnl_code = %s"
        df_test = pd.read_sql(test_sql, client, params=(chnl_code,))
    finally:
        client.close()

    if df_test.empty:
        # Channel not present yet: plain incremental append.
        putsimilarData(df, table)
    else:
        # Channel present: clear its old rows first, then re-append.
        # NOTE(review): deletesimilarData takes a finished SQL string, so
        # chnl_code is interpolated here — keep chnl_code trusted, or extend
        # deletesimilarData to accept bound parameters.
        delete_sql = f"delete from {table} where chnl_code = '{chnl_code}'"
        deletesimilarData(delete_sql)
        putsimilarData(df, table)
3. 删除(delete)
def deletesimilarData(sql):
    """
    Execute a delete (or any DML) statement and commit it.

    :param sql: complete SQL statement; the caller is responsible for safe
                construction — no parameter binding happens here
    :return: None
    """
    # Fill in real credentials: the original used a `...` placeholder, and
    # the line carried a stray leading space (an IndentationError).
    conn = psycopg2.connect(dbname='dbname', user='user',
                            password='password', host='host', port=5432)
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.commit()
    finally:
        conn.close()  # always release the connection, even on failure


4. 读取(select)

# pg
def connect_sql_poi(sql):
    """
    Run a query against PostgreSQL and return the result as a DataFrame.

    (The original docstring claimed MySQL/SqlServer, but psycopg2 only
    connects to PostgreSQL.)

    :param sql: SQL query text
    :return: pandas.DataFrame with the query result
    """
    # Fill in real credentials: the original used fullwidth-dot placeholders
    # (`dbname=。。。`), which is a SyntaxError.
    client = psycopg2.connect(dbname='dbname', user='user',
                              password='password', host='host', port=5432)
    try:
        df = pd.read_sql(sql, client)
    finally:
        client.close()  # close even if the query fails
    return df

#Hive
from pyhive import hive
def conn_hive(sql):
    """
    Run a query on Hive and return the result as a DataFrame.

    :param sql: HiveQL query text
    :return: pandas.DataFrame whose columns follow cursor.description order
    """
    # Fill in real connection settings: the original had empty keyword
    # placeholders (`host=,` ...), which is a SyntaxError.
    connection = hive.connect(host='host', port=10000,
                              database='default', username='user')
    try:
        cur = connection.cursor()
        cur.execute(sql)
        columns = [col[0] for col in cur.description]
        # BUG FIX: pd.Dataframe -> pd.DataFrame (AttributeError as written).
        # Build the frame directly from the row tuples plus column names;
        # the original's dict round-trip and column reassignment were
        # redundant.
        return pd.DataFrame(cur.fetchall(), columns=columns)
    finally:
        connection.close()  # original leaked the connection

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/422900.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号