2.2. Kafka 集群配置


使用 Kafka 资源配置 Kafka 部署。Kafka 集群使用 ZooKeeper 集群部署,因此配置选项也可用于 Kafka 资源中的 ZooKeeper。Entity Operator 包括 Topic Operator 和 User Operator。您还可以在 Kafka 资源中配置 entityOperator 属性,以在部署中包含 Topic Operator 和 User Operator。

第 6.2.1 节 “Kafka 模式参考” 描述 Kafka 资源的完整模式。

有关 Apache Kafka 的更多信息,请参阅 Apache Kafka 文档

侦听器配置

您可以配置监听程序,将客户端连接到 Kafka 代理。有关配置监听程序的更多信息,请参阅 第 6.2.4 节 “GenericKafkaListener 模式参考”

管理 TLS 证书

在部署 Kafka 时,Cluster Operator 会自动设置和更新 TLS 证书,以便在集群中启用加密和身份验证。如果需要,您可以在续订周期开始前手动续订集群和客户端 CA 证书。您还可以替换集群和客户端 CA 证书使用的密钥。如需更多信息,请参阅 手动续订 CA 证书 并替换私钥

2.2.1. 配置 Kafka

使用 Kafka 资源的属性来配置 Kafka 部署。

另外,您还可以配置 Kafka,您可以为 ZooKeeper 和 AMQ Streams Operator 添加配置。常见的配置属性(如日志记录和健康检查)是为每个组件独立配置。

这个过程仅显示一些可能的配置选项,但特别重要的配置选项包括:

  • 资源请求(CPU / Memory)
  • 用于最大和最小内存分配的 JVM 选项
  • 监听器(以及客户端的身份验证)
  • 身份验证
  • 存储
  • 机架感知
  • 指标
  • 用于集群重新平衡的 Cruise Control

Kafka 版本

Kafka configinter.broker.protocol.version 属性必须是指定的 Kafka 版本 (spec.kafka.version) 支持的版本。属性表示 Kafka 集群中使用的 Kafka 协议版本。

在 Kafka 3.0.0 中,当 inter.broker.protocol.version 设置为 3.0 或更高版本时,log.message.format.version 选项会被忽略,不需要设置。

当升级 Kafka 版本时,需要对 inter.broker.protocol.version 进行更新。如需更多信息,请参阅升级 Kafka

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator

有关部署的步骤,请参阅在 OpenShift 中部署和升级 AMQ Streams 指南:

