- 用 sqoop 将数据从源数据库抽到 hive 初始化
sqoop import
#连接数据
--connect jdbc:mysql://zy-mysql.mysql.rds.aliyuncs.com:3306/$database_name
#数据库用户名
--username $username
#数据库密码
--password $password
#(要抽取的表)
--table $table_name
#根据/t 分割 数据为空则\N
--fields-terminated-by 't' --null-string '\N' --null-non-string '\N'
#map执行个数 一般为了数据正确性设为1
-m1
#放入到hive
--hive-import
#hive的数据库
--hive-database default
#创建表
--create-hive-table
#导入的表
--hive-table $table_name
#增量导入
--incremental append
#检查字段
--check-column id
- 用 sqoop 将增量数据从源数据库抽到 hive
sqoop import
#连接数据
--connect jdbc:mysql://zy-mysql.mysql.rds.aliyuncs.com:3306/$database_name
#数据库用户名
--username $username
#数据库密码
--password $password
#(要抽取的表)
--table $table_name
#根据/t 分割 数据为空则\N
--fields-terminated-by 't' --null-string '\N' --null-non-string '\N'
#map执行个数 一般为了数据正确性设为1
-m1
#放入到hive
--hive-import
#hive的数据库
--hive-database default
#导入的表
--hive-table $table_name
#增量导入
--incremental append
#检查字段
--check-column id
二.分区计算
- 抽取到 hive 的数据按照平台时间创建分区表并初始化
use default; -- 创建分区表 create external table zy_refund_part1(refund_amount decimal(9,2),total_amount decimal(9,2)) partitioned by (platform string,byyear string,bymonth string,byday string) row format delimited fields terminated by ',' --列分隔符 lines terminated by 'n' --行分隔符 null defined as '' --空值转换为空字符: HIVE本身的空值默认是N stored as textfile ; --文本存储 -- 设置非严格模式 set hive.exec.dynamic.partition.mode=nonstrict; -- 导入数据 insert into zy_refund_part1 partition (platform ,byyear,bymonth ,byday ) select sum(refund_amount),sum(total_amount),tenant_id as platform, from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy') as byyear ,from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM') as bymonth, from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') as byday from zy_refund;
- 将 hive 增量数据定时抽取到分区表
#!/bin/bash
DT=`date -d '-1 day' "+%Y-%m-%d"`
#如果某天的数据有误需要重跑
if [ $1 ];then
DT=$1
fi
SQL="
select * from zy_refund_part1;set hive.exec.dynamic.partition.mode=nonstrict;insert into zy_refund_part1 partition (platform ,byyear,bymonth ,byday ) select
sum(refund_amount),sum(total_amount),tenant_id as platform, from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy') as byyear ,from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM') as bymonth, from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') as byday from zy_refund where from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')='"${DT}"' group by tenant_id , from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy') ,from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM') , from_unixtime(unix_timestamp(create_time,'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
"
echo "${SQL}"
hive -e "${SQL}"
三.将计算后的 hive 分区表数据导入关系型数据库
- 将计算后的 hive 分区表数据导入关系型数据库
#!/bin/bash sqoop export --connect jdbc:mysql://zy-mysql.mysql.rds.aliyuncs.com:3306/mp-payment --username mp_test --password '"u7i$Ox99zBlzrp91"' --table zy_refund_part --fields-terminated-by ',' --export-dir /user/hive/warehouse/zy_refund_part1四.用 AZKABAN 将所有需要定时处理的脚本管理起来
- 新建 shell 脚本 jobname.sh
#!/bin/bash #脚本内容
- 新建流文件 flowname.flow
flowchat nodes: - name: jobname type: command config: command: sh jobname.sh
- 新建项目 projectname.project
azkaban-flow-version: 2.0
-
将 jobname.flowname.flow 和 - 新建项目 projectname.project 打包成 jobname.zip
-
创建 jobname 项目=》上传 jobname.zip 文件=》执行作业=》观察结果
-
如果没有问题,在执行工作流时候,选择左下角 Schedule
-
然后在左面填写具体执行事件,填写的方法和 crontab 配置定时任务规则一致



