9.8. 配置 Kafka 连接连接器


KafkaConnector 资源提供了一种 Kubernetes 原生的方法来管理 Cluster Operator 连接器。要创建、删除或重新配置带有 KafkaConnector 资源的连接器,您必须在 KafkaConnect 自定义资源中将 use-connector-resources 注解设置为 true

启用 KafkaConnectors 的注解

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...
Copy to Clipboard

当在 KafkaConnect 配置中启用了 use-connector-resources 注解时,您必须使用 KafkaConnector 资源定义和管理连接器。

注意

另外,您可以使用 Kafka Connect REST API 而不是 KafkaConnector 资源管理连接器。要使用 API,您必须删除 strimzi.io/use-connector-resources 注解,以便在 KafkaConnect 中使用 KafkaConnector 资源。

KafkaConnector 资源提供了在 Kafka Connect 集群中创建连接器所需的配置,它根据 KafkaConnect 配置中指定的 Kafka 集群交互。Kafka 集群不需要由 Apache Kafka 的 Streams 管理,或部署到 Kubernetes 集群。

同一 Kubernetes 集群中包含的 Kafka 组件

Kafka and Kafka Connect clusters

配置还指定连接器实例如何与外部数据系统交互,包括任何所需的身份验证方法。另外,您必须定义要监视的数据。例如,在从数据库读取数据的源连接器中,配置可能包含数据库名称。您还可以通过指定目标主题名称来定义此数据应该在 Kafka 中放置的位置。

使用 tasksMax 属性指定最大任务数。例如,带有 tasksMax: 2 的源连接器可能会将源数据导入两个任务。

源连接器配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  
1

  labels:
    strimzi.io/cluster: my-connect-cluster 
2

spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 
3

  tasksMax: 2 
4

  autoRestart: 
5

    enabled: true
  config: 
6

    file: "/opt/kafka/LICENSE" 
7

    topic: my-topic 
8

    # ...
Copy to Clipboard

1
KafkaConnector 资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。
2
在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
3
连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
4
连接器可创建的最大 Kafka Connect 任务数量。
5
启用自动重启失败的连接器和任务。默认情况下,重启数量是无限的,但您可以使用 maxRestarts 属性设置自动重启次数的最大值。
6
连接器配置 作为键值对。
7
外部数据文件的位置。在本例中,我们将 FileStreamSourceConnector 配置为从 /opt/kafka/LICENSE 文件中读取。
8
将源数据发布到的 Kafka 主题。

要包括外部连接器配置,如存储在 secret 中的用户访问凭证,请使用 KafkaConnect 资源的 template 属性。您还可以使用 配置提供程序 来加载值。

9.8.1. 手动停止或暂停 Kafka Connect 连接器

如果您使用 KafkaConnector 资源配置连接器,请使用 state 配置来停止或暂停连接器。与连接器和任务保持实例化的暂停状态不同,停止连接器只保留配置,且没有活跃进程。从运行停止连接器可能更适合长时间运行,而不是只暂停。虽然暂停的连接器速度更快恢复,但已停止的连接器具有释放内存和资源的优点。

注意

state 配置替换 KafkaConnectorSpec 模式中的(已弃用) pause 配置,它允许在连接器上暂停。如果您之前使用 pause 配置来暂停连接器,我们建议您只使用 state 配置过渡到 以避免冲突。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要暂停或停止连接器的 KafkaConnector 自定义资源的名称:

    oc get KafkaConnector
    Copy to Clipboard
  2. 编辑 KafkaConnector 资源,以停止或暂停连接器。

    停止 Kafka Connect 连接器的配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector
      tasksMax: 2
      config:
        file: "/opt/kafka/LICENSE"
        topic: my-topic
      state: stopped
      # ...
    Copy to Clipboard

    state 配置更改为 stoppedpaused。当此属性没有设置时,连接器的默认状态为 running

  3. KafkaConnector 配置应用更改。

    您可以通过将 state 改为 running,或删除配置来恢复连接器。

注意

另外,您可以 公开 Kafka Connect API,并使用 stoppause 端点停止连接器运行。例如,PUT /connectors/<connector_name>/stop。然后,您可以使用 resume 端点重启它。

9.8.2. 手动重启 Kafka 连接连接器

如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart 注解来手动触发连接器的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka 连接器的 KafkaConnector 自定义资源的名称:

    oc get KafkaConnector
    Copy to Clipboard
  2. 通过在 OpenShift 中注解 KafkaConnector 资源来重启连接器。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
    Copy to Clipboard

    重启 注解设置为 true

  3. 等待下一个协调发生(默认为两分钟)。

    只要协调过程检测到注解,Kafka 连接器就会重启。当 Kafka Connect 接受重启请求时,注解会从 KafkaConnector 自定义资源中删除。

9.8.3. 手动重启 Kafka Connect 连接器任务

如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart-task 注解来手动触发连接器任务的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka 连接器任务的 KafkaConnector 自定义资源名称:

    oc get KafkaConnector
    Copy to Clipboard
  2. 查找从 KafkaConnector 自定义资源重启的任务 ID:

    oc describe KafkaConnector <kafka_connector_name>
    Copy to Clipboard

    任务 ID 是非负整数,从 0 开始。

  3. 通过注解 OpenShift 中的 KafkaConnector 资源,使用 ID 重启连接器任务:

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
    Copy to Clipboard

    在本例中,任务 0 被重启。

  4. 等待下一个协调发生(默认为两分钟)。

    Kafka 连接器任务会重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,注解会从 KafkaConnector 自定义资源中删除。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat