- Flink User Behavior Analysis for E-commerce
- 1. Real-time statistics
- 1.1 Hot items
- 1.2 Hot pages
- 1.3 Website UV
- 2. Business flow and risk control
- 2.1 Page-ad blacklist filtering
- 2.2 Malicious login monitoring
- 2.3 Order payment timeout monitoring
- 2.4 Real-time payment reconciliation
- 3. Project link
-
1.1 Hot items
Requirement: every 5 minutes, display in real time the TopN hot items on the site over the past hour.
-
Sample output:
window info:
NO 1: item ID + view count 1
NO 2: item ID + view count 2
NO 3: item ID + view count 3
-
Implementation idea:
-
- The final output needs window info + item ID, so after keyBy we also need a window function; that is the only way to get the window end time together with the key.
-
- We also need the view count, so we pair the window function with an incremental aggregate function: after keyBy, each arriving element bumps the count by one.
-
- Steps 1 and 2 only yield the count of a single item per window. To rank all items of the same hour, keyBy the window end time and use a process function: buffer each window's results in ListState, register a timer for the window end time, and when it fires sort and emit the contents of the ListState.
-
-
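Before the code: the sliding window used below, timeWindow(Time.hours(1), Time.minutes(5)), assigns every element to size/slide = 12 overlapping windows. The arithmetic can be checked standalone (my own restatement mirroring Flink's TimeWindow.getWindowStartWithOffset with zero offset; not the framework's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how a sliding-window assigner maps one timestamp to window starts.
class SlidingWindowSketch {
    static final long SIZE = 60 * 60 * 1000L;  // 1 hour
    static final long SLIDE = 5 * 60 * 1000L;  // 5 minutes

    // Start of the most recent window containing `timestamp` (offset 0).
    static long lastStart(long timestamp) {
        return timestamp - (timestamp + SLIDE) % SLIDE;
    }

    // All window start times the element belongs to.
    static List<Long> assignedWindowStarts(long timestamp) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastStart(timestamp); start > timestamp - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 3_600_001L; // 01:00:00.001
        System.out.println(assignedWindowStarts(ts).size() + " windows"); // 12 windows
    }
}
```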
Code
public class HotItemsPractise {
public static void main(String[] args) throws Exception {
// 1. Set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
//2. Parse the source and keep only page views
DataStream<ItemBean> filterStream = inputStream.map(line -> {
String[] split = line.split(",");
return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
}).filter(item -> "pv".equals(item.getBehavior()));
//3. Aggregate the count of each item per window
DataStream<ItemViewCount> windowsResult = filterStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
@Override
public long extractAscendingTimestamp(ItemBean element) {
return element.getTimestamp() * 1000L;
}
}).keyBy("itemId")
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new MyAggreateCount(), new MyAllWindowsView());
//4. Collect and rank everything belonging to the same one-hour window
SingleOutputStreamOperator<String> windowEnd = windowsResult
.keyBy("windowEnd")
.process(new ItemHotTopN(5));
windowEnd.print();
env.execute("HotItemsPractise");
}
public static class MyAggreateCount implements AggregateFunction<ItemBean, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(ItemBean value, Long accumulator) {
return accumulator + 1L;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
public static class MyAllWindowsView implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
long windowEnd = window.getEnd();
long count = input.iterator().next();
long itemId = tuple.getField(0);
out.collect(new ItemViewCount(itemId, windowEnd, count));
}
}
public static class ItemHotTopN extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
ListState<ItemViewCount> itemViewCountListState;
private int topN;
public ItemHotTopN(int topN) {
this.topN = topN;
}
@Override
public void open(Configuration parameters) throws Exception {
itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<>("itemViewCount", ItemViewCount.class));
}
@Override
public void processElement(ItemViewCount itemViewCount, Context ctx, Collector<String> out) throws Exception {
itemViewCountListState.add(itemViewCount);
ctx.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1L);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
// ListState转为ArrayList
ArrayList arraylist = Lists.newArrayList(itemViewCountListState.get().iterator());
arraylist.sort(new Comparator() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return o2.getCount().intValue() - o1.getCount().intValue();
}
});
StringBuilder resultStringBuilder = new StringBuilder();
resultStringBuilder.append("===================================" + "n");
resultStringBuilder.append("窗口结束时间:").append(new Timestamp(timestamp).toString()).append("n");
for (int i = 0; i < Math.min(topN, arraylist.size()); i++) {
resultStringBuilder
.append("NO ")
.append(i + 1)
.append(": item ID = ")
.append(arraylist.get(i).getItemId())
.append(" popularity = ")
.append(arraylist.get(i).getCount())
.append("\n");
}
resultStringBuilder.append("===================================").append("\n");
out.collect(resultStringBuilder.toString());
// this key (one window end) is done; release the buffered state
itemViewCountListState.clear();
// throttle console output
Thread.sleep(1000L);
}
}
}
1.2 Hot pages
- Requirement: every 5 minutes, output the hot pages viewed over the past hour.
- Sample output:
window end time: 2015-05-18 13:08:50.0
NO 1: page URL = /blog/tags/puppet?flav=rss20 popularity = 11
NO 2: page URL = /projects/xdotool/xdotool.xhtml popularity = 5
NO 3: page URL = /projects/xdotool/ popularity = 4
NO 4: page URL = /?flav=rss20 popularity = 4
NO 5: page URL = /robots.txt popularity = 4
-
Implementation idea: unlike the previous job, timestamps in this data source are not monotonically increasing.
- How to keep out-of-order data from being dropped:
- 1. Set the watermark's bounded out-of-orderness to match the disorder in the source.
- 2. Give the window an allowedLateness period: the window fires a first result at its end time, and each late element that still falls inside the lateness period immediately triggers an updated result.
- 3. Data that arrives even later goes straight to a side output.
- How to make a late element update, rather than duplicate, the earlier result:
- 1. First aggregate incrementally per key inside the window, then apply a window function, then keyBy the window end time.
- 2. Collect everything belonging to one window end, sort, and emit.
- 3. A late element must replace the earlier result for its page, so keep the results in MapState with the page URL as key and the count as value; putting the same key again overwrites the old entry.
-
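The "replace rather than append" behaviour in step 3 does not need Flink to demonstrate: any map keyed by URL gives it to you, because put on an existing key overwrites. A tiny standalone sketch (plain Java; the URLs and counts are made up for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TopNUpdateSketch {
    // Apply updates in arrival order; a later count for the same URL overwrites
    // the earlier one, exactly like MapState.put in the Flink job.
    static List<Map.Entry<String, Long>> topN(List<Map.Entry<String, Long>> updates, int n) {
        Map<String, Long> counts = new HashMap<>();
        for (Map.Entry<String, Long> u : updates) {
            counts.put(u.getKey(), u.getValue()); // overwrite, never duplicate
        }
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort((e1, e2) -> Long.compare(e2.getValue(), e1.getValue()));
        return sorted.subList(0, Math.min(n, sorted.size()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        updates.add(Map.entry("/a", 3L));
        updates.add(Map.entry("/b", 5L));
        updates.add(Map.entry("/a", 6L)); // late update for /a replaces the 3
        System.out.println(topN(updates, 2)); // /a now ranks first with 6
    }
}
```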
Code
public class HotPages {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/apache.log");
        SimpleDateFormat simpleFormatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
        OutputTag<ApacheLogEvent> lateTag = new OutputTag<ApacheLogEvent>("late_date") {
        };
        SingleOutputStreamOperator<PageViewCount> streamPageViewCount = stringDataStreamSource.map(line -> {
            String[] s = line.split(" ");
            // parse the log date into an epoch timestamp
            Long timestamp = simpleFormatter.parse(s[3]).getTime();
            return new ApacheLogEvent(s[0], s[1], timestamp, s[5], s[6]);
        }).filter(data -> "GET".equals(data.getMethod()))
                .filter(data -> {
                    // drop resource requests ending in css/js/png/ico/jpg
                    String regex = "((?!\\.(css|js|png|ico|jpg)$).)*$";
                    return Pattern.matches(regex, data.getUrl());
                }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(ApacheLogEvent apacheLogEvent) {
                        return apacheLogEvent.getTimestamp();
                    }
                }).keyBy("url")
                .timeWindow(Time.minutes(10), Time.seconds(5))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(lateTag)
                .aggregate(new HotPageIncreaseAgg(), new HotPageAllAgg());
        SingleOutputStreamOperator<String> windowEnd = streamPageViewCount
                .keyBy("windowEnd")
                .process(new MyProcessFunction(5));
        // print to console; the late-data side output lives on the window operator's result
        windowEnd.print("data");
        streamPageViewCount.getSideOutput(lateTag).print("late_date");
        executionEnvironment.execute();
    }

    public static class HotPageIncreaseAgg implements AggregateFunction<ApacheLogEvent, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ApacheLogEvent value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    public static class HotPageAllAgg implements WindowFunction<Long, PageViewCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
            String url = tuple.getField(0);
            Long count = input.iterator().next();
            long windowEnd = window.getEnd();
            out.collect(new PageViewCount(url, windowEnd, count));
        }
    }

    public static class MyProcessFunction extends KeyedProcessFunction<Tuple, PageViewCount, String> {
        private Integer topSize;
        MapState<String, Long> hotPageCount;

        public MyProcessFunction(Integer topSize) {
            this.topSize = topSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            hotPageCount = getRuntimeContext().getMapState(new MapStateDescriptor<>("hot_page_count", String.class, Long.class));
        }

        @Override
        public void processElement(PageViewCount pageViewCount, Context ctx, Collector<String> out) throws Exception {
            // MapState: putting an existing key overwrites it, so a late update replaces the old count
            hotPageCount.put(pageViewCount.getUrl(), pageViewCount.getCount());
            ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 1);
            // a second timer to clean up the state once the allowedLateness period is over
            ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 60 * 1000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            Long currentKey = ctx.getCurrentKey().getField(0);
            // window fully closed (lateness period over): just clear the state
            if (timestamp == currentKey + 60 * 1000L) {
                hotPageCount.clear();
                return;
            }
            ArrayList<Map.Entry<String, Long>> pageViewCounts = Lists.newArrayList(hotPageCount.entries());
            pageViewCounts.sort(new Comparator<Map.Entry<String, Long>>() {
                @Override
                public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                    return o2.getValue().compareTo(o1.getValue());
                }
            });
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("===================================\n");
            stringBuilder.append("window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
            for (int i = 0; i < Math.min(topSize, pageViewCounts.size()); i++) {
                Map.Entry<String, Long> stringLongEntry = pageViewCounts.get(i);
                stringBuilder.append("NO ").append(i + 1).append(":")
                        .append(" page URL = ").append(stringLongEntry.getKey())
                        .append(" popularity = ").append(stringLongEntry.getValue())
                        .append("\n");
            }
            stringBuilder.append("===================================\n\n");
            // throttle console output
            Thread.sleep(1000L);
            out.collect(stringBuilder.toString());
        }
    }
}
-
1.3 Website UV
-
Requirement: output the site's UV (unique visitors) per hour in real time.
-
Output format: window end time + number of unique visitors in that window
-
Implementation idea:
-
1. Use a 1-hour tumbling window. Every arriving element should trigger a computation, rather than waiting for the window to close, so a custom trigger is needed.
-
2. The trigger runs the downstream logic once per element: dedup by userId. The dedup idea is to look up each record's userId in Redis; if it is already there, drop the record, otherwise add 1 to the count.
-
- Concretely, for each element: parse out the userId, map it to a bitmap offset with a custom hash function, and query that bit. If the bit is already 1, just read out and emit the count stored for this window end; if it is 0, set the bit to 1, read the count, add 1, emit, and write the updated count back to Redis.
Storage layout:
count: a Redis hash keyed "uv_count", storing <window end time, count>
userId: a Redis bitmap per window (the bitmap key is the window end time)
hash function: for each character of userId, result = result * seed + character code
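That hash is a plain polynomial rolling hash, and the capacity must be a power of two so that result & (capacity - 1) is equivalent to a modulo. The arithmetic can be checked in isolation (a standalone restatement of the BloomFilter.hash method shown in the code below):

```java
class SeedHashSketch {
    // result = result * seed + charCode for each character, then masked into
    // [0, capacity); capacity is a power of two, so & (capacity - 1) == % capacity.
    static long hash(String key, int seed, long capacity) {
        long result = 0L;
        for (int i = 0; i < key.length(); i++) {
            result = result * seed + key.charAt(i);
        }
        return result & (capacity - 1);
    }

    public static void main(String[] args) {
        // worked example: "ab" with seed 61 -> 'a' * 61 + 'b' = 97 * 61 + 98 = 6015
        System.out.println(hash("ab", 61, 1L << 29)); // 6015
        // the mask keeps any userId inside the bitmap's offset range
        System.out.println(hash("937166", 61, 1L << 29) < (1L << 29)); // true
    }
}
```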
-
-
Code
public class HotUVWithBloomFilter {
public static void main(String[] args) throws Exception {
//1. Set up the environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
// 2. Parse the data
DataStreamSource<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
SingleOutputStreamOperator<ItemBean> filterData = inputStream.map(line -> {
String[] split = line.split(",");
return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
@Override
public long extractAscendingTimestamp(ItemBean element) {
return element.getTimestamp() * 1000L;
}
}).filter(itemBean -> "pv".equals(itemBean.getBehavior()));
//2. 1-hour tumbling window
SingleOutputStreamOperator<PageViewCount> streamOperator = filterData
.timeWindowAll(Time.hours(1))
//3. Custom trigger: fire on every element instead of waiting for the whole window
.trigger(new UVTriigger())
//4. Per-element logic: check the Redis bitmap for the current userId
.process(new UVProcessFunction());
//5. If the userId is new, its bit is set and the count is incremented
streamOperator.print();
executionEnvironment.execute();
}
public static class UVTriigger extends Trigger<ItemBean, TimeWindow> {
@Override
public TriggerResult onElement(ItemBean element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
}
}
public static class UVProcessFunction extends ProcessAllWindowFunction<ItemBean, PageViewCount, TimeWindow> {
private Jedis jedis;
private String pageCountKey = "uv_page_count";
private BloomFilter bloomFilter;
@Override
public void open(Configuration parameters) throws Exception {
jedis = new Jedis("localhost", 6379);
bloomFilter = new BloomFilter(1 << 29);
}
@Override
public void process(Context context, Iterable<ItemBean> elements, Collector<PageViewCount> out) throws Exception {
Long windowEnd1 = context.window().getEnd();
String windowEnd = windowEnd1.toString();
ItemBean itemBean = elements.iterator().next();
Long userId = itemBean.getUserId();
long offset = bloomFilter.hash(userId.toString(), 61);
Boolean isExist = jedis.getbit(windowEnd, offset);
if (!isExist) {
jedis.setbit(windowEnd, offset, true);
// increment the count; counts are stored in a Redis hash
Long uvCount = 0L; // default when no count exists yet
String uvCountString = jedis.hget(pageCountKey, windowEnd);
if (StringUtils.isNoneBlank(uvCountString)) {
uvCount = Long.valueOf(uvCountString);
}
jedis.hset(pageCountKey, windowEnd, String.valueOf(uvCount + 1));
out.collect(new PageViewCount("uv", windowEnd1, uvCount + 1));
}
}
}
public static class BloomFilter {
// capacity must be a power of two so that result & (capacity - 1) equals result % capacity
private long capacity;
public BloomFilter(long capacity) {
this.capacity = capacity;
}
public long hash(String userId, int seed) {
long result = 0L;
for (int i = 0; i < userId.length(); i++) {
result = result * seed + userId.charAt(i);
}
return result & (capacity - 1);
}
}
}
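A sizing note (my own back-of-the-envelope, not from the original): with capacity 1 << 29 the offsets span 2^29 bits, so each per-window Redis bitmap can grow to 64 MB:

```java
class BitmapSizing {
    public static void main(String[] args) {
        long bits = 1L << 29;              // offset space used by the job above
        long bytes = bits / 8;             // a bitmap packs 8 bits per byte
        System.out.println(bytes / (1024 * 1024) + " MB per window key"); // 64 MB
    }
}
```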
2. Business flow and risk control
2.1 Page-ad blacklist filtering
-
Requirement: output the click count of each ad per province, computed over a 1-hour window that slides every 5 minutes. If a user clicks the same ad more than 3 times within one day, emit that user to a side output (the blacklist); any further clicks by that user on that ad within the day are treated as invalid and excluded from the statistics.
-
Sample output:
-
blacklist-user> BlackAdUerInfo(uerId=937166, adId=1715, count=click over 3times.)
blacklist-user> BlackAdUerInfo(uerId=161501, adId=36156, count=click over 3times.)
—>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
—>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:25:00.0, count=5)
—>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
—>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:30:00.0, count=2)
—>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:30:00.0, count=5)
—>> AdOutputInfo(province=shanghai, windowEnd=2017-11-26 09:30:00.0, count=2)
-
-
Statistics logic:
- Filtering abnormal clicks: keyBy userId + adId, then use a KeyedProcessFunction. For each element in the partition, check whether the click count has reached the configured limit; if not, add 1 and emit the record downstream. Once the limit is reached, add the user to the blacklist (a side output) and register a timer for midnight of the following day; when it fires, the timer clears that user's click-count state.
- The per-province ad counting afterwards works the same way as the earlier statistics jobs.
-
Code
public class AdStatisticsByProvince {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
DataStream<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/AdClickLog.csv");
DataStream<AdvertInfo> processStream1 = inputStream.map(line -> {
String[] split = line.split(",");
return new AdvertInfo(split[0], split[1], split[2], Long.parseLong(split[4]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdvertInfo>() {
@Override
public long extractAscendingTimestamp(AdvertInfo element) {
return element.getTimeStramp() * 1000L;
}
});
// filter out abnormal click data
SingleOutputStreamOperator<AdvertInfo> filterBlackStream = processStream1
.keyBy("userId", "adId")
.process(new BlackUserProcess(3));
DataStream<AdOutputInfo> resultStream = filterBlackStream
.keyBy("province")
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new IncreaseAggreateEle(), new AllAggreateCount());
filterBlackStream.getSideOutput(new OutputTag<BlackAdUerInfo>("blacklist"){}).print("blacklist-user");
resultStream.print("--->");
executionEnvironment.execute();
}
public static class BlackUserProcess extends KeyedProcessFunction<Tuple, AdvertInfo, AdvertInfo> {
ValueState<Long> adClickCount;
ValueState<Boolean> isBlackUser;
private int bound;
public BlackUserProcess(int bound) {
this.bound = bound;
}
@Override
public void open(Configuration parameters) throws Exception {
adClickCount = getRuntimeContext().getState(new ValueStateDescriptor<>("ad_click_count", Long.class, 0L));
isBlackUser = getRuntimeContext().getState(new ValueStateDescriptor<>("is_black_user", Boolean.class, false));
}
@Override
public void processElement(AdvertInfo value, Context ctx, Collector<AdvertInfo> out) throws Exception {
// 1. Check against the limit; note the state is kept for one day only
Long userIdClickCount = adClickCount.value();
// register a processing-time timer for the next local midnight (UTC+8) to clear the state
Long timestamp = ctx.timerService().currentProcessingTime();
Long clearTime = (timestamp / (24 * 60 * 60 * 1000L) + 1) * 24 * 60 * 60 * 1000L - 8 * 60 * 60 * 1000L;
ctx.timerService().registerProcessingTimeTimer(clearTime);
// 2. The limit has been reached
if (userIdClickCount >= bound) {
// 2.1 Not yet blacklisted: blacklist the user via the side output
if (!isBlackUser.value()) {
isBlackUser.update(true);
ctx.output(new OutputTag<BlackAdUerInfo>("blacklist") {
},
new BlackAdUerInfo(value.getUserId(), value.getAdId(), "click over " + userIdClickCount + "times."));
}
// 2.2 Already blacklisted: drop the record
return;
}
// 3. Below the limit: update the count and emit the record
adClickCount.update(userIdClickCount+1);
out.collect(value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdvertInfo> out) throws Exception {
adClickCount.clear();
isBlackUser.clear();
}
}
public static class IncreaseAggreateEle implements AggregateFunction<AdvertInfo, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(AdvertInfo value, Long accumulator) {
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a+b;
}
}
public static class AllAggreateCount implements WindowFunction<Long, AdOutputInfo, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<AdOutputInfo> out) throws Exception {
Timestamp formateDate = new Timestamp(window.getEnd());
out.collect(new AdOutputInfo(tuple.getField(0).toString(), formateDate.toString(), input.iterator().next()));
}
}
}
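The state-cleanup timer in BlackUserProcess targets the next midnight in UTC+8. The arithmetic can be verified without Flink (the constants and method name here are mine; note the caveat in the comment):

```java
class MidnightTimerSketch {
    static final long DAY_MS = 24 * 60 * 60 * 1000L;
    static final long UTC8_MS = 8 * 60 * 60 * 1000L;

    // Same arithmetic as the job: round up to the next UTC midnight, then shift
    // back 8 hours so the timer lands on midnight in UTC+8.
    // Caveat: for events after 16:00 UTC this lands in the past, so the timer
    // would fire immediately; a robust version would shift into local time first.
    static long nextLocalMidnight(long nowMs) {
        return (nowMs / DAY_MS + 1) * DAY_MS - UTC8_MS;
    }

    public static void main(String[] args) {
        long now = 1511658000000L; // 2017-11-26 09:00:00 UTC+8
        System.out.println(nextLocalMidnight(now)); // 1511712000000 = 2017-11-27 00:00:00 UTC+8
    }
}
```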
2.2 Malicious login monitoring
-
Requirement: detect users who fail to log in twice in a row within two seconds.
-
Implementation idea: CEP. Define a pattern for consecutive login failures within the time bound, apply the pattern to the stream, and select the matching events out of the resulting pattern stream.
public class LoginCheck {
public static void main(String[] args) throws Exception {
// 1. Set up the environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/LoginLog.csv");
// 2. Parse into objects
KeyedStream<LoginInfo, Tuple> keyedStream = stringDataStreamSource.map(line -> {
String[] split = line.split(",");
return new LoginInfo(split[0], split[2], Long.parseLong(split[3]));
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginInfo>(Time.seconds(3)) {
@Override
public long extractTimestamp(LoginInfo element) {
return element.getTimeStamp() * 1000L;
}
}).keyBy("userId"); // key by user so failures are counted per user
// 3. Define the pattern
// 3.1 Create the rule
Pattern<LoginInfo, LoginInfo> failPattern = Pattern.<LoginInfo>begin("loginFailEvent").where(new SimpleCondition<LoginInfo>() {
@Override
public boolean filter(LoginInfo value) throws Exception {
return "fail".equals(value.getStatus());
}
// three consecutive login failures; consecutive() makes the contiguity strict
}).times(3).consecutive().within(Time.seconds(5));
// 3.2 Apply the pattern to the stream
PatternStream<LoginInfo> pattern = CEP.pattern(keyedStream, failPattern);
// 3.3 Select the matched data
SingleOutputStreamOperator<LoginFailInfo> selectStream = pattern.select(new PatternSelectFunction<LoginInfo, LoginFailInfo>() {
@Override
public LoginFailInfo select(Map<String, List<LoginInfo>> pattern) throws Exception {
List<LoginInfo> loginFailEvent = pattern.get("loginFailEvent");
LoginInfo firstFail = loginFailEvent.get(0);
String userId = firstFail.getUserId();
LoginInfo lastFail = loginFailEvent.get(loginFailEvent.size() - 1);
Timestamp firstFailTimeStamp = new Timestamp(firstFail.getTimeStamp() * 1000L);
Timestamp secondFailTimeStamp = new Timestamp(lastFail.getTimeStamp() * 1000L);
return new LoginFailInfo(userId, firstFailTimeStamp.toString(), secondFailTimeStamp.toString(), loginFailEvent.size() + " consecutive login failures");
}
});
selectStream.print();
executionEnvironment.execute();
}
}
2.3 Order payment timeout monitoring
-
Requirement: detect, in real time, orders that were created but not paid within 15 minutes.
-
Implementation logic:
- Define a CEP pattern: an order creation followed by its payment within 15 minutes.
- Apply the pattern to the stream.
- From the pattern stream, select both the matched (paid) orders and the timed-out ones.
- Matched events are delivered in a Map keyed by the pattern-step name.
- Timed-out partial matches are delivered in the same Map form to the timeout handler and emitted to a side output.
-
Code
public class OrderCheck {
private static final Logger logger = LoggerFactory.getLogger(OrderCheck.class);
public static void main(String[] args) throws Exception {
// 1. Set up the environment
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
SingleOutputStreamOperator<OrderInfo> objectSingleOutputStreamOperator = stringDataStreamSource.map(line -> {
String[] split = line.split(",");
return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {
@Override
public long extractAscendingTimestamp(OrderInfo element) {
return element.getTimeStamp()*1000L;
}
});
// 2. Define the pattern
Pattern<OrderInfo, OrderInfo> orderPayPattern = Pattern.<OrderInfo>begin("create").where(new SimpleCondition<OrderInfo>() {
@Override
public boolean filter(OrderInfo value) throws Exception {
return "create".equals(value.getStatus());
}
}).followedBy("pay").where(new SimpleCondition<OrderInfo>() {
@Override
public boolean filter(OrderInfo value) throws Exception {
return "pay".equals(value.getStatus());
}
}).within(Time.minutes(15));
// 3. Apply the pattern to the keyed stream
PatternStream<OrderInfo> orderStream = CEP.pattern(objectSingleOutputStreamOperator.keyBy("orderId"), orderPayPattern);
OutputTag<OrderTimeoutInfo> outputTag = new OutputTag<OrderTimeoutInfo>("timeoutStream") {
};
// 4. Select matched orders on the main output and timed-out ones on the side output
SingleOutputStreamOperator<OrderTimeoutInfo> resultStream = orderStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());
resultStream.print("payed normally");
resultStream.getSideOutput(outputTag).print("timeout");
executionEnvironment.execute("order timeout detect job");
}
public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderInfo, OrderTimeoutInfo> {
@Override
public OrderTimeoutInfo timeout(Map<String, List<OrderInfo>> pattern, long timeoutTimestamp) throws Exception {
logger.warn("order not paid in time: {}", pattern);
OrderInfo createEvent = pattern.get("create").get(0);
return new OrderTimeoutInfo(createEvent.getOrderId(), "timeout " + timeoutTimestamp);
}
}
}
public static class OrderPaySelect implements PatternSelectFunction<OrderInfo, OrderTimeoutInfo> {
@Override
public OrderTimeoutInfo select(Map<String, List<OrderInfo>> pattern) throws Exception {
OrderInfo payEvent = pattern.get("pay").get(0);
return new OrderTimeoutInfo(payEvent.getOrderId(), "pay");
}
}
}
}
2.4 Real-time payment reconciliation
-
Approach: a two-stream join. Connect the order (payment) stream and the receipt stream keyed by payId via connect + CoProcessFunction: whichever side arrives first is buffered in keyed state and a timer is registered; if the other side arrives before the timer fires, emit the matched pair, otherwise route the leftover event to an "unmatched" side output in onTimer.
-
Code
public class OrderPay {
    private final static OutputTag<OrderInfo> unmatchedPays = new OutputTag<OrderInfo>("unmatchedPays") {
    };
    private final static OutputTag<Receipt> unmatchedReceipts = new OutputTag<Receipt>("unmatchedReceipts") {
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 1. Payment (order) data
        DataStreamSource<String> inputSteam1 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
        SingleOutputStreamOperator<OrderInfo> orderStream = inputSteam1.map(line -> {
            String[] split = line.split(",");
            return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {
            @Override
            public long extractAscendingTimestamp(OrderInfo element) {
                return element.getTimeStamp() * 1000L;
            }
        });
        // 2. Receipt data
        DataStreamSource<String> inputStream2 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/ReceiptLog.csv");
        SingleOutputStreamOperator<Receipt> receiptStream = inputStream2.map(line -> {
            String[] split = line.split(",");
            return new Receipt(split[0], split[1], Long.parseLong(split[2]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Receipt>() {
            @Override
            public long extractAscendingTimestamp(Receipt element) {
                return element.getTimeStamp() * 1000L;
            }
        });
        // 3. Join the two streams on payId
        SingleOutputStreamOperator<Tuple2<OrderInfo, Receipt>> resultStream = orderStream.keyBy("payId")
                .connect(receiptStream.keyBy("payId"))
                .process(new DoubleStreamJoinProcess());
        // 4. Matched pairs on the main output, leftovers on the side outputs
        resultStream.print("matched");
        resultStream.getSideOutput(unmatchedPays).print("unmatchedPays");
        resultStream.getSideOutput(unmatchedReceipts).print("unmatchedReceipts");
        env.execute("tx pay match job");
    }

    public static class DoubleStreamJoinProcess extends CoProcessFunction<OrderInfo, Receipt, Tuple2<OrderInfo, Receipt>> {
        ValueState<OrderInfo> payState;
        ValueState<Receipt> receiptState;

        @Override
        public void open(Configuration parameters) throws Exception {
            payState = getRuntimeContext().getState(new ValueStateDescriptor<>("pay", OrderInfo.class));
            receiptState = getRuntimeContext().getState(new ValueStateDescriptor<>("receipt", Receipt.class));
        }

        @Override
        public void processElement1(OrderInfo orderInfo, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
            // look for the matching event from stream 2
            Receipt receipt = receiptState.value();
            if (receipt != null) {
                out.collect(new Tuple2<>(orderInfo, receipt));
                receiptState.clear();
            } else {
                payState.update(orderInfo);
                ctx.timerService().registerEventTimeTimer(orderInfo.getTimeStamp() * 1000L + 5000L);
            }
        }

        @Override
        public void processElement2(Receipt receipt, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
            // look for the matching event from stream 1
            OrderInfo orderInfo = payState.value();
            if (orderInfo != null) {
                out.collect(new Tuple2<>(orderInfo, receipt));
                payState.clear();
            } else {
                receiptState.update(receipt);
                ctx.timerService().registerEventTimeTimer(receipt.getTimeStamp() * 1000L + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
            // anything still buffered has no counterpart: send it to a side output
            if (payState.value() != null) {
                ctx.output(unmatchedPays, payState.value());
            }
            if (receiptState.value() != null) {
                ctx.output(unmatchedReceipts, receiptState.value());
            }
            payState.clear();
            receiptState.clear();
            super.onTimer(timestamp, ctx, out);
        }
    }
}
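Stripped of Flink, the CoProcessFunction above is a small match-or-buffer state machine: when one side arrives, look for the other side in state; on a match emit and clear, otherwise buffer and wait. A minimal in-memory sketch (plain Java; the maps stand in for keyed state, unmatched() stands in for the timer, and the names are mine):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of pay/receipt matching: buffer whichever side arrives first,
// emit a matched pair when the other side shows up.
class ReconcileSketch {
    private final Map<String, String> pendingPays = new HashMap<>();
    private final Map<String, String> pendingReceipts = new HashMap<>();
    final List<String> matched = new ArrayList<>();

    void onPay(String payId, String order) {
        String receipt = pendingReceipts.remove(payId);
        if (receipt != null) {
            matched.add(order + "<->" + receipt);   // both sides seen: emit
        } else {
            pendingPays.put(payId, order);          // buffer and await the receipt
        }
    }

    void onReceipt(String payId, String receipt) {
        String order = pendingPays.remove(payId);
        if (order != null) {
            matched.add(order + "<->" + receipt);
        } else {
            pendingReceipts.put(payId, receipt);
        }
    }

    // Stand-in for onTimer: anything still buffered is unmatched.
    List<String> unmatched() {
        List<String> out = new ArrayList<>(pendingPays.values());
        out.addAll(pendingReceipts.values());
        return out;
    }

    public static void main(String[] args) {
        ReconcileSketch s = new ReconcileSketch();
        s.onPay("tx1", "order1");
        s.onReceipt("tx1", "receipt1");    // matches order1
        s.onPay("tx2", "order2");          // never reconciled
        System.out.println(s.matched);     // [order1<->receipt1]
        System.out.println(s.unmatched()); // [order2]
    }
}
```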
3. Project link
Everyone is welcome to drop by and take a look at the project.



