2.2. Kafka 集群配置


使用 Kafka 资源配置 Kafka 部署。Kafka 集群使用 ZooKeeper 集群部署,因此 Kafka 资源中的 ZooKeeper 配置选项也可用。Entity Operator 包含主题 Operator 和 User Operator。您还可以在 Kafka 资源中配置 entityOperator 属性,以便在部署中包含 Topic Operator 和 User Operator。

第 12.2.1 节 “Kafka 模式参考” 描述 Kafka 资源的完整 schema。

有关 Apache Kafka 的更多信息,请参阅 Apache Kafka 文档

侦听器配置

您可以配置监听程序,将客户端连接到 Kafka 代理。有关为连接代理配置监听程序的更多信息,请参阅 Listener 配置

授权访问 Kafka

您可以配置 Kafka 集群,以允许或拒绝用户执行的操作。如需更多信息,请参阅保护对 Kafka 代理的访问

管理 TLS 证书

在部署 Kafka 时,Cluster Operator 会自动设置并更新 TLS 证书,以便在集群中启用加密和身份验证。如果需要,您可以在续订周期开始前手动续订集群和客户端 CA 证书。您还可以替换集群和客户端 CA 证书使用的密钥。如需更多信息,请参阅 手动更新 CA 证书并替换私钥

2.2.1. 配置 Kafka

使用 Kafka 资源的属性来配置 Kafka 部署。

另外,还可以配置 Kafka,添加 ZooKeeper 和 AMQ Streams Operator 的配置。常见配置属性(如日志记录和健康检查)会为每个组件独立配置。

此过程仅显示一些可能的配置选项,但特别重要的配置选项包括:

  • 资源请求(CPU/内存)
  • 用于最大和最小内存分配的 JVM 选项
  • 监听器(以及客户端的身份验证)
  • 身份验证
  • 存储
  • 机架感知
  • 指标
  • 用于集群重新平衡的 Cruise Control

Kafka 版本

Kafka configinter.broker.protocol.version 属性必须是指定的 Kafka 版本 (spec.kafka.version) 支持的版本。属性表示 Kafka 集群中使用的 Kafka 协议版本。

从 Kafka 3.0.0,当 inter.broker.protocol.version 设置为 3.0 或更高版本时,logging.message.format.version 选项会被忽略,不需要设置。

当升级 Kafka 版本时,需要对 inter.broker.protocol.version 进行更新。如需更多信息,请参阅升级 Kafka

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator

有关部署说明 ,请参阅 OpenShift 中的部署和升级 AMQ Streams 指南:

流程

  1. 编辑 Kafka 资源的 spec 属性。

    您可以在以下示例配置中显示您可以配置的属性:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        replicas: 3 
    1
    
        version: 3.3.1 
    2
    
        logging: 
    3
    
          type: inline
          loggers:
            kafka.root.logger.level: "INFO"
        resources: 
    4
    
          requests:
            memory: 64Gi
            cpu: "8"
          limits:
            memory: 64Gi
            cpu: "12"
        readinessProbe: 
    5
    
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe:
          initialDelaySeconds: 15
          timeoutSeconds: 5
        jvmOptions: 
    6
    
          -Xms: 8192m
          -Xmx: 8192m
        image: my-org/my-image:latest 
    7
    
        listeners: 
    8
    
          - name: plain 
    9
    
            port: 9092 
    10
    
            type: internal 
    11
    
            tls: false 
    12
    
            configuration:
              useServiceDnsDomain: true 
    13
    
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication: 
    14
    
              type: tls
          - name: external 
    15
    
            port: 9094
            type: route
            tls: true
            configuration:
              brokerCertChainAndKey: 
    16
    
                secretName: my-secret
                certificate: my-certificate.crt
                key: my-key.key
        authorization: 
    17
    
          type: simple
        config: 
    18
    
          auto.create.topics.enable: "false"
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 2
          default.replication.factor: 3
          min.insync.replicas: 2
          inter.broker.protocol.version: "3.3"
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 
    19
    
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
        storage: 
    20
    
          type: persistent-claim 
    21
    
          size: 10000Gi 
    22
    
        rack: 
    23
    
          topologyKey: topology.kubernetes.io/zone
        metricsConfig: 
    24
    
          type: jmxPrometheusExporter
          valueFrom:
            configMapKeyRef: 
    25
    
              name: my-config-map
              key: my-key
        # ...
      zookeeper: 
    26
    
        replicas: 3 
    27
    
        logging: 
    28
    
          type: inline
          loggers:
            zookeeper.root.logger: "INFO"
        resources:
          requests:
            memory: 8Gi
            cpu: "2"
          limits:
            memory: 8Gi
            cpu: "2"
        jvmOptions:
          -Xms: 4096m
          -Xmx: 4096m
        storage:
          type: persistent-claim
          size: 1000Gi
        metricsConfig:
          # ...
      entityOperator: 
    29
    
        tlsSidecar: 
    30
    
          resources:
            requests:
              cpu: 200m
              memory: 64Mi
            limits:
              cpu: 500m
              memory: 128Mi
        topicOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
          logging: 
    31
    
            type: inline
            loggers:
              rootLogger.level: "INFO"
          resources:
            requests:
              memory: 512Mi
              cpu: "1"
            limits:
              memory: 512Mi
              cpu: "1"
        userOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
          logging: 
    32
    
            type: inline
            loggers:
              rootLogger.level: INFO
          resources:
            requests:
              memory: 512Mi
              cpu: "1"
            limits:
              memory: 512Mi
              cpu: "1"
      kafkaExporter: 
    33
    
        # ...
      cruiseControl: 
    34
    
        # ...
    Copy to Clipboard Toggle word wrap
    1
    副本节点的数量。如果您的集群已定义了主题,您可以扩展集群
    2
    Kafka 版本,可按照升级过程 更改为受支持的版本。
    3
    Kafka 日志记录器和日志级别 直接(内联)或通过 ConfigMap 间接添加(外部)。自定义 ConfigMap 必须放在 log4j.properties 键下。对于 Kafka kafka.root.logger.level logger,您可以将日志级别设置为 INFO、ERROR、WARN、WARN、TRACE、DEBAL 或 OFF。
    4
    用于保留 支持的资源、当前 cpu 和内存 的请求,以及指定可消耗的最大资源数量。
    5
    状况检查,了解何时重启容器(持续)以及容器是否可以接受流量(就绪状态)。
    6
    用于优化运行 Kafka 的虚拟机(VM)性能的 JVM 配置选项
    7
    ADVANCED OPTION: 容器镜像配置,这只在特殊情况下建议。
    8
    侦听器配置客户端如何通过 bootstrap 地址连接到 Kafka 集群。监听器 配置为内部外部监听程序,用于从 OpenShift 集群内部或外部 进行连接
    9
    用于标识监听程序的名称。必须在 Kafka 集群中唯一。
    10
    Kafka 内监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许的端口号为 9092 及更高版本,除了端口 9404 和 9999 除外,后者已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
    11
    监听程序指定为 internalcluster-ip (使用每个代理的 ClusterIP 服务公开 Kafka),或为外部监听程序指定为 route, loadbalancer, nodeportingress
    12
    为每个监听程序启用 TLS 加密。默认为 false路由 监听器不需要 TLS 加密。
    13
    定义是否分配完全限定的 DNS 名称,包括集群服务后缀(通常为 .cluster.local)。
    14
    15
    16
    由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书 的可选配置。brokerCertChainAndKey 指定包含服务器证书和私钥的 Secret。您可以在任何使用启用 TLS 加密的监听程序上配置 Kafka 侦听器证书。
    17
    授权 在 Kafka 代理上启用了 simple、OAUTH 2.0 或 OPA 授权。简单授权使用 AclAuthorizer Kafka 插件。
    18
    19
    20
    存储配置为 临时persistent-claimjbod
    21
    22
    永久存储具有额外的配置选项,如用于动态卷调配的存储 idclass
    23
    机架感知 配置,将副本分散到不同的机架、数据中心或可用性区域。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。
    24
    启用 Prometheus 指标。在本例中,为 Prometheus JMX Exporter 配置指标(默认指标导出器)。
    25
    Prometheus 规则用于通过 Prometheus JMX Exporter 将指标导出到 Grafana 仪表板,并通过引用包含 Prometheus JMX exporter 配置的 ConfigMap 来启用。您可以启用指标,而无需进一步配置,使用对 metricsConfig.valueFrom.configMapKeyRef.key 下包含空文件的 ConfigMap 的引用。
    26
    特定于 zookeeper 的配置,其包含与 Kafka 配置类似的属性。
    27
    ZooKeeper 节点数量。ZooKeeper 集群通常有奇数个节点,一般为三个、五个或七个。大多数节点都必须可用才能保持有效的仲裁。如果 ZooKeeper 集群丢失了仲裁,它将停止对客户端进行响应,并且 Kafka 代理将停止工作。拥有稳定且高度可用的 ZooKeeper 集群对于 AMQ Streams 至关重要。
    28
    29
    30
    实体 Operator TLS sidecar 配置。实体 Operator 使用 TLS sidecar 进行与 ZooKeeper 的安全通信。
    31
    指定的主题 Operator 日志程序和日志级别。这个示例使用 内联 日志记录。
    32
    33
    Kafka 导出程序配置。Kafka Exporter 是一个可选组件,用于从 Kafka 代理中提取指标数据。要使 Kafka 导出器能够正常工作,需要使用使用者组。
    34
    Cruise Control 的可选配置,用于 重新平衡 Kafka 集群
  2. 创建或更新资源:

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

2.2.2. 配置 Entity Operator

Entity Operator 用于管理在 Kafka 集群中运行的与 Kafka 相关的实体。

Entity Operator 包括:

  • 用于管理 Kafka 主题的主题 Operator
  • 用户 Operator 来管理 Kafka 用户

通过 Kafka 资源配置,Cluster Operator 在部署 Kafka 集群时,包括一个或多个 Operator。

Operator 会自动配置为管理 Kafka 集群的主题和用户。主题 Operator 和 User Operator 只能监视单个命名空间。更多信息请参阅 第 7.1 节 “使用 AMQ Streams operator 监视命名空间”

注意

部署之后,实体 Operator pod 会根据部署配置包含 Operator。

2.2.2.1. 实体 Operator 配置属性

使用 Kafka.spec 中的 entityOperator 属性配置 Entity Operator。

entityOperator 属性支持几个子操作:

  • tlsSidecar
  • topicOperator
  • userOperator
  • 模板

tlsSidecar 属性包含 TLS sidecar 容器的配置,用于与 ZooKeeper 进行通信。

template 属性包含 Entity Operator pod 的配置,如标签、注解、关联性和容限。有关配置模板的详情请参考 第 2.8 节 “自定义 OpenShift 资源”

topicOperator 属性包含主题 Operator 的配置。当缺少这个选项时,在没有 Topic Operator 的情况下部署 Entity Operator。

userOperator 属性包含 User Operator 的配置。当缺少这个选项时,在没有用户 Operator 的情况下部署 Entity Operator。

有关配置 Entity Operator 的属性的更多信息,请参阅 EntityUserOperatorSpec 模式参考

启用这两个 Operator 的基本配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    topicOperator: {}
    userOperator: {}
Copy to Clipboard Toggle word wrap

如果将空对象({})用于 topicOperatoruserOperator,则所有属性都使用它们的默认值。

当缺少 topicOperatoruserOperator 属性时,实体 Operator 不会被部署。

2.2.2.2. 主题 Operator 配置属性

主题 Operator 部署可使用 topicOperator 对象内的附加选项进行配置。支持以下属性:

watchedNamespace
主题 Operator 监视 KafkaTopic 资源的 OpenShift 命名空间。default 是部署 Kafka 集群的命名空间。
reconciliationIntervalSeconds
定期协调间隔(以秒为单位)。默认 120
zookeeperSessionTimeoutSeconds
ZooKeeper 会话超时(以秒为单位)。默认 18
topicMetadataMaxAttempts
从 Kafka 获取主题元数据时的尝试次数。每次尝试之间的时间都定义为 exponential back-off。当主题创建数量或副本的原因可能需要更多时间时,请考虑增大这个值。默认 6
image
image 属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 12.1.6 节 “image
资源
resources 属性配置分配给 Topic Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 12.1.5 节 “资源
logging
logging 属性配置 Topic Operator 的日志记录。如需了解更多详细信息,请参阅 第 12.2.45.1 节 “logging

主题 Operator 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
    # ...
Copy to Clipboard Toggle word wrap

2.2.2.3. 用户 Operator 配置属性

可以使用 userOperator 对象中的附加选项来配置用户 Operator 部署。支持以下属性:

watchedNamespace
User Operator 监视 KafkaUser 资源的 OpenShift 命名空间。default 是部署 Kafka 集群的命名空间。
reconciliationIntervalSeconds
定期协调间隔(以秒为单位)。默认 120
image
image 属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 12.1.6 节 “image
资源
resources 属性配置分配给 User Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 12.1.5 节 “资源
logging
logging 属性配置 User Operator 的日志记录。如需了解更多详细信息,请参阅 第 12.2.45.1 节 “logging
secretPrefix
secretPrefix 属性添加一个前缀到从 KafkaUser 资源创建的所有 Secret 的名称中。例如,secretPrefix: kafka- 会为所有带有 kafka- 的 Secret 名称添加前缀。因此,名为 my-user 的 KafkaUser 创建名为 kafka-my-user 的 Secret。

User Operator 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    userOperator:
      watchedNamespace: my-user-namespace
      reconciliationIntervalSeconds: 60
    # ...
Copy to Clipboard Toggle word wrap

2.2.3. 配置 Kafka 和 ZooKeeper 存储

作为有状态应用程序,Kafka 和 ZooKeeper 将数据存储在磁盘中。对于这个数据,AMQ Streams 支持三种存储类型:

  • Ephemeral (推荐只在开发时使用)
  • 持久性
  • JBOD(仅适用于 ZooKeeper)

在配置 Kafka 资源时,您可以指定 Kafka 代理及其对应的 ZooKeeper 节点使用的存储类型。您可以使用以下资源中的 storage 属性配置存储类型:

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper

存储类型在 type 字段中配置。

有关存储配置属性的更多信息,请参阅架构引用:

警告

部署 Kafka 集群后无法更改存储类型。

2.2.3.1. 数据存储注意事项

对于 AMQ Streams,高效的数据存储基础架构非常重要。块存储是必需的。文件存储(如 NFS)无法用于 Kafka。

为您的块存储选择以下选项之一:

注意

AMQ Streams 不需要 OpenShift 原始块卷。

2.2.3.1.1. 文件系统

Kafka 使用文件系统来存储信息。AMQ Streams 与 XFS 和 ext4 文件系统兼容,它们通常用于 Kafka。选择和设置文件系统时,请考虑部署的底层架构和要求。

如需更多信息,请参阅 Kafka 文档中的 Filesystem Selection

2.2.3.1.2. 磁盘用量

为 Apache Kafka 和 ZooKeeper 使用单独的磁盘。

虽然使用固态驱动器 (SSD) 并不是必须的,但它可以在大型集群中提高 Kafka 的性能,其中数据会异步发送到多个主题,并从多个主题接收。SSD 特别适用于 ZooKeeper,这需要快速、低延迟数据访问。

注意

您不需要置备复制存储,因为 Kafka 和 ZooKeeper 内置数据复制。

2.2.3.2. 临时存储

临时数据存储是临时的。节点上的所有 pod 都共享一个本地临时存储空间。只要用于运行它的 pod,就会保留数据。删除 pod 时,数据会丢失。虽然 pod 可以在高可用性环境中恢复数据。

由于其临时性,仅建议进行开发和测试的临时存储。

临时存储使用 emptyDir 卷来存储数据。当 pod 分配给节点时,会创建一个 emptyDir 卷。您可以使用 sizeLimit 属性,为 emptyDir 设置存储总量。

重要

临时存储不适用于单节点 ZooKeeper 集群或 Kafka 主题,其复制因素为 1。

要使用临时存储,您要将 KafkaZooKeeper 资源中的存储类型配置设置为 临时

临时存储配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: ephemeral
    # ...
  zookeeper:
    # ...
    storage:
      type: ephemeral
    # ...
Copy to Clipboard Toggle word wrap

2.2.3.2.1. Kafka 日志目录挂载路径

Kafka 代理使用临时卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data/kafka-logIDX
Copy to Clipboard Toggle word wrap

其中 IDX 是 Kafka 代理 pod 索引。例如 /var/lib/kafka/data/kafka-log0

2.2.3.3. 持久性存储

持久性数据存储在系统中断时保留数据。对于使用持久性数据存储的 pod,数据会在 pod 故障后保留,并重启。

动态置备框架可创建带有持久性存储的集群。Pod 配置使用 持久性卷声明(PVC)在 持久性卷(PV)上发出存储请求。PV 是代表存储卷的存储资源。PV 独立于使用这些 PV 的 pod。PVC 请求创建 pod 时所需的存储量。PV 的底层存储基础架构不需要理解。如果 PV 与存储条件匹配,PVC 会被绑定到 PV。

由于具有永久性的性质,在生产环境中推荐使用持久性存储。

PVC 可以通过指定 StorageClass 来请求不同类型的持久性存储。存储类定义了存储配置集并动态置备 PV。如果没有指定存储类,则会使用默认存储类。持久性存储选项可能包括 SAN 存储类型 或本地持久性卷

要使用持久性存储,您要将 KafkaZooKeeper 资源中的存储类型配置设置为 persistent-claim

在生产环境中,建议进行以下配置:

  • 对于 Kafka,使用一个或多个类型配置 type: jbod : persistent-claim volumes
  • 对于 ZooKeeper,配置 type: persistent-claim

持久性存储也有以下配置选项:

ID (可选)
存储标识号。对于在 JBOD 存储声明中定义的存储卷,此选项是必需的。默认值为 0。
size (必需)
持久性卷声明的大小,如 "1000Gi"。
class (可选)
用于动态卷置备的 OpenShift StorageClass存储类 配置包含详细描述卷配置集的参数。
selector (可选)
配置以指定特定 PV。提供代表所选卷标签的 key:value 对。
deleteClaim (optional)
指定在卸载集群时是否删除 PVC 的布尔值。默认为 false
警告

只有在 AMQ Streams 集群中增加持久性卷的大小仅在支持持久性卷大小的 OpenShift 版本中被支持。要重新定义持久性卷的大小必须使用支持卷扩展的存储类。对于不支持卷扩展的其他版本的 OpenShift 和存储类,您必须在部署集群前决定必要的存储大小。无法减少现有持久性卷的大小。

Kafka 和 ZooKeeper 持久性存储配置示例

# ...
spec:
  kafka:
    # ...
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 1
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 2
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    # ...
  zookeeper:
    storage:
      type: persistent-claim
      size: 1000Gi
# ...
Copy to Clipboard Toggle word wrap

如果没有指定存储类,则使用默认设置。以下示例指定了存储类。

使用特定存储类的持久性存储配置示例

# ...
storage:
  type: persistent-claim
  size: 1Gi
  class: my-storage-class
# ...
Copy to Clipboard Toggle word wrap

使用选择器 (selector)来指定提供某些功能的标记的持久性卷,如 SSD。

使用选择器的持久性存储配置示例

# ...
storage:
  type: persistent-claim
  size: 1Gi
  selector:
    hdd-type: ssd
  deleteClaim: true
# ...
Copy to Clipboard Toggle word wrap

2.2.3.3.1. 存储类覆盖

您可以为一个或多个 Kafka 代理或 ZooKeeper 节点指定不同的存储类,而不是使用默认存储类。例如,当存储类限制到不同的可用区或数据中心时,这很有用。您可以使用 overrides 字段来实现这一目的。

在本例中,默认存储类命名为 my-storage-class

使用存储类覆盖的 AMQ Streams 集群示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  labels:
    app: my-cluster
  name: my-cluster
  namespace: myproject
spec:
  # ...
  kafka:
    replicas: 3
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
        class: my-storage-class
        overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
      # ...
  # ...
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: true
      size: 100Gi
      type: persistent-claim
      class: my-storage-class
      overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
  # ...
Copy to Clipboard Toggle word wrap

由于配置的 覆盖 属性,卷使用以下存储类:

  • ZooKeeper 节点 0 的持久性卷使用 my-storage-class-zone-1a
  • ZooKeeper 节点 1 的持久性卷使用 my-storage-class-zone-1b
  • ZooKeeepr 节点 2 的持久性卷使用 my-storage-class-zone-1c
  • Kafka 代理 0 的持久性卷使用 my-storage-class-zone-1a
  • Kafka 代理的持久性卷 1 使用 my-storage-class-zone-1b
  • Kafka 代理 2 的持久性卷使用 my-storage-class-zone-1c

overrides 属性目前只用于覆盖存储类配置。目前不支持覆盖其他存储配置属性。目前不支持其他存储配置属性。

2.2.3.3.2. 持久性存储的 PVC 资源

当使用持久性存储时,它会创建带有以下名称的 PVC:

data-cluster-name-kafka-idx
用于存储 Kafka 代理 pod idx 的数据的 PVC。
data-cluster-name-zookeeper-idx
用于存储 ZooKeeper 节点 pod idx 的数据的 PVC。
2.2.3.3.3. Kafka 日志目录挂载路径

Kafka 代理使用持久性卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data/kafka-logIDX
Copy to Clipboard Toggle word wrap

其中 IDX 是 Kafka 代理 pod 索引。例如 /var/lib/kafka/data/kafka-log0

2.2.3.4. 重新定义持久性卷大小

您可以通过增加现有 AMQ Streams 集群使用的持久性卷的大小来置备更高的存储容量。集群中支持使用单个持久性卷或 JBOD 存储配置中的多个持久性卷的集群调整持久性卷大小。

注意

您可以增加,但不能缩小持久性卷的大小。OpenShift 当前不支持减少持久性卷的大小。

先决条件

  • 个 OpenShift 集群,支持卷大小调整。
  • Cluster Operator 正在运行。
  • 使用支持卷扩展的存储类创建的持久性卷的 Kafka 集群。

