搜索

8.3. 管理连接器

download PDF

Kafka Connect REST API 提供用于直接创建、更新和删除连接器的端点。您还可以使用 API 检查连接器的状态或更改日志记录级别。当您通过 API 创建连接器时,您可以提供连接器的配置详情作为 API 调用的一部分。

您还可以添加和管理连接器作为插件。插件被打包为 JAR 文件,其中包含通过 Kafka Connect API 实现连接器的类。您只需要在 classpath 中指定插件,或将其添加到 Kafka Connect 的插件路径中,以便在启动时运行连接器插件。

除了使用 Kafka Connect REST API 或插件来管理连接器外,您还可以在独立模式中运行 Kafka Connect 时使用属性文件添加连接器配置。要做到这一点,您可以在启动 Kafka Connect worker 进程时指定属性文件的位置。属性文件应包含连接器的配置详情,包括连接器类、源和目标主题,以及任何所需的身份验证或序列化设置。

8.3.1. 限制对 Kafka Connect API 的访问

Kafka Connect REST API 可以被经过身份验证的用户访问,并知道端点 URL,其中包括主机名/IP 地址和端口号。仅将对 Kafka Connect API 的访问限制为可信用户,以防止未经授权的操作和潜在的安全问题。

为提高安全性,我们建议为 Kafka Connect API 配置以下属性:

  • (Kafka 3.4 或更高版本) org.apache.kafka.disallowed.login.modules,来专门排除不安全的登录模块
  • connector.client.config.override.policy 设置为 NONE,以防止连接器配置覆盖 Kafka Connect 配置及其使用的使用者和制作者

8.3.2. 配置连接器

使用 Kafka Connect REST API 或属性文件来创建、管理和监控连接器实例。在独立或分布式模式中使用 Kafka Connect 时,您可以使用 REST API。在独立模式中使用 Kafka Connect 时,您可以使用属性文件。

8.3.2.1. 使用 Kafka Connect REST API 管理连接器

使用 Kafka Connect REST API 时,您可以通过向 Kafka Connect REST API 发送 PUTPOST HTTP 请求来动态创建连接器,在请求正文中指定连接器配置详情。

提示

使用 PUT 命令时,它具有相同的命令来启动和更新连接器。

REST 接口默认侦听端口 8083,并支持以下端点:

GET /connectors
返回现有连接器列表。
POST /connectors
创建连接器。请求正文必须是带有连接器配置的 JSON 对象。
GET /connectors/<connector_name>
获取有关特定连接器的信息。
GET /connectors/<connector_name>/config
获取特定连接器的配置。
PUT /connectors/<connector_name>/config
更新特定连接器的配置。
GET /connectors/<connector_name>/status
获取特定连接器的状态。
GET /connectors/<connector_name>/tasks
获取特定连接器的任务列表
GET /connectors/<connector_name>/tasks/<task_id>/status
获取特定连接器的任务状态
PUT /connectors/<connector_name>/pause
暂停连接器及其所有任务。连接器将停止处理任何信息。
PUT /connectors/<connector_name>/stop
停止连接器及其所有任务。连接器将停止处理任何信息。从运行停止连接器可能更适合长时间运行,而不是只暂停。
PUT /connectors/<connector_name>/resume
恢复暂停的连接器。
POST /connectors/<connector_name>/restart
如果连接器失败,请重启它。
POST /connectors/<connector_name>/tasks/<task_id>/restart
重启特定的任务。
DELETE /connectors/<connector_name>
删除连接器。
GET /connectors/<connector_name>/topics
获取特定连接器的主题。
PUT /connectors/<connector_name>/topics/reset
为特定连接器清空一组活跃的主题。
GET /connectors/<connector_name>/offsets
获取连接器的当前偏移量。
DELETE /connectors/<connector_name>/offsets
为连接器重置偏移,该连接器必须处于已停止状态。
PATCH /connectors/<connector_name>/offsets
为连接器调整 偏移 属性(使用请求的偏移属性),该连接器必须处于已停止状态。
GET /connector-plugins
获取所有支持的连接器插件的列表。
GET /connector-plugins/<connector_plugin_type>/config
获取连接器插件的配置。
PUT /connector-plugins/<connector_type>/config/validate
验证连接器配置。

