栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink实时仓库-DWD层(下单-多张表实现join操作)模板代码

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink实时仓库-DWD层(下单-多张表实现join操作)模板代码

简介

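本文给出用 Flink SQL 实现多张表 join 操作的模板代码:从 Kafka 的 topic_db 主题中读取订单明细、订单、订单明细活动、订单明细优惠券四张业务表的数据,再通过 lookup join 关联 MySQL 字典表 base_dic,最后把关联结果写入 Upsert-Kafka 表 dwd_trade_order_pre_process。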

工具类 KafkaUtil
/**
 * Builds the {@code WITH (...)} option clauses appended to Flink SQL
 * {@code CREATE TABLE} statements for Kafka-backed tables.
 *
 * <p>Every clause produced here uses the broker list in {@link #BOOTSTRAP_SERVERS}.
 */
public class KafkaUtil {
    /** Kafka broker address written into every generated connector clause. */
    private static final String BOOTSTRAP_SERVERS = "master:9092";

    /**
     * Returns the option clause for a table backed by the {@code kafka} connector
     * with JSON format, starting from the consumer group's offsets.
     *
     * <p>The returned text begins with a space so it can be appended directly
     * after the closing parenthesis of the column list.
     *
     * <p>NOTE(review): with {@code 'scan.startup.mode' = 'group-offsets'} the
     * behavior for a group that has no committed offsets yet depends on the
     * connector's offset-reset setting, which is not set here - confirm this is
     * the intended first-run behavior.
     *
     * @param topic   Kafka topic name; must not be {@code null}
     * @param groupId Kafka consumer group id; must not be {@code null}
     * @return the {@code with (...)} clause as a single-line string
     * @throws NullPointerException if {@code topic} or {@code groupId} is {@code null}
     */
    public static String getKafkaDDL(String topic, String groupId) {
        // Fail fast: string concatenation would otherwise emit the literal text "null".
        java.util.Objects.requireNonNull(topic, "topic");
        java.util.Objects.requireNonNull(groupId, "groupId");

        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * Returns the option clause for a table backed by the {@code upsert-kafka}
     * connector, using JSON for both the key and the value.
     *
     * @param topic Kafka topic name; must not be {@code null}
     * @return the {@code WITH (...)} clause as a single-line string
     * @throws NullPointerException if {@code topic} is {@code null}
     */
    public static String getUpsertKafkaDDL(String topic) {
        // Fail fast: string concatenation would otherwise emit the literal text "null".
        java.util.Objects.requireNonNull(topic, "topic");

        return "WITH ( " +
                "  'connector' = 'upsert-kafka', " +
                "  'topic' = '" + topic + "', " +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
                "  'key.format' = 'json', " +
                "  'value.format' = 'json' " +
                ")";
    }
}
工具类 MysqlUtils
/**
 * Builds Flink SQL DDL text for MySQL tables accessed through the
 * {@code jdbc} connector as lookup tables.
 *
 * <p>NOTE(review): the JDBC URL, user name and password are hard-coded in
 * {@link #mysqlLookUpTableDDL(String)}; consider moving them to configuration.
 */
public class MysqlUtils {
    /**
     * Returns the complete {@code create table} statement for the
     * {@code base_dic} dictionary table, with {@code dic_code} as the
     * (not enforced) primary key and the JDBC lookup options appended.
     *
     * @return the DDL statement as a single-line string
     */
    public static String getBaseDicLookUpDDL() {
        return "create table `base_dic`( " +
                "`dic_code` string, " +
                "`dic_name` string, " +
                "`parent_code` string, " +
                "`create_time` timestamp, " +
                "`operate_time` timestamp, " +
                "primary key(`dic_code`) not enforced " +
                ")" + mysqlLookUpTableDDL("base_dic");
    }

    /**
     * Returns the {@code WITH (...)} option clause that binds a Flink table to
     * the given MySQL table via the {@code jdbc} connector, with a lookup
     * cache of 10 rows and a 1 hour TTL.
     *
     * @param tableName name of the MySQL table; must not be {@code null}
     * @return the {@code WITH (...)} clause as a single-line string
     * @throws NullPointerException if {@code tableName} is {@code null}
     */
    public static String mysqlLookUpTableDDL(String tableName) {
        // Fail fast: string concatenation would otherwise emit the literal text "null".
        java.util.Objects.requireNonNull(tableName, "tableName");

        return "WITH ( " +
                "'connector' = 'jdbc', " +
                "'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
                "'table-name' = '" + tableName + "', " +
                "'lookup.cache.max-rows' = '10', " +
                "'lookup.cache.ttl' = '1 hour', " +
                "'username' = 'root', " +
                "'password' = '000000', " +
                "'driver' = 'com.mysql.cj.jdbc.Driver' " +
                ")";
    }
}
代码实现例子
/**
 * Flink SQL job that builds the order pre-processing table of the DWD layer.
 *
 * <p>It reads change records from the Kafka topic {@code topic_db}, extracts
 * the rows belonging to {@code order_detail}, {@code order_info},
 * {@code order_detail_activity} and {@code order_detail_coupon}, joins them,
 * enriches the result with the MySQL dictionary table {@code base_dic} through
 * a lookup join, and writes the joined rows to the upsert-kafka table
 * {@code dwd_trade_order_pre_process}.
 */
public class DwdTradeOrderPreProcess {
    /**
     * Declares all source, view and sink tables and submits the insert job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if declaring a table or submitting the job fails
     */
    public static void main(String[] args) throws Exception {

        // Step 1: prepare the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 2: state backend / checkpointing (currently disabled).
//        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
//        );
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        env.setRestartStrategy(
//                RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(3L))
//        );
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));
        // Idle-state retention for the joins below; pick the number of days per business needs.
        tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(3));

        // Step 3: read the business change records from Kafka as a Flink SQL table.
        // `data` and `old` must be fully typed maps: a bare `map` is not a valid
        // Flink SQL type, and the queries below read their values as strings.
        tableEnv.executeSql("create table topic_db(" +
                "`database` String, " +
                "`table` String, " +
                "`type` String, " +
                "`data` map<string, string>, " +
                "`old` map<string, string>, " +
                "`proc_time` as PROCTIME(), " +
                "`ts` string " +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_pre_process"));

        // Step 4: order detail rows (inserts only); proc_time is kept for the lookup join.
        Table orderDetail = tableEnv.sqlQuery("select  " +
                "data['id'] id, " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['sku_name'] sku_name, " +
                "data['create_time'] create_time, " +
                "data['source_id'] source_id, " +
                "data['source_type'] source_type, " +
                "data['sku_num'] sku_num, " +
                "cast(cast(data['sku_num'] as decimal(16,2)) * " +
                "cast(data['order_price'] as decimal(16,2)) as String) split_original_amount, " +
                "data['split_total_amount'] split_total_amount, " +
                "data['split_activity_amount'] split_activity_amount, " +
                "data['split_coupon_amount'] split_coupon_amount, " +
                "ts od_ts, " +
                "proc_time " +
                "from `topic_db` where `table` = 'order_detail' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail", orderDetail);

        // Step 5: order rows (inserts and updates); `type` and `old` are passed downstream.
        Table orderInfo = tableEnv.sqlQuery("select  " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['province_id'] province_id, " +
                "data['operate_time'] operate_time, " +
                "data['order_status'] order_status, " +
                "`type`, " +
                "`old`, " +
                "ts oi_ts " +
                "from `topic_db` " +
                "where `table` = 'order_info' " +
                "and (`type` = 'insert' or `type` = 'update')");
        tableEnv.createTemporaryView("order_info", orderInfo);

        // Step 6: order-detail/activity association rows (inserts only).
        Table orderDetailActivity = tableEnv.sqlQuery("select  " +
                "data['order_detail_id'] order_detail_id, " +
                "data['activity_id'] activity_id, " +
                "data['activity_rule_id'] activity_rule_id " +
                "from `topic_db` " +
                "where `table` = 'order_detail_activity' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail_activity", orderDetailActivity);

        // Step 7: order-detail/coupon association rows (inserts only).
        Table orderDetailCoupon = tableEnv.sqlQuery("select " +
                "data['order_detail_id'] order_detail_id, " +
                "data['coupon_id'] coupon_id " +
                "from `topic_db` " +
                "where `table` = 'order_detail_coupon' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("order_detail_coupon", orderDetailCoupon);

        // Step 8: declare the MySQL lookup dictionary table.
        tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());

        // Step 9: join the five tables. Order detail is the driving table: it is
        // inner-joined with order_info, left-joined with the activity and coupon
        // associations, and lookup-joined with base_dic as of its processing time.
        Table resultTable = tableEnv.sqlQuery("select  " +
                "od.id, " +
                "od.order_id, " +
                "oi.user_id, " +
                "oi.order_status, " +
                "od.sku_id, " +
                "od.sku_name, " +
                "oi.province_id, " +
                "act.activity_id, " +
                "act.activity_rule_id, " +
                "cou.coupon_id, " +
                "date_format(od.create_time, 'yyyy-MM-dd') date_id, " +
                "od.create_time, " +
                "date_format(oi.operate_time, 'yyyy-MM-dd') operate_date_id, " +
                "oi.operate_time, " +
                "od.source_id, " +
                "od.source_type, " +
                "dic.dic_name source_type_name, " +
                "od.sku_num, " +
                "od.split_original_amount, " +
                "od.split_activity_amount, " +
                "od.split_coupon_amount, " +
                "od.split_total_amount, " +
                "oi.`type`, " +
                "oi.`old`, " +
                "od.od_ts, " +
                "oi.oi_ts, " +
                "current_row_timestamp() row_op_ts " +
                "from order_detail od  " +
                "join order_info oi " +
                "on od.order_id = oi.id " +
                "left join order_detail_activity act " +
                "on od.id = act.order_detail_id " +
                "left join order_detail_coupon cou " +
                "on od.id = cou.order_detail_id " +
                "left join `base_dic` for system_time as of od.proc_time as dic " +
                "on od.source_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        // Step 10: declare the upsert-kafka sink table, keyed by order detail id.
        // `old` carries the same map<string, string> type as in the source table.
        tableEnv.executeSql("" +
                "create table dwd_trade_order_pre_process( " +
                "id string, " +
                "order_id string, " +
                "user_id string, " +
                "order_status string, " +
                "sku_id string, " +
                "sku_name string, " +
                "province_id string, " +
                "activity_id string, " +
                "activity_rule_id string, " +
                "coupon_id string, " +
                "date_id string, " +
                "create_time string, " +
                "operate_date_id string, " +
                "operate_time string, " +
                "source_id string, " +
                "source_type string, " +
                "source_type_name string, " +
                "sku_num string, " +
                "split_original_amount string, " +
                "split_activity_amount string, " +
                "split_coupon_amount string, " +
                "split_total_amount string, " +
                "`type` string, " +
                "`old` map<string, string>, " +
                "od_ts string, " +
                "oi_ts string, " +
                "row_op_ts timestamp_ltz(3), " +
                "primary key(id) not enforced " +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));

        // Step 11: write the join result to the upsert-kafka table.
        // executeSql submits the insert job itself, so env.execute() must not be
        // called afterwards: no DataStream operators are registered on env, and
        // the call would fail with "No operators defined in streaming topology".
        tableEnv.executeSql("" +
                        "insert into dwd_trade_order_pre_process  " +
                        "select * from result_table")
                .print();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1032861.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号