7.3. 使用 Topic Operator


当您使用 KafkaTopic 资源创建、修改或删除主题时,主题 Operator 可确保这些更改反映在 Kafka 集群中。

如需有关 KafkaTopic 资源的更多信息,请参阅 KafkaTopic 模式参考

部署 topics Operator

您可以使用 Cluster Operator 或独立 Operator 部署 Topic Operator。您要将独立 Topic Operator 与不由 Cluster Operator 管理的 Kafka 集群一起使用。

有关部署说明,请查看以下操作:

重要

要部署独立主题 Operator,您需要设置环境变量以连接到 Kafka 集群。如果要使用 Cluster Operator 部署 Topic Operator,则不需要设置这些环境变量,因为 Cluster Operator 将设置它们。

7.3.1. Kafka 主题资源

KafkaTopic 资源用于配置主题,包括分区和副本的数量。

KafkaTopic 的完整模式信息包括在 KafkaTopic schema reference 中。

7.3.1.1. 为主题处理识别 Kafka 集群

KafkaTopic 资源包含一个标签,它指定 Kafka 集群的名称(来自 Kafka 资源的名称)的名称。

例如:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
  labels:
    strimzi.io/cluster: my-cluster
Copy to Clipboard Toggle word wrap

Topic Operator 用来识别 KafkaTopic 资源并创建一个新主题,并在后续处理主题时使用该标签。

如果标签与 Kafka 集群不匹配,主题 Operator 无法识别 KafkaTopic,且不创建主题。

7.3.1.2. Kafka 主题使用建议

处理主题时,会一致。始终直接在 OpenShift 中操作 KafkaTopic 资源或主题。避免定期在这两种方法间切换给定主题。

使用反映主题性质的主题名称,并记住以后无法更改名称。

如果在 Kafka 中创建一个主题,请使用有效的 OpenShift 资源名称,否则主题 Operator 需要使用符合 OpenShift 规则的名称来创建对应的 KafkaTopic

注意

如需有关 OpenShift 中标识符和名称的要求的信息,请参阅对象名称和 ID

7.3.1.3. Kafka 主题命名约定

Kafka 和 OpenShift 会分别在 Kafka 和 KafkaTopic.metadata.name 中对主题进行自己的验证规则。彼此都无效,它们都有有效的名称。

使用 spec.topicName 属性,可以在 Kafka 中创建一个有效的主题,其名称对 OpenShift 中的 Kafka 主题无效。

spec.topicName 属性继承 Kafka 命名验证规则:

  • 名称不得多于 249 个字符。
  • Kafka 主题的有效字符包括 ASCII 字母数字、._-
  • 名称不能是 ..,但 . 可在名称中使用,如 exampleTopic..exampleTopic

不能更改 spec.topicName

例如:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: topicName-1 
1

  # ...
Copy to Clipboard Toggle word wrap
1
在 OpenShift 中,大写无效。

无法更改为:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: name-2
  # ...
Copy to Clipboard Toggle word wrap
注意

有些 Kafka 客户端应用程序(如 Kafka Streams)可以以编程方式在 Kafka 中创建主题。如果这些主题具有无效的 OpenShift 资源名称的名称,主题 Operator 会根据 Kafka 名称赋予它们有效的 metadata.name。无效的字符会被替换,哈希会附加到名称中。例如:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: mytopic---c55e57fe2546a33f9e603caf57165db4072e827e
spec:
  topicName: myTopic
  # ...
Copy to Clipboard Toggle word wrap

7.3.2. 主题 Operator 主题存储

主题 Operator 使用 Kafka 将主题元数据存储为键值对描述主题配置。主题存储 基于 Kafka Streams 键值机制,使用 Kafka 主题来持久保留状态。

主题元数据缓存在内存中,并在主题 Operator 中本地访问。应用到本地内存缓存的操作更新会被保留到磁盘上的备份主题存储中。主题存储会持续与 Kafka 主题或 OpenShift KafkaTopic 自定义资源的更新同步。操作通过通过这个主题存储设置的快速处理,但应该从持久性存储中自动填充内存缓存崩溃。

7.3.2.1. 内部主题存储主题

内部主题支持处理主题存储中的主题元数据。

__strimzi_store_topic
用于存储主题元数据的输入主题
__strimzi-topic-operator-kstreams-topic-store-changelog
保留紧凑主题存储值的日志
警告

不要删除这些主题,因为它们对于 Topic Operator 的运行至关重要。

7.3.2.2. 从 ZooKeeper 迁移主题元数据

在 AMQ Streams 之前的版本中,主题元数据存储在 ZooKeeper 中。此过程将删除此要求,将元数据引入到 Kafka 集群中,并在主题 Operator 的控制下。

当升级到 AMQ Streams 2.3 时,到主题存储的 Operator 控制是无缝的。从 ZooKeeper 找到并迁移元数据,旧的存储会被删除。

如果您恢复到 1.7 之前的 AMQ Streams 版本,该版本使用 ZooKeeper 用于主题元数据的存储,您仍然会将 Cluster Operator 降级到以前的版本,然后降级 Kafka 代理和客户端应用程序作为标准。

但是,还必须使用 kafka-admin 命令删除为主题存储创建的主题,并指定 Kafka 集群的 bootstrap 地址。例如:

oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
Copy to Clipboard Toggle word wrap

命令必须与用于访问 Kafka 集群的监听程序和身份验证类型对应。

主题 Operator 将从 Kafka 中主题状态重新创建 ZooKeeper 主题元数据。

7.3.2.4. 主题 Operator 主题复制和扩展

建议由主题 Operator 管理的主题配置是 3 个主题复制因素,至少 2 个同步副本。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10 
1

  replicas: 3 
2

  config:
    min.insync.replicas: 2 
3

  #...
Copy to Clipboard Toggle word wrap
1
主题的分区数量。
2
副本主题分区的数量。目前,这无法在 KafkaTopic 资源中更改,但可以使用 kafka-reassign-partitions.sh 工具进行修改。
3
必须成功写入消息的最少副本数,或者引发异常。
注意

同步副本与生产者应用的 acks 配置结合使用。acks 配置决定了必须将消息的后续分区数量复制到,然后才能确认成功接收消息。主题 Operator 使用 acks=all 运行,通过所有 in-sync 副本都必须确认信息。

当通过添加或删除代理扩展 Kafka 集群时,复制因素配置不会被更改,且不会自动重新分配副本。但是,您可以使用 kafka-reassign-partitions.sh 工具更改复制因素,并手动将副本重新分配给代理。

另外,虽然集成 AMQ Streams 的 Cruise Control 无法更改主题的复制因素,但它生成的优化建议会包括传输分区副本的命令以及更改分区领导的命令。

7.3.2.5. 处理对主题的更改

主题 Operator 需要解决的一个根本问题是没有单个数据源: KafkaTopic 资源和 Kafka 主题都独立于主题 Operator 修改。对这一点复杂,主题 Operator 可能不会始终能够实时观察变化。例如,当主题 Operator 停机时。

要解决这个问题,主题 Operator 会维护主题存储中每个主题的信息。当 Kafka 集群或 OpenShift 中发生更改时,它会查看其他系统和主题存储的状态,以确定一切保持同步的需求。每当主题 Operator 启动时,并定期发生同样的情况。

例如,假设 Topic Operator 未运行,并且创建了名为 my-topicKafkaTopic。当主题 Operator 启动时,主题存储不包含 my-topic 的信息,因此可在最后运行 KafkaTopic 后创建 KafkaTopic。主题 Operator 创建与 my-topic 对应的主题,并将 my-topic 的元数据存储在主题存储中。

如果您更新 Kafka 主题配置或通过 KafkaTopic 自定义资源应用更改,则在 Kafka 集群协调后会更新主题存储。

主题存储也允许 Topic Operator 来管理在 Kafka 主题中更改主题配置通过 OpenShift KafkaTopic 自定义资源进行更新的情况,只要这些更改没有不兼容。例如,可以更改相同的主题配置键,但可以对不同的值进行更改。对于不兼容的更改,Kafka 配置会考虑优先级,并相应地更新 KafkaTopic

注意

