1、流表join
(1)Regular Joins(inner join和left join用法一样)
两个流一直持续,一个流很久以后出现一个数据,只要id可以关联就会关联,这样表中的所有数据都会保存状态,随着数据变大,状态越来越大,不适合生产使用
(2)Interval Joins (时间间隔join) 不会保留大量的状态(双流join)
—只会关联这一段时间的join数据
—a只会关联b-5到b这段时间之间的数据(即 b-5 < a.ts <= b.ts)
(3)流表和维表关联
—维表有时也会更新,但是维表是一个有界流,一下就读取完成了
这时需要使用第三方工具mysqlcdc
—使用mysqlcdc关联维表
– 可以实时发现维表更新 需要将flink-sql-connector-mysql-cdc-1.1.0 上传到flink 的lib目录下
mysql cdc
1)、先进行全量表读取
2)、再通过监控 mysql 的binlog 日志实时读取新的数据
附件数据(1)
-- 1. Regular Joins
-- Problem: both sides of the join are kept in Flink state indefinitely.
-- If the input tables keep growing, the state grows without bound and
-- eventually the job fails — not suitable for ever-growing streams.

-- Student stream, read from Kafka as CSV.
CREATE TABLE student_join (
    id     STRING,
    name   STRING,
    age    INT,
    gender STRING,
    clazz  STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Score stream, read from Kafka as CSV.
CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco  INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Regular inner join: emits a row whenever ids match, no time bound.
SELECT a.id, a.name, b.c_id, b.sco
FROM student_join a
INNER JOIN score_join b
    ON a.id = b.s_id;

-- Regular left join: keeps all students, scores NULL until a match arrives.
SELECT a.id, a.name, b.c_id, b.sco
FROM student_join a
LEFT JOIN score_join b
    ON a.id = b.s_id;

-- Test data producers (run in a shell, NOT in the SQL client):
-- kafka-console-producer.sh --broker-list master:9092 --topic student_join
--   1500100001,施笑槐,22,女,文科六班
--   1500100002,吕金鹏,24,男,文科六班
--   1500100003,单乐蕊,22,女,理科六班
-- kafka-console-producer.sh --broker-list master:9092 --topic score_join
--   1500100001,1000001,98
--   1500100001,1000002,5
--   1500100001,1000003,137
--   1500100001,1000004,29
--   1500100001,1000005,85
--   1500100001,1000006,52
--   1500100002,1000001,139
--   1500100002,1000002,102
附件数据(2)
-- 2. Interval Joins
-- Each side only keeps state for the configured time interval, so state
-- stays bounded — suitable for long-running dual-stream joins.

-- Student stream with an event-time column and a 5-second watermark.
CREATE TABLE student_Interval_join (
    id     STRING,
    name   STRING,
    age    INT,
    gender STRING,
    clazz  STRING,
    ts     TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'node1:9092,node2:9092,master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Score stream with an event-time column and a 5-second watermark.
CREATE TABLE score_Interval_join (
    s_id STRING,
    c_id STRING,
    sco  INT,
    ts   TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'node1:9092,node2:9092,master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Interval join: a student row only matches score rows whose event time
-- lies in [a.ts, a.ts + 5s], i.e. a.ts BETWEEN b.ts - 5s AND b.ts.
-- Written as an explicit INNER JOIN instead of an implicit comma join.
SELECT a.id, a.name, b.c_id, b.sco
FROM student_Interval_join a
INNER JOIN score_Interval_join b
    ON a.id = b.s_id
    AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts;

-- Test data producers (run in a shell, NOT in the SQL client):
-- kafka-console-producer.sh --broker-list master:9092 --topic student_join
--   1500100001,施笑槐,22,女,文科六班,"2020-09-17 15:12:20"
--   1500100002,吕金鹏,24,男,文科六班,"2020-09-17 15:12:20"
--   1500100002,吕金鹏,24,男,文科六班,"2020-09-17 15:12:36"
--   1500100003,单乐蕊,22,女,理科六班
-- kafka-console-producer.sh --broker-list master:9092 --topic score_join
--   1500100001,1000001,98,"2020-09-17 15:12:22"
--   1500100001,1000002,5,"2020-09-17 15:12:23"
--   1500100001,1000002,5,"2020-09-17 15:12:40"
--   1500100002,1000002,5,"2020-09-17 15:12:10"
--   1500100001,1000003,137
--   1500100001,1000004,29
--   1500100001,1000005,85
--   1500100001,1000006,52
--   1500100002,1000001,139
--   1500100002,1000002,102
附件数据(3)
-- 3. Stream-to-dimension-table joins

-- 3a. Dimension table in MySQL via the plain JDBC connector.
-- Limitation: the JDBC source is a bounded scan — updates to the
-- dimension table are NOT picked up after the initial read.
CREATE TABLE student_mysql (
    id     STRING,
    name   STRING,
    age    INT,
    gender STRING,
    clazz  STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'student',
    'username' = 'root',
    'password'= '123456'
);

-- Fact stream in Kafka.
CREATE TABLE score_join1 (
    s_id STRING,
    c_id STRING,
    sco  INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);

-- Stream on the left so every score is emitted even without a student match.
SELECT a.id, a.name, b.c_id, b.sco
FROM score_join1 b
LEFT JOIN student_mysql a
    ON a.id = b.s_id;

-- Test data producer (run in a shell, NOT in the SQL client):
-- kafka-console-producer.sh --broker-list master:9092 --topic score_join
--   1500100001,1000001,98
--   1500100002,1000002,5
--   1500100001,1000003,137
--   1500100001,1000004,29

-- 3b. Dimension table via mysql-cdc — DOES see dimension-table updates.
-- Requires flink-sql-connector-mysql-cdc-1.1.0 in Flink's lib/ directory.
-- How mysql-cdc works:
--   1) first takes a full snapshot of the table;
--   2) then tails the MySQL binlog to pick up new changes in real time.
CREATE TABLE student_mysql_cdc (
    id     STRING,
    name   STRING,
    age    INT,
    gender STRING,
    clazz  STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'master',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'student',
    'table-name' = 'student'
);

-- Same join shape as 3a, but the dimension side is now a changelog source.
SELECT a.id, a.name, b.c_id, b.sco
FROM score_join1 b
LEFT JOIN student_mysql_cdc a
    ON a.id = b.s_id;

-- Test data producer (run in a shell, NOT in the SQL client):
-- kafka-console-producer.sh --broker-list master:9092 --topic score_join
--   1500100001,1000001,98
--   1500100001,1000002,5
--   1500100001,1000003,137
--   1500100001,1000004,29
--   1500100001,1000005,85
--   1500100001,1000006,52
--   1500100002,1000001,139
--   1500100002,1000002,102
--   1500100003,1000002,102
--   1,1000002,102
--   2,1000002,102



