5.5. Kafka Connect 配置


基本的 Kafka Connect 配置需要一个 bootstrap 地址来连接到 Kafka 集群,以及加密和身份验证详情。

默认情况下会使用相同的配置 Kafka Connect 实例:

  • Kafka Connect 集群的组 ID
  • 保存连接器偏移的 Kafka 主题
  • 存储连接器和任务状态配置的 Kafka 主题
  • 存储连接器和任务状态更新的 Kafka 主题

如果使用多个不同的 Kafka Connect 实例,这些设置必须反映每个实例。

显示 Kafka Connect 配置的 YAML 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
  # ...

连接器

连接器与 Kafka Connect 独立配置。该配置描述了要馈入和离开 Kafka Connect 的源输入数据和目标输出数据。外部源数据必须引用存储消息的特定主题。

Kafka 提供两个内置连接器:

  • FileStreamSourceConnector 将外部系统的数据流传输到 Kafka,从输入源读取行并将每行发送到 Kafka 主题。
  • FileStreamSinkConnector 将 Kafka 数据流传输到外部系统,读取 Kafka 主题中的信息,并在输出文件中为每个文件创建一个行。

您可以使用连接器插件添加其他连接器,它们是一组 JAR 文件或 TGZ 存档,定义了连接到特定类型外部系统所需的实施。

您可以创建一个使用新连接器插件的自定义 Kafka Connect 镜像。

要创建镜像,您可以使用:

对于 AMQ Streams 自动创建新镜像,build 配置需要 output 属性引用存储容器镜像的容器 registry,以及 plugins 属性来列出要添加到镜像中的连接器插件及其工件。

output 属性描述了镜像的类型和名称,以及包含访问容器 registry 所需凭证的 Secret 名称(可选)。plugins 属性描述了工件的类型以及从中下载工件的 URL。另外,您可以在解压缩工件前指定一个 SHA-512 校验和来验证工件。

用于自动创建新镜像的 Kafka Connect 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # ...
  build:
    output:
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins:
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://ARTIFACT-ADDRESS.tgz
            sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
      # ...
  #...

管理连接器

您可以使用 KafkaConnector 资源或 Kafka Connect REST API 在 Kafka Connect 集群中创建和管理连接器实例。KafkaConnector 资源提供 OpenShift 原生方法,并由 Cluster Operator 管理。

KafkaConnector 资源的 spec 指定连接器类和配置设置,以及处理数据的连接器 任务 的最大数量。

显示 KafkaConnector 配置的 YAML 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/opt/kafka/LICENSE"
    topic: my-topic
    # ...

您可以通过在 KafkaConnect 资源中添加注解来启用 KafkaConnectors。KafkaConnector 资源必须部署到与其链接的 Kafka Connect 集群相同的命名空间中。

启用 KafkaConnector 的 YAML 显示注解示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.