电商用户行为分析
统计分析:
-点击、浏览
-热门商品、近期热门商品、分类热门商品,流量统计
偏好统计
-收藏、喜欢、评分、打标签
-用户画像,推荐列表(结合特征工程和机器学习算法)
风险控制
-下订单、支付、登录
-刷单监控,订单失效监控、恶意登录(短时间内频繁登录失败)监控
数据源解析:
用户行为数据UserBehavior.csv
web服务器日志: apache.log
热门实时商品统计
基本需求:
-统计近1小时内的热门商品,每5分钟更新一次
- 热门度用浏览次数("pv")来衡量
解决思路:
-在所有用户行为数据中,过滤出浏览("pv")行为进行统计
-构建滑动窗口,窗口长度卫1小时,滑动距离为5分钟
DataStream-->KeyedStream-->WindowedStream-->DataStream
1.分区
2.时间窗
3.聚合
按照商品Id进行分区
.keyBy("itemId")
设置时间窗口
.timeWindow(Time.minutes(60),Time.minutes(5))
时间窗口(timeWindow)区间为左闭右开
同一份数据会被分发道不同的窗口
.窗口聚合
.aggregate(new CountAgg(),new WindowResultFunction())
CountAgg()定义窗口聚合的规则
WindowResultFunction定义输出的数据结构ItemViewCount(itemId,windowEnd,count)
WindowedStream-->DataStream
窗口聚合策略-没出现一条记录就加一
class CountAgg extends AggregateFunction[UserBehavior,Long,Long] {
override def createAccumulator(): Long = 0L
override def add(userBehavior:UserBehavior,acc:Long): Long = acc +1
override def getResult(acc:Long): Long = acc
override def merge(acc1: Long,acc2:Long): Any = acc1 + acc2
}
累加规则--窗口内碰到一条数据就加一(add方法)
实现AggregateFunction
进行统计整理---KeyBy("windowEnd")
根据key来保存的数据,ListState
最终排序输出--KeyedProcessFunction
-针对有状态流的底层API
-KeyedProcessFunction会对分区后的每一条子流进行处理
-以windowEnd作为key,保证分流以后每一条流的数据都在一个时间窗口内
-从ListState中读取当前流的状态,存储数据进行排序输出
用ProcessFunction来定义KeyedStream的处理逻辑
分区以后,每个KeyedStream都有自己的声明周期
-open: 初始化,在这里可以获取当前流的状态
-processElement:处理流中每一个元素时调用
-onTimer:定时调用,注册定时器Timer并触发之后的回调操作