第 7 章 升级 AMQ Streams
AMQ Streams 可以升级,无需集群停机。AMQ Streams 的每个版本都支持一个或多个 Apache Kafka 版本。只要您的 AMQ Streams 版本支持,就可以升级到更高的 Kafka 版本。
AMQ Streams 的新版本可能支持较新版本的 Kafka,但需要先升级 AMQ Streams,然后才能 升级到更高版本的 Kafka。
要升级 AMQ Streams Operator,您可以使用 OpenShift Container Platform 集群上的 Operator Lifecycle Manager(OLM)。
如果适用,必须在 升级 AMQ Streams 和 Kafka 后进行资源 升级。
7.1. AMQ Streams 和 Kafka 升级
升级 AMQ Streams 是一个两阶段的过程。要在没有停机的情况下升级代理和客户端,您必须 按照以下顺序完成升级过程:
将 Cluster Operator 更新至最新的 AMQ Streams 版本。您采取的方法取决于如何 部署 Cluster Operator。
- 如果使用安装 YAML 文件部署 Cluster Operator,请通过 修改 Operator 安装文件 来执行升级。
如果您从 OperatorHub 部署 Cluster Operator,请使用 Operator Lifecycle Manager(OLM)将 AMQ Streams Operator 的更新频道改为新的 AMQ Streams 版本。
根据您选择的升级策略,按照频道更新进行操作:
- 启动 自动 升级
- 然后,在开始安装前需要批准 手动 升级
有关使用 OperatorHub 升级 Operator 的更多信息,请参阅 升级已安装的 Operator。
- 将所有 Kafka 代理和客户端应用程序升级到最新的 Kafka 版本。
7.1.1. Kafka 版本
Kafka 的日志消息格式版本和broker 协议版本指定附加到消息的日志格式版本以及集群中使用的协议版本。因此,升级过程包括对现有的 Kafka 代理进行配置更改,并对客户端应用程序(使用者和生产者)进行代码更改以确保使用正确的版本。
下表显示了 Kafka 版本之间的区别:
Kafka 版本 | Interbroker 协议版本 | 日志消息格式版本 | zookeeper 版本 |
---|---|---|---|
2.5.0 | 2.5 | 2.5 | 3.5.8 |
2.6.0 | 2.6 | 2.6 | 3.5.8 |
消息格式版本
当生产者向 Kafka 代理发送消息时,该消息将使用特定格式进行编码。Kafka 版本的格式可能会改变,因此消息包括一个版本,用于标识它们编码的格式版本。您可以配置 Kafka 代理,在代理将消息附加到日志前将信息从较新的格式转换为给定的旧格式版本。
在 Kafka 中,设置消息格式版本的方法有两种:
-
message.format.version
属性在主题上设置。 -
log.message.format.version
属性在 Kafka 代理上设置。
主题的 message.format.version
默认值由 Kafka 代理上设置的 log.message.format.version
定义。您可以通过修改主题配置来手动设置主题的 message.format.version
。
本节中的升级任务假定消息格式版本由 log.message.format.version
定义。
7.1.2. 升级 Cluster Operator
本节概述了将 Cluster Operator 部署升级到使用 AMQ Streams 1.6 的步骤。
由 Cluster Operator 管理的 Kafka 集群的可用性不受升级操作的影响。
有关如何升级到该版本的信息,请参阅支持 AMQ Streams 特定版本的文档。
7.1.2.1. 将 Cluster Operator 升级到更新的版本
此流程描述了如何将 Cluster Operator 部署升级到更新的版本。
先决条件
- 现有 Cluster Operator 部署可用。
- 已 下载新版本的安装文件。
步骤
-
记录对现有 Cluster Operator 资源所做的任何配置更改(在
/install/cluster-operator
目录中)。任何更改都将被 Cluster Operator 的新版本 覆盖。 更新 Cluster Operator。
根据 Cluster Operator 运行的命名空间修改新版本的安装文件。
在 Linux 中,使用:
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
Copy to Clipboard Copied! 在 MacOS 中,使用:
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
Copy to Clipboard Copied! -
如果在现有 Cluster Operator
Deployment
中修改了一个或多个环境变量,请编辑install/cluster-operator/060-Deployment-cluster-operator.yaml
文件来使用这些环境变量。
当您有更新的配置时,请将其与其它安装资源一起部署:
oc apply -f install/cluster-operator
oc apply -f install/cluster-operator
Copy to Clipboard Copied! 等待滚动更新完成。
获取 Kafka pod 的镜像以确保升级成功:
oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
Copy to Clipboard Copied! 镜像标签显示新的 AMQ Streams 版本,后跟 Kafka 版本。例如,
<New AMQ Streams version>-kafka-<Current Kafka version>
。更新现有资源以处理已弃用的自定义资源属性。
现在,您有一个更新的 Cluster Operator,但在它管理的集群中运行的 Kafka 版本没有改变。
接下来要做什么
在 Cluster Operator 升级后,您可以执行 Kafka 升级。
7.1.3. 升级 Kafka
将 Cluster Operator 升级到 1.6 后,下一步是将所有 Kafka 代理升级到最新支持的 Kafka 版本。
Kafka 升级由 Cluster Operator 通过对 Kafka 代理的滚动更新来执行。
Cluster Operator 根据 Kafka 集群配置启动滚动更新。
如果 Kafka.spec.kafka.config 包含… | Cluster Operator 启动s… |
---|---|
|
单个滚动更新.更新后, |
| 两次滚动更新. |
没有 | 两次滚动更新. |
作为 Kafka 升级的一部分,Cluster Operator 会启动 ZooKeeper 的滚动更新。
- 即使 ZooKeeper 版本未更改,也会出现一次滚动更新。
- 如果 Kafka 的新版本需要新的 ZooKeeper 版本,则会出现额外的滚动更新。
7.1.3.1. Kafka 版本和镜像映射
在升级 Kafka 时,请考虑对 STRIMZI_KAFKA_IMAGES
和 Kafka.spec.kafka.version
属性的设置。
-
每个
Kafka
资源都可以使用Kafka.spec.kafka.version
配置。 Cluster Operator 的
STRIMZI_KAFKA_IMAGES
环境变量提供 Kafka 版本和给定Kafka
资源中请求该版本时使用的镜像之间的映射。-
如果没有配置
Kafka.spec.kafka.image
,则会使用给定版本的默认镜像。 -
如果配置了
Kafka.spec.kafka.image
,则默认镜像会被覆盖。
-
如果没有配置
Cluster Operator 无法验证镜像是否实际包含预期版本的 Kafka 代理。请特别注意确保给定镜像与给定的 Kafka 版本对应。
7.1.3.2. 升级客户端的策略
升级客户端应用程序(包括 Kafka Connect 连接器)的最佳方法取决于您的具体情形。
使用应用程序时,需要以它们所理解的消息格式接收消息。您可以通过以下两种方式之一来确保这是这种情况:
- 在升级任何制作者 之前,先 升级所有消费者的主题。
- 通过将代理关闭消息置于较旧格式。
使用代理 down-conversion 会给代理带来额外的负载,因此在较长时间内依赖所有主题并不理想。要使代理以最佳方式运行,绝对不应是向下转换消息。
代理 down-conversion 可通过两种方式进行配置:
-
主题级别
message.format.version
为单个主题配置它。 -
代理级
log.message.format.version
是未配置主题级别message.format.version
的主题的默认设置。
以新版本格式发布到主题的消息对消费者可见,因为代理在从生产者接收消息时执行 down-conversion,而不是当它们发送到消费者时。
您可以使用多个策略来升级客户端:
- 消费者第一
- 升级所有使用的应用程序。
-
将代理级别的
log.message.format.version
更改为新版本。 升级所有生成的应用程序。
此策略非常简单,可避免任何代理停机转换。但是,它假定贵组织中的所有消费者可以协调升级,而且不适用于消费者和生产者的应用程序。如果升级的客户端出现问题,新格式消息可能会添加到消息日志中,因此您无法恢复到以前的使用者版本。
- 每个主题消费者首先
对于每个主题:
- 升级所有使用的应用程序。
-
将主题级
message.format.version
更改为新版本。 升级所有生成的应用程序。
此策略可避免任何代理 down-conversion,并且意味着您可以逐个主题地进行。它不适用于同时作为同一主题的消费者和制作者的应用。再强调一下,如果升级的客户端出现问题,新格式消息可能会添加到消息日志中。
- 首先每个主题消费者,进行向下转换
对于每个主题:
-
将主题级
message.format.version
更改为旧版本(或依赖主题默认到代理级别的log.message.format.version
)。 - 升级所有使用和生产的应用程序。
- 验证升级的应用是否正常工作。
将主题级
message.format.version
更改为新版本。这个策略需要代理 down-conversion,但代理的负载会最小化,因为一次只需要单个主题(或小组主题)需要它。它还适用于同时是同一主题的消费者和制作者的应用。这种方法可确保升级的生产者和消费者在提交使用新的消息格式版本之前正常工作。
这种方法的主要缺点是,在含有许多主题和应用的集群中进行管理可能比较复杂。
-
将主题级
还可以通过其他策略来升级客户端应用。
也可以应用多个策略。例如,在前几个应用程序和主题中,"每个主题消费者首先使用",可以使用向下转换"策略"。如果这已被证明是成功的,那么更高效的策略可被视为可以接受采用。
7.1.3.3. 升级 Kafka 代理和客户端应用程序
这个步骤描述了如何将 AMQ Streams Kafka 集群升级到最新支持的 Kafka 版本。
与您当前的 Kafka 版本相比,新版本可能支持更高的 日志消息格式版本 或代理协议版本,或两者。如果需要,请按照步骤升级这些版本。如需更多信息,请参阅 第 7.1.1 节 “Kafka 版本”。
您还应该选择 升级客户端的策略。Kafka 客户端在此流程的第 6 步中升级。
先决条件
要升级 Kafka
资源,请检查:
- 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
-
Kafka.spec.kafka.config
不包含 新 Kafka 版本中不支持的选项。
步骤
更新 Kafka 集群配置:
oc edit kafka my-cluster
oc edit kafka my-cluster
Copy to Clipboard Copied! 如果配置了,请确保
Kafka.spec.kafka.config
的 log.message.format
设置为 当前 Kafka 版本的默认值。.version
和 inter.broker.protocol.version例如,如果从 Kafka 版本 2.5.0 升级到 2.6.0:
kind: Kafka spec: # ... kafka: version: 2.5.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.5" # ...
kind: Kafka spec: # ... kafka: version: 2.5.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.5" # ...
Copy to Clipboard Copied! 如果没有配置
log.message.format
,AMQ Streams 会在下一步中将这些版本更新至 Kafka 版本后自动将这些版本更新至当前的默认值。.version
和 inter.broker.protocol.version注意log.message.format.version
和inter.broker.protocol.version
的值必须是字符串,以防止它们被解释为浮动点数。更改
Kafka.spec.kafka.version
以指定新的 Kafka 版本;保留log.message.format.version
和inter.broker.protocol.version
,默认为 当前 Kafka 版本。注意更改
kafka.version
可确保集群中的所有代理都将升级为使用新的代理二进制文件。在此过程中,一些代理使用旧二进制文件,另一些则已升级至新二进制文件。不要更改inter.broker.protocol.version
,确保代理可以在升级过程中继续相互通信。例如,如果从 Kafka 2.5.0 升级到 2.6.0:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.5" # ...
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0
1 config: log.message.format.version: "2.5"
2 inter.broker.protocol.version: "2.5"
3 # ...
Copy to Clipboard Copied! 警告如果新 Kafka 版本的
inter.broker.protocol.version
发生变化,则无法降级 Kafka。broker 协议版本决定了代理存储的持久元数据的模式,包括写入到__consumer_offset
的消息。降级集群将不会理解消息。如果在 Kafka 自定义资源(在 Kafka
.spec.kafka.image 中)中定义了 Kafka
集群的镜像,请将镜像
更新为指向具有新 Kafka 版本的容器镜像。请参阅 Kafka 版本和镜像映射
保存并退出编辑器,然后等待滚动更新完成。
通过观察 pod 状态转换来检查滚动更新的进度:
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
Copy to Clipboard Copied! 滚动更新可确保每个 pod 在新版本的 Kafka 中使用代理二进制文件。
根据您选择的 升级客户端策略,升级所有客户端应用以使用新版本的客户端二进制文件。
如果需要,将 Kafka Connect 和 MirrorMaker 的
version
属性设置为 Kafka 的新版本:-
对于 Kafka Connect,更新
KafkaConnect.spec.version
。 -
对于 MirrorMaker,更新
KafkaMirrorMaker.spec.version
。 -
对于 MirrorMaker 2.0,更新
KafkaMirrorMaker2.spec.version
。
-
对于 Kafka Connect,更新
如果配置,更新 Kafka 资源,以使用新的
inter.broker.protocol.version
版本。否则,请转至第 9 步。例如,如果升级到 Kafka 2.6.0:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.6" # ...
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.6" # ...
Copy to Clipboard Copied! - 等待 Cluster Operator 更新集群。
如果配置,更新 Kafka 资源,以使用新的
log.message.format.version
版本。否则,请转至第 10 步。例如,如果升级到 Kafka 2.6.0:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.6" inter.broker.protocol.version: "2.6" # ...
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.6" inter.broker.protocol.version: "2.6" # ...
Copy to Clipboard Copied! 等待 Cluster Operator 更新集群。
- Kafka 集群和客户端现在使用新的 Kafka 版本。
- 代理配置为使用 Kafka 的新版本的 Inter-broker 协议版本和消息格式发送消息。
在 Kafka 升级后,如果需要,您可以:
7.1.3.4. 更新监听程序配置
AMQ Streams 提供 GenericKafkaListener
模式,用于在 Kafka 资源
中配置 Kafka 侦听器。
GenericKafkaListener
取代 KafkaListeners
模式,后者已被弃用。
使用 GenericKafkaListener
模式时,您可以根据需要配置多个监听器,只要它们的名称和端口是唯一的。监听器
配置被定义为数组,但已弃用的格式也受到支持。
对于 OpenShift 集群内的客户端,您可以创建 普通
(无需加密)或 tls
内部 侦听器。
对于 OpenShift 集群外的客户端,您可以创建 外部 侦听器并指定连接机制,可以是 节点端口
、负载均衡器、
入口
或 路由
。
KafkaListeners
模式将子属性用于 纯文本
、tls
和外部
监听程序,以及每个节点的固定端口。升级 Kafka 后,您可以将使用 KafkaListeners
模式配置的监听程序转换为 GenericKafkaListener
模式的格式。
例如,如果您当前在 Kafka
配置中使用以下配置:
旧监听程序配置
listeners: plain: # ... tls: # ... external: type: loadbalancer # ...
listeners:
plain:
# ...
tls:
# ...
external:
type: loadbalancer
# ...
使用以下方法将监听程序转换为新格式:
新的监听程序配置
listeners: #... - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true - name: external port: 9094 type: EXTERNAL-LISTENER-TYPE tls: true
listeners:
#...
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: EXTERNAL-LISTENER-TYPE
tls: true
确保使用显示 的确切 名称和端口号。
对于任何其他 配置
或 覆盖
使用旧格式的属性,您需要将它们更新为新格式。
对监听程序 配置
进行了更改:
-
覆盖
与configuration
部分合并 -
dnsAnnotations
已重命名为注解
-
preferredAddressType
被重命名为preferredNodePortAddressType
-
地址
已被重命名为alternativesNames
-
LoadBalancerSourceRanges 和 external
TrafficPolicy
移至现在已弃用的模板
中的监听程序配置
例如,这个配置:
旧的其他监听程序配置
listeners: external: type: loadbalancer authentication: type: tls overrides: bootstrap: dnsAnnotations: #...
listeners:
external:
type: loadbalancer
authentication:
type: tls
overrides:
bootstrap:
dnsAnnotations:
#...
更改:
新的附加监听程序配置
listeners: #... - name: external port: 9094 type:loadbalancer tls: true authentication: type: tls configuration: bootstrap: annotations: #...
listeners:
#...
- name: external
port: 9094
type:loadbalancer
tls: true
authentication:
type: tls
configuration:
bootstrap:
annotations:
#...
新监听器配置中显示的名称和端口号 必须 用于向后兼容。使用任何其他值将导致 Kafka 侦听器和 OpenShift 服务重命名。
有关各种侦听器可用的配置选项的更多信息,请参阅 GenericKafkaListener
模式参考。
7.1.3.5. 升级消费者和 Kafka Streams 应用程序以合作重新平衡
您可以升级 Kafka 用户和 Kafka Streams 应用程序,以使用 增量合作重新平衡 协议进行分区重新平衡,而不是默认的预先 重新平衡 协议。在 Kafka 2.4.0 中添加了新协议。
消费者将分区分配保持在合作重新平衡状态,并仅在流程结束时予以撤销(如果需要)来实现平衡集群。这可减少使用者组或 Kafka Streams 应用程序的不可用。
升级到增量合作重新平衡协议是可选的。预先重新平衡协议仍然受到支持。
先决条件
- 您已将 Kafka 代理和客户端应用程序升级到 Kafka 2.6.0。
步骤
要升级 Kafka 消费者以使用增量合作重新平衡协议:
-
将 Kafka client
.jar
文件替换为新版本。 -
在使用者配置中,将
合作粘滞
附加到partition.assignment.strategy
。例如,如果设定了范围
策略,请将配置更改为范围,协作粘性
。 - 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。
-
通过从使用者配置中删除先前的
partition.assignment.strategy 来
重新配置组中的每个消费者,仅保留协作式
策略。 - 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。
升级 Kafka Streams 应用程序以使用增量合作重新平衡协议:
-
将 Kafka Streams
.jar
文件替换为新版本。 -
在 Kafka Streams 配置中,将
upgrade.from
配置参数设置为您要从中升级的 Kafka 版本(例如 2.3)。 - 依次重新启动每个流处理器(节点)。
-
从 Kafka Streams 配置中删除
upgrade.from
配置参数。 - 依次重新启动组中的每个消费者。
其他资源
- Apache Kafka 文档中的 2.4.0 中的显著变化。