栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

实时数仓(一)行为数据ods到dwd层

实时数仓(一)行为数据ods到dwd层

行为数据ods到dwd层

采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 Kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。

(1) 识别新老用户工具类

本身客户端业务有新老用户的标识,但是不够准确,需要用实时计算再次确认。

package com.yyds.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyKafkaUtils {


    private static String brokers = "centos01:9092,centos02:9092,centos03:9092";

    public static FlinkKafkaProducer getKafkaProducer(String topic){
        return new FlinkKafkaProducer(
                brokers,
          topic,
          new SimpleStringSchema()
        );
    }


    public static FlinkKafkaConsumer getKafkaConsumer(String topic,String groupId){
        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);

        return new FlinkKafkaConsumer(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }
}

(2) 代码实现
package com.yyds.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class baseLogApp {
    public static void main(String[] args) throws Exception {
        //TODO 1、获取执行环境
        System.setProperty("HADOOP_USER_NAME","root");

        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 开启 Checkpoint,每隔 5 秒钟做一次 Checkpoint
        env.enableCheckpointing(5000L);
        //指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 设置超时时间
        //env.getCheckpointConfig().setAlignmentTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 重启策略
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
        //设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // 设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));


        //TODO 2、消费ods_base_log  主题数据
        String sourceTopic = "ods_base_log";
        String groupId = "base_log_app_2022";
        FlinkKafkaConsumer kafkaConsumer = MyKafkaUtils.getKafkaConsumer(sourceTopic, groupId);
        DataStreamSource kafkaDS = env.addSource(kafkaConsumer);

        //TODO 3、将每行数据转换为JSON对象
        OutputTag outputTag = new OutputTag("dirty") {

        };
        SingleOutputStreamOperator jsonObjDS = kafkaDS.process(new ProcessFunction() {
            @Override
            public void processElement(String value, Context ctx, Collector out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // TODO 统计脏数据的信息,一般为千分之1 到 千分之3
                    // 写入侧输出流
                    ctx.output(outputTag,value);
                }
            }
        });

        jsonObjDS.getSideOutput(outputTag).print("dirty>>>>>>>>>>>>>>>>>>>>>>");

        //TODO 4、新老用户校验  状态编程
        // 按照mid进行分组
        SingleOutputStreamOperator jsonObjWithNewTag = jsonObjDS
                .keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
                .map(new RichMapFunction() {
                    private ValueState valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueState valueState = getRuntimeContext()
                                .getState(new ValueStateDescriptor("value-state", String.class));
                    }

                    @Override
                    public JSONObject map(JSONObject value) throws Exception {
                        // 获取数据中的is_new标记
                        String isNew = value.getJSONObject("common").getString("is_new");

                        // 判断isNew是否为1
                        if ("1".equals(isNew)) {
                            String state = valueState.value();
                            if (state != null) {
                                // 修改isNew标记
                                value.getJSONObject("common").put("is_new", "0");
                                return value;
                            } else {
                                // 没有来过
                                valueState.update("1");
                                return value;
                            }
                        } else {
                            return value;
                        }
                    }
                });


        //TODO 5、分流处理   侧输出流    页面为主流  启动和曝光为侧输出流
        OutputTag stringOutputTag = new OutputTag("start") {
        };
        OutputTag displayTag = new OutputTag("display") {
        };
        SingleOutputStreamOperator pageDS = jsonObjWithNewTag.process(new ProcessFunction() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector out) throws Exception {
                // 启动
                String start = value.getString("start");
                if (start != null && start.length() > 0) {
                    // 将数据写入到启动日志侧输出流
                    ctx.output(stringOutputTag, value.toString());
                } else {
                    // 页面为主流
                    out.collect(value.toString());

                    // 曝光数据
                    JSONArray displays = value.getJSONArray("displays");

                    if (displays != null && displays.size() > 0) {
                        // 获取页面Id
                        String pageId = value.getJSONObject("page").getString("page_id");

                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("page_id", pageId);
                            // 将输出写入到曝光侧输出流
                            ctx.output(displayTag, display.toString());
                        }
                    }
                }
            }
        });



        //TODO 6、提取侧输出流
        DataStream startDS = pageDS.getSideOutput(stringOutputTag);
        DataStream displayDS = pageDS.getSideOutput(displayTag);


        //TODO 7、将三个流进行打印并输出到对应的kafka主题中
        startDS.print("start>>>>>>>>>>>>>>>>>>>>>>>>>>");
        displayDS.print("displayDS>>>>>>>>>>>>>>>>>>>>>>>>>>");
        pageDS.print("pageDS>>>>>>>>>>>>>>>>>>>>>>>>>>");

        startDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_start_log"));
        displayDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_display_log"));
        pageDS.addSink(MyKafkaUtils.getKafkaProducer("dwd_page_log"));


        //TODO 8、启动任务
        env.execute("baseLogApp");
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774279.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号