1、CentOS(7.6)
2、Docker-compose
3、Mysql(5.7)
4、Canal
5、Kafka
6、Kafka-eagle(官方压缩包下载地址)
1、开启数据库binlog
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1
开启数据库binlog是因为Canal的工作原理所需,具体细节---->【Canal工作原理】
2、创建canal账号
Navicat下root账号执行,怎么方便怎么来
#创建账号(账号:canal;密码:canal) CREATE USER canal IDENTIFIED BY 'canal'; #授予权限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; #刷新并应用权限 FLUSH PRIVILEGES;Canal安装
1、docker-compose.yml
version: '3' services: canal-server: image: canal/canal-server:v1.1.4 container_name: canal-server ports: - 11111:11111 ## 暴露端口 environment: - canal.instance.mysql.slaveId=12 - canal.auto.scan=false - canal.destinations=test_canal ## canal服务端名称 - canal.instance.master.address=127.0.0.1:3306 ## 数据库地址 - canal.instance.dbUsername=canal ## 数据库账号 (可自行设置) - canal.instance.dbPassword=canal ## 数据库密码 - canal.instance.filter.regex=.*\\..* ## 数据过滤规则(正则),后续按需求再做调整
docker-compose.yml目录下执行命令:
docker-compose up -dKafka+Zookeeper安装
docker-compose.yml
version: '2' services: zookeeper: image: wurstmeister/zookeeper ## 镜像 ports: - "2181:2181" ## 对外暴露的端口号 kafka: image: wurstmeister/kafka ## 镜像 volumes: - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直) ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 ## 宿主机IP KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ## 运行基于zookeeper KAFKA_ADVERTISED_PORT: 9092 # kafka 对外端口 KAFKA_LOG_RETENTION_HOURS: 120 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 KAFKA_NUM_PARTITIONS: 3 KAFKA_DELETE_RETENTION_MS: 1000
docker-compose.yml目录下执行命令:
docker-compose up -d配置串联
canal默认的工作模式是TCP。要把canal和kafka串联起来需要修改配置文件
docker exec -it [container ID] bash cd /admin/canal-server/conf/ vi canal.properties
1、修改Canal工作模式:
把canal拿到的Binlog丢到Kafka里面
canal.serverMode = kafka ## 服务模式 canal.mq.server = host:port ## kafka 地址 127.0.0.1:9092
2、修改Canal过滤规则
canal默认是监听所有数据库的Binlog,这不符合一般需求。修改Canal过滤规则,只把要监听的表丢到对应的kafka Topic里。
cd /admin/canal-server/conf/test_canal/ ## test_canal 上面canal.destinations的值 vi instance.properties
修改内容:
canal.instance.filter.regex=db.table,db2.table # 过滤规则 canal.mq.dynamicTopic=db.table,db2.table # 动态topic
修改完配置文件后,需要重启一下容器。
除此之外,还要进入kafka容器,把自动创建topic开启(默认是开启的)
docker exec -it [kafka 容器 ID] bash vi /opt/kafka/server.propertices 添加:auto.create.topics.enable=true
为了后面Kafka-eagle做准备。顺便修改一下启动shell
vi opt/kafka/bin/kafka-server-start.sh if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export JMX_PORT="9999" ## 新增内容 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
重启容器
docker restart [Kafka 容器 ID]Kafka-eagle安装
1、通过宝塔、File-Zilla等方式,把从官方下载的压缩包上传到服务器。
2、安装JDK,推荐1.8。Linux安装JDK
3、解压两次压缩包
tar -zxf 压缩包
然后会得到一个文件夹efak-web-2.0.8(目前最新版)
4、修改配置文件
vi /efak-web-2.0.8/conf/system-config.properties # 修改zookeeper集群地址 efak.zk.cluster.alias = cluster1 cluster1.zk.list = zookeeper # (host:port) # 添加一个mysql数据库配置,监控要记录日常用户信息 efak.driver=com.mysql.cj.jdbc.Driver efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8& zeroDateTimeBehavior=convertToNull efak.username=dbName # 数据库账号 efak.password=dbPassword # 数据库密码
5、添加环境变量
vi ~/.bash_profile export KE_HOME=/data/kafka/kafka-eagle/efak-web-2.0.8 export PATH=$PATH:$KE_HOME/bin
保存后执行
source ~/.bash_profile
进入 /efak-web-2.0.8/bin/目录下
ke.sh start效果图