流程

  1. 编辑集群的 Kafka 资源。

    更改 size 属性,以增大分配给 Kafka 集群的持久性卷的大小、一个 ZooKeeper 集群或两者。

    • 对于 Kafka 集群,更新 spec.kafka.storage 下的 size 属性。
    • 对于 ZooKeeper 集群,更新 spec.zookeeper.storage 下的 size 属性。

    将卷大小增加到 2000Gi的 Kafka 配置

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: persistent-claim
          size: 2000Gi
          class: my-storage-class
        # ...
      zookeeper:
        # ...
    Copy to Clipboard Toggle word wrap

  2. 创建或更新资源:

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

    OpenShift 增加所选持久性卷的容量,以响应 Cluster Operator 的请求。调整大小完成后,Cluster Operator 会重启所有使用调整大小的持久性卷的 pod。这会自动发生。

  3. 验证集群中相关 pod 的存储容量是否已增加:

    oc get pv
    Copy to Clipboard Toggle word wrap

    带有更高存储的 Kafka 代理 pod

    NAME               CAPACITY   CLAIM
    pvc-0ca459ce-...   2000Gi     my-project/data-my-cluster-kafka-2
    pvc-6e1810be-...   2000Gi     my-project/data-my-cluster-kafka-0
    pvc-82dc78c9-...   2000Gi     my-project/data-my-cluster-kafka-1
    Copy to Clipboard Toggle word wrap

    输出显示与代理 pod 关联的每个 PVC 的名称。

2.2.3.5. JBOD 存储

您可以将 AMQ Streams 配置为使用 JBOD,这是多个磁盘或卷的数据存储配置。JBOD 是为 Kafka 代理提供增加数据存储的方法。它还可以提高性能。

注意

Kafka 仅支持 JBOD 存储。

JBOD 配置由一个或多个卷描述,每个卷可以是 临时或 持久的。JBOD 卷声明的规则和约束与用于临时存储的规则和限制相同。例如,在置备后无法缩小持久性存储卷的大小,或者当类型是 临时 时无法更改 sizeLimit 的值。

要使用 JBOD 存储,您要将 Kafka 资源中的存储类型配置设置为 jbodvolumes 属性允许您描述组成 JBOD 存储阵列或配置的磁盘。

JBOD 存储配置示例

# ...
storage:
  type: jbod
  volumes:
  - id: 0
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
  - id: 1
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
# ...
Copy to Clipboard Toggle word wrap

创建 JBOD 卷后无法更改 ID。您可以从 JBOD 配置中添加或删除卷。

2.2.3.5.1. JBOD 存储的 PVC 资源

当持久性存储用于声明 JBOD 卷时,它会创建一个具有以下名称的 PVC:

data-id-cluster-name-kafka-idx
用于存储 Kafka 代理 pod idx 的数据的 PVC。id 是用于存储 Kafka 代理 pod 数据的卷的 ID。
2.2.3.5.2. Kafka 日志目录挂载路径

Kafka 代理使用 JBOD 卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data-id/kafka-logidx
Copy to Clipboard Toggle word wrap

其中 id 是用于存储 Kafka 代理 pod idx 的数据的卷 ID。例如 /var/lib/kafka/data-0/kafka-log0

2.2.3.6. 将卷添加到 JBOD 存储

此流程描述了如何将卷添加到配置为使用 JBOD 存储的 Kafka 集群。它不适用于配置为使用任何其他存储类型的 Kafka 集群。

注意

当在过去和移除的 id 下添加新卷时,您必须确保之前使用的 PersistentVolumeClaim 已被删除。

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator
  • 带有 JBOD 存储的 Kafka 集群

流程

  1. 编辑 Kafka 资源中的 spec.kafka.storage.volumes 属性。将新卷添加到 阵列中。例如,添加新卷,其 ID 为 2:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 1
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 2
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
    Copy to Clipboard Toggle word wrap
  2. 创建或更新资源:

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap
  3. 创建新主题或将现有分区重新分配给新磁盘。

2.2.3.7. 从 JBOD 存储中删除卷

此流程描述了如何从配置为使用 JBOD 存储的 Kafka 集群中删除卷。它不适用于配置为使用任何其他存储类型的 Kafka 集群。JBOD 存储总必须至少包含一个卷。

重要

为了避免数据丢失,您必须在删除卷前移动所有分区。

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator
  • 带有两个或多个卷的 JBOD 存储的 Kafka 集群

流程

  1. 从您要删除的磁盘中重新分配所有分区。分区中的任何数据仍会分配给要删除的磁盘中的数据可能会丢失。
  2. 编辑 Kafka 资源中的 spec.kafka.storage.volumes 属性。从 volumes 阵列中删除一个或多个卷。例如,删除 ID 为 12 的卷:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
    Copy to Clipboard Toggle word wrap
  3. 创建或更新资源:

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

2.2.4. 扩展集群

通过添加或删除代理来扩展 Kafka 集群。如果集群已经定义了主题,还必须重新分配分区。

使用 kafka-reassign-partitions.sh 工具重新分配分区。该工具使用一个重新分配 JSON 文件来指定要重新分配主题。

如果要移动特定分区,可以生成重新分配 JSON 文件或手动创建文件。

2.2.4.1. 代理扩展配置

您可以配置 Kafka.spec.kafka.replicas 配置来添加或减少代理数。

代理添加

增加主题吞吐量的主要方法是增加该主题的分区数。这是因为额外分区允许在集群中的不同代理间共享主题。但是,当每个代理被使用更多分区的特定资源(通常是 I/O)的限制时,则会导致吞吐量增加。反之,您需要将代理添加到集群中。

当您向集群添加额外代理时,Kafka 不会自动为其分配任何分区。您必须决定从现有代理重新分配给新代理的分区。

在所有代理之间重新分布分区后,会减少每个代理的资源使用。

代理删除

如果您使用 StatefulSets 管理代理 pod,则无法从集群中删除 任何 pod。您只能从集群中删除一个或多个最高数量的 pod。例如,在 12 个代理集群中,pod 被命名为 cluster-name-kafka-0 up to cluster-name-kafka-11。如果您决定缩减一个代理,则会删除 cluster-name-kafka-11

从集群中删除代理前,请确保不会将其分配给任何分区。您还应该决定哪些剩余的代理将负责停用代理中的每个分区。代理没有分配的分区后,可以安全地缩减集群。

2.2.4.2. 分区重新分配工具

Topic Operator 目前不支持将副本重新分配给不同的代理,因此需要直接连接到代理 pod 以重新分配给代理 pod。

在代理 pod 中,kafka-reassign-partitions.sh 工具可让您将分区重新分配给不同的代理。

它有三种不同的模式:

--generate
取一组主题和代理,并生成 重新分配 JSON 文件,该文件将导致这些代理所分配给这些主题的分区。由于这在整个主题上运行,因此只有在您只想重新分配一些主题时,无法使用它。
--execute
取一个 重新分配 JSON 文件,并将其应用到集群中的分区和文件系统。获取分区的代理会成为分区的领导者。对于给定分区,当新代理发现并加入 ISR (同步副本)后,旧代理将停止成为后续代理,并将删除其副本。
--verify
使用与 --execute 步骤相同的 重新分配 JSON 文件,-- verify 检查 文件中所有分区是否已移至预期的代理中。如果重新分配完成后,-- verify 也会移除所有生效的流量节流(--throttle)。除非被删除,否则节流将继续影响集群,即使重新分配完成后也是如此。

只能在任何给定时间在集群中运行一个重新分配,且无法取消一个正在运行的重新分配。如果您需要取消重新分配,请等待它完成,然后执行另一个重新分配来还原第一个重新分配的效果。kafka-reassign-partitions.sh 将打印这个 reversion 的重新分配 JSON 作为其输出的一部分。非常大的重新分配应该被分解为多个较小的重新分配,以防需要停止 in-progress 重新分配。

2.2.4.2.1. 分区重新分配 JSON 文件

重新分配 JSON 文件具有特定的结构:

{
  "version": 1,
  "partitions": [
    <PartitionObjects>
  ]
}
Copy to Clipboard Toggle word wrap

其中 <PartitionObjects > 是一个以逗号分隔的对象列表,如下所示:

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ]
}
Copy to Clipboard Toggle word wrap
注意

虽然 Kafka 还支持 "log_dirs" 属性,但在 AMQ Streams 中不应该使用它。

以下是一个示例重新分配 JSON 文件,它将主题 topic-a 的分区 4 分配给代理 2, 47;主题 topic-b 的分区 2 分配给代理 1, 57

分区重新分配文件示例

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7]
    },
    {
      "topic": "topic-b",
      "partition": 2,
      "replicas": [1,5,7]
    }
  ]
}
Copy to Clipboard Toggle word wrap

没有包括在 JSON 中的分区不会被更改。

2.2.4.2.2. 分区在 JBOD 卷之间重新分配

在 Kafka 集群中使用 JBOD 存储时,您可以选择重新分配特定卷及其日志目录(每个卷具有单个日志目录)之间的分区。要将分区重新分配给特定卷,请在重新分配 JSON 文件的 < PartitionObjects& gt; 中添加 log_dirs 选项。

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ],
  "log_dirs": [ <AssignedLogDirs> ]
}
Copy to Clipboard Toggle word wrap

log_dirs 对象应包含与 replicas 对象中指定的副本数相同的日志目录数量。该值应该是日志目录的绝对路径,或者任何 关键字。

指定日志目录的分区重新分配文件示例

{
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7].
      "log_dirs": [ "/var/lib/kafka/data-0/kafka-log2", "/var/lib/kafka/data-0/kafka-log4", "/var/lib/kafka/data-0/kafka-log7" ]
}
Copy to Clipboard Toggle word wrap

分区重新分配节流

分区重新分配可能会很慢,因为它涉及在代理之间传输大量数据。为了避免对客户端造成负面影响,您可以限制重新分配过程。使用带有 kafka-reassign-partitions.sh 工具的 --throttle 参数来限制重新分配。对于在代理之间移动分区,每秒指定最大阈值(以字节/秒为单位)。例如,-- throttle 5000000 为移动分区设置最大阈值 50 MBps。

节流可能会导致重新分配需要更长的时间来完成。

  • 如果节流太低,则新分配的代理将无法跟上要发布的记录,并且重新分配永远不会完成。
  • 如果节流过高,客户端将会受到影响。

例如,对于生产者而言,这可以比一般延迟等待被确认而大于正常延迟。对于消费者,这可能会以较低的吞吐量的形式进行清单,导致轮询之间的延迟更高。

2.2.4.3. 生成重新分配 JSON 文件

这个步骤描述了如何生成重新分配 JSON 文件。使用带有 kafka-reassign-partitions.sh 工具的重新分配文件,在扩展 Kafka 集群后重新分配分区。

您可以从连接到 Kafka 集群的交互式 pod 容器运行该工具。

这些步骤描述了使用 mTLS 的安全重新分配过程。您需要一个使用 TLS 加密和 mTLS 验证的 Kafka 集群。

您需要以下内容来建立连接:

  • 在创建 Kafka 集群时,Cluster Operator 生成的集群 CA 证书和密钥
  • 当用户为对 Kafka 集群的客户端访问创建时,User Operator 生成的用户 CA 证书和密钥

在此过程中,CA 证书和对应的密码是从集群中提取的,并以 PKCS #12 (.p12.password)格式包含它们的用户 secret。密码允许访问包含证书的 .p12 存储。您可以使用 .p12 存储来指定信任存储和密钥存储来验证与 Kafka 集群的连接。

先决条件

  • 您有一个正在运行的 Cluster Operator。
  • 您有一个正在运行的 Kafka 集群,它基于配置了内部 TLS 加密和 mTLS 验证的 Kafka 资源。

    使用 TLS 加密和 mTLS 验证的 Kafka 配置

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        listeners:
          # ...
          - name: tls
            port: 9093
            type: internal
            tls: true 
    1
    
            authentication:
              type: tls 
    2
    
        # ...
    Copy to Clipboard Toggle word wrap

    1
    为内部监听程序启用 TLS 加密。
    2
  • 正在运行的 Kafka 集群包含一组要重新分配的主题和分区。

    my-topic的主题配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 3
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
        # ...
    Copy to Clipboard Toggle word wrap

  • 有一个配置了 ACL 规则的 KafkaUser,它用于指定从 Kafka 代理生成和使用主题的权限。

    带有 ACL 规则的 Kafka 用户配置示例,允许在 my-topicmy-cluster上执行操作

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication: 
    1
    
        type: tls
      authorization:
        type: simple 
    2
    
        acls:
          # access to the topic
          - resource:
              type: topic
              name: my-topic
            operations:
              - Create
              - Describe
              - Read
              - AlterConfigs
            host: "*"
          # access to the cluster
          - resource:
              type: cluster
            operations:
              - Alter
              - AlterConfigs
            host: "*"
          # ...
      # ...
    Copy to Clipboard Toggle word wrap

    1
    定义为 mutual tls 的用户身份验证机制。
    2
    ACL 规则的简单授权和附带列表。

流程

  1. 从 Kafka 集群的 < cluster_name> -cluster-ca-cert secret 中提取集群 CA 证书和密钥。

    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    Copy to Clipboard Toggle word wrap
    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
    Copy to Clipboard Toggle word wrap

    <cluster_name > 替换为 Kafka 集群的名称。当您使用 Kafka 资源部署 Kafka 时,使用 Kafka 集群名称( <cluster_name> -cluster-ca-cert )创建一个带有集群 CA 证书的 secret。例如,my-cluster-cluster-ca-cert

  2. 使用 AMQ Streams Kafka 镜像运行一个新的交互式 pod 容器,以连接到正在运行的 Kafka 代理。

    oc run --restart=Never --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 <interactive_pod_name> -- /bin/sh -c "sleep 3600"
    Copy to Clipboard Toggle word wrap

    <interactive_pod_name > 替换为 pod 的名称。

  3. 将集群 CA 证书复制到互动 pod 容器中。

    oc cp ca.p12 <interactive_pod_name>:/tmp
    Copy to Clipboard Toggle word wrap
  4. 从具有访问 Kafka 代理的 Kafka 用户的 secret 中提取用户 CA 证书和密钥。

    oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
    Copy to Clipboard Toggle word wrap
    oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password
    Copy to Clipboard Toggle word wrap

    <kafka_user > 替换为 Kafka 用户的名称。当使用 KafkaUser 资源创建 Kafka 用户时,会使用 Kafka 用户名创建带有用户 CA 证书的 secret。例如,my-user

  5. 将用户 CA 证书复制到交互式 pod 容器。

    oc cp user.p12 <interactive_pod_name>:/tmp
    Copy to Clipboard Toggle word wrap

    CA 证书允许交互式 pod 容器使用 TLS 连接到 Kafka 代理。

  6. 创建 config.properties 文件,以指定用于验证与 Kafka 集群的连接的 truststore 和密钥存储。

    使用您在前面的步骤中提取的证书和密码。

    bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 
    1
    
    security.protocol=SSL 
    2
    
    ssl.truststore.location=/tmp/ca.p12 
    3
    
    ssl.truststore.password=<truststore_password> 
    4
    
    ssl.keystore.location=/tmp/user.p12 
    5
    
    ssl.keystore.password=<keystore_password> 
    6
    Copy to Clipboard Toggle word wrap
    1
    连接到 Kafka 集群的 bootstrap 服务器地址。使用您自己的 Kafka 集群名称替换 < kafka_cluster_name>
    2
    使用 TLS 进行加密时的安全协议选项。
    3
    truststore 位置包含了 Kafka 集群的公钥证书 (ca.p12)。
    4
    用于访问信任存储的密码(ca.password)。
    5
    密钥存储位置包含 Kafka 用户的公钥证书(user.p12)。
    6
    用于访问密钥存储的密码(user.password)。
  7. config.properties 文件复制到交互式 pod 容器中。

    oc cp config.properties <interactive_pod_name>:/tmp/config.properties
    Copy to Clipboard Toggle word wrap
  8. 准备名为 topics.json 的 JSON 文件,以指定要移动的主题。

    将主题名称指定为用逗号分开的列表。

    用于重新分配 topic-atopic-b的所有分区的 JSON 文件示例

    {
      "version": 1,
      "topics": [
        { "topic": "topic-a"},
        { "topic": "topic-b"}
      ]
    }
    Copy to Clipboard Toggle word wrap

  9. topics.json 文件复制到交互式 pod 容器中。

    oc cp topics.json <interactive_pod_name>:/tmp/topics.json
    Copy to Clipboard Toggle word wrap
  10. 在互动 pod 容器中启动 shell 进程。

    oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
    Copy to Clipboard Toggle word wrap

    <namespace > 替换为运行 pod 的 OpenShift 命名空间。

  11. 使用 kafka-reassign-partitions.sh 命令生成重新分配 JSON。

    示例以将 topic-atopic-b 的所有分区移动到代理 012

    bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list 0,1,2 \
      --generate
    Copy to Clipboard Toggle word wrap

