栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FLINK-SQL

FLINK-SQL

关联MySQL中的维度表示例

public class MySQLDimensionDemo {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        //使用KafkaConnector从Kafka中读取数据
        tableEnvironment.executeSql("CREATE TABLE KafkaTable (n" +
                "  `oid` STRING,    n" +
                "  `cid` STRING,    n" +
                "  `money` DOUBLE,   n" +
                "  `ts` TIMESTAMP(3) metaDATA FROM 'timestamp',n" +
                "  proctime as PROCTIME(), --通过计算列产生一个处理时间列n" +
                "  eventTime as ts, -- 事件时间n" +
                "  WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND  -- 在eventTime上定义watermarkn" +
                ") WITH (n" +
                "  'connector' = 'kafka',n" +
                "  'topic' = 'order-csv',n" +
                "  'properties.bootstrap.servers' = 'linux01:9092',n" +
                "  'properties.group.id' = 'testGroup',n" +
                "  'scan.startup.mode' = 'earliest-offset',n" +
                "  'format' = 'csv',n" +
                "  'csv.ignore-parse-errors' = 'true'n" +
                ")");

        //使用JDBCConnector缓存维度数据
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_category (n" +
                "  id STRING,n" +
                "  name STRING,n" +
                "  PRIMARY KEY (id) NOT ENFORCEDn" +
                ") WITH (n" +
                "   'connector' = 'jdbc',n" +
                "   'url' = 'jdbc:mysql://localhost:3306/flink',n" +
                "   'username' = 'root',n" +
                "   'password' = 'root',n" +
                "   'lookup.cache.max-rows' = '5000',n" + //最大缓存5000条数据
                "   'lookup.cache.ttl' = '10min',n" +     //维度数据最大的存活时间
                "   'table-name' = 'tb_category'n" +
                ")n");

        //使用JDBCConnector将关联后的数据写入MySQL

        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (n" +
                "  oid STRING,n" +
                "  cid STRING,n" +
                "  cname STRING,n" +
                "  money DOUBLE,n" +
                "  PRIMARY KEY (oid) NOT ENFORCEDn" +
                ") WITH (n" +
                "   'connector' = 'jdbc',n" +
                "   'url' = 'jdbc:mysql://localhost:3306/flink',n" +
                "   'username' = 'root',n" +
                "   'password' = 'jin19930213',n" +
                "   'table-name' = 'tb_orders03'n" +
                ")n");

        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECt k.oid,k.cid,t.name as cname,k.money FROM KafkaTable as kn" +
                "LEFT JOIN tb_mysql_category FOR SYSTEM_TIME AS OF k.proctime as tn" +
                "ON k.cid= t.id");
    }
}

获取Kafka中元数据示例

public class KafkaConnectormetadataDemo {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        tableEnvironment.executeSql("CREATE TABLE KafkaTable (n" +
                "  `user_id` BIGINT,n" +
                "  `item_id` BIGINT,n" +
                "  `behavior` STRING,n" +
                "  `event_time` TIMESTAMP(3) metaDATA FROM 'timestamp',n" +
                "  `topic`  STRING metaDATA VIRTUAL,   n" +
                "  `partition` BIGINT metaDATA VIRTUAL,n" +
                "  `offset` BIGINT metaDATA VIRTUAL   n" +
                ") WITH (n" +
                "  'connector' = 'kafka',n" +
                "  'topic' = 'user_behavior',n" +
                "  'properties.bootstrap.servers' = 'linux01:9092',n" +
                "  'properties.group.id' = 'testGroup',n" +
                "  'scan.startup.mode' = 'earliest-offset',n" +
                "  'format' = 'csv',n" +
                "   'csv.ignore-parse-errors' = 'true'n" +
                ")");


        tableEnvironment.executeSql("CREATE TABLE print_table (n" +
                "  `id` STRING,n" +
                "  `user_id` BIGINT,n" +
                "  `item_id` BIGINT,n" +
                "  `behavior` STRINGn" +
                ") WITH (n" +
                "  'connector' = 'print'n" +
                ")");

        tableEnvironment.executeSql("INSERT INTO print_table " +
                "SELECt concat_ws('-',topic,CAST(`partition` AS STRING),CAST(`offset` AS STRING),DATE_FORMAT(event_time, 'yyyy-MM-dd')),user_id,item_id,behavior " +
                "FROM KafkaTable");

        

    }
}

写入MySQL的两种方式 (追加,覆盖)

public class KafkaToMySQLDemo01 {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        //使用KafkaConnector从Kafka中读取数据
        tableEnvironment.executeSql("CREATE TABLE tb_user_order (n" +
                "  `oid` STRING,   n" +
                "  `cid` STRING,   n" +
                "  `money` DOUBLE  n" +
                ") WITH (n" +
                "  'connector' = 'kafka',n" +
                "  'topic' = 'order-json',n" +
                "  'properties.bootstrap.servers' = 'linux01:9092,linux01:9092',n" +
                "  'properties.group.id' = 'testGroup',n" +
                "  'scan.startup.mode' = 'earliest-offset',n" +
                "  'format' = 'json',n" +
                "  'json.ignore-parse-errors' = 'true' n" +   //忽略格式不良好的json
                ")");

        //使用JDBCConnector向MySQL中插入数据
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (n" +
                "  `oid` STRING,   n" +
                "  `cid` STRING,   n" +
                "  `money` DOUBLE  n" +
                ") WITH (n" +
                "   'connector' = 'jdbc',n" +
                "   'url' = 'jdbc:mysql://localhost:3306/flink',n" +
                "   'username' = 'root',n" +
                "   'password' = 'root',n" +
                "   'table-name' = 'tb_orders'n" +
                ")");

        //将数据插入MySQL中
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECt oid,cid,money from tb_user_order");


    }
}
public class KafkaToMySQLDemo02 {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        //使用KafkaConnector从Kafka中读取数据
        tableEnvironment.executeSql("CREATE TABLE tb_user_order (n" +
                "  `oid` STRING,   n" +
                "  `cid` STRING,   n" +
                "  `money` DOUBLE  n" +
                ") WITH (n" +
                "  'connector' = 'kafka',n" +
                "  'topic' = 'order-json02',n" +
                "  'properties.bootstrap.servers' = 'linux01:9092,linux01:9092',n" +
                "  'properties.group.id' = 'testGroup',n" +
                "  'scan.startup.mode' = 'earliest-offset',n" +
                "  'format' = 'json',n" +
                "  'json.ignore-parse-errors' = 'true' n" +   //忽略格式不良好的json
                ")");

        //使用JDBCConnector向MySQL中插入数据
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (n" +
                "  `oid` STRING,   n" +
                "  `cid` STRING,   n" +
                "  `money` DOUBLE,  n" +
                "   PRIMARY KEY (oid) NOT ENFORCED"+
                ") WITH (n" +
                "   'connector' = 'jdbc',n" +
                "   'url' = 'jdbc:mysql://localhost:3306/flink',n" +
                "   'username' = 'root',n" +
                "   'password' = 'root',n" +
                "   'table-name' = 'tb_orders02'n" +
                ")");

        //将数据插入MySQL中
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECt oid,cid,money from tb_user_order");


    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752436.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号