栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

8.Flink实时项目之CEP计算访客跳出

8.Flink实时项目之CEP计算访客跳出

1.访客跳出明细介绍

首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么就要抓住几个特征:

该页面是用户近期访问的第一个页面,这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。

首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。

这第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢?

最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识别某个事件。

用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。

流程图

2.代码实现

创建任务类UserJumpDetailApp.java,从kafka读取页面日志

import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;


public class UserJumpDetailApp {
    public static void main(String[] args) {
        //webui模式,需要添加pom依赖
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
        //设置并行度
        env.setParallelism(4);
        //设置检查点
//        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60000);
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
//        //指定哪个用户读取hdfs文件
//        System.setProperty("HADOOP_USER_NAME","zhangbao");

        //从kafka读取数据源
        String sourceTopic = "dwd_page_log";
        String group = "user_jump_detail_app_group";
        String sinkTopic = "dwm_user_jump_detail";
        FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
        DataStreamSource kafkaDs = env.addSource(kafkaSource);

        kafkaDs.print("user jump detail >>>");

        try {
            env.execute("user jump detail task");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3. flink CEP编程

官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/libs/cep.html

处理流程

1.从kafka读取日志数据

2.设定时间语义为事件时间并指定事件时间字段ts

3.按照mid分组

4.配置CEP表达式

1.第一次访问的页面:last_page_id == null

2.第一次访问的页面在10秒内,没有进行其他操作,没有访问其他页面

5.根据表达式筛选流

6.提取命中的数据

设定超时时间标识 timeoutTagflatSelect 方法中,实现 PatternFlatTimeoutFunction 中的 timeout 方法。所有 out.collect 的数据都被打上了超时标记本身的 flatSelect 方法因为不需要未超时的数据所以不接受数据。通过 SideOutput 侧输出流输出超时数据

7.将跳出数据写回到kafka

package com.zhangbao.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;


public class UserJumpDetailApp {
    public static void main(String[] args) {
        //webui模式,需要添加pom依赖
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
        //设置并行度
        env.setParallelism(4);
        //设置检查点
//        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60000);
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/userJumpDetail"));
//        //指定哪个用户读取hdfs文件
//        System.setProperty("HADOOP_USER_NAME","zhangbao");

        //从kafka读取数据源
        String sourceTopic = "dwd_page_log";
        String group = "user_jump_detail_app_group";
        String sinkTopic = "dwm_user_jump_detail";
        FlinkKafkaConsumer kafkaSource = MyKafkaUtil.getKafkaSource(sourceTopic, group);
        DataStreamSource jsonStrDs = env.addSource(kafkaSource);

        

        //对读取到的数据进行结构转换
        SingleOutputStreamOperator jsonObjDs = jsonStrDs.map(jsonStr -> JSON.parseObject(jsonStr));

//        jsonStrDs.print("user jump detail >>>");
        //从flink1.12开始,时间语义默认是事件时间,不需要额外指定,如果是之前的版本,则要按以下方式指定事件时间语义
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //指定事件时间字段
        SingleOutputStreamOperator jsonObjWithTSDs = jsonObjDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(
                        new SerializableTimestampAssigner() {
                            @Override
                            public long extractTimestamp(JSonObject jsonObject, long l) {
                                return jsonObject.getLong("ts");
                            }
                        }
        ));

        //按照mid分组
        KeyedStream ketByDs = jsonObjWithTSDs.keyBy(
                jsonObject -> jsonObject.getJSONObject("common").getString("mid")
        );

        
        Pattern pattern = Pattern.begin("first")
                .where( // 1.第一次访问的页面:last_page_id == null
                    new SimpleCondition() {
                        @Override
                        public boolean filter(JSonObject jsonObject) throws Exception {
                            String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
                            System.out.println("first page >>> "+lastPageId);
                            if (lastPageId == null || lastPageId.length() == 0) {
                                return true;
                            }
                            return false;
                        }
                    }
                ).next("next")
                .where( //2.第一次访问的页面在10秒内,没有进行其他操作,没有访问其他页面
                        new SimpleCondition() {
                            @Override
                            public boolean filter(JSonObject jsonObject) throws Exception {
                                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                                System.out.println("next page >>> "+pageId);
                                if(pageId != null && pageId.length()>0){
                                    return true;
                                }
                                return false;
                            }
                        }
                //时间限制模式,10S
                ).within(Time.milliseconds(10000));

        //将cep表达式运用到流中,筛选数据
        PatternStream patternStream = CEP.pattern(ketByDs, pattern);

        //从筛选的数据中再提取数据超时数据,放到侧输出流中
        OutputTag timeOutTag = new OutputTag("timeOut"){};
        SingleOutputStreamOperator outputStreamDS = patternStream.flatSelect(
                timeOutTag,
                //获取超时数据
                new PatternFlatTimeoutFunction() {
                    @Override
                    public void timeout(Map> map, long l, Collector collector) throws Exception {
                        List first = map.get("first");
                        for (JSonObject jsonObject : first) {
                            System.out.println("time out date >>> "+jsonObject.toJSONString());
                            //所有 out.collect 的数据都被打上了超时标记
                            collector.collect(jsonObject.toJSONString());
                        }
                    }
                },
                //获取未超时数据
                new PatternFlatSelectFunction() {
                    @Override
                    public void flatSelect(Map> map, Collector collector) throws Exception {
                        //不超时的数据不提取,所以这里不做操作
                    }
                }
        );

        //获取侧输出流的超时数据
        DataStream timeOutDs = outputStreamDS.getSideOutput(timeOutTag);
        timeOutDs.print("jump >>> ");
        
        //将跳出数据写回到kafka
        timeOutDs.addSink(MyKafkaUtil.getKafkaSink(sinkTopic));

        try {
            env.execute("user jump detail task");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

测试数据

将从kafka读取数据的方式切换成固定数据内容,如下:

//测试数据
        DataStream jsonStrDs = env
         .fromElements(
                "{"common":{"mid":"101"},"page":{"page_id":"home"},"ts":10000} ",
                "{"common":{"mid":"102"},"page":{"page_id":"home"},"ts":12000}",

                "{"common":{"mid":"102"},"page":{"page_id":"good_list","last_page_id":" +
                        ""home"},"ts":15000} ",

                "{"common":{"mid":"102"},"page":{"page_id":"good_list","last_page_id":" +
                        ""detail"},"ts":30000} "
        );
        dataStream.print("in json:");

然后从dwm_user_jump_detail主题消费数据

./kafka-console-consumer.sh --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic dwm_user_jump_detail

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号