下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.5
准备工作安装mysql5.7 , es7.x,安装过程略…
注意: mysql 需要开启 binlog
[mysqld] og-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复mysql中 配置canal数据库管理用户,配置相应权限(repication权限)(如果使用root用户忽略此步骤)
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECt, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;解压文件 配置canal-deployer/conf/example/instance.properties 启动deployer
./startup.sh配置 /canal-adapter/conf/application.yml
USER:
HOSTNAME%%.*:
PWD/#$HOME/~:
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
# kafka.bootstrap.servers: 127.0.0.1:9092
# kafka.enable.auto.commit: false
# kafka.auto.commit.interval.ms: 1000
# kafka.auto.offset.reset: latest
# kafka.request.timeout.ms: 40000
# kafka.session.timeout.ms: 30000
# kafka.isolation.level: read_committed
# kafka.max.poll.records: 1000
# rocketMQ consumer
# rocketmq.namespace:
# rocketmq.namesrv.addr: 127.0.0.1:9876
# rocketmq.batch.size: 1000
# rocketmq.enable.message.trace: false
# rocketmq.customized.trace.topic:
# rocketmq.access.channel:
# rocketmq.subscribe.filter:
# rabbitMQ consumer
# rabbitmq.host:
# rabbitmq.virtual.host:
# rabbitmq.username:
# rabbitmq.password:
# rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf-8&autoReconnect=true
username: root
password: Yi123456789
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
- name: es7
# key: es-test
hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
# # security.auth: test:123456 # only used for rest mode
cluster.name: my-application
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
配置 es7 同步文件 /canal-adapter/conf/es7/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: mytest_user
_id: _id
_type: _doc
upsert: true
pk: id
sql: "select a.id as _id, a.name, a.role_id, b.role_name from m_user a
left join m_role b on b.id=a.role_id"
# objFields:
# _labels: array:;
etlCondition: "where a.id = {}"
commitBatch: 3000
启动adapter
./startup.sh常见问题总结 问题一
DruidDataSource jar包依赖冲突问题
该问题会导致源数据变更了,但无法写入es中,这个需要下载canal 源码包,修改client-adapter escore模块下的pom文件,对应位置加上provided配置,如下图:
在编译过程中有可能出现
[Help 1][ERROR]" />
将 guava 版本改为 18.0
ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /127.0.0.1:3306 has an error, retrying. caused by com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
原因:meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
解决方案:删除meta.dat删除,再重启canal,问题解决;
集群操作:进入canal对应的zookeeper集群下,删除节点/otter/canal/destinations/xxxxx/1001/cursor ;重启canal即可恢复;
java.lang.OutOfMemoryError: Java heap space
canal消费端挂了太久,在zk对应conf下节点的
/otter/canal/destinations/test_db/1001/cursor 位点信息是很早以前,导致重启canal时,从很早以前的位点开始消费,导致canal服务器内存爆掉
监听数据库变更,只有TransactionBegin/TransactionEnd,没有拿到数据的EventType;
原因可能是canal.instance.filter.black.regex=.*…*导致,改canal.instance.filter.black.regex= 再重启试试
ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:fdyb_db[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table Caused by: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1,
分析:mysql系统表权限较高,canal读该表的binlog失败,位点无法移动
解决:将配置项中黑名单加上mysql下的所有表:canal.instance.filter.black.regex = mysql…* ,修改后canal集群不需要重启即可恢复;
其它注意点:检查下CanalConnector是否调用subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是.…,那么相当于你消费了所有的更新数据。



