搜索

第 2 章 部署配置

download PDF

本章论述了如何为支持的部署配置不同方面:

  • Kafka 集群
  • Kafka Connect 集群
  • 支持 Source2Image 的 Kafka Connect 集群
  • Kafka Mirror Maker
  • Kafka Bridge
  • 基于令牌的 OAuth 2.0 身份验证
  • 基于令牌的 OAuth 2.0 授权

2.1. Kafka 集群配置

Kafka 资源的完整 schema 信息包括在 第 B.2 节 “Kafka 模式参考” 中。应用到所需 Kafka 资源的所有标签也将应用到组成 Kafka 集群的 OpenShift 资源。这提供了一种便捷的机制,可使资源根据需要进行标记。

2.1.1. Kafka YAML 配置示例

有关了解 Kafka 部署可用的配置选项的帮助,请参考此处提供的示例 YAML 文件。

示例只显示一些可能的选项,但尤为重要的配置选项包括:

  • 资源请求(CPU / Memory)
  • 用于最大和最小内存分配的 JVM 选项
  • 侦听器(及身份验证)
  • 认证
  • 存储
  • 机架感知
  • 指标
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3 1
    version: 1.6 2
    resources: 3
      requests:
        memory: 64Gi
        cpu: "8"
      limits: 4
        memory: 64Gi
        cpu: "12"
    jvmOptions: 5
      -Xms: 8192m
      -Xmx: 8192m
    listeners: 6
      - name: plain 7
        port: 9092 8
        type: internal 9
        tls: false 10
        configuration:
          useServiceDnsDomain: true 11
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication: 12
          type: tls
      - name: external 13
        port: 9094
        type: route
        tls: true
        configuration:
          brokerCertChainAndKey: 14
            secretName: my-secret
            certificate: my-certificate.crt
            key: my-key.key
    authorization: 15
      type: simple
    config: 16
      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 17
      ssl.enabled.protocols: "TLSv1.2"
      ssl.protocol: "TLSv1.2"
    storage: 18
      type: persistent-claim 19
      size: 10000Gi 20
    rack: 21
      topologyKey: topology.kubernetes.io/zone
    metrics: 22
      lowercaseOutputName: true
      rules: 23
      # Special cases and very specific rules
      - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
        name: kafka_server_$1_$2
        type: GAUGE
        labels:
          clientId: "$3"
          topic: "$4"
          partition: "$5"
        # ...
  zookeeper: 24
    replicas: 3
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "2"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    storage:
      type: persistent-claim
      size: 1000Gi
    metrics:
      # ...
  entityOperator: 25
    topicOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
    userOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
  kafkaExporter: 26
    # ...
  cruiseControl: 27
    # ...
1
2
Kafka 版本,可按照 升级过程 更改。
3
4
资源限值指定容器可以消耗的最大资源。
5
6
侦听器配置客户端如何通过 bootstrap 地址连接到 Kafka 集群。侦听器 配置为内部或外部 侦听器,用于 OpenShift 集群 内部和外部 的连接
7
用于标识侦听器的名称。在 Kafka 集群中必须是唯一的。
8
Kafka 内侦听器使用的端口号。在给定 Kafka 集群中,端口号必须是唯一的。允许的端口号是 9092 及以上,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
9
侦听器类型指定为 内部,或用于外部监听器,作为 路由负载均衡器、 节点端口入口
10
为每个侦听器启用 TLS 加密。默认为 false路由 监听程序不需要 TLS 加密。
11
定义是否分配了包括集群服务后缀(通常. cluster.local)在内的完全限定 DNS 名称。
12
13
14
由外部 证书颁发机构管理的 Kafka 侦听器 证书的可选配置。brokerCertChainAndKey 属性指定包含服务器证书和私钥的 Secret。也可以为 TLS 侦听程序配置 Kafka 侦听程序证书。
15
授权在 Kafka 代理上启用简单、OAUTH 2.0 或 OPA 授权。简单授权使用 AclAuthorizer Kafka 插件。
16
17
18
19
20
持久性存储具有 额外的配置选项,如用于动态卷置备的存储 idclass
21
将机架感知配置为在 不同机架之间分布副本拓扑 键必须与集群节点标签匹配。
22
23
通过 JMX Exporter 将指标导出到 Grafana 仪表板的 Kafka 规则。可在您的 Kafka 资源配置中复制由 AMQ Streams 提供的一组规则。
24
zookeeper 特定的配置,其中包含与 Kafka 配置类似的属性。
25
26
27
精简控制配置 ,用于重新平衡 Kafka 集群

2.1.2. 数据存储注意事项

高效的数据存储基础架构对于 AMQ 流的最佳性能至关重要。

块存储是必需的。文件存储(如 NFS)无法使用 Kafka。

对于块存储,您可以选择:

注意

AMQ Streams 不需要 OpenShift 原始块卷。

2.1.2.1. 文件系统

建议您将存储系统配置为使用 XFS 文件系统。AMQ Streams 还与 ext4 文件系统兼容,但为了获得最佳结果,可能需要额外的配置。

2.1.2.2. Apache Kafka 和 ZooKeeper 存储

Apache Kafka 和 ZooKeeper 使用单独的磁盘。

支持三种类型的数据存储:

  • 临时(仅适用于开发的建议)
  • persistent
  • JBOD(只是一个磁盘绑定,仅适用于 Kafka)

如需更多信息,请参阅 Kafka 和 ZooKeeper 存储

在异步发送和从多个主题接收数据的大型集群中,固态驱动器(SSD)(虽然不必要)可以提高 Kafka 的性能。SSD 在 ZooKeeper 中特别有效,它需要快速、低延迟的数据访问。

注意

您不需要置备复制存储,因为 Kafka 和 ZooKeeper 都内置了数据复制。

2.1.3. Kafka 和 ZooKeeper 存储类型

作为有状态应用程序,Kafka 和 ZooKeeper 需要将数据存储在磁盘上。AMQ Streams 支持用于此数据的三种存储类型:

  • ephemeral
  • persistent
  • JBOD 存储
注意

JBOD 存储只支持 Kafka,不适用于 ZooKeeper。

在配置 Kafka 资源时,您可以指定 Kafka 代理使用的存储类型及其对应的 ZooKeeper 节点。您可以使用以下资源中的 storage 属性配置存储类型:

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper

存储类型在 type 字段中配置。

警告

部署 Kafka 集群后,无法更改存储类型。

其他资源

2.1.3.1. 临时存储

临时存储使用 emptyDir 卷来存储数据。若要使用临时存储,type 字段应设置为 ephemeral

重要

emptyDir 卷不是持久性的,存储在其中的数据会在 Pod 重启时丢失。新 pod 启动后,它必须从集群的其他节点恢复所有数据。临时存储不适用于单一节点 ZooKeeper 集群,而对于带有复制因数 1 的 Kafka 主题,这会导致数据丢失。

临时存储示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: ephemeral
    # ...
  zookeeper:
    # ...
    storage:
      type: ephemeral
    # ...

2.1.3.1.1. 日志目录

Kafka 代理将使用临时卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data/kafka-log_idx_
其中 idx 是 Kafka 代理 pod 索引。例如 /var/lib/kafka/data/kafka-log0。

2.1.3.2. 持久性存储

永久存储使用持久卷声明 来调配用于存储数据的持久卷。持久卷声明可用于调配许多不同类型的卷,具体取决于要调配卷的 存储类。可以与持久卷声明一起使用的数据类型包括许多类型的 SAN 存储 和本地持久卷

若要使用持久存储,必须将 type 设置为 persistent-claim。持久性存储支持额外的配置选项:

id (可选)
存储标识号.对于 JBOD 存储声明中定义的存储卷,此选项是必需的。默认值为 0
大小 (必需)
定义持久卷声明的大小,如 "1000Gi"。
class (可选)
用于动态卷置备的 OpenShift 存储类
selector (可选)
允许选择要使用的特定持久性卷。它包含键:值对,代表用于选择此类卷的标签。
deleteClaim (optional)
指定在取消部署集群时是否需要删除持久卷声明的布尔值。默认为 false
警告

仅支持重新定义持久性卷大小的 OpenShift 版本支持在现有 AMQ Streams 集群中增加持久性卷的大小。要调整持久性卷的大小必须使用支持卷扩展的存储类。对于不支持卷扩展的其他 OpenShift 版本和存储类,您必须在部署集群前决定必要的存储大小。无法减少现有持久性卷的大小。

使用 1000Gi 大小的持久性存储配置片段示例

# ...
storage:
  type: persistent-claim
  size: 1000Gi
# ...

以下示例演示了存储类的使用。

使用特定存储类的持久性存储配置片段示例

# ...
storage:
  type: persistent-claim
  size: 1Gi
  class: my-storage-class
# ...

最后,可以利用 选择器 来选择特定的标记持久卷,以提供 SSD 等必要功能。

使用选择器的持久性存储配置片段示例

# ...
storage:
  type: persistent-claim
  size: 1Gi
  selector:
    hdd-type: ssd
  deleteClaim: true
# ...

2.1.3.2.1. 存储类覆盖

您可以为一个或多个 Kafka 代理或 ZooKeeper 节点指定不同的存储类,而不是使用默认存储类。例如,当存储类仅限于不同的可用区或数据中心时,这很有用。您可以使用 overrides 字段来实现这一目的。

在本例中,默认存储类名为 my-storage-class

使用存储类覆盖的 AMQ Streams 集群示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  labels:
    app: my-cluster
  name: my-cluster
  namespace: myproject
spec:
  # ...
  kafka:
    replicas: 3
    storage:
      deleteClaim: true
      size: 100Gi
      type: persistent-claim
      class: my-storage-class
      overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
  # ...
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: true
      size: 100Gi
      type: persistent-claim
      class: my-storage-class
      overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
  # ...