流程

  1. 编辑 Kafka 资源的 spec 属性。

    您可以配置的属性显示在以下示例配置中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        replicas: 3 1
        version: 3.4.0 2
        logging: 3
          type: inline
          loggers:
            kafka.root.logger.level: "INFO"
        resources: 4
          requests:
            memory: 64Gi
            cpu: "8"
          limits:
            memory: 64Gi
            cpu: "12"
        readinessProbe: 5
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe:
          initialDelaySeconds: 15
          timeoutSeconds: 5
        jvmOptions: 6
          -Xms: 8192m
          -Xmx: 8192m
        image: my-org/my-image:latest 7
        listeners: 8
          - name: plain 9
            port: 9092 10
            type: internal 11
            tls: false 12
            configuration:
              useServiceDnsDomain: true 13
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication: 14
              type: tls
          - name: external 15
            port: 9094
            type: route
            tls: true
            configuration:
              brokerCertChainAndKey: 16
                secretName: my-secret
                certificate: my-certificate.crt
                key: my-key.key
        authorization: 17
          type: simple
        config: 18
          auto.create.topics.enable: "false"
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 2
          default.replication.factor: 3
          min.insync.replicas: 2
          inter.broker.protocol.version: "3.4"
          ssl.cipher.suites: TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 19
          ssl.enabled.protocols: TLSv1.2
          ssl.protocol: TLSv1.2
        storage: 20
          type: persistent-claim 21
          size: 10000Gi 22
        rack: 23
          topologyKey: topology.kubernetes.io/zone
        metricsConfig: 24
          type: jmxPrometheusExporter
          valueFrom:
            configMapKeyRef: 25
              name: my-config-map
              key: my-key
        # ...
      zookeeper: 26
        replicas: 3 27
        logging: 28
          type: inline
          loggers:
            zookeeper.root.logger: "INFO"
        resources:
          requests:
            memory: 8Gi
            cpu: "2"
          limits:
            memory: 8Gi
            cpu: "2"
        jvmOptions:
          -Xms: 4096m
          -Xmx: 4096m
        storage:
          type: persistent-claim
          size: 1000Gi
        metricsConfig:
          # ...
      entityOperator: 29
        tlsSidecar: 30
          resources:
            requests:
              cpu: 200m
              memory: 64Mi
            limits:
              cpu: 500m
              memory: 128Mi
        topicOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
          logging: 31
            type: inline
            loggers:
              rootLogger.level: "INFO"
          resources:
            requests:
              memory: 512Mi
              cpu: "1"
            limits:
              memory: 512Mi
              cpu: "1"
        userOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
          logging: 32
            type: inline
            loggers:
              rootLogger.level: INFO
          resources:
            requests:
              memory: 512Mi
              cpu: "1"
            limits:
              memory: 512Mi
              cpu: "1"
      kafkaExporter: 33
        # ...
      cruiseControl: 34
        # ...
    1
    2
    Kafka 版本,可按照 升级过程改为受支持的版本。
    3
    Kafka 日志记录器和日志级别 直接(内联)或通过 ConfigMap 间接添加(外部)。自定义 ConfigMap 必须放在 log4j.properties 键下。对于 Kafka kafka.root.logger.level 日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 或 OFF。
    4
    支持的资源、当前 cpu 和内存的 保留请求,以及指定可消耗的最大资源的限制。
    5
    使用 HealthCheck 可以知道何时重启一个容器(存活度)以及何时一个容器可以开始接受流量(就绪度)。
    6
    JVM 配置选项,用于优化运行 Kafka 的虚拟机(VM)的性能。
    7
    ADVANCED OPTION: 容器镜像配置,这只在特殊情况下建议使用。
    8
    侦听器配置客户端如何通过 bootstrap 地址连接到 Kafka 集群。监听器被配置为 internalexternal 监听器,用于来自 OpenShift 集群内部或外的连接
    9
    用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
    10
    Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许的端口号为 9092 及更高版本,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
    11
    监听程序指定为 internalcluster-ip (使用每个代理的 ClusterIP 服务公开 Kafka),或为外部监听程序指定为 route(只限 OpenShift), loadbalancer, nodeportingress(只限 Kubernetes)。
    12
    为每个监听程序启用 TLS 加密。默认为 false路由 监听程序不需要 TLS 加密。
    13
    定义是否分配了包括集群服务后缀(通常为 .cluster.local)的完全限定 DNS 名称。
    14
    15
    16
    由外部 CA (证书授权)管理的 Kafka 侦听器证书 的可选配置。brokerCertChainAndKey 指定包含服务器证书和私钥的 Secret。您可以在任何启用了 TLS 加密的监听程序中配置 Kafka 侦听器证书。
    17
    授权在 Kafka 代理上启用 simple、OAUTH 2.0 或 OPA 授权。简单授权使用 AclAuthorizer Kafka 插件。
    18
    19
    20
    存储配置为 临时persistent-claimjbod
    21
    22
    永久存储具有额外的配置选项,如用于动态卷调配的存储 idclass
    23
    机架感知 配置,将副本分散到不同的机架、数据中心或可用性区域。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。
    24
    启用 Prometheus 指标。在本例中,为 Prometheus JMX Exporter 配置指标(默认指标导出器)。
    25
    通过 Prometheus JMX Exporter 将指标导出到 Grafana 仪表板的 Prometheus 规则,通过引用包含 Prometheus JMX 导出器配置的 ConfigMap 来启用。您可以使用对在 metricsConfig.valueFrom.configMapKeyRef.key 下包含空文件的 ConfigMap 的引用来启用指标。
    26
    特定于 ZooKeeper 的配置,其中包含与 Kafka 配置类似的属性。
    27
    ZooKeeper 节点数量。ZooKeeper 集群通常有奇数个节点,一般为三个、五个或七个。大多数节点都必须可用,才能保持有效的仲裁。如果 ZooKeeper 集群丢失了其仲裁,它将停止响应客户端,Kafka 代理将停止工作。拥有稳定且高度可用的 ZooKeeper 集群对于 AMQ Streams 至关重要。
    28
    29
    30
    实体 Operator TLS sidecar 配置。Entity Operator 使用 TLS sidecar 与 ZooKeeper 安全通信。
    31
    指定的主题 Operator 日志程序和日志级别。这个示例使用 内联 日志记录。
    32
    33
    Kafka 导出程序配置。Kafka Exporter 是一个可选组件,用于在特定消费者滞后数据从 Kafka 代理中提取指标数据。要使 Kafka Exporter 正常工作,需要使用消费者组。
    34
    Cruise Control 的可选配置,用于重新平衡 Kafka 集群。
  2. 创建或更新资源:

    oc apply -f <kafka_configuration_file>

2.2.2. 配置实体 Operator

Entity Operator 用于管理在 Kafka 集群中运行的与 Kafka 相关的实体。

Entity Operator 包括:

  • 管理 Kafka 主题的主题 Operator
  • 用于管理 Kafka 用户的用户 Operator

通过 Kafka 资源配置,Cluster Operator 可以在部署 Kafka 集群时部署 Entity Operator,包括一个或多个 Operator。

Operator 会自动配置为管理 Kafka 集群的主题和用户。主题 Operator 和 User Operator 只能监视单个命名空间。

注意

部署后,实体 Operator Pod 会根据部署配置包含操作器。

2.2.2.1. 实体 Operator 配置属性

使用 Kafka.spec 中的 entityOperator 属性来配置 Entity Operator。

entityOperator 属性支持多个子属性:

  • tlsSidecar
  • topicOperator
  • userOperator
  • 模板

tlsSidecar 属性包含 TLS sidecar 容器的配置,用于与 ZooKeeper 通信。

template 属性包含 Entity Operator pod 的配置,如标签、注解、关联性和容限。有关配置模板的详情,请参考 第 2.7 节 “自定义 OpenShift 资源”

topicOperator 属性包含主题 Operator 的配置。如果缺少这个选项,则在没有 Topic Operator 的情况下部署 Entity Operator。

userOperator 属性包含 User Operator 的配置。如果缺少这个选项,则在没有 User Operator 的情况下部署 Entity Operator。

如需有关用于配置 Entity Operator 的属性的更多信息,请参阅 EntityUserOperatorSpec 模式参考

启用这两个 Operator 的基本配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    topicOperator: {}
    userOperator: {}

如果将空对象({})用于 topicOperatoruserOperator,则所有属性都使用它们的默认值。

topicOperatoruserOperator 属性都缺失时,实体 Operator 不会被部署。

2.2.2.2. 主题 Operator 配置属性

主题 Operator 部署可使用 topicOperator 对象内的附加选项进行配置。支持以下属性:

watchedNamespace
Topic Operator 监视 KafkaTopic 资源的 OpenShift 命名空间。default 是部署 Kafka 集群的命名空间。
reconciliationIntervalSeconds
定期协调之间的间隔(以秒为单位)。默认 120
zookeeperSessionTimeoutSeconds
ZooKeeper 会话超时(以秒为单位)。默认 18
topicMetadataMaxAttempts
从 Kafka 获取主题元数据的尝试次数。每次尝试之间的时间定义为指数避退。当主题创建可能会因为分区或副本的数量而花费更多时间时,请考虑增加这个值。默认 6
image
image 属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 6.1.6 节 “image
资源
resources 属性配置分配给 Topic Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 6.1.5 节 “资源
logging
logging 属性配置 Topic Operator 的日志记录。如需了解更多详细信息,请参阅 第 6.2.45.1 节 “logging

Topic Operator 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
    # ...

2.2.2.3. 用户 Operator 配置属性

可以使用 userOperator 对象内的附加选项来配置用户 Operator 部署。支持以下属性:

watchedNamespace
User Operator 监视 KafkaUser 资源的 OpenShift 命名空间。default 是部署 Kafka 集群的命名空间。
reconciliationIntervalSeconds
定期协调之间的间隔(以秒为单位)。默认 120
image
image 属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 6.1.6 节 “image
资源
resources 属性配置分配给 User Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 6.1.5 节 “资源
logging
logging 属性配置 User Operator 的日志记录。如需了解更多详细信息,请参阅 第 6.2.45.1 节 “logging
secretPrefix
secretPrefix 属性在从 KafkaUser 资源创建的所有 Secret 的名称中添加前缀。例如,secretPrefix: kafka- 会使用 kafka- 前缀所有 Secret 名称作为前缀。因此,名为 my-user 的 KafkaUser 将创建一个名为 kafka-my-user 的 Secret。

User Operator 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    userOperator:
      watchedNamespace: my-user-namespace
      reconciliationIntervalSeconds: 60
    # ...

2.2.3. 配置 Kafka 和 ZooKeeper 存储

作为有状态应用程序,Kafka 和 ZooKeeper 将数据存储在磁盘上。对于这个数据,AMQ Streams 支持三种存储类型:

  • Ephemeral (推荐只在开发时使用)
  • 持久性
  • JBOD (Kafka 不仅仅是 ZooKeeper)

在配置 Kafka 资源时,您可以指定 Kafka 代理使用的存储类型及其对应的 ZooKeeper 节点。您可以使用以下资源中的 storage 属性配置存储类型:

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper

存储类型在 type 字段中配置。

有关存储配置属性的更多信息,请参阅模式参考:

警告

部署 Kafka 集群后无法更改存储类型。

2.2.3.1. 数据存储注意事项

要使 AMQ Streams 正常工作,高效的数据存储基础架构非常重要。我们强烈建议使用块存储。AMQ Streams 仅测试用于块存储。文件存储(如 NFS)没有测试,且无法保证它正常工作。

为您的块存储选择以下选项之一:

