场景:测试百万条数据条件下,前端查询效率是否满足性能需求。
准备条件:通过mysql语法可知,使用insert into table(col1,col2,col3) values
(value1,value2,value3),(value1,value2,value3);可以一次性批量插入多条数据,利用此特性,我们可以通过python脚本拼接sql语句,实现一次输入多条数据,此处1百万条数据我分为10次插入,每次拼接10万条数据,脚本包含三部分:数据库连接,insert语句拼接及脚本执行,代码如下:
# -*- coding: utf-8 -*-
# @CreateTime : 2021/9/24 10:17
# @ModifyTime : 2021/9/24 10:17
# @Author : xuming
# @File : connect_mysql.py
# @Software: PyCharm
# @Description:
import os
import pymysql
from common.parse_config import ParseConfig
from common.log import log
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
class MysqlConnect(object):
def __init__(self, service="icp_res"):
host = '147.119.19.211'
port = 3306
service = service
user = 'root'
password = 'password'
self.db = pymysql.connect(host=host, port=port, db=service, user=user, passwd=password, charset='utf8')
self.cr = self.db.cursor()
@staticmethod
def sql_read(file_path):
"""
功能:读取sql查询语句 (单个文件中存储1条sql语句)
file_path:sql语句存储路径
返回值:sql语句
"""
with open(file_path, 'r', encoding='utf8') as fp:
tmp = []
for i in fp:
tmp.append(i)
str_sql = [str(i) for i in tmp]
sql = ''.join(str_sql)
return sql
def sql_select(self, sql):
"""
功能:查询数据库记录
sql:数据库命令
返回值:包含以元组形式返回的查询结果的列表
"""
self.cr.execute(sql)
rs = self.cr.fetchall()
return rs
def sql_select_dict(self, sql):
"""
功能:查询数据库记录
sql:数据库命令
返回值:包含以字典形式返回的查询结果的列表
"""
rs = self.sql_select(sql)
cols = [d[0] for d in self.cr.description]
rs_format = []
for row in rs:
rs_dict = dict(zip(cols, row))
rs_format.append(rs_dict)
return rs_format
def sql_iud(self, sql):
"""
功能:添加、删除、修改数据库记录
sql:数据库命令
返回值:None
"""
try:
self.cr.execute(sql)
self.db.commit()
except pymysql.IntegrityError as ErrInfo:
log.info("执行错误:{}".format(sql))
raise ErrInfo
def clear_db_data(self, path, info):
"""
功能:清理数据库
path:清理数据库SQL文件路径
info:删除条件,用于传入sql
返回值:None
"""
with open(path, "r", encoding="utf-8") as f:
for i in f.readlines():
self.sql_iud(i.format(parameter=info))
def get_table_columns(self, sql):
"""
功能:获取数据库表的所有字段名称
sql:数据库命令
返回值:包含所有字段名称的列表
"""
self.sql_select(sql)
columns = [d[0] for d in self.cr.description]
return columns
# def __del__(self):
# self.cr.close()
# self.db.close()
if __name__ == "__main__":
import time
start_time = time.time()
def get_million_number(start_time, end_time):
id = start_time
code = start_time
sql_values = "({}, '{}', '{}', 500006, 10, 1462624898605121537, NULL, '512001,512003,512005,512006,512007,512008,512004', NULL, 0, NULL, '2021-11-22 11:32:06', '2021-11-22 11:32:06', '{}', NULL)"
for _ in range(start_time, end_time):
id += 1
code += 1
sql = sql_values.format(id, code, code, code)
yield sql
sqls = ""
sql_insert = "INSERT INTO `icp_res`.`tb_equipment` (`id`, `code`, `name`, `category`, `type_id`, `organization_id`, `extend_info`, `capability`, `remark`, `status`, `gbid`, `gmt_created`, `gmt_modified`, `account`, `executor_id`) VALUES "
times = [i for i in range(0, 1000001, 100000)]
print(times)
for i in range(10):
sql_many = ",".join(get_million_number(times[i], times[i + 1]))
print(times[i], times[i + 1])
res1 = MysqlConnect().sql_iud(sql_insert + sql_many + ";")
end_time = time.time()
print(end_time - start_time)
"""
测试结果
[0, 100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000]
0 100000
100000 200000
200000 300000
300000 400000
400000 500000
500000 600000
600000 700000
700000 800000
800000 900000
900000 1000000
70.83681845664978
Process finished with exit code 0
"""
场景二:sql语句中存在花括号,使用2个花括号即可实现将花括号改变为普通符号,脚本如下:
# -*- coding: utf-8 -*-
# @CreateTime : 2021/9/24 10:17
# @ModifyTime : 2021/9/24 10:17
# @Author : xuming
# @File : connect_mysql.py
# @Software: PyCharm
# @Description:
import os
import pymysql
from common.parse_config import ParseConfig
from common.log import log
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
class MysqlConnect(object):
def __init__(self, service="icp_iap"):
# cfg = ParseConfig()
# env = cfg.get_cfg("ENV", "default")
# if env == "TEST":
# db_name = "ICP_X_TEST_MYSQL"
# else:
# db_name = "ICP_X_DEV_MYSQL"
host = '147.209.23.10'
port = 30336
service = service
user = 'root'
password = 'password'
self.db = pymysql.connect(host=host, port=port, db=service, user=user, passwd=password, charset='utf8')
self.cr = self.db.cursor()
@staticmethod
def sql_read(file_path):
"""
功能:读取sql查询语句 (单个文件中存储1条sql语句)
file_path:sql语句存储路径
返回值:sql语句
"""
with open(file_path, 'r', encoding='utf8') as fp:
tmp = []
for i in fp:
tmp.append(i)
str_sql = [str(i) for i in tmp]
sql = ''.join(str_sql)
return sql
def sql_select(self, sql):
"""
功能:查询数据库记录
sql:数据库命令
返回值:包含以元组形式返回的查询结果的列表
"""
self.cr.execute(sql)
rs = self.cr.fetchall()
return rs
def sql_select_dict(self, sql):
"""
功能:查询数据库记录
sql:数据库命令
返回值:包含以字典形式返回的查询结果的列表
"""
rs = self.sql_select(sql)
cols = [d[0] for d in self.cr.description]
rs_format = []
for row in rs:
rs_dict = dict(zip(cols, row))
rs_format.append(rs_dict)
return rs_format
def sql_iud(self, sql):
"""
功能:添加、删除、修改数据库记录
sql:数据库命令
返回值:None
"""
try:
self.cr.execute(sql)
self.db.commit()
except pymysql.IntegrityError as ErrInfo:
log.info("执行错误:{}".format(sql))
raise ErrInfo
def clear_db_data(self, path, info):
"""
功能:清理数据库
path:清理数据库SQL文件路径
info:删除条件,用于传入sql
返回值:None
"""
with open(path, "r", encoding="utf-8") as f:
for i in f.readlines():
self.sql_iud(i.format(parameter=info))
def get_table_columns(self, sql):
"""
功能:获取数据库表的所有字段名称
sql:数据库命令
返回值:包含所有字段名称的列表
"""
self.sql_select(sql)
columns = [d[0] for d in self.cr.description]
return columns
# def __del__(self):
# self.cr.close()
# self.db.close()
if __name__ == "__main__":
import time
start_time = time.time()
def get_million_number(start_time, end_time):
id = start_time
sql_values = """({id}, 858873766871040, 2, '念蕾appears atIAP_ALARM_UNKNOWN(0.0,0.0)', '2021-11-29 15:43:15', '1', 0.00000000, 0.00000000, 'IAP_ALARM_UNKNOWN(0.0,0.0)', '123456', '98765445670987654679', '123456', '{{"scr":97,"age":3,"gender":2,"eyeglass":2,"mouthmask":2}}', '{{"left":0,"top":0,"right":0,"bottom":0}}', '{{"sceneImageUrl":"/static/20211129/000000243656736265231625_scene.jpg","thumbImageUrl":"/static/20211129/000000243656736265231625_thumb.jpg"}}', '{{"blackGroupName":"测试组7","country":"韩国","faceImageUrl":"/static/20211129/000000243656736265231625_face.jpg","occupation":"大学教授","gender":-1,"description":"教授","credentialType":0,"credentialNumber":"8789456156432123","bornTime":"1989-01-06","nationality":"","name":"念蕾{id}"}}', 1, 2, 1465146127773859843, '2021-11-29 15:42:45', 0, '2021-11-29 15:42:40', '2021-11-29 15:42:45', '{id}')"""
for _ in range(start_time, end_time):
id += 1
sql = sql_values.format(id=id)
yield sql
sql_insert = "INSERT INTO `icp_iap`.`tb_alarm` (`id`, `suspect_task_id`, `type`, `title`, `alarm_time`, `alarm_level`, `longtitude`, `latitude`, `address`, `camera_Isdn`, `camera_gbid`, `camera_name`, `meta_data`, `position`, `picture`, `object_info`, `status`, `operation_type`, `operation_id`, `operation_time`, `event_id`, `gmt_created`, `gmt_modified`, `alarm_id`) VALUES "
times = [i for i in range(0, 1000001, 50000)]
print(times)
for i in range(len(times)-1):
sql_many = ",".join(get_million_number(times[i], times[i + 1]))
res1 = MysqlConnect().sql_iud(sql_insert + sql_many + ";")
print(i)
end_time = time.time()
print(end_time - start_time)



