- Real-world projects routinely keep the same data in multiple storage systems: one copy of the order data lives in MySQL, another goes to Elasticsearch for dashboard queries, and the hot subset is cached in Redis. The reverse case also comes up: real-time changes produced upstream in PostgreSQL, MySQL, etc. have to be captured and pushed to Kafka.
- For multiple copies of the same data with loose freshness requirements, a scheduled offline sync is enough. But for upstream real-time capture, or when the downstream copies must stay fresh, near-real-time synchronization gets tricky. Traditional JDBC-based polling needs a fixed interval; getting close to real time means a small interval, which puts heavy load on the database, and even then updates and deletes may not be captured, so consistency cannot be guaranteed. The most common components in this space are Kafka Connect, Logstash, and the like.
Given that background, a streaming ETL job solves these problems nicely. This post uses Flink CDC to quickly build a MySQL-to-Elasticsearch streaming ETL job: a small demo of real-time data synchronization.
- In MySQL, create the database and the products and orders tables, and insert some data
-- MySQL
CREATE DATABASE flinkcdc;
USE flinkcdc;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
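One prerequisite worth checking before going further: the mysql-cdc connector reads the MySQL binlog, so the server must have the binlog enabled in ROW format, and the connecting user needs replication privileges. A quick sanity check (flinkuser is a hypothetical dedicated account; the demo itself simply connects as root):
-- Binlog must be on and row-based, otherwise mysql-cdc cannot capture changes.
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
-- A dedicated CDC user would need these privileges ('flinkuser' is a
-- hypothetical name; the DDL later in this post connects as root instead):
CREATE USER IF NOT EXISTS 'flinkuser'@'%' IDENTIFIED BY 'flinkpw';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'%';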
- Start the Flink cluster and the Flink SQL CLI
- I am using Flink 1.13.2 here; grab it from the official Flink downloads page.
Before starting the cluster, make sure the MySQL and Elasticsearch connector jars have been downloaded and placed in Flink's lib directory:
flink-sql-connector-elasticsearch6_2.12-1.13.2.jar
flink-sql-connector-mysql-cdc-2.1.0.jar
For the Elasticsearch connector jar, the trailing version is the Flink version; it must match the Flink release you are running.
- With the jars in lib, start the cluster:
./bin/start-cluster.sh
Once it is up, open http://localhost:8081/ to see the Flink Web UI.
- Start the Flink SQL CLI:
./bin/sql-client.sh
Once it starts, the SQL CLI welcome screen appears.
- Create tables with Flink DDL in the Flink SQL CLI
- First, enable checkpointing, with a checkpoint every 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;
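Optionally, switching the result display to tableau mode prints changelog rows (+I, -U, +U, -D) straight into the CLI, which makes the verification steps later in this post easier to follow; this is a convenience setting, not required for the pipeline:
Flink SQL> SET sql-client.execution.result-mode = tableau;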
- For the database tables products and orders, create corresponding tables in the Flink SQL CLI to sync the data of these underlying database tables:
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'admin',
'database-name' = 'flinkcdc',
'table-name' = 'products'
);
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'admin',
'database-name' = 'flinkcdc',
'table-name' = 'orders'
);
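With the two source tables defined, each can be sanity-checked with a plain continuous query: it first emits a snapshot of the current MySQL rows and then keeps streaming binlog changes until you cancel it (an optional check, not part of the pipeline):
Flink SQL> SELECT * FROM products;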
- Create an enriched_orders table, used to write the joined order data to Elasticsearch
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders',
'document-type'='orders'
);
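A version note: this demo targets Elasticsearch 6, hence 'elasticsearch-6' and the mandatory 'document-type' option. For an Elasticsearch 7 cluster (with the matching flink-sql-connector-elasticsearch7 jar in lib), the sink DDL would drop the document type, since ES 7 removed mapping types. A sketch under that assumption:
-- Hypothetical ES 7 variant of the sink (renamed to avoid clashing
-- with the enriched_orders table defined above):
CREATE TABLE enriched_orders_es7 (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    product_name STRING,
    product_description STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200',
    'index' = 'enriched_orders'
);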
- Join the order data and write it to Elasticsearch: use Flink SQL to join the orders table with the products table, and write the enriched order records into Elasticsearch:
INSERT INTO enriched_orders
SELECT o.order_id,
       o.order_date,
       o.customer_name,
       o.price,
       o.product_id,
       o.order_status,
       p.name AS product_name,
       p.description AS product_description
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id;
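Before (or after) submitting the INSERT, the same join can also be previewed directly in the CLI; with the tableau result mode set earlier, the output shows the changelog the join produces as the source tables change (just a preview, independent of the Elasticsearch sink):
Flink SQL> SELECT o.order_id, o.customer_name, p.name AS product_name, o.order_status
FROM orders AS o LEFT JOIN products AS p ON o.product_id = p.id;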
- In Kibana you can now see the enriched order data.
Next, modify the data in the MySQL tables; the order data shown in Kibana will update in real time:
- Insert a row in MySQL
INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
The freshly inserted row shows up in Kibana.
- Update an order's status in the MySQL orders table
UPDATE orders SET order_status = true WHERE order_id = 10006;
In Kibana, the order_status of that order has changed to true.
- Delete a row from the MySQL orders table
DELETE FROM orders WHERE order_id = 10006;
Check Kibana again: the order with order_id 10006 is gone.
For more on Flink CDC, see https://ververica.github.io/flink-cdc-connectors
That's a wrap!