注意

AMQ Streams 不需要 OpenShift 原始块卷。

2.2.3.1.1. 文件系统

Kafka 使用文件系统来存储信息。AMQ Streams 与 XFS 和 ext4 文件系统兼容,它们通常与 Kafka 一起使用。选择和设置文件系统时,请考虑部署的底层架构和要求。

如需更多信息,请参阅 Kafka 文档中的 Filesystem Selection

2.2.3.1.2. 磁盘用量

为 Apache Kafka 和 ZooKeeper 使用单独的磁盘。

虽然使用固态驱动器 (SSD) 并不是必须的,但它可以在大型集群中提高 Kafka 的性能,其中数据会异步发送到多个主题,并从多个主题接收。SSD 在 ZooKeeper 中特别有效,这需要快速、低延迟数据访问。

注意

您不需要置备复制存储,因为 Kafka 和 ZooKeeper 都有内置数据复制。

2.2.3.2. 临时存储

临时数据存储是临时的。节点上的所有 pod 共享本地临时存储空间。只要使用数据的 pod 正在运行,就会保留数据。当 pod 被删除时,数据会丢失。虽然 pod 可以在高可用性环境中恢复数据。

由于其临时性质,建议进行临时存储进行开发和测试。

临时存储使用 emptyDir 卷来存储数据。当 pod 分配给节点时,会创建一个 emptyDir 卷。您可以使用 sizeLimit 属性设置 emptyDir 的总存储量。

重要

临时存储不适用于带有复制因子 1 的单节点 ZooKeeper 集群或 Kafka 主题。

要使用临时存储,您需要将 KafkaZooKeeper 资源中的存储类型配置设置为 临时

临时存储配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: ephemeral
    # ...
  zookeeper:
    # ...
    storage:
      type: ephemeral
    # ...

2.2.3.2.1. Kafka 日志目录的挂载路径

Kafka 代理使用临时卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data/kafka-logIDX

其中 IDX 是 Kafka 代理 pod 索引。例如 /var/lib/kafka/data/kafka-log0

2.2.3.3. 持久性存储

持久性数据存储在系统中断时保留数据。对于使用持久数据存储的 pod,数据会在 pod 故障之间保留,并重启。

动态置备框架可创建带有持久性存储的集群。Pod 配置使用 持久性卷声明 (PVC)在持久性卷(PV)上发出存储请求。PV 是代表存储卷的存储资源。PV 独立于使用这些 PV 的 pod。PVC 请求创建 pod 时需要的存储量。PV 的底层存储基础架构不需要理解。如果 PV 与存储标准匹配,则 PVC 会绑定到 PV。

由于其具有持久性的性质,建议用于生产环境。

PVC 可以通过指定 StorageClass 来请求不同类型的持久性存储。存储类定义存储配置集并动态置备 PV。如果没有指定存储类,则使用默认存储类。持久性存储选项可能包括 SAN 存储类型或本地持久性卷

要使用持久性存储,您需要将 KafkaZooKeeper 资源中的存储类型配置设置为 persistent-claim

在生产环境中,建议以下配置:

  • 对于 Kafka,使用一个或多个 type: persistent-claim 卷配置 type: jbod
  • 对于 ZooKeeper,配置 type: persistent-claim

持久性存储还具有以下配置选项:

id (可选)
存储标识号。对于 JBOD 存储声明中定义的存储卷,这个选项是必须的。默认值为 0。
size (必需)
持久性卷声明的大小,如 "1000Gi"。
class (可选)
用于动态卷置备的 OpenShift StorageClass存储类 配置包括描述卷配置集的参数。
selector (可选)
配置以指定特定 PV。提供 key:value 对,代表所选卷的标签。
deleteClaim (optional)
布尔值,以指定在卸载集群时是否删除 PVC。默认为 false
警告

只有在支持持久性卷大小的 OpenShift 版本中才支持增加现有 AMQ Streams 集群中的持久性卷大小。要调整的持久性卷必须使用支持卷扩展的存储类。对于不支持卷扩展的其他版本的 OpenShift 和存储类,您必须在部署集群前决定必要的存储大小。不能减少现有持久性卷的大小。

Kafka 和 ZooKeeper 持久性存储配置示例

# ...
spec:
  kafka:
    # ...
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 1
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 2
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    # ...
  zookeeper:
    storage:
      type: persistent-claim
      size: 1000Gi
# ...

如果没有指定存储类,则使用默认值。以下示例指定了一个存储类。

使用特定存储类的持久性存储配置示例

# ...
storage:
  type: persistent-claim
  size: 1Gi
  class: my-storage-class
# ...

使用选择器 (selector)来指定提供某些功能的标记的持久性卷,如 SSD。

使用选择器的持久性存储配置示例

# ...
storage:
  type: persistent-claim
  size: 1Gi
  selector:
    hdd-type: ssd
  deleteClaim: true
# ...

2.2.3.3.1. 存储类覆盖

您可以为一个或多个 Kafka 代理或 ZooKeeper 节点指定不同的存储类,而不是使用默认存储类。例如,当存储类仅限于不同的可用区或数据中心时,这非常有用。您可以使用 overrides 字段来实现这一目的。

在本例中,默认存储类名为 my-storage-class

使用存储类覆盖的 AMQ Streams 集群示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  labels:
    app: my-cluster
  name: my-cluster
  namespace: myproject
spec:
  # ...
  kafka:
    replicas: 3
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
        class: my-storage-class
        overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
      # ...
  # ...
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: true
      size: 100Gi
      type: persistent-claim
      class: my-storage-class
      overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
  # ...

由于配置的 overrides 属性,卷使用以下存储类:

  • ZooKeeper 节点 0 的持久性卷使用 my-storage-class-zone-1a
  • ZooKeeper 节点 1 的持久性卷使用 my-storage-class-zone-1b
  • ZooKeeepr 节点 2 的持久性卷使用 my-storage-class-zone-1c
  • Kafka 代理 0 的持久性卷使用 my-storage-class-zone-1a
  • Kafka 代理 1 的持久性卷使用 my-storage-class-zone-1b
  • Kafka 代理 2 的持久性卷使用 my-storage-class-zone-1c

overrides 属性目前仅用于覆盖存储类配置。目前不支持对其他存储配置属性的覆盖。目前不支持其他存储配置属性。

2.2.3.3.2. 持久性存储的 PVC 资源

使用持久性存储时,它会创建具有以下名称的 PVC:

data-cluster-name-kafka-idx
用于存储 Kafka 代理 pod idx 数据的卷的 PVC。
data-cluster-name-zookeeper-idx
用于存储 ZooKeeper 节点 pod idx 数据的 PVC。
2.2.3.3.3. Kafka 日志目录的挂载路径

Kafka 代理使用持久性卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data/kafka-logIDX

其中 IDX 是 Kafka 代理 pod 索引。例如 /var/lib/kafka/data/kafka-log0

2.2.3.4. 调整持久性卷大小

您可以通过增加现有 AMQ Streams 集群使用的持久性卷的大小来置备存储容量。在 JBOD 存储配置中使用单个持久性卷或多个持久性卷的集群中支持重新定义持久性卷大小。

注意

您可以增加但不能缩小持久性卷的大小。OpenShift 当前不支持减少持久性卷的大小。

先决条件

  • 支持重新定义卷大小的 OpenShift 集群。
  • Cluster Operator 正在运行。
  • 使用支持卷扩展的存储类创建的持久性卷的 Kafka 集群。

流程

  1. 编辑集群的 Kafka 资源。

    更改 size 属性,以增加分配给 Kafka 集群、ZooKeeper 集群或两者的持久性卷的大小。

    • 对于 Kafka 集群,更新 spec.kafka.storage 中的 size 属性。
    • 对于 ZooKeeper 集群,更新 spec.zookeeper.storage 中的 size 属性。

    Kafka 配置将卷大小增加到 2000Gi

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: persistent-claim
          size: 2000Gi
          class: my-storage-class
        # ...
      zookeeper:
        # ...

  2. 创建或更新资源:

    oc apply -f <kafka_configuration_file>

    OpenShift 增加所选持久性卷的容量,以响应 Cluster Operator 的请求。调整大小后,Cluster Operator 会重启所有使用调整大小的持久性卷的 pod。这会自动发生。

  3. 验证集群中相关 pod 的存储容量是否已增加:

    oc get pv

    带有增加存储的 Kafka 代理 pod

    NAME               CAPACITY   CLAIM
    pvc-0ca459ce-...   2000Gi     my-project/data-my-cluster-kafka-2
    pvc-6e1810be-...   2000Gi     my-project/data-my-cluster-kafka-0
    pvc-82dc78c9-...   2000Gi     my-project/data-my-cluster-kafka-1

    输出显示与代理 pod 关联的每个 PVC 的名称。

其他资源

2.2.3.5. JBOD 存储

您可以将 AMQ Streams 配置为使用 JBOD,这是多个磁盘或卷的数据存储配置。JBOD 是为 Kafka 代理提供增加数据存储的方法。它还可以提高性能。

注意

JBOD 存储 只支持 Kafka 不支持 ZooKeeper。

JBOD 配置由一个或多个卷描述,每个卷可以 是临时的,也可以 是永久的。JBOD 卷声明的规则和约束与用于临时存储和持久性存储的规则和约束相同。例如,在置备后,您无法缩小持久性存储卷的大小,或者当类型为 临时 时无法更改 sizeLimit 的值。

要使用 JBOD 存储,您需要将 Kafka 资源中的存储类型配置设置为 jbodvolumes 属性允许您描述组成 JBOD 存储阵列或配置的磁盘。

JBOD 存储配置示例

# ...
storage:
  type: jbod
  volumes:
  - id: 0
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
  - id: 1
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
# ...

创建 JBOD 卷后无法更改 ID。您可以从 JBOD 配置中添加或删除卷。

2.2.3.5.1. JBOD 存储的 PVC 资源

当持久性存储用于声明 JBOD 卷时,它会创建一个具有以下名称的 PVC:

data-id-cluster-name-kafka-idx
用于存储 Kafka 代理 pod idx 数据的卷的 PVC。id 是用于存储 Kafka 代理 pod 数据的卷的 ID。
2.2.3.5.2. Kafka 日志目录的挂载路径

Kafka 代理使用 JBOD 卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data-id/kafka-logidx

其中 id 是用于存储 Kafka 代理 pod idx 数据的卷的 ID。例如 /var/lib/kafka/data-0/kafka-log0

2.2.3.6. 将卷添加到 JBOD 存储

此流程描述了如何将卷添加到配置为使用 JBOD 存储的 Kafka 集群中。它不能应用到配置为使用任何其他存储类型的 Kafka 集群。

注意

当在过去和移除的 id 下添加新卷时,您必须确保之前使用的 PersistentVolumeClaims 已被删除。

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator
  • 带有 JBOD 存储的 Kafka 集群

流程

  1. 编辑 Kafka 资源中的 spec.kafka.storage.volumes 属性。将新卷添加到 volumes 数组中。例如,使用 id 2 添加新卷:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 1
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 2
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源:

    oc apply -f <kafka_configuration_file>
  3. 创建新主题或将现有分区重新分配给新磁盘。

    提示

    Cruise Control 是重新分配分区的有效工具。要执行内部磁盘平衡,您可以在 KafkaRebalance.spec 下将 rebalanceDisk 设置为 true

2.2.3.7. 从 JBOD 存储中删除卷

此流程描述了如何从配置为使用 JBOD 存储的 Kafka 集群中删除卷。它不能应用到配置为使用任何其他存储类型的 Kafka 集群。JBOD 存储始终必须包含至少一个卷。

重要

为了避免数据丢失,您必须在删除卷前移动所有分区。

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator
  • 带有两个或多个卷的 JBOD 存储的 Kafka 集群

流程

  1. 从您要删除的磁盘中重新分配所有分区。任何分区中的数据仍然分配给要被删除的磁盘。

    提示

    您可以使用 kafka-reassign-partitions.sh 工具重新分配分区。

  2. 编辑 Kafka 资源中的 spec.kafka.storage.volumes 属性。从 volumes 阵列中删除一个或多个卷。例如,删除 ID 为 12 的卷:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  3. 创建或更新资源:

    oc apply -f <kafka_configuration_file>

2.2.4. 从一个终端连接到 ZooKeeper

大多数 Kafka CLI 工具都可以直接连接到 Kafka,因此在正常情况下,您不需要连接到 ZooKeeper。ZooKeeper 服务通过加密和身份验证进行保护,不供不是 AMQ Streams 一部分的外部应用程序使用。

但是,如果要使用需要连接到 ZooKeeper 的 Kafka CLI 工具,您可以使用 ZooKeeper 容器内的终端,并连接到 localhost:12181 作为 ZooKeeper 地址。

先决条件

  • 一个 OpenShift 集群可用。
  • Kafka 集群正在运行。
  • Cluster Operator 正在运行。

流程

  1. 使用 OpenShift 控制台打开终端,或者从 CLI 运行 exec 命令。

    例如:

    oc exec -ti my-cluster-zookeeper-0 -- bin/kafka-topics.sh --list --zookeeper localhost:12181

    务必使用 localhost:12181

    现在,您可以在 ZooKeeper 中运行 Kafka 命令。

2.2.5. 手动删除 Kafka 节点

此流程描述了如何使用 OpenShift 注解删除现有 Kafka 节点。删除 Kafka 节点包括删除运行 Kafka 代理的 Pod 以及相关的 PersistentVolumeClaim (如果集群使用持久性存储部署)。删除后,Pod 及其相关 PersistentVolumeClaim 会自动重新创建。

警告

删除 PersistentVolumeClaim 可能会导致永久数据丢失。只有在遇到存储问题时,才应执行以下流程。

先决条件

有关运行的步骤,请参阅在 OpenShift 中部署和升级 AMQ Streams 指南:

