
stage-5 mode-1: E-commerce Offline Data Warehouse Project in Practice (Part 1) - hw


stage-5 mode-1: E-commerce Offline Data Warehouse Project in Practice (Part 1): Introduction

Notes from the data warehouse project practicum.

Assignment 1

This assignment asks for the number of members who were active on three consecutive days within the last seven days. I used the small dataset small_startlog, which covers three days of logs:

Each day's log holds roughly 9,000 to 10,000 records.

The analysis breaks down into the following steps:

  1. Data collection

    A custom interceptor classifies the events and extracts the timestamp:

        public Event intercept(Event event) {
            Map<String, String> headersMap = event.getHeaders();
            // 1. Get the event body
            String eventBody = new String(event.getBody(), Charsets.UTF_8);
            // 2. Split the body on whitespace to locate the JSON string
            String[] bodyArr = eventBody.split("\\s+");
            try {
                String jsonStr = bodyArr[6];
                // 3. Parse the JSON string and extract the timestamp
                JSONObject jsonObject = JSON.parseObject(jsonStr);
                String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
                // 4. Convert the timestamp to a "yyyy-MM-dd" string
                long timestamp = Long.parseLong(timestampStr);
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
                Instant instant = Instant.ofEpochMilli(timestamp);
                LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
                String date = formatter.format(localDateTime);
                // 5. Put the formatted date string into the headers
                headersMap.put("logtime", date);
                event.setHeaders(headersMap);
            } catch (Exception e) {
                headersMap.put("logtime", "Unknown");
                event.setHeaders(headersMap);
            }
            return event;
        }
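
    The date conversion inside the try block can be checked in isolation. A minimal sketch (the class and method names are mine; a fixed UTC zone is used here for reproducibility, whereas the interceptor uses the system default zone):

    ```java
    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    // Standalone check of the interceptor's timestamp-to-date conversion.
    public class LogTimeDemo {
        private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

        // Epoch milliseconds -> "yyyy-MM-dd", the value stored in the
        // logtime header (and later used as the dt= partition value).
        static String toLogDate(long epochMillis) {
            return Instant.ofEpochMilli(epochMillis)
                    .atZone(ZoneOffset.UTC) // the interceptor uses ZoneId.systemDefault()
                    .toLocalDate()
                    .format(FMT);
        }

        public static void main(String[] args) {
            System.out.println(toLogDate(1595289600000L)); // 2020-07-21T00:00:00Z
        }
    }
    ```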
    

    The conf file used to configure Flume:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # taildir source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
    a1.sources.r1.filegroups = f1 f2
    a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
    a1.sources.r1.headers.f1.logtype = start
    a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/event/.*log
    a1.sources.r1.headers.f2.logtype = event
    # custom interceptor
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = cn.lagou.dw.interceptor.LogTypeInterceptor$Builder
    # memorychannel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 500000
    a1.channels.c1.transactionCapacity = 20000
    # hdfs sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
    a1.sinks.k1.hdfs.filePrefix = startlog.
    a1.sinks.k1.hdfs.fileType = DataStream
    # file roll policy: roll by size only (130000000 bytes, about 130 MB)
    a1.sinks.k1.hdfs.rollSize = 130000000
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollInterval = 0
    a1.sinks.k1.hdfs.idleTimeout = 0
    a1.sinks.k1.hdfs.minBlockReplicas = 1
    # number of events flushed to HDFS per batch
    a1.sinks.k1.hdfs.batchSize = 10000
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    

    Finally, run Flume to collect the data:

    flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/lagoudw/conf/flume-log2hdfs4.conf --name a1 -Dflume.root.logger=INFO,console
    
  2. Create the ODS-layer table

    Create the ods_start_log table, with data loaded from /user/data/logs/start:

    use ODS;
    create external table ods.ods_start_log(
    `str` string)
    comment 'user start-up log'
    partitioned by (`dt` string)
    location '/user/data/logs/start';
    

    Add the corresponding partitions to the table:

    alter table ods.ods_start_log add partition(dt='2020-07-21');
    alter table ods.ods_start_log add partition(dt='2020-07-22');
    alter table ods.ods_start_log add partition(dt='2020-07-23');
    
  3. Create the DWD-layer table and clean the data
    sql="
    with tmp as(
    select split(str, ' ')[7] line
    from ods.ods_start_log
    where dt='$do_date'
    )
    insert overwrite table dwd.dwd_start_log
    partition(dt='$do_date')
    select get_json_object(line, '$.attr.device_id'),
    get_json_object(line, '$.attr.area'),
    get_json_object(line, '$.attr.uid'),
    get_json_object(line, '$.attr.app_v'),
    get_json_object(line, '$.attr.event_type'),
    get_json_object(line, '$.attr.os_type'),
    get_json_object(line, '$.attr.channel'),
    get_json_object(line, '$.attr.language'),
    get_json_object(line, '$.attr.brand'),
    get_json_object(line, '$.app_active.json.entry'),
    get_json_object(line, '$.app_active.json.action'),
    get_json_object(line, '$.app_active.json.error_code')
    from tmp;
    "
    hive -e "$sql"
    

    dwd_start_log is the daily member start-up detail table; it feeds the subsequent DWS layer.

  4. Compute daily active members

    From dwd_start_log, we can compute each day's active members in the DWS layer:

    insert overwrite table dws.dws_member_start_day
    partition(dt='$do_date')
    select device_id,
    concat_ws('|', collect_set(uid)),
    concat_ws('|', collect_set(app_v)),
    concat_ws('|', collect_set(os_type)),
    concat_ws('|', collect_set(language)),
    concat_ws('|', collect_set(channel)),
    concat_ws('|', collect_set(area)),
    concat_ws('|', collect_set(brand))
    from dwd.dwd_start_log
    where dt='$do_date'
    group by device_id;
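
    concat_ws('|', collect_set(col)) deduplicates a column's values within each device_id group and joins them with '|'. The behavior can be mimicked like this (note that Hive's collect_set does not guarantee order, while this sketch keeps first-seen order; the sample values are made up):

    ```java
    import java.util.LinkedHashSet;
    import java.util.List;

    // Mimics Hive's concat_ws('|', collect_set(col)) for a single group.
    public class CollectSetDemo {
        static String concatWsCollectSet(List<String> values) {
            // LinkedHashSet deduplicates while preserving first-seen order.
            return String.join("|", new LinkedHashSet<>(values));
        }

        public static void main(String[] args) {
            // e.g. one device reporting the same app version twice in a day
            System.out.println(concatWsCollectSet(List.of("1.1.2", "1.1.3", "1.1.2")));
        }
    }
    ```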
    
  5. Compute daily new members

    Use the daily active members to compute each day's new members in the DWS layer:

    insert into table dws.dws_member_add_day
    select t1.device_id,
    t1.uid,
    t1.app_v,
    t1.os_type,
    t1.language,
    t1.channel,
    t1.area,
    t1.brand,
    '$do_date'
    from dws.dws_member_start_day t1 left join
    dws.dws_member_add_day t2
    on t1.device_id=t2.device_id
    where t1.dt='$do_date'
    and t2.device_id is null;
    
    -- If a device_id from t1 never appears in t2, that user is new on this day; add their info to the new-member table
    
    -- Now we can compute the daily new-member count:
    insert overwrite table ads.ads_new_member_cnt
    partition (dt='$do_date')
    select count(1)
    from dws.dws_member_add_day
    where dt = '$do_date';
    

    With that, the new-member count is computed, and we get a table like this:

    cnt (new member count) | dt (new member register date)
    9972                   | 2020-07-21
    5027                   | 2020-07-22
    4000                   | 2020-07-23
  6. Compute members retained for three consecutive days

    "Three consecutive days" here means users with login records on all of 7.21, 7.22, and 7.23. Such a user must have registered on 7.21 and then logged in on both 7.22 and 7.23, so we can compute two user sets separately and take their intersection:

    1. registered on 7.21 and logged in on 7.22
    2. registered on 7.21 and logged in on 7.23
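
    The intersection described above can be sketched outside Hive with plain sets (the device ids below are made up for illustration):

    ```java
    import java.util.HashSet;
    import java.util.Set;

    // Toy model of the 3-day retention rule: registered on day 1 and
    // active on both day 2 and day 3.
    public class RetentionDemo {
        static Set<String> retainedThreeDays(Set<String> addedDay1,
                                             Set<String> activeDay2,
                                             Set<String> activeDay3) {
            // (added day1 AND active day2) INTERSECT (added day1 AND active day3)
            Set<String> result = new HashSet<>(addedDay1);
            result.retainAll(activeDay2);
            result.retainAll(activeDay3);
            return result;
        }

        public static void main(String[] args) {
            Set<String> added21  = Set.of("d1", "d2", "d3");
            Set<String> active22 = Set.of("d1", "d3");
            Set<String> active23 = Set.of("d1", "d2");
            System.out.println(retainedThreeDays(added21, active22, active23)); // [d1]
        }
    }
    ```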
    -- 6.1 Create the detail table of members logged in for n consecutive days
    
    drop table if exists dws.dws_member_retention_nday;
    create table dws.dws_member_retention_nday
    (
    `device_id` string,
    `uid` string,
    `app_v` string,
    `os_type` string,
    `language` string,
    `channel` string,
    `area` string,
    `brand` string,
    `add_date` string comment 'member add date',
    `retention_date` int comment 'consecutive retention days'
    )COMMENT 'daily member retention detail'
    PARTITIONED BY (`dt` string)
    stored as parquet;
    
    -- 6.2 Insert data (only users who logged in on all three consecutive days)
    insert overwrite table dws.dws_member_retention_nday
    partition(dt='2020-07-23')
    select  t2.device_id,
            t2.uid,
            t2.app_v,
            t2.os_type,
            t2.language,
            t2.channel,
            t2.area,
            t2.brand,
            t2.dt add_date,
            3 retention_date
            from dws.dws_member_start_day t1 join dws.dws_member_add_day
            t2 on t1.device_id=t2.device_id
            where t2.dt=date_add('2020-07-23', -2)
            and t1.dt='2020-07-23'
            and t2.device_id in (
                select  t2.device_id
                from dws.dws_member_start_day t1 join dws.dws_member_add_day
                t2 on t1.device_id=t2.device_id
                where t2.dt=date_add('2020-07-23', -2)
                and t1.dt='2020-07-22'
            );
    
    -- Next, aggregate in the ADS layer
    
    -- 6.3 Create the table of member counts for n consecutive login days
    drop table if exists ads.ads_member_retention_nday_count;
    create table ads.ads_member_retention_nday_count
    (
    `add_date` string comment 'member add date',
    `retention_nday` int comment 'consecutive active days as of the current date',
    `retention_count` bigint comment 'retained member count'
    ) COMMENT 'member retention counts'
    partitioned by(dt string)
    row format delimited fields terminated by ',';
    
    -- 6.4 Insert data from dws_member_retention_nday
    
    insert overwrite table ads.ads_member_retention_nday_count
    partition (dt='2020-07-23')
    select add_date, retention_date,
    	   count(*) retention_count
    from dws.dws_member_retention_nday
    where dt='2020-07-23'
    group by add_date, retention_date;
    
    -- 6.5 Retention rate for n consecutive days
    drop table if exists ads.ads_member_retention_nday_rate;
    create table ads.ads_member_retention_nday_rate
    (
    `add_date` string comment 'member add date',
    `retention_nday` int comment 'consecutive active days as of the current date',
    `retention_count` bigint comment 'retained member count',
    `new_mid_count` bigint comment 'new members added that day',
    `retention_ratio` decimal(10,2) comment 'retention rate'
     ) COMMENT 'member retention rate'
    partitioned by(dt string)
    row format delimited fields terminated by ',';
    
    -- 6.6 Insert data
    insert overwrite table ads.ads_member_retention_nday_rate
    partition (dt='2020-07-23')
    select  t1.add_date,
            t1.retention_nday,
            t1.retention_count,
            t2.cnt,
            t1.retention_count/t2.cnt*100
    from ads.ads_member_retention_nday_count t1 join
    ads.ads_new_member_cnt t2 on t1.add_date=t2.dt
    where t1.dt='2020-07-23';
    

    From the table ads_member_retention_nday_rate we find that the number of members who logged in on all three days is 1973.

The number of members who logged in on 7.21 is 9972, so the retention rate should be around 20%. Checking the table ads_member_retention_nday_rate:

The result is correct, and assignment 1 is complete.

Assignment 2

What could be optimized?

Looking at the raw data, I noticed that part of each record, the header portion, is never used at all. We could keep only the trailing body portion, reducing the load on the ODS table.
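
One hedged sketch of that idea, assuming (as the interceptor above does) that the JSON body is the 7th whitespace-separated field of each raw line; the class name and sample line are hypothetical:

```java
// Sketch: keep only the JSON body of a raw log line, dropping the
// unused header fields. The field position (index 6) is the same
// assumption the interceptor makes; adjust it to the real log layout.
public class TrimHeaderDemo {
    static String keepBody(String rawLine) {
        String[] parts = rawLine.split("\\s+");
        return parts.length > 6 ? parts[6] : rawLine; // fall back to the whole line
    }

    public static void main(String[] args) {
        String raw = "h0 h1 h2 h3 h4 h5 {\"app_active\":{\"time\":\"1595289600000\"}}";
        System.out.println(keepBody(raw));
    }
}
```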
