1,环境准备:zk, kafka, mysql-master
1.1,mysql启用binlog1.2,启动zk, kafka
2,安装配置canal3,测试使用
官网介绍:
https://github.com/alibaba/canal/wiki/Introduction
| 角色 | IP |
|---|---|
| mysql-master | 192.168.56.1 (windows) |
| zk, kafka, canal1,canal2 | 192.168.56.7 (centos7) |
[root@c7 kafka_2.11-1.1.0-packs]# mysql -uroot -proot -h192.168.56.1 Warning: Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or g. Your MySQL connection id is 50 Server version: 5.7.34-log MySQL Community Server (GPL) Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or 'h' for help. Type 'c' to clear the current input statement. mysql> show variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set, 1 warning (0.00 sec) mysql> show variables like 'server_id'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | server_id | 1 | +---------------+-------+ 1 row in set, 1 warning (0.01 sec) mysql> show master status; +------------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +------------------+----------+--------------+------------------+-------------------+ | mysql-bin.000007 | 1817 | | | | +------------------+----------+--------------+------------------+-------------------+ 1 row in set (0.01 sec)1.2,启动zk, kafka
[root@c7 zookeeper-3.5.5]# cd conf/ [root@c7 conf]# cat zoo.cfg |grep -v ^# tickTime=2000 initLimit=10 syncLimit=5 dataDir=/var/lib/zookeeper/ clientPort=2181 admin.serverPort=8881 [root@c7 conf]# cd .. [root@c7 zookeeper-3.5.5]# sh bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /root/zookeeper-3.5.5/bin/../conf/zoo.cfg Starting zookeeper ... already running as process 3108. [root@c7 kafka_2.11-1.1.0-packs]# cat config/server.properties |grep -v ^# |grep -v ^$ broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/export/common/kafka_2.11-1.1.0/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 [root@c7 kafka_2.11-1.1.0-packs]# sh bin/kafka-server-start.sh -daemon config/server.properties [root@c7 kafka_2.11-1.1.0-packs]# tail logs/server.log [2022-02-14 14:56:44,444] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper) [2022-02-14 14:56:44,444] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper) [2022-02-14 14:56:44,444] INFO Client environment:user.dir=/root/kafka_2.11-1.1.0-packs (org.apache.zookeeper.ZooKeeper) [2022-02-14 14:56:44,446] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@6c64cb25 (org.apache.zookeeper.ZooKeeper) [2022-02-14 14:56:44,883] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient) [2022-02-14 14:56:44,883] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2022-02-14 14:56:44,935] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2022-02-14 14:56:44,943] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100002e9f38000d, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2022-02-14 14:56:44,945] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient) [2022-02-14 14:56:46,492] INFO Cluster ID = Laod0bPIRiCmex0wK3QRjw (kafka.server.KafkaServer)2,安装配置canal
canal2 只要修改canal.properties中对应的端口,即可启动
[root@c7 ~]# ll -h canal.deployer-1.1.5.tar.gz -rw-r--r-- 1 root root 58M Feb 14 11:07 canal.deployer-1.1.5.tar.gz [root@c7 ~]# mkdir canal [root@c7 ~]# tar -xf canal.deployer-1.1.5.tar.gz -C canal [root@c7 ~]# ll canal total 4 drwxr-xr-x 2 root root 93 Feb 14 14:37 bin drwxr-xr-x 5 root root 151 Feb 14 14:44 conf drwxr-xr-x 2 root root 4096 Feb 14 11:08 lib drwxrwxrwx 4 root root 34 Feb 14 11:18 logs drwxrwxrwx 2 root root 177 Apr 19 2021 plugin [root@c7 ~]# cd canal/conf [root@c7 conf]# vim canal.properties ################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip =192.168.56.7 canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register #canal.admin.register.auto = true #canal.admin.register.cluster = #canal.admin.register.name = canal.zkServers = 192.168.56.7:2181 # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = kafka .... ################################################# ######### destinations ############# ################################################# canal.destinations = example # conf root dir canal.conf.dir = ../conf .... ################################################## ######### Kafka ############# ################################################## kafka.bootstrap.servers = 192.168.56.7:9092 .... [root@c7 conf]# vim example/instance.properties ################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=192.168.56.1:3306 canal.instance.master.journal.name=mysql-bin.000007 canal.instance.master.position=1817 .... # username/password canal.instance.dbUsername=root canal.instance.dbPassword=root canal.instance.connectionCharset = UTF-8 .... # mq config (需提前创建好topic ) canal.mq.topic=canal_test ....3,测试使用
问题:主备节点切换时,发现会向kafka发送部分重复数据。
[root@c7 ~]# sh canal/bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh
[root@c7 ~]# sh canal2/bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh
#查看 zk 上的记录
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, otter, controller_epoch, clickhouse, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /otter
[canal]
[zk: localhost:2181(CONNECTED) 2] ls /otter/canal
[cluster, destinations]
[zk: localhost:2181(CONNECTED) 3] ls /otter/canal/cluster
[192.168.56.7:11111, 192.168.56.7:22222]
[zk: localhost:2181(CONNECTED) 4] ls /otter/canal/destinations
[example]
[zk: localhost:2181(CONNECTED) 5] ls /otter/canal/destinations/example
[running, cluster]
[zk: localhost:2181(CONNECTED) 6] ls /otter/canal/destinations/example/running
[]
[zk: localhost:2181(CONNECTED) 7] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.56.7:11111"}
cZxid = 0xa53
ctime = Mon Feb 14 14:19:08 CST 2022
mZxid = 0xa53
mtime = Mon Feb 14 14:19:08 CST 2022
pZxid = 0xa53
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100002e9f380008
dataLength = 46
numChildren = 0
[zk: localhost:2181(CONNECTED) 8]
[zk: localhost:2181(CONNECTED) 8] ls /otter/canal/destinations/example/cluster
[192.168.56.7:11111, 192.168.56.7:22222]
####创建数据库表,并插入数据
mysql> create database t1;
Query OK, 1 row affected (0.11 sec)
mysql> use t1;
Database changed
mysql> create table per(id int);
Query OK, 0 rows affected (0.25 sec)
mysql> insert into per values(6),(7);
Query OK, 2 rows affected (0.06 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql> insert into per values(8);
Query OK, 1 row affected (0.16 sec)
###########消费kafka topic
[root@c7 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canal_test
{"data":null,"database":"t1","es":1644819646000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database t1","sqlType":null,"table":"","ts":1644819647070,"type":"QUERY"}
{"data":null,"database":"t1","es":1644819657000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table per(id int)","sqlType":null,"table":"per","ts":1644819658225,"type":"CREATE"}
{"data":[{"id":"6"},{"id":"7"}],"database":"t1","es":1644819670000,"id":6,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"per","ts":1644819670986,"type":"INSERT"}
{"data":[{"id":"8"}],"database":"t1","es":1644820545000,"id":7,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"per","ts":1644820546063,"type":"INSERT"}