2.2.4.4. 扩展 Kafka 集群

使用重新分配文件来增加 Kafka 集群中的代理数。

重新分配文件应描述分区如何重新分配给放大 Kafka 集群中的代理。

这个步骤描述了使用 TLS 的安全扩展过程。您需要一个使用 TLS 加密和 mTLS 验证的 Kafka 集群。

先决条件

  • 您有一个正在运行的 Kafka 集群,它基于配置了内部 TLS 加密和 mTLS 验证的 Kafka 资源。
  • 您已生成了一个名为 rement .json 的重新分配 JSON 文件。
  • 您要运行一个连接到正在运行的 Kafka 代理的交互式 pod 容器。
  • 您使用 ACL 规则配置的 KafkaUser 连接,指定管理 Kafka 集群及其主题的权限。

请参阅生成 JSON 文件

流程

  1. 通过增加 Kafka.spec.kafka.replicas 配置选项,根据需要添加多个新代理。
  2. 验证新代理 pod 是否已启动。
  3. 如果没有这样做,请运行交互式 pod 容器来生成一个重新分配 JSON 文件,名为 reassignment.json
  4. reassignment.json 文件复制到交互式 pod 容器中。

    oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.json
    Copy to Clipboard Toggle word wrap

    <interactive_pod_name > 替换为 pod 的名称。

  5. 在互动 pod 容器中启动 shell 进程。

    oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
    Copy to Clipboard Toggle word wrap

    <namespace > 替换为运行 pod 的 OpenShift 命名空间。

  6. 使用交互式 pod 容器中的 kafka-reassign-partitions.sh 脚本来重新分配分区。

    bin/kafka-reassign-partitions.sh --bootstrap-server
     <cluster_name>-kafka-bootstrap:9093 \
     --command-config /tmp/config.properties \
     --reassignment-json-file /tmp/reassignment.json \
     --execute
    Copy to Clipboard Toggle word wrap

    <cluster_name > 替换为 Kafka 集群的名称。例如: my-cluster-kafka-bootstrap:9093

    如果要节流复制,也可以将 --throttle 选项传递给一个 inter-broker throttled 速率(以每秒字节为单位)。例如:

    bin/kafka-reassign-partitions.sh --bootstrap-server
      <cluster_name>-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute
    Copy to Clipboard Toggle word wrap

    此命令将输出两个重新分配 JSON 对象。第一个记录所移动分区的当前分配。如果稍后需要恢复重新分配,您应该将其保存到本地文件(而不是 pod 中的文件)。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。

    如果您需要在重新分配时更改节流,您可以使用不同的节流率相同的命令。例如:

    bin/kafka-reassign-partitions.sh --bootstrap-server
      <cluster_name>-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
    Copy to Clipboard Toggle word wrap
  7. 验证重新分配已使用来自任何代理 pod 的 kafka-reassign-partitions.sh 命令行工具完成。这与上一步中的命令相同,但使用 --verify 选项而不是 --execute 选项。

    bin/kafka-reassign-partitions.sh --bootstrap-server
      <cluster_name>-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
    Copy to Clipboard Toggle word wrap

    --verify 命令报告每个分区都成功完成时,重新分配过程已被重新分配。最后 --verify 还会导致删除任何重新分配节流的效果。

  8. 现在,如果保存了 JSON 以将分配恢复到其原始代理,您可以删除恢复的文件。

2.2.4.5. 缩减 Kafka 集群

使用重新分配文件来减少 Kafka 集群中的代理数。

重新分配文件必须描述如何将分区重新分配给 Kafka 集群中的剩余代理。首先删除最高数量最高的 pod 中的代理。

这个步骤描述了使用 TLS 的安全扩展过程。您需要一个使用 TLS 加密和 mTLS 验证的 Kafka 集群。

先决条件

  • 您有一个正在运行的 Kafka 集群,它基于配置了内部 TLS 加密和 mTLS 验证的 Kafka 资源。
  • 您已生成了一个名为 rement .json 的重新分配 JSON 文件。
  • 您要运行一个连接到正在运行的 Kafka 代理的交互式 pod 容器。
  • 您使用 ACL 规则配置的 KafkaUser 连接,指定管理 Kafka 集群及其主题的权限。

请参阅生成 JSON 文件

流程

  1. 如果没有这样做,请运行交互式 pod 容器来生成一个重新分配 JSON 文件,名为 reassignment.json
  2. reassignment.json 文件复制到交互式 pod 容器中。

    oc cp reassignment.json <interactive_pod_name>:/tmp/reassignment.json
    Copy to Clipboard Toggle word wrap

    <interactive_pod_name > 替换为 pod 的名称。

  3. 在互动 pod 容器中启动 shell 进程。

    oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash
    Copy to Clipboard Toggle word wrap

    <namespace > 替换为运行 pod 的 OpenShift 命名空间。

  4. 使用交互式 pod 容器中的 kafka-reassign-partitions.sh 脚本来重新分配分区。

    bin/kafka-reassign-partitions.sh --bootstrap-server
     <cluster_name>-kafka-bootstrap:9093 \
     --command-config /tmp/config.properties \
     --reassignment-json-file /tmp/reassignment.json \
     --execute
    Copy to Clipboard Toggle word wrap

    <cluster_name > 替换为 Kafka 集群的名称。例如: my-cluster-kafka-bootstrap:9093

    如果要节流复制,也可以将 --throttle 选项传递给一个 inter-broker throttled 速率(以每秒字节为单位)。例如:

    bin/kafka-reassign-partitions.sh --bootstrap-server
      <cluster_name>-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute
    Copy to Clipboard Toggle word wrap

    此命令将输出两个重新分配 JSON 对象。第一个记录所移动分区的当前分配。如果稍后需要恢复重新分配,您应该将其保存到本地文件(而不是 pod 中的文件)。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。

    如果您需要在重新分配时更改节流,您可以使用不同的节流率相同的命令。例如:

    bin/kafka-reassign-partitions.sh --bootstrap-server
      <cluster_name>-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
    Copy to Clipboard Toggle word wrap
  5. 验证重新分配已使用来自任何代理 pod 的 kafka-reassign-partitions.sh 命令行工具完成。这与上一步中的命令相同,但使用 --verify 选项而不是 --execute 选项。

    bin/kafka-reassign-partitions.sh --bootstrap-server
      <cluster_name>-kafka-bootstrap:9093 \
      --command-config /tmp/config.properties \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
    Copy to Clipboard Toggle word wrap

    --verify 命令报告每个分区都成功完成时,重新分配过程已被重新分配。最后 --verify 还会导致删除任何重新分配节流的效果。

  6. 现在,如果保存了 JSON 以将分配恢复到其原始代理,您可以删除恢复的文件。
  7. 当所有分区重新分配完成后,被删除的代理不应该负责集群中的任何分区。您可以通过检查代理的数据日志目录是否包含任何实时分区日志来验证这一点。如果代理上的日志目录包含一个与扩展的正则表达式 \.[a-z0-9]-delete$ 不匹配的目录,则代理仍然具有实时分区,且不应停止。

    您可以通过执行命令来检查这一点:

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      /bin/bash -c \
      "ls -l /var/lib/kafka/kafka-log_<n>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"
    Copy to Clipboard Toggle word wrap

    其中 n 是要删除的 pod 的数量。

    如果上述命令打印任何输出,则代理仍然具有实时分区。在这种情况下,重新分配过程没有完成,或者重新分配 JSON 文件不正确。

  8. 当您确认代理没有实时分区时,您可以编辑 Kafka 资源的 Kafka.spec.kafka.replicas 属性来减少代理数量。