配置的 overrides 属性后,卷使用以下存储类:

  • ZooKeeper 节点 0 的持久卷将使用 my-storage-class-zone-1a
  • ZooKeeper 节点 1 的持久卷将使用 my-storage-class-zone-1b
  • ZooKeeepr 节点 2 的持久卷将使用 my-storage-class-zone-1c
  • Kafka 代理 0 的持久性卷将使用 my-storage-class-zone-1a
  • Kafka 代理 1 的持久性卷将使用 my-storage-class-zone-1b
  • Kafka 代理 2 的持久性卷将使用 my-storage-class-zone-1c

overrides 属性目前只用于覆盖存储类配置。目前不支持覆盖其他存储配置字段。目前不支持存储配置中的其他字段。

2.1.3.2.2. 持久性卷声明命名

使用持久性存储时,它会使用以下名称创建持久性卷声明:

data-cluster-name-kafka-idx
用于存储 Kafka 代理 pod IDx 数据的卷的持久性卷声明。
data-cluster-name-zookeeper-idx
用于为 ZooKeeper 节点 pod idx 存储数据的卷的持久性卷声明。
2.1.3.2.3. 日志目录

Kafka 代理将使用持久性卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data/kafka-log_idx_
其中 idx 是 Kafka 代理 pod 索引。例如 /var/lib/kafka/data/kafka-log0。

2.1.3.3. 重新定义持久性卷大小

您可以通过增大现有 AMQ Streams 集群使用的持久性卷的大小来置备存储容量。在使用一个持久性卷或在 JBOD 存储配置中使用多个持久性卷的集群中,支持重新定义持久性卷大小。

注意

您可以增大但不能缩小持久性卷的大小。OpenShift 目前不支持缩小持久卷的大小。

先决条件

  • 个 OpenShift 集群,其支持卷大小调整。
  • Cluster Operator 正在运行。
  • 使用使用支持卷扩展的存储类创建的持久性卷的 Kafka 集群。

步骤

  1. Kafka 资源中,增大分配给 Kafka 集群和 ZooKeeper 集群的持久性卷的大小。

    • 要增大分配给 Kafka 集群的卷大小,编辑 spec.kafka.storage 属性。
    • 要增大分配给 ZooKeeper 集群的卷大小,请编辑 spec.zookeeper.storage 属性。

      例如,将卷大小从 1000Gi 增加到 2000Gi

      apiVersion: kafka.strimzi.io/v1beta1
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        kafka:
          # ...
          storage:
            type: persistent-claim
            size: 2000Gi
            class: my-storage-class
          # ...
        zookeeper:
          # ...
  2. 创建或更新资源。

    使用 oc apply:

    oc apply -f your-file

    OpenShift 会响应来自 Cluster Operator 的请求,增加了所选持久性卷的容量。完成大小调整后,Cluster Operator 会重启所有使用调整大小持久性卷的 pod。这会自动发生。

其他资源

有关在 OpenShift 中调整持久性卷大小的更多信息,请参阅使用 Kubernetes 重新定义持久性卷 大小。

2.1.3.4. JBOD 存储概述

您可以将 AMQ Streams 配置为使用 JBOD,这是多个磁盘或卷的数据存储配置。JBOD 是为 Kafka 代理提供更多数据存储的一种方法。它还能提高性能。

一个或多个卷描述了 JBOD 配置,每个卷可以 临时 或永久 JBOD 卷声明的规则和限制与临时和永久存储的规则和限制相同。例如,您无法在置备后更改持久性存储卷的大小。

2.1.3.4.1. JBOD 配置

若要将 JBOD 与 AMQ Streams 搭配使用,存储 类型 必须设置为 jbod。借助 volumes 属性,您可以描述组成 JBOD 存储阵列或配置的磁盘。以下片段显示了 JBOD 配置示例:

# ...
storage:
  type: jbod
  volumes:
  - id: 0
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
  - id: 1
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
# ...

创建 JBOD 卷后,无法更改 id。

用户可以从 JBOD 配置中添加或删除卷。

2.1.3.4.2. JBOD 和持久性卷声明

当使用持久性存储声明 JBOD 卷时,生成的持久性卷声明的命名方案如下:

data-id-cluster-name-kafka-idx
其中 id 是用于存储 Kafka 代理 pod idx 数据的卷 ID。
2.1.3.4.3. 日志目录

Kafka 代理将使用 JBOD 卷作为挂载到以下路径的日志目录:

/var/lib/kafka/data-id/kafka-log_idx_
其中 id 是用于存储 Kafka 代理 pod idx 数据的卷 ID。例如 /var/lib/kafka/data-0/kafka-log0。

2.1.3.5. 将卷添加到 JBOD 存储

这个步骤描述了如何在配置为使用 JBOD 存储的 Kafka 集群中添加卷。它不能应用到配置为使用任何其他存储类型的 Kafka 集群。

注意

在已经使用并被删除的 id 下添加新卷时,您必须确保之前使用的 PersistentVolumeClaims 已被删除。

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator
  • 带有 JBOD 存储的 Kafka 集群

步骤

  1. 编辑 Kafka 资源中的 spec.kafka.storage.volumes 属性。将新卷添加到 volumes 数组。例如,添加 ID 为 2 的新卷:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 1
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 2
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f KAFKA-CONFIG-FILE
  3. 创建新主题或将现有分区重新分配至新磁盘.

其他资源

有关重新分配主题的详情请参考 第 2.1.24.2 节 “分区重新分配”

2.1.3.6. 从 JBOD 存储中删除卷

这个步骤描述了如何从配置为使用 JBOD 存储的 Kafka 集群中删除卷。它不能应用到配置为使用任何其他存储类型的 Kafka 集群。JBOD 存储始终必须至少包含一个卷。

重要

为避免数据丢失,您必须先移动所有分区,然后再删除卷。

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator
  • 带有两个或多个卷的 JBOD 存储的 Kafka 集群

步骤

  1. 从要删除的磁盘中重新分配所有分区。分区中仍然分配到将要删除的磁盘中的任何数据都可能会丢失。
  2. 编辑 Kafka 资源中的 spec.kafka.storage.volumes 属性。从 volumes 阵列中删除一个或多个 。例如,删除 ID 为 12 的卷:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  3. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

其他资源

有关重新分配主题的详情请参考 第 2.1.24.2 节 “分区重新分配”

2.1.4. Kafka 代理副本

Kafka 集群可以使用多个代理运行。您可以在 Kafka. spec.kafka.replicas 中配置用于 Kafka 集群的代理数量。集群的最佳代理数量需要根据您的具体用例来确定。

2.1.4.1. 配置代理节点的数量

这个步骤描述了如何在新集群中配置 Kafka 代理节点的数量。它只适用于没有分区的新集群。如果您的集群已经定义了主题,请参阅 第 2.1.24 节 “扩展集群”

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator
  • 尚未定义主题的 Kafka 集群

步骤

  1. 编辑 Kafka 资源中的 replicas 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        replicas: 3
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

其他资源

如果您的集群已经定义了主题,请参阅 第 2.1.24 节 “扩展集群”

2.1.5. Kafka 代理配置

AMQ Streams 允许您自定义 Kafka 集群中 Kafka 代理的配置。您可以指定和配置 Apache Kafka 文档 的"Broker Configs"部分中列出的大多数选项。您无法配置与以下区域相关的选项:

  • 安全(加密、身份验证和授权)
  • 监听程序配置
  • 代理 ID 配置
  • 日志数据目录的配置
  • Broker 间通信
  • zookeeper 连接

这些选项由 AMQ Streams 自动配置。

有关代理配置的更多信息,请参阅 KafkaClusterSpec 模式

监听程序配置

您可以配置监听程序以连接到 Kafka 代理。有关配置监听程序的更多信息,请参阅 Listener 配置

授权对 Kafka 的访问

您可以将 Kafka 集群配置为允许或拒绝用户执行的操作。有关保护对 Kafka 代理的访问的更多信息,请参阅管理对 Kafka 的访问

2.1.5.1. 配置 Kafka 代理

您可以配置现有的 Kafka 代理,或使用指定配置创建新 Kafka 代理。

先决条件

  • OpenShift 集群可用。
  • Cluster Operator 正在运行。

步骤

  1. 打开包含指定集群部署的 Kafka 资源的 YAML 配置文件。
  2. Kafka 资源中的 spec.kafka.config 属性中,输入一个或多个 Kafka 配置设置。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        config:
          default.replication.factor: 3
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 1
        # ...
      zookeeper:
        # ...
  3. 应用新配置以创建或更新资源。

    使用 oc apply:

    oc apply -f kafka.yaml

    其中 kafka.yaml 是您要配置的资源的 YAML 配置文件,例如 kafka-persistent.yaml

2.1.6. 监听程序配置

侦听器用于连接到 Kafka 代理。

AMQ Streams 提供了一个 generic GenericKafkaListener 模式,其中包含用于通过 Kafka 资源配置监听器的属性。

The GenericKafkaListener 提供了用于监听器配置的灵活方法。

您可以指定属性来配置 内部 侦听器,以便在 OpenShift 集群内连接,或者指定 外部 侦听器以连接 OpenShift 集群外部。

通用监听程序配置

每个监听程序都 定义为 Kafka 资源中的一个数组

有关监听器配置的更多信息,请参阅 GenericKafkaListener 模式参考

通用监听程序配置使用 KafkaListeners 模式参考替换了之前的监听程序配置方法,该参考 已被弃用。但是,您可以使用向后兼容将 旧格式转换为新格式

KafkaListeners 模式将子属性用于 纯文本tls 和外部 监听程序,以及每个节点的固定端口。由于架构架构固有的限制,只能配置三个监听器,配置选项仅限于监听器类型。

使用 GenericKafkaListener 模式时,您可以根据需要配置多个监听器,只要它们的名称和端口是唯一的。

您可能希望配置多个外部监听器,例如,以处理来自需要不同身份验证机制的网络的访问。或者,您可能需要将 OpenShift 网络加入外部网络。在这种情况下,您可以配置内部侦听器( 使用ServiceDnsDomain 属性),以便不使用 OpenShift 服务 DNS 域(通常为 .cluster.local)。

配置监听程序来保护对 Kafka 代理的访问

您可以使用身份验证为安全连接配置监听程序。有关保护对 Kafka 代理的访问的更多信息,请参阅管理对 Kafka 的访问

为 OpenShift 外部的客户端访问配置外部侦听器

您可以使用指定的连接机制(如 loadbalancer),为 OpenShift 环境外的客户端访问配置外部侦听器。有关连接外部客户端的配置选项的更多信息,请参阅配置外部监听程序

监听程序证书

您可以为启用了 TLS 加密的 TLS 监听程序或者外部监听程序提供自己的服务器证书,称为 Kafka 侦听程序证书。如需更多信息,请参阅 Kafka 侦听程序证书

2.1.7. zookeeper 副本

zookeeper 集群或ensemble 通常使用奇数的节点(通常为三个、五个或 7 个)运行。

大多数节点必须可用,才能保持有效的仲裁。如果 ZooKeeper 集群丢失其仲裁,它将停止响应客户端,Kafka 代理将停止工作。拥有稳定且高度可用的 ZooKeeper 集群对于 AMQ 流至关重要。

三节点集群
为了保持仲裁,三节点 ZooKeeper 群集至少需要启动并运行两个节点。它只能容忍一个节点不可用。
五节点集群
为了保持仲裁,五节点 ZooKeeper 群集至少需要启动并运行三个节点。它可能会容忍两个节点不可用。
七节点集群
为了保持仲裁,七节点 ZooKeeper 群集至少需要启动并运行四个节点。它可能会容忍三个节点不可用。
注意

出于开发目的,也可以使用单一节点运行 ZooKeeper。

拥有更多节点不一定意味着更好的性能,因为随着集群中节点的数量增加,保持仲裁的成本将会增加。根据可用性要求,您可以决定要使用的节点数量。

2.1.7.1. ZooKeeper 节点数量

可以使用 Kafka.spec.zookeeper 中的 replicas 属性来配置 ZooKeeper 节点的数量。

显示副本配置示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
    replicas: 3
    # ...

2.1.7.2. 更改 ZooKeeper 副本数量

先决条件

  • OpenShift 集群可用。
  • Cluster Operator 正在运行。

步骤

  1. 打开包含指定集群部署的 Kafka 资源的 YAML 配置文件。
  2. Kafka 资源中的 spec.zookeeper.replicas 属性中,输入复制的 ZooKeeper 服务器数量。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
        replicas: 3
        # ...
  3. 应用新配置以创建或更新资源。

    使用 oc apply:

    oc apply -f kafka.yaml

    其中 kafka.yaml 是您要配置的资源的 YAML 配置文件,例如 kafka-persistent.yaml

2.1.8. zookeeper 配置

AMQ Streams 允许您自定义 Apache ZooKeeper 节点的配置。您可以指定和配置 ZooKeeper 文档 中列出的大多数选项。

无法配置的选项与以下区域相关:

  • 安全(加密、身份验证和授权)
  • 监听程序配置
  • 数据目录的配置
  • zookeeper 集群组成

这些选项由 AMQ Streams 自动配置。

2.1.8.1. zookeeper 配置

zookeeper 节点使用 Kafka.spec.zookeeper 中的 config 属性进行配置。此属性包含 ZooKeeper 配置选项作为键。这些值可使用以下 JSON 类型之一描述:

  • 字符串
  • 数字
  • 布尔值

除了那些直接由 AMQ Streams 管理的选项外,用户可以指定和配置 ZooKeeper 文档 中列出的选项。具体来说,所有键为等于或以以下任一字符串开头的配置选项将被禁止:

  • 服务器.
  • dataDir
  • dataLogDir
  • clientPort
  • authProvider
  • quorum.auth
  • requireClientAuthScheme

config 属性中存在一个禁止选项时,它将被忽略,并会在 Cluster Operator 日志文件中输出警告信息。所有其他选项都传递到 ZooKeeper。

重要

Cluster Operator 不会验证提供的 config 对象中的密钥或值。当提供无效的配置时,ZooKeeper 集群可能不会启动,或者可能会变得不稳定。在这种情况下,Kafka.spec.zookeeper.config 对象中的配置应该被修复,Cluster Operator 会将新配置部署到所有 ZooKeeper 节点。

所选选项有默认值:

  • timeTick,默认值为 2000
  • 默认值为 5initLimit
  • 带有 默认值 2的同步 限制
  • autopurge.purgeInterval,默认值为 1

Kafka.spec.zookeeper.config 属性中没有这些选项时,会自动配置这些选项。

使用三个允许的 ssl 配置选项,将特定的 密码套件 用于 TLS 版本。密码套件组合了用于安全连接和数据传输的算法。

ZooKeeper 配置示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  kafka:
    # ...
  zookeeper:
    # ...
    config:
      autopurge.snapRetainCount: 3
      autopurge.purgeInterval: 1
      ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 1
      ssl.enabled.protocols: "TLSv1.2" 2
      ssl.protocol: "TLSv1.2" 3
    # ...

1
用于 TLS 的加密套件使用 ECDHE 密钥交换机制、RSA 身份验证算法、AES 批量机密算法和 SHA384 MAC 算法的组合。
2
启用 SSl 协议 TLSv1.2
3
指定用于生成 SSL 上下文的 TLSv1.2 协议。允许的值有 TLSv1.1TLSv1.2

2.1.8.2. 配置 ZooKeeper

先决条件

  • OpenShift 集群可用。
  • Cluster Operator 正在运行。

步骤

  1. 打开包含指定集群部署的 Kafka 资源的 YAML 配置文件。
  2. Kafka 资源中的 spec.zookeeper.config 属性中,输入一个或多个 ZooKeeper 配置设置。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
        config:
          autopurge.snapRetainCount: 3
          autopurge.purgeInterval: 1
        # ...
  3. 应用新配置以创建或更新资源。

    使用 oc apply:

    oc apply -f kafka.yaml

    其中 kafka.yaml 是您要配置的资源的 YAML 配置文件,例如 kafka-persistent.yaml

2.1.9. zookeeper 连接

zookeeper 服务通过加密和身份验证进行保护,它们不应该由不属于 AMQ Streams 的外部应用程序使用。

但是,如果要使用需要连接到 ZooKeeper 的 Kafka CLI 工具,您可以使用 ZooKeeper 容器中的终端,并连接 localhost:12181 作为 ZooKeeper 地址。

2.1.9.1. 从终端连接到 ZooKeeper

大多数 Kafka CLI 工具可以直接连接到 Kafka。因此,在正常情况下,您应不需要连接到 ZooKeeper。如果需要,您可以按照以下步骤操作。在 ZooKeeper 容器内打开终端,以使用 Kafka CLI 工具需要 ZooKeeper 连接。

先决条件

  • OpenShift 集群可用。
  • Kafka 集群正在运行。
  • Cluster Operator 正在运行。

步骤

  1. 使用 OpenShift 控制台打开终端,或者从 CLI 运行 exec 命令。

    例如:

    oc exec -it my-cluster-zookeeper-0 -- bin/kafka-topics.sh --list --zookeeper localhost:12181

    务必使用 localhost:12181

    现在,您可以向 ZooKeeper 运行 Kafka 命令。

2.1.10. 实体 Operator

Entity Operator 负责管理正在运行的 Kafka 集群中与 Kafka 相关的实体。

Entity Operator 包括:

通过 Kafka 资源配置,Cluster Operator 可在部署 Kafka 集群时部署 Entity Operator,包括一个或多个操作器。

注意

部署后,实体 Operator 根据部署配置包含操作器。

Operator 会自动配置为管理 Kafka 集群的主题和用户。

2.1.10.1. 实体 Operator 配置属性

使用 Kafka.spec 中的 principal Operator 属性来配置 Entity Operator。

entity Operator 属性支持以下几个子属性:

  • tlsSidecar
  • topicOperator
  • userOperator
  • 模板

tlsSidecar 属性包含 TLS sidecar 容器的配置,用于与 ZooKeeper 通信。有关配置 TLS sidecar 的详情请参考 第 2.1.19 节 “TLS sidecar”

template 属性包含 Entity Operator pod 的配置,如标签、注解、关联性和容限。有关配置模板的详情请参考 第 2.6 节 “自定义 OpenShift 资源”

topicOperator 属性包含 Topic Operator 的配置。当缺少这个选项时,Entity Operator 会在没有 Topic Operator 的情况下部署。

userOperator 属性包含 User Operator 的配置。当缺少这个选项时,Entity Operator 会在没有 User Operator 的情况下部署。

有关配置 Entity Operator 的属性的更多信息,请参阅 EntityUserOperatorSpec schema 参考

启用两个运算符的基本配置示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    topicOperator: {}
    userOperator: {}

如果将空对象({})用于 topicOperator 和 userOperator ,则所有属性都使用其默认值。

当缺少 topicOperatoruserOperator 属性时,Entity Operator 不会被部署。

2.1.10.2. 主题 Operator 配置属性

可以使用 topicOperator 对象中的附加选项来配置主题 Operator 部署。支持以下属性:

watchedNamespace
主题操作器监视 KafkaTopics 的 OpenShift 命名空间。默认为部署 Kafka 集群的命名空间。
reconciliationIntervalSeconds
定期协调之间的间隔(以秒为单位)。默认 90.
zookeeperSessionTimeoutSeconds
ZooKeeper 会话超时,以秒为单位。默认 20
topicMetadataMaxAttempts
从 Kafka 获取主题元数据的尝试数量。每次尝试之间的时间都定义为指数回退。考虑在因为分区或副本数量而创建主题需要更多时间时增加这个值。默认 6.
镜像
image 属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 2.1.18 节 “容器镜像”
资源
resources 属性配置分配给 Topic Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 2.1.11 节 “CPU 和内存资源”
logging
logging 属性配置 Topic Operator 的日志。如需了解更多详细信息,请参阅 第 2.1.10.4 节 “Operator 日志记录器”

Topic Operator 配置示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
    # ...

2.1.10.3. 用户 Operator 配置属性

用户 Operator 部署可以使用 userOperator 对象中的附加选项进行配置。支持以下属性:

watchedNamespace
用户操作员在其中监视 KafkaUsers 的 OpenShift 命名空间。默认为部署 Kafka 集群的命名空间。
reconciliationIntervalSeconds
定期协调之间的间隔(以秒为单位)。默认值 120.
zookeeperSessionTimeoutSeconds
ZooKeeper 会话超时,以秒为单位。默认 6.
镜像
image 属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 2.1.18 节 “容器镜像”
资源
resources 属性配置分配给 User Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 2.1.11 节 “CPU 和内存资源”
logging
logging 属性配置 User Operator 的日志记录。如需了解更多详细信息,请参阅 第 2.1.10.4 节 “Operator 日志记录器”

User Operator 配置示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    userOperator:
      watchedNamespace: my-user-namespace
      reconciliationIntervalSeconds: 60
    # ...

2.1.10.4. Operator 日志记录器

Topic Operator 和 User Operator 具有可配置的日志记录器:

  • rootLogger.level

操作器使用 Apache log4j2 日志记录器实施。

使用 Kafka 资源中的 logging 属性来配置日志记录器和日志记录器级别。

您可以通过直接(内线)指定日志记录器和级别来设置日志级别,或使用自定义(外部)ConfigMap。如果使用 ConfigMap,则将 logging.name 属性设置为包含外部日志配置的 ConfigMap 的名称。在 ConfigMap 中,日志配置使用 log4j2.properties 进行 描述。

此处我们会看到 内联 和外部 记录示例。

内联日志记录

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
    # ...
    userOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
# ...

外部日志记录

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging:
        type: external
        name: customConfigMap
# ...

其他资源

2.1.10.5. 配置实体 Operator

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑 Kafka 资源中的 principal Operator 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      entityOperator:
        topicOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
        userOperator:
          watchedNamespace: my-user-namespace
          reconciliationIntervalSeconds: 60
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.11. CPU 和内存资源

对于每个部署的容器,AMQ Streams 允许您请求特定资源并定义这些资源的最大消耗。

AMQ Streams 支持两种类型的资源:

  • CPU
  • 内存

AMQ Streams 使用 OpenShift 语法来指定 CPU 和内存资源。

2.1.11.1. 资源限值和请求

资源限值和请求使用以下 资源中的 resources 属性进行配置:

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper
  • Kafka.spec.entityOperator.topicOperator
  • Kafka.spec.entityOperator.userOperator
  • Kafka.spec.entityOperator.tlsSidecar
  • Kafka.spec.kafkaExporter
  • KafkaConnect.spec
  • KafkaConnectS2I.spec
  • KafkaBridge.spec

其他资源

2.1.11.1.1. 资源请求

请求指定要为给定容器保留的资源。保留资源可确保资源始终可用。

重要

如果资源请求针对的是 OpenShift 集群中可用的可用资源,则不会调度该容器集。

资源请求在 requests 属性中指定。AMQ Streams 当前支持的资源请求:

  • cpu
  • memory

可以为一个或多个支持的资源配置请求。

使用所有资源的资源请求配置示例

# ...
resources:
  requests:
    cpu: 12
    memory: 64Gi
# ...

2.1.11.1.2. 资源限值

limits 指定给定容器可以消耗的最大资源。限制不是保留的,可能并不总是可用。容器只能在资源可用时才使用资源限制。资源限制应始终大于资源请求。

资源限值在 limits 属性中指定。AMQ Streams 当前支持的资源限制:

  • cpu
  • memory

资源可以被配置为一个或多个支持的限制。

资源限制配置示例

# ...
resources:
  limits:
    cpu: 12
    memory: 64Gi
# ...

2.1.11.1.3. 支持的 CPU 格式

支持 CPU 请求和限制,其格式如下:

  • CPU 内核数作为整数(5 个 CPU 内核)或十进制(2.5 个 CPU 内核)。
  • 数字或 millicpus / millicores( 100m),其中 1000 millicores 相同 1 个 CPU 内核。

CPU 单元示例

# ...
resources:
  requests:
    cpu: 500m
  limits:
    cpu: 2.5
# ...

注意

1 个 CPU 核心的计算能力可能因部署 OpenShift 的平台而异。

其他资源

  • 有关 CPU 规格的更多信息,请参阅 CPU 含义
2.1.11.1.4. 支持的内存格式

内存请求和限值以兆字节、千兆字节、兆字节和千兆字节为单位指定。

  • 要指定内存(以 MB 为单位),请使用 M 后缀。例如 1000M
  • 要指定以 GB 为单位的内存,请使用 G 后缀。例如 1G
  • 要以兆字节为单位指定内存,请使用 Mi 后缀。例如 1000Mi
  • 要以千兆字节为单位指定内存,请使用 Gi 后缀。例如 1Gi

使用不同内存单元的示例

# ...
resources:
  requests:
    memory: 512Mi
  limits:
    memory: 2Gi
# ...

其他资源

  • 有关内存规格和其他支持单元的详情,请参阅 内存含义

2.1.11.2. 配置资源请求和限值

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑指定集群部署的资源中的 resources 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        resources:
          requests:
            cpu: "8"
            memory: 64Gi
          limits:
            cpu: "12"
            memory: 128Gi
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

其他资源

2.1.12. Kafka 日志记录器

Kafka 拥有自己的可配置日志记录器:

  • log4j.logger.org.I0Itec.zkclient.ZkClient
  • log4j.logger.org.apache.zookeeper
  • log4j.logger.kafka
  • log4j.logger.org.apache.kafka
  • log4j.logger.kafka.request.logger
  • log4j.logger.kafka.network.Processor
  • log4j.logger.kafka.server.KafkaApis
  • log4j.logger.kafka.network.RequestChannel$
  • log4j.logger.kafka.controller
  • log4j.logger.kafka.log.LogCleaner
  • log4j.logger.state.change.logger
  • log4j.logger.kafka.authorizer.logger

zookeeper 还具有可配置的日志记录器:

  • zookeeper.root.logger

Kafka 和 ZooKeeper 使用 Apache log4j 日志记录器实现。

操作员使用 Apache log4j2 日志记录器实现,因此日志记录配置在 ConfigMap 中使用 log4j2.properties 进行 描述。如需更多信息,请参阅 第 2.1.10.4 节 “Operator 日志记录器”

使用 logging 属性来配置日志记录器和日志记录器级别。

您可以通过直接(内线)指定日志记录器和级别来设置日志级别,或使用自定义(外部)ConfigMap。如果使用 ConfigMap,则将 logging.name 属性设置为包含外部日志配置的 ConfigMap 的名称。在 ConfigMap 中,日志配置使用 log4j.properties 进行 描述。

此处我们会看到 内联 和外部 记录示例。

内联日志记录

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  # ...
  kafka:
    # ...
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: "INFO"
  # ...
  zookeeper:
    # ...
    logging:
      type: inline
      loggers:
        zookeeper.root.logger: "INFO"
  # ...
  entityOperator:
    # ...
    topicOperator:
      # ...
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
    # ...
    userOperator:
      # ...
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
    # ...

外部日志记录

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  # ...
  logging:
    type: external
    name: customConfigMap
  # ...

对外部和内联日志记录级别的更改将在不重启的情况下应用到 Kafka 代理。

其他资源

2.1.13. Kafka 机架感知

AMQ Streams 中的机架感知功能有助于在不同机架之间分散 Kafka 代理 Pod 和 Kafka 主题副本。启用机架意识有助于提高 Kafka 代理的可用性以及它们托管的主题。

注意

"rack"可能代表数据中心中的可用性区域、数据中心或实际机架。

2.1.13.1. 在 Kafka 代理中配置机架感知

可以在 Kafka .spec.kafka 的 rack 属性中配置 Kafka 机架感知。rack 对象具有一个名为 topologyKey 的必填字段。此键需要与分配给 OpenShift 集群节点的一个标签匹配。OpenShift 在将 Kafka 代理 Pod 调度到节点时使用该标签。如果 OpenShift 集群在云提供商平台上运行,该标签应代表运行该节点的可用性区域。通常,节点使用 topology.kubernetes.io/zone 标签(或旧的 OpenShift 版本上的 failure-domain.beta.kubernetes.io/zone )标记为 topologyKey 值。如需有关 OpenShift 节点标签的更多信息,请参阅 Well-Known Labels、Annotations 和 Taints。这会导致在区间分散代理 pod,并在 Kafka 代理中设置 代理的 broker.rack 配置参数。

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 请咨询您的 OpenShift 管理员,了解代表节点要部署到的区域 / 机架的节点标签。
  2. 将标签用作拓扑键,编辑 Kafka 资源中的 rack 属性。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        rack:
          topologyKey: topology.kubernetes.io/zone
        # ...
  3. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

其他资源

2.1.14. healthchecks

健康检查是定期测试,可验证应用的健康状况。当健康检查探测失败时,OpenShift 假定应用不健康,并尝试修复它。

OpenShift 支持两种类型的 Healthcheck 探测:

  • 存活度探测
  • 就绪度探测

有关探测的详情,请参阅 配置存活度和就绪度探测。这两种探测都用于 AMQ Streams 组件。

用户可以配置所选的存活度和就绪度探测选项。

2.1.14.1. healthcheck 配置

可以使用以下资源中的 livenessProbe 和 readinessProbe 属性来配置存活度和就绪度探测:

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper
  • Kafka.spec.entityOperator.tlsSidecar
  • Kafka.spec.entityOperator.topicOperator
  • Kafka.spec.entityOperator.userOperator
  • Kafka.spec.kafkaExporter
  • KafkaConnect.spec
  • KafkaConnectS2I.spec
  • KafkaMirrorMaker.spec
  • KafkaBridge.spec

Live nessProbereadinessProbe 均支持以下选项:

  • initialDelaySeconds
  • timeoutSeconds
  • periodSeconds
  • successThreshold
  • failureThreshold

有关 livenessProbereadinessProbe 选项的详情请参考 第 B.45 节 “探测 模式参考”

存活度和就绪度探测配置示例

# ...
readinessProbe:
  initialDelaySeconds: 15
  timeoutSeconds: 5
