7.7. Kafka Connect 配置


使用 AMQ Streams 的 KafkaConnect 资源快速轻松地创建新的 Kafka Connect 集群。

当使用 KafkaConnect 资源部署 Kafka Connect 时,您可以指定 bootstrap 服务器地址(在 spec.bootstrapServers中)以连接到 Kafka 集群。当服务器停机时,您可以指定多个地址。您还指定身份验证凭据和 TLS 客户端证书,以建立安全连接。

注意

Kafka 集群不需要由 AMQ Streams 管理,或部署到 OpenShift 集群。

您还可以使用 KafkaConnect 资源来指定以下内容:

  • 构建包含插件的容器镜像的插件配置,以建立连接
  • 属于 Kafka Connect 集群的 worker pod 配置
  • 启用使用 KafkaConnector 资源管理插件的注解

Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect 集群,以及利用 KafkaConnector 资源创建的连接器。

插件配置

插件提供创建连接器实例的实施。当插件实例化时,会为连接特定类型的外部数据系统提供配置。插件提供一组或者多个 JAR 文件,该文件定义了连接器和任务实施,以连接到指定类型数据源。许多外部系统的插件可用于 Kafka 连接。您还可以创建自己的插件。

配置描述了要发送到 Kafka Connect 的源输入数据和目标输出数据。对于源连接器,外部源数据必须引用要存储消息的特定主题。插件也可以包含转换数据所需的库和文件。

Kafka Connect 部署可以有一个或多个插件,但每个插件只能有一个版本。

您可以创建自定义 Kafka Connect 镜像,其中包括您选择的插件。您可以通过两种方式创建镜像:

要自动创建容器镜像,您可以使用 KafkaConnect 资源的 build 属性指定要添加到 Kafka Connect 集群中的插件。AMQ Streams 会自动下载插件工件并将其添加到新容器镜像中。

插件配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  build: 
1

    output: 
2

      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 
3

      - name: my-connector
        artifacts:
          - type: tgz
            url: https://<plugin_download_location>.tgz
            sha512sum: <checksum_to_verify_the_plugin>
      # ...
  # ...
Copy to Clipboard Toggle word wrap

1
2
推送新镜像的容器 registry 的配置。output 属性描述镜像的类型和名称,以及包含访问容器 registry 所需的凭证的 secret 名称。
3
要添加到新容器镜像的插件及其构件列表。plugins 属性描述工件的类型以及下载工件的 URL。每个插件必须配置至少一个工件。另外,您可以指定 SHA-512 校验和来验证工件,然后再解包。

如果使用 Dockerfile 构建镜像,您可以使用 AMQ Streams 的最新容器镜像作为基础镜像来添加插件配置文件。

显示手动添加插件配置示例

FROM registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
Copy to Clipboard Toggle word wrap

worker 的 Kafka Connect 集群配置

您可以在 KafkaConnect 资源的 config 属性中指定 worker 的配置。

分布式 Kafka Connect 集群有一个组 ID 和一组内部配置主题。

  • group.id
  • offset.storage.topic
  • config.storage.topic
  • status.storage.topic

Kafka Connect 集群默认使用这些属性的值配置。Kafka Connect 集群无法共享组 ID 或主题名称,因为它将创建错误。如果使用多个不同的 Kafka Connect 集群,则每个创建的 Kafka Connect 集群的 worker 必须是唯一的。

每个 Kafka Connect 集群使用的连接器名称也必须是唯一的。

在以下示例中,指定了 JSON 转换器。Kafka Connect 使用的内部 Kafka 主题设置了复制因素。对于生产环境,至少应有 3 个。在创建主题后更改复制因素将无效。

worker 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
  config:
    # ...
    group.id: my-connect-cluster 
1

    offset.storage.topic: my-connect-cluster-offsets 
2

    config.storage.topic: my-connect-cluster-configs 
3

    status.storage.topic: my-connect-cluster-status 
4

    key.converter: org.apache.kafka.connect.json.JsonConverter 
5

    value.converter: org.apache.kafka.connect.json.JsonConverter 
6

    key.converter.schemas.enable: true 
7

    value.converter.schemas.enable: true 
