栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

elasticsearch和mysql数据同步, 基于canal, canal-server和canal-adapter

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

elasticsearch和mysql数据同步, 基于canal, canal-server和canal-adapter

第一步: 下载mysql 8.0.18版本的镜像
docker pull mysql:8.0.18
第二步: 运行mysql的实例
docker run --name mysql8.0.18 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:8.0.18
第三步: 在宿主机中新建一个文件用来存储mysql的配置文件, 拷贝出mysql容器中的文件到本地 第四步: 关掉刚刚新建的mysql容器, 新建一个挂载盘的容器
# 创建一个宿主机新的文件
mkdir /mydata/mysql8.0.18


# 将mysql的配置文件copy到宿主机文件中
docker cp mysql:8.0.18:/etc/mysql/. /mydata/mysql8.0.18/conf/

# 将mysql的数据copy到宿主机文件中
docker cp mysql:8.0.18:/var/lib/mysql/. /mydata/mysql8.0.18/data/

# 删除mysql容器
docker stop mysql8.0.18
docker rm mysql8.0.18

# 再次新建mysql容器, 并且挂载数据到宿主机中, 方便后续配置文件重启mysql
docker run --name mysql8.0.18 -p 3306:3306 
-v /mydata/mysql8.0.18/conf:/etc/mysql 
-v /mydata/mysql8.0.18/data:/var/lib/mysql 
--restart=always 
-e MYSQL_ROOT_PASSWORD=admin 
-d mysql:8.0.18
第五步: 开始配置mysql  配置完成重启容器 下面是配置文件内容:
[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
secure-file-priv= NULL

# 打开binlog
log-bin=mysql-bin
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server-id=1

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

# Custom config should go here
!includedir /etc/mysql/conf.d/

重启mysql容器
docker restart mysql8.0.18
查看配置是否成功:
# 进入docker mysql容器内部
docker exec -it mysql8.0.18 /bin/bash

mysql -uroot -p

# 输入密码

show master status;

show varialbles like 'log_bin';
新建一下后面测试需要的数据库和表
drop DATABASE if EXISTS mytest;
CREATE DATABASE `mytest` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

use mytest;

drop table if exists user;
create table user(
	`id` varchar(256) COMMENT 'id',
	`username` varchar(256) comment '用户登录名',
	`password` varchar(256) comment '登录密码',
	`nickname` varchar(25) comment '用户昵称',
	`address` varchar(256) comment '用户地址',
	`age` int(5) comment '用户年龄',
	primary key (`id`)
)ENGINE=INNODB;
安装 canal-server, 启动一个canal-server容器
#拉取镜像
docker pull canal/canal-server:v1.1.5
#第一次运行
docker run --name canal-server -p 11111:11111 -d canal/canal-server:v1.1.5

拷贝配置文件到宿主机
# 将容器中的配置文件copy到宿主机
docker cp canal-server:/home/admin/canal-server/. /mydata/canal-server/

# 关闭容器,  重新挂载配置到宿主机启动容器
docker stop canal-server
docker rm canal-server

# 启动canal-server
docker run --name canal-server -p 11111:11111 -v /mydata/canal-server:/home/admin/canal-server -d canal/canal-server:v1.1.5
修改配置文件
vim /mydata/canal-server/conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=11

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306  # 你的数据库地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root  # 您的数据库账号
canal.instance.dbPassword=admin #你的数据库密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\..*
# table black regex
canal.instance.filter.black.regex=mysql\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
安装canal-adapter
#拉取镜像
docker pull slpcat/canal-adapter:v1.1.5
#第一次运行
docker run --name canal-adapter -p 8081:8081 -d slpcat/canal-adapter:v1.1.5

拷贝配置文件到宿主机

docker cp canal-adapter:/opt/canal-adapter/.  /mydata/canal-adapter/


# 关闭之前的容器
docker stop canal-adapter
docker rm canal-adapter

# 挂载配置文件到宿主机, 启动容器
docker run --name canal-adapter -p 8081:8081 -v /mydata/canal-adapter:/opt/canal-adapter  -d slpcat/canal-adapter:v1.1.5
编写配置文件
# 编辑配置文件
vim /mydata/canal-adapter/conf/application.yml

配置文件内容

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #  之前起的 canal-server 地址  url
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # 你的数据库地址
      username: root   # 数据库账号
      password: admin  # 数据库密码
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7  # 你的es配置
        hosts: 127.0.0.1:9200 # 你的es配置
        properties:
         mode: rest # 你的es配置
         cluster.name: test-es # 你的es配置

编写配置脚本

vim /mydata/canal-adapter/conf/es7/test.yml

编写配置文件内容

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest # es索引
  _id: _id
  _type: _doc
  upsert: true
  sql: "select a.id as _id, a.username as username, a.password as password, a.nickname as nickname, a.address as address, a.age as age from user a"
  commitBatch: 3000
测试一下

添加elastisearch的索引

{
  "mappings": {
    "properties": {
      "username": {
        "type": "text"
      },
      "password": {
        "type": "text"
      },
      "nickname": {
        "type": "text"
      },
      "address": {
        "type": "text"
      },
      "age": {
        "type": "text"
      }
    }
  }
}

重启canal-server和canal-adapter

docker restart canal-server

docker restart canal-adapter

数据库新增修改删除, elasticsearch都会相应的改变

关于如何全量同步已存在的数据进入es
# 8081 就是指的canal-adapter起的服务地址  触发 请求去告诉adapter需要同步全量的数据
curl -X POST http://127.0.0.1:8081/etl/es7/test.yml # yml就是指的es7文件中的文件名

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/880223.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号