7.3. 使用 Topic Operator
当您使用 KafkaTopic 资源创建、修改或删除主题时,主题 Operator 可确保这些更改反映在 Kafka 集群中。
如需有关 KafkaTopic 资源的更多信息,请参阅 KafkaTopic 模式参考。
部署 topics Operator
您可以使用 Cluster Operator 或独立 Operator 部署 Topic Operator。您要将独立 Topic Operator 与不由 Cluster Operator 管理的 Kafka 集群一起使用。
有关部署说明,请查看以下操作:
要部署独立主题 Operator,您需要设置环境变量以连接到 Kafka 集群。如果要使用 Cluster Operator 部署 Topic Operator,则不需要设置这些环境变量,因为 Cluster Operator 将设置它们。
7.3.1. Kafka 主题资源 复制链接链接已复制到粘贴板!
KafkaTopic 资源用于配置主题,包括分区和副本的数量。
KafkaTopic 的完整模式信息包括在 KafkaTopic schema reference 中。
7.3.1.1. 为主题处理识别 Kafka 集群 复制链接链接已复制到粘贴板!
KafkaTopic 资源包含一个标签,它指定 Kafka 集群的名称(来自 Kafka 资源的名称)的名称。
例如:
Topic Operator 用来识别 KafkaTopic 资源并创建一个新主题,并在后续处理主题时使用该标签。
如果标签与 Kafka 集群不匹配,主题 Operator 无法识别 KafkaTopic,且不创建主题。
7.3.1.2. Kafka 主题使用建议 复制链接链接已复制到粘贴板!
处理主题时,会一致。始终直接在 OpenShift 中操作 KafkaTopic 资源或主题。避免定期在这两种方法间切换给定主题。
使用反映主题性质的主题名称,并记住以后无法更改名称。
如果在 Kafka 中创建一个主题,请使用有效的 OpenShift 资源名称,否则主题 Operator 需要使用符合 OpenShift 规则的名称来创建对应的 KafkaTopic。
如需有关 OpenShift 中标识符和名称的要求的信息,请参阅对象名称和 ID。
7.3.1.3. Kafka 主题命名约定 复制链接链接已复制到粘贴板!
Kafka 和 OpenShift 会分别在 Kafka 和 KafkaTopic.metadata.name 中对主题进行自己的验证规则。彼此都无效,它们都有有效的名称。
使用 spec.topicName 属性,可以在 Kafka 中创建一个有效的主题,其名称对 OpenShift 中的 Kafka 主题无效。
spec.topicName 属性继承 Kafka 命名验证规则:
- 名称不得多于 249 个字符。
-
Kafka 主题的有效字符包括 ASCII 字母数字、
.、_和-。 -
名称不能是
.或.,但.可在名称中使用,如exampleTopic.或.exampleTopic。
不能更改 spec.topicName。
例如:
- 1
- 在 OpenShift 中,大写无效。
无法更改为:
有些 Kafka 客户端应用程序(如 Kafka Streams)可以以编程方式在 Kafka 中创建主题。如果这些主题具有无效的 OpenShift 资源名称的名称,主题 Operator 会根据 Kafka 名称赋予它们有效的 metadata.name。无效的字符会被替换,哈希会附加到名称中。例如:
7.3.2. 主题 Operator 主题存储 复制链接链接已复制到粘贴板!
主题 Operator 使用 Kafka 将主题元数据存储为键值对描述主题配置。主题存储 基于 Kafka Streams 键值机制,使用 Kafka 主题来持久保留状态。
主题元数据缓存在内存中,并在主题 Operator 中本地访问。应用到本地内存缓存的操作更新会被保留到磁盘上的备份主题存储中。主题存储会持续与 Kafka 主题或 OpenShift KafkaTopic 自定义资源的更新同步。操作通过通过这个主题存储设置的快速处理,但应该从持久性存储中自动填充内存缓存崩溃。
7.3.2.1. 内部主题存储主题 复制链接链接已复制到粘贴板!
内部主题支持处理主题存储中的主题元数据。
__strimzi_store_topic- 用于存储主题元数据的输入主题
__strimzi-topic-operator-kstreams-topic-store-changelog- 保留紧凑主题存储值的日志
不要删除这些主题,因为它们对于 Topic Operator 的运行至关重要。
7.3.2.2. 从 ZooKeeper 迁移主题元数据 复制链接链接已复制到粘贴板!
在 AMQ Streams 之前的版本中,主题元数据存储在 ZooKeeper 中。此过程将删除此要求,将元数据引入到 Kafka 集群中,并在主题 Operator 的控制下。
当升级到 AMQ Streams 2.3 时,到主题存储的 Operator 控制是无缝的。从 ZooKeeper 找到并迁移元数据,旧的存储会被删除。
7.3.2.3. 降级到使用 ZooKeeper 存储主题元数据的 AMQ Streams 版本 复制链接链接已复制到粘贴板!
如果您恢复到 1.7 之前的 AMQ Streams 版本,该版本使用 ZooKeeper 用于主题元数据的存储,您仍然会将 Cluster Operator 降级到以前的版本,然后降级 Kafka 代理和客户端应用程序作为标准。
但是,还必须使用 kafka-admin 命令删除为主题存储创建的主题,并指定 Kafka 集群的 bootstrap 地址。例如:
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
命令必须与用于访问 Kafka 集群的监听程序和身份验证类型对应。
主题 Operator 将从 Kafka 中主题状态重新创建 ZooKeeper 主题元数据。
7.3.2.4. 主题 Operator 主题复制和扩展 复制链接链接已复制到粘贴板!
建议由主题 Operator 管理的主题配置是 3 个主题复制因素,至少 2 个同步副本。
同步副本与生产者应用的 acks 配置结合使用。acks 配置决定了必须将消息的后续分区数量复制到,然后才能确认成功接收消息。主题 Operator 使用 acks=all 运行,通过所有 in-sync 副本都必须确认信息。
当通过添加或删除代理扩展 Kafka 集群时,复制因素配置不会被更改,且不会自动重新分配副本。但是,您可以使用 kafka-reassign-partitions.sh 工具更改复制因素,并手动将副本重新分配给代理。
另外,虽然集成 AMQ Streams 的 Cruise Control 无法更改主题的复制因素,但它生成的优化建议会包括传输分区副本的命令以及更改分区领导的命令。
7.3.2.5. 处理对主题的更改 复制链接链接已复制到粘贴板!
主题 Operator 需要解决的一个根本问题是没有单个数据源: KafkaTopic 资源和 Kafka 主题都独立于主题 Operator 修改。对这一点复杂,主题 Operator 可能不会始终能够实时观察变化。例如,当主题 Operator 停机时。
要解决这个问题,主题 Operator 会维护主题存储中每个主题的信息。当 Kafka 集群或 OpenShift 中发生更改时,它会查看其他系统和主题存储的状态,以确定一切保持同步的需求。每当主题 Operator 启动时,并定期发生同样的情况。
例如,假设 Topic Operator 未运行,并且创建了名为 my-topic 的 KafkaTopic。当主题 Operator 启动时,主题存储不包含 my-topic 的信息,因此可在最后运行 KafkaTopic 后创建 KafkaTopic。主题 Operator 创建与 my-topic 对应的主题,并将 my-topic 的元数据存储在主题存储中。
如果您更新 Kafka 主题配置或通过 KafkaTopic 自定义资源应用更改,则在 Kafka 集群协调后会更新主题存储。
主题存储也允许 Topic Operator 来管理在 Kafka 主题中更改主题配置和通过 OpenShift KafkaTopic 自定义资源进行更新的情况,只要这些更改没有不兼容。例如,可以更改相同的主题配置键,但可以对不同的值进行更改。对于不兼容的更改,Kafka 配置会考虑优先级,并相应地更新 KafkaTopic。
您还可以使用 KafkaTopic 资源,使用 oc delete -f KAFKA-TOPIC-CONFIG-FILE 命令删除主题。要实现此目的,在 Kafka 资源的 spec.kafka.config 中必须将 delete.topic.enable 设置为 true (默认)。
7.3.3. 配置 Kafka 主题 复制链接链接已复制到粘贴板!
使用 KafkaTopic 资源的属性来配置 Kafka 主题。
您可以使用 oc apply 来创建或修改主题,oc delete 来删除现有的主题。
例如:
-
oc apply -f <topic_config_file> -
oc delete KafkaTopic <topic_name>
此流程演示了如何创建包含 10 个分区和 2 个副本的主题。
开始前
在进行更改前,您必须考虑以下问题:
- Kafka 不支持减少分区数量。
-
增加带有键的主题增加
spec.partitions将改变记录分区的方式,这在主题使用 semantic partitioning 时会可能会有问题。 AMQ Streams 不支持通过
KafkaTopic资源进行以下更改:-
使用
spec.replicas更改初始指定副本数 -
使用
spec.topicName更改主题名称
-
使用
先决条件
- 使用 mTLS 身份验证和 TLS 加密,运行带有 Kafka 代理监听程序 的 Kafka 集群。
- 正在运行的主题 Operator (通常使用 Entity Operator 部署)。
-
要删除主题,请在
Kafka资源的spec.kafka.config中的delete.topic.enable=true(默认)。
流程
配置
KafkaTopic资源。Kafka 主题配置示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 提示在修改主题时,您可以使用
oc get kafkatopic orders -o yaml来获取资源的当前版本。在 OpenShift 中创建
KafkaTopic资源。oc apply -f <topic_config_file>
oc apply -f <topic_config_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 等待主题的就绪状态更改为
True:oc get kafkatopics -o wide -w -n <namespace>
oc get kafkatopics -o wide -w -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka 主题状态
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY my-topic-1 my-cluster 10 3 True my-topic-2 my-cluster 10 3 my-topic-3 my-cluster 10 3 True
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY my-topic-1 my-cluster 10 3 True my-topic-2 my-cluster 10 3 my-topic-3 my-cluster 10 3 TrueCopy to Clipboard Copied! Toggle word wrap Toggle overflow 当
READY输出显示为True时,主题创建成功。如果
READY列留空,请从资源 YAML 或 Topic Operator 日志中获取有关状态的更多详细信息。消息提供有关当前状态的原因的详细信息。
oc get kafkatopics my-topic-2 -o yaml
oc get kafkatopics my-topic-2 -o yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 有关
NotReady状态主题的详情Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在本例中,主题未就绪的原因是在
KafkaTopic配置中减少原始分区数量。Kafka 不支持此功能。重置主题配置后,状态将显示主题为 ready。
oc get kafkatopics my-topic-2 -o wide -w -n <namespace>
oc get kafkatopics my-topic-2 -o wide -w -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 主题的状态更新
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY my-topic-2 my-cluster 10 3 True
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY my-topic-2 my-cluster 10 3 TrueCopy to Clipboard Copied! Toggle word wrap Toggle overflow 获取详情不会显示任何信息
oc get kafkatopics my-topic-2 -o yaml
oc get kafkatopics my-topic-2 -o yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 有关具有
READY状态的主题的详情Copy to Clipboard Copied! Toggle word wrap Toggle overflow
7.3.4. 使用资源请求和限值配置 Topic Operator 复制链接链接已复制到粘贴板!
您可以将资源(如 CPU 和内存)分配给主题 Operator,并为其消耗的资源数量设置限制。
先决条件
- Cluster Operator 正在运行。
流程
根据需要,在编辑器中更新 Kafka 集群配置:
oc edit kafka MY-CLUSTER
oc edit kafka MY-CLUSTERCopy to Clipboard Copied! Toggle word wrap Toggle overflow 在
Kafka资源的spec.entityOperator.topicOperator.resources属性中,为 Topic Operator 设置资源请求和限值。Copy to Clipboard Copied! Toggle word wrap Toggle overflow 应用新配置来创建或更新资源。
oc apply -f <kafka_configuration_file>
oc apply -f <kafka_configuration_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow