第 2 章 部署配置
本章论述了如何为支持的部署配置不同方面:
- Kafka 集群
- Kafka Connect 集群
- 支持 Source2Image 的 Kafka Connect 集群
- Kafka Mirror Maker
- Kafka Bridge
- 基于令牌的 OAuth 2.0 身份验证
- 基于令牌的 OAuth 2.0 授权
2.1. Kafka 集群配置
Kafka
资源的完整 schema 信息包括在 第 B.2 节 “Kafka
模式参考” 中。应用到所需 Kafka
资源的所有标签也将应用到组成 Kafka 集群的 OpenShift 资源。这提供了一种便捷的机制,可使资源根据需要进行标记。
2.1.1. Kafka YAML 配置示例
有关了解 Kafka 部署可用的配置选项的帮助,请参考此处提供的示例 YAML 文件。
示例只显示一些可能的选项,但尤为重要的配置选项包括:
- 资源请求(CPU / Memory)
- 用于最大和最小内存分配的 JVM 选项
- 侦听器(及身份验证)
- 认证
- 存储
- 机架感知
- 指标
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: replicas: 3 1 version: 1.6 2 resources: 3 requests: memory: 64Gi cpu: "8" limits: 4 memory: 64Gi cpu: "12" jvmOptions: 5 -Xms: 8192m -Xmx: 8192m listeners: 6 - name: plain 7 port: 9092 8 type: internal 9 tls: false 10 configuration: useServiceDnsDomain: true 11 - name: tls port: 9093 type: internal tls: true authentication: 12 type: tls - name: external 13 port: 9094 type: route tls: true configuration: brokerCertChainAndKey: 14 secretName: my-secret certificate: my-certificate.crt key: my-key.key authorization: 15 type: simple config: 16 auto.create.topics.enable: "false" offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 17 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" storage: 18 type: persistent-claim 19 size: 10000Gi 20 rack: 21 topologyKey: topology.kubernetes.io/zone metrics: 22 lowercaseOutputName: true rules: 23 # Special cases and very specific rules - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value name: kafka_server_$1_$2 type: GAUGE labels: clientId: "$3" topic: "$4" partition: "$5" # ... zookeeper: 24 replicas: 3 resources: requests: memory: 8Gi cpu: "2" limits: memory: 8Gi cpu: "2" jvmOptions: -Xms: 4096m -Xmx: 4096m storage: type: persistent-claim size: 1000Gi metrics: # ... entityOperator: 25 topicOperator: resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" userOperator: resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" kafkaExporter: 26 # ... cruiseControl: 27 # ...
- 1
- replicas 指定代理节点的数量。
- 2
- Kafka 版本,可按照 升级过程 更改。
- 3
- 资源请求 指定要为给定容器保留的资源。
- 4
- 资源限值指定容器可以消耗的最大资源。
- 5
- JVM 选项可以为 JVM 指定最小(
-Xms
)和最大内存分配(-Xmx
)。 - 6
- 侦听器配置客户端如何通过 bootstrap 地址连接到 Kafka 集群。侦听器 配置为内部或外部 侦听器,用于 OpenShift 集群 内部和外部 的连接。
- 7
- 用于标识侦听器的名称。在 Kafka 集群中必须是唯一的。
- 8
- Kafka 内侦听器使用的端口号。在给定 Kafka 集群中,端口号必须是唯一的。允许的端口号是 9092 及以上,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 9
- 侦听器类型指定为
内部
,或用于外部监听器,作为路由
、负载均衡器、
节点端口
或入口
。 - 10
- 为每个侦听器启用 TLS 加密。默认为
false
。路由
监听程序不需要 TLS 加密。 - 11
- 定义是否分配了包括集群服务后缀(通常.
cluster.local
)在内的完全限定 DNS 名称。 - 12
- 13
- 14
- 由外部 证书颁发机构管理的 Kafka 侦听器 证书的可选配置。
brokerCertChainAndKey
属性指定包含服务器证书和私钥的Secret
。也可以为 TLS 侦听程序配置 Kafka 侦听程序证书。 - 15
- 授权在 Kafka 代理上启用简单、OAUTH 2.0 或 OPA 授权。简单授权使用
AclAuthorizer
Kafka 插件。 - 16
- config 指定代理配置。标准 Apache Kafka 配置可能会提供,仅限于不是由 AMQ Streams 直接管理的属性。
- 17
- 18
- 19
- 20
- 21
- 将机架感知配置为在 不同机架之间分布副本。
拓扑
键必须与集群节点标签匹配。 - 22
- 23
- 通过 JMX Exporter 将指标导出到 Grafana 仪表板的 Kafka 规则。可在您的 Kafka 资源配置中复制由 AMQ Streams 提供的一组规则。
- 24
- zookeeper 特定的配置,其中包含与 Kafka 配置类似的属性。
- 25
- 实体 Operator 配置,用于指定 Topic Operator 和 User Operator 的配置。
- 26
- Kafka Exporter 配置,用于将数据公开为 Prometheus 指标数据。
- 27
- 精简控制配置 ,用于重新平衡 Kafka 集群。
2.1.2. 数据存储注意事项
高效的数据存储基础架构对于 AMQ 流的最佳性能至关重要。
块存储是必需的。文件存储(如 NFS)无法使用 Kafka。
对于块存储,您可以选择:
- 基于云的块存储解决方案,如 Amazon Elastic Block Store(EBS)
- 本地持久性卷
- 通过 光纤通道 或 iSCSI等协议访问的存储区域网络(SAN)卷
AMQ Streams 不需要 OpenShift 原始块卷。
2.1.2.1. 文件系统
建议您将存储系统配置为使用 XFS 文件系统。AMQ Streams 还与 ext4 文件系统兼容,但为了获得最佳结果,可能需要额外的配置。
2.1.2.2. Apache Kafka 和 ZooKeeper 存储
Apache Kafka 和 ZooKeeper 使用单独的磁盘。
支持三种类型的数据存储:
- 临时(仅适用于开发的建议)
- persistent
- JBOD(只是一个磁盘绑定,仅适用于 Kafka)
如需更多信息,请参阅 Kafka 和 ZooKeeper 存储。
在异步发送和从多个主题接收数据的大型集群中,固态驱动器(SSD)(虽然不必要)可以提高 Kafka 的性能。SSD 在 ZooKeeper 中特别有效,它需要快速、低延迟的数据访问。
您不需要置备复制存储,因为 Kafka 和 ZooKeeper 都内置了数据复制。
2.1.3. Kafka 和 ZooKeeper 存储类型
作为有状态应用程序,Kafka 和 ZooKeeper 需要将数据存储在磁盘上。AMQ Streams 支持用于此数据的三种存储类型:
- ephemeral
- persistent
- JBOD 存储
JBOD 存储只支持 Kafka,不适用于 ZooKeeper。
在配置 Kafka
资源时,您可以指定 Kafka 代理使用的存储类型及其对应的 ZooKeeper 节点。您可以使用以下资源中的 storage
属性配置存储类型:
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
存储类型在 type
字段中配置。
部署 Kafka 集群后,无法更改存储类型。
其他资源
- 如需有关临时存储的更多信息,请参阅 临时存储模式参考。
- 有关持久性存储的更多信息,请参阅 持久性存储架构参考。
- 有关 JBOD 存储的更多信息,请参阅 JBOD 模式参考。
-
有关
Kafka
模式的更多信息,请参阅Kafka
模式参考。
2.1.3.1. 临时存储
临时存储使用 emptyDir
卷来存储数据。若要使用临时存储,type
字段应设置为 ephemeral
。
emptyDir
卷不是持久性的,存储在其中的数据会在 Pod 重启时丢失。新 pod 启动后,它必须从集群的其他节点恢复所有数据。临时存储不适用于单一节点 ZooKeeper 集群,而对于带有复制因数 1 的 Kafka 主题,这会导致数据丢失。
临时存储示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: ephemeral # ... zookeeper: # ... storage: type: ephemeral # ...
2.1.3.1.1. 日志目录
Kafka 代理将使用临时卷作为挂载到以下路径的日志目录:
/var/lib/kafka/data/kafka-log_idx_
-
其中
idx
是 Kafka 代理 pod 索引。例如/var/lib/kafka/data/kafka-log0。
2.1.3.2. 持久性存储
永久存储使用持久卷声明 来调配用于存储数据的持久卷。持久卷声明可用于调配许多不同类型的卷,具体取决于要调配卷的 存储类。可以与持久卷声明一起使用的数据类型包括许多类型的 SAN 存储 和本地持久卷。
若要使用持久存储,必须将 type
设置为 persistent-claim
。持久性存储支持额外的配置选项:
id
(可选)-
存储标识号.对于 JBOD 存储声明中定义的存储卷,此选项是必需的。默认值为
0
。 大小
(必需)- 定义持久卷声明的大小,如 "1000Gi"。
class
(可选)- 用于动态卷置备的 OpenShift 存储类。
selector
(可选)- 允许选择要使用的特定持久性卷。它包含键:值对,代表用于选择此类卷的标签。
deleteClaim
(optional)-
指定在取消部署集群时是否需要删除持久卷声明的布尔值。默认为
false
。
仅支持重新定义持久性卷大小的 OpenShift 版本支持在现有 AMQ Streams 集群中增加持久性卷的大小。要调整持久性卷的大小必须使用支持卷扩展的存储类。对于不支持卷扩展的其他 OpenShift 版本和存储类,您必须在部署集群前决定必要的存储大小。无法减少现有持久性卷的大小。
使用 1000Gi 大小
的持久性存储配置片段示例
# ... storage: type: persistent-claim size: 1000Gi # ...
以下示例演示了存储类的使用。
使用特定存储类的持久性存储配置片段示例
# ... storage: type: persistent-claim size: 1Gi class: my-storage-class # ...
最后,可以利用 选择器
来选择特定的标记持久卷,以提供 SSD 等必要功能。
使用选择器的持久性存储配置片段示例
# ... storage: type: persistent-claim size: 1Gi selector: hdd-type: ssd deleteClaim: true # ...
2.1.3.2.1. 存储类覆盖
您可以为一个或多个 Kafka 代理或 ZooKeeper 节点指定不同的存储类,而不是使用默认存储类。例如,当存储类仅限于不同的可用区或数据中心时,这很有用。您可以使用 overrides
字段来实现这一目的。
在本例中,默认存储类名为 my-storage-class
:
使用存储类覆盖的 AMQ Streams 集群示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: labels: app: my-cluster name: my-cluster namespace: myproject spec: # ... kafka: replicas: 3 storage: deleteClaim: true size: 100Gi type: persistent-claim class: my-storage-class overrides: - broker: 0 class: my-storage-class-zone-1a - broker: 1 class: my-storage-class-zone-1b - broker: 2 class: my-storage-class-zone-1c # ... zookeeper: replicas: 3 storage: deleteClaim: true size: 100Gi type: persistent-claim class: my-storage-class overrides: - broker: 0 class: my-storage-class-zone-1a - broker: 1 class: my-storage-class-zone-1b - broker: 2 class: my-storage-class-zone-1c # ...
配置的 overrides
属性后,卷使用以下存储类:
-
ZooKeeper 节点 0 的持久卷将使用
my-storage-class-zone-1a
。 -
ZooKeeper 节点 1 的持久卷将使用
my-storage-class-zone-1b
。 -
ZooKeeepr 节点 2 的持久卷将使用
my-storage-class-zone-1c
。 -
Kafka 代理 0 的持久性卷将使用
my-storage-class-zone-1a
。 -
Kafka 代理 1 的持久性卷将使用
my-storage-class-zone-1b
。 -
Kafka 代理 2 的持久性卷将使用
my-storage-class-zone-1c
。
overrides
属性目前只用于覆盖存储类配置。目前不支持覆盖其他存储配置字段。目前不支持存储配置中的其他字段。
2.1.3.2.2. 持久性卷声明命名
使用持久性存储时,它会使用以下名称创建持久性卷声明:
data-cluster-name-kafka-idx
-
用于存储 Kafka 代理 pod
IDx
数据的卷的持久性卷声明。 data-cluster-name-zookeeper-idx
-
用于为 ZooKeeper 节点 pod
idx
存储数据的卷的持久性卷声明。
2.1.3.2.3. 日志目录
Kafka 代理将使用持久性卷作为挂载到以下路径的日志目录:
/var/lib/kafka/data/kafka-log_idx_
-
其中
idx
是 Kafka 代理 pod 索引。例如/var/lib/kafka/data/kafka-log0。
2.1.3.3. 重新定义持久性卷大小
您可以通过增大现有 AMQ Streams 集群使用的持久性卷的大小来置备存储容量。在使用一个持久性卷或在 JBOD 存储配置中使用多个持久性卷的集群中,支持重新定义持久性卷大小。
您可以增大但不能缩小持久性卷的大小。OpenShift 目前不支持缩小持久卷的大小。
先决条件
- 个 OpenShift 集群,其支持卷大小调整。
- Cluster Operator 正在运行。
- 使用使用支持卷扩展的存储类创建的持久性卷的 Kafka 集群。
步骤
在
Kafka
资源中,增大分配给 Kafka 集群和 ZooKeeper 集群的持久性卷的大小。-
要增大分配给 Kafka 集群的卷大小,编辑
spec.kafka.storage
属性。 要增大分配给 ZooKeeper 集群的卷大小,请编辑
spec.zookeeper.storage
属性。例如,将卷大小从
1000Gi
增加到2000Gi
:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: persistent-claim size: 2000Gi class: my-storage-class # ... zookeeper: # ...
-
要增大分配给 Kafka 集群的卷大小,编辑
创建或更新资源。
使用
oc apply
:oc apply -f your-file
OpenShift 会响应来自 Cluster Operator 的请求,增加了所选持久性卷的容量。完成大小调整后,Cluster Operator 会重启所有使用调整大小持久性卷的 pod。这会自动发生。
其他资源
有关在 OpenShift 中调整持久性卷大小的更多信息,请参阅使用 Kubernetes 重新定义持久性卷 大小。
2.1.3.4. JBOD 存储概述
您可以将 AMQ Streams 配置为使用 JBOD,这是多个磁盘或卷的数据存储配置。JBOD 是为 Kafka 代理提供更多数据存储的一种方法。它还能提高性能。
一个或多个卷描述了 JBOD 配置,每个卷可以 临时 或永久 。JBOD 卷声明的规则和限制与临时和永久存储的规则和限制相同。例如,您无法在置备后更改持久性存储卷的大小。
2.1.3.4.1. JBOD 配置
若要将 JBOD 与 AMQ Streams 搭配使用,存储 类型
必须设置为 jbod
。借助 volumes
属性,您可以描述组成 JBOD 存储阵列或配置的磁盘。以下片段显示了 JBOD 配置示例:
# ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false - id: 1 type: persistent-claim size: 100Gi deleteClaim: false # ...
创建 JBOD 卷后,无法更改 id。
用户可以从 JBOD 配置中添加或删除卷。
2.1.3.4.2. JBOD 和持久性卷声明
当使用持久性存储声明 JBOD 卷时,生成的持久性卷声明的命名方案如下:
data-id-cluster-name-kafka-idx
-
其中
id
是用于存储 Kafka 代理 podidx
数据的卷 ID。
2.1.3.4.3. 日志目录
Kafka 代理将使用 JBOD 卷作为挂载到以下路径的日志目录:
/var/lib/kafka/data-id/kafka-log_idx_
-
其中
id
是用于存储 Kafka 代理 podidx
数据的卷 ID。例如/var/lib/kafka/data-0/kafka-log0。
2.1.3.5. 将卷添加到 JBOD 存储
这个步骤描述了如何在配置为使用 JBOD 存储的 Kafka 集群中添加卷。它不能应用到配置为使用任何其他存储类型的 Kafka 集群。
在已经使用并被删除的 id
下添加新卷时,您必须确保之前使用的 PersistentVolumeClaims
已被删除。
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
- 带有 JBOD 存储的 Kafka 集群
步骤
编辑
Kafka
资源中的spec.kafka.storage.volumes
属性。将新卷添加到 volumes数组
。例如,添加 ID 为2
的新卷:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false - id: 1 type: persistent-claim size: 100Gi deleteClaim: false - id: 2 type: persistent-claim size: 100Gi deleteClaim: false # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f KAFKA-CONFIG-FILE
- 创建新主题或将现有分区重新分配至新磁盘.
其他资源
有关重新分配主题的详情请参考 第 2.1.24.2 节 “分区重新分配”。
2.1.3.6. 从 JBOD 存储中删除卷
这个步骤描述了如何从配置为使用 JBOD 存储的 Kafka 集群中删除卷。它不能应用到配置为使用任何其他存储类型的 Kafka 集群。JBOD 存储始终必须至少包含一个卷。
为避免数据丢失,您必须先移动所有分区,然后再删除卷。
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
- 带有两个或多个卷的 JBOD 存储的 Kafka 集群
步骤
- 从要删除的磁盘中重新分配所有分区。分区中仍然分配到将要删除的磁盘中的任何数据都可能会丢失。
编辑
Kafka
资源中的spec.kafka.storage.volumes
属性。从 volumes 阵列中删除一个或多个卷
。例如,删除 ID 为1
和2
的卷:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
其他资源
有关重新分配主题的详情请参考 第 2.1.24.2 节 “分区重新分配”。
2.1.4. Kafka 代理副本
Kafka 集群可以使用多个代理运行。您可以在 Kafka. spec.kafka.replicas 中配置用于 Kafka
集群的代理数量。集群的最佳代理数量需要根据您的具体用例来确定。
2.1.4.1. 配置代理节点的数量
这个步骤描述了如何在新集群中配置 Kafka 代理节点的数量。它只适用于没有分区的新集群。如果您的集群已经定义了主题,请参阅 第 2.1.24 节 “扩展集群”。
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
- 尚未定义主题的 Kafka 集群
步骤
编辑
Kafka
资源中的replicas
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... replicas: 3 # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
其他资源
如果您的集群已经定义了主题,请参阅 第 2.1.24 节 “扩展集群”。
2.1.5. Kafka 代理配置
AMQ Streams 允许您自定义 Kafka 集群中 Kafka 代理的配置。您可以指定和配置 Apache Kafka 文档 的"Broker Configs"部分中列出的大多数选项。您无法配置与以下区域相关的选项:
- 安全(加密、身份验证和授权)
- 监听程序配置
- 代理 ID 配置
- 日志数据目录的配置
- Broker 间通信
- zookeeper 连接
这些选项由 AMQ Streams 自动配置。
有关代理配置的更多信息,请参阅 KafkaClusterSpec
模式。
监听程序配置
您可以配置监听程序以连接到 Kafka 代理。有关配置监听程序的更多信息,请参阅 Listener 配置
授权对 Kafka 的访问
您可以将 Kafka 集群配置为允许或拒绝用户执行的操作。有关保护对 Kafka 代理的访问的更多信息,请参阅管理对 Kafka 的访问。
2.1.5.1. 配置 Kafka 代理
您可以配置现有的 Kafka 代理,或使用指定配置创建新 Kafka 代理。
先决条件
- OpenShift 集群可用。
- Cluster Operator 正在运行。
步骤
-
打开包含指定集群部署的
Kafka
资源的 YAML 配置文件。 在
Kafka
资源中的spec.kafka.config
属性中,输入一个或多个 Kafka 配置设置。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... config: default.replication.factor: 3 offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 1 # ... zookeeper: # ...
应用新配置以创建或更新资源。
使用
oc apply
:oc apply -f kafka.yaml
其中
kafka.yaml
是您要配置的资源的 YAML 配置文件,例如kafka-persistent.yaml
。
2.1.6. 监听程序配置
侦听器用于连接到 Kafka 代理。
AMQ Streams 提供了一个 generic GenericKafkaListener
模式,其中包含用于通过 Kafka
资源配置监听器的属性。
The GenericKafkaListener
提供了用于监听器配置的灵活方法。
您可以指定属性来配置 内部 侦听器,以便在 OpenShift 集群内连接,或者指定 外部 侦听器以连接 OpenShift 集群外部。
通用监听程序配置
每个监听程序都 定义为 Kafka
资源中的一个数组。
有关监听器配置的更多信息,请参阅 GenericKafkaListener
模式参考。
通用监听程序配置使用 KafkaListeners
模式参考替换了之前的监听程序配置方法,该参考 已被弃用。但是,您可以使用向后兼容将 旧格式转换为新格式。
KafkaListeners
模式将子属性用于 纯文本
、tls
和外部
监听程序,以及每个节点的固定端口。由于架构架构固有的限制,只能配置三个监听器,配置选项仅限于监听器类型。
使用 GenericKafkaListener
模式时,您可以根据需要配置多个监听器,只要它们的名称和端口是唯一的。
您可能希望配置多个外部监听器,例如,以处理来自需要不同身份验证机制的网络的访问。或者,您可能需要将 OpenShift 网络加入外部网络。在这种情况下,您可以配置内部侦听器( 使用ServiceDnsDomain
属性),以便不使用 OpenShift 服务 DNS 域(通常为 .cluster.local
)。
配置监听程序来保护对 Kafka 代理的访问
您可以使用身份验证为安全连接配置监听程序。有关保护对 Kafka 代理的访问的更多信息,请参阅管理对 Kafka 的访问。
为 OpenShift 外部的客户端访问配置外部侦听器
您可以使用指定的连接机制(如 loadbalancer),为 OpenShift 环境外的客户端访问配置外部侦听器。有关连接外部客户端的配置选项的更多信息,请参阅配置外部监听程序。
监听程序证书
您可以为启用了 TLS 加密的 TLS 监听程序或者外部监听程序提供自己的服务器证书,称为 Kafka 侦听程序证书。如需更多信息,请参阅 Kafka 侦听程序证书。
2.1.7. zookeeper 副本
zookeeper 集群或ensemble 通常使用奇数的节点(通常为三个、五个或 7 个)运行。
大多数节点必须可用,才能保持有效的仲裁。如果 ZooKeeper 集群丢失其仲裁,它将停止响应客户端,Kafka 代理将停止工作。拥有稳定且高度可用的 ZooKeeper 集群对于 AMQ 流至关重要。
- 三节点集群
- 为了保持仲裁,三节点 ZooKeeper 群集至少需要启动并运行两个节点。它只能容忍一个节点不可用。
- 五节点集群
- 为了保持仲裁,五节点 ZooKeeper 群集至少需要启动并运行三个节点。它可能会容忍两个节点不可用。
- 七节点集群
- 为了保持仲裁,七节点 ZooKeeper 群集至少需要启动并运行四个节点。它可能会容忍三个节点不可用。
出于开发目的,也可以使用单一节点运行 ZooKeeper。
拥有更多节点不一定意味着更好的性能,因为随着集群中节点的数量增加,保持仲裁的成本将会增加。根据可用性要求,您可以决定要使用的节点数量。
2.1.7.1. ZooKeeper 节点数量
可以使用 Kafka.spec.zookeeper
中的 replicas
属性来配置 ZooKeeper 节点的数量。
显示副本配置示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... replicas: 3 # ...
2.1.7.2. 更改 ZooKeeper 副本数量
先决条件
- OpenShift 集群可用。
- Cluster Operator 正在运行。
步骤
-
打开包含指定集群部署的
Kafka
资源的 YAML 配置文件。 在
Kafka
资源中的spec.zookeeper.replicas
属性中,输入复制的 ZooKeeper 服务器数量。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... replicas: 3 # ...
应用新配置以创建或更新资源。
使用
oc apply
:oc apply -f kafka.yaml
其中
kafka.yaml
是您要配置的资源的 YAML 配置文件,例如kafka-persistent.yaml
。
2.1.8. zookeeper 配置
AMQ Streams 允许您自定义 Apache ZooKeeper 节点的配置。您可以指定和配置 ZooKeeper 文档 中列出的大多数选项。
无法配置的选项与以下区域相关:
- 安全(加密、身份验证和授权)
- 监听程序配置
- 数据目录的配置
- zookeeper 集群组成
这些选项由 AMQ Streams 自动配置。
2.1.8.1. zookeeper 配置
zookeeper 节点使用 Kafka.spec.zookeeper
中的 config
属性进行配置。此属性包含 ZooKeeper 配置选项作为键。这些值可使用以下 JSON 类型之一描述:
- 字符串
- 数字
- 布尔值
除了那些直接由 AMQ Streams 管理的选项外,用户可以指定和配置 ZooKeeper 文档 中列出的选项。具体来说,所有键为等于或以以下任一字符串开头的配置选项将被禁止:
-
服务器.
-
dataDir
-
dataLogDir
-
clientPort
-
authProvider
-
quorum.auth
-
requireClientAuthScheme
当 config
属性中存在一个禁止选项时,它将被忽略,并会在 Cluster Operator 日志文件中输出警告信息。所有其他选项都传递到 ZooKeeper。
Cluster Operator 不会验证提供的 config
对象中的密钥或值。当提供无效的配置时,ZooKeeper 集群可能不会启动,或者可能会变得不稳定。在这种情况下,Kafka.spec.zookeeper.config
对象中的配置应该被修复,Cluster Operator 会将新配置部署到所有 ZooKeeper 节点。
所选选项有默认值:
-
timeTick
,默认值为2000
-
默认值为
5
的initLimit
-
带有
默认值
限制2
的同步 -
autopurge.purgeInterval
,默认值为1
当 Kafka.spec.zookeeper.config
属性中没有这些选项时,会自动配置这些选项。
使用三个允许的 ssl
配置选项,将特定的 密码套件 用于 TLS 版本。密码套件组合了用于安全连接和数据传输的算法。
ZooKeeper 配置示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... zookeeper: # ... config: autopurge.snapRetainCount: 3 autopurge.purgeInterval: 1 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 1 ssl.enabled.protocols: "TLSv1.2" 2 ssl.protocol: "TLSv1.2" 3 # ...
2.1.8.2. 配置 ZooKeeper
先决条件
- OpenShift 集群可用。
- Cluster Operator 正在运行。
步骤
-
打开包含指定集群部署的
Kafka
资源的 YAML 配置文件。 在
Kafka
资源中的spec.zookeeper.config
属性中,输入一个或多个 ZooKeeper 配置设置。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... zookeeper: # ... config: autopurge.snapRetainCount: 3 autopurge.purgeInterval: 1 # ...
应用新配置以创建或更新资源。
使用
oc apply
:oc apply -f kafka.yaml
其中
kafka.yaml
是您要配置的资源的 YAML 配置文件,例如kafka-persistent.yaml
。
2.1.9. zookeeper 连接
zookeeper 服务通过加密和身份验证进行保护,它们不应该由不属于 AMQ Streams 的外部应用程序使用。
但是,如果要使用需要连接到 ZooKeeper 的 Kafka CLI 工具,您可以使用 ZooKeeper 容器中的终端,并连接 localhost:12181
作为 ZooKeeper 地址。
2.1.9.1. 从终端连接到 ZooKeeper
大多数 Kafka CLI 工具可以直接连接到 Kafka。因此,在正常情况下,您应不需要连接到 ZooKeeper。如果需要,您可以按照以下步骤操作。在 ZooKeeper 容器内打开终端,以使用 Kafka CLI 工具需要 ZooKeeper 连接。
先决条件
- OpenShift 集群可用。
- Kafka 集群正在运行。
- Cluster Operator 正在运行。
步骤
使用 OpenShift 控制台打开终端,或者从 CLI 运行
exec
命令。例如:
oc exec -it my-cluster-zookeeper-0 -- bin/kafka-topics.sh --list --zookeeper localhost:12181
务必使用
localhost:12181
。现在,您可以向 ZooKeeper 运行 Kafka 命令。
2.1.10. 实体 Operator
Entity Operator 负责管理正在运行的 Kafka 集群中与 Kafka 相关的实体。
Entity Operator 包括:
- 用于管理 Kafka 主题的主题 Operator
- 管理 Kafka 用户的用户 Operator
通过 Kafka
资源配置,Cluster Operator 可在部署 Kafka 集群时部署 Entity Operator,包括一个或多个操作器。
部署后,实体 Operator 根据部署配置包含操作器。
Operator 会自动配置为管理 Kafka 集群的主题和用户。
2.1.10.1. 实体 Operator 配置属性
使用 Kafka.spec 中的 principal
Operator
属性来配置 Entity Operator。
entity Operator
属性支持以下几个子属性:
-
tlsSidecar
-
topicOperator
-
userOperator
-
模板
tlsSidecar
属性包含 TLS sidecar 容器的配置,用于与 ZooKeeper 通信。有关配置 TLS sidecar 的详情请参考 第 2.1.19 节 “TLS sidecar”。
template
属性包含 Entity Operator pod 的配置,如标签、注解、关联性和容限。有关配置模板的详情请参考 第 2.6 节 “自定义 OpenShift 资源”。
topicOperator
属性包含 Topic Operator 的配置。当缺少这个选项时,Entity Operator 会在没有 Topic Operator 的情况下部署。
userOperator
属性包含 User Operator 的配置。当缺少这个选项时,Entity Operator 会在没有 User Operator 的情况下部署。
有关配置 Entity Operator 的属性的更多信息,请参阅 EntityUserOperatorSpec
schema 参考。
启用两个运算符的基本配置示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: topicOperator: {} userOperator: {}
如果将空对象({}
)用于 topicOperator 和
,则所有属性都使用其默认值。
userOperator
当缺少 topicOperator
和 userOperator
属性时,Entity Operator 不会被部署。
2.1.10.2. 主题 Operator 配置属性
可以使用 topicOperator
对象中的附加选项来配置主题 Operator 部署。支持以下属性:
watchedNamespace
-
主题操作器监视
KafkaTopics
的 OpenShift 命名空间。默认为部署 Kafka 集群的命名空间。 reconciliationIntervalSeconds
-
定期协调之间的间隔(以秒为单位)。
默认
90. zookeeperSessionTimeoutSeconds
-
ZooKeeper 会话超时,以秒为单位。默认
20
。 topicMetadataMaxAttempts
-
从 Kafka 获取主题元数据的尝试数量。每次尝试之间的时间都定义为指数回退。考虑在因为分区或副本数量而创建主题需要更多时间时增加这个值。
默认
6. 镜像
-
image
属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 2.1.18 节 “容器镜像”。 资源
-
resources
属性配置分配给 Topic Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 2.1.11 节 “CPU 和内存资源”。 logging
-
logging
属性配置 Topic Operator 的日志。如需了解更多详细信息,请参阅 第 2.1.10.4 节 “Operator 日志记录器”。
Topic Operator 配置示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 # ...
2.1.10.3. 用户 Operator 配置属性
用户 Operator 部署可以使用 userOperator
对象中的附加选项进行配置。支持以下属性:
watchedNamespace
-
用户操作员在其中监视
KafkaUsers
的 OpenShift 命名空间。默认为部署 Kafka 集群的命名空间。 reconciliationIntervalSeconds
-
定期协调之间的间隔(以秒为单位)。
默认值
120. zookeeperSessionTimeoutSeconds
-
ZooKeeper 会话超时,以秒为单位。
默认
6. 镜像
-
image
属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 2.1.18 节 “容器镜像”。 资源
-
resources
属性配置分配给 User Operator 的资源数量。有关资源请求和限制配置的详情,请参阅 第 2.1.11 节 “CPU 和内存资源”。 logging
-
logging
属性配置 User Operator 的日志记录。如需了解更多详细信息,请参阅 第 2.1.10.4 节 “Operator 日志记录器”。
User Operator 配置示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... userOperator: watchedNamespace: my-user-namespace reconciliationIntervalSeconds: 60 # ...
2.1.10.4. Operator 日志记录器
Topic Operator 和 User Operator 具有可配置的日志记录器:
-
rootLogger.level
操作器使用 Apache log4j2
日志记录器实施。
使用 Kafka
资源中的 logging
属性来配置日志记录器和日志记录器级别。
您可以通过直接(内线)指定日志记录器和级别来设置日志级别,或使用自定义(外部)ConfigMap。如果使用 ConfigMap,则将 logging.name
属性设置为包含外部日志配置的 ConfigMap 的名称。在 ConfigMap 中,日志配置使用 log4j2.properties 进行
描述。
此处我们会看到 内联
和外部
记录示例。
内联日志记录
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: inline loggers: rootLogger.level: INFO # ... userOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: inline loggers: rootLogger.level: INFO # ...
外部日志记录
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: external name: customConfigMap # ...
其他资源
- 垃圾收集器(GC)日志记录也可以启用(或禁用)。有关 GC 日志的更多信息,请参阅 第 2.1.17.1 节 “JVM 配置”
- 有关日志级别的更多信息,请参阅 Apache 日志记录服务。
2.1.10.5. 配置实体 Operator
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑
Kafka
资源中的 principalOperator
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 userOperator: watchedNamespace: my-user-namespace reconciliationIntervalSeconds: 60
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.11. CPU 和内存资源
对于每个部署的容器,AMQ Streams 允许您请求特定资源并定义这些资源的最大消耗。
AMQ Streams 支持两种类型的资源:
- CPU
- 内存
AMQ Streams 使用 OpenShift 语法来指定 CPU 和内存资源。
2.1.11.1. 资源限值和请求
资源限值和请求使用以下 资源中的 resources
属性进行配置:
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.entityOperator.tlsSidecar
-
Kafka.spec.kafkaExporter
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaBridge.spec
其他资源
- 有关在 OpenShift 中管理计算资源的更多信息,请参阅为容器管理计算资源。
2.1.11.1.1. 资源请求
请求指定要为给定容器保留的资源。保留资源可确保资源始终可用。
如果资源请求针对的是 OpenShift 集群中可用的可用资源,则不会调度该容器集。
资源请求在 requests
属性中指定。AMQ Streams 当前支持的资源请求:
-
cpu
-
memory
可以为一个或多个支持的资源配置请求。
使用所有资源的资源请求配置示例
# ... resources: requests: cpu: 12 memory: 64Gi # ...
2.1.11.1.2. 资源限值
limits 指定给定容器可以消耗的最大资源。限制不是保留的,可能并不总是可用。容器只能在资源可用时才使用资源限制。资源限制应始终大于资源请求。
资源限值在 limits
属性中指定。AMQ Streams 当前支持的资源限制:
-
cpu
-
memory
资源可以被配置为一个或多个支持的限制。
资源限制配置示例
# ... resources: limits: cpu: 12 memory: 64Gi # ...
2.1.11.1.3. 支持的 CPU 格式
支持 CPU 请求和限制,其格式如下:
-
CPU 内核数作为整数(
5
个 CPU 内核)或十进制(2.5
个 CPU 内核)。 -
数字或 millicpus / millicores(
100m
),其中 1000 millicores 相同1 个
CPU 内核。
CPU 单元示例
# ... resources: requests: cpu: 500m limits: cpu: 2.5 # ...
1 个 CPU 核心的计算能力可能因部署 OpenShift 的平台而异。
其他资源
- 有关 CPU 规格的更多信息,请参阅 CPU 含义。
2.1.11.1.4. 支持的内存格式
内存请求和限值以兆字节、千兆字节、兆字节和千兆字节为单位指定。
-
要指定内存(以 MB 为单位),请使用
M
后缀。例如1000M
。 -
要指定以 GB 为单位的内存,请使用
G
后缀。例如1G
: -
要以兆字节为单位指定内存,请使用
Mi
后缀。例如1000Mi
。 -
要以千兆字节为单位指定内存,请使用
Gi
后缀。例如1Gi
。
使用不同内存单元的示例
# ... resources: requests: memory: 512Mi limits: memory: 2Gi # ...
其他资源
- 有关内存规格和其他支持单元的详情,请参阅 内存含义。
2.1.11.2. 配置资源请求和限值
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑指定集群部署的资源中的
resources
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... resources: requests: cpu: "8" memory: 64Gi limits: cpu: "12" memory: 128Gi # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
其他资源
- 如需有关该架构的更多信息,请参阅 资源要求 API 参考。
2.1.12. Kafka 日志记录器
Kafka 拥有自己的可配置日志记录器:
-
log4j.logger.org.I0Itec.zkclient.ZkClient
-
log4j.logger.org.apache.zookeeper
-
log4j.logger.kafka
-
log4j.logger.org.apache.kafka
-
log4j.logger.kafka.request.logger
-
log4j.logger.kafka.network.Processor
-
log4j.logger.kafka.server.KafkaApis
-
log4j.logger.kafka.network.RequestChannel$
-
log4j.logger.kafka.controller
-
log4j.logger.kafka.log.LogCleaner
-
log4j.logger.state.change.logger
-
log4j.logger.kafka.authorizer.logger
zookeeper 还具有可配置的日志记录器:
-
zookeeper.root.logger
Kafka 和 ZooKeeper 使用 Apache log4j
日志记录器实现。
操作员使用 Apache log4j2
日志记录器实现,因此日志记录配置在 ConfigMap 中使用 log4j2.properties 进行
描述。如需更多信息,请参阅 第 2.1.10.4 节 “Operator 日志记录器”。
使用 logging
属性来配置日志记录器和日志记录器级别。
您可以通过直接(内线)指定日志记录器和级别来设置日志级别,或使用自定义(外部)ConfigMap。如果使用 ConfigMap,则将 logging.name
属性设置为包含外部日志配置的 ConfigMap 的名称。在 ConfigMap 中,日志配置使用 log4j.properties 进行
描述。
此处我们会看到 内联
和外部
记录示例。
内联日志记录
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: # ... logging: type: inline loggers: kafka.root.logger.level: "INFO" # ... zookeeper: # ... logging: type: inline loggers: zookeeper.root.logger: "INFO" # ... entityOperator: # ... topicOperator: # ... logging: type: inline loggers: rootLogger.level: INFO # ... userOperator: # ... logging: type: inline loggers: rootLogger.level: INFO # ...
外部日志记录
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... logging: type: external name: customConfigMap # ...
对外部和内联日志记录级别的更改将在不重启的情况下应用到 Kafka 代理。
其他资源
- 垃圾收集器(GC)日志记录也可以启用(或禁用)。有关垃圾回收的更多信息,请参阅 垃圾回收。 第 2.1.17.1 节 “JVM 配置”
- 有关日志级别的更多信息,请参阅 Apache 日志记录服务。
2.1.13. Kafka 机架感知
AMQ Streams 中的机架感知功能有助于在不同机架之间分散 Kafka 代理 Pod 和 Kafka 主题副本。启用机架意识有助于提高 Kafka 代理的可用性以及它们托管的主题。
"rack"可能代表数据中心中的可用性区域、数据中心或实际机架。
2.1.13.1. 在 Kafka 代理中配置机架感知
可以在 Kafka .spec.kafka 的
机架感知。rack
属性中配置 Kafkarack
对象具有一个名为 topologyKey
的必填字段。此键需要与分配给 OpenShift 集群节点的一个标签匹配。OpenShift 在将 Kafka 代理 Pod 调度到节点时使用该标签。如果 OpenShift 集群在云提供商平台上运行,该标签应代表运行该节点的可用性区域。通常,节点使用 topology.kubernetes.io/zone
标签(或旧的 OpenShift 版本上的 failure-domain.beta.kubernetes.io/zone
)标记为 topologyKey
值。如需有关 OpenShift 节点标签的更多信息,请参阅 Well-Known Labels、Annotations 和 Taints。这会导致在区间分散代理 pod,并在 Kafka 代理中设置 代理的 broker.rack
配置参数。
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
- 请咨询您的 OpenShift 管理员,了解代表节点要部署到的区域 / 机架的节点标签。
将标签用作拓扑键,编辑
Kafka
资源中的rack
属性。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... rack: topologyKey: topology.kubernetes.io/zone # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
其他资源
- 有关为 Kafka 机架感知配置 init 容器镜像的详情请参考 第 2.1.18 节 “容器镜像”。
2.1.14. healthchecks
健康检查是定期测试,可验证应用的健康状况。当健康检查探测失败时,OpenShift 假定应用不健康,并尝试修复它。
OpenShift 支持两种类型的 Healthcheck 探测:
- 存活度探测
- 就绪度探测
有关探测的详情,请参阅 配置存活度和就绪度探测。这两种探测都用于 AMQ Streams 组件。
用户可以配置所选的存活度和就绪度探测选项。
2.1.14.1. healthcheck 配置
可以使用以下资源中的 livenessProbe 和
属性来配置存活度和就绪度探测:
readinessProbe
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
Kafka.spec.entityOperator.tlsSidecar
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.kafkaExporter
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaMirrorMaker.spec
-
KafkaBridge.spec
Live nessProbe
和 readinessProbe
均支持以下选项:
-
initialDelaySeconds
-
timeoutSeconds
-
periodSeconds
-
successThreshold
-
failureThreshold
有关 livenessProbe
和 readinessProbe
选项的详情请参考 第 B.45 节 “探测
模式参考”。
存活度和就绪度探测配置示例
# ... readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ...
2.1.14.2. 配置健康检查
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑
Kafka
资源中的liveness
属性。例如:Probe
或 readinessProbeapiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.15. Prometheus 指标
AMQ Streams 支持使用 Prometheus JMX 导出器的 Prometheus 指标将 Apache Kafka 和 ZooKeeper 支持的 JMX 指标转换为 Prometheus 指标。启用指标数据后,它们会在端口 9404 上公开。
有关设置和部署 Prometheus 和 Grafana 的更多信息,请参阅 OpenShift 指南中的 Deploying 和 升级 AMQ Streams 中的 介绍 Metrics 到 Kafka。
2.1.15.1. 指标配置
Prometheus 指标通过在以下资源中配置 metrics
属性来启用:
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
当资源中没有定义 metrics
属性时,Prometheus 指标将被禁用。要在不进一步配置的情况下启用 Prometheus 指标导出,您可以将其设置为空对象({}
)。
在没有任何进一步配置的情况下启用指标的示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... metrics: {} # ... zookeeper: # ...
指标
属性可能包含 Prometheus JMX 导出器的其他配置。
使用其他 Prometheus JMX Exporter 配置启用指标示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... metrics: lowercaseOutputName: true rules: - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count" name: "kafka_server_$1_$2_total" - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count" name: "kafka_server_$1_$2_total" labels: topic: "$3" # ... zookeeper: # ...
2.1.15.2. 配置 Prometheus 指标
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑
Kafka
资源中的metrics
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... metrics: lowercaseOutputName: true # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.16. JMX 选项
AMQ Streams 支持通过在 9999 上打开 JMX 端口从 Kafka 代理获取 JMX 指标。您可以获取有关每个 Kafka 代理的各种指标,例如使用数据,如 BytesPerSecond
值或代理网络的请求率。AMQ Streams 支持打开受密码保护的 JMX 端口或不受保护的 JMX 端口。
2.1.16.1. 配置 JMX 选项
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
您可以使用以下资源中的 jmxOptions
属性配置 JMX 选项:
-
Kafka.spec.kafka
您可以为 Kafka 代理上打开的 JMX 端口配置用户名和密码保护。
保护 JMX 端口
您可以保护 JMX 端口,以防止未经授权的 pod 访问该端口。目前,JMX 端口只能使用用户名和密码进行保护。要启用 JMX 端口的安全性,请将 身份验证
字段中的 type
参数设置为 password
:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jmxOptions: authentication: type: "password" # ... zookeeper: # ...
这可让您将 pod 部署到集群中,并使用无头服务来获取 JMX 指标,并指定您要解决的代理。要从 broker 0 获得 JMX 指标,我们解决在无头服务前附加代理 0 的无外设服务:
"<cluster-name>-kafka-0-<cluster-name>-<headless-service-name>"
如果 JMX 端口受到保护,您可以通过在 pod 部署中引用 JMX 机密来获取用户名和密码。
使用打开的 JMX 端口
要禁用 JMX 端口的安全性,请不要填写 身份验证
字段
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jmxOptions: {} # ... zookeeper: # ...
这只会在无头服务上打开 JMX 端口,您可以按照上述方法将 pod 部署到集群中。唯一的区别在于,任何容器集都能够从 JMX 端口读取。
2.1.17. JVM 选项
AMQ 流的以下组件在虚拟机(VM)中运行:
- Apache Kafka
- Apache ZooKeeper
- Apache Kafka Connect
- Apache Kafka MirrorMaker
- AMQ Streams Kafka Bridge
JVM 配置选项针对不同的平台和架构优化性能。AMQ Streams 允许您配置其中一些选项。
2.1.17.1. JVM 配置
使用 jvmOptions
属性,为组件所运行的 JVM 配置支持的选项。
支持的 JVM 选项有助于优化不同平台和架构的性能。
2.1.17.2. 配置 JVM 选项
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑 Kafka、KafkaConnect、
Kafka
Connect
S2I、Kafka
MirrorMaker 或 Kafka
Bridge
资源中的jvmOptions
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jvmOptions: "-Xmx": "8g" "-Xms": "8g" # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.18. 容器镜像
AMQ Streams 允许您配置将用于其组件的容器镜像。建议仅在特殊情况下覆盖容器镜像,这时您需要使用不同的容器注册表。例如,因为您的网络不允许访问 AMQ Streams 使用的容器存储库。在这种情况下,您应该复制 AMQ Streams 镜像或从源构建它们。如果配置的镜像与 AMQ Streams 镜像不兼容,它可能无法正常工作。
2.1.18.1. 容器镜像配置
使用 image
属性 来指定要使用的容器镜像。
建议仅在特殊情况下覆盖容器镜像。
2.1.18.2. 配置容器镜像
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑
Kafka
资源中的image
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... image: my-org/my-image:latest # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.19. TLS sidecar
sidecar 是一个在 pod 中运行的容器,但用于支持目的。在 AMQ Streams 中,TLS sidecar 使用 TLS 来加密和解密各种组件和 ZooKeeper 之间的所有通信。
TLS sidecar 用于:
- 实体 Operator
- Sything Control
2.1.19.1. TLS sidecar 配置
可以使用 中的 tlsSidecar
属性配置 TLS sidecar:
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
Kafka.spec.entityOperator
TLS sidecar 支持以下附加选项:
-
镜像
-
资源
-
logLevel
-
readinessProbe
-
livenessProbe
resources
属性可用于指定为 TLS sidecar 分配的 内存和 CPU 资源。
image
属性可用于配置要使用的容器镜像。有关配置自定义容器镜像的详情,请参考 第 2.1.18 节 “容器镜像”。
logLevel
属性用于指定日志级别。支持以下日志记录级别:
- emerg
- alert
- crit
- err
- warning
- 备注
- info
- debug
默认值为 notice。
有关为健康检查配置 readinessProbe
和 livenessProbe
属性的更多信息,请参阅 第 2.1.14.1 节 “healthcheck 配置”。
TLS sidecar 配置示例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... tlsSidecar: image: my-org/my-image:latest resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi logLevel: debug readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ... zookeeper: # ...
2.1.19.2. 配置 TLS sidecar
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑
Kafka
资源中的tlsSidecar
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... tlsSidecar: resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi # ... cruiseControl: # ... tlsSidecar: resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.20. 配置 pod 调度
当两个应用调度到同一个 OpenShift 节点时,这两个应用可能使用相同的资源,如磁盘 I/O 并影响性能。这可能会导致性能下降。以避免与其他关键工作负载共享节点的方式,仅为 Kafka 使用正确的节点或专用一组节点是如何避免这些问题的最佳方法,调度 Kafka pod 是避免这些问题的最佳方法。
2.1.20.1. 根据其他应用程序调度 pod
2.1.20.1.1. 避免关键应用程序共享节点
Pod 反关联性可用于确保关键应用永远不会调度到同一磁盘上。在运行 Kafka 集群时,建议使用 pod 反关联性来确保 Kafka 代理不与其他工作负载(如数据库)共享节点。
2.1.20.1.2. 关联性
可以使用以下资源中 的关联性属性来配置关联性
:
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
关联性配置可以包含不同类型的关联性:
- pod 关联性和反关联性
- 节点关联性
affinity
属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 节点和 pod 关联性文档。
2.1.20.1.3. 在 Kafka 组件中配置 pod 反关联性
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑指定集群部署的资源中的
affinity
属性。使用标签指定不应调度到同一节点上的 pod。topologyKey
应设置为kubernetes.io/hostname
,以指定不应将所选 pod 调度到具有相同主机名的节点。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.20.2. 将 pod 调度到特定的节点
2.1.20.2.1. 节点调度
OpenShift 集群通常包含许多不同类型的工作程序节点。些已针对 CPU 繁重工作负载优化,有些则用于内存,另一些则可能针对存储(快速本地 SSD)或网络进行了优化。使用不同的节点有助于优化成本和性能。要获得最佳性能,务必要允许调度 AMQ Streams 组件以使用正确的节点。
OpenShift 使用节点关联性将工作负载调度到特定的节点上。通过节点关联性,您可以为要在其上调度 pod 的节点创建调度约束。约束指定为标签选择器。您可以使用内置节点标签(如 beta.kubernetes.io/instance-type)
或自定义标签来指定标签,以选择正确的节点。
2.1.20.2.2. 关联性
可以使用以下资源中 的关联性属性来配置关联性
:
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
关联性配置可以包含不同类型的关联性:
- pod 关联性和反关联性
- 节点关联性
affinity
属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 节点和 pod 关联性文档。
2.1.20.2.3. 在 Kafka 组件中配置节点关联性
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
标记调度 AMQ Streams 组件的节点。
可以使用
oc label
来完成此操作:oc label node your-node node-type=fast-network
或者,某些现有标签可以被重复使用。
编辑指定集群部署的资源中的
affinity
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-type operator: In values: - fast-network # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.20.3. 使用专用节点
2.1.20.3.1. 专用节点
集群管理员可以将选定的 OpenShift 节点标记为污点。具有污点的节点不在常规调度中,一般的 pod 不会调度到它们上运行。只有可以容许节点上设置的污点的服务才可以调度到其中。此类节点上运行的唯一其他服务是系统服务,如日志收集器或软件定义型网络。
污点可用于创建专用节点。在专用节点上运行 Kafka 及其组件可能会有许多优点。同一节点上运行的其他应用程序不会造成干扰或消耗 Kafka 所需的资源。这可提高性能和稳定性。
2.1.20.3.2. 关联性
可以使用以下资源中 的关联性属性来配置关联性
:
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
关联性配置可以包含不同类型的关联性:
- pod 关联性和反关联性
- 节点关联性
affinity
属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 节点和 pod 关联性文档。
2.1.20.3.3. 容限(Tolerations)
可以使用以下资源中的 tolerations
属性来配置容限:
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
tolerations
属性的格式遵循 OpenShift 规范。如需了解更多详细信息,请参阅 Kubernetes 污点和容限。
2.1.20.3.4. 设置专用节点并在节点上调度 pod
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
- 选择应当用作专用的节点。
- 确保这些节点上未调度工作负载。
在所选节点上设置污点:
可以使用
oc adm taint
来完成此操作:oc adm taint node your-node dedicated=Kafka:NoSchedule
另外,也向所选节点添加标签。
可以使用
oc label
来完成此操作:oc label node your-node dedicated=Kafka
编辑
指定集群部署的资源中的关联性
和容限
属性。例如:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: tolerations: - key: "dedicated" operator: "Equal" value: "Kafka" effect: "NoSchedule" affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: dedicated operator: In values: - Kafka # ... zookeeper: # ...
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
2.1.21. Kafka Exporter
您可以将 Kafka
资源配置为自动在集群中部署 Kafka Exporter。
Kafka Exporter 提取数据作为 Prometheus 指标,主要与偏移、消费者组、消费者滞后和主题相关的数据。
有关设置 Kafka Exporter 以及监控消费滞后性能的原因,请参阅 OpenShift 指南中的部署和升级 AMQ Streams 中的 Kafka Exporter。
2.1.22. 对 Kafka 集群执行滚动更新
此流程描述了如何使用 OpenShift 注解手动触发现有 Kafka 集群的滚动更新。
先决条件
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
步骤
查找控制您要手动更新的 Kafka pod 的
StatefulSet
的名称。例如,如果您的 Kafka 集群名为 my-cluster,则对应的
StatefulSet
名为 my-cluster-kafka。给 OpenShift 中的
StatefulSet
资源标注。例如,使用oc annotate
:oc annotate statefulset cluster-name-kafka strimzi.io/manual-rolling-update=true
-
等待下一次协调发生(默认为每隔两分钟)。只要协调过程检测到注解的
StatefulSet
,则会触发注解的滚动更新。当所有 pod 的滚动更新完成后,注解会从StatefulSet
中删除。
2.1.23. 执行 ZooKeeper 集群的滚动更新
此流程描述了如何使用 OpenShift 注释手动触发现有 ZooKeeper 集群的滚动更新。
先决条件
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
步骤
查找控制您要手动更新的 ZooKeeper pod 的
StatefulSet
的名称。例如,如果您的 Kafka 集群名为 my-cluster,则对应的
StatefulSet 被
命名为 my-cluster-zookeeper。给 OpenShift 中的
StatefulSet
资源标注。例如,使用oc annotate
:oc annotate statefulset cluster-name-zookeeper strimzi.io/manual-rolling-update=true
-
等待下一次协调发生(默认为每隔两分钟)。只要协调过程检测到注解的
StatefulSet
,则会触发注解的滚动更新。当所有 pod 的滚动更新完成后,注解会从StatefulSet
中删除。
2.1.24. 扩展集群
2.1.24.1. 扩展 Kafka 集群
2.1.24.1.1. 在集群中添加代理
提高某个主题吞吐量的主要方法是增加该主题的分区数量。这是因为额外的分区允许在集群中的不同代理之间共享该主题的负载。但是,如果每个代理都受到使用更多分区的特定资源(通常是 I/O)限制,则不会增加吞吐量。相反,您需要在集群中添加代理。
当您在集群中添加额外的代理时,Kafka 不会自动为其分配任何分区。您必须决定从现有代理迁移到新代理的分区。
在所有代理之间重新分布分区后,应该减少每个代理的资源利用率。
2.1.24.1.2. 从集群中删除代理
因为 AMQ Streams 使用 StatefulSet 来管理代理 Pod,所以
您无法从集群中删除 任何 pod。您只能从集群中移除一个或多个数字最高的 pod。例如,在一个有 12 个代理的集群中,pod 被命名为 cluster-name-kafka-0
to cluster-name-kafka-11
。如果您决定缩减一个代理,则会删除 cluster-name-kafka-11
。
在从集群中删除代理前,请确保它没有被分配给任何分区。您还应决定剩余的代理中哪些将负责要停用代理中的每个分区。代理没有分配分区后,您可以安全地缩减集群。
2.1.24.2. 分区重新分配
Topic Operator 目前不支持将副本分配给不同的代理,因此需要直接连接到代理 Pod 以将副本分配到代理。
在代理 pod 中,kafka-reassign-partitions.sh
实用程序允许您将分区重新分配给不同的代理。
它有三种不同的模式:
--generate
- 取一组主题和代理,并生成 重新分配 JSON 文件,该文件将导致这些主题的分区分配给这些代理。由于这适用于整个主题,因此当您需要重新分配某些主题的部分分区时,无法使用它。
--execute
- 取 重新分配 JSON 文件 并将其应用到集群中的分区和代理。因此获得分区的代理会成为分区领导的追随者。对于给定分区,新代理一旦捕获并加入 ISR(同步副本)后,旧的代理将停止成为追随者,并删除其副本。
--verify
-
使用与
--execute
步骤相同的 重新分配 JSON 文件,--verify
检查文件中的所有分区是否已移到其预期代理中。如果重新分配完成后,--verify 也会移除任何有效的 节流。除非被删除,否则节流将继续影响群集,即使重新分配完成后也是如此。
只能在任何给定时间在集群中运行一个重新分配,且无法取消正在运行的重新分配。如果您需要取消重新分配,请等待它完成,然后执行另一个重新分配以恢复第一次重新分配的影响。kafka-reassign-partitions.sh
将显示此重新版本的重新分配 JSON 作为其输出的一部分。在需要停止进行中的重新分配时,应将非常大的重新分配分成几个较小的重新分配。
2.1.24.2.1. 重新分配 JSON 文件
重新分配 JSON 文件 有一个特定的结构:
{
"version": 1,
"partitions": [
<PartitionObjects>
]
}
其中 <PartitionObjects> 是一个用逗号分开的对象列表,例如:
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ] }
虽然 Kafka 也支持 "log_dirs"
属性,但这不应用于 AMQ Streams。
以下是重新分配 JSON 文件的示例,该文件分配主题 主题a
、分区 4
到代理 2
、4
和 7
,以及 主题 2 的分区 2
分配给
代理 1
、5
和 7
:
{ "version": 1, "partitions": [ { "topic": "topic-a", "partition": 4, "replicas": [2,4,7] }, { "topic": "topic-b", "partition": 2, "replicas": [1,5,7] } ] }
JSON 中没有包括的分区不会更改。
2.1.24.2.2. 在 JBOD 卷间重新分配分区
在 Kafka 集群中使用 JBOD 存储时,您可以选择在特定卷及其日志目录之间重新分配分区(每个卷都有一个日志目录)。要将分区重新分配给特定卷,请将 log_dirs
选项添加到重新分配 JSON 文件中的 <PartitionObjects> 中。
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ], "log_dirs": [ <AssignedLogDirs> ] }
log_dirs
对象应包含与 replicas 对象中指定的副本数相同的日志目录 数量
。该值应当是日志目录的绝对路径或 any
关键字。
例如:
{ "topic": "topic-a", "partition": 4, "replicas": [2,4,7]. "log_dirs": [ "/var/lib/kafka/data-0/kafka-log2", "/var/lib/kafka/data-0/kafka-log4", "/var/lib/kafka/data-0/kafka-log7" ] }
2.1.24.3. 生成重新分配 JSON 文件
此流程描述了如何生成重新分配 JSON 文件,该文件使用 kafka-reassign-partitions.sh
工具为给定主题集重新分配所有分区。
先决条件
- 一个正在运行的 Cluster Operator
-
Kafka
资源 - 重新分配分区的一组主题
步骤
准备名为 topic
.json
的 JSON 文件,其中列出要移动的主题。它必须具有以下结构:{ "version": 1, "topics": [ <TopicObjects> ] }
其中 <TopicObjects> 是一个以逗号分隔的对象列表,例如:
{ "topic": <TopicName> }
例如,如果要重新分配
topic-a 和
分区,则需要准备类似以下的 topictopic-
b 的所有.json
文件:{ "version": 1, "topics": [ { "topic": "topic-a"}, { "topic": "topic-b"} ] }
将
topic.json
文件复制到其中一个代理 pod 中:cat topics.json | oc exec -c kafka <BrokerPod> -i -- \ /bin/bash -c \ 'cat > /tmp/topics.json'
使用
kafka-reassign-partitions.sh
命令生成重新分配 JSON。oc exec <BrokerPod> -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --topics-to-move-json-file /tmp/topics.json \ --broker-list <BrokerList> \ --generate
例如,将
topic-a 和
的所有分区移动到 brokertopic-
b4
和7
oc exec <BrokerPod> -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 4,7 \ --generate
2.1.24.4. 手动创建重新分配 JSON 文件
如果要移动特定的分区,可以手动创建重新分配 JSON 文件。
2.1.24.5. 重新分配节流
分区重新分配可能是一个缓慢的过程,因为它涉及到在代理之间传输大量数据。为避免对客户端造成重大影响,您可以限制重新分配过程。这可能会导致重新分配需要更长的时间。
- 如果限流太低,新分配的代理将无法与正在发布的记录保持同步,并且重新分配永远不会完成。
- 如果限流太高,客户端就会受到影响。
例如,对于制作者而言,这可能会比等待确认的正常延迟更高。对于消费者而言,这可能会因为轮询之间延迟较高而导致吞吐量下降。
2.1.24.6. 扩展 Kafka 集群
这个步骤描述了如何在 Kafka 集群中增加代理数量。
先决条件
- 现有的 Kafka 集群。
-
名为 reassignment
.json
的重新分配 JSON 文件 描述了如何在放大的集群中将分区重新分配给代理。
步骤
-
通过增加
Kafka.spec.kafka.replicas
配置选项,根据需要添加多个新代理。 - 验证新代理 Pod 是否已启动。
将
reassignment.json
文件复制到您稍后要执行命令的代理 pod 中:cat reassignment.json | \ oc exec broker-pod -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
例如:
cat reassignment.json | \ oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
使用同一代理 pod 的
kafka-reassign-partitions.sh
命令行工具执行分区重新分配。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --execute
如果您要节流复制,您也可以使用
--throttle
选项,每秒使用节流率(以字节为单位)。例如:oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 5000000 \ --execute
此命令将显示两个重新分配 JSON 对象:第一个记录了正在移动的分区的当前分配。如果稍后需要恢复重新分配,您应该将其保存到本地文件(而不是 pod 中的文件)。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配期间更改节流,您可以使用具有不同节流率的同一命令行。例如:
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 10000000 \ --execute
定期验证是否使用任何代理 Pod 的
kafka-reassign-partitions.sh
命令行工具完成重新分配。这与上一步中的命令相同,但使用--verify
选项而不是--execute
选项。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
例如,
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
-
当
--verify
命令报告每个分区成功完成时,重新分配已完成。最后的--verify
还会导致移除任何重新分配节流。现在,如果您保存了 JSON,将分配还原到其原始代理中,您可以删除还原文件。
2.1.24.7. 缩减 Kafka 集群
其他资源
这个步骤描述了如何减少 Kafka 集群中的代理数量。
先决条件
- 现有的 Kafka 集群。
-
名为 reassignment
.json
的重新分配 JSON 文件 描述在删除了最高编号 Pod 中的代理后,应如何将
分区重新分配给集群中的代理。
步骤
将
reassignment.json
文件复制到您稍后要执行命令的代理 pod 中:cat reassignment.json | \ oc exec broker-pod -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
例如:
cat reassignment.json | \ oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
使用同一代理 pod 的
kafka-reassign-partitions.sh
命令行工具执行分区重新分配。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --execute
如果您要节流复制,您也可以使用
--throttle
选项,每秒使用节流率(以字节为单位)。例如:oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 5000000 \ --execute
此命令将显示两个重新分配 JSON 对象:第一个记录了正在移动的分区的当前分配。如果稍后需要恢复重新分配,您应该将其保存到本地文件(而不是 pod 中的文件)。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配期间更改节流,您可以使用具有不同节流率的同一命令行。例如:
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 10000000 \ --execute
定期验证是否使用任何代理 Pod 的
kafka-reassign-partitions.sh
命令行工具完成重新分配。这与上一步中的命令相同,但使用--verify
选项而不是--execute
选项。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
例如,
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
-
当
--verify
命令报告每个分区成功完成时,重新分配已完成。最后的--verify
还会导致移除任何重新分配节流。现在,如果您保存了 JSON,将分配还原到其原始代理中,您可以删除还原文件。 所有分区重新分配完成后,删除的代理不应对集群中的任何分区负责。您可以通过检查代理的数据日志目录是否包含任何实时分区日志来验证这一点。如果代理上的日志目录包含与扩展正则表达式
[a-zA-Z0-9.-]+\ 不匹配的目录。[a-z0-9]+-delete$
仍然具有实时分区,不应停止。您可以执行以下命令检查:
oc exec my-cluster-kafka-0 -c kafka -it -- \ /bin/bash -c \ "ls -l /var/lib/kafka/kafka-log_<N>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"
其中 N
是要删除的 Pod
数。如果上述命令显示任何输出,则代理仍然有实时分区。在这种情况下,重新分配没有完成,或者重新分配 JSON 文件不正确。
-
确认代理没有实时分区后,您可以编辑
Kafka.spec.kafka.replicas 资源的
spec.kafka.replicas,这会缩减Kafka
.StatefulSet
,从而删除最高数字的代理 Pod。
2.1.25. 手动删除 Kafka 节点
其他资源
此流程描述了如何使用 OpenShift 注解删除现有 Kafka 节点。删除 Kafka 节点包括删除运行 Kafka 代理的 Pod
和相关的 PersistentVolumeClaim
(如果集群部署了持久性存储)。删除后,Pod
及其相关的 PersistentVolumeClaim
会被自动重新创建。
删除 PersistentVolumeClaim
可能会导致持久性数据丢失。只有在您遇到存储问题时才应执行以下步骤。
先决条件
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
步骤
查找您要删除的
Pod
的名称。例如,如果集群命名为 cluster-name,pod 被命名为 cluster-name-kafka-index,其中 索引 以零开始,结尾是副本总数。
给 OpenShift
中的
容器集资源标注:使用
oc annotate
:oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
- 等待下一次协调,当带有底层持久性卷声明的注解的 pod 被删除后再重新创建。
2.1.26. 手动删除 ZooKeeper 节点
此流程描述了如何使用 OpenShift 注释删除现有的 ZooKeeper 节点。删除 ZooKeeper 节点包括删除运行 ZooKeeper 的 Pod
和相关的 PersistentVolumeClaim
(如果集群部署了持久性存储)。删除后,Pod
及其相关的 PersistentVolumeClaim
会被自动重新创建。
删除 PersistentVolumeClaim
可能会导致持久性数据丢失。只有在您遇到存储问题时才应执行以下步骤。
先决条件
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
步骤
查找您要删除的
Pod
的名称。例如,如果集群名为 cluster-name,pod 被命名为 cluster-name-zookeeper-index,其中 索引 从零开始,结尾是副本总数。
给 OpenShift
中的
容器集资源标注:使用
oc annotate
:oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
- 等待下一次协调,当带有底层持久性卷声明的注解的 pod 被删除后再重新创建。
2.1.27. 用于滚动更新的维护时间窗
通过维护时间窗,您可以计划对 Kafka 和 ZooKeeper 集群进行某些滚动更新,以便在方便的时间启动。
2.1.27.1. 维护时间窗概述
在大多数情况下,Cluster Operator 只更新 Kafka 或 ZooKeeper 集群,以响应对对应 Kafka
资源的更改。这可让您计划何时对 Kafka
资源应用更改,以最大程度降低对 Kafka 客户端应用程序的影响。
但是,在不对 Kafka 资源
进行任何相应的更改的情况下,可能会对 Kafka 和 ZooKeeper 集群进行一些更新。例如,如果 Cluster Operator 管理的 CA(证书授权机构)证书接近到期时间,则 Cluster Operator 将需要执行滚动重启。
虽然 pod 的滚动重启不应影响服务 的可用性 (假设正确的代理和主题配置),这可能会影响 Kafka 客户端应用程序 的性能。通过维护时间窗,您可以调度 Kafka 和 ZooKeeper 集群的自发滚动更新,以便在方便的时间开始。如果没有为群集配置维护时间窗,那么这种自发滚动更新可能会在不方便的时间(如在可预测的高负载期间)发生。
2.1.27.2. 维护时间窗定义
您可以通过在 Kafka.spec.maintenanceTimeWindows
属性中输入字符串数组来配置维护时间窗。每个字符串是一个 cron 表达式,解释为使用 UTC(协调世界时间,其实际用途与 Greenwich Mean Time 相同)。
以下示例配置了一个维护时间窗口,该时间窗口从午夜开始,到 01:59am(UTC)、周日、周一、星期二和周四结束:
# ... maintenanceTimeWindows: - "* * 0-1 ? * SUN,MON,TUE,WED,THU *" # ...
在实践中,维护窗口应当与 Kafka.spec.clusterCa.renewalDays
和 Kafka.spec.clientsCa.renewalDays
属性一同设置,以确保在
配置的维护时间窗口中完成必要的 CA 证书续订。
AMQ Streams 不完全根据给定的窗口调度维护操作。相反,对于每个协调,它会检查维护窗口当前是否"打开"。这意味着,给定时间窗内的维护操作启动可能会被 Cluster Operator 协调间隔延迟。因此维护时间窗必须至少是这个长。
其他资源
- 有关 Cluster Operator 配置的更多信息,请参阅 第 5.1.1 节 “Cluster Operator 配置”。
2.1.27.3. 配置维护时间窗
您可以配置维护时间窗,用于由支持的进程触发的滚动更新。
先决条件
- OpenShift 集群。
- Cluster Operator 正在运行。
步骤
在
Kafka
资源中添加或编辑maintenanceTimeWindows
属性。例如,允许在 0800 到 1059 之间以及 1400 到 1559 之间的维护,您可以设置maintenanceTimeWindows
,如下所示:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... maintenanceTimeWindows: - "* * 8-10 * * ?" - "* * 14-15 * * ?"
创建或更新资源。
这可以使用
oc apply
来完成:oc apply -f your-file
其他资源
- 执行 Kafka 集群的滚动更新,请参阅 第 2.1.22 节 “对 Kafka 集群执行滚动更新”
- 执行 ZooKeeper 集群的滚动更新,请参阅 第 2.1.23 节 “执行 ZooKeeper 集群的滚动更新”
2.1.28. 手动续订 CA 证书
在相应证书续订期开始时,集群和客户端 CA 证书会自动续订。如果 Kafka.spec.clusterCa.generateCertificateAuthority
和 Kafka.spec.clientsCa.generateCertificateAuthority
设置为 false
,则 CA 证书不会自动续订。
您可以在证书续订期限开始前手动续订其中一个或两个证书。出于安全原因 或更改了证书的续订期或有效期,您可以这样做。
更新的证书使用与旧证书相同的私钥。
先决条件
- Cluster Operator 正在运行。
- 安装 CA 证书和私钥的 Kafka 集群。
步骤
将
strimzi.io/force-renew
注解应用到包含您要续订的 CA 证书的Secret
。表 2.1. 强制续订证书的 Secret 注解 证书 Secret annotate 命令 集群 CA
kAFKA-CLUSTER-NAME-cluster-ca-cert
oc annotate secret KAFKA-CLUSTER-NAME-cluster-ca-cert strimzi.io/force-renew=true
客户端 CA
kAFKA-CLUSTER-NAME-clients-ca-cert
oc annotate secret KAFKA-CLUSTER-NAME-clients-ca-cert strimzi.io/force-renew=true
在下一次协调中,Cluster Operator 会为您注解的
Secret
生成一个新的 CA 证书。如果配置了维护时间窗,Cluster Operator 将在下一个维护时间窗内第一次协调时生成新的 CA 证书。客户端应用程序必须重新载入由 Cluster Operator 更新的集群和客户端 CA 证书。
检查 CA 证书有效的周期:
例如,使用
openssl
命令:oc get secret CA-CERTIFICATE-SECRET -o 'jsonpath={.data.CA-CERTIFICATE}' | base64 -d | openssl x509 -subject -issuer -startdate -enddate -noout
ca-CERTIFICATE-SECRET 是
Secret
的名称,这是用于集群 CA证书的 KAFKA-CLUSTER-NAME-cluster-ca-cert
和用于客户端 CA 证书的KAFKA-CLUSTER-NAME-clients-ca-cert
。ca-CERTIFICATE 是 CA 证书的名称,如
jsonpath={.data.ca\.crt}
。该命令会返回
notBefore
和notAfter
日期,这是 CA 证书的有效周期。例如,对于集群 CA 证书:
subject=O = io.strimzi, CN = cluster-ca v0 issuer=O = io.strimzi, CN = cluster-ca v0 notBefore=Jun 30 09:43:54 2020 GMT notAfter=Jun 30 09:43:54 2021 GMT
从 Secret 删除旧证书。
当组件使用新证书时,旧证书可能仍处于活跃状态。删除旧证书以消除任何潜在的安全风险。
2.1.29. 替换私钥
您可以替换集群 CA 和客户端 CA 证书使用的私钥。替换私钥时,Cluster Operator 会为新私钥生成一个新的 CA 证书。
先决条件
- Cluster Operator 正在运行。
- 安装 CA 证书和私钥的 Kafka 集群。
步骤
将
strimzi.io/force-replace
注解应用到包含您要续订的私钥的Secret
。表 2.2. 替换私钥的命令 的私钥 Secret annotate 命令 集群 CA
<cluster-name>-cluster-ca
oc annotate secret <cluster-name>-cluster-ca strimzi.io/force-replace=true
客户端 CA
<cluster-name>-clients-ca
oc annotate secret <cluster-name>-clients-ca strimzi.io/force-replace=true
在下一次协调中,Cluster Operator 将:
-
为注解的
Secret
生成一个新的私钥 - 生成新的 CA 证书
如果配置了维护时间窗,Cluster Operator 将在下一个维护时间窗内第一次协调时生成新的私钥和 CA 证书。
客户端应用程序必须重新载入由 Cluster Operator 更新的集群和客户端 CA 证书。
2.1.30. 作为 Kafka 集群一部分创建的资源列表
以下资源由 OpenShift 集群中的 Cluster Operator 创建:
共享资源
cluster-name-cluster-ca
- 使用用于加密集群通信的集群 CA 的机密。
cluster-name-cluster-ca-cert
- 使用 Cluster CA 公钥的 secret。这个密钥可以用来验证 Kafka 代理的身份。
cluster-name-clients-ca
- 使用用于签署用户证书的 Clients CA 私钥的 secret
cluster-name-clients-ca-cert
- 使用 Clients CA 公钥的机密。此密钥可用于验证 Kafka 用户的身份。
cluster-name-cluster-operator-certs
- 使用集群操作器密钥与 Kafka 和 ZooKeeper 通信的机密。
zookeeper 节点
cluster-name-zookeeper
- StatefulSet,它负责管理 ZooKeeper 节点 pod。
cluster-name-zookeeper-idx
- 由 Zookeeper StatefulSet 创建的 Pod。
cluster-name-zookeeper-nodes
- 无头服务需要直接解析 ZooKeeper pod IP 地址。
cluster-name-zookeeper-client
- Kafka 代理用于连接 ZooKeeper 节点的服务作为客户端。
cluster-name-zookeeper-config
- 包含 ZooKeeper 辅助配置的 ConfigMap,由 ZooKeeper 节点 pod 挂载为卷。
cluster-name-zookeeper-nodes
- 使用 ZooKeeper 节点密钥的机密。
cluster-name-zookeeper
- Zookeeper 节点使用的服务帐户。
cluster-name-zookeeper
- 为 ZooKeeper 节点配置的 Pod Disruption Budget。
cluster-name-network-policy-zookeeper
- 管理对 ZooKeeper 服务访问的网络策略.
data-cluster-name-zookeeper-idx
-
用于为 ZooKeeper 节点 pod
idx
存储数据的卷的持久性卷声明。只有在选择了持久存储来调配持久卷以存储数据时,才会创建此资源。
Kafka 代理
cluster-name-kafka
- StatefulSet,它负责管理 Kafka 代理 pod。
cluster-name-kafka-idx
- 由 Kafka StatefulSet 创建的 Pod。
cluster-name-kafka-brokers
- 需要 DNS 直接解析 Kafka 代理 Pod IP 地址的服务。
cluster-name-kafka-bootstrap
- 服务可用作 Kafka 客户端的 bootstrap 服务器。
cluster-name-kafka-external-bootstrap
- 为从 OpenShift 集群外部连接的客户端引导服务。只有在启用了外部侦听器时才会创建此资源。
cluster-name-kafka-pod-id
- 用于将流量从 OpenShift 集群外部路由到单个容器集的服务。只有在启用了外部侦听器时才会创建此资源。
cluster-name-kafka-external-bootstrap
-
从 OpenShift 集群外部连接的客户端的引导路由。只有在启用了外部侦听器并设置为 type
route
时才会创建此资源。 cluster-name-kafka-pod-id
-
从 OpenShift 集群外部的流量路由到单个容器集。只有在启用了外部侦听器并设置为 type
route
时才会创建此资源。 cluster-name-kafka-config
- 包含 Kafka 辅助配置且由 Kafka 代理 Pod 挂载为卷的 ConfigMap。
cluster-name-kafka-brokers
- 使用 Kafka 代理密钥的 secret。
cluster-name-kafka
- Kafka 代理使用的服务帐户。
cluster-name-kafka
- 为 Kafka 代理配置的 Pod Disruption Budget。
cluster-name-network-policy-kafka
- 管理对 Kafka 服务访问的网络策略。
strimzi-namespace-name-cluster-name-kafka-init
- Kafka 代理使用的集群角色绑定。
cluster-name-jmx
- 用来保护 Kafka 代理端口的 JMX 用户名和密码的 secret。只有在 Kafka 中启用了 JMX 时才会创建此资源。
data-cluster-name-kafka-idx
-
用于存储 Kafka 代理 pod
IDx
数据的卷的持久性卷声明。只有在选择了持久存储来调配持久卷以存储数据时,才会创建此资源。 data-id-cluster-name-kafka-idx
-
用于存储 Kafka 代理 pod
ID
的卷 ID 的持久性卷声明
。只有在调配持久卷以存储数据时,才会为 JBOD 卷选择持久存储时创建此资源。
实体 Operator
只有在使用 Cluster Operator 部署 Entity Operator 时才会创建这些资源。
cluster-name-entity-operator
- 使用主题和用户操作器进行部署.
cluster-name-entity-operator-random-string
- 由 Entity Operator 部署创建的 Pod。
cluster-name-entity-topic-operator-config
- 带有主题 Operator 辅助配置的 ConfigMap。
cluster-name-entity-user-operator-config
- 带有用户 Operator 辅助配置的 ConfigMap.
cluster-name-entity-operator-certs
- 使用 Entity Operator 密钥与 Kafka 和 ZooKeeper 通信的 secret。
cluster-name-entity-operator
- Entity Operator 使用的服务帐户。
strimzi-cluster-name-topic-operator
- Entity Operator 使用的角色绑定。
strimzi-cluster-name-user-operator
- Entity Operator 使用的角色绑定。
Kafka Exporter
只有在使用 Cluster Operator 部署 Kafka Exporter 时才会创建这些资源。
cluster-name-kafka-exporter
- 使用 Kafka 导出器部署.
cluster-name-kafka-exporter-random-string
- 由 Kafka Exporter 部署创建的 Pod。
cluster-name-kafka-exporter
- 用于收集消费者滞后指标的服务.
cluster-name-kafka-exporter
- Kafka Exporter 使用的服务帐户。
Sything Control
只有在使用 Cluster Operator 部署 Cruise Control 时才会创建这些资源。
cluster-name-cruise-control
- 通过 Cruise 控制进行部署.
cluster-name-cruise-control-random-string
- 由 Cruise Control 部署创建的 Pod。
cluster-name-cruise-control-config
- 包含 Cruise Control 辅助配置的 ConfigMap,并被 Cruise Control pod 作为一个卷挂载。
cluster-name-cruise-control-certs
- 使用 Cruise Control 密钥与 Kafka 和 ZooKeeper 通信的机密。
cluster-name-cruise-control
- 用于与 Cruise Control 通信的服务.
cluster-name-cruise-control
- Cruise Control 使用的服务帐户.
cluster-name-network-policy-cruise-control
- 管理对 Cruise 控制服务的访问的网络策略.
JMXTrans
只有在使用 Cluster Operator 部署 JMXTrans 时才会创建这些资源。
cluster-name-jmxtrans
- 使用 JMXTrans 部署.
cluster-name-jmxtrans-random-string
- 由 JMXTrans 部署创建的 Pod。
cluster-name-jmxtrans-config
- 包含 JMXTrans 辅助配置的 ConfigMap,由 JMXTrans pod 挂载为卷。
cluster-name-jmxtrans
- JMXTrans 使用的服务帐户.