livenessProbe:
  initialDelaySeconds: 15
  timeoutSeconds: 5
# ...

2.1.14.2. 配置健康检查

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑 Kafka 资源中的 liveness Probe 或 readinessProbe 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        readinessProbe:
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe:
          initialDelaySeconds: 15
          timeoutSeconds: 5
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.15. Prometheus 指标

AMQ Streams 支持使用 Prometheus JMX 导出器的 Prometheus 指标将 Apache Kafka 和 ZooKeeper 支持的 JMX 指标转换为 Prometheus 指标。启用指标数据后,它们会在端口 9404 上公开。

有关设置和部署 Prometheus 和 Grafana 的更多信息,请参阅 OpenShift 指南中的 Deploying 和 升级 AMQ Streams 中的 介绍 Metrics 到 Kafka

2.1.15.1. 指标配置

Prometheus 指标通过在以下资源中配置 metrics 属性来启用:

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper
  • KafkaConnect.spec
  • KafkaConnectS2I.spec

当资源中没有定义 metrics 属性时,Prometheus 指标将被禁用。要在不进一步配置的情况下启用 Prometheus 指标导出,您可以将其设置为空对象({})。

在没有任何进一步配置的情况下启用指标的示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    metrics: {}
    # ...
  zookeeper:
    # ...

指标 属性可能包含 Prometheus JMX 导出器的其他配置

使用其他 Prometheus JMX Exporter 配置启用指标示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    metrics:
      lowercaseOutputName: true
      rules:
        - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count"
          name: "kafka_server_$1_$2_total"
        - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count"
          name: "kafka_server_$1_$2_total"
          labels:
            topic: "$3"
    # ...
  zookeeper:
    # ...

2.1.15.2. 配置 Prometheus 指标

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑 Kafka 资源中的 metrics 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
        metrics:
          lowercaseOutputName: true
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.16. JMX 选项

AMQ Streams 支持通过在 9999 上打开 JMX 端口从 Kafka 代理获取 JMX 指标。您可以获取有关每个 Kafka 代理的各种指标,例如使用数据,如 BytesPerSecond 值或代理网络的请求率。AMQ Streams 支持打开受密码保护的 JMX 端口或不受保护的 JMX 端口。

2.1.16.1. 配置 JMX 选项

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

您可以使用以下资源中的 jmxOptions 属性配置 JMX 选项:

  • Kafka.spec.kafka

您可以为 Kafka 代理上打开的 JMX 端口配置用户名和密码保护。

保护 JMX 端口

您可以保护 JMX 端口,以防止未经授权的 pod 访问该端口。目前,JMX 端口只能使用用户名和密码进行保护。要启用 JMX 端口的安全性,请将 身份验证 字段中的 type 参数设置为 password

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    jmxOptions:
      authentication:
        type: "password"
    # ...
  zookeeper:
    # ...

这可让您将 pod 部署到集群中,并使用无头服务来获取 JMX 指标,并指定您要解决的代理。要从 broker 0 获得 JMX 指标,我们解决在无头服务前附加代理 0 的无外设服务:

"<cluster-name>-kafka-0-<cluster-name>-<headless-service-name>"

如果 JMX 端口受到保护,您可以通过在 pod 部署中引用 JMX 机密来获取用户名和密码。

使用打开的 JMX 端口

要禁用 JMX 端口的安全性,请不要填写 身份验证 字段

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    jmxOptions: {}
    # ...
  zookeeper:
    # ...

这只会在无头服务上打开 JMX 端口,您可以按照上述方法将 pod 部署到集群中。唯一的区别在于,任何容器集都能够从 JMX 端口读取。

2.1.17. JVM 选项

AMQ 流的以下组件在虚拟机(VM)中运行:

  • Apache Kafka
  • Apache ZooKeeper
  • Apache Kafka Connect
  • Apache Kafka MirrorMaker
  • AMQ Streams Kafka Bridge

JVM 配置选项针对不同的平台和架构优化性能。AMQ Streams 允许您配置其中一些选项。

2.1.17.1. JVM 配置

使用 jvmOptions 属性,为组件所运行的 JVM 配置支持的选项

支持的 JVM 选项有助于优化不同平台和架构的性能。

2.1.17.2. 配置 JVM 选项

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑 Kafka、KafkaConnect、Kafka Connect S2I、Kafka MirrorMaker 或 Kafka Bridge 资源中的 jvmOptions 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        jvmOptions:
          "-Xmx": "8g"
          "-Xms": "8g"
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.18. 容器镜像

AMQ Streams 允许您配置将用于其组件的容器镜像。建议仅在特殊情况下覆盖容器镜像,这时您需要使用不同的容器注册表。例如,因为您的网络不允许访问 AMQ Streams 使用的容器存储库。在这种情况下,您应该复制 AMQ Streams 镜像或从源构建它们。如果配置的镜像与 AMQ Streams 镜像不兼容,它可能无法正常工作。

2.1.18.1. 容器镜像配置

使用 image 属性 来指定要使用的容器镜像

警告

建议仅在特殊情况下覆盖容器镜像。

2.1.18.2. 配置容器镜像

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑 Kafka 资源中的 image 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        image: my-org/my-image:latest
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.19. TLS sidecar

sidecar 是一个在 pod 中运行的容器,但用于支持目的。在 AMQ Streams 中,TLS sidecar 使用 TLS 来加密和解密各种组件和 ZooKeeper 之间的所有通信。

TLS sidecar 用于:

  • 实体 Operator
  • Sything Control

2.1.19.1. TLS sidecar 配置

可以使用 中的 tlsSidecar 属性配置 TLS sidecar:

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper
  • Kafka.spec.entityOperator

TLS sidecar 支持以下附加选项:

  • 镜像
  • 资源
  • logLevel
  • readinessProbe
  • livenessProbe

resources 属性可用于指定为 TLS sidecar 分配的 内存和 CPU 资源

image 属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 2.1.18 节 “容器镜像”

logLevel 属性用于指定日志级别。支持以下日志记录级别:

  • emerg
  • alert
  • crit
  • err
  • warning
  • 备注
  • info
  • debug

默认值为 notice

有关为健康检查配置 readinessProbelivenessProbe 属性的更多信息,请参阅 第 2.1.14.1 节 “healthcheck 配置”

TLS sidecar 配置示例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    tlsSidecar:
      image: my-org/my-image:latest
      resources:
        requests:
          cpu: 200m
          memory: 64Mi
        limits:
          cpu: 500m
          memory: 128Mi
      logLevel: debug
      readinessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
    # ...
  zookeeper:
    # ...

2.1.19.2. 配置 TLS sidecar

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑 Kafka 资源中的 tlsSidecar 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      entityOperator:
        # ...
        tlsSidecar:
          resources:
            requests:
              cpu: 200m
              memory: 64Mi
            limits:
              cpu: 500m
              memory: 128Mi
        # ...
      cruiseControl:
        # ...
        tlsSidecar:
          resources:
            requests:
              cpu: 200m
              memory: 64Mi
            limits:
              cpu: 500m
              memory: 128Mi
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.20. 配置 pod 调度

重要

当两个应用调度到同一个 OpenShift 节点时,这两个应用可能使用相同的资源,如磁盘 I/O 并影响性能。这可能会导致性能下降。以避免与其他关键工作负载共享节点的方式,仅为 Kafka 使用正确的节点或专用一组节点是如何避免这些问题的最佳方法,调度 Kafka pod 是避免这些问题的最佳方法。

2.1.20.1. 根据其他应用程序调度 pod

2.1.20.1.1. 避免关键应用程序共享节点

Pod 反关联性可用于确保关键应用永远不会调度到同一磁盘上。在运行 Kafka 集群时,建议使用 pod 反关联性来确保 Kafka 代理不与其他工作负载(如数据库)共享节点。

2.1.20.1.2. 关联性

可以使用以下资源中 的关联性属性来配置关联性

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

关联性配置可以包含不同类型的关联性:

  • pod 关联性和反关联性
  • 节点关联性

affinity 属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 节点和 pod 关联性文档

2.1.20.1.3. 在 Kafka 组件中配置 pod 反关联性

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑指定集群部署的资源中的 affinity 属性。使用标签指定不应调度到同一节点上的 pod。topologyKey 应设置为 kubernetes.io/hostname,以指定不应将所选 pod 调度到具有相同主机名的节点。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        template:
          pod:
            affinity:
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - labelSelector:
                      matchExpressions:
                        - key: application
                          operator: In
                          values:
                            - postgresql
                            - mongodb
                    topologyKey: "kubernetes.io/hostname"
        # ...
      zookeeper:
        # ...
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.20.2. 将 pod 调度到特定的节点

2.1.20.2.1. 节点调度

OpenShift 集群通常包含许多不同类型的工作程序节点。些已针对 CPU 繁重工作负载优化,有些则用于内存,另一些则可能针对存储(快速本地 SSD)或网络进行了优化。使用不同的节点有助于优化成本和性能。要获得最佳性能,务必要允许调度 AMQ Streams 组件以使用正确的节点。

OpenShift 使用节点关联性将工作负载调度到特定的节点上。通过节点关联性,您可以为要在其上调度 pod 的节点创建调度约束。约束指定为标签选择器。您可以使用内置节点标签(如 beta.kubernetes.io/instance-type) 或自定义标签来指定标签,以选择正确的节点。

2.1.20.2.2. 关联性

可以使用以下资源中 的关联性属性来配置关联性

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

关联性配置可以包含不同类型的关联性:

  • pod 关联性和反关联性
  • 节点关联性

affinity 属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 节点和 pod 关联性文档

2.1.20.2.3. 在 Kafka 组件中配置节点关联性

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 标记调度 AMQ Streams 组件的节点。

    可以使用 oc label 来完成此操作:

    oc label node your-node node-type=fast-network

    或者,某些现有标签可以被重复使用。

  2. 编辑指定集群部署的资源中的 affinity 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        template:
          pod:
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                    - matchExpressions:
                      - key: node-type
                        operator: In
                        values:
                        - fast-network
        # ...
      zookeeper:
        # ...
  3. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.20.3. 使用专用节点

