第 27 章 管理 AMQ Streams


管理 AMQ Streams 需要执行各种任务来保持 Kafka 集群和相关资源平稳运行。使用 oc 命令检查资源的状态,为滚动更新配置维护窗口,并利用 AMQ Streams Drain Cleaner 和 Kafka Static Quota 插件等工具来有效地管理部署。

27.1. 使用自定义资源

您可以使用 oc 命令检索信息,并在 AMQ Streams 自定义资源上执行其他操作。

通过将 oc 与自定义资源 的状态 子资源一起使用,您可以获取有关资源的信息。

27.1.1. 对自定义资源执行 oc 操作

使用 oc 命令,如 get, describe, edit, 或 delete, 对资源类型执行操作。例如,oc get kafkatopics 检索所有 Kafka 主题和 oc get kafkas 列表,检索所有部署的 Kafka 集群。

当引用资源类型时,您可以使用单数和复数名称: oc get kafkas 获取与 oc get kafka 相同的结果。

您还可以使用资源 的短名称。学习短名称可在管理 AMQ Streams 时节省时间。Kafka 的短名称为 k,因此您也可以运行 oc get k 来列出所有 Kafka 集群。

oc get k

NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
my-cluster   3                        3
Copy to Clipboard Toggle word wrap
Expand
表 27.1. 每个 AMQ Streams 资源的长和短名称
AMQ Streams 资源长名称短名称

Kafka

kafka

k

Kafka 节点池

kafkanodepool

knp

Kafka 主题

kafkatopic

kt

Kafka 用户

kafkauser

ku

Kafka Connect

kafkaconnect

kc

Kafka Connector

kafkaconnector

kctr

Kafka Mirror Maker

kafkamirrormaker

kmm

Kafka Mirror Maker 2

kafkamirrormaker2

kmm2

Kafka Bridge

kafkabridge

kb

Kafka Rebalance

kafkarebalance

kr

27.1.1.1. 资源类别

自定义资源的类别也可以在 oc 命令中使用。

所有 AMQ Streams 自定义资源都属于类别 strimzi,因此您可以使用 strimzi 来通过一个命令获取所有 AMQ Streams 资源。

例如,运行 oc get strimzi 列出了给定命名空间中的所有 AMQ Streams 自定义资源。

oc get strimzi

NAME                                   DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS
kafka.kafka.strimzi.io/my-cluster      3                      3

NAME                                   PARTITIONS REPLICATION FACTOR
kafkatopic.kafka.strimzi.io/kafka-apps 3          3

NAME                                   AUTHENTICATION AUTHORIZATION
kafkauser.kafka.strimzi.io/my-user     tls            simple
Copy to Clipboard Toggle word wrap

oc get strimzi -o name 命令返回所有资源类型和资源名称。-o name 选项以 type/name 格式获取输出

oc get strimzi -o name

kafka.kafka.strimzi.io/my-cluster
kafkatopic.kafka.strimzi.io/kafka-apps
kafkauser.kafka.strimzi.io/my-user
Copy to Clipboard Toggle word wrap

您可以将这个 strimzi 命令与其他命令合并。例如,您可以将其传给 oc delete 命令,以删除单个命令中的所有资源。

oc delete $(oc get strimzi -o name)

kafka.kafka.strimzi.io "my-cluster" deleted
kafkatopic.kafka.strimzi.io "kafka-apps" deleted
kafkauser.kafka.strimzi.io "my-user" deleted
Copy to Clipboard Toggle word wrap

删除单个操作中的所有资源可能很有用,例如在测试新的 AMQ Streams 功能时。

27.1.1.2. 查询子资源的状态

您可以使用其他值传递给 -o 选项。例如,通过使用 -o yaml,您可以以 YAML 格式获取输出。使用 -o json 将返回 JSON。

您可以查看 oc get --help 中的所有选项。

其中一个最有用的选项是 JSONPath 支持,它允许您传递 JSONPath 表达式来查询 Kubernetes API。JSONPath 表达式可以提取或导航任何资源的特定部分。

例如,您可以使用 JSONPath 表达式 {.status.listeners[? (@.name=="tls")].bootstrapServers} 从 Kafka 自定义资源的状态中获取 bootstrap 地址,并在 Kafka 客户端中使用它。

在这里,命令查找名为 tls 的监听程序的 bootstrapServers 值:

oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="tls")].bootstrapServers}{"\n"}'

my-cluster-kafka-bootstrap.myproject.svc:9093
Copy to Clipboard Toggle word wrap

通过更改 name 条件,您还可以获取其他 Kafka 侦听程序的地址。

您可以使用 jsonpath 从任何自定义资源中提取任何其他属性或属性组。

27.1.2. AMQ Streams 自定义资源状态信息

status 属性提供某些自定义资源的状态信息。

下表列出了提供状态信息(部署时)以及定义 status 属性的 schema 的自定义资源。

如需有关 schema 的更多信息,请参阅 AMQ Streams 自定义资源 API 参考

Expand
表 27.2. 提供状态信息的自定义资源
AMQ Streams 资源模式参考在…​ 中发布状态信息

Kafka

KafkaStatus 模式参考

Kafka 集群

KafkaTopic

KafkaTopicStatus 模式参考

Kafka 集群中的 Kafka 主题

KafkaUser

KafkaUserStatus 模式参考

Kafka 集群中的 Kafka 用户

KafkaConnect

KafkaConnectStatus schema 参考

Kafka Connect 集群

KafkaConnector

KafkaConnectorStatus 模式参考

KafkaConnector 资源

KafkaMirrorMaker2

KafkaMirrorMaker2Status 模式参考

Kafka MirrorMaker 2 集群

KafkaMirrorMaker

KafkaMirrorMakerStatus 模式参考

Kafka MirrorMaker 集群

KafkaBridge

KafkaBridgeStatus schema reference

AMQ Streams Kafka Bridge

KafkaRebalance

KafkaRebalance 模式参考

重新平衡的状态和结果

资源的 status 属性提供有关资源状态的信息。status.conditionsstatus.observedGeneration 属性适用于所有资源。

status.conditions
状态条件描述了资源的当前状态。状态条件属性可用于跟踪与资源相关的进度,实现 其所需状态,如 spec 中指定的配置所定义。Status 条件属性提供资源更改的时间和原因,以及防止或延迟 Operator 正常状态的事件详情。
status.observedGeneration
最后观察到的生成表示 Cluster Operator 资源的最新协调。如果 observedGeneration 的值与 metadata.generation 的值(部署的当前版本)不同,Operator 尚未处理对资源的最新更新。如果这些值相同,状态信息反映了资源的最新更改。

status 属性还提供特定于资源的信息。例如,KafkaStatus 提供有关侦听器地址的信息,以及 Kafka 集群的 ID。

KafkaStatus 还提供有关使用的 Kafka 和 AMQ Streams 版本的信息。您可以检查 operatorLastSuccessfulVersionkafkaVersion 的值,以确定 AMQ Streams 或 Kafka 的升级是否已完成

AMQ Streams 创建和维护自定义资源的状态,定期评估自定义资源的当前状态并相应地更新其状态。当使用 oc edit 在自定义资源上执行更新时,则其状态不可编辑。另外,更改 状态 不会影响 Kafka 集群的配置。

在这里,我们看到 Kafka 自定义资源的状态 属性。

Kafka 自定义资源状态

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
spec:
  # ...
status:
  clusterId: XP9FP2P-RByvEy0W4cOEUA 
1

  conditions: 
2

    - lastTransitionTime: '2023-01-20T17:56:29.396588Z'
      status: 'True'
      type: Ready 
3

  kafkaVersion: 3.6.0 
4

  listeners: 
5

    - addresses:
        - host: my-cluster-kafka-bootstrap.prm-project.svc
          port: 9092
      bootstrapServers: 'my-cluster-kafka-bootstrap.prm-project.svc:9092'
      name: plain
    - addresses:
        - host: my-cluster-kafka-bootstrap.prm-project.svc
          port: 9093
      bootstrapServers: 'my-cluster-kafka-bootstrap.prm-project.svc:9093'
      certificates:
        - |
          -----BEGIN CERTIFICATE-----

          -----END CERTIFICATE-----
      name: tls
    - addresses:
        - host: >-
            2054284155.us-east-2.elb.amazonaws.com
          port: 9095
      bootstrapServers: >-
        2054284155.us-east-2.elb.amazonaws.com:9095
      certificates:
        - |
          -----BEGIN CERTIFICATE-----

          -----END CERTIFICATE-----
      name: external3
    - addresses:
        - host: ip-10-0-172-202.us-east-2.compute.internal
          port: 31644
      bootstrapServers: 'ip-10-0-172-202.us-east-2.compute.internal:31644'
      certificates:
        - |
          -----BEGIN CERTIFICATE-----

          -----END CERTIFICATE-----
      name: external4
  observedGeneration: 3 
6

  operatorLastSuccessfulVersion: 2.6 
7
Copy to Clipboard Toggle word wrap

1
Kafka 集群 ID。
2
状态条件 描述了 Kafka 集群的当前状态。
3
Ready 条件表示 Cluster Operator 认为 Kafka 集群可以处理流量。
4
Kafka 集群使用的 Kafka 版本。
5
监听器根据类型 描述 Kafka bootstrap 地址。
6
observedGeneration 值表示 Cluster Operator 的 Kafka 自定义资源的最后协调。
7
成功完成最后一次协调的 Operator 版本。
注意

状态中列出的 Kafka bootstrap 地址没有表示这些端点或 Kafka 集群处于 Ready 状态。

访问状态信息

您可以从命令行访问资源的状态信息。更多信息请参阅 第 27.1.3 节 “查找自定义资源的状态”

27.1.3. 查找自定义资源的状态

这个步骤描述了如何查找自定义资源的状态。

先决条件

  • 一个 OpenShift 集群。
  • Cluster Operator 正在运行。

流程

  • 指定自定义资源,并使用 -o jsonpath 选项应用标准 JSONPath 表达式来选择 status 属性:

    oc get kafka <kafka_resource_name> -o jsonpath='{.status}'
    Copy to Clipboard Toggle word wrap

    此表达式返回指定自定义资源的所有状态信息。您可以使用点表示法(如 status.listenersstatus.observedGeneration )来微调您要查看的状态信息。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat