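This is just an idea for now: Canal can listen to MySQL binlog files, so I took a quick look at its getting-started guide.
Hopefully I won't end up dropping Canal's real-time sync later on... Of course, if you have already built this, or know another way to sync MySQL to ES in real time, please @ me. I'd be eternally grateful!!!
Canal is an open-source framework from Alibaba, used to sync data from mysql... to es... in real time. Its main job is to parse MySQL's incremental log (the binlog) and provide incremental data subscription and consumption.
Reference: QuickStart · alibaba/canal Wiki (github.com)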
1. Prepare MySQL

Enable binlog in ROW mode in my.cnf:

    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW mode
    server_id=1 # required for MySQL replication; must not be the same as canal's slaveId

Grant the account that canal connects with the privileges of a MySQL slave. If the account already exists, just run the GRANT.

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

2. Download canal
Release v1.1.5 · alibaba/canal · GitHub
Download whichever version you need.
3. deployer (canal server)

(1) Extract

    mkdir /tmp/canal
    tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal

(2) Edit the configuration

    vi conf/example/instance.properties

    ## mysql serverId, can be configured automatically
    #canal.instance.mysql.slaveId = 1234
    #position info, change to your own database info
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.master.journal.name =
    canal.instance.master.position =
    canal.instance.master.timestamp =
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #username/password, change to your own database info
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .*\\..*
canal.instance.connectionCharset is the database encoding, written as the corresponding Java charset name, e.g. UTF-8, GBK, ISO-8859-1.
If the machine has only one CPU, set canal.instance.parser.parallel to false.
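To get a feel for what a table filter such as canal.instance.filter.regex accepts, here is a small sketch. It assumes the filter is matched against the full "schema.table" name and uses java.util.regex as a stand-in for Canal's own matcher. The class name, the accepts helper and the test\\..* pattern are made up for illustration.

```java
import java.util.regex.Pattern;

class FilterRegexDemo {
    // In instance.properties the value is written as .*\\..* because the
    // properties format unescapes "\\" to "\"; the effective regex is .*\..*
    static final String ALL_TABLES = ".*\\..*";    // Java literal for the regex .*\..*
    static final String TEST_SCHEMA = "test\\..*"; // hypothetical: only tables in schema "test"

    /** Returns true if the whole "schema.table" name matches the filter. */
    static boolean accepts(String filter, String schema, String table) {
        return Pattern.matches(filter, schema + "." + table);
    }

    public static void main(String[] args) {
        System.out.println(accepts(ALL_TABLES, "test", "xdual"));  // true
        System.out.println(accepts(TEST_SCHEMA, "test", "xdual")); // true
        System.out.println(accepts(TEST_SCHEMA, "mysql", "user")); // false
    }
}
```

The same double backslash is needed when the pattern is written as a Java string literal, for example in a client's subscribe call.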
(3) Start

    sh bin/startup.sh

(4) Check the server log

    vi logs/canal/canal.log

    2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
    2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

(5) Check the instance log

    vi logs/example/example.log

    2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

(6) Stop

    sh bin/stop.sh

(7) Test the deployer
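Create a Maven project for the test and add the client dependency:

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>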
    package com.alibaba.otter.canal.sample;

    import java.net.InetSocketAddress;
    import java.util.List;

    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

    public class SimpleCanalClientExample {

        public static void main(String args[]) {
            // Create the connection to the "example" instance on port 11111
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*"); // subscribe to every table in every schema
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }

                    connector.ack(batchId); // confirm the batch
                    // connector.rollback(batchId); // processing failed, roll the batch back
                }

                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }

        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                // Skip transaction begin/end markers; only row changes are printed
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }

                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
                }

                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }

        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }
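The client above always acks and leaves the rollback call commented out. As a rough sketch of how the two could be combined once the handler writes somewhere that can fail (for example ES), the code below acks only after the handler succeeds and rolls back otherwise. Source, Batch and FakeSource are hypothetical stand-ins that mirror CanalConnector's getWithoutAck / ack / rollback so the example runs without a canal server. This is one possible pattern, not the official client's behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class AckRollbackSketch {
    /** A fetched batch: id plus entries (plain strings here for brevity). */
    static final class Batch {
        final long id;
        final List<String> entries;
        Batch(long id, List<String> entries) { this.id = id; this.entries = entries; }
    }

    /** Hypothetical stand-in mirroring CanalConnector's getWithoutAck/ack/rollback. */
    interface Source {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Handles one batch; acks on success, rolls back on failure. Returns true if acked. */
    static boolean processOnce(Source source, int batchSize, Consumer<List<String>> handler) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1 || batch.entries.isEmpty()) {
            return false; // nothing to process in this sketch
        }
        try {
            handler.accept(batch.entries);
            source.ack(batch.id);      // confirm only after the handler succeeded
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // let the same batch be fetched again
            return false;
        }
    }

    /** In-memory fake that records which call was made; for demonstration only. */
    static final class FakeSource implements Source {
        final List<String> calls = new ArrayList<>();
        public Batch getWithoutAck(int batchSize) { return new Batch(1L, List.of("row-1", "row-2")); }
        public void ack(long batchId) { calls.add("ack:" + batchId); }
        public void rollback(long batchId) { calls.add("rollback:" + batchId); }
    }

    public static void main(String[] args) {
        FakeSource ok = new FakeSource();
        processOnce(ok, 1000, entries -> System.out.println("handled " + entries.size()));
        System.out.println(ok.calls);  // [ack:1]

        FakeSource bad = new FakeSource();
        processOnce(bad, 1000, entries -> { throw new IllegalStateException("write failed"); });
        System.out.println(bad.calls); // [rollback:1]
    }
}
```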
Run the client

First start the Canal Server (see QuickStart).
After starting the Canal Client, the console shows messages like:

    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4

This means the database currently has no changes.

Trigger a database change

    mysql> use test;
    Database changed
    mysql> CREATE TABLE `xdual` (
        ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
        ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        ->   PRIMARY KEY (`ID`)
        -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
    Query OK, 0 rows affected (0.06 sec)
    mysql> insert into xdual(id,x) values(null,now());
    Query OK, 1 row affected (0.06 sec)

The console then shows:

    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4
    ================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
    ID : 4    update=true
    X : 2013-02-05 23:29:46    update=true

4. adapter (canal-adapter)

(1) Extract
    tar -zxvf canal.adapter-1.1.5.tar.gz -C canal-adapter

(2) Edit the launcher configuration

    vi application.yml

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null

    canal.conf:
      # tcp kafka rocketMQ rabbitMQ: the mode canal-server runs in. TCP mode connects the client
      # directly, with no middleware in between; kafka and the MQ options are message-queue modes
      mode: tcp # kafka rocketMQ
      canalServerHost: 127.0.0.1:11111
    #  zookeeperHosts: slave1:2181
    #  mqServers: 127.0.0.1:9092 #or rocketmq
    #  flatMessage: true
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
    #  srcDataSources:
    #    defaultDS:
    #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
    #      username: root
    #      password: 121212
      srcDataSources:
        defaultDS:
          # change these to your own database info
          url: jdbc:mysql://192.168.188.128:3306/test?useUnicode=true
          username: canal
          password: canal
      canalAdapters:
      - instance: example # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          # without logger, errors occur and no log is produced
          - name: logger
    #      - name: rdb
    #        key: mysql1
    #        properties:
    #          jdbc.driverClassName: com.mysql.jdbc.Driver
    #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
    #          jdbc.username: root
    #          jdbc.password: 121212
    #      - name: rdb
    #        key: oracle1
    #        properties:
    #          jdbc.driverClassName: oracle.jdbc.OracleDriver
    #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    #          jdbc.username: mytest
    #          jdbc.password: m121212
    #      - name: rdb
    #        key: postgres1
    #        properties:
    #          jdbc.driverClassName: org.postgresql.Driver
    #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
    #          jdbc.username: postgres
    #          jdbc.password: 121212
    #          threads: 1
    #          commitSize: 3000
    #      - name: hbase
    #        properties:
    #          hbase.zookeeper.quorum: 127.0.0.1
    #          hbase.zookeeper.property.clientPort: 2181
    #          zookeeper.znode.parent: /hbase
          # port 9300 is for transport, 9200 is for rest; "es" corresponds to the directory under conf
          - name: es
            hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
            properties:
              mode: rest # transport or rest
              # security.auth: test:123456 # only used for rest mode
              cluster.name: elasticsearch

(3) Table mapping file
    # Data source key; must match an entry under srcDataSources in the adapter's application.yml
    dataSourceKey: defaultDS
    # Name of an instance configured in canal-server; different instances serve different businesses
    destination: example
    # Group ID; in tcp mode leave this empty, otherwise data may not be received
    groupId:
    # ES mapping
    esMapping:
      # ES index name
      _index: testsync2
      # Unique identifier of the ES document, usually the table's primary-key ID column
      _id: _id
      # upsert: true
      # pk: id
      # Maps each table column to its field name in the index; names must not repeat
      sql: "select a.id as _id, a.name,a.age,a.age_2,a.message,a.insert_time from testsync as a"
      # objFields:
      #   _labels: array:;
      # etlCondition: "where a.c_time>={}"
      commitBatch: 10

Another mapping file, this time over a two-table join:

    dataSourceKey: defaultDS # matches an entry under srcDataSources in application.yml
    destination: example # the canal instance in tcp mode, or the topic in MQ mode
    groupId: g1 # note: leave groupId empty when syncing HBase data; this is the groupId in MQ mode, and only data for that groupId is synced
    esMapping:
      _index: ceshi # index name
      _type: _doc # index type
      _id: _id # index primary key
      upsert: true
      # pk: id
      sql: "select a.id as _id,a.id,a.name as lname,a.age as lage,b.name as rname,b.age as rage from hleft a left join hright b on a.id=b.id"
      # objFields:
      #   _labels: array:;
      etlCondition: "where a.is_delete={0} and b.is_delete={0}"
      commitBatch: 3000

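To picture what such a mapping does with one row, here is a simplified model: the aliases in the sql become field names, and the column named by _id becomes the document id. It is only an illustration of the idea, not the adapter's actual code. The names EsMappingSketch, Doc and toDoc are invented, and leaving the id column out of the document body is an assumption of this sketch (which would also explain why the second mapping selects a.id twice).

```java
import java.util.LinkedHashMap;
import java.util.Map;

class EsMappingSketch {
    /** Result of mapping one row: the document id and the fields to index. */
    static final class Doc {
        final String id;
        final Map<String, Object> source;
        Doc(String id, Map<String, Object> source) { this.id = id; this.source = source; }
    }

    /**
     * Splits a selected row (alias -> value) into a document id and a body.
     * idAlias plays the role of the mapping's _id setting.
     */
    static Doc toDoc(Map<String, Object> row, String idAlias) {
        Object id = row.get(idAlias);
        if (id == null) {
            throw new IllegalArgumentException("row has no value for " + idAlias);
        }
        Map<String, Object> source = new LinkedHashMap<>(row);
        source.remove(idAlias); // assumption: the id column is not repeated in the body
        return new Doc(String.valueOf(id), source);
    }

    public static void main(String[] args) {
        // Row shaped like: select a.id as _id, a.name, a.age from testsync as a
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("_id", 4);
        row.put("name", "tom"); // made-up sample values
        row.put("age", 20);
        Doc doc = toDoc(row, "_id");
        System.out.println(doc.id + " -> " + doc.source); // 4 -> {name=tom, age=20}
    }
}
```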
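(4) Start

    Start: bin/startup.sh
    Stop: bin/stop.sh
    Restart: bin/restart.sh
    Log file: logs/adapter/adapter.log

Before starting, the index and its fields must already exist.
It is worth mentioning that when syncing with Logstash, the fields in the index can be created directly. The field types may just need adjusting afterwards to fit your requirements. So I think a reasonable approach is to run one full sync with Logstash first, then hand the subsequent real-time data over to canal.
Criticism and corrections are welcome. If you repost this, please credit the source......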