2.1.20.3.1. 专用节点

集群管理员可以将选定的 OpenShift 节点标记为污点。具有污点的节点不在常规调度中,一般的 pod 不会调度到它们上运行。只有可以容许节点上设置的污点的服务才可以调度到其中。此类节点上运行的唯一其他服务是系统服务,如日志收集器或软件定义型网络。

污点可用于创建专用节点。在专用节点上运行 Kafka 及其组件可能会有许多优点。同一节点上运行的其他应用程序不会造成干扰或消耗 Kafka 所需的资源。这可提高性能和稳定性。

要在专用节点上调度 Kafka pod,请配置 节点关联性容限

2.1.20.3.2. 关联性

可以使用以下资源中 的关联性属性来配置关联性

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

关联性配置可以包含不同类型的关联性:

  • pod 关联性和反关联性
  • 节点关联性

affinity 属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 节点和 pod 关联性文档

2.1.20.3.3. 容限(Tolerations)

可以使用以下资源中的 tolerations 属性来配置容限:

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

tolerations 属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 污点和容限

2.1.20.3.4. 设置专用节点并在节点上调度 pod

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 选择应当用作专用的节点。
  2. 确保这些节点上未调度工作负载。
  3. 在所选节点上设置污点:

    可以使用 oc adm taint 来完成此操作:

    oc adm taint node your-node dedicated=Kafka:NoSchedule
  4. 另外,也向所选节点添加标签。

    可以使用 oc label 来完成此操作:

    oc label node your-node dedicated=Kafka
  5. 编辑 指定集群部署的资源中的关联性容限 属性。例如:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        template:
          pod:
            tolerations:
              - key: "dedicated"
                operator: "Equal"
                value: "Kafka"
                effect: "NoSchedule"
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                  - matchExpressions:
                    - key: dedicated
                      operator: In
                      values:
                      - Kafka
        # ...
      zookeeper:
        # ...
  6. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

2.1.21. Kafka Exporter

您可以将 Kafka 资源配置为自动在集群中部署 Kafka Exporter。

Kafka Exporter 提取数据作为 Prometheus 指标,主要与偏移、消费者组、消费者滞后和主题相关的数据。

有关设置 Kafka Exporter 以及监控消费滞后性能的原因,请参阅 OpenShift 指南中的部署和升级 AMQ Streams 中的 Kafka Exporter

2.1.22. 对 Kafka 集群执行滚动更新

此流程描述了如何使用 OpenShift 注解手动触发现有 Kafka 集群的滚动更新。

先决条件

有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

步骤

  1. 查找控制您要手动更新的 Kafka pod 的 StatefulSet 的名称。

    例如,如果您的 Kafka 集群名为 my-cluster,则对应的 StatefulSet 名为 my-cluster-kafka

  2. 给 OpenShift 中的 StatefulSet 资源标注。例如,使用 oc annotate:

    oc annotate statefulset cluster-name-kafka strimzi.io/manual-rolling-update=true
  3. 等待下一次协调发生(默认为每隔两分钟)。只要协调过程检测到注解的 StatefulSet,则会触发注解的滚动更新。当所有 pod 的滚动更新完成后,注解会从 StatefulSet 中删除。

2.1.23. 执行 ZooKeeper 集群的滚动更新

此流程描述了如何使用 OpenShift 注释手动触发现有 ZooKeeper 集群的滚动更新。

先决条件

有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

步骤

  1. 查找控制您要手动更新的 ZooKeeper pod 的 StatefulSet 的名称。

    例如,如果您的 Kafka 集群名为 my-cluster,则对应的 StatefulSet 被 命名为 my-cluster-zookeeper

  2. 给 OpenShift 中的 StatefulSet 资源标注。例如,使用 oc annotate:

    oc annotate statefulset cluster-name-zookeeper strimzi.io/manual-rolling-update=true
  3. 等待下一次协调发生(默认为每隔两分钟)。只要协调过程检测到注解的 StatefulSet,则会触发注解的滚动更新。当所有 pod 的滚动更新完成后,注解会从 StatefulSet 中删除。

2.1.24. 扩展集群

2.1.24.1. 扩展 Kafka 集群

2.1.24.1.1. 在集群中添加代理

提高某个主题吞吐量的主要方法是增加该主题的分区数量。这是因为额外的分区允许在集群中的不同代理之间共享该主题的负载。但是,如果每个代理都受到使用更多分区的特定资源(通常是 I/O)限制,则不会增加吞吐量。相反,您需要在集群中添加代理。

当您在集群中添加额外的代理时,Kafka 不会自动为其分配任何分区。您必须决定从现有代理迁移到新代理的分区。

在所有代理之间重新分布分区后,应该减少每个代理的资源利用率。

2.1.24.1.2. 从集群中删除代理

因为 AMQ Streams 使用 StatefulSet 来管理代理 Pod,所以 您无法从集群中删除 任何 pod。您只能从集群中移除一个或多个数字最高的 pod。例如,在一个有 12 个代理的集群中,pod 被命名为 cluster-name-kafka-0 to cluster-name-kafka-11。如果您决定缩减一个代理,则会删除 cluster-name-kafka-11

在从集群中删除代理前,请确保它没有被分配给任何分区。您还应决定剩余的代理中哪些将负责要停用代理中的每个分区。代理没有分配分区后,您可以安全地缩减集群。

2.1.24.2. 分区重新分配

Topic Operator 目前不支持将副本分配给不同的代理,因此需要直接连接到代理 Pod 以将副本分配到代理。

在代理 pod 中,kafka-reassign-partitions.sh 实用程序允许您将分区重新分配给不同的代理。

它有三种不同的模式:

--generate
取一组主题和代理,并生成 重新分配 JSON 文件,该文件将导致这些主题的分区分配给这些代理。由于这适用于整个主题,因此当您需要重新分配某些主题的部分分区时,无法使用它。
--execute
重新分配 JSON 文件 并将其应用到集群中的分区和代理。因此获得分区的代理会成为分区领导的追随者。对于给定分区,新代理一旦捕获并加入 ISR(同步副本)后,旧的代理将停止成为追随者,并删除其副本。
--verify
使用与 --execute 步骤相同的 重新分配 JSON 文件,-- verify 检查文件中的所有分区是否已移到其预期代理中。如果重新分配完成后,--verify 也会移除任何有效的 节流。除非被删除,否则节流将继续影响群集,即使重新分配完成后也是如此。

只能在任何给定时间在集群中运行一个重新分配,且无法取消正在运行的重新分配。如果您需要取消重新分配,请等待它完成,然后执行另一个重新分配以恢复第一次重新分配的影响。kafka-reassign-partitions.sh 将显示此重新版本的重新分配 JSON 作为其输出的一部分。在需要停止进行中的重新分配时,应将非常大的重新分配分成几个较小的重新分配。

2.1.24.2.1. 重新分配 JSON 文件

重新分配 JSON 文件 有一个特定的结构:

{
  "version": 1,
  "partitions": [
    <PartitionObjects>
  ]
}

其中 <PartitionObjects> 是一个用逗号分开的对象列表,例如:

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ]
}
注意

虽然 Kafka 也支持 "log_dirs" 属性,但这不应用于 AMQ Streams。

以下是重新分配 JSON 文件的示例,该文件分配主题 主题a、分区 4 到代理 247,以及 主题 2 的分区 2 分配给 代理 157

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7]
    },
    {
      "topic": "topic-b",
      "partition": 2,
      "replicas": [1,5,7]
    }
  ]
}

JSON 中没有包括的分区不会更改。

2.1.24.2.2. 在 JBOD 卷间重新分配分区

在 Kafka 集群中使用 JBOD 存储时,您可以选择在特定卷及其日志目录之间重新分配分区(每个卷都有一个日志目录)。要将分区重新分配给特定卷,请将 log_dirs 选项添加到重新分配 JSON 文件中的 <PartitionObjects> 中。

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ],
  "log_dirs": [ <AssignedLogDirs> ]
}

log_dirs 对象应包含与 replicas 对象中指定的副本数相同的日志目录 数量。该值应当是日志目录的绝对路径或 any 关键字。

例如:

{
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7].
      "log_dirs": [ "/var/lib/kafka/data-0/kafka-log2", "/var/lib/kafka/data-0/kafka-log4", "/var/lib/kafka/data-0/kafka-log7" ]
}

2.1.24.3. 生成重新分配 JSON 文件

此流程描述了如何生成重新分配 JSON 文件,该文件使用 kafka-reassign-partitions.sh 工具为给定主题集重新分配所有分区。

先决条件

  • 一个正在运行的 Cluster Operator
  • Kafka 资源
  • 重新分配分区的一组主题

步骤

  1. 准备名为 topic .json 的 JSON 文件,其中列出要移动的主题。它必须具有以下结构:

    {
      "version": 1,
      "topics": [
        <TopicObjects>
      ]
    }

    其中 <TopicObjects> 是一个以逗号分隔的对象列表,例如:

    {
      "topic": <TopicName>
    }

    例如,如果要重新分配 topic-a 和 topic- b 的所有 分区,则需要准备类似以下的 topic .json 文件:

    {
      "version": 1,
      "topics": [
        { "topic": "topic-a"},
        { "topic": "topic-b"}
      ]
    }
  2. topic.json 文件复制到其中一个代理 pod 中:

    cat topics.json | oc exec -c kafka <BrokerPod> -i -- \
      /bin/bash -c \
      'cat > /tmp/topics.json'
  3. 使用 kafka-reassign-partitions.sh 命令生成重新分配 JSON。

    oc exec <BrokerPod> -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list <BrokerList> \
      --generate

    例如,将 topic-a 和 topic- b 的所有分区移动到 broker 47

    oc exec <BrokerPod> -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list 4,7 \
      --generate

2.1.24.4. 手动创建重新分配 JSON 文件

如果要移动特定的分区,可以手动创建重新分配 JSON 文件。

2.1.24.5. 重新分配节流

