9.5. 升级 Kafka
将 Cluster Operator 升级到 2.2 后,下一步是将所有 Kafka 代理升级到最新版本的 Kafka。
Kafka 升级由 Cluster Operator 通过滚动更新 Kafka 代理执行。
Cluster Operator 根据 Kafka 集群配置启动滚动更新。
如果 Kafka.spec.kafka.config 包含… | Cluster Operator 启动… |
|---|---|
|
|
单个滚动更新。在更新后,必须手动更新 |
|
| 两个滚动更新。 |
|
不配置 | 两个滚动更新。 |
从 Kafka 3.0.0,当 inter.broker.protocol.version 设置为 3.0 或更高版本时,logging.message.format.version 选项会被忽略,不需要设置。代理的 log.message.format.version 属性和主题的 message.format.version 属性已弃用,并将在以后的发行版本中删除。
作为 Kafka 升级的一部分,Cluster Operator 会启动 ZooKeeper 的滚动更新。
- 即使 ZooKeeper 版本没有改变,也会进行单个滚动更新。
- 如果新版本的 Kafka 需要新的 ZooKeeper 版本,则会出现额外的滚动更新。
9.5.1. Kafka 版本 复制链接链接已复制到粘贴板!
Kafka 的日志消息格式版本和 inter-broker 协议版本分别指定,附加到消息的日志格式版本以及集群中使用的 Kafka 协议版本。为确保使用了正确的版本,升级过程涉及对现有 Kafka 代理进行配置更改,并对客户端应用程序(消费者和生产者)进行代码更改。
下表显示了 Kafka 版本之间的区别:
| Kafka 版本 | Inter-broker 协议版本 | 日志消息格式版本 | zookeeper 版本 |
|---|---|---|---|
| 3.2.3 | 3.2 | 3.2 | 3.6.3 |
| 3.1.0 | 3.1 | 3.1 | 3.6.3 |
Inter-broker 协议版本
在 Kafka 中,用于交集通信的网络协议称为 inter-broker 协议。Kafka 的每个版本都有与 inter-broker 协议兼容的版本。协议的次要版本通常会增加以匹配 Kafka 的次版本,如上表中所示。
在 Kafka 资源中设置了 cluster wide 的 inter-broker 协议版本。要更改它,您可以编辑 Kafka.spec.kafka.config 中的 inter.broker.protocol.version 属性。
日志消息格式版本
当制作者向 Kafka 代理发送消息时,该消息会使用特定的格式进行编码。在 Kafka 发行版本间可能会更改格式,因此信息会指定它们采用哪一种消息格式版本。
用来设置特定消息格式版本的属性如下:
-
主题的
message.format.version属性 -
Kafka 代理的
log.message.format.version属性
从 Kafka 3.0.0,假设消息格式版本值与 inter.broker.protocol.version 相匹配,不需要设置。这些值反映了使用的 Kafka 版本。
当升级到 Kafka 3.0.0 或更高版本时,您可以在更新 inter.broker.protocol.version 时删除这些设置。否则,根据您要升级到的 Kafka 版本设置消息格式版本。
主题的默认值 message.format.version 由在 Kafka 代理上设置的 log.message.format.version 定义。您可以通过修改主题配置来手动设置主题的 message.format.version。
9.5.2. 升级客户端的策略 复制链接链接已复制到粘贴板!
升级客户端应用程序(包括 Kafka Connect 连接器)的正确方法取决于您的具体情况。
消耗应用程序需要以消息格式接收它们所了解的消息。您可以以两种方式之一来确保这种情况:
- 在升级任何生产者 之前,先升级 主题的所有消费者。
- 通过将代理关闭至旧格式。
使用 broker down-conversion 在代理中放入额外的负载,因此对于较长的时间段内,它并不能依赖于 down-conversion 来处理所有主题。对于执行优化的代理,不应完全降低转换消息。
代理 down-conversion 可以通过两种方式配置:
-
topic-level
message.format.version为单个主题进行配置。 -
broker-level
log.message.format.version是没有配置 topic 级别消息.format.version的主题的默认设置。
向主题发布的新版本格式的消息对消费者可见,因为代理在从生产者接收消息时而不是向消费者接收消息时执行 down-conversion。
您可以用来升级客户端的常见策略如下所述。也可以升级客户端应用程序的其他策略。
当升级到 Kafka 3.0.0 或更高版本时,每个策略中所述的步骤都会稍有不同。从 Kafka 3.0.0,假设消息格式版本值与 inter.broker.protocol.version 相匹配,不需要设置。
代理级消费者第一个策略
- 升级所有消耗的应用程序。
-
将 broker-level
log.message.format.version更改为新版本。 - 升级所有生产应用程序。
这个策略非常简单,并避免任何代理 down-conversion。但是,它假定您组织中的所有消费者都能以协调的方式升级,并且不适用于使用者和制作者的应用程序。如果升级的客户端存在问题,新的格式信息可能会添加到消息日志中,因此您无法恢复到以前的消费者版本。
主题级消费者优先策略
对于每个主题:
- 升级所有消耗的应用程序。
-
将 topic-level
message.format.version更改为新版本。 - 升级所有生产应用程序。
此策略可避免任何代理 down-conversion,意味着您可以按主题进行主题。它不适用于同一主题的使用者和生产者。同样,如果升级的客户端出现问题,新的格式信息可能会添加到消息日志中。
使用转换进行"主题级消费者"优先策略
对于每个主题:
-
将 topic-level
message.format.version改为旧版本(或依赖主题 defaulting to the broker-levellog.message.format.version)。 - 升级所有消耗和生产应用程序。
- 验证升级的应用程序是否正常工作。
-
将 topic-level
message.format.version更改为新版本。
这个策略需要 broker down-conversion,但代理的负载会被最小化,因为它只需要单个主题(或小组主题)。它还适用于同一主题的使用者和生产者。这种方法可确保升级的制作者和使用者在提交使用新消息格式版本之前正常工作。
这种方法的主要缺点是,在具有许多主题和应用程序的集群中管理可能比较复杂。
也可以应用多个策略。例如,对于前几个应用程序和主题用户首先可以使用 "per-topic 用户,可以使用 down conversion"策略。经证实成功了另外一个成功时,可以考虑使用更有效的策略。
9.5.3. Kafka 版本和镜像映射 复制链接链接已复制到粘贴板!
在升级 Kafka 时,请考虑 STRIMZI_KAFKA_IMAGES 环境变量和 Kafka.spec.kafka.version 属性的设置。
-
每个
Kafka资源都可使用Kafka.spec.kafka.version配置。 Cluster Operator 的
STRIMZI_KAFKA_IMAGES环境变量提供在给定 Kafka 资源中请求该版本时使用的Kafka版本和镜像之间的映射。-
如果没有配置
Kafka.spec.kafka.image,则会使用给定版本的默认镜像。 -
如果配置了
Kafka.spec.kafka.image,则默认镜像会被覆盖。
-
如果没有配置
Cluster Operator 无法验证镜像是否实际包含预期版本的 Kafka 代理。请小心操作,以确保指定的镜像与给定的 Kafka 版本对应。
9.5.4. 升级 Kafka 代理和客户端应用程序 复制链接链接已复制到粘贴板!
这个步骤描述了如何将 AMQ Streams Kafka 集群升级到最新支持的 Kafka 版本。
与当前的 Kafka 版本相比,新版本可能会支持更大的 日志消息格式版本或 inter-broker 协议版本,或两者中。如果需要,请按照以下步骤升级这些版本。更多信息请参阅 第 9.5.1 节 “Kafka 版本”。
您还应选择 升级客户端 的策略。Kafka 客户端在此流程的第 6 步升级。
先决条件
要升级 Kafka 资源,请检查以下内容:
- 支持 Kafka 版本的 Cluster Operator 已启动并在运行。
-
Kafka.spec.kafka.config不包含 在新 Kafka 版本不支持的选项。
流程
更新 Kafka 集群配置:
oc edit kafka my-cluster
oc edit kafka my-clusterCopy to Clipboard Copied! Toggle word wrap Toggle overflow 如果配置,请确保
Kafka.spec.kafka.config将log.message.format.version和inter.broker.protocol.version设置为 当前 Kafka 版本的默认值。例如,如果从 Kafka 版本 3.1.0 升级到 3.2.3:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果没有配置
log.message.format.version和inter.broker.protocol.version,AMQ Streams 会在下一步升级到 Kafka 版本后自动将这些版本更新为当前默认值。注意log.message.format.version和inter.broker.protocol.version的值必须是字符串,以防止它们被解释为浮点号。更改
Kafka.spec.kafka.version以指定新的 Kafka 版本;将log.message.format.version和inter.broker.protocol.version保留为 当前 Kafka 版本的默认值。注意更改
kafka.version可确保将使用新的代理二进制文件升级集群中的所有代理。在此过程中,一些代理使用旧的二进制文件,而其他代理已升级到新的二进制文件。不要使inter.broker.protocol.version保持不变,代理可以在升级期间继续相互通信。例如,如果从 Kafka 3.1.0 升级到 3.2.3:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 警告如果新 Kafka 版本有
inter.broker.protocol.version的变化,则无法降级 Kafka。inter-broker 协议版本决定了用于代理存储的持久性元数据的 schema,包括写入__consumer_offsets的消息。降级的集群将无法了解信息。如果在 Kafka 自定义资源中定义了 Kafka 集群的镜像,在
Kafka.spec.kafka.image中,更新image以指向新的 Kafka 版本的容器镜像。请参阅 Kafka 版本和镜像映射
保存并退出编辑器,然后等待滚动更新完成。
通过观察 pod 状态转换来检查滚动更新的进度:
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow 滚动更新可确保每个 pod 都对 Kafka 的新版本使用 broker 二进制文件。
根据您选择的 策略来升级客户端,将所有客户端应用升级到使用客户端二进制文件的新版本。
如果需要,将 Kafka Connect 和 MirrorMaker
的版本属性设置为 Kafka 的新版本:-
对于 Kafka Connect,更新
KafkaConnect.spec.version。 -
对于 MirrorMaker,更新
KafkaMirrorMaker.spec.version。 -
对于 MirrorMaker 2.0,更新
KafkaMirrorMaker2.spec.version。
-
对于 Kafka Connect,更新
如果配置,更新 Kafka 资源以使用新的
inter.broker.protocol.version版本。否则,请转到第 9 步。例如,如果升级到 Kafka 3.2.3:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 等待 Cluster Operator 更新集群。
如果配置,更新 Kafka 资源以使用新的
log.message.format.version版本。否则,转至第 10 步。例如,如果升级到 Kafka 3.2.3:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 重要从 Kafka 3.0.0,当
inter.broker.protocol.version设置为3.0或更高版本时,logging.message.format.version选项会被忽略,不需要设置。等待 Cluster Operator 更新集群。
- Kafka 集群和客户端现在使用新的 Kafka 版本。
- 代理被配置为使用新版本的 Kafka 的 inter-broker 协议版本和消息格式版本来发送消息。
如果需要,在 Kafka 升级后,您可以: