vim canal.properties
# tcp, kafka, RocketMQ 这里选择kafka模式 canal.serverMode = kafka # 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况 canal.instance.parser.parallelThreadSize = 16 # 配置MQ的服务地址,这里配置的是kafka对应的地址和端口 canal.mq.servers = 192.168.66.101:9092 # 配置instance,在conf目录下要有example同名的目录,可以配置多个 canal.destinations = example
vim /conf/example/instance.properties
## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置) # canal.instance.mysql.slaveId=90 # position info canal.instance.master.address=127.0.0.1:3306 # 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog canal.instance.master.journal.name=mysql-bin.000001 canal.instance.master.position=154 # 账号密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 #MQ队列名称 canal.mq.topic=canaltopic #单队列模式的分区下标 canal.mq.partition=0
创建主题和监听消费
./bin/kafka-topics.sh --create --zookeeper 192.168.66.101:2181 --replication-factor 1 --partitions 1 --topic canaltopic ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.66.101:9092 --from-beginning --topic canaltopic
INSERT INTO tb_commodity_info VALUES('1','demo','test',3,'.......');



