Notes on the Data Warehouse Hands-On Project
Assignment 1: count the members who were active on three consecutive days within the last seven days. I used the small_startlog dataset, which covers three days of logs; each day holds roughly 9,000-10,000 records.
The analysis breaks down into the following steps:
- Data collection
A custom Flume interceptor classifies the events and extracts the log time:
```java
public Event intercept(Event event) {
    Map<String, String> headersMap = event.getHeaders();
    // 1. get the event body
    String eventBody = new String(event.getBody(), Charsets.UTF_8);
    // 2. split the body to locate the JSON string
    String[] bodyArr = eventBody.split("\\s+");
    try {
        String jsonStr = bodyArr[6];
        // 3. parse the JSON string and pull out the timestamp
        JSONObject jsonObject = JSON.parseObject(jsonStr);
        String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
        // 4. convert the timestamp into a "yyyy-MM-dd" string
        long timestamp = Long.parseLong(timestampStr);
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        Instant instant = Instant.ofEpochMilli(timestamp);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
        String date = formatter.format(localDateTime);
        // 5. place the formatted date into the header
        headersMap.put("logtime", date);
        event.setHeaders(headersMap);
    } catch (Exception e) {
        // fall back to an "Unknown" partition when parsing fails
        headersMap.put("logtime", "Unknown");
        event.setHeaders(headersMap);
    }
    return event;
}
```

The Flume configuration file:
```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event

# custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.lagou.dw.interceptor.LogTypeInterceptor$Builder

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 500000
a1.channels.c1.transactionCapacity = 20000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream

# roll files by size only (~130 MB, about one HDFS block)
a1.sinks.k1.hdfs.rollSize = 130000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 10000

# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Finally, start Flume to collect the data:
```shell
flume-ng agent --conf /opt/apps/flume-1.9/conf \
  --conf-file /data/lagoudw/conf/flume-log2hdfs4.conf \
  --name a1 -Dflume.root.logger=INFO,console
```
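The core of the interceptor is the conversion from an epoch-millisecond timestamp to the `yyyy-MM-dd` string that becomes the `logtime` header (and ultimately the HDFS partition). A minimal standalone sketch of that conversion, using a fixed UTC zone so the result is deterministic (the interceptor itself uses the system default zone):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class LogtimeDemo {
    // convert epoch milliseconds to a "yyyy-MM-dd" partition string in the given zone
    static String toLogDate(long timestampMillis, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        LocalDateTime ldt = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), zone);
        return formatter.format(ldt);
    }

    public static void main(String[] args) {
        long ts = 1595332800000L; // 2020-07-21 12:00:00 UTC
        System.out.println(toLogDate(ts, ZoneId.of("UTC"))); // prints 2020-07-21
    }
}
```

Note that the pattern must be `yyyy-MM-dd`; a pattern like `yyy-MM-dd` would silently produce malformed partition names.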
- Creating the ODS-layer table
Create the `ods.ods_start_log` table, loading its data from `/user/data/logs/start`:
```sql
use ods;

create external table ods.ods_start_log (
  `str` string
)
comment 'raw user app-start log'
partitioned by (`dt` string)
location '/user/data/logs/start';
```
Add the corresponding partitions to the table:

```sql
alter table ods.ods_start_log add partition(dt='2020-07-21');
alter table ods.ods_start_log add partition(dt='2020-07-22');
alter table ods.ods_start_log add partition(dt='2020-07-23');
```
- Creating the DWD-layer table and cleansing the data
```shell
sql="
with tmp as (
  select split(str, ' ')[7] line
  from ods.ods_start_log
  where dt='$do_date'
)
insert overwrite table dwd.dwd_start_log partition(dt='$do_date')
select
  get_json_object(line, '$.attr.device_id'),
  get_json_object(line, '$.attr.area'),
  get_json_object(line, '$.attr.uid'),
  get_json_object(line, '$.attr.app_v'),
  get_json_object(line, '$.attr.event_type'),
  get_json_object(line, '$.attr.os_type'),
  get_json_object(line, '$.attr.channel'),
  get_json_object(line, '$.attr.language'),
  get_json_object(line, '$.attr.brand'),
  get_json_object(line, '$.app_active.json.entry'),
  get_json_object(line, '$.app_active.json.action'),
  get_json_object(line, '$.app_active.json.error_code')
from tmp;
"
hive -e "$sql"
```
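Hive arrays are 0-indexed, so `split(str, ' ')[7]` picks out the 8th space-separated field of the raw line, which is where the JSON payload sits in this dataset. The indexing can be sketched in Java (the raw line below is a made-up stand-in for a real log record):

```java
public class SplitDemo {
    // mimic Hive's split(str, ' ')[7]: arrays are 0-indexed, so this grabs the 8th token
    static String eighthField(String line) {
        return line.split(" ")[7];
    }

    public static void main(String[] args) {
        String raw = "f0 f1 f2 f3 f4 f5 f6 {\"app_active\":{}}";
        System.out.println(eighthField(raw)); // prints the JSON token
    }
}
```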
`dwd_start_log` is the daily member-startup detail table; it feeds the DWS-layer aggregations that follow.
- Computing daily active members
From `dwd_start_log` we can compute the daily active members in the DWS layer:
```sql
insert overwrite table dws.dws_member_start_day partition(dt='$do_date')
select
  device_id,
  concat_ws('|', collect_set(uid)),
  concat_ws('|', collect_set(app_v)),
  concat_ws('|', collect_set(os_type)),
  concat_ws('|', collect_set(language)),
  concat_ws('|', collect_set(channel)),
  concat_ws('|', collect_set(area)),
  concat_ws('|', collect_set(brand))
from dwd.dwd_start_log
where dt='$do_date'
group by device_id;
```

- Computing daily new members
Daily new members in the DWS layer are derived from the daily active members:
```sql
insert into table dws.dws_member_add_day
select
  t1.device_id, t1.uid, t1.app_v, t1.os_type,
  t1.language, t1.channel, t1.area, t1.brand, '$do_date'
from dws.dws_member_start_day t1
left join dws.dws_member_add_day t2 on t1.device_id = t2.device_id
where t1.dt='$do_date' and t2.device_id is null;
-- If a device_id from t1 has never appeared in t2, that member is new today,
-- so its record is appended to the new-member table.

-- Now the daily new-member count can be computed:
insert overwrite table ads.ads_new_member_cnt partition(dt='$do_date')
select count(1)
from dws.dws_member_add_day
where dt='$do_date';
```
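The `left join ... is null` pattern is an anti-join, which boils down to set subtraction: today's active devices minus every device already recorded as added. A minimal sketch of that logic (device IDs are made up):

```java
import java.util.HashSet;
import java.util.Set;

public class NewMemberDemo {
    // devices active today that never appeared in the add table are new members
    static Set<String> newMembers(Set<String> activeToday, Set<String> addedBefore) {
        Set<String> result = new HashSet<>(activeToday);
        result.removeAll(addedBefore); // drop everything we have already seen
        return result;
    }

    public static void main(String[] args) {
        Set<String> active = Set.of("d1", "d2", "d3");
        Set<String> known = Set.of("d2");
        System.out.println(newMembers(active, known)); // d1 and d3 are new
    }
}
```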
This completes the new-member counts, giving the following table:
| cnt (new member count) | dt (new member register date) |
| --- | --- |
| 9972 | 2020-07-21 |
| 5027 | 2020-07-22 |
| 4000 | 2020-07-23 |

- Computing members retained for three consecutive days
Since we need three consecutive days, i.e. members with login records on all of 7.21, 7.22 and 7.23, a qualifying member must have registered on 7.21 and then logged in on both 7.22 and 7.23. We can compute the two sets separately and take their intersection:
- registered on 7.21 and logged in on 7.22
- registered on 7.21 and logged in on 7.23
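The intersection of those two sets can be sketched with plain Java sets before writing the SQL (device IDs are made up):

```java
import java.util.HashSet;
import java.util.Set;

public class RetentionDemo {
    // members registered on day 1 who were active on both day 2 and day 3
    static Set<String> retained3Days(Set<String> registeredDay1,
                                     Set<String> activeDay2,
                                     Set<String> activeDay3) {
        Set<String> result = new HashSet<>(registeredDay1);
        result.retainAll(activeDay2); // registered on day 1 AND active on day 2
        result.retainAll(activeDay3); // ... AND also active on day 3
        return result;
    }

    public static void main(String[] args) {
        Set<String> reg = Set.of("d1", "d2", "d3");
        Set<String> day2 = Set.of("d1", "d2");
        Set<String> day3 = Set.of("d2", "d3");
        System.out.println(retained3Days(reg, day2, day3)); // only d2 survives all three days
    }
}
```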
```sql
-- 6.1 create the detail table of members retained for n consecutive days
drop table if exists dws.dws_member_retention_nday;
create table dws.dws_member_retention_nday (
  `device_id` string,
  `uid` string,
  `app_v` string,
  `os_type` string,
  `language` string,
  `channel` string,
  `area` string,
  `brand` string,
  `add_date` string comment 'member add date',
  `retention_date` int comment 'consecutive retention days'
) comment 'daily member retention detail'
partitioned by (`dt` string)
stored as parquet;

-- 6.2 insert data (only the members who logged in on all three consecutive days)
insert overwrite table dws.dws_member_retention_nday partition(dt='2020-07-23')
(
  select t2.device_id, t2.uid, t2.app_v, t2.os_type, t2.language,
         t2.channel, t2.area, t2.brand, t2.dt add_date, 3 retention_nday
  from dws.dws_member_start_day t1
  join dws.dws_member_add_day t2 on t1.device_id = t2.device_id
  where t2.dt = date_add('2020-07-23', -2)
    and t1.dt = '2020-07-23'
    and t2.device_id in (
      select t2.device_id
      from dws.dws_member_start_day t1
      join dws.dws_member_add_day t2 on t1.device_id = t2.device_id
      where t2.dt = date_add('2020-07-23', -2)
        and t1.dt = '2020-07-22'
    )
);

-- next, aggregate in the ADS layer
-- 6.3 create the table of member counts for n consecutive login days
drop table if exists ads.ads_member_retention_nday_count;
create table ads.ads_member_retention_nday_count (
  `add_date` string comment 'add date',
  `retention_nday` int comment 'consecutive active days up to the current date',
  `retention_count` bigint comment 'retained member count'
) comment 'member retention count'
partitioned by (dt string)
row format delimited fields terminated by ',';

-- 6.4 insert data from dws_member_retention_nday
insert overwrite table ads.ads_member_retention_nday_count partition(dt='2020-07-23')
select add_date, retention_date, count(*) retention_count
from dws.dws_member_retention_nday
where dt='2020-07-23'
group by add_date, retention_date;

-- 6.6 retention rate over n consecutive days
drop table if exists ads.ads_member_retention_nday_rate;
create table ads.ads_member_retention_nday_rate (
  `add_date` string comment 'add date',
  `retention_nday` int comment 'consecutive active days up to the current date',
  `retention_count` bigint comment 'retained member count',
  `new_mid_count` bigint comment 'new members added that day',
  `retention_ratio` decimal(10,2) comment 'retention rate'
) comment 'member retention rate'
partitioned by (dt string)
row format delimited fields terminated by ',';

-- 6.7 insert data
insert overwrite table ads.ads_member_retention_nday_rate partition(dt='2020-07-23')
(
  select t1.add_date, t1.retention_nday, t1.retention_count,
         t2.cnt, t1.retention_count / t2.cnt * 100
  from ads.ads_member_retention_nday_count t1
  join ads.ads_new_member_cnt t2 on t1.add_date = t2.dt
  where t1.dt = '2020-07-23'
);
```

From `ads_member_retention_nday_rate` we find that 1973 members logged in on all three consecutive days.
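As a quick sanity check of the rate arithmetic in step 6.7 (1973 retained members out of 9972 new members on 7.21):

```java
public class RetentionRateDemo {
    // retention rate as a percentage, mirroring retention_count / cnt * 100 in the SQL
    static double rate(long retained, long newMembers) {
        return (double) retained / newMembers * 100;
    }

    public static void main(String[] args) {
        // 1973 of the 9972 members added on 2020-07-21 stayed active three days
        System.out.printf("retention rate: %.2f%%%n", rate(1973, 9972)); // roughly 20%
    }
}
```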
The new-member count for 7.21 was 9972, so the retention rate should come out around 20%. Checking `ads_member_retention_nday_rate` confirms this.
The result is correct, and Assignment 1 is complete.
Assignment 2: what could be optimized?
While inspecting the raw data, I noticed that one part of each record, the header section, is never used at all. We could ingest only the trailing body section and lighten the load on the ODS table.



