栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkCDC将MySQL的Binlog数据以纯FlinkSQL落表至Hive

FlinkCDC将MySQL的Binlog数据以纯FlinkSQL落表至Hive

目的

使用FlinkSQL(1.13.6)纯SQL方式,通过FlinkCDC(2.1.1)获取MySQL的Binlog数据,以流的形式同步到Hive表中。小文件问题可使用FlinkSQL批处理定期执行表合并来解决。

步骤

    启动MySQL的Binlog功能(略)

    FlinkCDC获取MySQL Binlog并写入Kafka表;

-- Source table: reads the MySQL binlog via the Flink CDC connector.
DROP TABLE IF EXISTS mysql_cdc;
CREATE TABLE mysql_cdc (
   `do_date`     STRING
  ,`do_min`      STRING
  ,`pv`          INT
  -- Comma before PRIMARY KEY was missing in the original (syntax error).
  -- NOT ENFORCED: Flink does not validate the key, it is metadata for upserts.
  ,PRIMARY KEY(do_date) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxx.xxx.xxx.xxx',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxxxxxx',
  'database-name' = 'test',
  'table-name' = 'result_total_pvuv_min_mysql',
  -- 'initial' = full snapshot first, then continue from the binlog.
  'scan.startup.mode'='initial',
  'debezium.snapshot.mode' = 'initial'
);

-- Sink table: publishes the CDC changelog to Kafka as debezium-json.
DROP TABLE IF EXISTS kafka_mysql_cdc;
CREATE TABLE kafka_mysql_cdc (
  `do_date` STRING,
  `do_min`  STRING,
  `pv`      INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.group.id' = 'flinkTest',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'collector101:9092,collector102:9092',
  'format' = 'debezium-json'
);

-- Stream every change event from the CDC source into Kafka.
INSERT INTO kafka_mysql_cdc
SELECT `do_date`, `do_min`, `pv`
FROM mysql_cdc;
    FlinkSQL Kafka表解析Map
-- Alternative parsing table: reads the same topic as plain JSON and exposes
-- the Debezium envelope's before/after row images as string maps.
DROP TABLE IF EXISTS kafka_mysql_parser;
CREATE TABLE kafka_mysql_parser (
  -- MAP requires explicit key/value types in Flink SQL; bare MAP is invalid.
   `before` MAP<STRING, STRING>  -- row image before the change (NULL for inserts)
  ,`after`  MAP<STRING, STRING>  -- row image after the change (NULL for deletes)
  ,`op`     STRING               -- Debezium operation code: c / u / d / r
) WITH (
 'connector' = 'kafka',
 'topic' = 'test',
 'properties.group.id' = 'flinkTest',
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = 'collector101:9092,collector102:9092',
 'format' = 'json'
);
    将解析后的数据存入Hive
-- Target Hive table (created through the Hive SQL dialect), day-partitioned,
-- written by Flink's streaming filesystem/Hive sink with auto-compaction.
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS kafka_hive_cdc;
CREATE TABLE kafka_hive_cdc (
  do_date      STRING,
  do_min       STRING,
  pv           STRING,
  op           STRING,
  process_time STRING
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true',
  'compaction.file-size'='128MB'
);

-- Flatten the Debezium before/after maps into columns and append to Hive.
-- For deletes only `before` is populated, so fall back to it first.
-- NOTE(review): for updates ('u') both images exist and this keeps the
-- pre-update `before` values — confirm that capturing history is intended.
INSERT INTO kafka_hive_cdc
SELECT
 -- Original aliased all three columns as `a` (copy-paste defect); INSERT
 -- matching is positional, but distinct aliases document the mapping.
 CASE WHEN `before` IS NOT NULL THEN `before`['do_date'] ELSE `after`['do_date'] END AS do_date
,CASE WHEN `before` IS NOT NULL THEN `before`['do_min']  ELSE `after`['do_min']  END AS do_min
,CASE WHEN `before` IS NOT NULL THEN `before`['pv']      ELSE `after`['pv']      END AS pv
,`op`
,CAST(PROCTIME() AS STRING)              AS process_time  -- ingestion wall-clock
,DATE_FORMAT(PROCTIME(), 'yyyy-MM-dd')   AS dt            -- partition column
FROM kafka_mysql_parser;
    小文件合并,注意这步需要关闭Checkpoint,否则会IllegalArgumentException: Checkpoint is not supported for batch jobs.
-- Small-file compaction: rewrite the table onto itself in batch mode.
-- Checkpointing must be disabled, since batch jobs do not support it.
SET execution.runtime-mode=batch;
INSERT OVERWRITE kafka_hive_cdc
SELECT * FROM kafka_hive_cdc;
    从Hive中读取最终数据
-- Read the final state from Hive: keep the most recent change per do_date,
-- then return only keys whose latest operation is an insert.
-- NOTE(review): `op` = 'c' drops keys whose latest change is an update
-- ('u') or delete ('d') — confirm this matches the intended semantics.
SELECT
  `do_date`,
  `do_min`,
  `pv`
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY `do_date`
      ORDER BY `process_time` DESC, `op`
    ) AS `rn`
  FROM kafka_hive_cdc
) ranked
WHERE `rn` = 1
  AND `op` = 'c';
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752130.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号