8.2. 在分布式模式下的 Kafka Connect


在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,工作负载分布在它们中。您可以使用 HTTP REST 接口管理连接器插件及其配置。

8.2.1. 在分布式模式下配置 Kafka 连接

要在分布式模式下配置 Kafka Connect,请编辑 config/connect-distributed.properties 配置文件。以下选项是最重要的。

bootstrap.servers
用作 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如: kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
key.converter
用于将消息密钥转换为 Kafka 格式的类。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的类。例如,org.apache.kafka.connect.json.JsonConverter.
group.id
分布式 Kafka Connect 集群的名称。这必须是唯一的,且不得与另一个消费者组 ID 冲突。默认值为 connect-cluster
config.storage.topic
用于存储连接器配置的 Kafka 主题。默认值为 connect-configs
offset.storage.topic
用于存储偏移的 Kafka 主题。默认值为 connect-offset
status.storage.topic
用于 worker 节点状态的 Kafka 主题。默认值为 connect-status

AMQ Streams 在分布式模式下包括 Kafka Connect 的示例配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-distributed.properties

有关所有支持的 Kafka Connect 配置选项的完整列表,请参阅 附录 F, Kafka Connect 配置参数

连接器插件使用 bootstrap 地址打开到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer.consumer. 为前缀的标准 Kafka producer 和使用者配置选项。

有关配置 Kafka 生成者和消费者的更多信息,请参阅:

8.2.2. 在分布式 Kafka Connect 中配置连接器

HTTP REST 接口

分布式 Kafka Connect 的连接器使用 HTTP REST 接口进行配置。默认情况下,REST 接口侦听端口 8083。它支持以下端点:

GET /connectors
返回现有连接器列表。
POST /connectors
创建连接器。请求正文必须是带有连接器配置的 JSON 对象。
GET /connectors/ <name>
获取有关特定连接器的信息。
GET /connectors/ &lt;name&gt; /config
获取特定连接器的配置。
PUT /connectors/ &lt;name&gt; /config
更新特定连接器的配置。
GET /connectors/ &lt;name&gt; /status
获取特定连接器的状态。
PUT /connectors/ &lt;name&gt; /pause
暂停连接器及其所有任务。连接器将停止处理任何信息。
PUT /connectors/ &lt;name&gt; /resume
恢复暂停的连接器。
POST /connectors/ &lt;name&gt; /restart
如果连接器失败,请重启它。
DELETE /connectors/ <name>
删除连接器。
GET /connector-plugins
获取所有支持的连接器插件的列表。

连接器配置

大多数配置选项都是特定于连接器的,包括在连接器的文档中。以下字段适用于所有连接器。

name
连接器的名称。在给定的 Kafka Connect 实例中必须是唯一的。
connector.class
连接器插件的类。例如 org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max
此连接器使用的最大任务数量。连接器使用任务来并行处理其工作。Connetors 可以创建比指定更少的任务。
key.converter
用于将消息密钥转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.

另外,必须为接收器连接器设置以下选项之一:

topics
以逗号分隔的主题列表,用作输入。
topics.regex
用作输入的 Java 正则表达式。

有关所有其他选项,请查看特定连接器的文档。

AMQ Streams 包括示例连接器配置文件。您可以在 AMQ Streams 安装目录中的 config/connect-file-sink.propertiesconfig/connect-file-source.properties 中找到。

8.2.3. 运行分布式 Kafka Connect

这个步骤描述了如何在分布式模式下配置和运行 Kafka Connect。

先决条件

  • 已安装并运行 AMQ Streams 集群。

运行集群

  1. 编辑所有 Kafka Connect worker 节点上的 /opt/kafka/config/connect-distributed.properties Kafka Connect 配置文件。

    • 设置 bootstrap.server 选项以指向 Kafka 代理。
    • 设置 group.id 选项。
    • 设置 config.storage.topic 选项。
    • 设置 offset.storage.topic 选项。
    • 设置 status.storage.topic 选项。

      例如:

      bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
      group.id=my-group-id
      config.storage.topic=my-group-id-configs
      offset.storage.topic=my-group-id-offsets
      status.storage.topic=my-group-id-status
      Copy to Clipboard Toggle word wrap
  2. 使用所有 Kafka Connect 节点上的 /opt/kafka/config/connect-distributed.properties 配置文件启动 Kafka Connect worker。

    su - kafka
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
    Copy to Clipboard Toggle word wrap
  3. 验证 Kafka Connect 是否正在运行。

    jcmd | grep ConnectDistributed
    Copy to Clipboard Toggle word wrap

其他资源

8.2.4. 创建连接器

此流程描述了如何使用 Kafka Connect REST API 创建连接器插件,以便在分布式模式中使用 Kafka Connect。

先决条件

  • 以分布式模式运行的 Kafka 连接安装。

流程

  1. 使用连接器配置准备 JSON 有效负载。例如:

    {
      "name": "my-connector",
      "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-topic-1,my-topic-2",
        "file": "/tmp/output-file.txt"
      }
    }
    Copy to Clipboard Toggle word wrap
  2. 发送 POST 请求到 <KafkaConnectAddress>:8083/connectors,以创建连接器。以下示例使用 curl

    curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap
  3. 通过向 < KafkaConnectAddress > :8083/connectors 发送 GET 请求来验证连接器是否已部署。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap

8.2.5. 删除连接器

此流程描述了如何使用 Kafka Connect REST API 从分布式模式的 Kafka Connect 中删除连接器插件。

先决条件

  • 以分布式模式运行的 Kafka 连接安装。

删除连接器

  1. 通过向 <KafkaConnectAddress>:8083/connectors/<ConnectorName> 发送 GET 请求来验证连接器是否存在。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap
  2. 要删除连接器,请发送 DELETE 请求到 < KafkaConnectAddress& gt; :8083/connectors。以下示例使用 curl

    curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
    Copy to Clipboard Toggle word wrap
  3. 通过向 < KafkaConnectAddress > :8083/connectors 发送 GET 请求来验证连接器是否已删除。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat