栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MongoDB CDC变更捕获推 Kafka

MongoDB CDC变更捕获推 Kafka

重点记录debezium connect的使用

工具选型
  1. MongoDB version 4.4.3
  2. 捕获工具 debezium connect 1.8.0
  3. Kafka 2.13+
参考过程

MongoDB | 编排文件

version: '3.0'

services:
  mongo:
    image: mongo:4.4.3
    container_name: mongo
    command: mongod --replSet cdcReplset --bind_ip_all
    environment:
      - MONGO_INITDB_ROOT_USERNAME=root
      - MONGO_INITDB_ROOT_PASSWORD=123456
    ports:
      - 27017:27017
    networks:
      product:
        aliases:
          - docker.mongo

networks:
  product:
    external: true

Kafka | 编排文件 (外置zookeeper)

version: '3.0'

services:
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_ZOOKEEPER_ConNECT=docker.zookeeper.node1:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.233:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    networks:
      product:
        aliases:
          - docker.kafka
networks:
  product:
    external: true

debezium connect | 编排文件

version: '3.0'

services:
  connect:
    image: debezium/connect:1.8.0.Final
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=docker.kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    networks:
      product:
        aliases:
          - docker.debezium

networks:
  product:
    external: true
配置过程

初始化MongoDB副本集, mongo shell 初始化 ,此处只有一个副本

rs.initiate({_id: "cdcReplset", members: [{_id: 0, host: "docker.mongo:27017"}]})

添加 debezium connect 任务
官网参考

{
  "name": "inventory-connector", 1
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 2
    "mongodb.hosts": "rs0/192.168.99.100:27017", 3
    "mongodb.name": "fullfillment", 4
    "collection.include.list": "inventory[.]*" 5
  }
}
  1. 当我们将连接器注册到 Kafka Connect 服务时,我们的连接器的名称
  2. MongoDB 连接器类的名称。
  3. 用于连接到 MongoDB 副本集的主机地址。
  4. MongoDB 副本集的逻辑名称,它为生成的事件形成命名空间,用于连接器写入的所有 Kafka 主题名称、Kafka Connect 模式名称以及 Avro 连接时对应的 Avro 模式的命名空间使用转换器。
  5. 与要监视的所有集合的集合命名空间(例如,.)匹配的正则表达式列表。这是可选的。

参考命令

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "inventory-connector", "config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "cdcReplset/docker.mongo:27017", "mongodb.user": "root", "mongodb.password": "123456", "mongodb.name": "cdc_topic_prefix","collection.include.list": "cdc_database.cdc_collection" }}'

跑完稍等一会去看kafka topic

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676980.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号