栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Canal-admin 数据订阅和消费服务

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Canal-admin 数据订阅和消费服务

服务器系统:CentOS 8
数据库版本:MySQL 5.7
Canal版本:canal 1.1.5

canal-admin服务安装配置

1、开启mysql的binlog日志,且binlog-format 为 ROW 模式,默认在:/etc/my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式

2、下载 canal.admin(可视化管理控制台) 和 canal.deployer(canal服务)下载地址
3、上传至服务器,比如:/usr/local/src 目录(以下操作均基于此目录)
4、创建 canal-admin目录并解压缩canal.admin

cd ../
mkdir -p canal/canal-admin 
cd src
tar -xzvf canal.admin-1.1.5.tar.gz -C ../canal/canal-admin

5、进入canal-admin目录,修改配置文件,修改数据库连接地址和用户密码

cd ../canal/canal-admin
vi conf/application.yml
server:
  port: 8089 # 管理控制台的访问端口
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306  # admin服务的数据库地址
  database: canal_manager  # admin服务的数据库名
  username: canal  # 数据库连接用户名
  password: canal  # 数据库连接密码
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin # admin用户名,用于canal-server连接控制台的用户名
  adminPasswd: admin # admin用户名,用于canal-server连接控制台的密码

6、初始化上面配置的数据库,初始化脚本: conf/canal_manager.sql ,初始化SQL脚本里会默认创建canal_manager的数据库
通过mysql服务器导入

# 登录mysql
mysql -uroot -p
# 导入初始化SQL
> source conf/canal_manager.sql

通过客户端导入,比如:navicat,先下载 conf/canal_manager.sql 到本地
右键数据库连接,运行sql文件



7、启动canal-admin服务

./bin/startup.sh

8、查看日志,是否启动成功

tail -f logs/admin.log

输出如下信息,服务启动成功

9、访问admin控制台
浏览器输入服务访问地址,例:http://192.168.187.130:8089/
默认密码:admin/123456

若无法访问,查看服务器防火墙是否开放端口访问

# 查看配置的端口列表
firewall-cmd --permanent --list-port
# 添加8089端口公共访问权限
firewall-cmd --zone=public --add-port=8089/tcp --permanent
# 重启防火墙
firewall-cmd --reload
canal-server服务部署和配置

1、创建canal-server文件夹,解压canal.deployer

cd /usr/local
mkdir canal/canal-server
cd src
tar -xzvf canal.deployer-1.1.5.tar.gz -C ../canal/canal-server

2、修改配置文件,启动canal-server服务

cd ../canal/canal-server/
vi conf/canal_local.properties
# register ip
canal.register.ip =

# canal admin config
# admin管理后台的地址
canal.admin.manager = 127.0.0.1:8089
# admin端口,控制台新建server使用,默认即可
canal.admin.port = 11110
# admin用户名,即canal-admin中application.yml配置的adminUser
canal.admin.user = admin
# admin密码,即canal-admin中application.yml配置的adminPasswd
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register 自动注册到admin
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =

canal.admin.passwd是一个密文,密文的生成方式,登录mysql,执行如下密文生成sql,去掉首字符星号即可

select password('admin123');

3、启动canal-server服务

# 指定启动配置为local,或者将canal_local.properties替换掉canal.properties
./bin/startup.sh local

4、查看日志,出现如下日志,说明启动成功

canal-server启动成功后,刷新admin的server管理列表,canal-server会自动注册到admin

新建server


新建instance

MySQL 账号需要具有作为 MySQL slave 的权限,可以新增账号或修改现有账号权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

启动实例

创建canal-client项目

新建Java项目,引入canal-client依赖


    com.alibaba.otter
    canal.client
    1.1.5


    com.alibaba.otter
    canal.protocol
    1.1.5

编写测试类

    @Test
	public void canalClientTest() {
		// 新建连接 192.168.187.130:11111 canal-server的服务器地址和端口, test为Instance名称, 没有设置用户名密码, 传null
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.187.130", 11111), "test", null, null);
		try {
			// 连接服务
			connector.connect();
			// 订阅服务
			connector.subscribe();
			// 回滚到没有ask的地方开始
			connector.rollback();
			Message message;
			while (true) {
				// 获取指定数量的数据
				message = connector.getWithoutAck(100);
				if (message.getId() == -1 || message.getEntries().size() == 0) {
					try {
						System.out.println("未获取到数据,休眠2s后重新获取");
						TimeUnit.SECONDS.sleep(2);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				} else {
					// 解析数据
					parseEntryList(message.getEntries());
				}
				// 提交确认
				connector.ack(message.getId());
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			connector.disconnect();
		}
	}
	
	private void parseEntryList(List list) {
		list.forEach(a -> parseEntry(a));
	}
	
	
	private void parseEntry(CanalEntry.Entry entry) {
		EntryType entryType = entry.getEntryType();
		if (entryType == CanalEntry.EntryType.TRANSACTIonBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
			// 事务开启或事务结束标记, 跳过, 不处理
			return;
		}

		CanalEntry.RowChange rowChange;
		try {
			// 解析数据
			rowChange = CanalEntry.RowChange.parseFrom(entry.getStorevalue());
		} catch (Exception e) {
			throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
		}

		Header header = entry.getHeader();
		CanalEntry.EventType eventType = rowChange.getEventType();
		
		System.out.println("==================== start ==================");
		System.out.println(String.format("binlog: %s, position: %s, schemaName: %s, tableName: %s, eventType: %s", header.getLogfileName(), 
				header.getLogfileOffset(), header.getSchemaName(), header.getTableName(), eventType));
		printRowData(rowChange.getRowDatasList(), eventType);
		System.out.println("==================== end ====================");
	}

	private void printRowData(List rowDatasList, EventType eventType) {
		for (RowData rowData : rowDatasList) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
	}
	
	private static void printColumn(List columns) {
	    for (Column column : columns) {
	        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
	    }
	}

运行测试类,数据库插入一条数据

INSERT INTO `test_user` (`id`, `name`, `create_time`) VALUES (2, '测试', now());

参考:https://www.bookstack.cn/read/canal-v1.1.4/507a40ad1c1335da.md

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/592426.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号