DWS 层的定位是什么
- 轻度聚合,因为 DWS 层要应对很多实时查询,如果是完全的明细那么查询的压力是非常大的。将更多的实时数据以主题的方式组合起来便于管理,同时也能减少维度查询的次数。
DWS 层-访客主题宽表的计算
设计一张 DWS 层的表其实就两件事:维度和度量(事实数据)
- 度量包括 PV、UV、跳出次数、进入页面数(session_count)、连续访问时长。
- 维度包括在分析中比较重要的几个字段:渠道、地区、版本、新老用户,按这些维度进行聚合。
需求分析与思路
- 接收各个明细数据,变为数据流;
- 把数据流合并在一起,成为一个相同格式对象的数据流;
- 对合并的流进行聚合,聚合的时间窗口决定了数据的时效性;
- 把聚合结果写在数据库中。
代码实现:
public class VisitorStatsApp {
public static void main(String[] args) throws Exception {
//TODO 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1 设置CK&状态后端
//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
//env.enableCheckpointing(5000L);
//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(10000L);
//env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart());
//TODO 2.读取Kafka数据创建流
String groupId = "visitor_stats_app_210325";
String uniqueVisitSourceTopic = "dwm_unique_visit";
String userJumpDetailSourceTopic = "dwm_user_jump_detail";
String pageViewSourceTopic = "dwd_page_log";
DataStreamSource uvDS = env.addSource(MyKafkaUtil.getKafkaConsumer(uniqueVisitSourceTopic, groupId));
DataStreamSource ujDS = env.addSource(MyKafkaUtil.getKafkaConsumer(userJumpDetailSourceTopic, groupId));
DataStreamSource pvDS = env.addSource(MyKafkaUtil.getKafkaConsumer(pageViewSourceTopic, groupId));
//TODO 3.将每个流处理成相同的数据类型
//3.1 处理UV数据
SingleOutputStreamOperator visitorStatsWithUvDS = uvDS.map(line -> {
JSONObject jsonObject = JSON.parseObject(line);
//提取公共字段
JSONObject common = jsonObject.getJSONObject("common");
return new VisitorStats("", "",
common.getString("vc"),
common.getString("ch"),
common.getString("ar"),
common.getString("is_new"),
1L, 0L, 0L, 0L, 0L,
jsonObject.getLong("ts"));
});
//3.2 处理UJ数据
SingleOutputStreamOperator visitorStatsWithUjDS = ujDS.map(line -> {
JSONObject jsonObject = JSON.parseObject(line);
//提取公共字段
JSONObject common = jsonObject.getJSONObject("common");
return new VisitorStats("", "",
common.getString("vc"),
common.getString("ch"),
common.getString("ar"),
common.getString("is_new"),
0L, 0L, 0L, 1L, 0L,
jsonObject.getLong("ts"));
});
//3.3 处理PV数据
SingleOutputStreamOperator visitorStatsWithPvDS = pvDS.map(line -> {
JSONObject jsonObject = JSON.parseObject(line);
//获取公共字段
JSONObject common = jsonObject.getJSONObject("common");
//获取页面信息
JSONObject page = jsonObject.getJSONObject("page");
String last_page_id = page.getString("last_page_id");
long sv = 0L;
if (last_page_id == null || last_page_id.length() <= 0) {
sv = 1L;
}
return new VisitorStats("", "",
common.getString("vc"),
common.getString("ch"),
common.getString("ar"),
common.getString("is_new"),
0L, 1L, sv, 0L, page.getLong("during_time"),
jsonObject.getLong("ts"));
});
//TODO 4.Union几个流
DataStream unionDS = visitorStatsWithUvDS.union(
visitorStatsWithUjDS,
visitorStatsWithPvDS);
//TODO 5.提取时间戳生成WaterMark
SingleOutputStreamOperator visitorStatsWithWMDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(11))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(VisitorStats element, long recordTimestamp) {
return element.getTs();
}
}));
//TODO 6.按照维度信息分组
KeyedStream> keyedStream = visitorStatsWithWMDS.keyBy(new KeySelector>() {
@Override
public Tuple4 getKey(VisitorStats value) throws Exception {
return new Tuple4(
value.getAr(),
value.getCh(),
value.getIs_new(),
value.getVc());
}
});
//TODO 7.开窗聚合 10s的滚动窗口
WindowedStream, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator result = windowedStream.reduce(new ReduceFunction() {
@Override
public VisitorStats reduce(VisitorStats value1, VisitorStats value2) throws Exception {
value1.setUv_ct(value1.getUv_ct() + value2.getUv_ct());
value1.setPv_ct(value1.getPv_ct() + value2.getPv_ct());
value1.setSv_ct(value1.getSv_ct() + value2.getSv_ct());
value1.setUj_ct(value1.getUj_ct() + value2.getUj_ct());
value1.setDur_sum(value1.getDur_sum() + value2.getDur_sum());
return value1;
}
}, new WindowFunction, TimeWindow>() {
@Override
public void apply(Tuple4 stringStringStringStringTuple4, TimeWindow window, Iterable input, Collector out) throws Exception {
long start = window.getStart();
long end = window.getEnd();
VisitorStats visitorStats = input.iterator().next();
//补充窗口信息
visitorStats.setStt(DateTimeUtil.toYMDhms(new Date(start)));
visitorStats.setEdt(DateTimeUtil.toYMDhms(new Date(end)));
out.collect(visitorStats);
}
});
//TODO 8.将数据写入ClickHouse
result.print(">>>>>>>>>>>");
result.addSink(ClickHouseUtil.getSink("insert into visitor_stats_210325 values(?,?,?,?,?,?,?,?,?,?,?,?)"));
//TODO 9.启动任务
env.execute("VisitorStatsApp");
}
}
流程图:



