29.7. 使用 ZooKeeper 升级 Kafka


如果您使用基于 ZooKeeper 的 Kafka 集群,升级需要更新 Kafka 版本和 inter-broker 协议版本。

如果要将 Kafka 集群从使用 ZooKeeper 切换,以便元数据管理在 KRaft 模式中操作,则必须独立于升级执行这些步骤。有关迁移到基于 KRaft 的集群的详情,请参考 第 5.2 节 “迁移到 KRaft 模式”

29.7.1. 更新 Kafka 版本

当使用 ZooKeeper 进行集群管理时,在 Kafka 资源的配置中需要更新 Kafka 版本(Kafka.spec.kafka.version)及其 inter-broker 协议版本(inter.broker.protocol.version)。Kafka 的每个版本都有了一个内部代理协议的兼容版本。inter-broker 协议用于代理间通信。协议的次要版本通常会增加以匹配 Kafka 的次要版本,如上表中所示。inter-broker 协议版本在 Kafka 资源中设置 cluster wide。要更改它,您可以编辑 Kafka.spec.kafka.config 中的 inter.broker.protocol.version 属性。

下表显示了 Kafka 版本之间的区别:

Expand
表 29.1. Kafka 版本的不同
Apache Kafka 版本流Kafka 版本inter-broker 协议版本日志消息格式版本ZooKeeper 版本

2.8

3.8.0

3.8

3.8

3.8.4

2.7

3.7.0

3.7

3.7

3.8.3

  • Kafka 3.8.0 支持在生产环境中使用。
  • Kafka 3.7.0 仅支持升级到 Streams for Apache Kafka 2.8。

日志消息格式版本

当制作者发送消息到 Kafka 代理时,消息会使用特定的格式进行编码。格式可能会在 Kafka 发行版本之间改变,因此消息指定它们编码的消息格式版本。

用于设置特定消息格式版本的属性如下:

  • 主题的 message.format.version 属性
  • Kafka 代理的 log.message.format.version 属性

从 Kafka 3.0.0,消息格式版本值被假定为与 inter.broker.protocol.version 匹配,且不需要设置。该值反映了使用的 Kafka 版本。

当升级到 Kafka 3.0.0 或更高版本时,您可以在更新 inter.broker.protocol.version 时删除这些设置。否则,您可以根据您要升级到的 Kafka 版本设置消息格式版本。

由在 Kafka 代理中设置的 log.message.format.version 定义的 message.format.version 的默认值。您可以通过修改主题配置来手动设置主题的 message.format.version

Kafka 版本的滚动更新更改

当 Kafka 版本被更新时,Cluster Operator 会启动对 Kafka 代理的滚动更新。进一步的滚动更新依赖于 inter.broker.protocol.versionlog.message.format.version 的配置。

Expand
如果 Kafka.spec.kafka.config 包含…​Cluster Operator 启动…​

inter.broker.protocol.versionlog.message.format.version

单个滚动更新。更新后,必须手动更新 inter.broker.protocol.version,后跟 log.message.format.version。更改每个将触发进一步的滚动更新。

inter.broker.protocol.versionlog.message.format.version

两个滚动更新。

没有 inter.broker.protocol.versionlog.message.format.version 的配置。

两个滚动更新。

重要

从 Kafka 3.0.0,当 inter.broker.protocol.version 设置为 3.0 或更高版本时,log.message.format.version 选项会被忽略,且不需要设置。代理的 log.message.format.version 属性和主题的 message.format.version 属性已弃用,并将在以后的 Kafka 发行版本中删除。

作为 Kafka 升级的一部分,Cluster Operator 为 ZooKeeper 启动滚动更新。

  • 即使 ZooKeeper 版本没有改变,也会进行单个滚动更新。
  • 如果 Kafka 的新版本需要新的 ZooKeeper 版本,则会进行额外的滚动更新。

29.7.2. 升级带有旧消息格式的客户端

在 Kafka 3.0 之前,您可以使用 log.message.format.version 属性(或主题级别上的 message.format.version 属性)为代理配置特定的消息格式。这允许代理适应使用过时的消息格式的旧 Kafka 客户端。虽然 Kafka 在没有显式设置此属性的情况下支持旧的客户端,但代理需要从旧的客户端转换信息,这会显著降低性能成本。

自 0.11 版以来,Apache Kafka Java 客户端支持最新的消息格式版本。如果您的所有客户端都使用最新的消息版本,您可以在升级代理时删除 log.message.format.versionmessage.format.version 覆盖。

但是,如果您仍然有使用较旧的消息格式版本的客户端,我们建议首先升级您的客户端。从消费者开始,然后在升级代理时删除 log.message.format.versionmessage.format.version 覆盖前升级制作者。这将确保您的所有客户端都支持最新的消息格式版本,且升级过程平稳。

您可以使用此指标跟踪 Kafka 客户端名称和版本:

  • kafka.server:type=socket-server-metrics,clientSoftwareName=<name>,clientSoftwareVersion=<version>,listener=<listener>,networkProcessor=<processor>
提示

以下 Kafka 代理指标帮助监控消息 down-conversion 的性能:

  • kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch} 提供了执行消息转换的时间的指标。
  • kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+) 在一段时间内转换的信息数量上提供指标。

29.7.3. 升级基于 ZooKeeper 的 Kafka 集群

将基于 ZooKeeper 的 Kafka 集群升级到较新的支持的 Kafka 版本和 inter-broker 协议版本。

先决条件

  • Cluster Operator 已启动并在运行。
  • 在升级 Kafka 集群前,请检查 Kafka 资源的属性 不包含 新的 Kafka 版本不支持的配置选项。

流程

  1. 更新 Kafka 集群配置:

    oc edit kafka <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap
  2. 如果配置,请检查 inter.broker.protocol.versionlog.message.format.version 属性是否已设置为 当前版本

    例如,如果从 Kafka 版本 3.7.0 升级到 3.8.0,当前版本为 3.7 :

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 3.7.0
        config:
          log.message.format.version: "3.7"
          inter.broker.protocol.version: "3.7"
          # ...
    Copy to Clipboard Toggle word wrap

    如果没有配置 log.message.format.versioninter.broker.protocol.version,则 Apache Kafka 的 Streams 会在在下一步中更新到 Kafka 版本后自动将这些版本更新为当前默认值。

    注意

    log.message.format.versioninter.broker.protocol.version 的值必须是字符串,以防止它们被解释为浮点号。

  3. 更改 Kafka.spec.kafka.version 以指定新的 Kafka 版本;将 log.message.format.versioninter.broker.protocol.version 保留为 当前 Kafka 版本的默认值。

    注意

    更改 kafka.version 可确保升级集群中的所有代理,以使用新的代理二进制文件。在此过程中,一些代理使用旧的二进制文件,而其他代理已升级到新的二进制文件。将 inter.broker.protocol.version 保持不变在当前设置中,可确保代理可以在升级过程中继续相互通信。

    例如,如果从 Kafka 3.7.0 升级到 3.8.0 :

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 3.8.0 
    1
    
        config:
          log.message.format.version: "3.7" 
    2
    
          inter.broker.protocol.version: "3.7" 
    3
    
          # ...
    Copy to Clipboard Toggle word wrap
    1
    Kafka 版本已改为新版本。
    2
    消息格式版本保持不变。
    3
    inter-broker 协议版本保持不变。
    警告

    如果新的 Kafka 版本更改的 inter.broker.protocol.version,则无法降级 Kafka。inter-broker 协议版本决定用于代理存储的持久性元数据的模式,包括写入 __consumer_offsets 的消息。降级的集群不了解消息。

  4. 如果在 Kafka 自定义资源的 Kafka.spec.kafka.image 中定义了 Kafka 集群 的镜像,请更新该镜像以指向新的 Kafka 版本的容器镜像。

    请参阅 Kafka 版本和镜像映射

  5. 保存并退出编辑器,然后等待滚动更新完成。

    通过观察 pod 状态转换来检查滚动更新的进度:

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
    Copy to Clipboard Toggle word wrap

    滚动更新可确保每个 pod 都使用 Kafka 的新版本的代理二进制文件。

  6. 如果需要,将 Kafka Connect 和 MirrorMaker 的 version 属性设置为 Kafka 的新版本:

    1. 对于 Kafka Connect,更新 KafkaConnect.spec.version
    2. 对于 MirrorMaker,更新 KafkaMirrorMaker.spec.version
    3. 对于 MirrorMaker 2,更新 KafkaMirrorMaker2.spec.version

      注意

      如果使用手动构建的自定义镜像,您必须重建这些镜像以确保它们与 Apache Kafka 基础镜像的最新流最新。例如,如果您从 基础 Kafka Connect 镜像创建了容器镜像,请更新 Dockerfile 以指向最新的基础镜像和构建配置。

  7. 如果配置,将 Kafka 资源更新为使用新的 inter.broker.protocol.version 版本。否则,请转到第 9 步。

    例如,如果升级到 Kafka 3.8.0 :

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 3.8.0
        config:
          log.message.format.version: "3.7"
          inter.broker.protocol.version: "3.8"
          # ...
    Copy to Clipboard Toggle word wrap
  8. 等待 Cluster Operator 更新集群。
  9. 如果配置,将 Kafka 资源更新为使用新的 log.message.format.version 版本。否则,请转到第 10 步。

    例如,如果升级到 Kafka 3.8.0 :

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 3.8.0
        config:
          log.message.format.version: "3.8"
          inter.broker.protocol.version: "3.8"
          # ...
    Copy to Clipboard Toggle word wrap
    重要

    从 Kafka 3.0.0,当 inter.broker.protocol.version 设置为 3.0 或更高版本时,log.message.format.version 选项会被忽略,且不需要设置。

  10. 等待 Cluster Operator 更新集群。

    检查升级已从 Kafka 资源的状态 成功完成。

升级 Kafka 客户端应用程序

确保所有 Kafka 客户端应用程序都已更新为使用客户端二进制文件的新版本,作为升级过程的一部分,并验证其与 Kafka 升级的兼容性。如果需要,与负责管理客户端应用程序的团队协调。

提示

要检查客户端是否使用最新的消息格式,请使用 kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec 指标。如果使用最新的消息格式,指标会显示 0。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat