背景:本人之前是做数据分析的工作,在实习的期间的主要工作内容是写一些SQL语句,在这个过程中会进行很多次重复SQL的编写,事实上,作为一条业务线来说,主要的核心表就那么几张,然后不断的从中抽取字段形成中间表再聚合,效率很低,就此引发了我的思考,如何能在实际的工作中提高效率减少重复开发的成本,在得知数据仓库的概念之后呢,我觉得这正是我所缺少的,于是开始学习数据仓库的建模思想与实际的建模方法,这个小项目是跟着b站上学习的,抽出了其中一个业务线进行整理和描述作为学习的过程。
使用到的大数据组件有:(伪分布式)hadoop+zookeeper+sqoop+flume+kafka+hive
辅助的组件:MySQL+Linux+navicat
数仓的分层:采用维度建模
ods(原始数据)——dwd层(事实表)——dim层(维度表)——dws(宽表)——ads(需求层)
主要分为2部分:
1.业务数据库:MySQL------hdfs--------hive------ods,dim,dwd,dws,ads;
2.用户行为日志------flume----kafka-----flume-------hdfs-----hive----ods,dwd,dws,ads;
FIRST_PART(业务数据库)
ods层dwd层dim层SECOND_PART(用户行为数据)ods层dwd层 dws层ads层 总结
FIRST_PART(业务数据库)1.1使用项目中对应的sql建表语句和自动生成数据的sql在MySQL(navicat作为平台)中运行。(这里主要是形成对应数据,作为学习项目开展的依据)
1.2使用sqoop对MySQL进行抽取放入hdfs中,这里需要放置2个自动化SQL语句,
一个是首次装载,一个是每日装载
首次装载示例:
通过传入
1
和
1和
1和do_date,$2参数,将MySQL中的数据导入到hdfs中。
每日装载:
与首日装载其实差不多,不过因为是每日装载所以在SQL语句中的where部分进行了时间上的限制。
1.3从hdfs到ods层:这里也是分为hive表的创建以及首次的装载和每次装载。
Hive各种表的创建:
实例:
CREATE EXTERNAL TABLE ods_activity_info(
`id` STRING COMMENT '编号',
`activity_name` STRING COMMENT '活动名称',
`activity_type` STRING COMMENT '活动类型',
`start_time` STRING COMMENT '开始时间',
`end_time` STRING COMMENT '结束时间',
`create_time` STRING COMMENT '创建时间'
) COMMENT '活动信息表'
PARTITIonED BY (`dt` STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY 't'
LOCATION '/warehouse/gmall/ods/ods_activity_info/';
创建的都是外部表,外部表的话好处在于即使删除了hive的表,但是它的数据仍旧存储在hdfs中,提高误操作的容错率,坏处就是删除表时比较麻烦,不但需要删除hive的表还需要在hadoop中删除对应的路径:Hdfs dfs -r -f /xxxxx
首次装载:
这里使用hive中的load加载,把hdfs中的数据移动到这个表当中,overwrite的话是直接覆盖掉这张表。
每日装载:
至此ods层数据加载完毕,ODS层主要的作用还是用来放置原始数据,作为一个备份操作,一旦后面的操作出现重大的问题,还可以从ODS找到源数据。
1.4通过之前的项目学习,这个项目主要的业务线有订单,支付,退款,活动等业务线,本人选择订单业务线进行描述:
通过mysql主外键的关联和业务系统的学习,发现主要跟订单相关的表来自于ods层的以下几张表,然后通过这几张表的联结形成一张宽表,进而构建事实表与维度表
CREATE EXTERNAL TABLE ods_order_detail(
`id` STRING COMMENT '编号',
`order_id` STRING COMMENT '订单号',
`sku_id` STRING COMMENT '商品id',
`sku_name` STRING COMMENT '商品名称',
`order_price` DECIMAL(16,2) COMMENT '商品价格',
`sku_num` BIGINT COMMENT '商品数量',
`create_time` STRING COMMENT '创建时间',
`source_type` STRING COMMENT '来源类型',
`source_id` STRING COMMENT '来源编号',
`split_final_amount` DECIMAL(16,2) COMMENT '分摊最终金额',
`split_activity_amount` DECIMAL(16,2) COMMENT '分摊活动优惠',
`split_coupon_amount` DECIMAL(16,2) COMMENT '分摊优惠券优惠'
) COMMENT '订单详情表'
CREATE EXTERNAL TABLE ods_order_detail_activity(
`id` STRING COMMENT '编号',
`order_id` STRING COMMENT '订单号',
`order_detail_id` STRING COMMENT '订单明细id',
`activity_id` STRING COMMENT '活动id',
`activity_rule_id` STRING COMMENT '活动规则id',
`sku_id` BIGINT COMMENT '商品id',
`create_time` STRING COMMENT '创建时间')
COMMENT '订单详情活动关联表'
CREATE EXTERNAL TABLE ods_order_detail_coupon(
`id` STRING COMMENT '编号',
`order_id` STRING COMMENT '订单号',
`order_detail_id` STRING COMMENT '订单明细id',
`coupon_id` STRING COMMENT '优惠券id',
`coupon_use_id` STRING COMMENT '优惠券领用记录id',
`sku_id` STRING COMMENT '商品id',
`create_time` STRING COMMENT '创建时间'
) COMMENT '订单详情活动关联表'
CREATE EXTERNAL TABLE ods_order_info (
`id` STRING COMMENT '订单号',
`final_amount` DECIMAL(16,2) COMMENT '订单最终金额',
`order_status` STRING COMMENT '订单状态',
`user_id` STRING COMMENT '用户id',
`payment_way` STRING COMMENT '支付方式',
`delivery_address` STRING COMMENT '送货地址',
`out_trade_no` STRING COMMENT '支付流水号',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '操作时间',
`expire_time` STRING COMMENT '过期时间',
`tracking_no` STRING COMMENT '物流单编号',
`province_id` STRING COMMENT '省份ID',
`activity_reduce_amount` DECIMAL(16,2) COMMENT '活动减免金额',
`coupon_reduce_amount` DECIMAL(16,2) COMMENT '优惠券减免金额',
`original_amount` DECIMAL(16,2) COMMENT '订单原价金额',
`feight_fee` DECIMAL(16,2) COMMENT '运费',
`feight_fee_reduce` DECIMAL(16,2) COMMENT '运费减免'
) COMMENT '订单表'
根据以上的几个表的联结
(联结字段:
ods_order_detail.order_id,
ods_order_detail_activity.order_detail_id,
ods_order_detail_coupon.order_detail_id
ods_order_info.id),
以及订单粒度的声明;在订单这块粒度大小如下:一个订单里面可能包含多个商品,那么一个订单的一个商品的购买就是最小的粒度。在这个基础上这样我们就形成了一张非常大的宽表,这个宽表的每一行代表一个订单里的一个商品类型的购买)。
1.5一个事实表里面包含的是维度外键和度量值:这张订单表里:声明如下
维度有:时间,商品,活动,优惠劵,用户,省份,
度量值:商品价格,商品数量,分摊最终金额,分摊活动优惠,分摊优惠券优惠,
里面一些重复的字段和意义不大的字段没有添加:
还有一个维度退化:来源类型以及来源编号。
那么最终形成的订单事实表:
CREATE EXTERNAL TABLE dwd_order_detail (
`id` STRING COMMENT '订单编号',
`order_id` STRING COMMENT '订单号',
`user_id` STRING COMMENT '用户id',
`sku_id` STRING COMMENT 'sku商品id',
`province_id` STRING COMMENT '省份ID',
`activity_id` STRING COMMENT '活动ID',
`activity_rule_id` STRING COMMENT '活动规则ID',
`coupon_id` STRING COMMENT '优惠券ID',
`create_time` STRING COMMENT '创建时间',
`source_type` STRING COMMENT '来源类型',
`source_id` STRING COMMENT '来源编号',
`sku_num` BIGINT COMMENT '商品数量',
`original_amount` DECIMAL(16,2) COMMENT '原始价格',
`split_activity_amount` DECIMAL(16,2) COMMENT '活动优惠分摊',
`split_coupon_amount` DECIMAL(16,2) COMMENT '优惠券优惠分摊',
`split_final_amount` DECIMAL(16,2) COMMENT '最终价格分摊'
) COMMENT '订单明细事实表表'
PS:这里其实有很多事实表,项目还是比较大的,但是篇幅太长,就只选了一个表,更多的含义是分享操作过程。
Dwd_order_detail的装载过程:分为首日装载以及每日装载。
首日装载:
这里采用的是动态分区:因为是首日装载,我们选择了dt=2021-06-14,但是create_time不一定为2021-06-14
insert overwrite table dwd_order_detail partition(dt)
select
od.id,
od.order_id,
oi.user_id,
od.sku_id,
oi.province_id,
oda.activity_id,
oda.activity_rule_id,
odc.coupon_id,
od.create_time,
od.source_type,
od.source_id,
od.sku_num,
od.order_price*od.sku_num,
od.split_activity_amount,
od.split_coupon_amount,
od.split_final_amount,
date_format(create_time,'yyyy-MM-dd')
from
(
select
*
from ods_order_detail
where dt='2021-06-14'
)od
left join
(
select
id,
user_id,
province_id
from ods_order_info
where dt='2021-06-14'
)oi
on od.order_id=oi.id
left join
(
select
order_detail_id,
activity_id,
activity_rule_id
from ods_order_detail_activity
where dt='2021-06-14'
)oda
on od.id=oda.order_detail_id
left join
(
select
order_detail_id,
coupon_id
from ods_order_detail_coupon
where dt='2021-06-14'
)odc
on od.id=odc.order_detail_id;每日装载:
因为是一张事务性事实表(每天只会新增数据)增量同步然后按照dt来分区存储,所以partition(dt=2021-06-15),里面存储的就是2021-06-15这一天新增的数据。
insert overwrite table dwd_order_detail partition(dt='2021-06-15')
select
od.id,
od.order_id,
oi.user_id,
od.sku_id,
oi.province_id,
oda.activity_id,
oda.activity_rule_id,
odc.coupon_id,
od.create_time,
od.source_type,
od.source_id,
od.sku_num,
od.order_price*od.sku_num,
od.split_activity_amount,
od.split_coupon_amount,
od.split_final_amount
from
(
select
*
from ods_order_detail
where dt='2021-06-15'
)od
left join
(
select
id,
user_id,
province_id
from ods_order_info
where dt='2021-06-15'
)oi
on od.order_id=oi.id
left join
(
select
order_detail_id,
activity_id,
activity_rule_id
from ods_order_detail_activity
where dt='2020-06-15'
)oda
on od.id=oda.order_detail_id
left join
(
select
order_detail_id,
coupon_id
from ods_order_detail_coupon
where dt='2021-06-15'
)odc
on od.id=odc.order_detail_id;
至此dwd层完毕。
1.6 Dim层:维度表的创建:
在订单事实表里,维度有:商品,活动,优惠劵,用户,省份,时间。
一共6个维度,这里挑出商品和用户维度来描述
商品维度表
每行数据指的是关于商品的描述,把商品的属性作为维度表的列名,商品的属性来自于ods层与商品有关的表的列名(也就是基于MySQL源数据主外键的关系以及业务上的知识,把MySQL的多张表转化成一张表,增大冗余)。
CREATE EXTERNAL TABLE dim_sku_info (
`id` STRING COMMENT '商品id',
`price` DECIMAL(16,2) COMMENT '商品价格',
`sku_name` STRING COMMENT '商品名称',
`sku_desc` STRING COMMENT '商品描述',
`weight` DECIMAL(16,2) COMMENT '重量',
`is_sale` BOOLEAN COMMENT '是否在售',
`spu_id` STRING COMMENT 'spu编号',
`spu_name` STRING COMMENT 'spu名称',
`category3_id` STRING COMMENT '三级分类id',
`category3_name` STRING COMMENT '三级分类名称',
`category2_id` STRING COMMENT '二级分类id',
`category2_name` STRING COMMENT '二级分类名称',
`category1_id` STRING COMMENT '一级分类id',
`category1_name` STRING COMMENT '一级分类名称',
`tm_id` STRING COMMENT '品牌id',
`tm_name` STRING COMMENT '品牌名称',
`sku_attr_values` ARRAY> COMMENT '平台属性',
`sku_sale_attr_values` ARRAY> COMMENT '销售属性',
`create_time` STRING COMMENT '创建时间'
) COMMENT '商品维度表'
PARTITIonED BY (`dt` STRING)
采用的还是以天为分区的分区表,一般来说维度表的量都不大,采用全量同步并且首日装载与每日装载基本没有区别:insert语句如下:
with
sku as
(
select
id,
price,
sku_name,
sku_desc,
weight,
is_sale,
spu_id,
category3_id,
tm_id,
create_time
from ods_sku_info
where dt='2021-06-11'
),
spu as
(
select
id,
spu_name
from ods_spu_info
where dt='2021-06-11'
),
c3 as
(
select
id,
name,
category2_id
from ods_base_category3
where dt='2021-06-11'
),
c2 as
(
select
id,
name,
category1_id
from ods_base_category2
where dt='2021-06-11'
),
c1 as
(
select
id,
name
from ods_base_category1
where dt='2021-06-11'
),
tm as
(
select
id,
tm_name
from ods_base_trademark
where dt='2021-06-11'
),
attr as
(
select
sku_id,
collect_set(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,'value_name',value_name)) attrs
from ods_sku_attr_value
where dt='2021-06-11'
group by sku_id
),
sale_attr as
(
select
sku_id,
collect_set(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sale_attrs
from ods_sku_sale_attr_value
where dt='2021-06-11'
group by sku_id
)
insert overwrite table dim_sku_info partition(dt='2021-06-11')
select
sku.id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.is_sale,
sku.spu_id,
spu.spu_name,
sku.category3_id,
c3.name,
c3.category2_id,
c2.name,
c2.category1_id,
c1.name,
sku.tm_id,
tm.tm_name,
attr.attrs,
sale_attr.sale_attrs,
sku.create_time
from sku
left join spu on sku.spu_id=spu.id
left join c3 on sku.category3_id=c3.id
left join c2 on c3.category2_id=c2.id
left join c1 on c2.category1_id=c1.id
left join tm on sku.tm_id=tm.id
left join attr on sku.id=attr.sku_id
left join sale_attr on sku.id=sale_attr.sku_id;
1.7 用户维度表(拉链表):
拉链表相比于全量更新的话更省数据量,适用于有变化和新增但是数据量都不大的表
创建语句
CREATE EXTERNAL TABLE dim_user_info(
`id` STRING COMMENT '用户id',
`login_name` STRING COMMENT '用户名称',
`nick_name` STRING COMMENT '用户昵称',
`name` STRING COMMENT '用户姓名',
`phone_num` STRING COMMENT '手机号码',
`email` STRING COMMENT '邮箱',
`user_level` STRING COMMENT '用户等级',
`birthday` STRING COMMENT '生日',
`gender` STRING COMMENT '性别',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '操作时间',
`start_date` STRING COMMENT '开始日期',
`end_date` STRING COMMENT '结束日期'
) COMMENT '用户表'
PARTITIonED BY (`dt` STRING)
首日装载;
insert overwrite table dim_user_info partition(dt='9999-99-99')
Select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time,
'2021-06-10',
'9999-99-99'
from ods_user_info
where dt='2021-06-10';
每日装载:
with
tmp as
(
select
old.id old_id,
old.login_name old_login_name,
old.nick_name old_nick_name,
old.name old_name,
old.phone_num old_phone_num,
old.email old_email,
old.user_level old_user_level,
old.birthday old_birthday,
old.gender old_gender,
old.create_time old_create_time,
old.operate_time old_operate_time,
old.start_date old_start_date,
old.end_date old_end_date,
new.id new_id,
new.login_name new_login_name,
new.nick_name new_nick_name,
new.name new_name,
new.phone_num new_phone_num,
new.email new_email,
new.user_level new_user_level,
new.birthday new_birthday,
new.gender new_gender,
new.create_time new_create_time,
new.operate_time new_operate_time,
new.start_date new_start_date,
new.end_date new_end_date
from
(
select
id,
login_name,
nick_name,
name,
phone_num,
email,
user_level,
birthday,
gender,
create_time,
operate_time,
start_date,
end_date
from dim_user_info
where dt='9999-99-99'
)old
full outer join
(
select
id,
login_name,
nick_name,
md5(name) name,
md5(phone_num) phone_num,
md5(email) email,
user_level,
birthday,
gender,
create_time,
operate_time,
'2020-06-15' start_date,
'9999-99-99' end_date
from ods_user_info
where dt='2020-06-15'
)new
on old.id=new.id
)
insert overwrite table dim_user_info partition(dt)
select
nvl(new_id,old_id),
nvl(new_login_name,old_login_name),
nvl(new_nick_name,old_nick_name),
nvl(new_name,old_name),
nvl(new_phone_num,old_phone_num),
nvl(new_email,old_email),
nvl(new_user_level,old_user_level),
nvl(new_birthday,old_birthday),
nvl(new_gender,old_gender),
nvl(new_create_time,old_create_time),
nvl(new_operate_time,old_operate_time),
nvl(new_start_date,old_start_date),
nvl(new_end_date,old_end_date),
nvl(new_end_date,old_end_date) dt
from tmp
union all
select
old_id,
old_login_name,
old_nick_name,
old_name,
old_phone_num,
old_email,
old_user_level,
old_birthday,
old_gender,
old_create_time,
old_operate_time,
old_start_date,
cast(date_add('2020-06-15',-1) as string),
cast(date_add('2020-06-15',-1) as string) dt
from tmp
where new_id is not null and old_id is not null;
Dim层:
搭建完毕
SECOND_PART(用户行为数据)2.1 这个项目里面有对应生成的jar包,可以直接生成类似于日志类型的.log文件:
对应的log文件生成后,通过flume进行传输到kafka里面(source=taildir,channel=kafka,sink=null):
然后在kafka里面缓存后通过source=kafkasource channel=file channel sink=hdfs再传输到hdfs
在整个过程中分别要启动:
zookeeper-----./zkServer.sh start
Kafka:kafka/bin/kafka-server-start.sh -daemon /opt/hadoop-3.2.2/kafka/config/server.properties
Flume:nohup /opt/hadoop-3.2.2/flume/bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf
2.2 然后再hive中创建对应的表:按天分区表ods_log(line stirng)
再通过load语句将hdfs里面的log文件移动到hive 中的ods层(原始数据层),每一行代表一个日志,日志的结构分为两种:页面埋点日志(包括:基本信息,动作数组,页面数组,错误信息)和启动日志(基本信息,启动信息,错误信息)
E,g:
解析的思路是按照日志的内容进行解析:分为:启动日志,页面日志,动作日志,曝光日志,错误日志
拿出页面日志表进行描述:
2.3 在ods中进行抽取搭建dwd层:
CREATE EXTERNAL TABLE dwd_page_log(
`area_code` STRING COMMENT '地区编码',
`brand` STRING COMMENT '手机品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次启动',
`model` STRING COMMENT '手机型号',
`mid_id` STRING COMMENT '设备id',
`os` STRING COMMENT '操作系统',
`user_id` STRING COMMENT '会员id',
`version_code` STRING COMMENT 'app版本号',
`during_time` BIGINT COMMENT '持续时间毫秒',
`page_item` STRING COMMENT '目标id ',
`page_item_type` STRING COMMENT '目标类型',
`last_page_id` STRING COMMENT '上页类型',
`page_id` STRING COMMENT '页面ID ',
`source_type` STRING COMMENT '来源类型',
`ts` bigint
) COMMENT '页面日志表'
PARTITIonED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_page_log'
TBLPROPERTIES('parquet.compression'='lzo');
装载语句:
insert overwrite table dwd_page_log partition(dt='2021-06-10')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.source_type'),
get_json_object(line,'$.ts')
from ods_log
where dt='2021-06-10'
and get_json_object(line,'$.page') is not null;
dws层
3.1 dws层:汇总数据层,这里按照维度表进行主题的搭建,这里我们选用户维度对事实表的度量值进行轻度的时间维度以天为粒度上的汇总(用户主题)。
表的字段来源之前事实表里面与用户相关的字段
CREATE EXTERNAL TABLE dws_user_action_daycount
(
`user_id` STRING COMMENT '用户id',
`login_count` BIGINT COMMENT '登录次数',
`order_count` BIGINT COMMENT '下单次数',
`order_activity_count` BIGINT COMMENT '订单参与活动次数',
`order_activity_reduce_amount` DECIMAL(16,2) COMMENT '订单减免金额(活动)',
`order_coupon_count` BIGINT COMMENT '订单用券次数',
`order_coupon_reduce_amount` DECIMAL(16,2) COMMENT '订单减免金额(优惠券)',
`order_original_amount` DECIMAL(16,2) COMMENT '订单单原始金额',
`order_final_amount` DECIMAL(16,2) COMMENT '订单总金额',
`payment_count` BIGINT COMMENT '支付次数',
`payment_amount` DECIMAL(16,2) COMMENT '支付金额',
`refund_order_count` BIGINT COMMENT '退单次数',
`refund_order_num` BIGINT COMMENT '退单件数',
`refund_order_amount` DECIMAL(16,2) COMMENT '退单金额',
`refund_payment_count` BIGINT COMMENT '退款次数',
`refund_payment_num` BIGINT COMMENT '退款件数',
`refund_payment_amount` DECIMAL(16,2) COMMENT '退款金额',
`coupon_get_count` BIGINT COMMENT '优惠券领取次数',
`coupon_using_count` BIGINT COMMENT '优惠券使用(下单)次数',
`coupon_used_count` BIGINT COMMENT '优惠券使用(支付)次数',
`appraise_good_count` BIGINT COMMENT '好评数',
`appraise_mid_count` BIGINT COMMENT '中评数',
`appraise_bad_count` BIGINT COMMENT '差评数',
`appraise_default_count` BIGINT COMMENT '默认评价数',
`order_detail_stats` array> COMMENT '下单明细统计'
) COMMENT '每日用户行为'
PARTITIonED BY (`dt` STRING)
首日装载:
with
tmp_login as
(
select
dt,
user_id,
count(*) login_count
from dwd_page_log
where user_id is not null
and last_page_id is null
group by dt,user_id
),
tmp_pay as
(
select
date_format(callback_time,'yyyy-MM-dd') dt,
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from dwd_payment_info
group by date_format(callback_time,'yyyy-MM-dd'),user_id
),
tmp_ri as
(
select
date_format(create_time,'yyyy-MM-dd') dt,
user_id,
count(*) refund_order_count,
sum(refund_num) refund_order_num,
sum(refund_amount) refund_order_amount
from dwd_order_refund_info
group by date_format(create_time,'yyyy-MM-dd'),user_id
),
tmp_rp as
(
select
date_format(callback_time,'yyyy-MM-dd') dt,
rp.user_id,
count(*) refund_payment_count,
sum(ri.refund_num) refund_payment_num,
sum(rp.refund_amount) refund_payment_amount
from
(
select
user_id,
order_id,
sku_id,
refund_amount,
callback_time
from dwd_refund_payment
)rp
left join
(
select
user_id,
order_id,
sku_id,
refund_num
from dwd_order_refund_info
)ri
on rp.order_id=ri.order_id
and rp.sku_id=rp.sku_id
group by date_format(callback_time,'yyyy-MM-dd'),rp.user_id
),
tmp_coupon as
(
select
coalesce(coupon_get.dt,coupon_using.dt,coupon_used.dt) dt,
coalesce(coupon_get.user_id,coupon_using.user_id,coupon_used.user_id) user_id,
nvl(coupon_get_count,0) coupon_get_count,
nvl(coupon_using_count,0) coupon_using_count,
nvl(coupon_used_count,0) coupon_used_count
from
(
select
date_format(get_time,'yyyy-MM-dd') dt,
user_id,
count(*) coupon_get_count
from dwd_coupon_use
where get_time is not null
group by user_id,date_format(get_time,'yyyy-MM-dd')
)coupon_get
full outer join
(
select
date_format(using_time,'yyyy-MM-dd') dt,
user_id,
count(*) coupon_using_count
from dwd_coupon_use
where using_time is not null
group by user_id,date_format(using_time,'yyyy-MM-dd')
)coupon_using
on coupon_get.dt=coupon_using.dt
and coupon_get.user_id=coupon_using.user_id
full outer join
(
select
date_format(used_time,'yyyy-MM-dd') dt,
user_id,
count(*) coupon_used_count
from dwd_coupon_use
where used_time is not null
group by user_id,date_format(used_time,'yyyy-MM-dd')
)coupon_used
on nvl(coupon_get.dt,coupon_using.dt)=coupon_used.dt
and nvl(coupon_get.user_id,coupon_using.user_id)=coupon_used.user_id
),
tmp_comment as
(
select
date_format(create_time,'yyyy-MM-dd') dt,
user_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from dwd_comment_info
group by date_format(create_time,'yyyy-MM-dd'),user_id
),
tmp_od as
(
select
dt,
user_id,
collect_set(named_struct('sku_id',sku_id,'sku_num',sku_num,'order_count',order_count,'activity_reduce_amount',activity_reduce_amount,'coupon_reduce_amount',coupon_reduce_amount,'original_amount',original_amount,'final_amount',final_amount)) order_detail_stats
from
(
select
date_format(create_time,'yyyy-MM-dd') dt,
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
cast(sum(split_activity_amount) as decimal(16,2)) activity_reduce_amount,
cast(sum(split_coupon_amount) as decimal(16,2)) coupon_reduce_amount,
cast(sum(original_amount) as decimal(16,2)) original_amount,
cast(sum(split_final_amount) as decimal(16,2)) final_amount
from dwd_order_detail
group by date_format(create_time,'yyyy-MM-dd'),user_id,sku_id
)t1
group by dt,user_id
),
tmp_order as
(
select
date_format(create_time,'yyyy-MM-dd') dt,
user_id,
count(*) order_count,
sum(if(activity_reduce_amount>0,1,0)) order_activity_count,
sum(if(coupon_reduce_amount>0,1,0)) order_coupon_count,
sum(activity_reduce_amount) order_activity_reduce_amount,
sum(coupon_reduce_amount) order_coupon_reduce_amount,
sum(original_amount) order_original_amount,
sum(final_amount) order_final_amount
from dwd_order_info
group by date_format(create_time,'yyyy-MM-dd'),user_id
)
insert overwrite table dws_user_action_daycount partition(dt)
select
coalesce(tmp_login.user_id,tmp_order.user_id,tmp_pay.user_id,tmp_ri.user_id,tmp_rp.user_id,tmp_comment.user_id,tmp_coupon.user_id,tmp_od.user_id),
nvl(login_count,0),
Nvl(order_count,0),
nvl(order_activity_count,0),
nvl(order_activity_reduce_amount,0),
nvl(order_coupon_count,0),
nvl(order_coupon_reduce_amount,0),
nvl(order_original_amount,0),
nvl(order_final_amount,0),
nvl(payment_count,0),
nvl(payment_amount,0),
nvl(refund_order_count,0),
nvl(refund_order_num,0),
nvl(refund_order_amount,0),
nvl(refund_payment_count,0),
nvl(refund_payment_num,0),
nvl(refund_payment_amount,0),
nvl(coupon_get_count,0),
nvl(coupon_using_count,0),
nvl(coupon_used_count,0),
nvl(appraise_good_count,0),
nvl(appraise_mid_count,0),
nvl(appraise_bad_count,0),
nvl(appraise_default_count,0),
order_detail_stats,
coalesce(tmp_login.dt,tmp_order.dt,tmp_pay.dt,tmp_ri.dt,tmp_rp.dt,tmp_comment.dt,tmp_coupon.dt,tmp_od.dt)
from tmp_login
Full join
tmp_order on tmp_order.user_id=tmp_login.user_id and
tmp_order.dt=tmp_login.dt
full outer join tmp_pay
on coalesce(tmp_login.user_id,
tmp_order.user_id)=tmp_pay.user_id
and coalesce(tmp_login.dt,tmp_order.dt)=tmp_pay.dt
full outer join tmp_ri
on coalesce(tmp_login.user_id,
tmp_order.user_id,tmp_pay.user_id)=tmp_ri.user_id
and coalesce(tmp_login.dt,tmp_order.dt,tmp_pay.dt)=tmp_ri.dt
full outer join tmp_rp
on coalesce(tmp_login.user_id,tmp_order.user_id,tmp_pay.user_id,tmp_ri.user_id)=tmp_rp.user_id
and coalesce(tmp_login.dt,tmp_order.dt,tmp_pay.dt,tmp_ri.dt)=tmp_rp.dt
full outer join tmp_comment
on coalesce(tmp_login.user_id,tmp_order.user_id,tmp_pay.user_id,tmp_ri.user_id,tmp_rp.user_id)=tmp_comment.user_id
and coalesce(tmp_login.dt,tmp_order.dt,tmp_pay.dt,tmp_ri.dt,tmp_rp.dt)=tmp_comment.dt
full outer join tmp_coupon
on coalesce(tmp_login.user_id,tmp_order.user_id,tmp_pay.user_id,tmp_ri.user_id,tmp_rp.user_id,tmp_comment.user_id)=tmp_coupon.user_id
and coalesce(tmp_login.dt,tmp_order.dt,tmp_pay.dt,tmp_ri.dt,tmp_rp.dt,tmp_comment.dt)=tmp_coupon.dt
full outer join tmp_od
on coalesce(tmp_login.user_id,tmp_order.user_id,tmp_pay.user_id,tmp_ri.user_id,tmp_rp.user_id,tmp_comment.user_id,tmp_coupon.user_id)=tmp_od.user_id
and coalesce(tmp_login.dt,tmp_order.dt,tmp_pay.dt,tmp_ri.dt,tmp_rp.dt,tmp_comment.dt,tmp_coupon.dt)=tmp_od.dt;
3.2 在首日装载语句里面使用了大量的full join ,这个目的是为了把数据取全,高可靠,所以写入的效率比较慢。
每日装载与首日装载相似,分组字段只选取user_id并且dt设置为当天日期即可(有的不一定是设置dt,也有可能是别的一些事实表时间字段,只有当dt所代表的意义与事实表时间字段意义相同时可以使用dt)。
至此 dws的数据就装载完毕了
4.0 ads层:ads的搭建主要依赖于业务需求方面,然后根据我们每天的用户数据去根据不同的维度去聚合,形成一些结果表,然后我们把它存储在mysql中,提供给业务查询即可。(这里没有像之前别的层一样展开来写,主要是因为这些业务数据都是自动生成的没有实际的业务需求,所以没有表达)
总结流程还是比较粗糙的,后面大的全流程调度和权限管理还没来得及学习。好处是对于我而言,在数据领域添加了一些实实在在的技能,对于以后的求职学习都会有长远的帮助



