9.2. 配置 Kafka
更新 Kafka 自定义资源的 spec 属性来配置 Kafka 部署。
另外,您还可以为 ZooKeeper 和 Apache Kafka Operator 添加配置。各个组件都独立配置通用配置属性,如日志记录和健康检查。
特别重要的配置选项包括:
- 资源请求(CPU / Memory)
- 最多和最小内存分配的 JVM 选项
- 将客户端连接到 Kafka 代理的监听程序(以及客户端的身份验证)
- 身份验证
- 存储
- 机架感知
- 指标
- 用于集群重新平衡的 Cruise Control
- 基于 KRaft 的 Kafka 集群的元数据版本
- 基于 ZooKeeper 的 Kafka 集群的 inter-broker 协议版本
.spec.kafka.metadataVersion 属性或 config 中的 inter.broker.protocol.version 属性必须是指定的 Kafka 版本(spec.kafka.version)支持的版本。属性代表 Kafka 集群中使用的 Kafka 元数据或 inter-broker 协议版本。如果没有在配置中设置这些属性,Cluster Operator 会将版本更新为使用的 Kafka 版本的默认值。
最旧的支持的元数据版本为 3.3。使用早于 Kafka 版本的元数据版本可能会导致禁用一些功能。
要深入了解 Kafka 集群配置选项,请参阅 Apache Kafka 自定义资源 API 参考。
管理 TLS 证书
在部署 Kafka 时,Cluster Operator 会自动设置和更新 TLS 证书,以便在集群中启用加密和身份验证。如果需要,您可以在续订周期开始前手动续订集群和客户端 CA 证书。您还可以替换集群和客户端 CA 证书使用的密钥。如需更多信息,请参阅 手动续订 CA 证书和替换私钥。
Kafka 自定义资源配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
version: 3.7.0
logging:
type: inline
loggers:
kafka.root.logger.level: INFO
resources:
requests:
memory: 64Gi
cpu: "8"
limits:
memory: 64Gi
cpu: "12"
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
jvmOptions:
-Xms: 8192m
-Xmx: 8192m
image: my-org/my-image:latest
listeners:
- name: plain
port: 9092
type: internal
tls: false
configuration:
useServiceDnsDomain: true
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external1
port: 9094
type: route
tls: true
configuration:
brokerCertChainAndKey:
secretName: my-secret
certificate: my-certificate.crt
key: my-key.key
authorization:
type: simple
config:
auto.create.topics.enable: "false"
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.7"
storage:
type: persistent-claim
size: 10000Gi
rack:
topologyKey: topology.kubernetes.io/zone
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: my-config-map
key: my-key
# ...
zookeeper:
replicas: 3
logging:
type: inline
loggers:
zookeeper.root.logger: INFO
resources:
requests:
memory: 8Gi
cpu: "2"
limits:
memory: 8Gi
cpu: "2"
jvmOptions:
-Xms: 4096m
-Xmx: 4096m
storage:
type: persistent-claim
size: 1000Gi
metricsConfig:
# ...
entityOperator:
tlsSidecar:
resources:
requests:
cpu: 200m
memory: 64Mi
limits:
cpu: 500m
memory: 128Mi
topicOperator:
watchedNamespace: my-topic-namespace
reconciliationIntervalSeconds: 60
logging:
type: inline
loggers:
rootLogger.level: INFO
resources:
requests:
memory: 512Mi
cpu: "1"
limits:
memory: 512Mi
cpu: "1"
userOperator:
watchedNamespace: my-topic-namespace
reconciliationIntervalSeconds: 60
logging:
type: inline
loggers:
rootLogger.level: INFO
resources:
requests:
memory: 512Mi
cpu: "1"
limits:
memory: 512Mi
cpu: "1"
kafkaExporter:
# ...
cruiseControl:
# ...
- 1
- 副本节点的数量。
- 2
- Kafka 版本,可按照升级步骤将其改为受支持的版本。
- 3
- Kafka 日志记录器和日志级别直接(
内联)或通过 ConfigMap 间接添加(外部)。自定义 Log4j 配置必须放在 ConfigMap 中的log4j.properties键下。对于 Kafkakafka.root.logger.levellogger,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 或 OFF。 - 4
- 为保留支持的资源(当前
cpu和memory)的请求,以及指定可消耗的最大资源的限制。 - 5
- 检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
- 6
- JVM 配置选项,用于优化运行 Kafka 的虚拟机(VM)的性能。
- 7
- ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
- 8
- 侦听器配置客户端如何通过 bootstrap 地址连接到 Kafka 集群。监听器被配置为 internal 或 external 监听程序,用于来自 OpenShift 集群内部或外的连接。
- 9
- 用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
- 10
- Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高,但端口 9404 和 9999 除外,它们已经用于 Prometheus 和 JMX。根据监听器类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 11
- 监听程序指定为
internal或cluster-ip(使用每个代理的ClusterIP服务公开 Kafka),或为外部监听程序指定为route(只限 OpenShift),loadbalancer,nodeport或ingress(只限 Kubernetes)。 - 12
- 启用或禁用每个监听器的 TLS 加密。对于
route和ingress类型监听程序,必须通过将其设置为true来启用 TLS 加密。 - 13
- 定义是否分配了包括集群服务后缀(通常为
.cluster.local)的完全限定 DNS 名称。 - 14
- 侦听器身份验证机制指定为 mTLS、SCRAM-SHA-512 或基于令牌的 OAuth 2.0。
- 15
- 外部监听程序配置指定 Kafka 集群如何在 OpenShift 外部公开,如通过
路由,loadbalancer或nodeport。 - 16
- 由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书的可选配置。
brokerCertChainAndKey指定包含服务器证书和私钥的Secret。您可以在任何启用了 TLS 加密的监听程序中配置 Kafka 侦听器证书。 - 17
- 授权在 Kafka 代理上启用 simple、OAUTH 2.0 或 OPA 授权。简单授权使用
AclAuthorizer和StandardAuthorizerKafka 插件。 - 18
- 代理配置。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。
- 19
- 持久性卷的存储大小可能会增加,并可以添加额外的卷到 JBOD 存储中。
- 20
- 持久性存储具有额外的配置选项,如用于动态卷置备的存储
id和class。 - 21
- 机架感知配置,将副本分散到不同的机架、数据中心或可用性区域。
topologyKey必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone标签指定区。 - 22
- 启用 Prometheus 指标。在本例中,为 Prometheus JMX Exporter (默认指标导出器)配置了指标。
- 23
- 通过 Prometheus JMX Exporter 将指标数据导出到 Grafana 仪表板的规则,通过引用包含 Prometheus JMX exporter 配置的 ConfigMap 来启用。您可以使用对
metricsConfig.valueFrom.configMapKeyRef.key下包含空文件的 ConfigMap 的引用来启用指标。 - 24
- 特定于 ZooKeeper 的配置,其中包含与 Kafka 配置类似的属性。
- 25
- ZooKeeper 节点数量。ZooKeeper 集群通常有奇数个节点,一般为三个、五个或七个。大多数节点都必须可用,才能保持有效的仲裁。如果 ZooKeeper 集群丢失了其仲裁,它将停止响应客户端,并且 Kafka 代理将停止工作。拥有稳定且高度可用的 ZooKeeper 集群对于 Apache Kafka 来说至关重要。
- 26
- ZooKeeper 日志记录器和日志级别。
- 27
- Entity Operator 配置,用于指定主题 Operator 和 User Operator 的配置。
- 28
- 实体 Operator TLS sidecar 配置。实体 Operator 使用 TLS sidecar 与 ZooKeeper 安全通信。
- 29
- 指定主题 Operator 日志记录器和日志级别。这个示例使用
内联日志记录。 - 30
- 指定 User Operator 日志记录器和日志级别。
- 31
- Kafka 导出程序配置。Kafka Exporter 是一个可选组件,用于在特定消费者滞后数据中从 Kafka 代理中提取指标数据。要使 Kafka Exporter 能够正常工作,需要使用消费者组。
- 32
- Cruise Control 的可选配置,用于重新平衡 Kafka 集群。
9.2.1. 使用 Kafka 静态配额插件对代理设置限制 复制链接链接已复制到粘贴板!
使用 Kafka 静态配额插件在 Kafka 集群中的代理上设置吞吐量和存储限制。您可以通过配置 Kafka 资源来启用插件和设置限制。您可以设置字节阈值和存储配额,以在与代理交互的客户端上放置限制。
您可以为生成者和消费者带宽设置字节阈值。总限制分布在访问代理的所有客户端中。例如,您可以为制作者设置 40 MBps 的字节阈值。如果两个制作者正在运行,则它们各自限制为 20 MBps 的吞吐量。
存储配额在软限制和硬限制之间节流 Kafka 磁盘存储限制。限制适用于所有可用磁盘空间。生产者在软限制和硬限制之间逐渐减慢。限制可防止磁盘填满速度超过其容量。完整磁盘可能会导致出现问题。硬限制是最大存储限制。
对于 JBOD 存储,限制适用于所有磁盘。如果代理使用两个 1 TB 磁盘,且配额为 1.1 TB,则一个磁盘可能会填满,另一个磁盘将大约为空。
先决条件
- 管理 Kafka 集群的 Cluster Operator 正在运行。
流程
为
Kafka资源的config添加插件属性。本例中显示了插件属性。
Kafka 静态配额插件配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... config: client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback1 client.quota.callback.static.produce: 10000002 client.quota.callback.static.fetch: 10000003 client.quota.callback.static.storage.soft: 4000000000004 client.quota.callback.static.storage.hard: 5000000000005 client.quota.callback.static.storage.check-interval: 56 更新资源。
oc apply -f <kafka_configuration_file>
9.2.2. 默认 ZooKeeper 配置值 复制链接链接已复制到粘贴板!
当使用 Apache Kafka 的 Streams 部署 ZooKeeper 时,Apache Kafka 的 Streams 设置的一些默认配置与标准 ZooKeeper 默认值不同。这是因为 Apache Kafka 的 Streams 设置多个 ZooKeeper 属性,其值在 OpenShift 环境中运行 ZooKeeper。
Apache Kafka Streams 中密钥 ZooKeeper 属性的默认配置如下:
| 属性 | 默认值 | Description |
|---|---|---|
|
| 2000 | 以毫秒为单位的单空长度,它决定了会话超时的长度。 |
|
| 5 | 在 ZooKeeper 集群中允许后续者获得的最大 tick 数量。 |
|
| 2 | 后续允许不与 ZooKeeper 集群中的领导不同步的最大 tick 数量。 |
|
| 1 |
启用 |
|
| false | 禁用 ZooKeeper admin 服务器的标记。在 Apache Kafka 中,流不使用 admin 服务器。 |
修改这些默认值作为 Kafka 自定义资源中的 zookeeper.config 可能会影响 ZooKeeper 集群的行为和性能。
9.2.3. 使用注解删除 Kafka 节点 复制链接链接已复制到粘贴板!
此流程描述了如何使用 OpenShift 注解删除现有 Kafka 节点。删除 Kafka 节点包括删除运行 Kafka 代理的 Pod 和相关的 PersistentVolumeClaim (如果集群使用持久性存储部署)。删除后,Pod 及其相关的 PersistentVolumeClaim 会自动重新创建。
删除 PersistentVolumeClaim 可能会导致持久性数据丢失,且不能保证集群可用。只有在遇到存储问题时才应执行以下步骤。
先决条件
- 正在运行的 Cluster Operator
流程
查找您要删除的
Pod的名称。Kafka 代理 pod 名为
<cluster_name>-kafka-<index_number>,其中<index_number>从 0 开始,以总副本数减一结束。例如,my-cluster-kafka-0。使用
oc annotate注解 OpenShift 中的Pod资源:oc annotate pod <cluster_name>-kafka-<index_number> strimzi.io/delete-pod-and-pvc="true"- 等待下一个协调,当注解的 pod 带有底层持久性卷声明的 pod 将被删除,然后重新创建。
9.2.4. 使用注解删除 ZooKeeper 节点 复制链接链接已复制到粘贴板!
此流程描述了如何使用 OpenShift 注解删除现有 ZooKeeper 节点。删除 ZooKeeper 节点包括删除运行 ZooKeeper 的 Pod 和相关的 PersistentVolumeClaim (如果集群使用持久性存储部署)。删除后,Pod 及其相关的 PersistentVolumeClaim 会自动重新创建。
删除 PersistentVolumeClaim 可能会导致持久性数据丢失,且不能保证集群可用。只有在遇到存储问题时才应执行以下步骤。
先决条件
- 正在运行的 Cluster Operator
流程
查找您要删除的
Pod的名称。ZooKeeper pod 名为
<cluster_name>-zookeeper-<index_number>,其中<index_number>从 0 开始,以总副本数减一结束。例如,my-cluster-zookeeper-0。使用
oc annotate注解 OpenShift 中的Pod资源:oc annotate pod <cluster_name>-zookeeper-<index_number> strimzi.io/delete-pod-and-pvc="true"- 等待下一个协调,当注解的 pod 带有底层持久性卷声明的 pod 将被删除,然后重新创建。