Chapter 17. Upgrading AMQ Streams and Kafka
Upgrade your Kafka cluster with no downtime. Releases of AMQ Streams for Red Hat Enterprise Linux support one version of Apache Kafka. You upgrade to the latest supported version of Kafka when you install the latest version of AMQ Streams.
17.1. Upgrade prerequisites Copy linkLink copied to clipboard!
Before you begin the upgrade process, make sure you are familiar with any upgrade changes described in the AMQ Streams 2.4 on Red Hat Enterprise Linux Release Notes.
Refer to the documentation supporting a specific version of AMQ Streams for information on how to upgrade to that version.
17.2. Kafka versions Copy linkLink copied to clipboard!
Kafka’s log message format version and inter-broker protocol version specify, respectively, the log format version appended to messages and the version of the Kafka protocol used in a cluster. To ensure the correct versions are used, the upgrade process involves making configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers).
The following table shows the differences between Kafka versions:
| AMQ Streams version | Kafka version | Inter-broker protocol version | Log message format version | ZooKeeper version |
|---|---|---|---|---|
| 2.4 | 3.4.0 | 3.4 | 3.4 | 3.6.3 |
| 2.3 | 3.3.1 | 3.3 | 3.3 | 3.6.3 |
AMQ Streams 2.4 uses Kafka 3.4.0, but Kafka 3.3.1 is also supported for the purpose of upgrading.
Inter-broker protocol version
In Kafka, the network protocol used for inter-broker communication is called the inter-broker protocol. Each version of Kafka has a compatible version of the inter-broker protocol. The minor version of the protocol typically increases to match the minor version of Kafka, as shown in the preceding table.
The inter-broker protocol version is set cluster wide in the Kafka resource. To change it, you edit the inter.broker.protocol.version property in Kafka.spec.kafka.config.
Log message format version
When a producer sends a message to a Kafka broker, the message is encoded using a specific format. The format can change between Kafka releases, so messages specify which version of the message format they were encoded with.
The properties used to set a specific message format version are as follows:
-
message.format.versionproperty for topics -
log.message.format.versionproperty for Kafka brokers
From Kafka 3.0.0, the message format version values are assumed to match the inter.broker.protocol.version and don’t need to be set. The values reflect the Kafka version used.
When upgrading to Kafka 3.0.0 or higher, you can remove these settings when you update the inter.broker.protocol.version. Otherwise, set the message format version based on the Kafka version you are upgrading to.
The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration.
17.3. Strategies for upgrading clients Copy linkLink copied to clipboard!
Upgrading Kafka clients ensures that they benefit from the features, fixes, and improvements that are introduced in new versions of Kafka. Upgraded clients maintain compatibility with other upgraded Kafka components. The performance and stability of the clients might also be improved.
Consider the best approach for upgrading Kafka clients and brokers to ensure a smooth transition. The chosen upgrade strategy depends on whether you are upgrading brokers or clients first. Since Kafka 3.0, you can upgrade brokers and client independently and in any order. The decision to upgrade clients or brokers first depends on several factors, such as the number of applications that need to be upgraded and how much downtime is tolerable.
If you upgrade clients before brokers, some new features may not work as they are not yet supported by brokers. However, brokers can handle producers and consumers running with different versions and supporting different log message versions.
Upgrading clients when using Kafka versions older than Kafka 3.0
Before Kafka 3.0, you would configure a specific message format for brokers using the log.message.format.version property (or the message.format.version property at the topic level). This allowed brokers to support older Kafka clients that were using an outdated message format. Otherwise, the brokers would need to convert the messages from the older clients, which came with a significant performance cost.
Apache Kafka Java clients have supported the latest message format version since version 0.11. If all of your clients are using the latest message version, you can remove the log.message.format.version or message.format.version overrides when upgrading your brokers.
However, if you still have clients that are using an older message format version, we recommend upgrading your clients first. Start with the consumers, then upgrade the producers before removing the log.message.format.version or message.format.version overrides when upgrading your brokers. This will ensure that all of your clients can support the latest message format version and that the upgrade process goes smoothly.
You can track Kafka client names and versions using this metric:
-
kafka.server:type=socket-server-metrics,clientSoftwareName=<name>,clientSoftwareVersion=<version>,listener=<listener>,networkProcessor=<processor>
The following Kafka broker metrics help monitor the performance of message down-conversion:
-
kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch}provides metrics on the time taken to perform message conversion. -
kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+)provides metrics on the number of messages converted over a period of time.
17.4. Upgrading Kafka brokers and ZooKeeper Copy linkLink copied to clipboard!
Upgrade Kafka brokers and ZooKeeper on a host machine to use the latest version of AMQ Streams. You update the installation files, then configure and restart all Kafka brokers to use a new inter-broker protocol version. After performing these steps, data is transmitted between the Kafka brokers using the new inter-broker protocol version.
From Kafka 3.0.0, message format version values are assumed to match the inter.broker.protocol.version and don’t need to be set. The values reflect the Kafka version used.
Prerequisites
-
You are logged in to Red Hat Enterprise Linux as the
kafkauser. You have installed Kafka and other Kafka components you are using on separate hosts.
For more information, see Section 3.1, “Installation environment”.
- You have downloaded the installation files.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
Download the AMQ Streams archive from the AMQ Streams software downloads page.
NoteIf prompted, log in to your Red Hat account.
On the command line, create a temporary directory and extract the contents of the
amq-streams-<version>-bin.zipfile.mkdir /tmp/kafka unzip amq-streams-<version>-bin.zip -d /tmp/kafka
mkdir /tmp/kafka unzip amq-streams-<version>-bin.zip -d /tmp/kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow If running, stop ZooKeeper and the Kafka broker running on the host.
/opt/kafka/bin/zookeeper-server-stop.sh /opt/kafka/bin/kafka-server-stop.sh jcmd | grep zookeeper jcmd | grep kafka
/opt/kafka/bin/zookeeper-server-stop.sh /opt/kafka/bin/kafka-server-stop.sh jcmd | grep zookeeper jcmd | grep kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow If you are running Kafka on a multi-node cluster, see Section 4.3, “Performing a graceful rolling restart of Kafka brokers”.
Delete the
libsandbindirectories from your existing installation:rm -rf /opt/kafka/libs /opt/kafka/bin
rm -rf /opt/kafka/libs /opt/kafka/binCopy to Clipboard Copied! Toggle word wrap Toggle overflow Copy the
libsandbindirectories from the temporary directory:cp -r /tmp/kafka/kafka_<version>/libs /opt/kafka/ cp -r /tmp/kafka/kafka_<version>/bin /opt/kafka/
cp -r /tmp/kafka/kafka_<version>/libs /opt/kafka/ cp -r /tmp/kafka/kafka_<version>/bin /opt/kafka/Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
If required, update the configuration files in the
configdirectory to reflect any changes in the new versions. Delete the temporary directory.
rm -r /tmp/kafka
rm -r /tmp/kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow Edit the
/opt/kafka/config/server.propertiesproperties file.Set the
inter.broker.protocol.versionandlog.message.format.versionproperties to the current version.For example, the current version is 3.3 if upgrading from Kafka version 3.3.1 to 3.4.0:
inter.broker.protocol.version=3.3 log.message.format.version=3.3
inter.broker.protocol.version=3.3 log.message.format.version=3.3Copy to Clipboard Copied! Toggle word wrap Toggle overflow Use the correct version for the Kafka version you are upgrading from (
3.2,3.3, and so on). Leaving theinter.broker.protocol.versionunchanged at the current setting ensures that the brokers can continue to communicate with each other throughout the upgrade.If the properties are not configured, add them with the current version.
If you are upgrading from Kafka 3.0.0 or later, you only need to set the
inter.broker.protocol.version.Restart the updated ZooKeeper and Kafka broker:
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow The Kafka broker and ZooKeeper start using the binaries for the latest Kafka version.
For information on restarting brokers in a multi-node cluster, see Section 4.3, “Performing a graceful rolling restart of Kafka brokers”.
Verify that the restarted Kafka broker has caught up with the partition replicas it is following.
Use the
kafka-topics.shtool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.In the next steps, update your Kafka brokers to use the new inter-broker protocol version.
Update each broker, one at a time.
WarningDowngrading AMQ Streams is not possible after completing the following steps.
- Depending on your chosen strategy for upgrading clients, upgrade all client applications to use the new version of the client binaries.
Set the
inter.broker.protocol.versionproperty to3.4in the/opt/kafka/config/server.propertiesfile:inter.broker.protocol.version=3.4
inter.broker.protocol.version=3.4Copy to Clipboard Copied! Toggle word wrap Toggle overflow On the command line, stop the Kafka broker that you modified:
/opt/kafka/bin/kafka-server-stop.sh
/opt/kafka/bin/kafka-server-stop.shCopy to Clipboard Copied! Toggle word wrap Toggle overflow Check that Kafka is not running:
jcmd | grep kafka
jcmd | grep kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow Restart the Kafka broker that you modified:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow Check that Kafka is running:
jcmd | grep kafka
jcmd | grep kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow If you are upgrading from a version earlier than Kafka 3.0.0, set the
log.message.format.versionproperty to3.4in the/opt/kafka/config/server.propertiesfile:log.message.format.version=3.4
log.message.format.version=3.4Copy to Clipboard Copied! Toggle word wrap Toggle overflow On the command line, stop the Kafka broker that you modified:
/opt/kafka/bin/kafka-server-stop.sh
/opt/kafka/bin/kafka-server-stop.shCopy to Clipboard Copied! Toggle word wrap Toggle overflow Check that Kafka is not running:
jcmd | grep kafka
jcmd | grep kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow Restart the Kafka broker that you modified:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow Check that Kafka is running:
jcmd | grep kafka
jcmd | grep kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow Verify that the restarted Kafka broker has caught up with the partition replicas it is following.
Use the
kafka-topics.shtool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.-
If it was used in the upgrade, remove the legacy
log.message.format.versionconfiguration from theserver.propertiesfile.
17.5. Upgrading Kafka components Copy linkLink copied to clipboard!
Upgrade Kafka components on a host machine to use the latest version of AMQ Streams. You can use the AMQ Streams installation files to upgrade the following components:
- Kafka Connect
- MirrorMaker
- Kafka Bridge (separate ZIP file)
Prerequisites
-
You are logged in to Red Hat Enterprise Linux as the
kafkauser. - You have downloaded the installation files.
You have installed Kafka and other Kafka components you are using on separate hosts.
For more information, see Section 3.1, “Installation environment”.
You have upgraded Kafka and ZooKeeper.
If a Kafka component is running on the same host as Kafka and ZooKeeper, you’ll also need to stop and start Kafka and ZooKeeper when upgrading.
Procedure
For each host running an instance of the Kafka component:
Download the AMQ Streams or Kafka Bridge installation files from the AMQ Streams software downloads page.
NoteIf prompted, log in to your Red Hat account.
On the command line, create a temporary directory and extract the contents of the
amq-streams-<version>-bin.zipfile.mkdir /tmp/kafka unzip amq-streams-<version>-bin.zip -d /tmp/kafka
mkdir /tmp/kafka unzip amq-streams-<version>-bin.zip -d /tmp/kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow For Kafka Bridge, extract the
amq-streams-<version>-bridge-bin.zipfile.- If running, stop the Kafka component running on the host.
Delete the
libsandbindirectories from your existing installation:rm -rf /opt/kafka/libs /opt/kafka/bin
rm -rf /opt/kafka/libs /opt/kafka/binCopy to Clipboard Copied! Toggle word wrap Toggle overflow Copy the
libsandbindirectories from the temporary directory:cp -r /tmp/kafka/kafka_<version>/libs /opt/kafka/ cp -r /tmp/kafka/kafka_<version>/bin /opt/kafka/
cp -r /tmp/kafka/kafka_<version>/libs /opt/kafka/ cp -r /tmp/kafka/kafka_<version>/bin /opt/kafka/Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
If required, update the configuration files in the
configdirectory to reflect any changes in the new versions. Delete the temporary directory.
rm -r /tmp/kafka
rm -r /tmp/kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow Start the Kafka component using the appropriate script and properties files.
Starting Kafka Connect in standalone mode
/opt/kafka/bin/connect-standalone.sh \ /opt/kafka/config/connect-standalone.properties <connector1>.properties [<connector2>.properties ...]
/opt/kafka/bin/connect-standalone.sh \ /opt/kafka/config/connect-standalone.properties <connector1>.properties [<connector2>.properties ...]Copy to Clipboard Copied! Toggle word wrap Toggle overflow Starting Kafka Connect in distributed mode
/opt/kafka/bin/connect-distributed.sh \ /opt/kafka/config/connect-distributed.properties
/opt/kafka/bin/connect-distributed.sh \ /opt/kafka/config/connect-distributed.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow Starting MirrorMaker 2 in dedicated mode
/opt/kafka/bin/connect-mirror-maker.sh \ /opt/kafka/config/connect-mirror-maker.properties
/opt/kafka/bin/connect-mirror-maker.sh \ /opt/kafka/config/connect-mirror-maker.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow Starting Kafka Bridge
su - kafka ./bin/kafka_bridge_run.sh \ --config-file=<path>/application.properties
su - kafka ./bin/kafka_bridge_run.sh \ --config-file=<path>/application.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow Verify that the Kafka component is running, and producing or consuming data as expected.
Verifying Kafka Connect in standalone mode is running
jcmd | grep ConnectStandalone
jcmd | grep ConnectStandaloneCopy to Clipboard Copied! Toggle word wrap Toggle overflow Verifying Kafka Connect in distributed mode is running
jcmd | grep ConnectDistributed
jcmd | grep ConnectDistributedCopy to Clipboard Copied! Toggle word wrap Toggle overflow Verifying MirrorMaker 2 in dedicated mode is running
jcmd | grep mirrorMaker
jcmd | grep mirrorMakerCopy to Clipboard Copied! Toggle word wrap Toggle overflow Verifying Kafka Bridge is running by checking the log
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092Copy to Clipboard Copied! Toggle word wrap Toggle overflow
17.6. Upgrading consumers and Kafka Streams applications to cooperative rebalancing Copy linkLink copied to clipboard!
Following a Kafka upgrade, if required, you can upgrade Kafka consumers and Kafka Streams applications to use the incremental cooperative rebalance protocol for partition rebalances instead of the default eager rebalance protocol. The new protocol was added in Kafka 2.4.0.
Consumers keep their partition assignments in a cooperative rebalance and only revoke them at the end of the process, if needed to achieve a balanced cluster. This reduces the unavailability of the consumer group or Kafka Streams application.
Upgrading to the incremental cooperative rebalance protocol is optional. The eager rebalance protocol is still supported.
Prerequisites
- You have upgraded your Kafka brokers
Procedure
To upgrade a Kafka consumer to use the incremental cooperative rebalance protocol:
-
Replace the Kafka clients
.jarfile with the new version. -
In the consumer configuration, append
cooperative-stickyto thepartition.assignment.strategy. For example, if therangestrategy is set, change the configuration torange, cooperative-sticky. - Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
-
Reconfigure each consumer in the group by removing the earlier
partition.assignment.strategyfrom the consumer configuration, leaving only thecooperative-stickystrategy. - Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
To upgrade a Kafka Streams application to use the incremental cooperative rebalance protocol:
-
Replace the Kafka Streams
.jarfile with the new version. -
In the Kafka Streams configuration, set the
upgrade.fromconfiguration parameter to the Kafka version you are upgrading from (for example, 2.3). - Restart each of the stream processors (nodes) in turn.
-
Remove the
upgrade.fromconfiguration parameter from the Kafka Streams configuration. - Restart each consumer in the group in turn.