最近工作需要使用clickhouse,然后在使用python插入clickhouse数据中,报错 clickhouse : Cannot parse JSON string: expected opening quote: (while read the value of key data): (at row 1): While executing SourceFromInputStream,下面的错误代码
#这块主要是从mysql设备中获取数据,主要错误原因就是这里(现在截图是已经改完的)
def asset_type_name():
mysql_data = database(config['mysql']['host'], config['mysql']['port'], config['mysql']['user'], config['mysql']['password'])
db = mysql_data.connect_mysql()
cursor = db.cursor()
cursor.execute("SELECt name from vital where id = %d " %asset_type_id)
name = cursor.fetchall()
#错误内容是 当时写name = cursor.fetchall() ,现在已经改成type_name
type_name = ''.join('%s' %a for a in name )
db.close()
return type_name
#这个位置是初始化数据
def data_init():
t = time.time()
data = {
# 必填不能为空 现在为毫秒级时间戳
"ck_f_id" : str(int(round(t * 1000))),
# 必填不能为空 现在为当前时间
"ck_f_create_time" : f"{time.strftime('%Y-%m-%d',time.localtime(int(time.time())))}",
# hash id 必填
"ck_f_hash_id": f"{uuid.uuid4()}",
# 这是需要添加的字段
# 告警id 12位数字uuid
"id" : f"""cf{shortuuid.ShortUUID(alphabet="0123456789").random(length=12)}""",
# 告警名称
"name" : f"告警名称{random.randint(2, 1100)}",
# 告警等级
"level" : random.randint(1, 5),
# 告警产生时间 使用当前年月即可
"time_gen" : f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(time.time())))}",
# 告警详情
"content" :content[random.randint(0,3)],
# 是否误报
"false_positive": 1,
#目标ip
"target_ip":target_ip[random.randint(0,5)],
#目标端口
"target_port":fake.port_number(),
#目标mac地址
"target_mac":fake.mac_address(),
#分类
"category":category[random.randint(0,3)],
#源IP
"origin_ip":fake.ipv4(),
#源mac地址
"origin_mac":fake.mac_address(),
#源端口
"origin_port":fake.port_number(),
#优先级
"priority":random.randint(1,3),
#设备类型
"dev_type":asset_type_name(),
}
print(data)
return data
#插入数据
def create_data():
clickhouse_data = database(config['clickhouse']['host'], config['clickhouse']['port'], config['clickhouse']['user'], config['clickhouse']['password'])
client=clickhouse_data.connect_clickhouce()
database_name = "jiraiya"
# 表名称
table_name = 'vital_origin_incident'
for i in range(10):
# init_data = data_init()
# print(init_data['level'])
# ddd = data_add(init_data)
# sql = f"INSERT INTO {database_name}.{table_name} ({','.join('%s' %id for id in ddd['column'])}) VALUES({','.join('%s' %id for id in ddd['value'])}),({','.join('%s' %id for id in ddd['value'])}),({','.join('%s' %id for id in ddd['value'])}),({','.join('%s' %id for id in ddd['value'])});"
sql = f"INSERT INTO {database_name}.{table_name} FORMAT JSonEachRow {json.dumps(data_init()) * 1}"
# print(sql)
# 执行sql
client.execute(sql)
print(i)
当时报错的原因是***clickhouse***不支持比较复杂的***json***,然后我给mysql返回的数据类型直接插入到clickhouse数据库中,mysql返回的数据类型为tuple,需要先使用转换成字符串(因为我只需要字符串里面的值),所以就报错 。
把一下地方修改一下,然后再插入就可以了
name = cursor.fetchall()
#错误内容是 当时写name = cursor.fetchall() ,现在已经改成type_name
type_name = ''.join('%s' %a for a in name )
这个方法借鉴于别人的思路超链接



