9.2. 在 KRaft 模式中配置 Kafka
更新 Kafka 自定义资源的 spec 属性,以便在 KRaft 模式中配置 Kafka 部署。
除了配置 Kafka 外,您可以为 Apache Kafka operator 添加 Streams 配置。
KRaft 元数据版本(.spec.kafka.metadataVersion)必须是 Kafka 版本(spec.kafka.version)支持的版本。如果没有在配置中设置元数据版本,Cluster Operator 会将版本更新为使用 Kafka 版本的默认设置。
最旧的支持的元数据版本为 3.3。使用早于 Kafka 版本的元数据版本可能会导致禁用一些功能。
在 KRaft 模式下运行的 Kafka 集群也使用节点池。必须在 节点池配置中指定 以下内容:
- 分配给 Kafka 集群中每个节点的角色
- 使用的副本节点数量
- 节点的存储规格
其他可选属性也可以在节点池中设置。
要深入了解 Kafka 集群配置选项,请参阅 Apache Kafka 自定义资源 API 参考。
Kafka 自定义资源配置示例
# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
# Deployment specifications
spec:
kafka:
# Kafka version (recommended)
version: 3.8.0
# KRaft metadata version (recommended)
metadataVersion: 3.8
# Logging configuration (optional)
logging:
type: inline
loggers:
kafka.root.logger.level: INFO
# Resources requests and limits (recommended)
resources:
requests:
memory: 64Gi
cpu: "8"
limits:
memory: 64Gi
cpu: "12"
# Readiness probe (optional)
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
# Liveness probe (optional)
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
# JVM options (optional)
jvmOptions:
-Xms: 8192m
-Xmx: 8192m
# Custom image (optional)
image: my-org/my-image:latest
# Listener configuration (required)
listeners:
- name: plain
port: 9092
type: internal
tls: false
configuration:
useServiceDnsDomain: true
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external1
port: 9094
type: route
tls: true
configuration:
brokerCertChainAndKey:
secretName: my-secret
certificate: my-certificate.crt
key: my-key.key
# Authorization (optional)
authorization:
type: simple
# Kafka configuration (recommended)
config:
auto.create.topics.enable: "false"
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
# Rack awareness (optional)
rack:
topologyKey: topology.kubernetes.io/zone
# Metrics configuration (optional)
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: my-config-map
key: my-key
# Entity Operator (recommended)
entityOperator:
topicOperator:
watchedNamespace: my-topic-namespace
reconciliationIntervalMs: 60000
# Logging configuration (optional)
logging:
type: inline
loggers:
rootLogger.level: INFO
# Resources requests and limits (recommended)
resources:
requests:
memory: 512Mi
cpu: "1"
limits:
memory: 512Mi
cpu: "1"
userOperator:
watchedNamespace: my-topic-namespace
reconciliationIntervalMs: 60000
# Logging configuration (optional)
logging:
type: inline
loggers:
rootLogger.level: INFO
# Resources requests and limits (recommended)
resources:
requests:
memory: 512Mi
cpu: "1"
limits:
memory: 512Mi
cpu: "1"
# Kafka Exporter (optional)
kafkaExporter:
# ...
# Cruise Control (optional)
cruiseControl:
# ...
- 1
- Kafka 版本,可按照升级步骤将其改为受支持的版本。
- 2
- Kafka 元数据版本,可按照升级步骤将其改为受支持的版本。
- 3
- Kafka 日志记录器和日志级别直接(
内联)或通过 ConfigMap 间接添加(外部)。自定义 Log4j 配置必须放在 ConfigMap 中的log4j.properties键下。对于 Kafkakafka.root.logger.levellogger,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 或 OFF。 - 4
- 为保留支持的资源(当前
cpu和memory)的请求,以及指定可消耗的最大资源的限制。 - 5
- 检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
- 6
- JVM 配置选项,用于优化运行 Kafka 的虚拟机(VM)的性能。
- 7
- ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
- 8
- 侦听器配置客户端如何通过 bootstrap 地址连接到 Kafka 集群。监听器被配置为 internal 或 external 监听程序,用于来自 OpenShift 集群内部或外的连接。
- 9
- 用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
- 10
- Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高,但端口 9404 和 9999 除外,它们已经用于 Prometheus 和 JMX。根据监听器类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 11
- 监听程序指定为
internal或cluster-ip(使用每个代理的ClusterIP服务公开 Kafka),或为外部监听程序指定为route(只限 OpenShift),loadbalancer,nodeport或ingress(只限 Kubernetes)。 - 12
- 启用或禁用每个监听器的 TLS 加密。对于
route和ingress类型监听程序,必须通过将其设置为true来启用 TLS 加密。 - 13
- 定义是否分配了包括集群服务后缀(通常为
.cluster.local)的完全限定 DNS 名称。 - 14
- 侦听器身份验证机制指定为 mTLS、SCRAM-SHA-512 或基于令牌的 OAuth 2.0。
- 15
- 外部监听程序配置指定 Kafka 集群如何在 OpenShift 外部公开,如通过
路由,loadbalancer或nodeport。 - 16
- 由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书的可选配置。
brokerCertChainAndKey指定包含服务器证书和私钥的Secret。您可以在任何启用了 TLS 加密的监听程序中配置 Kafka 侦听器证书。 - 17
- 授权在 Kafka 代理上启用 simple、OAUTH 2.0 或 OPA 授权。简单授权使用
AclAuthorizer和StandardAuthorizerKafka 插件。 - 18
- 代理配置。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。
- 19
- 机架感知配置,将副本分散到不同的机架、数据中心或可用性区域。
topologyKey必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone标签指定区。 - 20
- 启用 Prometheus 指标。在本例中,为 Prometheus JMX Exporter (默认指标导出器)配置了指标。
- 21
- 通过 Prometheus JMX Exporter 将指标数据导出到 Grafana 仪表板的规则,通过引用包含 Prometheus JMX exporter 配置的 ConfigMap 来启用。您可以使用对
metricsConfig.valueFrom.configMapKeyRef.key下包含空文件的 ConfigMap 的引用来启用指标。 - 22
- Entity Operator 配置,用于指定主题 Operator 和 User Operator 的配置。
- 23
- 指定主题 Operator 日志记录器和日志级别。这个示例使用
内联日志记录。 - 24
- 指定 User Operator 日志记录器和日志级别。
- 25
- Kafka 导出程序配置。Kafka Exporter 是一个可选组件,用于在特定消费者滞后数据中从 Kafka 代理中提取指标数据。要使 Kafka Exporter 能够正常工作,需要使用消费者组。
- 26
- Cruise Control 的可选配置,用于重新平衡 Kafka 集群。
9.2.1. 在代理上设置吞吐量和存储限制 复制链接链接已复制到粘贴板!
此功能是一个技术预览功能,不适用于生产环境。如需更多信息,请参阅发行注记。
此流程描述了如何在 Kafka 集群中的代理上设置吞吐量和存储限制。启用配额 插件,并使用 Kafka 资源中的配额属性配置限制。
可用的配额插件有两种:
-
strimzi类型启用 Strimzi Quotas 插件。 -
kafka类型启用内置 Kafka 插件。
一次只能启用一个配额插件。内置 kafka 插件被默认启用。启用 strimzi 插件会自动禁用内置插件。
strimzi 插件
strimzi 插件提供存储利用率配额和吞吐量限制的动态分布。
-
存储配额根据磁盘存储利用率的 Kafka producer 节流。限制可以以字节为单位(
minAvailableBytesPerVolume)或可用磁盘空间的百分比(minAvailableRatioPerVolume),分别应用到各个磁盘。当集群中的任何代理超过配置的磁盘阈值时,客户端会节流,以防止磁盘被填满并超过容量。 - 在所有客户端间动态分布总吞吐量限制。例如,如果您设置了 40 MBps producer 字节位阈值,则两个制作者之间的分布不是静态的。如果一个生成者使用 10 MBps,则另一个制作者可以使用 30 MBps。
- 特定用户(客户端)可以从限制中排除。
使用 strimzi 插件时,您只会看到聚合的配额指标,而不是每个客户端的指标。
Kafka 插件
kafka 插件基于每个用户(每个代理)应用吞吐量限制,并包含额外的 CPU 和操作速率限值。
-
限制会为每个用户和每个代理应用。例如,设置 20 MBps producer 字节阈值将每个用户限制为在该用户的所有生成者连接中每个代理的 20 MBps。没有总吞吐量限制,因为
strimzi插件中存在。限制可以被 特定于用户的配额配置覆盖。 - 每个客户端的 CPU 使用率限制可以设置为每个代理的网络线程和 I/O 线程的百分比。
- 每秒允许的并发分区创建和删除操作数量(mutations)可以基于每个代理设置。
在使用默认的 Kafka 配额插件时,默认配额(如果设置)会应用到所有用户。这包括内部用户,如主题 Operator 和 Cruise Control,可能会影响其操作。为避免对内部用户造成不必要的限制,请考虑有效地调整配额。
例如,Kafka 配额插件自动应用到 Topic Operator 的配额可能会限制控制器变异率,这可能会节流主题创建和删除操作。因此,务必要了解 Topic Operator 正常工作所需的最小配额,并明确设置适当的配额以避免出现这个问题。监控相关的控制器和代理指标可帮助跟踪和优化主题上的操作速度。根据 Kafka 集群的规模和配置,Cruise Control 及其指标报告程序还需要足够的生成和获取率才能进行重新平衡。要防止 Cruise Control 的问题,您可能会在小集群中启动至少 1 KB/s 的生产者和消费者的速率,如具有中等流量的三个代理,并根据更大或多个活跃集群的要求进行调整。
先决条件
- 管理 Kafka 集群的 Cluster Operator 正在运行。
流程
在
Kafka资源的quota部分添加插件配置。strimzi插件配置示例apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... quotas: type: strimzi producerByteRate: 10000001 consumerByteRate: 10000002 minAvailableBytesPerVolume: 5000000000003 excludedPrincipals:4 - my-userminAvailableBytesPerVolume和minAvailableRatioPerVolume是互斥的。仅配置其中一个参数。kafka插件配置示例apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... quotas: type: kafka producerByteRate: 1000000 consumerByteRate: 1000000 requestPercentage: 551 controllerMutationRate: 502 -
将更改应用到
Kafka配置。
可以在 spec.kafka.config 部分中配置其他选项。支持选项的完整列表可在 插件文档 中找到。
9.2.2. 使用注解删除 Kafka 节点 复制链接链接已复制到粘贴板!
此流程描述了如何使用 OpenShift 注解删除现有 Kafka 节点。删除 Kafka 节点包括删除运行 Kafka 代理的 Pod 和相关的 PersistentVolumeClaim (如果集群使用持久性存储部署)。删除后,Pod 及其相关的 PersistentVolumeClaim 会自动重新创建。
删除 PersistentVolumeClaim 可能会导致持久性数据丢失,且不能保证集群可用。只有在遇到存储问题时才应执行以下步骤。
先决条件
- 正在运行的 Cluster Operator
流程
查找您要删除的
Pod的名称。Kafka 代理 pod 名为
<cluster_name>-kafka-<index_number>,其中<index_number>从 0 开始,以总副本数减一结束。例如,my-cluster-kafka-0。使用
oc annotate注解 OpenShift 中的Pod资源:oc annotate pod <cluster_name>-kafka-<index_number> strimzi.io/delete-pod-and-pvc="true"- 等待下一个协调,当注解的 pod 带有底层持久性卷声明的 pod 将被删除,然后重新创建。