hive亿级数据导入ClickHouse,并每日导入
(技术工具看上文)
hive中表结构: 数据量7.6亿
DROp TABLE IF EXISTS dwd_ipqc_online;
CREATE EXTERNAL TABLE dwd_ipqc_online
(
MACH_ID string COMMENT '機台ID',
MACH_IP string COMMENT '機台IP',
CREATE_TIME string COMMENT '創建時間',
IPQC_onLINEID string COMMENT 'ID',
INS_TIME string COMMENT '插入時間',
PROD_SN string COMMENT '產品SN',
DOT_ID string COMMENT '点位',
DOT_VALUE string COMMENT '值'
) COMMENT '在線量測记录'
PARTITIonED BY (`dt` string)
STORED AS PARQUET
LOCATION '/warehouse/xx/dwd/dwd_ipqc_online/'
TBLPROPERTIES ("parquet.compression" = "lzo");
ClickHouse中表:采用本地表
drop table if exists dwd_ipqc_online;
create table dwd_ipqc_online
(
mach_id String comment '機台ID',
mach_ip String comment '機台IP',
create_time DateTime comment '創建時間',
ipqc_onlineid String comment 'ID',
ins_time DateTime comment '插入時間',
prod_sn String comment '產品SN',
dot_id String comment '点位',
dot_value String comment '值'
)engine =MergeTree
ORDER BY (create_time)
partition by toYYYYMMDD(create_time)
;
抽取脚本:
touch ~/bin/mytest1.sh && chmod u+x ~/bin/mytest1.sh && vim ~/bin/mytest1.sh
#!/bin/bash
# 环境变量
unset SPARK_HOME
export SPARK_HOME=$SPARK2_HOME
SEATUNNEL_HOME=/u/module/seatunnel-1.5.1
# 接收两个参数,第一个为要抽取的表,第二个为抽取时间
# 若输入的第一个值为first,不输入第二参数则直接退出脚本
if [[ $1 = first ]]; then
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
# 若输入的第一个值为all,不输入第二参数则取前一天
elif [[ $1 = all ]]; then
# 判断非空,如果不传时间默认取前一天数据,传时间就取设定,主要是用于手动传参
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d '-1 day' +%F`
fi
else
if [ -n "$2" ] ;then
do_date=$2
else
echo "请传入日期参数"
exit
fi
fi
echo "日期:$do_date"
import_conf(){
# 打印数据传输脚本并赋值
cat>$SEATUNNEL_HOME/jobs/hive2ck_test.conf<
相比之前的脚本,我主要修改了Spark的执行参数。
这里first是首日历史数据导入,all是每日增量更新
import_test仅仅是测试,主要看import_dwd_ipqc_online
所以这里首日我执行脚本:
mytest1.sh first 2021-11-30
测试无误,抽取结果:
我感觉抽取速度在100w+数据/s
实际:
>>> 7.6*10000*10000/2084
364683.30134357006
emm,果然直觉什么的都是骗人的,可能是开始快,后面就慢了,但整体还ok
需注意的错误:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.String
hive中的string可直接转为ck中的时间,但datetime字段要截取一下,因为hive中时间timestamp,我这边在ods存的是timestamp,所以dwd也是timestamp,导出错误
Caused by: org.apache.parquet.hadoop.BadConfigurationException: Class com.hadoop.compression.lzo.LzoCodec was not found
这边hive中dwd压缩格式是parquet+lzo,读取出来没问题,插入时报错,我直接将之前搭建Hadoop集群时$HADOOP_HOME/share/hadoop/common/hadoop-lzo-0.4.20.jar放到/u/module/spark-2.4.8-bin-hadoop2.7/jars(spark目录下的jars)下,即可解决
另注意:
sql语句不能有分号
总结:
这样就完成了从hive导入ClickHouse,这个脚本可以放在Azkaban调度执行,后续将探索kafka+spark/flink 批量插入,毕竟这一套还是太重了。



