第 7 章 升级 AMQ Streams


AMQ Streams 可以升级,无需集群停机。AMQ Streams 的每个版本都支持一个或多个 Apache Kafka 版本。只要您的 AMQ Streams 版本支持,就可以升级到更高的 Kafka 版本。

AMQ Streams 的新版本可能支持较新版本的 Kafka,但需要先升级 AMQ Streams,然后才能 升级到更高版本的 Kafka。

要升级 AMQ Streams Operator,您可以使用 OpenShift Container Platform 集群上的 Operator Lifecycle Manager(OLM)。

重要

如果适用,必须在 升级 AMQ Streams 和 Kafka 后进行资源 升级。

7.1. AMQ Streams 和 Kafka 升级

升级 AMQ Streams 是一个两阶段的过程。要在没有停机的情况下升级代理和客户端,您必须 按照以下顺序完成升级过程:

  1. 将 Cluster Operator 更新至最新的 AMQ Streams 版本。您采取的方法取决于如何 部署 Cluster Operator

    • 如果使用安装 YAML 文件部署 Cluster Operator,请通过 修改 Operator 安装文件 来执行升级。
    • 如果您从 OperatorHub 部署 Cluster Operator,请使用 Operator Lifecycle Manager(OLM)将 AMQ Streams Operator 的更新频道改为新的 AMQ Streams 版本。

      根据您选择的升级策略,按照频道更新进行操作:

      • 启动 自动 升级
      • 然后,在开始安装前需要批准 手动 升级

      有关使用 OperatorHub 升级 Operator 的更多信息,请参阅 升级已安装的 Operator

  2. 将所有 Kafka 代理和客户端应用程序升级到最新的 Kafka 版本。

7.1.1. Kafka 版本

Kafka 的日志消息格式版本和broker 协议版本指定附加到消息的日志格式版本以及集群中使用的协议版本。因此,升级过程包括对现有的 Kafka 代理进行配置更改,并对客户端应用程序(使用者和生产者)进行代码更改以确保使用正确的版本。

下表显示了 Kafka 版本之间的区别:

Kafka 版本Interbroker 协议版本日志消息格式版本zookeeper 版本

2.5.0

2.5

2.5

3.5.8

2.6.0

2.6

2.6

3.5.8

消息格式版本

当生产者向 Kafka 代理发送消息时,该消息将使用特定格式进行编码。Kafka 版本的格式可能会改变,因此消息包括一个版本,用于标识它们编码的格式版本。您可以配置 Kafka 代理,在代理将消息附加到日志前将信息从较新的格式转换为给定的旧格式版本。

在 Kafka 中,设置消息格式版本的方法有两种:

  • message.format.version 属性在主题上设置。
  • log.message.format.version 属性在 Kafka 代理上设置。

主题的 message.format.version 默认值由 Kafka 代理上设置的 log.message.format.version 定义。您可以通过修改主题配置来手动设置主题的 message.format.version

本节中的升级任务假定消息格式版本由 log.message.format.version 定义。

7.1.2. 升级 Cluster Operator

本节概述了将 Cluster Operator 部署升级到使用 AMQ Streams 1.6 的步骤。

由 Cluster Operator 管理的 Kafka 集群的可用性不受升级操作的影响。

注意

有关如何升级到该版本的信息,请参阅支持 AMQ Streams 特定版本的文档。

7.1.2.1. 将 Cluster Operator 升级到更新的版本

此流程描述了如何将 Cluster Operator 部署升级到更新的版本。

先决条件

步骤

  1. 记录对现有 Cluster Operator 资源所做的任何配置更改(在 /install/cluster-operator 目录中)。任何更改都将被 Cluster Operator 的新版本 覆盖
  2. 更新 Cluster Operator。

    1. 根据 Cluster Operator 运行的命名空间修改新版本的安装文件。

      在 Linux 中,使用:

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
      Copy to Clipboard

      在 MacOS 中,使用:

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
      Copy to Clipboard
    2. 如果在现有 Cluster Operator Deployment 中修改了一个或多个环境变量,请编辑 install/cluster-operator/060-Deployment-cluster-operator.yaml 文件来使用这些环境变量。
  3. 当您有更新的配置时,请将其与其它安装资源一起部署:

    oc apply -f install/cluster-operator
    Copy to Clipboard

    等待滚动更新完成。

  4. 获取 Kafka pod 的镜像以确保升级成功:

    oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
    Copy to Clipboard

    镜像标签显示新的 AMQ Streams 版本,后跟 Kafka 版本。例如,<New AMQ Streams version>-kafka-<Current Kafka version>

  5. 更新现有资源以处理已弃用的自定义资源属性。