8.3.2.2. 指定连接器配置属性

要配置 Kafka Connect 连接器,您需要指定源或接收器连接器的配置详情。有两种方法可以做到这一点:通过 Kafka Connect REST API,使用 JSON 提供配置,或使用属性文件来定义配置属性。每种连接器类型可用的特定配置选项可能有所不同,但这两种方法都提供指定必要设置的灵活方法。

以下选项适用于所有连接器:

name
连接器的名称,它在当前 Kafka Connect 实例中必须是唯一的。
connector.class
连接器插件的类。例如,org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max
指定连接器可以使用的最大任务数量。任务可让连接器并行执行工作。连接器可能会创建比指定更少的任务。
key.converter
用于将消息密钥转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.

必须至少为接收器连接器设置以下选项之一:

topics
以逗号分隔的主题列表,用作输入。
topics.regex
用作输入的 Java 正则表达式。

有关所有其他选项,请参阅 Apache Kafka 文档中的 连接器属性。

注意

Apache Kafka 的 Streams 在 Apache Kafka 安装目录的 Streams 中包含示例连接器配置文件 config/connect-file-sink.propertiesconfig/connect-file-source.properties

8.3.3. 使用 Kafka Connect API 创建连接器

使用 Kafka Connect REST API 创建用于 Kafka Connect 的连接器。

先决条件

  • Kafka Connect 安装。

流程

  1. 使用连接器配置准备 JSON 有效负载。例如:

    {
      "name": "my-connector",
      "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-topic-1,my-topic-2",
        "file": "/tmp/output-file.txt"
      }
    }
  2. 发送 POST 请求到 <KafkaConnectAddress>:8083/connectors,以创建连接器。以下示例使用 curl

    curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
  3. 通过向 < KafkaConnectAddress > :8083/connectors 发送 GET 请求来验证连接器是否已部署。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors

8.3.4. 使用 Kafka Connect API 删除连接器

使用 Kafka Connect REST API 从 Kafka Connect 中删除连接器。

先决条件

  • Kafka Connect 安装。

删除连接器

  1. 通过向 <KafkaConnectAddress>:8083/connectors/<ConnectorName> 发送 GET 请求来验证连接器是否存在。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors
  2. 要删除连接器,请发送 DELETE 请求到 < KafkaConnectAddress& gt; :8083/connectors。以下示例使用 curl

    curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
  3. 通过向 < KafkaConnectAddress > :8083/connectors 发送 GET 请求来验证连接器是否已删除。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors

8.3.5. 添加连接器插件

Kafka 提供示例连接器,用作开发连接器的起点。以下连接器示例包括在 Apache Kafka 的 Streams 中:

FileStreamSink
从 Kafka 主题读取数据,并将数据写入一个文件中。
FileStreamSource
从文件中读取数据,并将数据发送到 Kafka 主题。

这两个连接器都包含在 libs/connect-file-<kafka_version>.redhat-<build>.jar 插件中。

要使用 Kafka Connect 中的连接器插件,您可以将其添加到 classpath 中,或者在 Kafka Connect 属性文件中指定插件路径,并将插件复制到位置。

在 classpath 中指定示例连接器

CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh

设置插件路径

plugin.path=/opt/kafka/connector-plugins,/opt/connectors

plugin.path 配置选项可以包含以逗号分隔的路径列表。

如果需要,您可以添加更多连接器插件。Kafka Connect 在启动时搜索并运行连接器插件。

注意

当在分布式模式下运行 Kafka Connect 时,所有 worker 节点上都必须提供插件。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.