7.2. 分布式模式的 Kafka 连接


在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,并将工作负载分布到它们中。您可以使用 HTTP REST 接口管理连接器插件及其配置。

7.2.1. 在分布式模式下配置 Kafka 连接

要在分布式模式下配置 Kafka Connect,请编辑 config/connect-distributed.properties 配置文件。以下选项是最重要的选项。

bootstrap.servers
用作到 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如,kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
key.converter
用于将消息密钥转换到 Kafka 格式的类,以及从 Kafka 格式转换。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的和从 Kafka 格式转换的类。例如,org.apache.kafka.connect.json.JsonConverter.
group.id
分布式 Kafka Connect 集群的名称。这必须是唯一的,且不得与其他消费者组 ID 冲突。默认值为 connect-cluster
config.storage.topic
用于存储连接器配置的 Kafka 主题。默认值为 connect-configs
offset.storage.topic
用于存储偏移的 Kafka 主题。默认值为 connect-offset
status.storage.topic
用于 worker 节点状态的 Kafka 主题。默认值为 connect-status

AMQ Streams 在分布式模式下包括 Kafka Connect 的示例配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-distributed.properties

连接器插件打开了使用 bootstrap 地址到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer.consumer. 为前缀的标准 Kafka producer 和使用者配置选项。

7.2.2. 在分布式 Kafka Connect 中配置连接器

HTTP REST 接口

分布式 Kafka Connect 的连接器是使用 HTTP REST 接口配置的。REST 接口默认侦听端口 8083。它支持以下端点:

GET/connectors
返回现有连接器列表。
POST/连接器
创建连接器。请求正文必须是带有连接器配置的 JSON 对象。
GET/connectors/ <name>
获取有关特定连接器的信息。
GET /connectors/ &lt;name&gt; /config
获取特定连接器的配置。
PUT /connectors/ &lt;name&gt; /config
更新特定连接器的配置。
GET /connectors/ &lt;name&gt; /status
获取特定连接器的状态。
PUT /connectors/ &lt;name&gt; /pause
暂停连接器及其所有任务。连接器将停止处理任何消息。
PUT /connectors/ &lt;name&gt; /resume
恢复暂停的连接器。
POST /connectors/ &lt;name&gt; /restart
如果失败,请重启连接器。
DELETE /connectors/ <name>
删除连接器。
GET /connector-plugins
获取所有支持的连接器插件列表。

连接器配置

大多数配置选项都是特定于连接器的连接器,包括在连接器的文档中。以下字段对于所有连接器都是通用的。

name
连接器的名称。在给定的 Kafka Connect 实例中必须是唯一的。
connector.class
连接器插件的类。例如,org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max
此连接器使用的最大任务数量。连接器使用任务来并行处理其工作。Connetors 可能会创建比指定更少的任务。
key.converter
用于转换消息密钥的类,以及从 Kafka 格式转换。这会覆盖 Kafka Connect 配置设定的默认值。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于转换消息有效负载到 Kafka 格式的类。这会覆盖 Kafka Connect 配置设定的默认值。例如,org.apache.kafka.connect.json.JsonConverter.

另外,必须为接收器连接器设置以下选项之一:

主题
用作输入的以逗号分隔的主题列表。
topics.regex
用作输入的 Java 正则表达式。

有关所有其他选项,请查看特定连接器的文档。

AMQ Streams 包括连接器配置文件示例。您可以在 AMQ Streams 安装目录中的 config/connect-file-sink.propertiesconfig/connect-file-source.properties 中找到。

7.2.3. 运行分布式 Kafka 连接

这个步骤描述了如何在分布式模式下配置和运行 Kafka 连接。

先决条件

  • 已安装并运行 AMQ Streams 集群。

运行容器

  1. 编辑所有 Kafka Connect worker 节点上的 /opt/kafka/config/connect-distributed.properties Kafka Connect 配置文件。

    • 设置 bootstrap.server 选项以指向您的 Kafka 代理。
    • 设置 group.id 选项。
    • 设置 config.storage.topic 选项。
    • 设置 offset.storage.topic 选项。
    • 设置 status.storage.topic 选项。

      例如:

      bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
      group.id=my-group-id
      config.storage.topic=my-group-id-configs
      offset.storage.topic=my-group-id-offsets
      status.storage.topic=my-group-id-status
      Copy to Clipboard Toggle word wrap
  2. 使用所有 Kafka Connect 节点上的 /opt/kafka/config/connect-distributed.properties 配置文件启动 Kafka Connect worker。

    su - kafka
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
    Copy to Clipboard Toggle word wrap
  3. 验证 Kafka Connect 是否正在运行。

    jcmd | grep ConnectDistributed
    Copy to Clipboard Toggle word wrap

7.2.4. 创建连接器

此流程描述了如何使用 Kafka Connect 创建连接器插件,用于在分布式模式的 Kafka Connect 中使用。

先决条件

  • 以分布式模式运行的 Kafka 连接安装。

流程

  1. 使用连接器配置准备 JSON 有效负载。例如:

    {
      "name": "my-connector",
      "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-topic-1,my-topic-2",
        "file": "/tmp/output-file.txt"
      }
    }
    Copy to Clipboard Toggle word wrap
  2. 发送 POST 请求到 <KafkaConnectAddress>:8083/connectors,以创建连接器。以下示例使用 curl

    curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap
  3. 通过向 < KafkaConnectAddress > :8083/connectors 发送 GET 请求来验证连接器是否已部署。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap

7.2.5. 删除连接器

这个步骤描述了如何使用 Kafka Connect 在分布式模式下从 Kafka Connect 中删除连接器插件。

先决条件

  • 以分布式模式运行的 Kafka 连接安装。

删除连接器

  1. 通过向 <KafkaConnectAddress>:8083/connectors/<ConnectorName> 发送 GET 请求来验证连接器是否存在。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap
  2. 要删除连接器,请发送 DELETE 请求到 < KafkaConnectAddress > :8083/connectors。以下示例使用 curl

    curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
    Copy to Clipboard Toggle word wrap
  3. 通过将 GET 请求发送到 < KafkaConnectAddress>:8083/connectors 来验证连接器是否已删除。以下示例使用 curl

    curl http://connect0.my-domain.com:8083/connectors
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat