8.2. 配置 Kafka


更新 Kafka 自定义资源的 spec 属性来配置 Kafka 部署。

另外,您还可以为 ZooKeeper 和 AMQ Streams Operator 添加配置。各个组件都独立配置通用配置属性,如日志记录和健康检查。

特别重要的配置选项包括:

  • 资源请求(CPU / Memory)
  • 最多和最小内存分配的 JVM 选项
  • 将客户端连接到 Kafka 代理的监听程序(以及客户端的身份验证)
  • 身份验证
  • 存储
  • 机架感知
  • 指标
  • 用于集群重新平衡的 Cruise Control

要深入了解 Kafka 集群配置选项,请参阅 AMQ Streams 自定义资源 API 参考

Kafka 版本

Kafka configinter.broker.protocol.version 属性必须是指定的 Kafka 版本 (spec.kafka.version) 支持的版本。属性代表 Kafka 集群中使用的 Kafka 协议版本。

从 Kafka 3.0.0,当 inter.broker.protocol.version 设置为 3.0 或更高版本时,log.message.format.version 选项会被忽略,且不需要设置。

当升级 Kafka 版本时,需要对 inter.broker.protocol.version 进行更新。如需更多信息,请参阅升级 Kafka

管理 TLS 证书

在部署 Kafka 时,Cluster Operator 会自动设置和更新 TLS 证书,以便在集群中启用加密和身份验证。如果需要,您可以在续订周期开始前手动续订集群和客户端 CA 证书。您还可以替换集群和客户端 CA 证书使用的密钥。如需更多信息,请参阅 手动续订 CA 证书替换私钥

Kafka 自定义资源配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3 
1

    version: 3.6.0 
2

    logging: 
3

      type: inline
      loggers:
        kafka.root.logger.level: INFO
    resources: 
4

      requests:
        memory: 64Gi
        cpu: "8"
      limits:
        memory: 64Gi
        cpu: "12"
    readinessProbe: 
5

      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    jvmOptions: 
6

      -Xms: 8192m
      -Xmx: 8192m
    image: my-org/my-image:latest 
7

    listeners: 
8

      - name: plain 
9

        port: 9092 
10

        type: internal 
11

        tls: false 
12

        configuration:
          useServiceDnsDomain: true 
13

      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication: 
14

          type: tls
      - name: external1 
15

        port: 9094
        type: route
        tls: true
        configuration:
          brokerCertChainAndKey: 
16

            secretName: my-secret
            certificate: my-certificate.crt
            key: my-key.key
    authorization: 
17

      type: simple
    config: 
18

      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.6"
    storage: 
19

      type: persistent-claim 
20

      size: 10000Gi
    rack: 
21

      topologyKey: topology.kubernetes.io/zone
    metricsConfig: 
22

      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef: 
23

          name: my-config-map
          key: my-key
    # ...
  zookeeper: 
24

    replicas: 3 
25

    logging: 
26

      type: inline
      loggers:
        zookeeper.root.logger: INFO
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "2"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    storage:
      type: persistent-claim
      size: 1000Gi
    metricsConfig:
      # ...
  entityOperator: 
27

    tlsSidecar: 
28

      resources:
        requests:
          cpu: 200m
          memory: 64Mi
        limits:
          cpu: 500m
          memory: 128Mi
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging: 
29

        type: inline
        loggers:
          rootLogger.level: INFO
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
    userOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging: 
30

        type: inline
        loggers:
          rootLogger.level: INFO
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
  kafkaExporter: 
31

    # ...
  cruiseControl: 
32

    # ...
Copy to Clipboard Toggle word wrap

1
副本节点的数量。
2
Kafka 版本,可按照升级步骤将其改为受支持的版本。
3
Kafka 日志记录器和日志级别直接(内联)或通过 ConfigMap 间接添加(外部)。自定义 Log4j 配置必须放在 ConfigMap 中的 log4j.properties 键下。对于 Kafka kafka.root.logger.level logger,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 或 OFF。
4
为保留支持的资源(当前 cpumemory )的请求,以及指定可消耗的最大资源的限制。
5
检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
6
JVM 配置选项,用于优化运行 Kafka 的虚拟机(VM)的性能。
7
ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
8
侦听器配置客户端如何通过 bootstrap 地址连接到 Kafka 集群。监听器被配置为 internalexternal 监听程序,用于来自 OpenShift 集群内部或外的连接。
9
用于标识监听程序的名称。在 Kafka 集群中必须是唯一的。
10
Kafka 中监听器使用的端口号。端口号必须在给定的 Kafka 集群中唯一。允许端口号为 9092 及更高,但端口 9404 和 9999 除外,它们已经用于 Prometheus 和 JMX。根据监听器类型,端口号可能与连接 Kafka 客户端的端口号不同。
11
监听程序指定为 internalcluster-ip (使用每个代理的 ClusterIP 服务公开 Kafka),或为外部监听程序指定为 route(只限 OpenShift), loadbalancer, nodeportingress(只限 Kubernetes)。
12
为每个监听程序启用 TLS 加密。默认为 false。需要启用 TLS 加密,将它设置为 true, 对于 routeingress 类型的监听器。
13
定义是否分配了包括集群服务后缀(通常为 .cluster.local)的完全限定 DNS 名称。
14
侦听器身份验证机制指定为 mTLS、SCRAM-SHA-512 或基于令牌的 OAuth 2.0。
15
外部监听程序配置指定 Kafka 集群如何在 OpenShift 外部公开,如通过 路由,loadbalancernodeport
16
由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书的可选配置。brokerCertChainAndKey 指定包含服务器证书和私钥的 Secret。您可以在任何启用了 TLS 加密的监听程序中配置 Kafka 侦听器证书。
17
授权在 Kafka 代理上启用 simple、OAUTH 2.0 或 OPA 授权。简单授权使用 AclAuthorizerStandardAuthorizer Kafka 插件。
18
代理配置。标准 Apache Kafka 配置可能会提供,仅限于不直接由 AMQ Streams 管理的属性。
19
持久性卷的存储大小可能会增加,并可以添加额外的卷到 JBOD 存储中。
20
持久性存储具有额外的配置选项,如用于动态卷置备的存储 idclass
21
机架感知配置,将副本分散到不同的机架、数据中心或可用性区域。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。
22
启用 Prometheus 指标。在本例中,为 Prometheus JMX Exporter (默认指标导出器)配置了指标。
23
通过 Prometheus JMX Exporter 将指标数据导出到 Grafana 仪表板的规则,通过引用包含 Prometheus JMX exporter 配置的 ConfigMap 来启用。您可以使用对 metricsConfig.valueFrom.configMapKeyRef.key 下包含空文件的 ConfigMap 的引用来启用指标。
24
特定于 ZooKeeper 的配置,其中包含与 Kafka 配置类似的属性。
25
ZooKeeper 节点数量。ZooKeeper 集群通常有奇数个节点,一般为三个、五个或七个。大多数节点都必须可用,才能保持有效的仲裁。如果 ZooKeeper 集群丢失了其仲裁,它将停止响应客户端,并且 Kafka 代理将停止工作。拥有稳定且高度可用的 ZooKeeper 集群对于 AMQ Streams 至关重要。
26
ZooKeeper 日志记录器和日志级别。
27
Entity Operator 配置,用于指定主题 Operator 和 User Operator 的配置。
28
实体 Operator TLS sidecar 配置。实体 Operator 使用 TLS sidecar 与 ZooKeeper 安全通信。
29
指定主题 Operator 日志记录器和日志级别。这个示例使用 内联 日志记录。
30
指定 User Operator 日志记录器和日志级别。
31
Kafka 导出程序配置。Kafka Exporter 是一个可选组件,用于在特定消费者滞后数据中从 Kafka 代理中提取指标数据。要使 Kafka Exporter 能够正常工作,需要使用消费者组。
32
Cruise Control 的可选配置,用于重新平衡 Kafka 集群。

8.2.1. 使用 Kafka 静态配额插件对代理设置限制

使用 Kafka 静态配额插件在 Kafka 集群中的代理上设置吞吐量和存储限制。您可以通过配置 Kafka 资源来启用插件和设置限制。您可以设置字节阈值和存储配额,以在与代理交互的客户端上放置限制。

您可以为生成者和消费者带宽设置字节阈值。总限制分布在访问代理的所有客户端中。例如,您可以为制作者设置 40 MBps 的字节阈值。如果两个制作者正在运行,则它们各自限制为 20 MBps 的吞吐量。

存储配额在软限制和硬限制之间节流 Kafka 磁盘存储限制。限制适用于所有可用磁盘空间。生产者在软限制和硬限制之间逐渐减慢。限制可防止磁盘填满速度超过其容量。完整磁盘可能会导致出现问题。硬限制是最大存储限制。

注意

对于 JBOD 存储,限制适用于所有磁盘。如果代理使用两个 1 TB 磁盘,且配额为 1.1 TB,则一个磁盘可能会填满,另一个磁盘将大约为空。

先决条件

  • 管理 Kafka 集群的 Cluster Operator 正在运行。

流程

  1. Kafka 资源的 config 添加插件属性。

    本例中显示了插件属性。

    Kafka 静态配额插件配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        config:
          client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback 
    1
    
          client.quota.callback.static.produce: 1000000 
    2
    
          client.quota.callback.static.fetch: 1000000 
    3
    
          client.quota.callback.static.storage.soft: 400000000000 
    4
    
          client.quota.callback.static.storage.hard: 500000000000 
    5
    
          client.quota.callback.static.storage.check-interval: 5 
    6
    Copy to Clipboard Toggle word wrap

    1
    加载 Kafka Static Quota 插件。
    2
    设置制作者字节阈值。本示例中为 1 Mbps.
    3
    设置消费者字节阈值。本示例中为 1 Mbps.
    4
    为存储设置较低软限制。在本示例中为 400 GB。
    5
    为存储设置更高的硬限制。本例中为 500 GB。
    6
    设置存储检查间隔(以秒为单位)。本示例中为 5 秒。您可以将其设置为 0 来禁用检查。
  2. 更新资源。

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

8.2.2. 默认 ZooKeeper 配置值

当使用 AMQ Streams 部署 ZooKeeper 时,AMQ Streams 设置的一些默认配置与标准 ZooKeeper 默认值不同。这是因为 AMQ Streams 设置多个 ZooKeeper 属性,其值在 OpenShift 环境中运行 ZooKeeper。

AMQ Streams 中密钥 ZooKeeper 属性的默认配置如下:

Expand
表 8.1. AMQ Streams 中的默认 ZooKeeper 属性
属性默认值Description

tickTime

2000

以毫秒为单位的单空长度,它决定了会话超时的长度。

initLimit

5

在 ZooKeeper 集群中允许后续者获得的最大 tick 数量。

syncLimit

2

后续允许不与 ZooKeeper 集群中的领导不同步的最大 tick 数量。

autopurge.purgeInterval

1

启用 autopurge 功能,并在小时内设置清除服务器端 ZooKeeper 事务日志的时间间隔。

admin.enableServer

false

禁用 ZooKeeper admin 服务器的标记。AMQ Streams 不使用 admin 服务器。

重要

修改这些默认值作为 Kafka 自定义资源中的 zookeeper.config 可能会影响 ZooKeeper 集群的行为和性能。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat