栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkCDC实时同步MySQL数据到ES

FlinkCDC实时同步MySQL数据到ES

一、背景
  1. 现实项目和业务场景经常会做一件事:同一份数据的多存储系统存储,如同一份订单数据存在mysql,做大屏查询可能又要存在es,热门数据存在redis;还有上游多存储系统的实时数据采集,如postgresql、mysql等产生的实时数据要采集推送到Kafka。
  2. 针对同一份数据的多存储系统存储,如果数据是实时性要求不高的,做好离线定时同步即可。但针对上游实时数据采集的情况,以及多存储系统存储数据实时性要求较高时,如何做到近实时同步又是一个棘手的问题。传统基于jdbc的查询需要定好轮询周期,如需达到准实时轮循周期一般都较小,对数据库压力较大,并且这其中的更新、删除等操作不一定能够被采集到,数据一致性不能够被保障,这一部分最常见的组件就是Kafka connect、Logstash等。
二、引入

针对如上的项目背景和业务需求,使用流式ETL作业的方式能很好的解决存在的问题。这里使用FlinkCDC快速构建MySQL到Elasticsearch的流式ETL作业,实现实时同步数据小Demo。

三、流式ETL作业demo
  1. MySQL创建数据库和表 products,orders,并插入数据
-- MySQL
CREATE DATAbase flinkcdc;
USE flinkcdc;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
  1. 启动 Flink 集群和 Flink SQL CLI
  • 我这里是用的是Flink-1.13.2,下载地址
    flink下载

  • 启动集群之前保证mysql和elasticsearch connector已经下载好,并放入Flink lib目录下:
    flink-sql-connector-elasticsearch6_2.12-1.13.2.jar
    flink-sql-connector-mysql-cdc-2.1.0.jar

  • elasticsearch连接器-后面的是flink的版本,要保证和flink版本一致。

  1. 下载完放入lib包以后,启动器群
./bin/start-cluster.sh

启动成功以后访问 http://localhost:8081/,可看到Flink Web UI 界面:

  1. 启动Flink SQL CLI :
./bin/sql-client.sh

启动成功以后可看到如下界面:

  1. 在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 首先,开启 checkpoint,每隔3秒做一次 checkpoint
Flink SQL> SET execution.checkpointing.interval = 3s;
  • 对于数据库中的表 products, orders, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据:
CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'admin',
    'database-name' = 'flinkcdc',
    'table-name' = 'products'
  );

CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'admin',
    'database-name' = 'flinkcdc',
    'table-name' = 'orders'
  );
  • 创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中
CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-6',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders',
	 'document-type'='orders'
 );
  1. 关联订单数据并且将其写入 Elasticsearch 中:
  • 使用Flink SQL 将订单表 order 与 商品表 products关联,并将关联后的订单信息写入 Elasticsearch 中:
insert into enriched_orders
select 
	o.order_id      as order_id,
	o.order_date    as order_date,
	o.customer_name as customer_name,
	o.price         as price,
	o.product_id    as product_id,
	o.order_status  as order_status,
	p.name          as product_name,
	p.description   as product_description
from orders as o
left join products as p on o.product_id=p.id;
  • 访问 Kibana 可看到订单宽表的数据:


接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:

  • mysql插入数据
# mysql插入数据
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

Kibana可查到刚刚插入的数据:

  • MySQL 的 orders 表中更新订单的状态
UPDATe orders SET order_status = true WHERe order_id = 10006;

Kibana可查到更新数据的订单状态变为了true:

  • 在 MySQL 的 orders 表中删除一条数据
DELETE FROM orders WHERe order_id = 10006;

再去查kibana,order_id为10006的已经被删除。

更多FlinkCDC请参考 https://ververica.github.io/flink-cdc-connectors

完结,散花!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651161.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号