-
连接mysql 数据库
# @time: 2022/1/21 13:12 # @function : 连接mysql数据库 import pymysql import pandas as pd class Mysql_: def __init__(self): self.db = pymysql.Connect(user='user', password='xxxx', host='xx.xx.xx.xx', database='database', port=3306) self.cursor = self.db.cursor() def select(self, sql, data=None): states = self.cursor.execute(sql) res = self.cursor.fetchall() return states, res def insert_one(self, sql, data): try: self.cursor.execute(sql, data) self.db.commit() except: self.db.rollback() finally: self.cursor.close() def insert_many(self, sql, data): try: self.cursor.executemany(sql, data) self.db.commit() except: self.db.rollback() finally: self.cursor.close() def __del__(self): self.cursor.close() self.db.close() if __name__ == '__main__': data = pd.read_csv('xxx.csv') data.fillna(0, inplace=True) mysql_ = Mysql_() sql = "insert into tablename values(%(filed_name1)s,%(filed_name2)s)" items = [] for item in data.values[:1]: values = [] for i in item[1:]: if type(i) != str: values.append(int(i)) else: values.append(i) data = dict(zip(['filed_name1','filed_name2'], values)) items.append(data) # 插入数据库多条数据,使用%(name)s作sql语句占位符,数据参数项使用[{name1:1,name2:2},{}]格式传入,经测试插入200万数据,花费大概1分钟 mysql_.insert_many(sql, items) -
分别使用impyla和ibis两种模块,连接hive/impala数仓
from impala.dbapi import connect from krbcontext import krbcontext from impala.util import as_pandas import ibis # 配置文件 class Config: config = { "kerberos_principal":"kerberos_user", "keytab_file":"/home/user/user.keytab", "host":"xxxxx", "port":10000, "AUTH_MECHANISM":"GSSAPI", # ibis连接impala需要连接hdfs分布式文件的客户端,否则只能读取数据,不能进行表操作、数据插入更新等。 "webhdfs_host1":"xxxxxx",# hdfs服务 "webhdfs_port":9870 } # 使用impyla连接 class Hive_connect: def __init__(self,config,database,service_name='hive'): # 进行kerberos认证 with krbcontext(using_keytab=True,principal=config.config['kerberos_principal'],keytab_file=config.config['keytab_file']): self.conn = connect(host=config.config['host'],port=config.config['port'],database=database,auth_mechanism=config.config['AUTH_MECHANISM'] ,kerberos_service_name=service_name) self.cur = self.conn.cursor() def select(self,sql): self.cur.execute(sql) #print(self.cur.description) # 打印结果list(字段和字段类型) #res = self.cur.fetchall() # sql查询结果list集 # print(res) df = as_pandas(self.cur) # 获取dataframe类型数据,将数据转化为dataframe数,数据会有些小问题,比如数据库字段类型为float会转化为object return df def insert_one(self,sql): state = 'INSERT_FAIL' try: self.cur.execute(sql) self.conn.commit() state = self.cur.status() except: self.conn.rollback() finally: self.conn_close() return state # 'FINISHED_STATEDI' # 插入多条数据,看源码得知,就是遍历data,执行insert_one,hive插入数据会进行MR运算,速度非常慢,慢的离谱,插入多条sql占位符可以上面mysql的一样,也可以使用%s进行占位,插入数据格式[(data1,data2),(),()] def insert_many(self,sql,data): state = 'INSERT_FAIL' try: self.cur.executemany(operation=sql,seq_of_parameters=data) self.conn.commit() state = self.cur.status() except: self.conn.rollback() finally: self.conn_close() return state # 'FINISHED_STATEDI' def delete(self): raise AttributeError ('不允许执行删除操作') def conn_close(self): self.cur.close() # 先关游标连接,在断客户端连接 self.conn.close() # 使用ibis连接 class Hive_connect_ibis: def __init__(self,config,database,service_name='hive'): with krbcontext(using_keytab=True,principal=config.config['kerberos_principal'],keytab_file=config.config['keytab_file']): self.hdfs = ibis.impala.hdfs_connect(host=config.config['webhdfs_host1'],port=config.config['webhdfs_port'],auth_mechanism=config.config['AUTH_MECHANISM'] ,use_https=False,verify=False) self.client = ibis.impala.connect(host=config.config['host'],port=config.config['port'],database=database,hdfs_client=self.hdfs ,auth_mechanism=config.config['AUTH_MECHANISM'],kerberos_service_name=service_name) def select(self,sql): res = self.client.sql(sql) df = res.execute(limit='default'/None)# 默认返回的数据集行数 self.conn_close() return df def insert(self): pass # hdfs未连上,禁止创建数据库、数据表、插入或更新数据 raise AttributeError ('不允许执行删除操作') def conn_close(self): self.client.close() config = Config() if __name__ == '__main__': config = Config() hive_ = Hive_connect_ibis(config=config,database='database') hive_.execute('select * from d_cust limit 10')


![记录python使用pymysql连接mysql数据库,使用impyla、ibis-framework[impala]连接hiveimpala(kerberos)数据库(备以后查阅) 记录python使用pymysql连接mysql数据库,使用impyla、ibis-framework[impala]连接hiveimpala(kerberos)数据库(备以后查阅)](http://www.mshxw.com/aiimages/31/853458.png)
