硬件设施:
大数据插入脚本:
import json, time
import pymongo,traceback
from clickhouse_driver import Client
import uuid
import random
# 装饰器统计运行耗时
def coast_time(func):
def fun(*args, **kwargs):
t = time.perf_counter()
result = func(*args, **kwargs)
print(f'func {func.__name__} coast time:{time.perf_counter() - t:.8f} s')
return result
return fun
class MyEncoder(json.JSONEncoder):
"""
func:
解决Object of type 'bytes' is not JSON serializable
"""
def default(self, obj):
if isinstance(obj, bytes):
return str(obj, encoding='utf-8')
return json.JSONEncoder.default(self, obj)
create_task_table = """CREATE TABLE IF NOT EXISTS task(
`_id` String,
`task_name` String,
`task_size` UInt16,
`status` UInt8
)
ENGINE = MergeTree() PRIMARY KEY _id;
"""
ck_client = Client(host='192.168.12.199', port=9000, database="testdb", user='default', password='123456', send_receive_timeout=20)
mongo_client = pymongo.MongoClient("mongodb://192.168.12.199:27017/")
mongo_db = mongo_client["testdb"]
mongo_col = mongo_db["task"]
@coast_time
def insert_mongo_task_data(total, patch):
"""
func:批量插入任务数据到mongo
"""
mongo_col.drop()
sig = 0
data_list = []
for i in range(total):
sig += 1
try:
dicts = {
'_id':str(uuid.uuid1()),
'task_name': 'task_' + str(i),
'task_size': i,
'status': random.choice([0, 1])
}
data_list.append(dicts)
if sig == patch:
mongo_col.insert_many(data_list)
sig = 0
data_list=[]
except Exception as e:
print("task name :%s process failed:%s" % ('task_' + str(i), traceback.print_exc()))
if len(data_list) >0:
mongo_col.insert_many(data_list)
@coast_time
def insert_ck_task_data(total, patch):
"""
func:批量插入任务数据到CK
"""
ck_client.execute('DROP TABLE IF EXISTS task')
ck_client.execute(create_task_table)
sig = 0
data_list = []
for i in range(total):
sig += 1
try:
dicts = {
'_id':str(uuid.uuid1()),
'task_name': 'task_' + str(i),
'task_size': i,
'status': random.choice([0, 1])
}
data_list.append(dicts)
if sig == patch:
ck_client.execute("INSERT INTO task(*) VALUES", data_list, types_check=True)
sig = 0
data_list=[]
except Exception as e:
print("task name :%s process failed:%s" % ('task_' + str(i), traceback.print_exc()))
if len(data_list) >0:
ck_client.execute("INSERT INTO task(*) VALUES", data_list, types_check=True)
insert_ck_task_data(100000000, 10000)
insert_mongo_task_data(100000000, 10000)
批次
10000
:
| 插入条数 | mongo 耗时 | ck 耗时 | Mongo 大小 | Ck 大小 |
| 1000 | 0.01972508s | 0.01732014s | 99.5K | 12K |
| 10000 | 0.12277857s | 0.08004815s | 1004.8K | 119K |
| 100000 | 1.12529528s | 0.73075602s | 9.0M | 1.9M |
| 1000000 | 10.92156150s | 7.17739819s | 100M | 49M |
| 10000000 | 108.91806854s | 72.16343116s | 1009.8M | 117M |
| 100000000 | 1189.25558783s | 748.89750133s | 10G | 1.1G |
| 插入条数 | db.task.find({'task_name':'task_1'}) .explain("executionStats") 耗时 | select * from testdb.task where task_name ='task_1' 耗时 |
| 1000 | 0ms | 0.004 sec |
| 10000 | 3ms | 0.008 sec |
| 100000 | 29ms | 0.006 sec |
| 1000000 | 340ms | 0.009 sec |
| 10000000 | 3281ms | 0.035 sec |
| 100000000 | 165762ms | 0.626 sec |
| 插入条数 | db.task.find({'task_name':{ '$regex':'.*_1.*'}}) .explain("executionStats") 耗时 | select * from testdb.task where task_name like '%_1%' 耗时 |
| 1000 | 0ms | 0.004 sec |
| 10000 | 4ms | 0.012 sec |
| 100000 | 42ms | 0.022 sec |
| 1000000 | 468ms | 0.077 sec |
| 10000000 | 5871ms | 0.670 sec |
| 100000000 | 112334ms | 21.094 sec |



