栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Clickhouse: A股分时线bar数据python实验-批量导入

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Clickhouse: A股分时线bar数据python实验-批量导入

from clickhouse_driver import Client
import pandas as pd
import os
from datetime import datetime, date
import time
import math

def get_all_files_by_root_sub_dirs(directory, file_type):
   data = list()
   if os.path.isdir(directory):  # 是目录
      dir_list = os.walk(directory)  # os.listdir(directory)
      for (root, sub, files) in dir_list:
          for file in files:
             path = os.path.join(root, file)  # root +'\'+file
             if path.endswith(file_type):
                data.append(path)
   else:  # 不是目录,直接输出
      if directory.endswith(file_type):
          data.append(directory)
   return data
def get_code_from_csv_file(file):
    # .csv; .h5 file
    # D:join_quant_datafuturesminuteA.XDCEA1909.XDCE_2019-07-25_2019-08-12.CSV
    # D:join_quant_datafuturesminuteA.XDCE.h5
    s = os.path.basename(file)  # A1909.XDCE_2019-07-25_2019-08-12.CSV
    sp = s.split('_')[0]

    if sp.endswith(".csv") or sp.endswith(".CSV"):
        code = sp[:-4]
    else:
        code = sp
    return code  # A1909.XDCE

# CREATE TABLE stock_tb
# (
#     `code` String,
#     `datetime` DateTime,
#     `open` Float32,
#     `close` Float32,
#     `low` Float32,
#     `high` Float32,
#     `volume` Float64,
#     `money` Float64,
#     `factor` Float32,
#     `high_limit` Float32,
#     `low_limit` Float32,
#     `avg` Float32,
#     `pre_close` Float32,
#     `paused` Float32,
#     `open_interest` Float64
# )
# ENGINE = MergeTree
# ORDER BY datetime

def insert_data():
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_tb'
    dir_path = "/mnt/d/join_quant_data_stock_product/stock/minute/"
    files = get_all_files_by_root_sub_dirs(dir_path,".csv") 
    t0 = time.time()
    file_num = 0 
    for _file in files:
        t_file = time.time()
        print(f"{_file}  => 第{file_num}个文件, 总共:{len(files)}个!")
        block_insert_data = [] # 每个文件当批量insert的单元
        df = pd.read_csv(_file)
        code = get_code_from_csv_file(_file)
        for row in df.itertuples():
            _row = list(row)[0:15]
            _row[0] = code                                            # code
            _row[1] = datetime.strptime(row[1],'%Y-%m-%d %H:%M:%S')   # datetime: 
            _row[2] = float(row.open)                                 # open: ,
            _row[3] = float(row.close)                                # close: ,
            _row[4] = float(row.low)                                  # low:    
            _row[5] = float(row.high)                                 # high: ,
            _row[6] = float(row.volume)                               # volume: ,
            _row[7] = float(row.money)                                # money: ,
            _row[8] = float(row.factor)                               # factor: ,
            _row[9] = float(row.high_limit)                           # high_limit:,
            _row[10] = float(row.low_limit)                           # low_limit:,
            _row[11] = float(row.avg)                                 # avg:  ,
            _row[12] = float(row.pre_close)                           # pre_close:,
            _row[13] = float(row.paused)                              # paused: ,
            if math.isnan(row.open_interest):
                _row[14] = 0.0 
            else:
                _row[14] = float(row.open_interest)                   #open_interest:
            block_insert_data.append(_row)
            # if file_num ==0:
            #     print(_row)
        # 逐条也可以insert json
        # sql = f"INSERT INTO {database_name}.{table_name} FORMAT JSonEachRow {json.dumps(row_data) * 1}"
        # 批量insert 
        client.execute(f'INSERT INTO {database_name}.{table_name}  VALUES', block_insert_data,types_check=True)
        table_info = client.execute(f'select count(1) from {database_name}.{table_name}')
        print(f"clickhouse stock_tb 表信息: {table_info}")
        print(f"第{file_num}个文件 总共:{len(files)}个 => {_file}读写完成! cost time:{time.time()-t_file}")
        file_num = file_num +1

    print(f"文件总共:{file_num}读写完成! cost time:{time.time()-t0}")

insert_data()


运行情况如下:


期间clickhouse表数据的动态情况:[不是最后完成入库的情况]

SELECt
    table AS `表名`,
    sum(rows) AS `总行数`,
    formatReadableSize(sum(data_uncompressed_bytes)) AS `原始大小`,
    formatReadableSize(sum(data_compressed_bytes)) AS `压缩大小`,
    round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `压缩率`
FROM system.parts
WHERe table IN ('stock_tb')
GROUP BY table

数据量较大,文件有20万个csv,估计有199G左右,数据导入后,后面便于用于测试查询。
另外,可以看出,clickhouse的压缩率很高,这个优点是空间占用较小。

查看一下表的情况:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/739126.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号