Chapter 17. AMQ Streams and Kafka upgrades
AMQ Streams can be upgraded with no cluster downtime. Each version of AMQ Streams supports one or more versions of Apache Kafka: you can upgrade to a higher Kafka version as long as it is supported by your version of AMQ Streams. Newer versions of AMQ Streams may support newer versions of Kafka, but you need to upgrade AMQ Streams before you can upgrade to a higher supported Kafka version.
17.1. Upgrade prerequisites
Before you begin the upgrade process, make sure that:
- AMQ Streams is installed. For instructions, see Chapter 2, Getting started.
- You are familiar with any upgrade changes described in the AMQ Streams 2.0 on Red Hat Enterprise Linux Release Notes.
17.2. Upgrade process
Upgrading AMQ Streams is a two-stage process. To upgrade brokers and clients without downtime, you must complete the upgrade procedures in the following order:
1. Upgrade to the latest AMQ Streams version.
2. Upgrade all Kafka brokers and client applications to the latest Kafka version.
17.3. Kafka versions
Kafka’s log message format version and inter-broker protocol version specify, respectively, the log format version appended to messages and the version of the Kafka protocol used in a cluster. To ensure the correct versions are used, the upgrade process involves making configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers).
The following table shows the differences between Kafka versions:
| Kafka version | Interbroker protocol version | Log message format version | ZooKeeper version |
|---|---|---|---|
| 3.0.0 | 3.0 | 3.0 | 3.6.3 |
| 2.8.0 | 2.8 | 2.8 | 3.5.9 |
Inter-broker protocol version
In Kafka, the network protocol used for inter-broker communication is called the inter-broker protocol. Each version of Kafka has a compatible version of the inter-broker protocol. The minor version of the protocol typically increases to match the minor version of Kafka, as shown in the preceding table.
The inter-broker protocol version must be consistent across the cluster. To change it, you edit the inter.broker.protocol.version property in the properties file of each Kafka broker.
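For example, on a cluster running Kafka 2.8, the broker properties shown in the table above look like this:

```
inter.broker.protocol.version=2.8
log.message.format.version=2.8
```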
Log message format version
When a producer sends a message to a Kafka broker, the message is encoded using a specific format. The format can change between Kafka releases, so messages specify which version of the message format they were encoded with.
The properties used to set a specific message format version are as follows:
- message.format.version property for topics
- log.message.format.version property for Kafka brokers
From Kafka 3.0.0, the message format version values are assumed to match the inter.broker.protocol.version and don’t need to be set. The values reflect the Kafka version used.
When upgrading to Kafka 3.0.0 or higher, you can remove these settings when you update the inter.broker.protocol.version. Otherwise, set the message format version based on the Kafka version you are upgrading to.
The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration.
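As an illustrative sketch, a per-topic override can be set with the kafka-configs.sh tool; the topic name my-topic and the localhost:9092 listener below are assumptions, so adjust them for your cluster:

```
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config message.format.version=2.8
```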
17.4. Upgrading to AMQ Streams 2.0
The steps to upgrade your deployment to use AMQ Streams 2.0 are outlined in this section.
The availability of Kafka clusters managed by AMQ Streams is not affected by the upgrade operation.
Refer to the documentation supporting a specific version of AMQ Streams for information on how to upgrade to that version.
17.4.1. Upgrading Kafka brokers and ZooKeeper
This procedure describes how to upgrade Kafka brokers and ZooKeeper on a host machine to use the latest version of AMQ Streams.
Prerequisites
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster, one at a time:

1. Download the AMQ Streams archive from the Customer Portal.

   Note: If prompted, log in to your Red Hat account.

2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file:

   ```
   mkdir /tmp/kafka
   unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
   ```

3. If running, stop ZooKeeper and the Kafka broker running on the host, then confirm that the processes have stopped:

   ```
   /opt/kafka/bin/zookeeper-server-stop.sh
   /opt/kafka/bin/kafka-server-stop.sh
   jcmd | grep zookeeper
   jcmd | grep kafka
   ```

4. Delete the libs, bin, and docs directories from your existing installation:

   ```
   rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
   ```

5. Copy the libs, bin, and docs directories from the temporary directory:

   ```
   cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
   ```

6. Delete the temporary directory:

   ```
   rm -r /tmp/kafka
   ```

7. In a text editor, open the broker properties file, commonly stored in the /opt/kafka/config/ directory.

8. Check that the inter.broker.protocol.version and log.message.format.version properties are set to the current version:

   ```
   inter.broker.protocol.version=2.8
   log.message.format.version=2.8
   ```

   Leaving the inter.broker.protocol.version unchanged ensures that the brokers can continue to communicate with each other throughout the upgrade. If the properties are not configured, add them with the current version.

9. Restart the updated ZooKeeper and Kafka broker:

   ```
   /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
   /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
   ```

   The Kafka broker and ZooKeeper start using the binaries for the latest Kafka version.

10. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.

After you have updated all of the Kafka brokers, perform the procedures to upgrade Kafka, as described in Section 17.5, “Upgrading Kafka”.
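The in-sync check can be sketched with the kafka-topics.sh tool; assuming a broker listener at localhost:9092 (an assumption, adjust for your cluster), the command prints nothing when no partitions are under-replicated:

```
/opt/kafka/bin/kafka-topics.sh --describe --under-replicated-partitions \
  --bootstrap-server localhost:9092
```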
17.4.2. Upgrading Kafka Connect
This procedure describes how to upgrade a Kafka Connect cluster on a host machine.
Prerequisites
- You are logged in to Red Hat Enterprise Linux as the kafka user.
- Kafka Connect is not started.
Procedure
For each node in your Kafka Connect cluster, one at a time:

1. Download the AMQ Streams archive from the Customer Portal.

   Note: If prompted, log in to your Red Hat account.

2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file:

   ```
   mkdir /tmp/kafka
   unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
   ```

3. If running, stop the Kafka broker and ZooKeeper running on the host:

   ```
   /opt/kafka/bin/kafka-server-stop.sh
   /opt/kafka/bin/zookeeper-server-stop.sh
   ```

4. Delete the libs, bin, and docs directories from your existing installation:

   ```
   rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
   ```

5. Copy the libs, bin, and docs directories from the temporary directory:

   ```
   cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
   ```

6. Delete the temporary directory:

   ```
   rm -r /tmp/kafka
   ```

7. Start Kafka Connect in either standalone or distributed mode.

   To start in standalone mode, run the connect-standalone.sh script. Specify the Kafka Connect standalone configuration file and the configuration files of your Kafka Connect connectors:

   ```
   su - kafka
   /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
   ```

   To start in distributed mode, start the Kafka Connect workers with the /opt/kafka/config/connect-distributed.properties configuration file on all Kafka Connect nodes:

   ```
   su - kafka
   /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
   ```

8. Verify that Kafka Connect is running.

   In standalone mode:

   ```
   jcmd | grep ConnectStandalone
   ```

   In distributed mode:

   ```
   jcmd | grep ConnectDistributed
   ```

9. Verify that Kafka Connect is producing and consuming data as expected.
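In distributed mode, a further sketch of a health check is to query the Kafka Connect REST API, assuming the default REST port 8083 on localhost: the root endpoint reports the worker version, and /connectors lists the deployed connectors:

```
curl http://localhost:8083/
curl http://localhost:8083/connectors
```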
17.5. Upgrading Kafka
After you have upgraded your binaries to use the latest version of AMQ Streams, you can upgrade your brokers to use a higher supported version of Kafka.
Following the Kafka upgrade, if required, you can upgrade Kafka consumers to use the incremental cooperative rebalance protocol.
17.5.1. Upgrading Kafka brokers to use the new inter-broker protocol version
Manually configure and restart all Kafka brokers to use the new inter-broker protocol version. After performing these steps, data is transmitted between the Kafka brokers using the new inter-broker protocol version.
From Kafka 3.0.0, message format version values are assumed to match the inter.broker.protocol.version and don’t need to be set. The values reflect the Kafka version used.
Messages received are still appended to the message logs in the earlier message format version.
Downgrading AMQ Streams is not possible after completing this procedure.
Prerequisites
- You have updated the ZooKeeper binaries and upgraded all Kafka brokers to AMQ Streams 2.0.
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster, one at a time:

1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.

2. Set the inter.broker.protocol.version to 3.0:

   ```
   inter.broker.protocol.version=3.0
   ```

3. On the command line, stop the Kafka broker that you modified, then confirm that the process has stopped:

   ```
   /opt/kafka/bin/kafka-server-stop.sh
   jcmd | grep kafka
   ```

4. Restart the Kafka broker that you modified:

   ```
   /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
   ```

5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
17.5.2. Upgrading consumers and Kafka Streams applications to cooperative rebalancing
You can upgrade Kafka consumers and Kafka Streams applications to use the incremental cooperative rebalance protocol for partition rebalances instead of the default eager rebalance protocol. The new protocol was added in Kafka 2.4.0.
Consumers keep their partition assignments in a cooperative rebalance and only revoke them at the end of the process, if needed to achieve a balanced cluster. This reduces the unavailability of the consumer group or Kafka Streams application.
Upgrading to the incremental cooperative rebalance protocol is optional. The eager rebalance protocol is still supported.
Procedure
To upgrade a Kafka consumer to use the incremental cooperative rebalance protocol:
1. Replace the Kafka clients .jar file with the new version.
2. In the consumer configuration, append cooperative-sticky to the partition.assignment.strategy. For example, if the range strategy is set, change the configuration to range, cooperative-sticky.
3. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
4. Reconfigure each consumer in the group by removing the earlier partition.assignment.strategy from the consumer configuration, leaving only the cooperative-sticky strategy.
5. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
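In a Java consumer properties file, the strategies are specified by assignor class names. A minimal sketch of the two configurations, first with both assignors and then with cooperative-sticky only:

```
# First rolling restart: existing strategy plus cooperative-sticky
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor,org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# Second rolling restart: cooperative-sticky only
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```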
To upgrade a Kafka Streams application to use the incremental cooperative rebalance protocol:

1. Replace the Kafka Streams .jar file with the new version.
2. In the Kafka Streams configuration, set the upgrade.from configuration parameter to the Kafka version you are upgrading from (for example, 2.3).
3. Restart each of the stream processors (nodes) in turn.
4. Remove the upgrade.from configuration parameter from the Kafka Streams configuration.
5. Restart each of the stream processors (nodes) in turn.
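A minimal sketch of the corresponding Kafka Streams configuration during the first round of restarts, assuming an application upgrading from Kafka 2.3 (the example version used above); remove the property again before the second round:

```
# Kafka Streams configuration during the first rolling restart
upgrade.from=2.3
```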