流程

  1. 查找您要删除的 Pod 的名称。

    Kafka 代理 pod 的名称为 & lt;cluster-name&gt; -kafka-<index &gt ;,其中 <index > 以零开始,结束副本总数减一。例如,my-cluster-kafka-0

  2. 在 OpenShift 中注解 Pod 资源。

    使用 oc annotate

    oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
  3. 当注解的 pod 被删除,然后重新创建时,等待下一个协调。

2.2.6. 手动删除 ZooKeeper 节点

此流程描述了如何使用 OpenShift 注解删除现有 ZooKeeper 节点。删除 ZooKeeper 节点包括删除 ZooKeeper 运行所在的 Pod 以及相关的 PersistentVolumeClaim (如果集群使用持久性存储部署)。删除后,Pod 及其相关 PersistentVolumeClaim 会自动重新创建。

警告

删除 PersistentVolumeClaim 可能会导致永久数据丢失。只有在遇到存储问题时,才应执行以下流程。

先决条件

有关运行的步骤,请参阅在 OpenShift 中部署和升级 AMQ Streams 指南:

流程

  1. 查找您要删除的 Pod 的名称。

    ZooKeeper pod 命名为 &lt ;cluster-name&gt; -zookeeper-<index &gt ;,其中 <index > 以零开始,并以 replicas minus 总数结束。例如: my-cluster-zookeeper-0

  2. 在 OpenShift 中注解 Pod 资源。

    使用 oc annotate

    oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
  3. 当注解的 pod 被删除,然后重新创建时,等待下一个协调。

2.2.7. Kafka 集群资源列表

以下资源由 OpenShift 集群中的 Cluster Operator 创建:

共享资源

cluster-name-cluster-ca
带有用于加密集群通信的集群 CA 私钥的 secret。
cluster-name-cluster-ca-cert
使用集群 CA 公钥的 secret。此密钥可用于验证 Kafka 代理的身份。
cluster-name-clients-ca
带有用于为用户证书签名的客户端 CA 私钥的 secret
cluster-name-clients-ca-cert
带有客户端 CA 公钥的 secret。此密钥可用于验证 Kafka 用户的身份。
cluster-name-cluster-operator-certs
用于与 Kafka 和 ZooKeeper 通信的集群 operator 密钥的 secret。

Zookeeper 节点

cluster-name-zookeeper

提供给以下 ZooKeeper 资源的名称:

  • StrimziPodSet 或 StatefulSet (如果禁用 UseStrimziPodSets 功能门)用于管理 ZooKeeper 节点 pod。
  • ZooKeeper 节点使用的服务帐户。
  • 为 ZooKeeper 节点配置的 PodDisruptionBudget。
cluster-name-zookeeper-idx
由 ZooKeeper StatefulSet 或 StrimziPodSet 创建的 Pod。
cluster-name-zookeeper-nodes
无头服务需要使 DNS 直接解析 ZooKeeper pod IP 地址。
cluster-name-zookeeper-client
Kafka 代理用来连接到 ZooKeeper 节点的服务作为客户端。
cluster-name-zookeeper-config
包含 ZooKeeper 辅助配置的 ConfigMap,并由 ZooKeeper 节点 pod 作为卷挂载。
cluster-name-zookeeper-nodes
使用 ZooKeeper 节点密钥的 secret。
cluster-name-network-policy-zookeeper
网络策略管理对 ZooKeeper 服务的访问。
data-cluster-name-zookeeper-idx
用于为 ZooKeeper 节点 pod idx 存储数据的卷的持久性卷声明。只有在选择了持久性存储来置备持久性卷以存储数据时,才会创建此资源。

Kafka 代理

cluster-name-kafka

提供给以下 Kafka 资源的名称:

  • StrimziPodSet 或 StatefulSet (如果禁用 UseStrimziPodSets 功能门)用于管理 Kafka 代理 pod。
  • Kafka pod 使用的服务帐户。
  • 为 Kafka 代理配置的 PodDisruptionBudget。
cluster-name-kafka-idx

提供给以下 Kafka 资源的名称:

  • 由 Kafka StatefulSet 或 StrimziPodSet 创建的 Pod。
  • 带有 Kafka 代理配置的 ConfigMap (如果启用了 UseStrimziPodSets 功能门)。
cluster-name-kafka-brokers
服务需要 DNS 解析 Kafka 代理 pod IP 地址。
cluster-name-kafka-bootstrap
服务可用作从 OpenShift 集群内连接的 Kafka 客户端的 bootstrap 服务器。
cluster-name-kafka-external-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 服务。此资源仅在启用外部监听程序时创建。当监听器名称是 外部 且端口为 9094 时,旧服务名称将用于向后兼容。
cluster-name-kafka-pod-id
用于将流量从 OpenShift 集群外部路由到各个容器集的服务。此资源仅在启用外部监听程序时创建。当监听器名称是 外部 且端口为 9094 时,旧服务名称将用于向后兼容。
cluster-name-kafka-external-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 路由。只有启用外部监听程序并设置类型 路由 时,才会创建此资源。当监听器名称是 外部 且端口为 9094 时,旧路由名称将用于向后兼容。
cluster-name-kafka-pod-id
将来自 OpenShift 集群外的流量路由到各个容器集。只有启用外部监听程序并设置类型 路由 时,才会创建此资源。当监听器名称是 外部 且端口为 9094 时,旧路由名称将用于向后兼容。
cluster-name-kafka-listener-name-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 服务。此资源仅在启用外部监听程序时创建。新的服务名称将用于所有其他外部监听程序。
cluster-name-kafka-listener-name-pod-id
用于将流量从 OpenShift 集群外部路由到各个容器集的服务。此资源仅在启用外部监听程序时创建。新的服务名称将用于所有其他外部监听程序。
cluster-name-kafka-listener-name-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 路由。只有启用外部监听程序并设置类型 路由 时,才会创建此资源。新路由名称将用于所有其他外部监听程序。
cluster-name-kafka-listener-name-pod-id
将来自 OpenShift 集群外的流量路由到各个容器集。只有启用外部监听程序并设置类型 路由 时,才会创建此资源。新路由名称将用于所有其他外部监听程序。
cluster-name-kafka-config
包含 Kafka 辅助配置的 ConfigMap,当禁用 UseStrimziPodSets 功能门时,代理 pod 会作为卷挂载。
cluster-name-kafka-brokers
带有 Kafka 代理密钥的 secret。
cluster-name-network-policy-kafka
网络策略管理对 Kafka 服务的访问。
strimzi-namespace-name-cluster-name-kafka-init
Kafka 代理使用的集群角色绑定。
cluster-name-jmx
带有 JMX 用户名和密码的 secret,用于保护 Kafka 代理端口。只有在 Kafka 中启用了 JMX 时创建此资源。
data-cluster-name-kafka-idx
卷的持久性卷声明,用于存储 Kafka 代理 pod idx 的数据。只有在选择了持久性存储来置备持久性卷以存储数据时,才会创建此资源。
data-id-cluster-name-kafka-idx
id 的持久性卷声明用于存储 Kafka 代理 pod idx 的数据。只有在置备持久性卷以存储数据时为 JBOD 卷选择了持久性存储时,才会创建此资源。

Entity Operator

只有在使用 Cluster Operator 部署 Entity Operator 时,才会创建这些资源。

cluster-name-entity-operator

提供给以下实体 Operator 资源的名称:

  • 使用主题和用户操作程序进行部署.
  • Entity Operator 使用的服务帐户。
cluster-name-entity-operator-random-string
由 Entity Operator 部署创建的 Pod。
cluster-name-entity-topic-operator-config
带有主题 Operator 的辅助配置的 ConfigMap。
cluster-name-entity-user-operator-config
带有用户 Operator 的辅助配置的 ConfigMap。
cluster-name-entity-topic-operator-certs
与 Kafka 和 ZooKeeper 通信的主题 Operator 密钥的 secret。
cluster-name-entity-user-operator-certs
与 Kafka 和 ZooKeeper 通信的用户 Operator 密钥的 secret。
strimzi-cluster-name-entity-topic-operator
Entity Topic Operator 使用的角色绑定。
strimzi-cluster-name-entity-user-operator
Entity User Operator 使用的角色绑定。

Kafka Exporter

只有在使用 Cluster Operator 部署 Kafka Exporter 时,才会创建这些资源。

cluster-name-kafka-exporter

提供给以下 Kafka 导出器资源的名称:

  • 使用 Kafka 导出器进行部署。
  • 用于收集消费者滞后指标的服务。
  • Kafka Exporter 使用的服务帐户。
cluster-name-kafka-exporter-random-string
由 Kafka Exporter 部署创建的 Pod。

Sything Control

只有在使用 Cluster Operator 部署 Cruise Control 时,才会创建这些资源。

cluster-name-cruise-control

给定以下 Cruise 控制资源的名称:

  • 使用 Cruise 控制进行部署.
  • 用于与 Cruise Control 进行通信的服务。
  • Cruise Control 使用的服务帐户。
cluster-name-cruise-control-random-string
由 Cruise Control 部署创建的 Pod。
cluster-name-cruise-control-config
包含 Cruise Control 辅助配置的 ConfigMap,并由 Cruise Control Pod 作为一个卷挂载。
cluster-name-cruise-control-certs
使用 Cruise Control 密钥与 Kafka 和 ZooKeeper 通信的 secret。
cluster-name-network-policy-cruise-control
网络策略管理对 Cruise Control 服务的访问权限。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.