栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Debezium同步mysql数据

Debezium同步mysql数据

1.1    简介


为实现数据同步功能,进行debezium组件验证,编写此说明。

1.2    目标读者


使用数据同步组件相关人员,技术选型人员。

3.1    组件版本

(版本可换,保证kafka、mysql能用即可)
Kafka:2.2.1+cdh6.3.2
Zookeeper:3.4.5+cdh6.3.2
MySql(MariaDB版本)
Debezium:1.5.0

3.2 组件使用

MySQL安装在10.22.82.123节点,需开启binlog。

登录命令:mysql -uroot -p123456
账号:root
密码:123456
如果是maridb mysql,可能会用到命令:
sudo systemctl status mariadb
sudo systemctl restart mariadb
ps -ef | grep mariadb

3.3Kafka


查看已创建的kafka的topic
/opt/cloudera/parcels/CDH/bin/kafka-topics --zookeeper node01:2181 --list

创建topic
/opt/cloudera/parcels/CDH/bin/kafka-topics --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 1 --partitions 1 --topic test001
生产者
/opt/cloudera/parcels/CDH/bin/kafka-console-producer --broker-list node01:9092,node02:9092,node03:9092 --topic test001

消费者
/opt/cloudera/parcels/CDH/bin/kafka-console-consumer --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test001 --from-beginning

3.4配置修改

创建配置文件
进入kafka配置目录
cd /opt/cloudera/parcels/CDH/lib/kafka/config

vim connect-distributed.properties

bootstrap.servers=10.30.202.21:9092,10.30.202.22:9092,10.30.202.23:9092
group.id=kafka-connect
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
rest.port=8533
plugin.path=/opt/cloudera/parcels/CDH/lib/kafka/libs/

3.5使用集群模式启动connect

cd /opt/cloudera/parcels/CDH/lib/kafka/bin && bash connect-distributed.sh -daemon ../config/connect-distributed.properties

3.6验证是否启动成功

浏览器访问node01:8533或者10.30.202.21:8533

3.7Debezium配置

下载、配置、解压

debezium-connector-mysql-1.5.0.Final-plugin.tar.gz包

复制jar包到kafka的lib下

cp debezium-connector-mysql/*  /opt/cloudera/parcels/CDH/lib/kafka/libs

3.8启动kafka-connect 检测是否生效

访问connector-plugins ,发现新增debezium插件。
浏览器访问node01:8533/connector-plugins或者10.30.202.21:8533/connector-plugins

3.9日志监测


tail -100f /opt/cloudera/parcels/CDH/lib/kafka/logs/connectDistributed.out
监控mysql的binlog
建立链接

curl -X POST -H 'Content-Type: application/json' -i 'http://10.30.202.21:8533/connectors' --data '{"name": "msyqldemotest04",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "10.22.82.123",
"database.port": "3306",
"database.user": "root",
"database.password": "123456",
"tombstones.on.delete": "false",
"database.server.id": "1",
"database.server.name": "demo1",
"database.history.kafka.topic": "mysql_master_history",
"database.history.kafka.bootstrap.servers": "10.30.202.21:9092",
"table.include.list":"sunlinekn.kn_kns_tran",
"database.include.list":"sunlinekn",
"include.schema.changes": "true",
"database.serverTimezone": "Asia/Shanghai",
"database.driver": "com.mysql.jdbc.Driver",
"database.history.kafka.recovery.poll.interval.ms": "3000",
"defaultFetchSize": "1000",
"database.tinyInt1isBit": "false",
"snapshot.locking.mode": "none",
"decimal.handling.mode": "string"
}
}'

说明:
1.执行上述语句后,会自动创建topic(demo1. sunlinekn.kn_kns_tran)
规则为:database.server.name.table.include.list
2. 执行上述语句后,浏览器访问node01:8533/connectors或者10.30.202.21:8533/connectors,可以看到刚刚创建的name

4.0消费验证

启动kafka消费者

/opt/cloudera/parcels/CDH/bin/kafka-console-consumer --bootstrap-server node01:9092,node02:9092,node03:9092 --topic demo1. sunlinekn.kn_kns_tran
 --from-beginning
4.1对mysql表进行修改
增加数据:
INSERT INTO sunlinekn.kn_kns_tran
(trandt, transq, tranti, trantp, tranbr, crcycd, tranam, acctno, toacct, transg)
VALUES('20170103', 'QF123000119', '20170603', 'SYS', '0120', '01', 10000.00, '1535004460311000003', '1535004460311000003', '1');

删除数据:
DELETe from sunlinekn.kn_kns_tran WHERe transq = 'QF123652229'

更新数据:
UPDATE sunlinekn.kn_kns_tran set transq =  'QF333444999' where transq =  'QF333444111'
4.2消费者结果截图

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758387.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号