8.3. 管理连接器
Kafka Connect REST API 提供了直接创建、更新和删除连接器的端点。您还可以使用 API 检查连接器的状态或更改日志级别。通过 API 创建连接器时,您可以提供连接器的配置详情作为 API 调用的一部分。
您还可以将连接器作为插件添加和管理。插件打包为 JAR 文件,该文件包含通过 Kafka Connect API 实现连接器的类。您只需要在 classpath 中指定插件,或将其添加到 Kafka Connect 的插件路径中,以便在启动时运行连接器插件。
除了使用 Kafka Connect REST API 或插件来管理连接器外,您还可以在以独立模式运行时使用属性文件添加连接器配置。要做到这一点,您可以在启动 Kafka Connect worker 进程时指定属性文件的位置。属性文件应包含连接器的配置详情,包括连接器类、源和目标主题,以及任何所需的身份验证或序列化设置。
8.3.1. 限制对 Kafka Connect API 的访问 复制链接链接已复制到粘贴板!
Kafka Connect REST API 可以被经过身份验证访问权限的任何人访问,并知道端点 URL,其中包括主机名/IP 地址和端口号。仅将对 Kafka Connect API 的访问限制为可信用户,以防止未经授权的操作和潜在的安全问题。
为了提高安全性,我们建议为 Kafka Connect API 配置以下属性:
-
(Kafka 3.4 或更高版本)
org.apache.kafka.disallowed.login.modules
以明确排除不安全的登录模块 -
connector.client.config.override.policy
设置为NONE
,以防止连接器配置覆盖 Kafka Connect 配置及其使用的用户和制作者
8.3.2. 配置连接器 复制链接链接已复制到粘贴板!
使用 Kafka Connect REST API 或属性文件来创建、管理和监控连接器实例。在独立或分布式模式中使用 Kafka Connect 时,您可以使用 REST API。您可以在独立模式中使用 Kafka Connect 时使用属性文件。
8.3.2.1. 使用 Kafka Connect REST API 管理连接器 复制链接链接已复制到粘贴板!
使用 Kafka Connect REST API 时,您可以通过向 Kafka Connect REST API 发送 PUT
或 POST
HTTP 请求来动态创建连接器,在请求正文中指定连接器配置详情。
使用 PUT
命令时,它与启动和更新连接器的命令相同。
REST 接口默认侦听端口 8083 并支持以下端点:
GET /connectors
- 返回现有连接器列表。
POST /connectors
- 创建连接器。请求正文必须是带有连接器配置的 JSON 对象。
GET /connectors/<connector_name>
- 获取有关特定连接器的信息。
GET /connectors/<connector_name>/config
- 获取特定连接器的配置。
PUT /connectors/<connector_name>/config
- 更新特定连接器的配置。
GET /connectors/<connector_name>/status
- 获取特定连接器的状态。
GET /connectors/<connector_name>/tasks
- 获取特定连接器的任务列表
GET /connectors/<connector_name>/tasks/<task_id>/status
- 获取特定连接器的任务状态
PUT /connectors/<connector_name>/pause
- 暂停连接器及其所有任务。连接器将停止处理任何消息。
PUT /connectors/<connector_name>/resume
- 恢复暂停的连接器。
POST /connectors/<connector_name>/restart
- 如果连接器失败,重启它。
POST /connectors/<connector_name>/tasks/<task_id>/restart
- 重启特定的任务。
DELETE /connectors/<connector_name>
- 删除连接器。
GET /connectors/<connector_name>/topics
- 获取特定连接器的主题。
PUT /connectors/<connector_name>/topics/reset
- 清空特定连接器的活跃主题集合。
GET /connector-plugins
- 获取所有支持的连接器插件列表。
PUT /connector-plugins/<connector_type>/config/validate
- 验证连接器配置。
8.3.2.2. 指定连接器配置属性 复制链接链接已复制到粘贴板!
要配置 Kafka Connect 连接器,您需要指定源或接收器连接器的配置详情。执行此操作有两种方法:通过 Kafka Connect REST API 使用 JSON 提供配置,或使用属性文件来定义配置属性。每种类型的连接器可用的特定配置选项可能会有所不同,但这两种方法都提供了指定所需设置的灵活方法。
以下选项适用于所有连接器:
name
- 连接器的名称,它在当前 Kafka Connect 实例中必须是唯一的。
connector.class
-
连接器插件的类。例如:
org.apache.kafka.connect.file.FileStreamSinkConnector
。 tasks.max
- 指定连接器可以使用的最大任务数量。任务使连接器能够并行执行工作。连接器可能会创建比指定数量较少的任务。
key.converter
-
用于将消息密钥转换为 Kafka 格式或从 Kafka 格式转换的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,
org.apache.kafka.connect.json.JsonConverter
.
必须至少为接收器连接器设置以下选项之一:
topics
- 以逗号分隔的主题列表,用作输入。
topics.regex
- 用作输入的 Java 正则表达式。
有关所有其他选项,请参阅 Apache Kafka 文档中的 连接器属性。
AMQ Streams 包括示例连接器配置文件 config/connect-file-sink.properties
和 config/connect-file-source.properties
。
8.3.3. 使用 Kafka Connect API 创建连接器 复制链接链接已复制到粘贴板!
使用 Kafka Connect REST API 创建用于 Kafka Connect 的连接器。
先决条件
- Kafka Connect 安装。
流程
使用连接器配置准备 JSON 有效负载。例如:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 发送 POST 请求到
<KafkaConnectAddress>:8083/connectors
,以创建连接器。以下示例使用curl
:curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 通过向 <
KafkaConnectAddress > :8083/connectors
发送 GET 请求来验证连接器是否已部署。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.3.4. 使用 Kafka Connect API 删除连接器 复制链接链接已复制到粘贴板!
使用 Kafka Connect REST API 从 Kafka Connect 中删除连接器。
先决条件
- Kafka Connect 安装。
删除连接器
通过向
<KafkaConnectAddress>:8083/connectors/<ConnectorName>
发送GET
请求来验证连接器是否存在。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 要删除连接器,请发送
DELETE
请求到 <KafkaConnectAddress> : 8083/connectors
。以下示例使用curl
:curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 通过向 <
KafkaConnectAddress > :8083/connectors
发送 GET 请求来验证连接器是否已删除。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.3.5. 添加连接器插件 复制链接链接已复制到粘贴板!
Kafka 提供了示例连接器,用作开发连接器的起点。AMQ Streams 中包含以下示例连接器:
- FileStreamSink
- 从 Kafka 主题读取数据并将数据写入文件中。
- FileStreamSource
- 从文件读取数据并将数据发送到 Kafka 主题。
这两个连接器都包含在 libs/connect-file-<kafka_version>.redhat-<build>.jar
插件中。
要在 Kafka Connect 中使用连接器插件,您可以将其添加到 classpath 中,或者在 Kafka Connect 属性文件中指定插件路径,并将插件复制到该位置。
在 classpath 中指定示例连接器
CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh
CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh
设置插件路径
plugin.path=/opt/kafka/connector-plugins,/opt/connectors
plugin.path=/opt/kafka/connector-plugins,/opt/connectors
plugin.path
配置选项可以包含以逗号分隔的路径列表。
如果需要,您可以添加更多连接器插件。Kafka Connect 会在启动时搜索并运行连接器插件。
当在分布式模式下运行 Kafka Connect 时,必须在所有 worker 节点上提供插件。