栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Canal系列2-Canal同步到Kafka

Canal系列2-Canal同步到Kafka

文章目录

一. MySQL的准备

1.1 binlog格式1.2 创建库表1.3 赋权限 二. Canal安装及配置

2.1 Canal下载及安装

https://github.com/alibaba/canal/releases 2.2 修改 canal.properties 的配置2.3 修改 instance.properties2.4 启动 Canal2.5 看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题2.6 启动 Kafka 消费客户端测试,查看消费情况2.7 向 MySQL 中插入数据后查看消费者控制台 参考:

一. MySQL的准备

MySQL版本 5.7.31

1.1 binlog格式

vim /etc/my.cnf

server-id=1
log-bin=mysql-bin 
binlog_format=row 
binlog-do-db=canal_test

重启mysql服务

测试记录:

mysql> show variables like '%log_bin%';
+---------------------------------+-------------------------------------+
| Variable_name                   | Value                               |
+---------------------------------+-------------------------------------+
| log_bin                         | ON                                  |
| log_bin_basename                | /home/mysql/data/3306/hp8-bin       |
| log_bin_index                   | /home/mysql/data/3306/hp8-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
+---------------------------------+-------------------------------------+
6 rows in set (0.01 sec)

mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

mysql> 
1.2 创建库表

代码:

create database canal_test default character set utf8;
show create database canal_testG

use canal_test;
CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);

测试记录:

mysql> create database canal_test default character set utf8;
Query OK, 1 row affected (0.00 sec)

mysql> show create database canal_testG
*************************** 1. row ***************************
       Database: canal_test
Create Database: CREATE DATAbase `canal_test` 
1 row in set (0.00 sec)

mysql> 
mysql> use canal_test;
Database changed
mysql> CREATE TABLE user_info(
    -> `id` VARCHAr(255),
    -> `name` VARCHAr(255),
    -> `sex` VARCHAr(255)
    -> );
Query OK, 0 rows affected (0.01 sec)

mysql> show create table user_infoG
*************************** 1. row ***************************
       Table: user_info
Create Table: CREATE TABLE `user_info` (
  `id` varchar(255) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  `sex` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

mysql> 
1.3 赋权限
set global validate_password_length=4; 
set global validate_password_policy=0;
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO
'canal'@'%' IDENTIFIED BY 'canal' ;
二. Canal安装及配置 2.1 Canal下载及安装

下载地址:

https://github.com/alibaba/canal/releases
cd /usr/local/src
gunzip canal.deployer-1.1.2.tar.gz
tar -xvf canal.deployer-1.1.2.tar
2.2 修改 canal.properties 的配置
cd /usr/local/src/conf
vim canal.properties
canal.id = 1 
canal.ip = 
canal.port = 11111
canal.metrics.pull.port = 11112 
canal.zkServers =
# flush data to zk canal.zookeeper.flush.period = 1000 
canal.withoutNetty = false
# tcp, kafka, RocketMQ 
canal.serverMode = kafka
# flush meta cursor/parse position to file

canal.mq.servers = hp2:9092,hp3:9092,hp4:9092

说明:
这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的 输出 model,默认 tcp,改为输出到 kafka多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务 中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的 配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直 接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

2.3 修改 instance.properties

我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下

cd /usr/local/src/conf/example
vim instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =canal_test

# mq config 
canal.mq.topic=canal_test 
canal.mq.partitionsNum=2
canal.mq.partition=2
# hash partition config
#canal.mq.partitionHash=mytest.person:id,mytest.role:id

备注:
canal.mq.partitionsNum 不能操作kafka主题的分区数,不然会报错
ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - Invalid partition given with record: 3 is not in the range [0…1).

kafka默认的分区数是1,此时需要修改

kafka-topics.sh --zookeeper  localhost:2181 --alter --partitions 3 --topic  canal_test 
2.4 启动 Canal
cd /usr/local/src/
bin/startup.sh

2.5 看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
[root@hp8 ~]# jps 
11498 BrokerBootstrap
15662 Jps
[root@hp8 ~]# 
2.6 启动 Kafka 消费客户端测试,查看消费情况
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin
./kafka-console-consumer.sh --bootstrap-server hp2:9092 --topic canal_test
2.7 向 MySQL 中插入数据后查看消费者控制台

MySQL

INSERT INTO user_info VALUES('1001','zhangsan','male');
INSERT INTO user_info VALUES('1002','lishi','male');

Kafka

{"data":[{"id":"1002","name":"lishi","sex":"male"}],"database":"canal_test","es":1646041639000,"id":1,"isDdl":false,"mysqlType":{"id":"varchar(255)","name":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1646042284004,"type":"INSERT"}

参考:
    https://blog.csdn.net/libizhide/article/details/109555562https://www.dazhuanlan.com/wendyemir/topics/1422034
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780112.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号