7.7. Kafka Connect 配置


使用 AMQ Streams 的 KafkaConnect 资源快速轻松地创建新的 Kafka Connect 集群。

当使用 KafkaConnect 资源部署 Kafka Connect 时,您可以指定 bootstrap 服务器地址(在 spec.bootstrapServers中)以连接到 Kafka 集群。当服务器停机时,您可以指定多个地址。您还指定身份验证凭据和 TLS 客户端证书,以建立安全连接。

注意

Kafka 集群不需要由 AMQ Streams 管理,或部署到 OpenShift 集群。

您还可以使用 KafkaConnect 资源来指定以下内容:

  • 构建包含插件的容器镜像的插件配置,以建立连接
  • 属于 Kafka Connect 集群的 worker pod 配置
  • 启用使用 KafkaConnector 资源管理插件的注解

Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect 集群,以及利用 KafkaConnector 资源创建的连接器。

插件配置

插件提供创建连接器实例的实施。当插件实例化时,会为连接特定类型的外部数据系统提供配置。插件提供一组或者多个 JAR 文件,该文件定义了连接器和任务实施,以连接到指定类型数据源。许多外部系统的插件可用于 Kafka 连接。您还可以创建自己的插件。

配置描述了要发送到 Kafka Connect 的源输入数据和目标输出数据。对于源连接器,外部源数据必须引用要存储消息的特定主题。插件也可以包含转换数据所需的库和文件。

Kafka Connect 部署可以有一个或多个插件,但每个插件只能有一个版本。

您可以创建自定义 Kafka Connect 镜像,其中包括您选择的插件。您可以通过两种方式创建镜像:

要自动创建容器镜像,您可以使用 KafkaConnect 资源的 build 属性指定要添加到 Kafka Connect 集群中的插件。AMQ Streams 会自动下载插件工件并将其添加到新容器镜像中。

插件配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  build: 1
    output: 2
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 3
      - name: my-connector
        artifacts:
          - type: tgz
            url: https://<plugin_download_location>.tgz
            sha512sum: <checksum_to_verify_the_plugin>
      # ...
  # ...

1
2
推送新镜像的容器 registry 的配置。output 属性描述镜像的类型和名称,以及包含访问容器 registry 所需的凭证的 secret 名称。
3
要添加到新容器镜像的插件及其构件列表。plugins 属性描述工件的类型以及下载工件的 URL。每个插件必须配置至少一个工件。另外,您可以指定 SHA-512 校验和来验证工件,然后再解包。

如果使用 Dockerfile 构建镜像,您可以使用 AMQ Streams 的最新容器镜像作为基础镜像来添加插件配置文件。

显示手动添加插件配置示例

FROM registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

worker 的 Kafka Connect 集群配置

您可以在 KafkaConnect 资源的 config 属性中指定 worker 的配置。

分布式 Kafka Connect 集群有一个组 ID 和一组内部配置主题。

  • group.id
  • offset.storage.topic
  • config.storage.topic
  • status.storage.topic

Kafka Connect 集群默认使用这些属性的值配置。Kafka Connect 集群无法共享组 ID 或主题名称,因为它将创建错误。如果使用多个不同的 Kafka Connect 集群,则每个创建的 Kafka Connect 集群的 worker 必须是唯一的。

每个 Kafka Connect 集群使用的连接器名称也必须是唯一的。

在以下示例中,指定了 JSON 转换器。Kafka Connect 使用的内部 Kafka 主题设置了复制因素。对于生产环境,至少应有 3 个。在创建主题后更改复制因素将无效。

worker 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
  config:
    # ...
    group.id: my-connect-cluster 1
    offset.storage.topic: my-connect-cluster-offsets 2
    config.storage.topic: my-connect-cluster-configs 3
    status.storage.topic: my-connect-cluster-status 4
    key.converter: org.apache.kafka.connect.json.JsonConverter 5
    value.converter: org.apache.kafka.connect.json.JsonConverter 6
    key.converter.schemas.enable: true 7
    value.converter.schemas.enable: true 8
    config.storage.replication.factor: 3 9
    offset.storage.replication.factor: 3 10
    status.storage.replication.factor: 3 11
  # ...

1
Kafka 中的 Kafka Connect 集群 ID。每个 Kafka Connect 集群都必须是唯一的。
2
存储连接器偏移的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
3
存储连接器和任务状态配置的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
4
存储连接器和任务状态更新的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
5
转换程序,将消息密钥转换为 Kafka 中存储的 JSON 格式。
6
转换程序,将消息值转换为 Kafka 中存储的 JSON 格式。
7
为将消息键转换为结构化 JSON 格式的 schema。
8
为将消息值转换为结构化 JSON 格式的 schema。
9
存储连接器偏移的 Kafka 主题的复制因素。
10
存储连接器和任务状态配置的 Kafka 主题的复制因素。
11
存储连接器和任务状态更新的 Kafka 主题的复制因素。

连接器的 KafkaConnector 管理

在将插件添加到用于部署中的 worker pod 的容器镜像后,您可以使用 AMQ Streams 的 KafkaConnector 自定义资源或 Kafka Connect API 来管理连接器实例。您还可以使用这些选项创建新的连接器实例。

KafkaConnector 资源提供了一种 OpenShift 原生的方法来管理 Cluster Operator 连接器。要使用 KafkaConnector 资源管理连接器,您必须在 KafkaConnect 自定义资源中指定注解。

启用 KafkaConnectors 的注解

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...

use-connector-resources 设置为 true 可启用 KafkaConnectors 创建、删除和重新配置连接器。

如果在 KafkaConnect 配置中启用了 use-connector-resources,则必须使用 KafkaConnector 资源来定义和管理连接器。KafkaConnector 资源被配置为连接到外部系统。它们部署到与 Kafka Connect 集群和与外部数据系统交互的 Kafka 集群相同的 OpenShift 集群。

Kafka 组件在同一个 OpenShift 集群中包含

Kafka and Kafka Connect clusters

配置指定连接器实例如何连接到外部数据系统,包括任何身份验证。您还需要陈述要监视的数据。对于源连接器,您可能会在配置中提供数据库名称。您还可以通过指定目标主题名称指定数据在 Kafka 中的位置。

使用 tasksMax 指定最大任务数。例如,带有 tasksMax: 2 的源连接器可能会将源数据导入两个任务。

KafkaConnector 源连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  1
  labels:
    strimzi.io/cluster: my-connect-cluster 2
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
  tasksMax: 2 4
  config: 5
    file: "/opt/kafka/LICENSE" 6
    topic: my-topic 7
    # ...

1
KafkaConnector 资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。
2
在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
3
连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
4
连接器可创建的最大 Kafka Connect 任务数量。
5
连接器配置 作为键值对。
6
外部数据文件的位置。在本例中,我们将 FileStreamSourceConnector 配置为从 /opt/kafka/LICENSE 文件中读取。
7
将源数据发布到的 Kafka 主题。
注意

Kafka Connect API

使用 Kafka Connect REST API 作为使用 KafkaConnector 资源管理连接器的替代选择。Kafka Connect REST API 作为一个运行在 <connect_cluster_name>-connect-api:8083 的服务其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。

您可以将连接器配置添加为 JSON 对象。

添加连接器配置的 curl 请求示例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'

如果启用了 KafkaConnectors,Cluster Operator 将恢复使用 Kafka Connect REST API 进行的手动更改。

REST API 支持的操作在 Apache Kafka Connect API 文档中 描述。

注意

您可以在 OpenShift 外部公开 Kafka Connect API 服务。为此,您可以创建一个使用连接机制来提供访问的服务,如入口或路由。我们建议使用,因为连接是不安全的。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.