栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Sqoop Mysql Hive同步

Sqoop Mysql Hive同步

统一文件存储格式Parquet,使用Impala进行计算

全量同步:

sqoop import 
 --username 'root' 
 --password '000' 
 --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
 --table a 
 --hcatalog-database default 
 --hcatalog-table a 
 --hcatalog-partition-keys dt 
 --hcatalog-partition-values '2022-02-08' 
 --fields-terminated-by '01' 
 --lines-terminated-by 'n' 
 --hive-drop-import-delims 
 --null-string '\N' 
 --null-non-string '\N' 
 --split-by id 
 -m 10
sqoop import 
--username root 
--password '000' 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8"  
--query "select id ,name, age, ads, birthday, hobday, gzads , source from student where ads='北京市丰台区南苑机场警备东路6号一区' and $ConDITIONS limit 100 " 
--hcatalog-database default 
--hcatalog-table student 
--hcatalog-partition-keys dt 
--hcatalog-partition-values '2022-02-08' 
--fields-terminated-by '001' 
--lines-terminated-by 'n' 
--hive-drop-import-delims 
--null-string '\N' 
--null-non-string '\N' 
--split-by id 
--m 3

增量同步:同时需要对历史修改的记录进行同步,且自动获取last-value最新时间,这里创建sqoop job 然后通过执行sqoop job 后可以自动保存增量数据里最新的last-value值就不用手动去指定了,但是sqoop 在自动保存最新的last-value值的时候要比原表中最后的last-value值要大一点,这个不会出现增量导入数据不一致的问题所以不用担心,并且由于hcatalog的lastmodified模式中merge-key不生效…也就导致了无法将Mysql里历史数据中修改过的记录和Hive中的历史数据进行合并,最后会导致Hive里数据重复,数据和MYSQL数据不一致的问题,所以在进行增量同步的时候使用传统方式,但是由于Hive中的表是Parquet格式,在同步的时候无法将MYSQL表里timestamp类型的数据转换成对应的类型,Sqoop在转换Parquet格式文件时会自动把Mysql里timestamp类型的数据转换成Int类型,这样就导致最后格式不对入库失败,所以需要通过map-column-java crt_date=String 来表示Sqoop在解析该字段的时候会把该字段自动转化成String来解析,通过map-column-hive crt_date=String指定Hive中日期字段的类型认为是String,这样才能解析成功,如果不加这2个参数,Mysql里如果有timestamp类型的数据最后会解析失败.

sqoop job 
--create ajob 
-- import 
--username 'root' 
--password '000' 
--table a 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
--map-column-java crt_date=String 
--map-column-hive crt_date=String 
--incremental lastmodified 
--check-column crt_date 
--last-value "2022-02-01 13:35:41" 
--merge-key id 
--target-dir /user/hive/warehouse/a/dt=2022-02-08 
--as-parquetfile 
--split-by id 
-m 3

sqoop job 
--create ajob 
-- import 
--username 'root' 
--password '000' 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
--table a 
--hive-database default 
--hive-table a 
--map-column-java crt_date=String 
--map-column-hive crt_date=String 
--fields-terminated-by '01' 
--incremental lastmodified 
--check-column crt_date 
--last-value "2022-02-07 13:35:41" 
--merge-key id 
--target-dir /user/hive/warehouse/a/dt=2022-02-08 
--as-parquetfile 
-m 1

sqoop import 
--username 'root' 
--password '000' 
--query "select id,name,crt_date from a where $CONDITIONS" 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
--map-column-java crt_date=String 
--map-column-hive crt_date=String 
--incremental lastmodified 
--check-column crt_date 
--last-value "2022-02-09 17:42:19" 
--merge-key id 
--target-dir /user/hive/warehouse/a/dt=2022-02-08 
--as-parquetfile 
-m 1

这两种方式都可以,last-value随便指定一个就行了,跑完一次sqoop job以后 sqoop会自动保存last-value的值

sqoop job --exec ajob
sqoop job --delete ajob
sqoop job --show ajob
sqoop job --list a job 

最后基于Parquet格式导出Mysql

sqoop export 
--username root 
--password '000' 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8"  
--table student_hive 
--hcatalog-database default 
--hcatalog-table student 
--hcatalog-partition-keys dt 
--hcatalog-partition-values '2022-02-08' 
--num-mappers 3 

需要在impala里做的相关操作

alter table a add partition(dt='2022-02-08');
refresh a partition(dt='2022-02-08');
alter table a drop partition(dt='2022-02-08')

这里使用定时监听启动Sqoop启动指令,启动指令是由外部程序发送到MYSQL里的字符串,3分钟监听一次,如果查询到了该字符串就启动Sqoop,Sqoop运行完成成,将MYSQL里的启动指令字符串删除等待程序发送的下一次指令.

#!/bin/sh
hostname=localhost
port=3306
username=root
password=000

dbname=demo
tbname=sync_command

select_sql_tb1="select * from ${dbname}.${tbname} where syn_str='000001' and syn_table='tb1'"
tb1_result=$(/opt/mysql-5.7.25/bin/mysql -h${hostname} -P${port} -u${username} -p${password} 2>/dev/null -e "${select_sql_tb1}")
delete_sql_tb1="delete from ${dbname}.${tbname} where syn_str='000001' and syn_table='tb1'"
if [ ! -n "$tb1_result" ]; then
  echo "没有获取到tb1表同步指令" >> /opt/tb1.log
else
  echo "成功获取tb1表同步指令开始同步....." >> /opt/tb1.log
  echo $tb1_result
  /opt/mysql-5.7.25/bin/mysql -h${hostname} -P${port} -u${username} -p${password} 2>/dev/null -e "${delete_sql_tb1}"
  echo "已成功同步完成tb1表删除同步记录等待下次同步命令...." >>/opt/tb1.log 
fi
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734766.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号