栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink cdc实践

flink cdc实践

flink 1.12.1
flink-cdc 1.2.0
mysql 7+

最近使用公司的大数据平台做实时ETL,发现平台提供的实时消费binlog到kafka的服务有点坑,莫名其妙失败重启,日志也没有效的信息,于是打算另辟蹊径。主要考虑能控制整个流程,能查看完整日志,不新增技术栈,最好还是基于flink,于是选择了flink-cdc,它底层基于debezium。

  • sterp 1
    首先需要确保MySQL开启binlog,并且有采集binlog的全限
#需要的mysql配置
log-bin = on #开启BINLOG
binlog-format = ROW #选择ROW模式
gtid_mode = ON #开启GTID

#可以使用数据库用户执行以下命令确认是否有采集binlog全限
#1、是否打开binlog特性,on表示开启
show global variables where Variable_name = 'log_bin'
#2、是否为row模式,row表示已开启row模式
show global variables where Variable_name = 'binlog_format'
#3、是否打开gtid特性,on表示开启
show global variables where Variable_name = 'gtid_mode'
#4、查询是否有获取binlog日志全限,为空则表示全选不够
show binary logs #获取binlog文件列表
show master logs #查看master的binlog日志
show master status #查看当前写入的binlog文件
show binlog events in 'mysql-bin.000001' #查看指定binlog文件内容

#如果全限不够,可以参看下面的语句授权:
 GRANT SELECt, SHOW DATAbaseS, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'

有时候会报需要RELOAD权限,这时有两种办法:
1、给用户赋予reload权限
2、编写flinksql时配置debezium.snapshot.locking.mode=node参数,跳过全局锁

  • step2
    依赖

  com.alibaba.ververica
  flink-connector-mysql-cdc
  1.2.0



    mysql
    mysql-connector-java
    8.0.18

demo

-- register a MySQL table 'orders' in Flink SQL
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- read snapshot and binlogs from orders table
SELECt * FROM orders;

这样就能采集binlog日志了,下来就可以将采集的数据写入到下游,比如kafka,es,hudi等。

参考:
https://ververica.github.io/flink-cdc-connectors/release-1.4/index.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/316151.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号