您还可以使用 KafkaTopic 资源,使用 oc delete -f KAFKA-TOPIC-CONFIG-FILE 命令删除主题。要实现此目的,在 Kafka 资源的 spec.kafka.config 中必须将 delete.topic.enable 设置为 true (默认)。

7.3.3. 配置 Kafka 主题

使用 KafkaTopic 资源的属性来配置 Kafka 主题。

您可以使用 oc apply 来创建或修改主题,oc delete 来删除现有的主题。

例如:

  • oc apply -f <topic_config_file>
  • oc delete KafkaTopic <topic_name>

此流程演示了如何创建包含 10 个分区和 2 个副本的主题。

开始前

在进行更改前,您必须考虑以下问题:

  • Kafka 不支持减少分区数量。
  • 增加带有键的主题增加 spec.partitions 将改变记录分区的方式,这在主题使用 semantic partitioning 时会可能会有问题。
  • AMQ Streams 不支持通过 KafkaTopic 资源进行以下更改:

    • 使用 spec.replicas 更改初始指定副本数
    • 使用 spec.topicName 更改主题名称

先决条件

流程

  1. 配置 KafkaTopic 资源。

    Kafka 主题配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: orders
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2
    Copy to Clipboard Toggle word wrap

    提示

    在修改主题时,您可以使用 oc get kafkatopic orders -o yaml 来获取资源的当前版本。

  2. 在 OpenShift 中创建 KafkaTopic 资源。

    oc apply -f <topic_config_file>
    Copy to Clipboard Toggle word wrap
  3. 等待主题的就绪状态更改为 True

    oc get kafkatopics -o wide -w -n <namespace>
    Copy to Clipboard Toggle word wrap

    Kafka 主题状态

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-1   my-cluster  10          3                  True
    my-topic-2   my-cluster  10          3
    my-topic-3   my-cluster  10          3                  True
    Copy to Clipboard Toggle word wrap

    READY 输出显示为 True 时,主题创建成功。

  4. 如果 READY 列留空,请从资源 YAML 或 Topic Operator 日志中获取有关状态的更多详细信息。

    消息提供有关当前状态的原因的详细信息。

    oc get kafkatopics my-topic-2 -o yaml
    Copy to Clipboard Toggle word wrap

    有关 NotReady 状态主题的详情

    # ...
    status:
      conditions:
      - lastTransitionTime: "2022-06-13T10:14:43.351550Z"
        message: Number of partitions cannot be decreased
        reason: PartitionDecreaseException
        status: "True"
        type: NotReady
    Copy to Clipboard Toggle word wrap

    在本例中,主题未就绪的原因是在 KafkaTopic 配置中减少原始分区数量。Kafka 不支持此功能。

    重置主题配置后,状态将显示主题为 ready。

    oc get kafkatopics my-topic-2 -o wide -w -n <namespace>
    Copy to Clipboard Toggle word wrap

    主题的状态更新

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-2   my-cluster  10          3                  True
    Copy to Clipboard Toggle word wrap

    获取详情不会显示任何信息

    oc get kafkatopics my-topic-2 -o yaml
    Copy to Clipboard Toggle word wrap

    有关具有 READY 状态的主题的详情

    # ...
    status:
      conditions:
      - lastTransitionTime: '2022-06-13T10:15:03.761084Z'
        status: 'True'
        type: Ready
    Copy to Clipboard Toggle word wrap

7.3.4. 使用资源请求和限值配置 Topic Operator

您可以将资源(如 CPU 和内存)分配给主题 Operator,并为其消耗的资源数量设置限制。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 根据需要,在编辑器中更新 Kafka 集群配置:

    oc edit kafka MY-CLUSTER
    Copy to Clipboard Toggle word wrap
  2. Kafka 资源的 spec.entityOperator.topicOperator.resources 属性中,为 Topic Operator 设置资源请求和限值。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # Kafka and ZooKeeper sections...
      entityOperator:
        topicOperator:
          resources:
            requests:
              cpu: "1"
              memory: 500Mi
            limits:
              cpu: "1"
              memory: 500Mi
    Copy to Clipboard Toggle word wrap
  3. 应用新配置来创建或更新资源。

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat