栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python 插入clickhouse数据报错

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python 插入clickhouse数据报错

python 插入clickhouse报错 clickhouse : Cannot parse JSON string: expected opening quote: (while read the value of key data): (at row 1): While executing SourceFromInputStream

    最近工作需要使用clickhouse,然后在使用python插入clickhouse数据中,报错 clickhouse : Cannot parse JSON string: expected opening quote: (while read the value of key data): (at row 1): While executing SourceFromInputStream,下面的错误代码

#这块主要是从mysql设备中获取数据,主要错误原因就是这里(现在截图是已经改完的)
def asset_type_name():
    mysql_data = database(config['mysql']['host'], config['mysql']['port'], config['mysql']['user'], config['mysql']['password'])
    db = mysql_data.connect_mysql()
    cursor = db.cursor()
    cursor.execute("SELECt name from vital where id = %d " %asset_type_id)
    name = cursor.fetchall()
    #错误内容是 当时写name = cursor.fetchall() ,现在已经改成type_name
    type_name = ''.join('%s' %a for a in name )
    db.close()
    return type_name
#这个位置是初始化数据
def data_init():
    t = time.time()
    data = {
        # 必填不能为空  现在为毫秒级时间戳
        "ck_f_id" : str(int(round(t * 1000))),
        # 必填不能为空  现在为当前时间
        "ck_f_create_time" : f"{time.strftime('%Y-%m-%d',time.localtime(int(time.time())))}",
        # hash id 必填
        "ck_f_hash_id": f"{uuid.uuid4()}",
        # 这是需要添加的字段
        # 告警id 12位数字uuid
        "id" : f"""cf{shortuuid.ShortUUID(alphabet="0123456789").random(length=12)}""",
        # 告警名称
        "name" : f"告警名称{random.randint(2, 1100)}",
        # 告警等级
        "level" : random.randint(1, 5),
        # 告警产生时间 使用当前年月即可
        "time_gen" : f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(time.time())))}",
        # 告警详情
        "content" :content[random.randint(0,3)],
        # 是否误报
        "false_positive": 1,
        #目标ip
        "target_ip":target_ip[random.randint(0,5)],
        #目标端口
        "target_port":fake.port_number(),
        #目标mac地址
        "target_mac":fake.mac_address(),
        #分类
        "category":category[random.randint(0,3)],
        #源IP
        "origin_ip":fake.ipv4(),
        #源mac地址
        "origin_mac":fake.mac_address(),
        #源端口
        "origin_port":fake.port_number(),
        #优先级
        "priority":random.randint(1,3),
        #设备类型
        "dev_type":asset_type_name(),

    }
    print(data)
    return data

#插入数据
def create_data():
    clickhouse_data = database(config['clickhouse']['host'], config['clickhouse']['port'], config['clickhouse']['user'], config['clickhouse']['password'])
    client=clickhouse_data.connect_clickhouce()
    database_name = "jiraiya"
    # 表名称
    table_name = 'vital_origin_incident'
    for i in range(10):
        # init_data = data_init()
        # print(init_data['level'])
        # ddd = data_add(init_data)
        # sql = f"INSERT INTO {database_name}.{table_name} ({','.join('%s' %id for id in ddd['column'])}) VALUES({','.join('%s' %id for id in ddd['value'])}),({','.join('%s' %id for id in ddd['value'])}),({','.join('%s' %id for id in ddd['value'])}),({','.join('%s' %id for id in ddd['value'])});"
        sql = f"INSERT INTO {database_name}.{table_name} FORMAT JSonEachRow {json.dumps(data_init()) * 1}"

        # print(sql)
        # 执行sql
        client.execute(sql)
        print(i)
  

当时报错的原因是***clickhouse***不支持比较复杂的***json***,然后我给mysql返回的数据类型直接插入到clickhouse数据库中,mysql返回的数据类型为tuple,需要先使用转换成字符串(因为我只需要字符串里面的值),所以就报错 。
把一下地方修改一下,然后再插入就可以了

 name = cursor.fetchall()
 #错误内容是 当时写name = cursor.fetchall() ,现在已经改成type_name
 type_name = ''.join('%s' %a for a in name )

这个方法借鉴于别人的思路超链接

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/339649.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号