分区重新分配可能是一个缓慢的过程,因为它涉及到在代理之间传输大量数据。为避免对客户端造成重大影响,您可以限制重新分配过程。这可能会导致重新分配需要更长的时间。

  • 如果限流太低,新分配的代理将无法与正在发布的记录保持同步,并且重新分配永远不会完成。
  • 如果限流太高,客户端就会受到影响。

例如,对于制作者而言,这可能会比等待确认的正常延迟更高。对于消费者而言,这可能会因为轮询之间延迟较高而导致吞吐量下降。

2.1.24.6. 扩展 Kafka 集群

这个步骤描述了如何在 Kafka 集群中增加代理数量。

先决条件

  • 现有的 Kafka 集群。
  • 名为 reassignment .json 的重新分配 JSON 文件 描述了如何在放大的集群中将分区重新分配给代理。

步骤

  1. 通过增加 Kafka.spec.kafka.replicas 配置选项,根据需要添加多个新代理。
  2. 验证新代理 Pod 是否已启动。
  3. reassignment.json 文件复制到您稍后要执行命令的代理 pod 中:

    cat reassignment.json | \
      oc exec broker-pod -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'

    例如:

    cat reassignment.json | \
      oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'
  4. 使用同一代理 pod 的 kafka-reassign-partitions.sh 命令行工具执行分区重新分配。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --execute

    如果您要节流复制,您也可以使用 --throttle 选项,每秒使用节流率(以字节为单位)。例如:

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute

    此命令将显示两个重新分配 JSON 对象:第一个记录了正在移动的分区的当前分配。如果稍后需要恢复重新分配,您应该将其保存到本地文件(而不是 pod 中的文件)。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。

  5. 如果您需要在重新分配期间更改节流,您可以使用具有不同节流率的同一命令行。例如:

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
  6. 定期验证是否使用任何代理 Pod 的 kafka-reassign-partitions.sh 命令行工具完成重新分配。这与上一步中的命令相同,但使用 --verify 选项而不是 --execute 选项。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify

    例如,

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
  7. --verify 命令报告每个分区成功完成时,重新分配已完成。最后的 --verify 还会导致移除任何重新分配节流。现在,如果您保存了 JSON,将分配还原到其原始代理中,您可以删除还原文件。

2.1.24.7. 缩减 Kafka 集群

其他资源

这个步骤描述了如何减少 Kafka 集群中的代理数量。

先决条件

  • 现有的 Kafka 集群。
  • 名为 reassignment .json 的重新分配 JSON 文件 描述在 删除了最高编号 Pod 中的代理后,应如何将 分区重新分配给集群中的代理。

步骤

  1. reassignment.json 文件复制到您稍后要执行命令的代理 pod 中:

    cat reassignment.json | \
      oc exec broker-pod -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'

    例如:

    cat reassignment.json | \
      oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'
  2. 使用同一代理 pod 的 kafka-reassign-partitions.sh 命令行工具执行分区重新分配。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --execute

    如果您要节流复制,您也可以使用 --throttle 选项,每秒使用节流率(以字节为单位)。例如:

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute

    此命令将显示两个重新分配 JSON 对象:第一个记录了正在移动的分区的当前分配。如果稍后需要恢复重新分配,您应该将其保存到本地文件(而不是 pod 中的文件)。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。

  3. 如果您需要在重新分配期间更改节流,您可以使用具有不同节流率的同一命令行。例如:

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
  4. 定期验证是否使用任何代理 Pod 的 kafka-reassign-partitions.sh 命令行工具完成重新分配。这与上一步中的命令相同,但使用 --verify 选项而不是 --execute 选项。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify

    例如,

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
  5. --verify 命令报告每个分区成功完成时,重新分配已完成。最后的 --verify 还会导致移除任何重新分配节流。现在,如果您保存了 JSON,将分配还原到其原始代理中,您可以删除还原文件。
  6. 所有分区重新分配完成后,删除的代理不应对集群中的任何分区负责。您可以通过检查代理的数据日志目录是否包含任何实时分区日志来验证这一点。如果代理上的日志目录包含与扩展正则表达式 [a-zA-Z0-9.-]+\ 不匹配的目录。[a-z0-9]+-delete$ 仍然具有实时分区,不应停止。

    您可以执行以下命令检查:

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      /bin/bash -c \
      "ls -l /var/lib/kafka/kafka-log_<N>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"

    其中 N 是要删除的 Pod 数。

    如果上述命令显示任何输出,则代理仍然有实时分区。在这种情况下,重新分配没有完成,或者重新分配 JSON 文件不正确。

  7. 确认代理没有实时分区后,您可以编辑 Kafka.spec.kafka.replicas 资源的 Kafka. spec.kafka.replicas,这会缩减 StatefulSet从而删除最高数字的代理 Pod。

2.1.25. 手动删除 Kafka 节点

其他资源

此流程描述了如何使用 OpenShift 注解删除现有 Kafka 节点。删除 Kafka 节点包括删除运行 Kafka 代理的 Pod 和相关的 PersistentVolumeClaim (如果集群部署了持久性存储)。删除后,Pod 及其相关的 PersistentVolumeClaim 会被自动重新创建。

警告

删除 PersistentVolumeClaim 可能会导致持久性数据丢失。只有在您遇到存储问题时才应执行以下步骤。

先决条件

有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

步骤

  1. 查找您要删除的 Pod 的名称。

    例如,如果集群命名为 cluster-name,pod 被命名为 cluster-name-kafka-index,其中 索引 以零开始,结尾是副本总数。

  2. 给 OpenShift 中的 容器集资源标注:

    使用 oc annotate:

    oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
  3. 等待下一次协调,当带有底层持久性卷声明的注解的 pod 被删除后再重新创建。

2.1.26. 手动删除 ZooKeeper 节点

此流程描述了如何使用 OpenShift 注释删除现有的 ZooKeeper 节点。删除 ZooKeeper 节点包括删除运行 ZooKeeper 的 Pod 和相关的 PersistentVolumeClaim (如果集群部署了持久性存储)。删除后,Pod 及其相关的 PersistentVolumeClaim 会被自动重新创建。

警告

删除 PersistentVolumeClaim 可能会导致持久性数据丢失。只有在您遇到存储问题时才应执行以下步骤。

先决条件

有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

步骤

  1. 查找您要删除的 Pod 的名称。

    例如,如果集群名为 cluster-name,pod 被命名为 cluster-name-zookeeper-index,其中 索引 从零开始,结尾是副本总数。

  2. 给 OpenShift 中的 容器集资源标注:

    使用 oc annotate:

    oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
  3. 等待下一次协调,当带有底层持久性卷声明的注解的 pod 被删除后再重新创建。

2.1.27. 用于滚动更新的维护时间窗

通过维护时间窗,您可以计划对 Kafka 和 ZooKeeper 集群进行某些滚动更新,以便在方便的时间启动。

2.1.27.1. 维护时间窗概述

在大多数情况下,Cluster Operator 只更新 Kafka 或 ZooKeeper 集群,以响应对对应 Kafka 资源的更改。这可让您计划何时对 Kafka 资源应用更改,以最大程度降低对 Kafka 客户端应用程序的影响。

但是,在不对 Kafka 资源 进行任何相应的更改的情况下,可能会对 Kafka 和 ZooKeeper 集群进行一些更新。例如,如果 Cluster Operator 管理的 CA(证书授权机构)证书接近到期时间,则 Cluster Operator 将需要执行滚动重启。

虽然 pod 的滚动重启不应影响服务 的可用性 (假设正确的代理和主题配置),这可能会影响 Kafka 客户端应用程序 的性能。通过维护时间窗,您可以调度 Kafka 和 ZooKeeper 集群的自发滚动更新,以便在方便的时间开始。如果没有为群集配置维护时间窗,那么这种自发滚动更新可能会在不方便的时间(如在可预测的高负载期间)发生。

2.1.27.2. 维护时间窗定义

您可以通过在 Kafka.spec.maintenanceTimeWindows 属性中输入字符串数组来配置维护时间窗。每个字符串是一个 cron 表达式,解释为使用 UTC(协调世界时间,其实际用途与 Greenwich Mean Time 相同)。

以下示例配置了一个维护时间窗口,该时间窗口从午夜开始,到 01:59am(UTC)、周日、周一、星期二和周四结束:

# ...
maintenanceTimeWindows:
  - "* * 0-1 ? * SUN,MON,TUE,WED,THU *"
# ...

在实践中,维护窗口应当与 Kafka.spec.clusterCa.renewalDaysKafka.spec.clientsCa.renewalDays 属性一同设置,以确保在 配置的维护时间窗口中完成必要的 CA 证书续订。

注意

AMQ Streams 不完全根据给定的窗口调度维护操作。相反,对于每个协调,它会检查维护窗口当前是否"打开"。这意味着,给定时间窗内的维护操作启动可能会被 Cluster Operator 协调间隔延迟。因此维护时间窗必须至少是这个长。

其他资源

2.1.27.3. 配置维护时间窗

您可以配置维护时间窗,用于由支持的进程触发的滚动更新。

先决条件

  • OpenShift 集群。
  • Cluster Operator 正在运行。

步骤

  1. Kafka 资源中添加或编辑 maintenanceTimeWindows 属性。例如,允许在 0800 到 1059 之间以及 1400 到 1559 之间的维护,您可以设置 maintenanceTimeWindows,如下所示:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      maintenanceTimeWindows:
        - "* * 8-10 * * ?"
        - "* * 14-15 * * ?"
  2. 创建或更新资源。

    这可以使用 oc apply 来完成:

    oc apply -f your-file

其他资源

2.1.28. 手动续订 CA 证书

在相应证书续订期开始时,集群和客户端 CA 证书会自动续订。如果 Kafka.spec.clusterCa.generateCertificateAuthorityKafka.spec.clientsCa.generateCertificateAuthority 设置为 false,则 CA 证书不会自动续订。

您可以在证书续订期限开始前手动续订其中一个或两个证书。出于安全原因 或更改了证书的续订期或有效期,您可以这样做。

更新的证书使用与旧证书相同的私钥。

