canal-admin服务安装配置服务器系统:CentOS 8
数据库版本:MySQL 5.7
Canal版本:canal 1.1.5
1、开启mysql的binlog日志,且binlog-format 为 ROW 模式,默认在:/etc/my.cnf
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式
2、下载 canal.admin(可视化管理控制台) 和 canal.deployer(canal服务)下载地址
3、上传至服务器,比如:/usr/local/src 目录(以下操作均基于此目录)
4、创建 canal-admin目录并解压缩canal.admin
cd ../ mkdir -p canal/canal-admin cd src tar -xzvf canal.admin-1.1.5.tar.gz -C ../canal/canal-admin
5、进入canal-admin目录,修改配置文件,修改数据库连接地址和用户密码
cd ../canal/canal-admin vi conf/application.yml
server:
port: 8089 # 管理控制台的访问端口
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306 # admin服务的数据库地址
database: canal_manager # admin服务的数据库名
username: canal # 数据库连接用户名
password: canal # 数据库连接密码
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin # admin用户名,用于canal-server连接控制台的用户名
adminPasswd: admin # admin用户名,用于canal-server连接控制台的密码
6、初始化上面配置的数据库,初始化脚本: conf/canal_manager.sql ,初始化SQL脚本里会默认创建canal_manager的数据库
通过mysql服务器导入
# 登录mysql mysql -uroot -p # 导入初始化SQL > source conf/canal_manager.sql
通过客户端导入,比如:navicat,先下载 conf/canal_manager.sql 到本地
右键数据库连接,运行sql文件
7、启动canal-admin服务
./bin/startup.sh
8、查看日志,是否启动成功
tail -f logs/admin.log
输出如下信息,服务启动成功
9、访问admin控制台
浏览器输入服务访问地址,例:http://192.168.187.130:8089/
默认密码:admin/123456
若无法访问,查看服务器防火墙是否开放端口访问
# 查看配置的端口列表 firewall-cmd --permanent --list-port # 添加8089端口公共访问权限 firewall-cmd --zone=public --add-port=8089/tcp --permanent # 重启防火墙 firewall-cmd --reloadcanal-server服务部署和配置
1、创建canal-server文件夹,解压canal.deployer
cd /usr/local mkdir canal/canal-server cd src tar -xzvf canal.deployer-1.1.5.tar.gz -C ../canal/canal-server
2、修改配置文件,启动canal-server服务
cd ../canal/canal-server/ vi conf/canal_local.properties
# register ip canal.register.ip = # canal admin config # admin管理后台的地址 canal.admin.manager = 127.0.0.1:8089 # admin端口,控制台新建server使用,默认即可 canal.admin.port = 11110 # admin用户名,即canal-admin中application.yml配置的adminUser canal.admin.user = admin # admin密码,即canal-admin中application.yml配置的adminPasswd canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register 自动注册到admin canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
canal.admin.passwd是一个密文,密文的生成方式,登录mysql,执行如下密文生成sql,去掉首字符星号即可
select password('admin123');
3、启动canal-server服务
# 指定启动配置为local,或者将canal_local.properties替换掉canal.properties ./bin/startup.sh local
4、查看日志,出现如下日志,说明启动成功
canal-server启动成功后,刷新admin的server管理列表,canal-server会自动注册到admin
新建server
新建instance
MySQL 账号需要具有作为 MySQL slave 的权限,可以新增账号或修改现有账号权限
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
启动实例
新建Java项目,引入canal-client依赖
com.alibaba.otter canal.client 1.1.5 com.alibaba.otter canal.protocol 1.1.5
编写测试类
@Test
public void canalClientTest() {
// 新建连接 192.168.187.130:11111 canal-server的服务器地址和端口, test为Instance名称, 没有设置用户名密码, 传null
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.187.130", 11111), "test", null, null);
try {
// 连接服务
connector.connect();
// 订阅服务
connector.subscribe();
// 回滚到没有ask的地方开始
connector.rollback();
Message message;
while (true) {
// 获取指定数量的数据
message = connector.getWithoutAck(100);
if (message.getId() == -1 || message.getEntries().size() == 0) {
try {
System.out.println("未获取到数据,休眠2s后重新获取");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// 解析数据
parseEntryList(message.getEntries());
}
// 提交确认
connector.ack(message.getId());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private void parseEntryList(List list) {
list.forEach(a -> parseEntry(a));
}
private void parseEntry(CanalEntry.Entry entry) {
EntryType entryType = entry.getEntryType();
if (entryType == CanalEntry.EntryType.TRANSACTIonBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
// 事务开启或事务结束标记, 跳过, 不处理
return;
}
CanalEntry.RowChange rowChange;
try {
// 解析数据
rowChange = CanalEntry.RowChange.parseFrom(entry.getStorevalue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
Header header = entry.getHeader();
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println("==================== start ==================");
System.out.println(String.format("binlog: %s, position: %s, schemaName: %s, tableName: %s, eventType: %s", header.getLogfileName(),
header.getLogfileOffset(), header.getSchemaName(), header.getTableName(), eventType));
printRowData(rowChange.getRowDatasList(), eventType);
System.out.println("==================== end ====================");
}
private void printRowData(List rowDatasList, EventType eventType) {
for (RowData rowData : rowDatasList) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("after");
printColumn(rowData.getAfterColumnsList());
}
}
}
private static void printColumn(List columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
运行测试类,数据库插入一条数据
INSERT INTO `test_user` (`id`, `name`, `create_time`) VALUES (2, '测试', now());
参考:https://www.bookstack.cn/read/canal-v1.1.4/507a40ad1c1335da.md



