栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

kafka connector使用(Docker一键启动版)

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka connector使用(Docker一键启动版)

前言

结合博主之前写的一篇《kafka connector使用(单机手动启动版)》一起看

版本

kafka: confluentinc/cp-kafka:7.0.1
zookeeper: confluentinc/cp-zookeeper:7.0.1

思路

将该连接器做成一个镜像。

实现 一、制作镜像

Dockerfile

FROM confluentinc/cp-kafka:7.0.1
COPY connect-redis.jar /usr/share/java/connect-redis.jar
CMD ["connect-standalone","/etc/kafka/connect-standalone.properties","/etc/kafka/connect-redis-source.properties"]

将Dockerfile和超级jar放在一个文件夹下,打开cmd命令行,输入docker build -t curtain:kafka-connector:7.0.1 .构建连接器镜像

二、编写docker-compose.yml
version: "3.0"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      # client要访问的broker地址
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://服务器ip:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      # 通过端口连接 zookeeper
    restart: always
    depends_on:
      - zookeeper
  kafka-connector:
    image: curtain/kafka-connector:7.0.1
    container_name: kafka-connector
    volumes:
      - type: bind
        source: ./connect-standalone.properties
        target: /etc/kafka/connect-standalone.properties
      - type: bind
        source: ./connect-redis-source.properties
        target: /etc/kafka/connect-redis-source.properties
    restart: always
    depends_on:
      - zookeeper
      - kafka
三、修改connect-standalone.properties

没有该文件的话,网上找一份。将bootstrap.servers里面的ip部分从localhost改成服务器的ip。

四、上传文件

将镜像包、docker-compose.yml、connect-standalone.properties、连接器配置文件上传到服务器同一个路径下。

五、加载镜像并启动
  1. 加载镜像:docker load < 镜像包.tar
  2. 启动镜像: docker-compose up -d

这样,kafka连接器就启动啦。

可能会遇到的问题

问题一: 默认写入到kafka的数据会包含一些额外的信息如:schema和payload,我们不想要这些额外的信息怎么办?
将connect-standalone.properties配置文件中的key.converter.schemas.enable和value.converter.schemas.enable设置为false即可
问题二:写入到kafka的数据多了一层双引号和json转义怎么去掉?
这个是因为默认配置的转换器类是org.apache.kafka.connect.json.JsonConverter加上你生成的SourceRecord对象里面的value和key值已经是一个String类型了。这样的话会将String类型的数据json化一遍,就会出现多了一层双引号加转义的情况。修改connect-standalone.properties配置文件中的key.converter和value.converter为org.apache.kafka.connect.storage.StringConverter即可。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/882327.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号