文章目录版权说明: 本文由博主keep丶原创,转载请注明出处。
原文地址: https://blog.csdn.net/qq_38688267/article/details/120729471
- 前言
- 实践
- 环境搭建
- kafka安装运行
- 启动kafka connect
- 实现mysql数据读取和插入
- 附录
- 添加额外连接器
- connect URI介绍
作者是在linux环境中搭建的,建议在linux环境测试。
kafka安装运行-
下载
传送门 -
解压
# 解压 tar -xzf kafka_2.13-3.0.0.tgz # 进入主目录 cd kafka_2.13-3.0.0 # 启动zk bin/zookeeper-server-start.sh config/zookeeper.properties # 启动kafka bin/kafka-server-start.sh config/server.properties启动kafka connect
# 第一个properties是启动connect的配置; # 第二个properties是connector的配置,会根据配置创建一个connector bin/connect-standalone.sh config/connect-standalone.properties connect-file-sink.properties实现mysql数据读取和插入
kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加。
- 直接linux下载
# 需要安装wget工具,如果没有 yum install -y wget # 下载 wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.4/confluentinc-kafka-connect-jdbc-10.2.4.zip
- 安装
# 先解压 tar -xzf confluentinc-kafka-connect-jdbc-10.2.3.zip # 将该工具里的lib下面的包,复制到kafka/lib下即可 cp confluentinc-kafka-connect-jdbc-10.2.3/lib/* KAFKA_HOME/lib # 再重启kafka和connect
-
windows下载JDBC连接包,再上传到Linux
下载地址 -
添加一个MySQL source connector
curl -X POST -H "Content-Type: application/json"
-d '{
"name":"test_mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"mode":"bulk",
"tables":"`zzf`.`test_zzf`",
"task.class":"io.confluent.connect.jdbc.source.JdbcSourceTask",
"tasks.max":"2",
"topics":"mysql_test",
"name":"test_mysql",
"connection.url":"jdbc:mysql://localhost:3306/zzf?user=root&password=123456",
"table.whitelist":"test_zzf"
}
}'
附录
添加额外连接器
kafka官方包里面只支持几种简单的connector类型,像JDBC这些是不支持的,因此需要额外添加
地址:https://www.confluent.io/hub
| 请求方法 | 路径 | 含义 |
|---|---|---|
| GET | /connectors | 返回所有正在运行的connector名。 |
| GET | /connectors/{name} | 获取指定connetor的信息。 |
| GET | /connectors/{name}/config | 获取指定connector的配置信息 |
| GET | /connectors/{name}/status | 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。 |
| GET | /connectors/{name}/tasks | 获取指定connector正在运行的task。 |
| GET | /connectors/{name}/tasks/{taskid}/status | 获取指定connector的task的状态信息。 |
| PUT | /connectors/{name}/config | 更新指定connector的配置信息 |
| PUT | /connectors/{name}/pause | 暂停connector和它的task,停止数据处理知道它被恢复 |
| PUT | /connectors/{name}/resume | 恢复一个被暂停的connector。 |
| POST | /connectors | 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。 |
| POST | /connectors/{name}/restart | 重启一个connector,尤其是在一个connector运行失败的情况下比较常用 |
| POST | /connectors/{name}/tasks/{taskId}/restart | 重启一个task,一般是因为它运行失败才这样做。 |
| DELETE | /connectors/{name} | 删除一个connector,停止它的所有task并删除配置。 |



