栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

使用canal保持mysql与kafka数据同步

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用canal保持mysql与kafka数据同步

1.下载canel

https://github.com/alibaba/canal/releases
2. 开启 MySQL 的 binlog 配置 如果你忘记了my.cnf的路径 find / -name my.cnf
cd /etc
vi my.cnf
#打开my.cnf(window my.ini) 在【mysqld】块中添加 
server-id=1
log-bin=mysql-bin 
binlog_format=row 
binlog-do-db=你数据库的名字 多个用逗号隔开(这里是需要监控的数据库的名字)
这里我的数据库名称是mydemo

打开官网 下载

canal.deployer-1.1.4.tar.gz
# 创建1个文件夹
mkdir -p /opt/soft/canal 
# 将tar.gz移动到canal下 
mv canal.deployer-1.1.4.tar.gz /opt/soft/canal 
# 解压 
tar -zxf canal.deployer-1.1.4.tar.g

进入解压后的conf 目录

cd /opt/soft/canal/conf
vi canal.properties
# 可选项: tcp(默认), kafka, RocketMQ# 更改模式,直接把数据扔进 Kafka
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置:
canal.mq.servers = 192.168.xx.xx:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
#canal.mq.flatMessage = true # 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进
制数据,虽然不影响,但是看起来不方便
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

 修改一下几处

cd example
vi instance.properties
# 修改 conf/example/instance.properties
# 按需修改成自己的数据库信息
#################################################
#需要同步的mysql地址,端口
canal.instance.master.address=192.168.x.x:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = xx #需要同步的具体表,也可以使用正则表达式监听多张表或者库
#canal.instance.filter.regex=realtime.tpp_vehicle_through_points_history
...
# mq config# topic的名称
canal.mq.topic=mytest
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# 库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id

 不多说

vi /etc/fprofile

#canal
export CANAL_HOME=/opt/soft/canal
export PATH=$PATH:$CANAL_HOME/bin
添加环境变量
source /etc/profile

启动canal

#配置好/etc/profile
startup.sh

6.开启kafka监控查看数据的录入

kafka-console-consumer.sh --bootstrap-server 192.168.80.181:9092 --topic mykafkademo

 消息队列中的名称是在这设置的

然后启动你的mysql 进入mydemo数据库中插入数据 kafka就能实时读取数据了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/709650.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号