
Flink User E-commerce Behavior Analysis Project

Table of Contents
      • Flink User E-commerce Behavior Analysis
        • 1. Real-Time Statistical Analysis
          • 1.1 Popular Items Statistics
          • 1.2 Popular Pages Statistics
          • 1.3 Site UV Statistics
        • 2. Business Flow and Risk Control
          • 2.1 Page-Ad Blacklist Filtering
          • 2.2 Malicious-Login Monitoring
          • 2.3 Order-Payment Timeout Monitoring
          • 2.4 Real-Time Payment Reconciliation
        • 3. Project Repository

1. Real-Time Statistical Analysis

1.1 Popular Items Statistics
  • Requirement: every 5 minutes, output the site's Top N popular items over the last hour.

  • Output format:

    window information:

    NO 1: itemId + view count 1

    NO 2: itemId + view count 2

    NO 3: itemId + view count 3

  • Approach:

      1. The final output needs the window information plus the itemId, so after keyBy we pair the aggregation with a full-window function, which exposes both the window time and the key.
      2. We also need the view count, so an incremental aggregate function counts each record as it arrives after keyBy.
      3. Steps 1 and 2 only yield the count of a single item per window. To rank all items within the same hour, keyBy the window end time and use a process function: buffer the window's items in ListState, and when a timer fires at the window end, sort and emit the buffered data.
  • Code


public class HotItemsPractise {

    public static void main(String[] args) throws Exception {
        // 1. Set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
        // 2. Prepare the data source
        DataStream<ItemBean> filterStream = inputStream.map(line -> {
            String[] split = line.split(",");
            return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));

        }).filter(item -> "pv".equals(item.getBehavior()));
        // 3. Aggregate the view count of each item within each window
        DataStream<ItemViewCount> windowsResult = filterStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
            @Override
            public long extractAscendingTimestamp(ItemBean element) {
                return element.getTimestamp() * 1000L;
            }
        }).keyBy("itemId")
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new MyAggreateCount(), new MyAllWindowsView());
        // 4. Collect and rank the per-item results of each 1-hour window

        SingleOutputStreamOperator<String> windowEnd = windowsResult
                .keyBy("windowEnd")
                .process(new ItemHotTopN(5));
        windowEnd.print();

        env.execute("HotItemsPractise");
    }

    
    public static class MyAggreateCount implements AggregateFunction<ItemBean, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ItemBean value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }
    
    public static class MyAllWindowsView implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            long windowEnd = window.getEnd();
            long count = input.iterator().next();
            long itemId = tuple.getField(0);

            out.collect(new ItemViewCount(itemId, windowEnd, count));
        }
    }

    
    public static class ItemHotTopN extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
        ListState<ItemViewCount> itemViewCountListState;
        private int topN;

        public ItemHotTopN(int topN) {
            this.topN = topN;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<>("itemViewCount", ItemViewCount.class));
        }

        @Override
        public void processElement(ItemViewCount itemViewCount, Context ctx, Collector<String> out) throws Exception {
            itemViewCountListState.add(itemViewCount);
            ctx.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the ListState into an ArrayList so it can be sorted
            ArrayList<ItemViewCount> arraylist = Lists.newArrayList(itemViewCountListState.get().iterator());

            arraylist.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return o2.getCount().intValue() - o1.getCount().intValue();
                }
            });
            StringBuilder resultStringBuilder = new StringBuilder();
            resultStringBuilder.append("===================================\n");
            resultStringBuilder.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");

            for (int i = 0; i < Math.min(topN, arraylist.size()); i++) {

                resultStringBuilder
                        .append("NO ")
                        .append(i + 1)
                        .append(": itemId = ")
                        .append(arraylist.get(i).getItemId())
                        .append(" popularity = ")
                        .append(arraylist.get(i).getCount())
                        .append("\n");

            }
            resultStringBuilder.append("===================================\n");
            out.collect(resultStringBuilder.toString());
            // Throttle the console output
            Thread.sleep(1000L);
        }

    }
}
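The code above relies on POJOs (ItemBean, ItemViewCount) that the article never shows. Below is a minimal sketch of what they could look like; the field names and types are inferred from the getters and constructors used above, so treat them as assumptions. Flink's string-based `keyBy("itemId")` / `keyBy("windowEnd")` requires public POJO classes with a public no-argument constructor and accessible fields or getters.

```java
// Hypothetical POJO sketches inferred from the usage in HotItemsPractise.
public class ItemPojos {

    public static class ItemBean {
        private Long userId;
        private Long itemId;
        private Integer categoryId;
        private String behavior;
        private Long timestamp;

        public ItemBean() {} // required by Flink's POJO serializer

        public ItemBean(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.categoryId = categoryId;
            this.behavior = behavior;
            this.timestamp = timestamp;
        }

        public Long getUserId() { return userId; }
        public Long getItemId() { return itemId; }
        public Integer getCategoryId() { return categoryId; }
        public String getBehavior() { return behavior; }
        public Long getTimestamp() { return timestamp; }
    }

    public static class ItemViewCount {
        private Long itemId;
        private Long windowEnd;
        private Long count;

        public ItemViewCount() {}

        public ItemViewCount(Long itemId, Long windowEnd, Long count) {
            this.itemId = itemId;
            this.windowEnd = windowEnd;
            this.count = count;
        }

        public Long getItemId() { return itemId; }
        public Long getWindowEnd() { return windowEnd; }
        public Long getCount() { return count; }
    }

    public static void main(String[] args) {
        // Parse one CSV line the same way the map() above does
        String line = "543462,1715,1464116,pv,1511658000";
        String[] s = line.split(",");
        ItemBean bean = new ItemBean(Long.parseLong(s[0]), Long.parseLong(s[1]),
                Integer.parseInt(s[2]), s[3], Long.parseLong(s[4]));
        System.out.println(bean.getItemId() + " " + bean.getBehavior());
    }
}
```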

1.2 Popular Pages Statistics
  • Requirement: every 5 minutes, output the popular pages viewed within the last hour.
  • Sample output:

Window end time: 2015-05-18 13:08:50.0

NO 1: page URL = /blog/tags/puppet?flav=rss20 popularity = 11
NO 2: page URL = /projects/xdotool/xdotool.xhtml popularity = 5
NO 3: page URL = /projects/xdotool/ popularity = 4
NO 4: page URL = /?flav=rss20 popularity = 4
NO 5: page URL = /robots.txt popularity = 4

  • Approach: unlike the previous example, the timestamps in this data source are not monotonically increasing.

    • How to avoid losing out-of-order data:
      • 1. Set the watermark's bound to match the out-of-orderness of the source.
      • 2. Allow the window to close late: when the window first fires, aggregate what has arrived; for each late record that still belongs to the window, compute and emit an updated result immediately.
      • 3. Records that arrive even later go straight to a side output.
    • How to make a late record update, rather than duplicate, the earlier result:
      • 1. First do incremental aggregation combined with a full-window function, then group by the window end time.
      • 2. KeyBy the window end time, collect all records belonging to that window, then sort and output.
      • 3. A late record must overwrite the previous result for the same page, so store the results in MapState, keyed by page URL with the count as value.
  • Code

    
    public class HotPages {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            executionEnvironment.setParallelism(1);
    
            DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/apache.log");
    
            SimpleDateFormat simpleFormatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
    
            OutputTag<ApacheLogEvent> lateTag = new OutputTag<ApacheLogEvent>("late_date") {
            };
    
            SingleOutputStreamOperator<PageViewCount> streamPageViewCount = stringDataStreamSource.map(line -> {
                String[] s = line.split(" ");
                // Convert the date string to a timestamp
                Long timestamp = simpleFormatter.parse(s[3]).getTime();
                return new ApacheLogEvent(s[0], s[1], timestamp, s[5], s[6]);
            }).filter(date -> "GET".equals(date.getMethod()))
                    .filter(data -> {
                        // Filter out resources ending in css / js / png / ico / jpg
                        String regex = "((?!\\.(css|js|png|ico|jpg)$).)*$";
                        return Pattern.matches(regex, data.getUrl());
                    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
                        @Override
                        public long extractTimestamp(ApacheLogEvent apacheLogEvent) {
    
                            return apacheLogEvent.getTimestamp();
                        }
                    }).keyBy("url")
                    .timeWindow(Time.minutes(10), Time.seconds(5))
                    .allowedLateness(Time.minutes(1))
                    .sideOutputLateData(lateTag)
                    .aggregate(new HotPageIncreaseAgg(), new HotPageAllAgg());
    
            SingleOutputStreamOperator<String> windowEnd = streamPageViewCount
                    .keyBy("windowEnd")
                    .process(new MyProcessFunction(5));
            // Print to the console
            windowEnd.print("data");
            // The late-data side output is attached to the windowed aggregation, not to process()
            streamPageViewCount.getSideOutput(lateTag).print("late_date");
            executionEnvironment.execute();
    
        }
    
    
        public static class HotPageIncreaseAgg implements AggregateFunction<ApacheLogEvent, Long, Long> {
    
    
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(ApacheLogEvent value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return a + b;
            }
        }
    
        public static class HotPageAllAgg implements WindowFunction<Long, PageViewCount, Tuple, TimeWindow> {
    
    
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<PageViewCount> out) throws Exception {
    
                String url = tuple.getField(0);
                Long count = input.iterator().next();
                long windowEnd = window.getEnd();
    
                out.collect(new PageViewCount(url, windowEnd, count));
            }
        }
    
        public static class MyProcessFunction extends KeyedProcessFunction<Tuple, PageViewCount, String> {
            private Integer topSize;
    
            MapState<String, Long> hotPageCount;
    
            public MyProcessFunction(Integer topSize) {
                this.topSize = topSize;
            }
    
            @Override
            public void open(Configuration parameters) throws Exception {
                hotPageCount = getRuntimeContext().getMapState(new MapStateDescriptor<>("hot_page_count", String.class, Long.class));
            }
    
            @Override
            public void processElement(PageViewCount pageViewCount, Context ctx, Collector<String> out) throws Exception {
                // MapState: a record with the same URL key overwrites the previous count
                hotPageCount.put(pageViewCount.getUrl(), pageViewCount.getCount());
                ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 1);
                // A second timer, at windowEnd + allowedLateness, cleans up the state
                ctx.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd() + 60 * 1000L);
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws
                    Exception {
                Long currentKey = ctx.getCurrentKey().getField(0);
                // If this is the cleanup timer (window fully closed), just clear the state
                if (timestamp == currentKey + 60 * 1000L) {
                    hotPageCount.clear();
                    return;
                }
                ArrayList<Map.Entry<String, Long>> pageViewCounts = Lists.newArrayList(hotPageCount.entries());
    
                pageViewCounts.sort(new Comparator<Map.Entry<String, Long>>() {
                    @Override
                    public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                        return o2.getValue().compareTo(o1.getValue());
                    }
                });
                StringBuilder stringBuilder = new StringBuilder();
    
                stringBuilder.append("===================================\n");
                stringBuilder.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
                for (int i = 0; i < Math.min(topSize, pageViewCounts.size()); i++) {
                    Map.Entry<String, Long> stringLongEntry = pageViewCounts.get(i);
                    stringBuilder.append("NO ").append(i + 1).append(":")
                            .append(" page URL = ").append(stringLongEntry.getKey())
                            .append(" popularity = ").append(stringLongEntry.getValue())
                            .append("\n");
                }
                stringBuilder.append("===================================\n\n");
    
                // Throttle the console output
                Thread.sleep(1000L);
    
                out.collect(stringBuilder.toString());
    
            }
        }
    
    
    }
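Both ranking jobs above use sliding windows, so each record is counted in several overlapping windows, which is exactly why the results are re-grouped by windowEnd before ranking. The sketch below re-implements the sliding-window assignment rule (mirroring Flink's SlidingEventTimeWindows with offset 0, an assumption on my part) outside Flink: with a 1-hour size and a 5-minute slide, every event lands in size / slide = 12 windows.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of sliding event-time window assignment (offset 0).
public class SlidingWindowDemo {

    /** Returns the [start, end) windows a timestamp belongs to. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp
        long lastStart = timestamp - (((timestamp % slide) + slide) % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long fiveMin = 5 * 60 * 1000L;
        // An event at 02:00:30 (in ms since epoch) belongs to 12 hour-long windows
        List<long[]> ws = assignWindows(7_230_000L, hour, fiveMin);
        System.out.println("window count = " + ws.size());
        for (long[] w : ws) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Each of those 12 windows fires separately, producing 12 ItemViewCount records for the same item, one per windowEnd.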
    
    
    
1.3 Site UV Statistics
  • Requirement: output the site's UV (unique visitors) per hour in real time.

  • Output format: window end time + number of unique visitors in the window.

  • Approach:

    • 1. Use a 1-hour tumbling window, but trigger a computation for every arriving record, which requires a custom trigger.

    • 2. The trigger runs the dedup logic once per record: look the userId up in Redis; if it is already present, discard the record, otherwise increment the count.

      3. Concretely, for each record: parse out the userId, map it to a bitmap position with a custom hash function, and check that bit. If the bit is already 1, the user has been counted before. If it is 0, set it to 1, read the visitor count stored under this window's end time, add 1, emit the result, and write the updated count back to Redis.

      Storage layout:

      count: a Redis hash named "uv_count" mapping <window end time, visitor count>

      userId: a bitmap, one per window

      hash function: for each character of the userId, result = result * seed + ASCII value of the character

  • Code





public class HotUVWithBloomFilter {
    public static void main(String[] args) throws Exception {
        //1.环境准备
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        // 2. Prepare the data

        DataStreamSource<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/UserBehavior.csv");
        SingleOutputStreamOperator<ItemBean> filterData = inputStream.map(line -> {
            String[] split = line.split(",");
            return new ItemBean(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ItemBean>() {
            @Override
            public long extractAscendingTimestamp(ItemBean element) {
                return element.getTimestamp() * 1000L;
            }
        }).filter(itemBean -> "pv".equals(itemBean.getBehavior()));

        // 2. 1-hour tumbling window
        SingleOutputStreamOperator<PageViewCount> streamOperator = filterData
                .timeWindowAll(Time.hours(1))
                // 3. Custom trigger: fire on every record instead of waiting for the window to end
                .trigger(new UVTrigger())
                // 4. Processing logic: check the Redis bitmap for the current userId
                .process(new UVProcessFunction());
        // 5. If the userId is absent, insert it and update the count
        streamOperator.print();
        executionEnvironment.execute();
    }

    public static class UVTrigger extends Trigger<ItemBean, TimeWindow> {
        @Override
        public TriggerResult onElement(ItemBean element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            // Evaluate and purge on every element
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

        }
    }

    public static class UVProcessFunction extends ProcessAllWindowFunction<ItemBean, PageViewCount, TimeWindow> {
        private Jedis jedis;
        private String pageCountKey = "uv_page_count";
        private BloomFilter bloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("localhost", 6379);
            bloomFilter = new BloomFilter(1 << 29);
        }

        @Override
        public void process(Context context, Iterable<ItemBean> elements, Collector<PageViewCount> out) throws Exception {
            Long windowEnd1 = context.window().getEnd();
            String windowEnd = windowEnd1.toString();
            // FIRE_AND_PURGE means the window holds exactly one element here
            ItemBean itemBean = elements.iterator().next();
            Long userId = itemBean.getUserId();
            long offset = bloomFilter.hash(userId.toString(), 61);

            Boolean isExist = jedis.getbit(windowEnd, offset);
            if (!isExist) {
                // First time this user is seen in this window: set the bit
                jedis.setbit(windowEnd, offset, true);
                // Increment the count, which is stored in a Redis hash
                Long uvCount = 0L;    // initial count

                String uvCountString = jedis.hget(pageCountKey, windowEnd);
                if (StringUtils.isNoneBlank(uvCountString)) {
                    uvCount = Long.valueOf(uvCountString);
                }
                jedis.hset(pageCountKey, windowEnd, String.valueOf(uvCount + 1));
                out.collect(new PageViewCount("uv", windowEnd1, uvCount + 1));
            }
        }
    }

    public static class BloomFilter {
        // capacity must be a power of two so that result & (capacity - 1) equals result % capacity
        private long capacity;

        public BloomFilter(long capacity) {
            this.capacity = capacity;
        }

        public long hash(String userId, int seed) {
            long result = 0L;
            for (int i = 0; i < userId.length(); i++) {
                result = result * seed + userId.charAt(i);
            }

            return result & (capacity - 1);
        }

    }
}
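A quick check of the power-of-two remark in the BloomFilter above (which, strictly speaking, is a single-hash bitmap filter rather than a full Bloom filter): when capacity = 2^k, masking with capacity - 1 keeps the low k bits, which is exactly the non-negative remainder of dividing by capacity. With a non-power-of-two capacity the trick breaks.

```java
// Demonstrates why the bitmap capacity must be a power of two.
public class PowerOfTwoMaskDemo {
    public static void main(String[] args) {
        long capacity = 1L << 29;                 // same capacity as the code above
        long hash = 123_456_789_123L;             // some hash result
        long masked = hash & (capacity - 1);
        long mod = Math.floorMod(hash, capacity); // mathematical remainder
        System.out.println(masked == mod);        // true: mask == remainder

        // With a non-power-of-two capacity the identity does not hold:
        System.out.println((hash & (1000 - 1)) == Math.floorMod(hash, 1000)); // false for this value
    }
}
```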


2. Business Flow and Risk Control

2.1 Page-Ad Blacklist Filtering
  • Requirement: output the click count of each ad per province, over a 1-hour window that slides every 5 minutes. If a user clicks the same ad more than 3 times within one day, emit that user to a side output; any further clicks on that ad by that user within the day are treated as invalid and excluded from the statistics.

  • Output format:

    • blacklist-user> BlackAdUerInfo(uerId=937166, adId=1715, count=click over 3times.)
      blacklist-user> BlackAdUerInfo(uerId=161501, adId=36156, count=click over 3times.)

      —>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)

      —>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:25:00.0, count=5)
      —>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:25:00.0, count=2)
      —>> AdOutputInfo(province=beijing, windowEnd=2017-11-26 09:30:00.0, count=2)
      —>> AdOutputInfo(province=guangdong, windowEnd=2017-11-26 09:30:00.0, count=5)
      —>> AdOutputInfo(province=shanghai, windowEnd=2017-11-26 09:30:00.0, count=2)

  • Logic:

    • Filtering abnormal clicks: keyBy userId + adId, then use a process function. For each record in the partition, check whether the configured click limit has been reached; if not, increment the count and forward the record. Once the limit is reached, add the userId to the blacklist (side output) and register a timer for the next midnight, which clears the per-user click-count state the next day.
    • The per-province ad-count statistics afterwards follow the same pattern as the earlier examples.
  • Code





public class AdStatisticsByProvince {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        executionEnvironment.setParallelism(1);
        DataStream<String> inputStream = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/AdClickLog.csv");
        DataStream<AdvertInfo> processStream1 = inputStream.map(line -> {
            String[] split = line.split(",");
            return new AdvertInfo(split[0], split[1], split[2], Long.parseLong(split[4]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<AdvertInfo>() {
            @Override
            public long extractAscendingTimestamp(AdvertInfo element) {
                return element.getTimeStramp() * 1000L;
            }
        });
        // Filter out abnormal click records
        SingleOutputStreamOperator<AdvertInfo> fliterBlackStream = processStream1
                .keyBy("userId", "adId")
                .process(new BlackUserProcess(3));

        DataStream<AdOutputInfo> resultStream = fliterBlackStream
                .keyBy("province")
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new IncreaseAggreateEle(), new AllAggreateCount());
        fliterBlackStream.getSideOutput(new OutputTag<BlackAdUerInfo>("blacklist"){}).print("blacklist-user");

        resultStream.print("--->");

        executionEnvironment.execute();
    }


    

    public static class BlackUserProcess extends KeyedProcessFunction<Tuple, AdvertInfo, AdvertInfo> {
        ValueState<Long> adClickCount;
        ValueState<Boolean> isBlackUser;

        private int bound;

        public BlackUserProcess(int bound) {
            this.bound = bound;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            adClickCount = getRuntimeContext().getState(new ValueStateDescriptor<>("ad_click_count", Long.class, 0L));
            isBlackUser = getRuntimeContext().getState(new ValueStateDescriptor<>("is_black_user", Boolean.class, false));
        }

        @Override
        public void processElement(AdvertInfo value, Context ctx, Collector<AdvertInfo> out) throws Exception {
            // 1. Read the click count; note the state only lives for one day
            Long userIdClickCount = adClickCount.value();
            // Register a timer for the next midnight (UTC+8) that clears the state
            long timestamp = ctx.timerService().currentProcessingTime();
            long dayMillis = 24 * 60 * 60 * 1000L;
            long clearTime = (timestamp / dayMillis + 1) * dayMillis - 8 * 60 * 60 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(clearTime);

            // 2. The click limit has been reached
            if (userIdClickCount >= bound) {
                // 2.1 Not yet blacklisted: blacklist the user via the side output
                if (!isBlackUser.value()) {
                    isBlackUser.update(true);
                    ctx.output(new OutputTag<BlackAdUerInfo>("blacklist") {
                               },
                            new BlackAdUerInfo(value.getUserId(), value.getAdId(), "click over " + userIdClickCount + "times."));
                }
                // 2.2 Already blacklisted: drop the record
                return;
            }

            // 3. Below the limit: update the state and forward the record
            adClickCount.update(userIdClickCount + 1);
            out.collect(value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdvertInfo> out) throws Exception {
            adClickCount.clear();
            isBlackUser.clear();
        }
    }


    public static class IncreaseAggreateEle implements AggregateFunction<AdvertInfo, Long, Long> {


        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(AdvertInfo value, Long accumulator) {
            return accumulator+1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a+b;
        }
    }


    public static class AllAggreateCount implements WindowFunction<Long, AdOutputInfo, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<AdOutputInfo> out) throws Exception {
            Timestamp formateDate = new Timestamp(window.getEnd());
            out.collect(new AdOutputInfo(tuple.getField(0).toString(), formateDate.toString(), input.iterator().next()));
        }
    }
}
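The cleanup-timer arithmetic in BlackUserProcess deserves a worked check. Dividing by one day in milliseconds floors the timestamp to a UTC day boundary, "+1 day" moves to the next UTC midnight, and subtracting 8 hours converts that to midnight in UTC+8 (the data's local time zone, an assumption carried over from the original code). The sketch below verifies one concrete timestamp; note that for local times before 08:00 the formula yields the midnight just passed rather than the next one, so it is only an approximation.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Worked check of the next-midnight (UTC+8) timer arithmetic.
public class MidnightTimerDemo {
    static final long DAY = 24 * 60 * 60 * 1000L;

    static long nextLocalMidnight(long ts) {
        // Floor to UTC day, advance one day, shift to UTC+8 midnight
        return (ts / DAY + 1) * DAY - 8 * 60 * 60 * 1000L;
    }

    public static void main(String[] args) {
        // 2017-11-26 09:30:00 in UTC+8, matching the sample output above
        long ts = ZonedDateTime.of(2017, 11, 26, 9, 30, 0, 0, ZoneOffset.ofHours(8))
                .toInstant().toEpochMilli();
        Instant next = Instant.ofEpochMilli(nextLocalMidnight(ts));
        System.out.println(next.atZone(ZoneOffset.ofHours(8))); // 2017-11-27T00:00+08:00
    }
}
```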


2.2 Malicious-Login Monitoring
  • Requirement: detect users who fail to log in several times in a row within a short time window.

  • Approach: use CEP. Define a pattern for consecutive login failures within the time window (the code below matches 3 consecutive failures within 5 seconds), apply the pattern to the stream, and select the matching events from the resulting pattern stream.





public class LoginCheck {

    public static void main(String[] args) throws Exception {

        // 1.定义环境
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/LoginLog.csv");

        // 2. Wrap each line into an object

        KeyedStream<LoginInfo, Tuple> keyedStream = stringDataStreamSource.map(line -> {
            String[] split = line.split(",");
            return new LoginInfo(split[0], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginInfo>(Time.seconds(3)) {

            @Override
            public long extractTimestamp(LoginInfo element) {
                return element.getTimeStamp() * 1000L;
            }
        }).keyBy("userId"); // key by user so the pattern is evaluated per user


        // 3. Define the pattern
        // 3.1 Create the pattern
        Pattern<LoginInfo, LoginInfo> failPattern = Pattern.<LoginInfo>begin("loginFailEvent").where(new SimpleCondition<LoginInfo>() {
            @Override
            public boolean filter(LoginInfo value) throws Exception {
                return "fail".equals(value.getStatus());
            }
            // 3 consecutive login failures; consecutive() enforces strict contiguity
        }).times(3).consecutive().within(Time.seconds(5));

        // 3.2 Apply the pattern to the stream

        PatternStream<LoginInfo> pattern = CEP.pattern(keyedStream, failPattern);
        // 3.3 Select the matched data
        SingleOutputStreamOperator<LoginFailInfo> selectStream = pattern.select(new PatternSelectFunction<LoginInfo, LoginFailInfo>() {

            @Override
            public LoginFailInfo select(Map<String, List<LoginInfo>> pattern) throws Exception {
                List<LoginInfo> loginFailEvent = pattern.get("loginFailEvent");
                LoginInfo firstFail = loginFailEvent.get(0);
                String userId = firstFail.getUserId();
                LoginInfo lastFail = loginFailEvent.get(loginFailEvent.size() - 1);
                Timestamp firstFailTimeStamp = new Timestamp(firstFail.getTimeStamp() * 1000L);
                Timestamp lastFailTimeStamp = new Timestamp(lastFail.getTimeStamp() * 1000L);
                return new LoginFailInfo(userId, firstFailTimeStamp.toString(), lastFailTimeStamp.toString(), loginFailEvent.size() + " consecutive login failures");

            }
        });
        selectStream.print();
        executionEnvironment.execute();

    }


}

2.3 Order-Payment Timeout Monitoring
  • Requirement: detect, in real time, orders that are created but not paid within 15 minutes.

  • Approach:

    • Define a CEP pattern: an order-create event followed by a pay event within 15 minutes.
    • Apply the pattern to the stream.
    • Select from the pattern stream, separating normally paid orders from timed-out ones:
      • fully matched events are delivered in a map keyed by pattern name
      • timed-out partial matches are delivered through a timeout function and emitted to a side output
  • Code





public class OrderCheck {
    private static final Logger logger = LoggerFactory.getLogger(OrderCheck.class);

    public static void main(String[] args) throws Exception {
           // 1.
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = executionEnvironment.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
        SingleOutputStreamOperator<OrderInfo> objectSingleOutputStreamOperator = stringDataStreamSource.map(line -> {
            String[] split = line.split(",");
            return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {

            @Override
            public long extractAscendingTimestamp(OrderInfo element) {
                return element.getTimeStamp() * 1000L;
            }
        });


        // 2. Define the pattern

        Pattern<OrderInfo, OrderInfo> orderPayPattern = Pattern.<OrderInfo>begin("create").where(new SimpleCondition<OrderInfo>() {
            @Override
            public boolean filter(OrderInfo value) throws Exception {
                return "create".equals(value.getStatus());
            }
        }).followedBy("pay").where(new SimpleCondition<OrderInfo>() {

            @Override
            public boolean filter(OrderInfo value) throws Exception {
                return "pay".equals(value.getStatus());
            }
        }).within(Time.minutes(15));
        // 3. Apply the pattern
        PatternStream<OrderInfo> orderStream = CEP.pattern(objectSingleOutputStreamOperator.keyBy("orderId"), orderPayPattern);
        OutputTag<OrderTimeoutInfo> outputTag = new OutputTag<OrderTimeoutInfo>("timeoutStream") {
        };
        // 4. Select the matched events and the timed-out events
        SingleOutputStreamOperator<OrderTimeoutInfo> resultStream = orderStream.select(outputTag, new OrderTimeoutSelect(), new OrderPaySelect());
        resultStream.print("payed normally");
        resultStream.getSideOutput(outputTag).print("timeout");

        executionEnvironment.execute("order timeout detect job");
    }

    
    public static class OrderTimeoutSelect implements PatternTimeoutFunction<OrderInfo, OrderTimeoutInfo> {
        @Override
        public OrderTimeoutInfo timeout(Map<String, List<OrderInfo>> pattern, long timeoutTimestamp) throws Exception {
            OrderInfo createEvent = pattern.get("create").get(0);

            return new OrderTimeoutInfo(createEvent.getOrderId(), "timeout " + timeoutTimestamp);
        }
    }

    public static class OrderPaySelect implements PatternSelectFunction<OrderInfo, OrderTimeoutInfo> {
        @Override
        public OrderTimeoutInfo select(Map<String, List<OrderInfo>> pattern) throws Exception {
            OrderInfo payEvent = pattern.get("pay").get(0);

            return new OrderTimeoutInfo(payEvent.getOrderId(), "pay");
        }
    }

}

2.4 Real-Time Payment Reconciliation
  • Dual-stream join: connect the pay stream and the receipt stream, cache each side in keyed state until its counterpart arrives, and emit unmatched events to side outputs.

  • Code

    
    
    
    
    public class OrderPay {
        private final static OutputTag<OrderInfo> unmatchedPays = new OutputTag<OrderInfo>("unmatchedPays") {
        };
    
    
        private final static OutputTag<Receipt> unmatchedReceipts = new OutputTag<Receipt>("unmatchedReceipts") {
        };
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 1. Payment (order) stream
            DataStreamSource<String> inputSteam1 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/OrderLog.csv");
            SingleOutputStreamOperator<OrderInfo> orderStream = inputSteam1.map(line -> {
                String[] split = line.split(",");
                return new OrderInfo(split[0], split[1], split[2], Long.parseLong(split[3]));
            }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderInfo>() {
    
                @Override
                public long extractAscendingTimestamp(OrderInfo element) {
                    return element.getTimeStamp() * 1000L;
                }
            });
            // 2. Receipt stream (reads the receipt log, not the order log)
            DataStreamSource<String> inputStream2 = env.readTextFile("/Users/liangfangwei/IdeaProjects/flinkUserAnalays/data_file/ReceiptLog.csv");
            SingleOutputStreamOperator<Receipt> payStream = inputStream2.map(line -> {
                String[] split = line.split(",");
                return new Receipt(split[0], split[1], Long.parseLong(split[2]));
            }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Receipt>() {
    
                @Override
                public long extractAscendingTimestamp(Receipt element) {
                    return element.getTimeStamp() * 1000L;
                }
            });
            // 3. Dual-stream join
            SingleOutputStreamOperator<Tuple2<OrderInfo, Receipt>> resultStream = orderStream.keyBy("payId").connect(payStream.keyBy("payId")).process(new DoubleStreamJoinProcess());
    
            // 4. Print the matches and both unmatched side outputs
            resultStream.print("matched");
            resultStream.getSideOutput(unmatchedPays).print("unmatchedPays");
            resultStream.getSideOutput(unmatchedReceipts).print("unmatchedReceipts");
            env.execute();
        }
    
    
        public static class DoubleStreamJoinProcess extends CoProcessFunction<OrderInfo, Receipt, Tuple2<OrderInfo, Receipt>> {
    
            ValueState<OrderInfo> payState;
            ValueState<Receipt> receiptState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                payState = getRuntimeContext().getState(new ValueStateDescriptor<>("pay", OrderInfo.class));
                receiptState = getRuntimeContext().getState(new ValueStateDescriptor<>("receipt", Receipt.class));
    
            }
    
    
            @Override
            public void processElement1(OrderInfo orderInfo, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
    
                // Check whether the matching receipt (stream 2) has already arrived
                Receipt receipt = receiptState.value();
                if (receipt != null) {
                    out.collect(new Tuple2<>(orderInfo, receipt));
                    receiptState.clear();
                } else {
                    // Cache this pay event and wait up to 5 seconds for the receipt
                    payState.update(orderInfo);
                    ctx.timerService().registerEventTimeTimer(orderInfo.getTimeStamp() * 1000L + 5000L);
                }
            }
    
            @Override
            public void processElement2(Receipt receipt, Context ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
    
                // Check whether the matching pay event (stream 1) has already arrived
                OrderInfo orderInfo = payState.value();
                if (orderInfo != null) {
                    out.collect(new Tuple2<>(orderInfo, receipt));
                    payState.clear();
                } else {
                    receiptState.update(receipt);
                    ctx.timerService().registerEventTimeTimer(receipt.getTimeStamp() * 1000L + 5000L);
                }
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderInfo, Receipt>> out) throws Exception {
                // Whatever is still cached at this point had no counterpart: report it
                if (payState.value() != null) {
                    ctx.output(unmatchedPays, payState.value());
                }
                if (receiptState.value() != null) {
                    ctx.output(unmatchedReceipts, receiptState.value());
                }
                payState.clear();
                receiptState.clear();
            }
        }
    
    }
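Stripped of the Flink machinery, DoubleStreamJoinProcess is a symmetric cache-and-match: each side stores its event (keyed by payId) until the other side arrives, and whatever is still cached when the timer fires is reported as unmatched. The sketch below shows that core logic in plain Java; the event payloads are simplified string stand-ins, not the article's real POJOs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free sketch of the dual-stream reconciliation logic.
public class PayMatchSketch {
    final Map<String, String> pendingPays = new HashMap<>();     // payId -> pay event
    final Map<String, String> pendingReceipts = new HashMap<>(); // payId -> receipt event

    /** Returns "pay|receipt" when both sides have been seen, else null (event cached). */
    String onPay(String payId, String pay) {
        String receipt = pendingReceipts.remove(payId);
        if (receipt != null) return pay + "|" + receipt;
        pendingPays.put(payId, pay);
        return null;
    }

    String onReceipt(String payId, String receipt) {
        String pay = pendingPays.remove(payId);
        if (pay != null) return pay + "|" + receipt;
        pendingReceipts.put(payId, receipt);
        return null;
    }

    /** What the onTimer callback would report: everything still unmatched. */
    List<String> unmatchedAtTimer() {
        List<String> unmatched = new ArrayList<>(pendingPays.values());
        unmatched.addAll(pendingReceipts.values());
        return unmatched;
    }

    public static void main(String[] args) {
        PayMatchSketch sketch = new PayMatchSketch();
        System.out.println(sketch.onPay("34729", "pay-34729"));    // null: receipt not seen yet
        System.out.println(sketch.onReceipt("34729", "wx-34729")); // pay-34729|wx-34729
        System.out.println(sketch.unmatchedAtTimer());             // []: nothing left pending
    }
}
```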
    
    
3. Project Repository

Project repository: everyone is welcome to check it out.
