8.3. 管理连接器
Kafka Connect REST API 提供用于直接创建、更新和删除连接器的端点。您还可以使用 API 检查连接器的状态或更改日志记录级别。当您通过 API 创建连接器时,您可以提供连接器的配置详情作为 API 调用的一部分。
您还可以添加和管理连接器作为插件。插件被打包为 JAR 文件,其中包含通过 Kafka Connect API 实现连接器的类。您只需要在 classpath 中指定插件,或将其添加到 Kafka Connect 的插件路径中,以便在启动时运行连接器插件。
除了使用 Kafka Connect REST API 或插件来管理连接器外,您还可以在独立模式中运行 Kafka Connect 时使用属性文件添加连接器配置。要做到这一点,您可以在启动 Kafka Connect worker 进程时指定属性文件的位置。属性文件应包含连接器的配置详情,包括连接器类、源和目标主题,以及任何所需的身份验证或序列化设置。
8.3.1. 限制对 Kafka Connect API 的访问
Kafka Connect REST API 可以被经过身份验证的用户访问,并知道端点 URL,其中包括主机名/IP 地址和端口号。仅将对 Kafka Connect API 的访问限制为可信用户,以防止未经授权的操作和潜在的安全问题。
为提高安全性,我们建议为 Kafka Connect API 配置以下属性:
-
(Kafka 3.4 或更高版本)
org.apache.kafka.disallowed.login.modules
,来专门排除不安全的登录模块 -
connector.client.config.override.policy
设置为NONE
,以防止连接器配置覆盖 Kafka Connect 配置及其使用的使用者和制作者
8.3.2. 配置连接器
使用 Kafka Connect REST API 或属性文件来创建、管理和监控连接器实例。在独立或分布式模式中使用 Kafka Connect 时,您可以使用 REST API。在独立模式中使用 Kafka Connect 时,您可以使用属性文件。
8.3.2.1. 使用 Kafka Connect REST API 管理连接器
使用 Kafka Connect REST API 时,您可以通过向 Kafka Connect REST API 发送 PUT
或 POST
HTTP 请求来动态创建连接器,在请求正文中指定连接器配置详情。
使用 PUT
命令时,它具有相同的命令来启动和更新连接器。
REST 接口默认侦听端口 8083,并支持以下端点:
GET /connectors
- 返回现有连接器列表。
POST /connectors
- 创建连接器。请求正文必须是带有连接器配置的 JSON 对象。
GET /connectors/<connector_name>
- 获取有关特定连接器的信息。
GET /connectors/<connector_name>/config
- 获取特定连接器的配置。
PUT /connectors/<connector_name>/config
- 更新特定连接器的配置。
GET /connectors/<connector_name>/status
- 获取特定连接器的状态。
GET /connectors/<connector_name>/tasks
- 获取特定连接器的任务列表
GET /connectors/<connector_name>/tasks/<task_id>/status
- 获取特定连接器的任务状态
PUT /connectors/<connector_name>/pause
- 暂停连接器及其所有任务。连接器将停止处理任何信息。
PUT /connectors/<connector_name>/stop
- 停止连接器及其所有任务。连接器将停止处理任何信息。从运行停止连接器可能更适合长时间运行,而不是只暂停。
PUT /connectors/<connector_name>/resume
- 恢复暂停的连接器。
POST /connectors/<connector_name>/restart
- 如果连接器失败,请重启它。
POST /connectors/<connector_name>/tasks/<task_id>/restart
- 重启特定的任务。
DELETE /connectors/<connector_name>
- 删除连接器。
GET /connectors/<connector_name>/topics
- 获取特定连接器的主题。
PUT /connectors/<connector_name>/topics/reset
- 为特定连接器清空一组活跃的主题。
GET /connectors/<connector_name>/offsets
- 获取连接器的当前偏移量。
DELETE /connectors/<connector_name>/offsets
- 为连接器重置偏移,该连接器必须处于已停止状态。
PATCH /connectors/<connector_name>/offsets
-
为连接器调整
偏移
属性(使用请求的偏移属性),该连接器必须处于已停止状态。 GET /connector-plugins
- 获取所有支持的连接器插件的列表。
GET /connector-plugins/<connector_plugin_type>/config
- 获取连接器插件的配置。
PUT /connector-plugins/<connector_type>/config/validate
- 验证连接器配置。
8.3.2.2. 指定连接器配置属性
要配置 Kafka Connect 连接器,您需要指定源或接收器连接器的配置详情。有两种方法可以做到这一点:通过 Kafka Connect REST API,使用 JSON 提供配置,或使用属性文件来定义配置属性。每种连接器类型可用的特定配置选项可能有所不同,但这两种方法都提供指定必要设置的灵活方法。
以下选项适用于所有连接器:
name
- 连接器的名称,它在当前 Kafka Connect 实例中必须是唯一的。
connector.class
-
连接器插件的类。例如,
org.apache.kafka.connect.file.FileStreamSinkConnector
。 tasks.max
- 指定连接器可以使用的最大任务数量。任务可让连接器并行执行工作。连接器可能会创建比指定更少的任务。
key.converter
-
用于将消息密钥转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,
org.apache.kafka.connect.json.JsonConverter
.
必须至少为接收器连接器设置以下选项之一:
topics
- 以逗号分隔的主题列表,用作输入。
topics.regex
- 用作输入的 Java 正则表达式。
有关所有其他选项,请参阅 Apache Kafka 文档中的 连接器属性。
Apache Kafka 的 Streams 在 Apache Kafka 安装目录的 Streams 中包含示例连接器配置文件 config/connect-file-sink.properties
和 config/connect-file-source.properties
。
8.3.3. 使用 Kafka Connect API 创建连接器
使用 Kafka Connect REST API 创建用于 Kafka Connect 的连接器。
先决条件
- Kafka Connect 安装。
流程
使用连接器配置准备 JSON 有效负载。例如:
{ "name": "my-connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "my-topic-1,my-topic-2", "file": "/tmp/output-file.txt" } }
发送 POST 请求到
<KafkaConnectAddress>:8083/connectors
,以创建连接器。以下示例使用curl
:curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
通过向 <
KafkaConnectAddress > :8083/connectors
发送 GET 请求来验证连接器是否已部署。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
8.3.4. 使用 Kafka Connect API 删除连接器
使用 Kafka Connect REST API 从 Kafka Connect 中删除连接器。
先决条件
- Kafka Connect 安装。
删除连接器
通过向
<KafkaConnectAddress>:8083/connectors/<ConnectorName>
发送GET
请求来验证连接器是否存在。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
要删除连接器,请发送
DELETE
请求到 <KafkaConnectAddress& gt; :8083/connectors
。以下示例使用curl
:curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
通过向 <
KafkaConnectAddress > :8083/connectors
发送 GET 请求来验证连接器是否已删除。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
8.3.5. 添加连接器插件
Kafka 提供示例连接器,用作开发连接器的起点。以下连接器示例包括在 Apache Kafka 的 Streams 中:
- FileStreamSink
- 从 Kafka 主题读取数据,并将数据写入一个文件中。
- FileStreamSource
- 从文件中读取数据,并将数据发送到 Kafka 主题。
这两个连接器都包含在 libs/connect-file-<kafka_version>.redhat-<build>.jar
插件中。
要使用 Kafka Connect 中的连接器插件,您可以将其添加到 classpath 中,或者在 Kafka Connect 属性文件中指定插件路径,并将插件复制到位置。
在 classpath 中指定示例连接器
CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh
设置插件路径
plugin.path=/opt/kafka/connector-plugins,/opt/connectors
plugin.path
配置选项可以包含以逗号分隔的路径列表。
如果需要,您可以添加更多连接器插件。Kafka Connect 在启动时搜索并运行连接器插件。
当在分布式模式下运行 Kafka Connect 时,所有 worker 节点上都必须提供插件。