2.2.5. 用于滚动更新的维护时间窗

通过维护时间窗口,您可以调度 Kafka 和 ZooKeeper 集群的某些滚动更新,以便在方便的时间启动。

2.2.5.1. 维护时间窗概述

在大多数情况下,Cluster Operator 只更新 Kafka 或 ZooKeeper 集群以响应对应的 Kafka 资源。这可让您在将更改应用到 Kafka 资源时计划,以最小化对 Kafka 客户端应用程序的影响。

但是,如果没有对 Kafka 资源进行任何对应的更改,可能会发生对 Kafka 和 ZooKeeper 集群的一些更新。例如,如果它管理的 CA (证书颁发机构)证书接近到期,Cluster Operator 将需要执行滚动重启。

虽然 pod 的滚动重启应该不会影响服务的可用性 (假设正确的代理和主题配置),但它可能会影响 Kafka 客户端应用程序的性能。通过维护时间窗口,您可以调度 Kafka 和 ZooKeeper 集群的启动,以便在方便的时间启动。如果没有为集群配置维护时间窗口,则这种初始滚动更新可能会在高负载的可预测期间发生。

2.2.5.2. 维护时间窗定义

您可以通过在 Kafka.spec.maintenanceTimeWindows 属性中输入字符串数组来配置维护时间窗。每个字符串都是一个 cron 表达式 在 UTC 中(Coordinated Universal Time),其实际用途与 Greenwich Mean Time 相同。

以下示例配置一个维护时间窗,该窗口在午夜开始,于 01:59am (UTC)结束,周日、星期二、星期二、周三和星期四:

# ...
maintenanceTimeWindows:
  - "* * 0-1 ? * SUN,MON,TUE,WED,THU *"
# ...
Copy to Clipboard Toggle word wrap

在实践中,维护窗口应当与 Kafka 资源的 Kafka.spec.clusterCa.renewalDaysKafka.spec.clientsCa.renewalDays 属性一起设置,以确保在配置的维护时间窗口中完成必要的 CA 证书续订。

注意

AMQ Streams 不会根据给定窗口精确调度维护操作。相反,对于每个协调,它会检查维护窗口当前是否"open"。这意味着,在一个给定时间窗内开始维护操作会延迟到 Cluster Operator 协调间隔。因此,维护时间窗口必须至少如此长。

2.2.5.3. 配置维护时间窗

您可以为支持的进程触发的滚动更新配置维护时间窗。

先决条件

  • OpenShift 集群。
  • Cluster Operator 正在运行。

流程

  1. Kafka 资源中添加或编辑 maintenanceTimeWindows 属性。例如,允许 0800 到 1059 到 1400 到 1559 之间的维护,您可以设置 maintenanceTimeWindows,如下所示:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      maintenanceTimeWindows:
        - "* * 8-10 * * ?"
        - "* * 14-15 * * ?"
    Copy to Clipboard Toggle word wrap
  2. 创建或更新资源:

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

2.2.6. 从一个终端连接到 ZooKeeper

大多数 Kafka CLI 工具可以直接连接到 Kafka,因此当一般情况下,您不需要连接到 ZooKeeper。zookeeper 服务通过加密和身份验证进行保护,并不能供不属于 AMQ Streams 的外部应用程序使用。

但是,如果要使用需要连接到 ZooKeeper 的 Kafka CLI 工具,您可以使用 ZooKeeper 容器内的终端,并连接到 localhost:12181 作为 ZooKeeper 地址。

先决条件

  • OpenShift 集群可用。
  • Kafka 集群正在运行。
  • Cluster Operator 正在运行。

流程

  1. 使用 OpenShift 控制台打开终端,或者从 CLI 运行 exec 命令。

    例如:

    oc exec -ti my-cluster-zookeeper-0 -- bin/kafka-topics.sh --list --zookeeper localhost:12181
    Copy to Clipboard Toggle word wrap

    务必使用 localhost:12181

    现在,您可以在 ZooKeeper 中运行 Kafka 命令。

2.2.7. 手动删除 Kafka 节点

这个步骤描述了如何使用 OpenShift 注解删除现有 Kafka 节点。删除 Kafka 节点包括删除运行 Kafka 代理的 Pod 以及相关的 PersistentVolumeClaim (如果集群已部署了持久性存储)。删除后,Pod 及其相关的 PersistentVolumeClaim 会自动重新创建。

警告

删除 PersistentVolumeClaim 可能会导致持久性数据丢失。只有在遇到存储问题时才会执行以下步骤。

先决条件

有关运行的信息 ,请参阅 OpenShift 中的部署和升级 AMQ Streams 指南:

流程

  1. 查找您要删除的 Pod 的名称。

    Kafka 代理 pod 的名称为 & lt;cluster-name&gt; -kafka-<index & gt;,其中 <index> 从零开始,结尾是 replicas减一总数。例如,my-cluster-kafka-0

  2. 注解 OpenShift 中的 Pod 资源。

    使用 oc annotate:

    oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
    Copy to Clipboard Toggle word wrap
  3. 当注解的具有底层持久性卷声明的 pod 被删除后再重新创建时,等待下一次协调。

2.2.8. 手动删除 ZooKeeper 节点

此流程描述了如何使用 OpenShift 注解删除现有 ZooKeeper 节点。删除 ZooKeeper 节点包括删除运行 ZooKeeper 的 Pod 以及相关的 PersistentVolumeClaim (如果集群已部署了持久性存储)。删除后,Pod 及其相关的 PersistentVolumeClaim 会自动重新创建。

警告

删除 PersistentVolumeClaim 可能会导致持久性数据丢失。只有在遇到存储问题时才会执行以下步骤。

先决条件

有关运行的信息 ,请参阅 OpenShift 中的部署和升级 AMQ Streams 指南:

流程

  1. 查找您要删除的 Pod 的名称。

    zookeeper pod 的名称为 < cluster-name>-zookeeper-<index &gt ;,其中 <index > 从零开始,结尾是 replicas minus 总数。例如,my-cluster-zookeeper-0

  2. 注解 OpenShift 中的 Pod 资源。

    使用 oc annotate:

    oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
    Copy to Clipboard Toggle word wrap
  3. 当注解的具有底层持久性卷声明的 pod 被删除后再重新创建时,等待下一次协调。

2.2.9. Kafka 集群资源列表

以下资源由 OpenShift 集群中的 Cluster Operator 创建:

共享资源

