我将尝试广泛地回答您的问题。连接器开发的一种简单方法如下:
- 通过查看公开提供的许多Kafka连接器之一来构建和构建连接器源代码(您可以在此处找到广泛的列表:https : //www.confluent.io/product/connectors/)
- 从https://www.confluent.io/download/下载最新的Confluent开源版本(> = 3.3.0)
通过以下其中一种方式使连接器包可用于Kafka Connect:
- 将所有连接器jar文件(连接器jar以及不包括Connect API jar的依赖关系jar)存储到文件系统中的某个位置,并通过将该位置添加到
plugin.path
Connect worker属性中的属性来启用插件隔离 。例如,如果连接器罐子存储在中/opt/connectors/my-first-connector
,则将plugin.path=/opt/connectors
在工作人员的属性中进行设置(请参见下文)。 - 将所有连接器jar文件存储在下的文件夹中
${CONFLUENT_HOME}/share/java。例如:${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector。(需要以kafka-connect-
启动脚本选择的前缀开头)。$ CONFLUENT_HOME是您安装Confluent Platform的位置。 - (可选)通过更改“连接
${CONFLUENT_HOME}/etc/kafka/connect-log4j.properties到”DEBUG
或“甚至” 的日志级别来增加日志记录TRACE
。
- 将所有连接器jar文件(连接器jar以及不包括Connect API jar的依赖关系jar)存储到文件系统中的某个位置,并通过将该位置添加到
使用Confluent CLI启动所有服务,包括Kafka Connect。此处的详细信息:http : //docs.confluent.io/current/connect/quickstart.html
简要地:
confluent start
注意:CLI当前加载的Connect worker的属性文件是
${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties。如果选择启用类加载隔离,但也需要更改Connect
worker的属性,则应编辑该文件。
- 一旦运行了Connect worker,请运行以下命令启动连接器:
confluent load <connector_name> -d <connector_config.properties>
要么
confluent load <connector_name> -d <connector_config.json>
连接器配置可以采用Java属性或JSON格式。
- 运行
confluent log connect
以打开Connect worker的日志文件,或通过运行直接导航到您的日志和数据的存储位置
cd "$( confluent current )"
注意:在Confluent
CLI的会话期间,通过CONFLUENT_CURRENT适当地设置环境变量来更改日志和数据的存储位置。例如,假设/opt/confluent存在,并且是您要存储数据的位置,请运行:export CONFLUENT_CURRENT=/opt/confluentconfluent current
- 最后,以交互方式调试连接器的一种可能方法是在使用Confluent CLI启动Connect之前应用以下方法:
confluent stop connect
export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
confluent start connect
然后与调试器连接(例如,远程连接到Connect worker(默认端口:5005)。要停止以调试模式运行connect,只需
unsetCONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;在完成后运行:。
我希望以上内容将使您的连接器开发更加轻松和……更加有趣!