8

    config.storage.replication.factor: 3 
9

    offset.storage.replication.factor: 3 
10

    status.storage.replication.factor: 3 
11

  # ...
Copy to Clipboard Toggle word wrap

1
Kafka 中的 Kafka Connect 集群 ID。每个 Kafka Connect 集群都必须是唯一的。
2
存储连接器偏移的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
3
存储连接器和任务状态配置的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
4
存储连接器和任务状态更新的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
5
转换程序,将消息密钥转换为 Kafka 中存储的 JSON 格式。
6
转换程序,将消息值转换为 Kafka 中存储的 JSON 格式。
7
为将消息键转换为结构化 JSON 格式的 schema。
8
为将消息值转换为结构化 JSON 格式的 schema。
9
存储连接器偏移的 Kafka 主题的复制因素。
10
存储连接器和任务状态配置的 Kafka 主题的复制因素。
11
存储连接器和任务状态更新的 Kafka 主题的复制因素。

连接器的 KafkaConnector 管理

在将插件添加到用于部署中的 worker pod 的容器镜像后,您可以使用 AMQ Streams 的 KafkaConnector 自定义资源或 Kafka Connect API 来管理连接器实例。您还可以使用这些选项创建新的连接器实例。

KafkaConnector 资源提供了一种 OpenShift 原生的方法来管理 Cluster Operator 连接器。要使用 KafkaConnector 资源管理连接器,您必须在 KafkaConnect 自定义资源中指定注解。

启用 KafkaConnectors 的注解

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...
Copy to Clipboard Toggle word wrap

use-connector-resources 设置为 true 可启用 KafkaConnectors 创建、删除和重新配置连接器。

如果在 KafkaConnect 配置中启用了 use-connector-resources,则必须使用 KafkaConnector 资源来定义和管理连接器。KafkaConnector 资源被配置为连接到外部系统。它们部署到与 Kafka Connect 集群和与外部数据系统交互的 Kafka 集群相同的 OpenShift 集群。

Kafka 组件在同一个 OpenShift 集群中包含

Kafka and Kafka Connect clusters

配置指定连接器实例如何连接到外部数据系统,包括任何身份验证。您还需要陈述要监视的数据。对于源连接器,您可能会在配置中提供数据库名称。您还可以通过指定目标主题名称指定数据在 Kafka 中的位置。

使用 tasksMax 指定最大任务数。例如,带有 tasksMax: 2 的源连接器可能会将源数据导入两个任务。

KafkaConnector 源连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  
1

  labels:
    strimzi.io/cluster: my-connect-cluster 
2

spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 
3

  tasksMax: 2 
4

  autoRestart: 
5

    enabled: true
  config: 
6

    file: "/opt/kafka/LICENSE" 
7

    topic: my-topic 
8

    # ...
Copy to Clipboard Toggle word wrap

1
KafkaConnector 资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。
2
在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
3
连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
4
连接器可创建的最大 Kafka Connect 任务数量。
5
启用自动重启失败的连接器和任务。默认情况下,重启数量是无限的,但您可以使用 maxRestarts 属性设置自动重启次数的最大值。
6
连接器配置 作为键值对。
7
外部数据文件的位置。在本例中,我们将 FileStreamSourceConnector 配置为从 /opt/kafka/LICENSE 文件中读取。
8
将源数据发布到的 Kafka 主题。
注意

Kafka Connect API

使用 Kafka Connect REST API 作为使用 KafkaConnector 资源管理连接器的替代选择。Kafka Connect REST API 作为一个运行在 <connect_cluster_name>-connect-api:8083 的服务其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。

您可以将连接器配置添加为 JSON 对象。

添加连接器配置的 curl 请求示例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'
Copy to Clipboard Toggle word wrap

如果启用了 KafkaConnectors,Cluster Operator 将恢复使用 Kafka Connect REST API 进行的手动更改。

REST API 支持的操作在 Apache Kafka Connect API 文档中 进行了描述。

注意

您可以在 OpenShift 外部公开 Kafka Connect API 服务。为此,您可以创建一个使用连接机制来提供访问的服务,如入口或路由。我们建议使用,因为连接是不安全的。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat