from clickhouse_driver import Client
import pandas as pd
import os
from datetime import datetime, date
import time
import math

def get_all_files_by_root_sub_dirs(directory, file_type):
    data = list()
    if os.path.isdir(directory):  # it is a directory: walk it recursively
        dir_list = os.walk(directory)  # os.listdir(directory)
        for (root, sub, files) in dir_list:
            for file in files:
                path = os.path.join(root, file)  # root + separator + file
                if path.endswith(file_type):
                    data.append(path)
    else:  # not a directory: use the path itself
        if directory.endswith(file_type):
            data.append(directory)
    return data

def get_code_from_csv_file(file):
    # Works for .csv and .h5 files, e.g.:
    # D:\join_quant_data\futures\minute\A.XDCE\A1909.XDCE_2019-07-25_2019-08-12.CSV
    # D:\join_quant_data\futures\minute\A.XDCE.h5
    s = os.path.basename(file)  # A1909.XDCE_2019-07-25_2019-08-12.CSV
    sp = s.split('_')[0]
    if sp.endswith(".csv") or sp.endswith(".CSV"):
        code = sp[:-4]
    else:
        code = sp
    return code  # A1909.XDCE
# CREATE TABLE stock_tb
# (
# `code` String,
# `datetime` DateTime,
# `open` Float32,
# `close` Float32,
# `low` Float32,
# `high` Float32,
# `volume` Float64,
# `money` Float64,
# `factor` Float32,
# `high_limit` Float32,
# `low_limit` Float32,
# `avg` Float32,
# `pre_close` Float32,
# `paused` Float32,
# `open_interest` Float64
# )
# ENGINE = MergeTree
# ORDER BY datetime
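# --- Optional helper (illustrative sketch, not part of the original script) ---
# Creates the target database and table with the DDL shown above, so that
# insert_data() has somewhere to write. Call it once before insert_data() if
# the table does not exist yet; the names my_db / stock_tb match the constants
# used below.
def create_table_if_missing():
    client = Client('localhost')
    client.execute('CREATE DATABASE IF NOT EXISTS my_db')
    client.execute('''
        CREATE TABLE IF NOT EXISTS my_db.stock_tb
        (
            `code` String,
            `datetime` DateTime,
            `open` Float32,
            `close` Float32,
            `low` Float32,
            `high` Float32,
            `volume` Float64,
            `money` Float64,
            `factor` Float32,
            `high_limit` Float32,
            `low_limit` Float32,
            `avg` Float32,
            `pre_close` Float32,
            `paused` Float32,
            `open_interest` Float64
        )
        ENGINE = MergeTree
        ORDER BY datetime
    ''')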

def insert_data():
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_tb'
    dir_path = "/mnt/d/join_quant_data_stock_product/stock/minute/"
    files = get_all_files_by_root_sub_dirs(dir_path, ".csv")
    t0 = time.time()
    file_num = 0
    for _file in files:
        t_file = time.time()
        print(f"{_file} => file {file_num} of {len(files)}")
        block_insert_data = []  # one batch insert per file
        df = pd.read_csv(_file)
        code = get_code_from_csv_file(_file)
        for row in df.itertuples():
            _row = list(row)[0:15]
            _row[0] = code                                            # code
            _row[1] = datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')  # datetime
            _row[2] = float(row.open)            # open
            _row[3] = float(row.close)           # close
            _row[4] = float(row.low)             # low
            _row[5] = float(row.high)            # high
            _row[6] = float(row.volume)          # volume
            _row[7] = float(row.money)           # money
            _row[8] = float(row.factor)          # factor
            _row[9] = float(row.high_limit)      # high_limit
            _row[10] = float(row.low_limit)      # low_limit
            _row[11] = float(row.avg)            # avg
            _row[12] = float(row.pre_close)      # pre_close
            _row[13] = float(row.paused)         # paused
            if math.isnan(row.open_interest):
                _row[14] = 0.0
            else:
                _row[14] = float(row.open_interest)  # open_interest
            block_insert_data.append(_row)
            # if file_num == 0:
            #     print(_row)
        # Inserting row by row as JSON is also possible:
        # sql = f"INSERT INTO {database_name}.{table_name} FORMAT JSONEachRow {json.dumps(row_data)}"
        # Batch insert: one INSERT per file
        client.execute(f'INSERT INTO {database_name}.{table_name} VALUES',
                       block_insert_data, types_check=True)
        table_info = client.execute(f'select count(1) from {database_name}.{table_name}')
        print(f"clickhouse stock_tb row count: {table_info}")
        print(f"File {file_num} of {len(files)} => {_file} done! cost time: {time.time() - t_file}")
        file_num = file_num + 1
    print(f"All {file_num} files done! cost time: {time.time() - t0}")


insert_data()
The run looked like this:
Status of the ClickHouse table while the import was still in progress (not the final state after all files were loaded), checked with:
SELECT
    table AS table_name,
    sum(rows) AS total_rows,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
    round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS compression_ratio_pct
FROM system.parts
WHERE table IN ('stock_tb')
GROUP BY table
The data set is fairly large: about 200,000 CSV files, roughly 199 GB in total. Once it is loaded, it will be handy for testing queries later.
It is also clear that ClickHouse compresses the data very well, so the on-disk footprint is much smaller than the raw data.
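As a rough idea of the kind of test query the loaded data is meant for, something like the following could be run through the same driver. This is only a sketch: the instrument code '000001.XSHE' is an illustrative placeholder, not taken from the actual data.

from clickhouse_driver import Client

client = Client('localhost')
# Hypothetical benchmark query: daily average close price for one instrument.
rows = client.execute(
    "SELECT toDate(datetime) AS day, avg(close) AS avg_close "
    "FROM my_db.stock_tb "
    "WHERE code = '000001.XSHE' "
    "GROUP BY day ORDER BY day"
)
print(rows[:5])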
Let's take a look at the table:
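For example, the table structure and current row count can be checked from the same driver (a minimal sketch reusing the my_db.stock_tb names from the script above):

from clickhouse_driver import Client

client = Client('localhost')
print(client.execute('DESCRIBE TABLE my_db.stock_tb'))       # column names and types
print(client.execute('SELECT count() FROM my_db.stock_tb'))  # total rows loaded so far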



