25.7. 使用 ZooKeeper 升级 Kafka
如果您使用基于 ZooKeeper 的 Kafka 集群,升级需要更新 Kafka 版本和 inter-broker 协议版本。
如果要将 Kafka 集群从使用 ZooKeeper 切换,以便元数据管理在 KRaft 模式中操作,则必须独立于升级执行这些步骤。有关迁移到基于 KRaft 的集群的详情,请参考 第 8 章 迁移到 KRaft 模式。
25.7.1. 更新 Kafka 版本
当使用 ZooKeeper 进行集群管理时,在 Kafka
资源的配置中需要更新 Kafka 版本(Kafka.spec.kafka.version
)及其 inter-broker 协议版本(inter.broker.protocol.version
)。Kafka 的每个版本都有了一个内部代理协议的兼容版本。inter-broker 协议用于代理间通信。协议的次要版本通常会增加以匹配 Kafka 的次要版本,如上表中所示。inter-broker 协议版本在 Kafka
资源中设置 cluster wide。要更改它,您可以编辑 Kafka.spec.kafka.config
中的 inter.broker.protocol.version
属性。
下表显示了 Kafka 版本之间的区别:
Apache Kafka 版本流 | Kafka 版本 | inter-broker 协议版本 | 日志消息格式版本 | ZooKeeper 版本 |
---|---|---|---|---|
2.7 | 3.7.0 | 3.7 | 3.7 | 3.8.3 |
2.6 | 3.6.0 | 3.6 | 3.6 | 3.8.3 |
- Kafka 3.7.0 支持在生产环境中使用。
- Kafka 3.6.0 仅支持升级到 Apache Kafka 2.7 的 Streams 的目的。
日志消息格式版本
当制作者发送消息到 Kafka 代理时,消息会使用特定的格式进行编码。格式可能会在 Kafka 发行版本之间改变,因此消息指定它们编码的消息格式版本。
用于设置特定消息格式版本的属性如下:
-
主题的
message.format.version
属性 -
Kafka 代理的
log.message.format.version
属性
从 Kafka 3.0.0,消息格式版本值被假定为与 inter.broker.protocol.version
匹配,且不需要设置。该值反映了使用的 Kafka 版本。
当升级到 Kafka 3.0.0 或更高版本时,您可以在更新 inter.broker.protocol.version
时删除这些设置。否则,您可以根据您要升级到的 Kafka 版本设置消息格式版本。
由在 Kafka 代理中设置的 log.message.format.version
定义的 message.format.version
的默认值。您可以通过修改主题配置来手动设置主题的 message.format.version
。
Kafka 版本的滚动更新更改
当 Kafka 版本被更新时,Cluster Operator 会启动对 Kafka 代理的滚动更新。进一步的滚动更新依赖于 inter.broker.protocol.version
和 log.message.format.version
的配置。
如果 Kafka.spec.kafka.config 包含… | Cluster Operator 启动… |
---|---|
|
单个滚动更新。更新后,必须手动更新 |
| 两个滚动更新。 |
没有 | 两个滚动更新。 |
从 Kafka 3.0.0,当 inter.broker.protocol.version
设置为 3.0
或更高版本时,log.message.format.version
选项会被忽略,且不需要设置。代理的 log.message.format.version
属性和主题的 message.format.version
属性已弃用,并将在以后的 Kafka 发行版本中删除。
作为 Kafka 升级的一部分,Cluster Operator 为 ZooKeeper 启动滚动更新。
- 即使 ZooKeeper 版本没有改变,也会进行单个滚动更新。
- 如果 Kafka 的新版本需要新的 ZooKeeper 版本,则会进行额外的滚动更新。
25.7.2. 升级带有旧消息格式的客户端
在 Kafka 3.0 之前,您可以使用 log.message.format.version
属性(或主题级别上的 message.format.version
属性)为代理配置特定的消息格式。这允许代理适应使用过时的消息格式的旧 Kafka 客户端。虽然 Kafka 在没有显式设置此属性的情况下支持旧的客户端,但代理需要从旧的客户端转换信息,这会显著降低性能成本。
自 0.11 版以来,Apache Kafka Java 客户端支持最新的消息格式版本。如果您的所有客户端都使用最新的消息版本,您可以在升级代理时删除 log.message.format.version
或 message.format.version
覆盖。
但是,如果您仍然有使用较旧的消息格式版本的客户端,我们建议首先升级您的客户端。从消费者开始,然后在升级代理时删除 log.message.format.version
或 message.format.version
覆盖前升级制作者。这将确保您的所有客户端都支持最新的消息格式版本,且升级过程平稳。
您可以使用此指标跟踪 Kafka 客户端名称和版本:
-
kafka.server:type=socket-server-metrics,clientSoftwareName=<name>,clientSoftwareVersion=<version>,listener=<listener>,networkProcessor=<processor>
以下 Kafka 代理指标帮助监控消息 down-conversion 的性能:
-
kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch}
提供了执行消息转换的时间的指标。 -
kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+)
在一段时间内转换的信息数量上提供指标。
25.7.3. 升级基于 ZooKeeper 的 Kafka 集群和客户端应用程序
将基于 ZooKeeper 的 Apache Kafka 集群的流升级到较新的支持的 Kafka 版本和 inter-broker 协议版本。
您还应选择升级客户端的策略。在此流程的第 6 步中升级 Kafka 客户端。
先决条件
- Cluster Operator 已启动并在运行。
-
在升级 Apache Kafka 集群的 Streams 前,请检查
Kafka
资源的属性 不包含 新的 Kafka 版本不支持的配置选项。
流程
更新 Kafka 集群配置:
oc edit kafka <kafka_configuration_file>
如果配置,请检查
inter.broker.protocol.version
和log.message.format.version
属性是否已设置为 当前版本。例如,如果从 Kafka 版本 3.6.0 升级到 3.7.0,则当前版本是 3.6。
kind: Kafka spec: # ... kafka: version: 3.6.0 config: log.message.format.version: "3.6" inter.broker.protocol.version: "3.6" # ...
如果没有配置
log.message.format.version
和inter.broker.protocol.version
,则 Apache Kafka 的 Streams 会在在下一步中更新到 Kafka 版本后自动将这些版本更新为当前默认值。注意log.message.format.version
和inter.broker.protocol.version
的值必须是字符串,以防止它们被解释为浮点号。更改
Kafka.spec.kafka.version
以指定新的 Kafka 版本;将log.message.format.version
和inter.broker.protocol.version
保留为 当前 Kafka 版本的默认值。注意更改
kafka.version
可确保升级集群中的所有代理,以使用新的代理二进制文件。在此过程中,一些代理使用旧的二进制文件,而其他代理已升级到新的二进制文件。将inter.broker.protocol.version
保持不变在当前设置中,可确保代理可以在升级过程中继续相互通信。例如,如果从 Kafka 3.6.0 升级到 3.7.0 :
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 3.7.0 1 config: log.message.format.version: "3.6" 2 inter.broker.protocol.version: "3.6" 3 # ...
警告如果新的 Kafka 版本更改的
inter.broker.protocol.version
,则无法降级 Kafka。inter-broker 协议版本决定用于代理存储的持久性元数据的模式,包括写入__consumer_offsets
的消息。降级的集群不了解消息。如果在 Kafka 自定义资源的
Kafka.spec.kafka.image
中定义了Kafka
集群的镜像
,请更新该镜像以指向新的 Kafka 版本的容器镜像。请参阅 Kafka 版本和镜像映射
保存并退出编辑器,然后等待滚动更新完成。
通过观察 pod 状态转换来检查滚动更新的进度:
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
滚动更新可确保每个 pod 都使用 Kafka 的新版本的代理二进制文件。
根据您选择的 策略来升级客户端,升级所有客户端应用程序以使用客户端二进制文件的新版本。
如果需要,将 Kafka Connect 和 MirrorMaker 的
version
属性设置为 Kafka 的新版本:-
对于 Kafka Connect,更新
KafkaConnect.spec.version
。 -
对于 MirrorMaker,更新
KafkaMirrorMaker.spec.version
。 对于 MirrorMaker 2,更新
KafkaMirrorMaker2.spec.version
。注意如果使用手动构建的自定义镜像,您必须重建这些镜像以确保它们与 Apache Kafka 基础镜像的最新流最新。例如,如果您从 基础 Kafka Connect 镜像创建了容器镜像,请更新 Dockerfile 以指向最新的基础镜像和构建配置。
-
对于 Kafka Connect,更新
- 验证升级的客户端应用程序是否与新的 Kafka 代理正常工作。
如果配置,将 Kafka 资源更新为使用新的
inter.broker.protocol.version
版本。否则,请转到第 9 步。例如,如果升级到 Kafka 3.7.0 :
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 3.7.0 config: log.message.format.version: "3.6" inter.broker.protocol.version: "3.7" # ...
- 等待 Cluster Operator 更新集群。
如果配置,将 Kafka 资源更新为使用新的
log.message.format.version
版本。否则,请转到第 10 步。例如,如果升级到 Kafka 3.7.0 :
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 3.7.0 config: log.message.format.version: "3.7" inter.broker.protocol.version: "3.7" # ...
重要从 Kafka 3.0.0,当
inter.broker.protocol.version
设置为3.0
或更高版本时,log.message.format.version
选项会被忽略,且不需要设置。等待 Cluster Operator 更新集群。
您可以从
Kafka
资源的状态检查升级是否已成功完成。