cluster-name-cluster-ca
带有用于加密集群通信的集群 CA 私钥的 secret。
cluster-name-cluster-ca-cert
使用集群 CA 公钥的 secret。这个密钥可以用来验证 Kafka 代理的身份。
cluster-name-clients-ca
带有用于为用户证书签名的客户端 CA 私钥的 secret
cluster-name-clients-ca-cert
使用客户端 CA 公钥的 secret。这个密钥可以用来验证 Kafka 用户的身份。
cluster-name-cluster-operator-certs
带有集群 operator 键与 Kafka 和 ZooKeeper 通信的 secret。

Zookeeper 节点

cluster-name-zookeeper

提供给以下 ZooKeeper 资源的名称:

  • StatefulSet 或 StrimziPodSet (如果启用了 UseStrimziPodSets 功能门 )用于管理 ZooKeeper 节点 Pod。
  • ZooKeeper 节点使用的服务帐户。
  • 为 ZooKeeper 节点配置的 PodDisruptionBudget。
cluster-name-zookeeper-idx
ZooKeeper StatefulSet 或 StrimziPodSet 创建的 Pod。
cluster-name-zookeeper-nodes
无头服务需要使 DNS 直接解析 ZooKeeper pod IP 地址。
cluster-name-zookeeper-client
Kafka 代理使用的服务作为客户端连接到 ZooKeeper 节点。
cluster-name-zookeeper-config
包含 ZooKeeper 辅助配置的 ConfigMap,并被 ZooKeeper 节点 Pod 作为一个卷挂载。
cluster-name-zookeeper-nodes
使用 ZooKeeper 节点键的 secret。
cluster-name-network-policy-zookeeper
网络策略管理对 ZooKeeper 服务的访问。
data-cluster-name-zookeeper-idx
用于存储 ZooKeeper 节点 pod idx 数据的卷的持久性卷声明。只有在为置备持久性卷置备持久性卷选择了持久性存储时,才会创建此资源。

Kafka 代理

cluster-name-kafka

提供给以下 Kafka 资源的名称:

  • StatefulSet 或 StrimziPodSet (如果启用了 UseStrimziPodSets 功能门 )用于管理 Kafka 代理 pod。
  • Kafka pod 使用的服务帐户。
  • 为 Kafka 代理配置的 PodDisruptionBudget。
cluster-name-kafka-idx

提供给以下 Kafka 资源的名称:

  • 由 Kafka StatefulSet 或 StrimziPodSet 创建的 Pod。
  • 带有 Kafka 代理配置的 ConfigMap (如果启用了 UseStrimziPodSets 功能门 )。
cluster-name-kafka-brokers
服务需要 DNS 解析 Kafka 代理 pod IP 地址。
cluster-name-kafka-bootstrap
服务可用作从 OpenShift 集群内部连接的 Kafka 客户端的 bootstrap 服务器。
cluster-name-kafka-external-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 服务。只有在启用外部监听程序时,才会创建此资源。当监听器名称是 外部 并且端口为 9094 时,旧服务名称将用于向后兼容。
cluster-name-kafka-pod-id
用于将流量从 OpenShift 集群外部路由到各个容器集的服务。只有在启用外部监听程序时,才会创建此资源。当监听器名称是 外部 并且端口为 9094 时,旧服务名称将用于向后兼容。
cluster-name-kafka-external-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 路由。只有在启用外部监听器并且设置为类型 路由 时才会创建此资源。当监听器名称是 外部 并且端口为 9094 时,旧路由名称将用于向后兼容。
cluster-name-kafka-pod-id
将来自 OpenShift 集群外的流量路由到各个容器集。只有在启用外部监听器并且设置为类型 路由 时才会创建此资源。当监听器名称是 外部 并且端口为 9094 时,旧路由名称将用于向后兼容。
cluster-name-kafka-listener-name-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 服务。只有在启用外部监听程序时,才会创建此资源。新的服务名称将用于所有其他外部监听程序。
cluster-name-kafka-listener-name-pod-id
用于将流量从 OpenShift 集群外部路由到各个容器集的服务。只有在启用外部监听程序时,才会创建此资源。新的服务名称将用于所有其他外部监听程序。
cluster-name-kafka-listener-name-bootstrap
从 OpenShift 集群外部连接的客户端的 bootstrap 路由。只有在启用外部监听器并且设置为类型 路由 时才会创建此资源。新路由名称将用于所有其他外部监听程序。
cluster-name-kafka-listener-name-pod-id
将来自 OpenShift 集群外的流量路由到各个容器集。只有在启用外部监听器并且设置为类型 路由 时才会创建此资源。新路由名称将用于所有其他外部监听程序。
cluster-name-kafka-config
包含 Kafka 辅助配置的 ConfigMap,并由 Kafka 代理 pod 挂载为卷。
cluster-name-kafka-brokers
带有 Kafka 代理密钥的 secret。
cluster-name-network-policy-kafka
网络策略管理对 Kafka 服务的访问。
strimzi-namespace-name-cluster-name-kafka-init
Kafka 代理使用的集群角色绑定。
cluster-name-jmx
用于保护 Kafka 代理端口的 JMX 用户名和密码的 secret。只有在 Kafka 中启用了 JMX 时,这个资源才会被创建。
data-cluster-name-kafka-idx
用于为 Kafka 代理 pod idx 存储数据的卷持久性卷声明。只有在为置备持久性卷置备持久性卷选择了持久性存储时,才会创建此资源。
data-id-cluster-name-kafka-idx
id 的持久性卷声明用于存储 Kafka 代理 pod idx 的数据。只有在置备持久性卷以存储数据时为 JBOD 卷选择持久性存储时,才会创建此资源。

Entity Operator

只有在使用 Cluster Operator 部署 Entity Operator 时,才会创建这些资源。

cluster-name-entity-operator

提供给以下实体 Operator 资源的名称:

  • 使用主题和用户操作员进行部署.
  • Entity Operator 使用的服务帐户。
cluster-name-entity-operator-random-string
由 Entity Operator 部署创建的 Pod。
cluster-name-entity-topic-operator-config
带有主题 Operator 配置的 ConfigMap。
cluster-name-entity-user-operator-config
使用用户 Operator 的辅助配置 ConfigMap。
cluster-name-entity-topic-operator-certs
带有主题 Operator 密钥用于与 Kafka 和 ZooKeeper 通信的 secret。
cluster-name-entity-user-operator-certs
带有用户 Operator 密钥(用于与 Kafka 和 ZooKeeper 的通信)的 secret。
strimzi-cluster-name-entity-topic-operator
Entity Topic Operator 使用的角色绑定。
strimzi-cluster-name-entity-user-operator
Entity User Operator 使用的角色绑定。

Kafka Exporter

只有在使用 Cluster Operator 部署 Kafka 导出器时才会创建这些资源。

cluster-name-kafka-exporter

提供给以下 Kafka 导出器资源的名称:

  • 使用 Kafka 导出器进行部署。
  • 用于收集消费者 lag 指标的服务。
  • Kafka 导出器使用的服务帐户。
cluster-name-kafka-exporter-random-string
由 Kafka Exporter 部署创建的 Pod。

Sything Control

只有在使用 Cluster Operator 部署有 Cruise Control 时,才会创建这些资源。

cluster-name-cruise-control

给定以下 Cruise 控制资源的名称:

  • 使用 Cruise 控制进行部署.
  • 用于与 Cruise 控制通信的服务。
  • 断路器控制使用的服务帐户。
cluster-name-cruise-control-random-string
由 Cruise Control 部署创建的 Pod。
cluster-name-cruise-control-config
包含 Cruise 控制辅助配置的 ConfigMap,并被 Cruise Control pod 作为一个卷挂载。
cluster-name-cruise-control-certs
带有 Cruise 控制密钥的 secret 用于与 Kafka 和 ZooKeeper 通信。
cluster-name-network-policy-cruise-control
网络策略管理对 Cruise Control 服务的访问。
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat