栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

cannl同步mysql数据到es中

cannl同步mysql数据到es中

背景:

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表和读写分离,每张表数据控制在500W一下,但是效率还是达不到要求,为了提高查询效率,我们使用ES查询。
而将mysql实时同步到es中保证数据一致性就成了我们的工作之下。

环境:

mysql 5.7
caanl 1.1.5
(也有一个坑,多张表公用一个es索引,但是多表有字段同名的时候,你更新一个表的同名字段,es会把数据表同名的所有字段都更新,虽然你在es索引中的字段名称不一样,也会导致,cannl开发者修复了这个问题,但是并没有在1.1.5版本中更新,我们是自己下载源码手动改cannl才搞定)
es:7.1.6
(推荐使用6.8.22版本(cannl对7.X版本的支持感觉不是很好,我们遇到不少坑,尽量用6.X版本,最新的6.8.22版本修复了log4j2的漏洞))
官方文档地址: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/release-notes-6.8.22.html
官方下载地址:https://www.elastic.co/cn/downloads/elasticsearch
腾讯云镜像地址:https://mirrors.cloud.tencent.com/elasticstack/6.x/yum/6.8.22/

服务器 centos, 采用docker容器

资料:

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
简单来说canal的原理是根据mysql的主从复制原理实现的,canal伪装成slave库从而向master库读取binlog日志获取增量日志。

canal-admin

(非必须但推荐使用):为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

canal-server:

服务端,从mysql读取binlog日志获取增量日志,可以通过tcp、kafka、RocketMQ等方式与客户端通信;通过zookeeper搭建集群。

canal-adapter

:客户端,根据canal-server获取的增量日志执行适配到其他诸如elasticsearch、redis、mysql等端,实现数据同步。

部署: 1、开启mysql的binlog

使用canal-server需要先准备mysql,对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,/mysql.conf.d/mysqld.cnf中配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

配置完成后重启mysql,并查询是否配置生效:ON就是开启

show variables like ‘log_bin%’;
show variables like ‘binlog_format%’;

2、mysql创建cannl用户并授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限。

CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECt, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3、安装运行canal-admin
docker run -d -p 8089:8089 -e server.port=8089 --name canal-admin canal/canal-admin

管理页面地址:http://127.0.0.1:8089/
用户名:admin
密码:123456

4、安装运行canal-server

(下面的IP和端口需要和第三部中配置的一致,需要改成自己服务器的)

docker run -d -p 9100:9100 -p 11110:11110 -p 11111:11111 -p 11112:11112 
-v /home/docker/canal-server/logs:/home/admin/canal-server/logs 
-e canal.admin.manager=IP:8089 
-e canal.admin.port=11110 
-e canal.admin.user=admin 
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 
--name=canal-server 
--restart=always 
canal/canal-server
5、应用配置

canal-server启动完成后访问canal-admin链接(IP:8089)因为docke启动的时候配置admin地址,canal-admin服务管理里会自动识别到canal-server的服务
访问地址:http://IP:8089/#/ 账号密码:admin/123456

第一步:现在 Instance 管理里添加一个实例,先输入实例名称(用英文,这个实例名下面要用!我这叫做demo)和选择服务,点击载入模板修改数据库配置

...
# 配置数据库IP和端口,改成自己的,下面说的都要改成自己环境的
canal.instance.master.address=127.0.0.1:3306
...
# 数据库中配置的cannl账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=密码
# 数据库匹配,如果这个不配置,他会扫描所有的库的binlog
canal.instance.filter.regex=数据库名\..*

第二步:在Server 管理中选择操作下拉框配置,在配置文件99行添加刚配置的实例名称,多个用,(逗号)隔开

canal.destinations = demo
6、安装运行canal-adapter
docker run -d -p 8081:8081 
--name canal-adapter 
canal/canal-adapter:v1.1.5

复制配置文件到容器外

docker cp canal-adapter:/opt/canal/adapter/conf /home/docker/canal-adapter

停止canal-adapter容器,

docker stop canal-adapter

再执行运行命令,因为dokcer挂载配置目录会导致配置不生成配置文件,所以我们再以挂载配置文件方式运行它。

docker cp canal-adapter:/opt/canal/adapter/conf /home/docker/canal-adapter
docker run -d -p 8081:8081 
-v /home/docker/canal-adapter/conf:/opt/canal-adapter/conf 
-v /home/docker/canal-adapter/logs:/opt/canal-adapter/logs 
--name canal-adapter 
canal/canal-adapter:v1.1.5
7、canal-adapter配置

修改application.yml配置文件中的canalServerHost、数据源及instance和ES服务器地址

 vi /home/docker/canal-adapter/conf/application.yml

文件仅做参考,需要改成自己的

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # 配置cannl 地址 把127.0.0.1换成自己的
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # 数据库信息配置 换成自己的
      url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
      username: canal
      password: 123456
  canalAdapters:
   # canal instance Name or mq topic name
   # 实例名配置,上面第5步取的,我配置的叫做demo
  - instance: demo
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
		# 数据文件位置,我这设置es7,等下需要在这个conf目录下创建一个新的文件夹 名称es7,这里设置的啥名,文件夹就叫什么名
      - name: es7
      # es信息配置,127.0.0.1换成自己的,es的数据端口是9300
        hosts: 127.0.0.1:9300
        properties:
          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
		# ES的cluster.name通过访问es所在服务器http://IP:9200/可以看到,我不确定默认是不是elasticsearch。最好确定一下
          cluster.name: elasticsearch

注意: 如果canal-adapter启动日志报错,报连接问题,请注意配置的端口防火墙有没有开放!!!

8、创建数据库同步脚本文件夹和文件

在/home/docker/canal-adapter/conf/下新建,名为es7的文件夹

mkdir es7

在es7文件夹下创建数据库同步脚本,脚本不一致,需要自己创建,每个索引都需要一个yml脚本,我以其中一个脚本为例,
数据库表名为 person_1,表结构如下

那么我需要把person_1和es中的索引数据同步(es索引自己百度),脚本如下

 cd es7
 vi  person_1.yml

person_1.yml内容

dataSourceKey: defaultDS
# 实例名 第5步取名的
destination: demo
groupId: g1
esMapping:
   # es创建的索引名
  _index: dev_person
  _type: _doc
  _id: _id
  sql: "
  # 查询sql语句,别名就是es的文档名,我这是一样,多表公用一个es索引的话,正常的left join写法就行
SELECT
  t.id as _id,
  t.id as id,
  t.name as name,
  t.sex as sex,
  t.age as age,
  t.des as des
  FROM
  person_1 t
"
  #etlCondition: "where s.c_time>={}" 这个etlCondition是全量同步的,有坑,别用,全量同步用步骤9
  commitBatch: 3000

重启canal-adapter,mysql新增数据就会自动增量同步到es

注意:canal-adapter不能使用docker重启,需要进入到容器里在/bin目录下执行sh restart.sh重启

9、初始化数据同步

如果mysql中有数据就需要调用一次全量同步,如果mysql没数据,或者数据没用,就不需要调用此步骤

canal-adapter提供一个REST接口可全量同步数据到ES

### 全量同步person_1表数据到es的person索引中,路径中的es7是上一步的文件夹名
curl http://127.0.0.1:8081/etl/es7/person_1.yml -X POST
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677221.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号