关联MySQL中的维度表示例
// Example: enrich a Kafka order stream with a MySQL dimension table via a
// lookup (temporal) join, then write the joined rows back to MySQL.
public class MySQLDimensionDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Source: read CSV order events from Kafka with the Kafka connector.
        // Note: the original file had lost the "\n" escapes inside these SQL
        // strings (lines ended in a literal 'n'), which broke the DDL; restored here.
        tableEnvironment.executeSql("CREATE TABLE KafkaTable (\n" +
                "  `oid` STRING,\n" +
                "  `cid` STRING,\n" +
                "  `money` DOUBLE,\n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
                "  proctime AS PROCTIME(), -- processing-time attribute via a computed column\n" +
                "  eventTime AS ts, -- event time\n" +
                "  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- watermark on eventTime\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-csv',\n" +
                "  'properties.bootstrap.servers' = 'linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true'\n" +
                ")");
        // Dimension table: JDBC connector with a lookup cache for the category data.
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_category (\n" +
                "  id STRING,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'lookup.cache.max-rows' = '5000',\n" + // cache at most 5000 rows
                "  'lookup.cache.ttl' = '10min',\n" +      // max time-to-live of cached dimension rows
                "  'table-name' = 'tb_category'\n" +
                ")");
        // Sink: write the enriched rows to MySQL via the JDBC connector.
        // NOTE(review): password is hard-coded and differs from the other demos
        // ('root') — looks like a real credential; should come from configuration.
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (\n" +
                "  oid STRING,\n" +
                "  cid STRING,\n" +
                "  cname STRING,\n" +
                "  money DOUBLE,\n" +
                "  PRIMARY KEY (oid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'jin19930213',\n" +
                "  'table-name' = 'tb_orders03'\n" +
                ")");
        // Lookup join: FOR SYSTEM_TIME AS OF proctime resolves each order's
        // category name against the (cached) current state of the dimension table.
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECT k.oid, k.cid, t.name AS cname, k.money FROM KafkaTable AS k\n" +
                "LEFT JOIN tb_mysql_category FOR SYSTEM_TIME AS OF k.proctime AS t\n" +
                "ON k.cid = t.id");
    }
}
获取Kafka中元数据示例
// Example: expose Kafka record metadata (topic, partition, offset, timestamp)
// as table columns and print each row with a composite id built from them.
public class KafkaConnectormetadataDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Source table: METADATA columns map Kafka record metadata into the schema;
        // VIRTUAL marks them read-only (excluded when writing to this table).
        // Note: the original file had lost the "\n" escapes inside these SQL
        // strings and garbled METADATA as "metaDATA"; restored here.
        tableEnvironment.executeSql("CREATE TABLE KafkaTable (\n" +
                "  `user_id` BIGINT,\n" +
                "  `item_id` BIGINT,\n" +
                "  `behavior` STRING,\n" +
                "  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
                "  `topic` STRING METADATA VIRTUAL,\n" +
                "  `partition` BIGINT METADATA VIRTUAL,\n" +
                "  `offset` BIGINT METADATA VIRTUAL\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'user_behavior',\n" +
                "  'properties.bootstrap.servers' = 'linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true'\n" +
                ")");
        // Sink table: print connector writes every row to stdout (for debugging).
        tableEnvironment.executeSql("CREATE TABLE print_table (\n" +
                "  `id` STRING,\n" +
                "  `user_id` BIGINT,\n" +
                "  `item_id` BIGINT,\n" +
                "  `behavior` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")");
        // Build id = topic-partition-offset-date; `partition` and `offset` are
        // reserved words, hence the backticks.
        tableEnvironment.executeSql("INSERT INTO print_table " +
                "SELECT concat_ws('-', topic, CAST(`partition` AS STRING), CAST(`offset` AS STRING), DATE_FORMAT(event_time, 'yyyy-MM-dd')), user_id, item_id, behavior " +
                "FROM KafkaTable");
    }
}
写入MySQL的两种方式 (追加,覆盖)
// Example: Kafka (JSON) -> MySQL in APPEND mode — the sink table declares no
// primary key, so every record is inserted as a new row.
public class KafkaToMySQLDemo01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Source: read JSON order events from Kafka.
        // Note: the original file had lost the "\n" escapes inside these SQL
        // strings (lines ended in a literal 'n'), which broke the DDL; restored here.
        // NOTE(review): bootstrap.servers lists 'linux01:9092' twice — probably
        // meant to be two different brokers; confirm the intended host list.
        tableEnvironment.executeSql("CREATE TABLE tb_user_order (\n" +
                "  `oid` STRING,\n" +
                "  `cid` STRING,\n" +
                "  `money` DOUBLE\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-json',\n" +
                "  'properties.bootstrap.servers' = 'linux01:9092,linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" + // skip malformed JSON records
                ")");
        // Sink: JDBC connector, append-only (no PRIMARY KEY declared).
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (\n" +
                "  `oid` STRING,\n" +
                "  `cid` STRING,\n" +
                "  `money` DOUBLE\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'table-name' = 'tb_orders'\n" +
                ")");
        // Continuously insert the Kafka rows into MySQL.
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECT oid, cid, money FROM tb_user_order");
    }
}
// Example: Kafka (JSON) -> MySQL in UPSERT mode — the sink declares
// PRIMARY KEY (oid) NOT ENFORCED, so the JDBC connector overwrites rows that
// share the same key instead of appending duplicates.
public class KafkaToMySQLDemo02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Source: read JSON order events from Kafka.
        // Note: the original file had lost the "\n" escapes inside these SQL
        // strings (lines ended in a literal 'n'), which broke the DDL; restored here.
        // NOTE(review): bootstrap.servers lists 'linux01:9092' twice — probably
        // meant to be two different brokers; confirm the intended host list.
        tableEnvironment.executeSql("CREATE TABLE tb_user_order (\n" +
                "  `oid` STRING,\n" +
                "  `cid` STRING,\n" +
                "  `money` DOUBLE\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-json02',\n" +
                "  'properties.bootstrap.servers' = 'linux01:9092,linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" + // skip malformed JSON records
                ")");
        // Sink: JDBC connector with a primary key — enables upsert semantics.
        // The original was missing the newline after NOT ENFORCED, which fused
        // it with the following ") WITH ("; fixed here.
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (\n" +
                "  `oid` STRING,\n" +
                "  `cid` STRING,\n" +
                "  `money` DOUBLE,\n" +
                "  PRIMARY KEY (oid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'table-name' = 'tb_orders02'\n" +
                ")");
        // Continuously upsert the Kafka rows into MySQL keyed by oid.
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECT oid, cid, money FROM tb_user_order");
    }
}