先决条件

  • Cluster Operator 正在运行。
  • 安装 CA 证书和私钥的 Kafka 集群。

步骤

  1. strimzi.io/force-renew 注解应用到包含您要续订的 CA 证书的 Secret

    表 2.1. 强制续订证书的 Secret 注解
    证书Secretannotate 命令

    集群 CA

    kAFKA-CLUSTER-NAME-cluster-ca-cert

    oc annotate secret KAFKA-CLUSTER-NAME-cluster-ca-cert strimzi.io/force-renew=true

    客户端 CA

    kAFKA-CLUSTER-NAME-clients-ca-cert

    oc annotate secret KAFKA-CLUSTER-NAME-clients-ca-cert strimzi.io/force-renew=true

    在下一次协调中,Cluster Operator 会为您注解的 Secret 生成一个新的 CA 证书。如果配置了维护时间窗,Cluster Operator 将在下一个维护时间窗内第一次协调时生成新的 CA 证书。

    客户端应用程序必须重新载入由 Cluster Operator 更新的集群和客户端 CA 证书。

  2. 检查 CA 证书有效的周期:

    例如,使用 openssl 命令:

    oc get secret CA-CERTIFICATE-SECRET -o 'jsonpath={.data.CA-CERTIFICATE}' | base64 -d | openssl x509 -subject -issuer -startdate -enddate -noout

    ca-CERTIFICATE-SECRETSecret 的名称,这是用于集群 CA 证书的 KAFKA-CLUSTER-NAME-cluster-ca-cert 和用于客户端 CA 证书的 KAFKA-CLUSTER-NAME-clients-ca-cert

    ca-CERTIFICATE 是 CA 证书的名称,如 jsonpath={.data.ca\.crt}

    该命令会返回 notBeforenotAfter 日期,这是 CA 证书的有效周期。

    例如,对于集群 CA 证书:

    subject=O = io.strimzi, CN = cluster-ca v0
    issuer=O = io.strimzi, CN = cluster-ca v0
    notBefore=Jun 30 09:43:54 2020 GMT
    notAfter=Jun 30 09:43:54 2021 GMT
  3. 从 Secret 删除旧证书。

    当组件使用新证书时,旧证书可能仍处于活跃状态。删除旧证书以消除任何潜在的安全风险。

2.1.29. 替换私钥

您可以替换集群 CA 和客户端 CA 证书使用的私钥。替换私钥时,Cluster Operator 会为新私钥生成一个新的 CA 证书。

先决条件

  • Cluster Operator 正在运行。
  • 安装 CA 证书和私钥的 Kafka 集群。

步骤

  • strimzi.io/force-replace 注解应用到包含您要续订的私钥的 Secret

    表 2.2. 替换私钥的命令
    的私钥Secretannotate 命令

    集群 CA

    <cluster-name>-cluster-ca

    oc annotate secret <cluster-name>-cluster-ca strimzi.io/force-replace=true

    客户端 CA

    <cluster-name>-clients-ca

    oc annotate secret <cluster-name>-clients-ca strimzi.io/force-replace=true

在下一次协调中,Cluster Operator 将:

  • 为注解的 Secret 生成一个新的私钥
  • 生成新的 CA 证书

如果配置了维护时间窗,Cluster Operator 将在下一个维护时间窗内第一次协调时生成新的私钥和 CA 证书。

客户端应用程序必须重新载入由 Cluster Operator 更新的集群和客户端 CA 证书。

2.1.30. 作为 Kafka 集群一部分创建的资源列表

以下资源由 OpenShift 集群中的 Cluster Operator 创建:

共享资源

cluster-name-cluster-ca
使用用于加密集群通信的集群 CA 的机密。
cluster-name-cluster-ca-cert
使用 Cluster CA 公钥的 secret。这个密钥可以用来验证 Kafka 代理的身份。
cluster-name-clients-ca
使用用于签署用户证书的 Clients CA 私钥的 secret
cluster-name-clients-ca-cert
使用 Clients CA 公钥的机密。此密钥可用于验证 Kafka 用户的身份。
cluster-name-cluster-operator-certs
使用集群操作器密钥与 Kafka 和 ZooKeeper 通信的机密。

zookeeper 节点

cluster-name-zookeeper
StatefulSet,它负责管理 ZooKeeper 节点 pod。
cluster-name-zookeeper-idx
由 Zookeeper StatefulSet 创建的 Pod。
cluster-name-zookeeper-nodes
无头服务需要直接解析 ZooKeeper pod IP 地址。
cluster-name-zookeeper-client
Kafka 代理用于连接 ZooKeeper 节点的服务作为客户端。
cluster-name-zookeeper-config
包含 ZooKeeper 辅助配置的 ConfigMap,由 ZooKeeper 节点 pod 挂载为卷。
cluster-name-zookeeper-nodes
使用 ZooKeeper 节点密钥的机密。
cluster-name-zookeeper
Zookeeper 节点使用的服务帐户。
cluster-name-zookeeper
为 ZooKeeper 节点配置的 Pod Disruption Budget。
cluster-name-network-policy-zookeeper
管理对 ZooKeeper 服务访问的网络策略.
data-cluster-name-zookeeper-idx
用于为 ZooKeeper 节点 pod idx 存储数据的卷的持久性卷声明。只有在选择了持久存储来调配持久卷以存储数据时,才会创建此资源。

Kafka 代理

cluster-name-kafka
StatefulSet,它负责管理 Kafka 代理 pod。
cluster-name-kafka-idx
由 Kafka StatefulSet 创建的 Pod。
cluster-name-kafka-brokers
需要 DNS 直接解析 Kafka 代理 Pod IP 地址的服务。
cluster-name-kafka-bootstrap
服务可用作 Kafka 客户端的 bootstrap 服务器。
cluster-name-kafka-external-bootstrap
为从 OpenShift 集群外部连接的客户端引导服务。只有在启用了外部侦听器时才会创建此资源。
cluster-name-kafka-pod-id
用于将流量从 OpenShift 集群外部路由到单个容器集的服务。只有在启用了外部侦听器时才会创建此资源。
cluster-name-kafka-external-bootstrap
从 OpenShift 集群外部连接的客户端的引导路由。只有在启用了外部侦听器并设置为 type route 时才会创建此资源。
cluster-name-kafka-pod-id
从 OpenShift 集群外部的流量路由到单个容器集。只有在启用了外部侦听器并设置为 type route 时才会创建此资源。
cluster-name-kafka-config
包含 Kafka 辅助配置且由 Kafka 代理 Pod 挂载为卷的 ConfigMap。
cluster-name-kafka-brokers
使用 Kafka 代理密钥的 secret。
cluster-name-kafka
Kafka 代理使用的服务帐户。
cluster-name-kafka
为 Kafka 代理配置的 Pod Disruption Budget。
cluster-name-network-policy-kafka
管理对 Kafka 服务访问的网络策略。
strimzi-namespace-name-cluster-name-kafka-init
Kafka 代理使用的集群角色绑定。
cluster-name-jmx
用来保护 Kafka 代理端口的 JMX 用户名和密码的 secret。只有在 Kafka 中启用了 JMX 时才会创建此资源。
data-cluster-name-kafka-idx
用于存储 Kafka 代理 pod IDx 数据的卷的持久性卷声明。只有在选择了持久存储来调配持久卷以存储数据时,才会创建此资源。
data-id-cluster-name-kafka-idx
用于存储 Kafka 代理 pod ID 的卷 ID 的 持久性卷声明。只有在调配持久卷以存储数据时,才会为 JBOD 卷选择持久存储时创建此资源。

实体 Operator

只有在使用 Cluster Operator 部署 Entity Operator 时才会创建这些资源。

cluster-name-entity-operator
使用主题和用户操作器进行部署.
cluster-name-entity-operator-random-string
由 Entity Operator 部署创建的 Pod。
cluster-name-entity-topic-operator-config
带有主题 Operator 辅助配置的 ConfigMap。
cluster-name-entity-user-operator-config
带有用户 Operator 辅助配置的 ConfigMap.
cluster-name-entity-operator-certs
使用 Entity Operator 密钥与 Kafka 和 ZooKeeper 通信的 secret。
cluster-name-entity-operator
Entity Operator 使用的服务帐户。
strimzi-cluster-name-topic-operator
Entity Operator 使用的角色绑定。
strimzi-cluster-name-user-operator
Entity Operator 使用的角色绑定。

Kafka Exporter

只有在使用 Cluster Operator 部署 Kafka Exporter 时才会创建这些资源。

cluster-name-kafka-exporter
使用 Kafka 导出器部署.
cluster-name-kafka-exporter-random-string
由 Kafka Exporter 部署创建的 Pod。
cluster-name-kafka-exporter
用于收集消费者滞后指标的服务.
cluster-name-kafka-exporter
Kafka Exporter 使用的服务帐户。

Sything Control

只有在使用 Cluster Operator 部署 Cruise Control 时才会创建这些资源。

cluster-name-cruise-control
通过 Cruise 控制进行部署.
cluster-name-cruise-control-random-string
由 Cruise Control 部署创建的 Pod。
cluster-name-cruise-control-config
包含 Cruise Control 辅助配置的 ConfigMap,并被 Cruise Control pod 作为一个卷挂载。
cluster-name-cruise-control-certs
使用 Cruise Control 密钥与 Kafka 和 ZooKeeper 通信的机密。
cluster-name-cruise-control
用于与 Cruise Control 通信的服务.
cluster-name-cruise-control
Cruise Control 使用的服务帐户.
cluster-name-network-policy-cruise-control
管理对 Cruise 控制服务的访问的网络策略.

JMXTrans

只有在使用 Cluster Operator 部署 JMXTrans 时才会创建这些资源。

cluster-name-jmxtrans
使用 JMXTrans 部署.
cluster-name-jmxtrans-random-string
由 JMXTrans 部署创建的 Pod。
cluster-name-jmxtrans-config
包含 JMXTrans 辅助配置的 ConfigMap,由 JMXTrans pod 挂载为卷。
cluster-name-jmxtrans
JMXTrans 使用的服务帐户.
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.