1. 在 Flink SQL 中创建 Flink CDC 表
2. 创建视图(将两张表关联后所需的列的结果显示为一张视图)
3. 创建输出表,关联 Hudi 表,并且自动同步到 Hive 表
4. 查询视图数据,插入到输出表 -- Flink 后台实时执行

2.3 pom 文件需要的类
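2.4 代码实现

<!-- 注:原文中的 XML 标签在提取时丢失,以下结构依据保留下来的取值还原;
     未被 ${...} 引用的属性名(project.build.sourceEncoding、maven.compiler.*、hadoop.version)为推测,请以实际工程为准。 -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>wudl-hudi</artifactId>
        <groupId>wudl-hudi</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink13.5-hudi</artifactId>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>spring-plugin</id>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.13.5</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <mysql.version>8.0.16</mysql.version>
        <flink-mysql-cdc>2.0.2</flink-mysql-cdc>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-mysql-cdc}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>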
package com.wudl.hudi.sink;
//import org.apache.flink.api.common.restartstrategy.RestartStrategies;
//import org.apache.flink.runtime.state.filesystem.FsStateBackend;
//import org.apache.flink.streaming.api.CheckpointingMode;
//import org.apache.flink.streaming.api.environment.CheckpointConfig;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.table.api.EnvironmentSettings;
//import org.apache.flink.table.api.Table;
//import org.apache.flink.table.api.TableResult;
//import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
//
//import static org.apache.flink.table.api.expressions.$;
import com.wudl.hudi.utils.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.expressions.$;
/**
 * Flink job that joins two MySQL CDC tables and writes the joined rows to a Hudi table
 * and to a Kafka topic.
 *
 * <p>Steps performed by {@link #main(String[])}:
 * <ol>
 *   <li>configure checkpointing, restart strategy and state backend;</li>
 *   <li>register the two {@code mysql-cdc} source tables;</li>
 *   <li>register the join of both tables as the temporary view {@code viewFlinkCdc};</li>
 *   <li>register the Hudi sink table and the Kafka sink table;</li>
 *   <li>submit one INSERT into each sink.</li>
 * </ol>
 *
 * <p>Hosts, credentials and paths are hard-coded tutorial values.
 */
public class MysqlJoinMysqlHuDi {

    /**
     * Builds and submits the pipelines.
     *
     * @param args ignored
     * @throws Exception if environment setup or any SQL statement fails
     */
    public static void main(String[] args) throws Exception {
        // 1 - Create the stream and table environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 - Data is written to the Hudi table incrementally, so checkpointing must be enabled.
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep the last checkpoint when the job is cancelled normally.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy: fixed delay, 3 attempts, 5000 ms apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // State backend on HDFS.
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/flink-hudi/ck"));
        // User name used when accessing HDFS.
        System.setProperty("HADOOP_USER_NAME", "root");

        // 2 - Source tables. Both read from MySQL through the 'mysql-cdc' connector.
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS source_mysql ( " +
                        " id BIGINT primary key NOT ENFORCED ," +
                        " name string," +
                        " age int ," +
                        " birthday TIMESTAMP(3)," +
                        " ts TIMESTAMP(3)" +
                        ") WITH ( " +
                        " 'connector' = 'mysql-cdc', " +
                        " 'hostname' = '192.168.1.162', " +
                        " 'port' = '3306', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456', " +
                        " 'server-time-zone' = 'Asia/Shanghai', " +
                        " 'scan.startup.mode' = 'initial', " +
                        " 'database-name' = 'wudldb', " +
                        " 'table-name' = 'Flink_cdc' " +
                        " )");
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS source_mysql_Flink_cdd ( " +
                        " id BIGINT primary key NOT ENFORCED ," +
                        " phone string," +
                        " address string ," +
                        " ts TIMESTAMP(3)" +
                        ") WITH ( " +
                        " 'connector' = 'mysql-cdc', " +
                        " 'hostname' = '192.168.1.162', " +
                        " 'port' = '3306', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456', " +
                        " 'server-time-zone' = 'Asia/Shanghai', " +
                        " 'scan.startup.mode' = 'initial', " +
                        " 'database-name' = 'wudldb', " +
                        " 'table-name' = 'Flink_cdd' " +
                        " )");

        // 3 - Join both sources on id; timestamps are cast to STRING to match the sink schemas.
        String joinSql = "SELECt b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id";
        Table tableMysqlJoin = tableEnv.sqlQuery(joinSql);
        tableEnv.createTemporaryView("viewFlinkCdc", tableMysqlJoin);

        // 4 - Hudi sink table: table name, storage path, record key and precombine field.
        // An HDFS location that was previously used instead of the local path:
        //   'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/order_hudi_sink'
        tableEnv.executeSql(
                "CREATE TABLE myslqjoinmysqlhudiSink (" +
                        " id BIGINT PRIMARY KEY NOT ENFORCED," +
                        " name STRING," +
                        " age INT," +
                        " birthday STRING," +
                        " phone STRING," +
                        " address STRING," +
                        " ts STRING" +
                        ")" +
                        "WITH (" +
                        " 'connector' = 'hudi'," +
                        " 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
                        " 'table.type' = 'MERGE_ON_READ'," +
                        " 'write.operation' = 'upsert'," +
                        " 'hoodie.datasource.write.recordkey.field'= 'id'," +
                        " 'write.precombine.field' = 'ts'," +
                        " 'write.tasks'= '1'" +
                        ")"
        );

        // 4.1 - Kafka sink table (debezium-json encoded changelog).
        tableEnv.executeSql(
                "CREATE TABLE flinkCdc_kafka_Sink (" +
                        " id BIGINT NOT NULL," +
                        " name STRING," +
                        " age INT," +
                        " birthday STRING," +
                        " phone STRING," +
                        " address STRING," +
                        " ts STRING" +
                        ") WITH (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'sinktest'," +
                        " 'scan.startup.mode' = 'earliest-offset', " +
                        " 'properties.bootstrap.servers' = '192.168.1.161:6667'," +
                        " 'format' = 'debezium-json'," +
                        " 'debezium-json.ignore-parse-errors'='true' " +
                        ")"
        );

        // 5 - Write the joined view into the Hudi sink.
        tableEnv.executeSql(
                "INSERT INTO myslqjoinmysqlhudiSink " +
                        "SELECt id,name,age,birthday,phone,address, ts FROM viewFlinkCdc"
        );

        // Print both schemas so they can be compared by eye.
        tableEnv.sqlQuery("select * from flinkCdc_kafka_Sink").printSchema();
        tableEnv.sqlQuery("select * from viewFlinkCdc").printSchema();

        // 6 - Write the same join into the Kafka sink; the query text is reused rather than duplicated.
        tableEnv.executeSql("insert into flinkCdc_kafka_Sink " + joinSql);

        System.out.println("--------------------------");
    }
}
2.5 mysql 表结构
CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7604 DEFAULT CHARSET=utf8mb4;

#*********************************************************************************

CREATE TABLE `Flink_cdd` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `phone` varchar(20) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7417 DEFAULT CHARSET=utf8mb4;

2.6 mysql 代码实现
package com.wudl.hudi.source; import com.alibaba.fastjson.JSON; import com.wudl.hudi.entity.FlinkCdcBean; import com.wudl.hudi.entity.Order; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.text.SimpleDateFormat; import java.util.*; public class GenerateMysqlFlinkCdcBean implements SourceFunction{ private boolean isRunning = true; String[] citys = {"北京", "广东", "山东", "江苏", "河南", "上海", "河北", "浙江", "香港", "山西", "陕西", "湖南", "重庆", "福建", "天津", "云南", "四川", "广西", "安徽", "海南", "江西", "湖北", "山西", "辽宁", "内蒙古"}; Integer i = 0; List list = new ArrayList<>(); @Override public void run(SourceContext ctx) throws Exception { Random random = new Random(); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); while (isRunning) { int number = random.nextInt(4) + 1; String name = getChineseName(); String address = citys[random.nextInt(citys.length)]; int age = random.nextInt(25); String birthday = getDate(); String phone = getTel(); java.sql.Timestamp ts = new java.sql.Timestamp(df.parse(getDate()).getTime()); FlinkCdcBean flinkCdcBean = new FlinkCdcBean(name, age, birthday, ts, phone, address); ctx.collect(flinkCdcBean); } } public static String getDate() throws InterruptedException { Calendar calendar = Calendar.getInstance(); Date date = calendar.getTime(); String dataStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(date); Thread.sleep(10); return dataStr; } public static int getNum(int start, int end) { return (int) (Math.random() * (end - start + 1) + start); } private static String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(","); private static String getTel() { int index = getNum(0, telFirst.length - 1); String first = telFirst[index]; String second = String.valueOf(getNum(1, 888) + 10000).substring(1); String third = 
String.valueOf(getNum(1, 9100) + 10000).substring(1); return first + second + third; } @Override public void cancel() { isRunning = false; } private static String firstName="赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻柏水窦章云苏潘葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳酆鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮卞齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闵席季麻强贾路娄危江童颜郭梅盛林刁钟徐邱骆高夏蔡田樊胡凌霍虞万支柯咎管卢莫经房裘缪干解应宗宣丁贲邓郁单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁荀羊於惠甄魏加封芮羿储靳汲邴糜松井段富巫乌焦巴弓牧隗山谷车侯宓蓬全郗班仰秋仲伊宫宁仇栾暴甘钭厉戎祖武符刘姜詹束龙叶幸司韶郜黎蓟薄印宿白怀蒲台从鄂索咸籍赖卓蔺屠蒙池乔阴郁胥能苍双闻莘党翟谭贡劳逄姬申扶堵冉宰郦雍却璩桑桂濮牛寿通边扈燕冀郏浦尚农温别庄晏柴瞿阎充慕连茹习宦艾鱼容向古易慎戈廖庚终暨居衡步都耿满弘匡国文寇广禄阙东殴殳沃利蔚越夔隆师巩厍聂晁勾敖融冷訾辛阚那简饶空曾毋沙乜养鞠须丰巢关蒯相查后江红游竺权逯盖益桓公万俟司马上官欧阳夏侯诸葛闻人东方赫连皇甫尉迟公羊澹台公冶宗政濮阳淳于仲孙太叔申屠公孙乐正轩辕令狐钟离闾丘长孙慕容鲜于宇文司徒司空亓官司寇仉督子车颛孙端木巫马公西漆雕乐正壤驷公良拓拔夹谷宰父谷粱晋楚阎法汝鄢涂钦段干百里东郭南门呼延归海羊舌微生岳帅缑亢况后有琴梁丘左丘东门西门商牟佘佴伯赏南宫墨哈谯笪年爱阳佟第五言福百家姓续"; private static String girl="秀娟英华慧巧美娜静淑惠珠翠雅芝玉萍红娥玲芬芳燕彩春菊兰凤洁梅琳素云莲真环雪荣爱妹霞香月莺媛艳瑞凡佳嘉琼勤珍贞莉桂娣叶璧璐娅琦晶妍茜秋珊莎锦黛青倩婷姣婉娴瑾颖露瑶怡婵雁蓓纨仪荷丹蓉眉君琴蕊薇菁梦岚苑婕馨瑗琰韵融园艺咏卿聪澜纯毓悦昭冰爽琬茗羽希宁欣飘育滢馥筠柔竹霭凝晓欢霄枫芸菲寒伊亚宜可姬舒影荔枝思丽 "; private static String boy="伟刚勇毅俊峰强军平保东文辉力明永健世广志义兴良海山仁波宁贵福生龙元全国胜学祥才发武新利清飞彬富顺信子杰涛昌成康星光天达安岩中茂进林有坚和彪博诚先敬震振壮会思群豪心邦承乐绍功松善厚庆磊民友裕河哲江超浩亮政谦亨奇固之轮翰朗伯宏言若鸣朋斌梁栋维启克伦翔旭鹏泽晨辰士以建家致树炎德行时泰盛雄琛钧冠策腾楠榕风航弘"; private static String name_sex = ""; private static String getChineseName() { int index=getNum(0, firstName.length()-1); String first=firstName.substring(index, index+1); int sex=getNum(0,1); String str=boy; int length=boy.length(); if(sex==0){ str=girl; length=girl.length(); name_sex = "女"; }else { name_sex="男"; } index=getNum(0,length-1); String second=str.substring(index, index+1); int hasThird=getNum(0,1); String third=""; if(hasThird==1){ index=getNum(0,length-1); third=str.substring(index, index+1); } return first+second+third; } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource addSource = env.addSource(new GenerateMysqlFlinkCdcBean()); addSource.print(); Thread.sleep(5000); addSource.addSink(new MysqlJdbcSink()); env.execute(); } }
package com.wudl.hudi.source; import com.wudl.hudi.entity.FlinkCdcBean; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class MysqlJdbcSink extends RichSinkFunction2.7 读取hudi 数据{ // 声明连接和预编译语句 Connection connection = null; PreparedStatement insertStmtFlink_cdc = null; PreparedStatement insertStmtFlink_cdd = null; @Override public void open(Configuration parameters) throws Exception { connection = DriverManager.getConnection("jdbc:mysql://192.168.1.162:3306/wudldb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456"); insertStmtFlink_cdc = connection.prepareStatement("INSERT INTO `wudldb`.`Flink_cdc`(NAME,age,birthday,ts) VALUES(?,?,?,?) "); insertStmtFlink_cdd = connection.prepareStatement("INSERT INTO `wudldb`.`Flink_cdd` (phone,address,ts) VALUES(?,?,?) "); } // 每来一条数据,调用连接,执行sql @Override public void invoke(FlinkCdcBean fc, Context context) throws Exception { insertStmtFlink_cdc.setString(1, fc.getName()); insertStmtFlink_cdc.setInt(2, fc.getAge()); insertStmtFlink_cdc.setString(3, fc.getTs().toString()); insertStmtFlink_cdc.setString(4, fc.getTs().toString()); insertStmtFlink_cdc.execute(); insertStmtFlink_cdd.setString(1, fc.getPhone()); insertStmtFlink_cdd.setString(2, fc.getAddress()); insertStmtFlink_cdd.setString(3, fc.getTs().toString()); insertStmtFlink_cdd.executeUpdate(); } @Override public void close() throws Exception { insertStmtFlink_cdc.close(); insertStmtFlink_cdd.close(); connection.close(); } }
package com.wudl.hudi.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Flink job that registers the Hudi table stored at {@code file:///D:/myslqjoinmysqlhudiSink}
 * as {@code order_hudi}, reads it in streaming mode and prints every row.
 */
public class MysqlJoinMysqlHuDiRead {

    /**
     * Registers the Hudi table and prints the result of {@code select * from order_hudi}.
     *
     * @param args ignored
     * @throws Exception if the table cannot be registered or the query fails
     */
    public static void main(String[] args) throws Exception {
        // 1 - Create the stream and table environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2 - Register the Hudi table with streaming read enabled.
        // The original literal ended in "(n" (a line break that lost its backslash), which broke the DDL.
        tableEnv.executeSql(
                "CREATE TABLE order_hudi(" +
                        " id BIGINT PRIMARY KEY NOT ENFORCED," +
                        " name STRING," +
                        " age INT," +
                        " birthday STRING," +
                        " phone STRING," +
                        " address STRING," +
                        " ts STRING" +
                        ")" +
                        "WITH (" +
                        " 'connector' = 'hudi'," +
                        " 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
                        " 'table.type' = 'MERGE_ON_READ'," +
                        " 'read.streaming.enabled' = 'true'," +
                        " 'read.streaming.check-interval' = '4'" +
                        ")"
        );

        // 3 - Print the rows of the table.
        tableEnv.executeSql("select * from order_hudi ").print();
    }
}
运行jar
// Hudi sink definition for running the jar on the cluster: the table lives on HDFS, uses async
// compaction and is synced to Hive through the metastore ('hms' mode).
// Each fragment originally ended in a bare "n" (a "\n" that lost its backslash), which corrupted
// the statement; the line breaks are restored here.
// NOTE(review): ts is TIMESTAMP(3) here, whereas the job's join query casts ts to STRING —
// confirm the column types line up before using this definition.
tableEnv.executeSql("CREATE TABLE myslqjoinmysqlhudiSink(\n" +
        "id bigint ,\n" +
        "name string,\n" +
        "age int,\n" +
        "birthday STRING,\n" +
        "phone STRING,\n" +
        "address STRING,\n" +
        "ts TIMESTAMP(3),\n" +
        "primary key(id) not enforced\n" +
        ")\n" +
        "with(\n" +
        "'connector'='hudi',\n" +
        "'path'= 'hdfs://192.168.1.161:8020/myslqjoinmysqlhudiSink', \n" +
        "'table.type'= 'MERGE_ON_READ',\n" +
        "'hoodie.datasource.write.recordkey.field'= 'id', \n" +
        "'write.precombine.field'= 'ts',\n" +
        "'write.tasks'= '1',\n" +
        "'write.rate.limit'= '2000', \n" +
        "'compaction.tasks'= '1', \n" +
        "'compaction.async.enabled'= 'true',\n" +
        "'compaction.trigger.strategy'= 'num_commits',\n" +
        "'compaction.delta_commits'= '1',\n" +
        "'changelog.enabled'= 'true',\n" +
        "'read.streaming.enabled'= 'true',\n" +
        "'read.streaming.check-interval'= '3',\n" +
        "'hive_sync.enable'= 'true',\n" +
        "'hive_sync.mode'= 'hms',\n" +
        "'hive_sync.metastore.uris'= 'thrift://node02.com:9083',\n" +
        "'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',\n" +
        "'hive_sync.table'= 'myslqjoinmysqlhudiSink',\n" +
        "'hive_sync.db'= 'db_hive',\n" +
        "'hive_sync.username'= 'root',\n" +
        "'hive_sync.password'= '123456',\n" +
        "'hive_sync.support_timestamp'= 'true'\n" +
        ")");
2.9 命令提交
[root@node01 bin]# ./flink run -m 192.168.1.161:8081 -c com.wudl.hudi.sink.MysqlJoinMysqlHuDi /opt/module/jar/flink13.5-hudi-1.0-SNAPSHOT-jar-with-dependencies.jar
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Job has been submitted with JobID 225aba756224502aa9e643d75560ddb9
Job has been submitted with JobID 63a6b9e2bb697a4ce0c3f993c720b534
--------------------------
[root@node01 bin]#
flink 后台
效果