现在,您有一个更新的 Cluster Operator,但在它管理的集群中运行的 Kafka 版本没有改变。

接下来要做什么

在 Cluster Operator 升级后,您可以执行 Kafka 升级

7.1.3. 升级 Kafka

将 Cluster Operator 升级到 1.6 后,下一步是将所有 Kafka 代理升级到最新支持的 Kafka 版本。

Kafka 升级由 Cluster Operator 通过对 Kafka 代理的滚动更新来执行。

Cluster Operator 根据 Kafka 集群配置启动滚动更新。

如果 Kafka.spec.kafka.config 包含…​Cluster Operator 启动s…​

inter.broker.protocol.versionlog.message.format.version

单个滚动更新.更新后,inter.broker.protocol.version 必须手动更新,后接 log.message.format.version。更改每项将触发进一步滚动更新。

inter.broker.protocol.versionlog.message.format.version

两次滚动更新.

没有 inter.broker.protocol.version 或 log.message.format.version 的配置。

两次滚动更新.

作为 Kafka 升级的一部分,Cluster Operator 会启动 ZooKeeper 的滚动更新。

  • 即使 ZooKeeper 版本未更改,也会出现一次滚动更新。
  • 如果 Kafka 的新版本需要新的 ZooKeeper 版本,则会出现额外的滚动更新。

7.1.3.1. Kafka 版本和镜像映射

在升级 Kafka 时,请考虑对 STRIMZI_KAFKA_IMAGESKafka.spec.kafka.version 属性的设置。

  • 每个 Kafka 资源都可以使用 Kafka.spec.kafka.version 配置。
  • Cluster Operator 的 STRIMZI_KAFKA_IMAGES 环境变量提供 Kafka 版本和给定 Kafka 资源中请求该版本时使用的镜像之间的映射。

    • 如果没有配置 Kafka.spec.kafka.image,则会使用给定版本的默认镜像。
    • 如果配置了 Kafka.spec.kafka.image,则默认镜像会被覆盖。
警告

Cluster Operator 无法验证镜像是否实际包含预期版本的 Kafka 代理。请特别注意确保给定镜像与给定的 Kafka 版本对应。

7.1.3.2. 升级客户端的策略

升级客户端应用程序(包括 Kafka Connect 连接器)的最佳方法取决于您的具体情形。

使用应用程序时,需要以它们所理解的消息格式接收消息。您可以通过以下两种方式之一来确保这是这种情况:

  • 在升级任何制作者 之前,先 升级所有消费者的主题。
  • 通过将代理关闭消息置于较旧格式。

使用代理 down-conversion 会给代理带来额外的负载,因此在较长时间内依赖所有主题并不理想。要使代理以最佳方式运行,绝对不应是向下转换消息。

代理 down-conversion 可通过两种方式进行配置:

  • 主题级别 message.format.version 为单个主题配置它。
  • 代理级 log.message.format.version 是未配置主题级别 message.format.version 的主题的默认设置。

以新版本格式发布到主题的消息对消费者可见,因为代理在从生产者接收消息时执行 down-conversion,而不是当它们发送到消费者时。

您可以使用多个策略来升级客户端:

消费者第一
  1. 升级所有使用的应用程序。
  2. 将代理级别的 log.message.format.version 更改为新版本。
  3. 升级所有生成的应用程序。

    此策略非常简单,可避免任何代理停机转换。但是,它假定贵组织中的所有消费者可以协调升级,而且不适用于消费者和生产者的应用程序。如果升级的客户端出现问题,新格式消息可能会添加到消息日志中,因此您无法恢复到以前的使用者版本。

每个主题消费者首先

对于每个主题:

  1. 升级所有使用的应用程序。
  2. 将主题级 message.format.version 更改为新版本。
  3. 升级所有生成的应用程序。

    此策略可避免任何代理 down-conversion,并且意味着您可以逐个主题地进行。它不适用于同时作为同一主题的消费者和制作者的应用。再强调一下,如果升级的客户端出现问题,新格式消息可能会添加到消息日志中。

首先每个主题消费者,进行向下转换

对于每个主题:

  1. 将主题级 message.format.version 更改为旧版本(或依赖主题默认到代理级别的 log.message.format.version)。
  2. 升级所有使用和生产的应用程序。
  3. 验证升级的应用是否正常工作。
  4. 将主题级 message.format.version 更改为新版本。

    这个策略需要代理 down-conversion,但代理的负载会最小化,因为一次只需要单个主题(或小组主题)需要它。它还适用于同时是同一主题的消费者和制作者的应用。这种方法可确保升级的生产者和消费者在提交使用新的消息格式版本之前正常工作。

    这种方法的主要缺点是,在含有许多主题和应用的集群中进行管理可能比较复杂。

还可以通过其他策略来升级客户端应用。

注意

也可以应用多个策略。例如,在前几个应用程序和主题中,"每个主题消费者首先使用",可以使用向下转换"策略"。如果这已被证明是成功的,那么更高效的策略可被视为可以接受采用。

7.1.3.3. 升级 Kafka 代理和客户端应用程序

这个步骤描述了如何将 AMQ Streams Kafka 集群升级到最新支持的 Kafka 版本。

与您当前的 Kafka 版本相比,新版本可能支持更高的 日志消息格式版本 或代理协议版本,或两者。如果需要,请按照步骤升级这些版本。如需更多信息,请参阅 第 7.1.1 节 “Kafka 版本”

您还应该选择 升级客户端的策略。Kafka 客户端在此流程的第 6 步中升级。

先决条件

要升级 Kafka 资源,请检查:

  • 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
  • Kafka.spec.kafka.config 不包含 新 Kafka 版本中不支持的选项。

步骤

  1. 更新 Kafka 集群配置:

    oc edit kafka my-cluster
    Copy to Clipboard
  2. 如果配置了,请确保 Kafka.spec.kafka.config 的 log.message.format .version 和 inter.broker.protocol.version 设置为 当前 Kafka 版本的默认值。

    例如,如果从 Kafka 版本 2.5.0 升级到 2.6.0:

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.5.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.5"
          # ...
    Copy to Clipboard

    如果没有配置 log.message.format .version 和 inter.broker.protocol.version,AMQ Streams 会在下一步中将这些版本更新至 Kafka 版本后自动将这些版本更新至当前的默认值。

    注意

    log.message.format.versioninter.broker.protocol.version 的值必须是字符串,以防止它们被解释为浮动点数。

  3. 更改 Kafka.spec.kafka.version 以指定新的 Kafka 版本;保留 log.message.format.versioninter.broker.protocol.version,默认为 当前 Kafka 版本。

    注意

    更改 kafka.version 可确保集群中的所有代理都将升级为使用新的代理二进制文件。在此过程中,一些代理使用旧二进制文件,另一些则已升级至新二进制文件。不要更改 inter.broker.protocol.version,确保代理可以在升级过程中继续相互通信。

    例如,如果从 Kafka 2.5.0 升级到 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0 
    1
    
        config:
          log.message.format.version: "2.5" 
    2
    
          inter.broker.protocol.version: "2.5" 
    3
    
          # ...
    Copy to Clipboard
    1
    Kafka 版本已改为新版本。
    2
    消息格式版本保持不变。
    3
    代理协议版本没有改变。
    警告

    如果新 Kafka 版本的 inter.broker.protocol.version 发生变化,则无法降级 Kafka。broker 协议版本决定了代理存储的持久元数据的模式,包括写入到 __consumer_offset 的消息。降级集群将不会理解消息。

  4. 如果在 Kafka 自定义资源(在 Kafka .spec.kafka.image 中)中定义了 Kafka 集群 的镜像,请将镜像 更新为指向具有新 Kafka 版本的容器镜像。

    请参阅 Kafka 版本和镜像映射

  5. 保存并退出编辑器,然后等待滚动更新完成。

    通过观察 pod 状态转换来检查滚动更新的进度:

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
    Copy to Clipboard

    滚动更新可确保每个 pod 在新版本的 Kafka 中使用代理二进制文件。

  6. 根据您选择的 升级客户端策略,升级所有客户端应用以使用新版本的客户端二进制文件。

    如果需要,将 Kafka Connect 和 MirrorMaker 的 version 属性设置为 Kafka 的新版本:

    1. 对于 Kafka Connect,更新 KafkaConnect.spec.version
    2. 对于 MirrorMaker,更新 KafkaMirrorMaker.spec.version
    3. 对于 MirrorMaker 2.0,更新 KafkaMirrorMaker2.spec.version
  7. 如果配置,更新 Kafka 资源,以使用新的 inter.broker.protocol.version 版本。否则,请转至第 9 步。

    例如,如果升级到 Kafka 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.6"
          # ...
    Copy to Clipboard
  8. 等待 Cluster Operator 更新集群。
  9. 如果配置,更新 Kafka 资源,以使用新的 log.message.format.version 版本。否则,请转至第 10 步。

    例如,如果升级到 Kafka 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.6"
          inter.broker.protocol.version: "2.6"
          # ...
    Copy to Clipboard
  10. 等待 Cluster Operator 更新集群。

    • Kafka 集群和客户端现在使用新的 Kafka 版本。
    • 代理配置为使用 Kafka 的新版本的 Inter-broker 协议版本和消息格式发送消息。

在 Kafka 升级后,如果需要,您可以:

7.1.3.4. 更新监听程序配置

AMQ Streams 提供 GenericKafkaListener 模式,用于在 Kafka 资源 中配置 Kafka 侦听器。

GenericKafkaListener 取代 KafkaListeners 模式,后者已被弃用。

使用 GenericKafkaListener 模式时,您可以根据需要配置多个监听器,只要它们的名称和端口是唯一的。监听器 配置被定义为数组,但已弃用的格式也受到支持。

对于 OpenShift 集群内的客户端,您可以创建 普通 (无需加密)或 tls 内部 侦听器。

对于 OpenShift 集群外的客户端,您可以创建 外部 侦听器并指定连接机制,可以是 节点端口负载均衡器、 入口路由

KafkaListeners 模式将子属性用于 纯文本tls 和外部 监听程序,以及每个节点的固定端口。升级 Kafka 后,您可以将使用 KafkaListeners 模式配置的监听程序转换为 GenericKafkaListener 模式的格式。

例如,如果您当前在 Kafka 配置中使用以下配置:

旧监听程序配置

listeners:
  plain:
    # ...
  tls:
    # ...
  external:
    type: loadbalancer
    # ...
Copy to Clipboard

使用以下方法将监听程序转换为新格式:

新的监听程序配置

listeners:
  #...
  - name: plain
    port: 9092
    type: internal
    tls: false 
1

  - name: tls
    port: 9093
    type: internal
    tls: true
  - name: external
    port: 9094
    type: EXTERNAL-LISTENER-TYPE 
2

    tls: true
Copy to Clipboard

1
现在,所有监听器都需要 TLS 属性。
2
选项: 入口负载均衡器、 节点端口路由

确保使用显示 的确切 名称和端口号。

对于任何其他 配置覆盖 使用旧格式的属性,您需要将它们更新为新格式。

对监听程序 配置 进行了更改:

  • 覆盖configuration 部分合并
  • dnsAnnotations 已重命名为 注解
  • preferredAddressType 被重命名为 preferredNodePortAddressType
  • 地址 已被重命名为 alternativesNames
  • LoadBalancerSourceRanges 和 external TrafficPolicy 移至现在已弃用 的模板中的监听程序配置

例如,这个配置:

旧的其他监听程序配置

listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          #...
Copy to Clipboard

更改:

新的附加监听程序配置

listeners:
    #...
  - name: external
    port: 9094
    type:loadbalancer
    tls: true
    authentication:
      type: tls
    configuration:
      bootstrap:
        annotations:
          #...
Copy to Clipboard

重要

新监听器配置中显示的名称和端口号 必须 用于向后兼容。使用任何其他值将导致 Kafka 侦听器和 OpenShift 服务重命名。

有关各种侦听器可用的配置选项的更多信息,请参阅 GenericKafkaListener 模式参考

7.1.3.5. 升级消费者和 Kafka Streams 应用程序以合作重新平衡

您可以升级 Kafka 用户和 Kafka Streams 应用程序,以使用 增量合作重新平衡 协议进行分区重新平衡,而不是默认的预先 重新平衡 协议。在 Kafka 2.4.0 中添加了新协议。

消费者将分区分配保持在合作重新平衡状态,并仅在流程结束时予以撤销(如果需要)来实现平衡集群。这可减少使用者组或 Kafka Streams 应用程序的不可用。

注意

升级到增量合作重新平衡协议是可选的。预先重新平衡协议仍然受到支持。

先决条件

步骤

要升级 Kafka 消费者以使用增量合作重新平衡协议:

  1. 将 Kafka client .jar 文件替换为新版本。
  2. 在使用者配置中,将 合作粘滞 附加到 partition.assignment.strategy。例如,如果设定了 范围 策略,请将配置更改为 范围,协作粘性
  3. 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。
  4. 通过从使用者配置中删除先前的 partition.assignment.strategy 来 重新配置组中的每个消费者,仅保留 协作式 策略。
  5. 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。

升级 Kafka Streams 应用程序以使用增量合作重新平衡协议:

  1. 将 Kafka Streams .jar 文件替换为新版本。
  2. 在 Kafka Streams 配置中,将 upgrade.from 配置参数设置为您要从中升级的 Kafka 版本(例如 2.3)。
  3. 依次重新启动每个流处理器(节点)。
  4. 从 Kafka Streams 配置中删除 upgrade.from 配置参数。
  5. 依次重新启动组中的每个消费者。

其他资源

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat