栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot+KafkaConnect功能+监控实践(持续更新)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot+KafkaConnect功能+监控实践(持续更新)

版权说明: 本文由博主keep丶原创,转载请注明出处。
原文地址: https://blog.csdn.net/qq_38688267/article/details/120729471

文章目录
  • 前言
  • 实践
    • 环境搭建
      • kafka安装运行
      • 启动kafka connect
      • 实现mysql数据读取和插入
  • 附录
    • 添加额外连接器
    • connect URI介绍

前言 实践 环境搭建

  作者是在linux环境中搭建的,建议在linux环境测试。

kafka安装运行
  • 下载
    传送门

  • 解压

# 解压
tar -xzf kafka_2.13-3.0.0.tgz
# 进入主目录
cd kafka_2.13-3.0.0
# 启动zk
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh config/server.properties
启动kafka connect
# 第一个properties是启动connect的配置;
# 第二个properties是connector的配置,会根据配置创建一个connector
bin/connect-standalone.sh 
	config/connect-standalone.properties 
	connect-file-sink.properties
实现mysql数据读取和插入

  kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加。

  • 直接linux下载
# 需要安装wget工具,如果没有
yum install -y wget

# 下载
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.4/confluentinc-kafka-connect-jdbc-10.2.4.zip
  • 安装
# 先解压
tar -xzf confluentinc-kafka-connect-jdbc-10.2.3.zip
# 将该工具里的lib下面的包,复制到kafka/lib下即可
cp confluentinc-kafka-connect-jdbc-10.2.3/lib/* KAFKA_HOME/lib
# 再重启kafka和connect
  • windows下载JDBC连接包,再上传到Linux
    下载地址

  • 添加一个MySQL source connector

curl -X POST -H "Content-Type: application/json" 
-d '{
	"name":"test_mysql", 
	"config":{
		"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
		"mode":"bulk",
		"tables":"`zzf`.`test_zzf`",
		"task.class":"io.confluent.connect.jdbc.source.JdbcSourceTask",
		"tasks.max":"2",
		"topics":"mysql_test",
		"name":"test_mysql",
		"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
		"table.whitelist":"test_zzf"
	}
}'
附录 添加额外连接器

  kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加
地址:https://www.confluent.io/hub

connect URI介绍
请求方法路径含义
GET/connectors返回所有正在运行的connector名。
GET/connectors/{name}获取指定connetor的信息。
GET/connectors/{name}/config获取指定connector的配置信息
GET/connectors/{name}/status获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET/connectors/{name}/tasks获取指定connector正在运行的task。
GET/connectors/{name}/tasks/{taskid}/status获取指定connector的task的状态信息。
PUT/connectors/{name}/config更新指定connector的配置信息
PUT/connectors/{name}/pause暂停connector和它的task,停止数据处理知道它被恢复
PUT/connectors/{name}/resume恢复一个被暂停的connector。
POST/connectors新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
POST/connectors/{name}/restart重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST/connectors/{name}/tasks/{taskId}/restart重启一个task,一般是因为它运行失败才这样做。
DELETE/connectors/{name}删除一个connector,停止它的所有task并删除配置。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318766.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号