Chapter 18. AMQ Streams and Kafka upgrades
AMQ Streams can be upgraded with no cluster downtime. Each version of AMQ Streams supports one or more versions of Apache Kafka: you can upgrade to a higher Kafka version as long as it is supported by your version of AMQ Streams. Newer versions of AMQ Streams may support newer versions of Kafka, but you need to upgrade AMQ Streams before you can upgrade to a higher supported Kafka version.
18.1. Upgrade prerequisites
Before you begin the upgrade process, make sure that:
- AMQ Streams is installed. For instructions, see Chapter 2, Getting started.
- You are familiar with any upgrade changes described in the AMQ Streams 1.6 on Red Hat Enterprise Linux Release Notes.
18.2. Upgrade process
Upgrading AMQ Streams is a two-stage process. To upgrade brokers and clients without downtime, you must complete the upgrade procedures in the following order:
1. Upgrade to the latest AMQ Streams version.
2. Upgrade all Kafka brokers and client applications to the latest Kafka version.
18.3. Kafka versions
Kafka’s log message format version and inter-broker protocol version specify, respectively, the log format version appended to messages and the version of the protocol used in a cluster. As a result, the upgrade process involves making configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers) to ensure the correct versions are used.
The following table shows the differences between Kafka versions:
Kafka version | Inter-broker protocol version | Log message format version | ZooKeeper version |
---|---|---|---|
2.5.0 | 2.5 | 2.5 | 3.5.8 |
2.6.0 | 2.6 | 2.6 | 3.5.8 |
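For the releases in the table, the inter-broker protocol and log message format values are the Kafka version without its patch component. The following is a minimal shell sketch of that mapping. It assumes the pattern holds for the version you are working with, and the function name is hypothetical.

```shell
# Hypothetical helper: derive the major.minor value from a full Kafka version.
# Assumes the version has the form major.minor.patch, as in the table above.
kafka_major_minor() {
  printf '%s\n' "${1%.*}"
}

kafka_major_minor 2.6.0   # prints 2.6
```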
Message format version
When a producer sends a message to a Kafka broker, the message is encoded using a specific format. The format can change between Kafka releases, so messages include a version identifying which version of the format they were encoded with. You can configure a Kafka broker to convert messages from newer format versions to a given older format version before the broker appends the message to the log.
In Kafka, there are two different methods for setting the message format version:
- The message.format.version property is set on topics.
- The log.message.format.version property is set on Kafka brokers.
The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration.
The upgrade tasks in this section assume that the message format version is defined by the log.message.format.version.
18.4. Upgrading to AMQ Streams 1.6
The steps to upgrade your deployment to use AMQ Streams 1.6 are outlined in this section.
The availability of Kafka clusters managed by AMQ Streams is not affected by the upgrade operation.
Refer to the documentation supporting a specific version of AMQ Streams for information on how to upgrade to that version.
18.4.1. Upgrading Kafka brokers and ZooKeeper
This procedure describes how to upgrade Kafka brokers and ZooKeeper on a host machine to use the latest version of AMQ Streams.
Prerequisites
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
1. Download the AMQ Streams archive from the Customer Portal.
   Note: If prompted, log in to your Red Hat account.
2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file.
    mkdir /tmp/kafka
    unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
3. If running, stop ZooKeeper and the Kafka broker running on the host.
    /opt/kafka/bin/zookeeper-server-stop.sh
    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep zookeeper
    jcmd | grep kafka
4. Delete the libs, bin, and docs directories from your existing installation:
    rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
5. Copy the libs, bin, and docs directories from the temporary directory:
    cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
6. Delete the temporary directory.
    rm -r /tmp/kafka
7. In a text editor, open the broker properties file, commonly stored in the /opt/kafka/config/ directory.
8. Check that the inter.broker.protocol.version and log.message.format.version properties are set to the current version:
    inter.broker.protocol.version=2.5
    log.message.format.version=2.5
   Leaving the inter.broker.protocol.version unchanged ensures that the brokers can continue to communicate with each other throughout the upgrade.
   If the properties are not configured, add them with the current version.
9. Restart the updated ZooKeeper and Kafka broker:
    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
   The Kafka broker and ZooKeeper will start using the binaries for the latest Kafka version.
10. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
11. Perform the procedures to upgrade Kafka, as described in Section 18.5, “Upgrading Kafka”.
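The check on inter.broker.protocol.version and log.message.format.version in this procedure can also be scripted. The following is a minimal sketch rather than part of the product tooling. It assumes the properties are written as key=value with no spaces around the equals sign, and the function name is hypothetical.

```shell
# Hypothetical helper: report whether both version properties in a broker
# properties file match the expected (current) version.
# Usage: check_broker_versions /opt/kafka/config/server.properties 2.5
check_broker_versions() {
  local file="$1" expected="$2" key value status=0
  for key in inter.broker.protocol.version log.message.format.version; do
    # Take the value of the last uncommented line whose key matches exactly.
    value=$(awk -F= -v k="$key" '$1 == k { v = $2 } END { print v }' "$file")
    if [ "$value" = "$expected" ]; then
      echo "$key=$value OK"
    else
      echo "$key=${value:-<not set>} expected $expected"
      status=1
    fi
  done
  return "$status"
}
```

A non-zero return status means that at least one property is missing or differs from the expected version, so the file needs editing before the broker is restarted.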
18.4.2. Upgrading Kafka Connect
This procedure describes how to upgrade a Kafka Connect cluster on a host machine.
Kafka Connect is a client application and should be included in your chosen strategy for upgrading clients. For more information, see Strategies for upgrading clients.
Prerequisites
- You are logged in to Red Hat Enterprise Linux as the kafka user.
- Kafka Connect is not started.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
1. Download the AMQ Streams archive from the Customer Portal.
   Note: If prompted, log in to your Red Hat account.
2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file.
    mkdir /tmp/kafka
    unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
3. If running, stop the Kafka broker and ZooKeeper running on the host.
    /opt/kafka/bin/kafka-server-stop.sh
    /opt/kafka/bin/zookeeper-server-stop.sh
4. Delete the libs, bin, and docs directories from your existing installation:
    rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
5. Copy the libs, bin, and docs directories from the temporary directory:
    cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
6. Delete the temporary directory.
    rm -r /tmp/kafka
7. Start Kafka Connect in either standalone or distributed mode.
   - To start in standalone mode, run the connect-standalone.sh script. Specify the Kafka Connect standalone configuration file and the configuration files of your Kafka Connect connectors.
    su - kafka
    /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
   - To start in distributed mode, start the Kafka Connect workers with the /opt/kafka/config/connect-distributed.properties configuration file on all Kafka Connect nodes:
    su - kafka
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
8. Verify that Kafka Connect is running:
   - In standalone mode:
    jcmd | grep ConnectStandalone
   - In distributed mode:
    jcmd | grep ConnectDistributed
9. Verify that Kafka Connect is producing and consuming data as expected.
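The delete and copy steps in this procedure are the same as those in the broker upgrade, so they can be wrapped in one function. The following is a minimal sketch, not a supported script. The function name is hypothetical, and the source and installation directories are passed as arguments because the extracted directory name depends on the release.

```shell
# Hypothetical helper: replace the libs, bin, and docs directories of an
# existing installation with those from an extracted release.
# Usage: replace_kafka_dirs /tmp/kafka/kafka_y.y-x.x.x /opt/kafka
replace_kafka_dirs() {
  local src="$1" dest="$2" dir
  # Refuse to touch the installation unless the release has all three directories.
  for dir in libs bin docs; do
    [ -d "$src/$dir" ] || { echo "missing $src/$dir" >&2; return 1; }
  done
  for dir in libs bin docs; do
    # ${dest:?} aborts if dest is empty, so rm can never target /libs and so on.
    rm -rf "${dest:?}/$dir"
    cp -r "$src/$dir" "$dest/"
  done
}
```

Checking the source directories first means that a mistyped path leaves the existing installation untouched. Other directories, such as config, are not modified.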
18.5. Upgrading Kafka
After you have upgraded your binaries to use the latest version of AMQ Streams, you can upgrade your brokers and clients to use a higher supported version of Kafka.
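Take care to follow the steps in the correct order:
1. Section 18.5.1, “Upgrading Kafka brokers to use the new inter-broker protocol version”
2. Section 18.5.2, “Strategies for upgrading clients”
3. Section 18.5.3, “Upgrading client applications to the new Kafka version”
4. Section 18.5.4, “Upgrading Kafka brokers to use the new message format version”
Following the Kafka upgrade, if required, you can upgrade Kafka consumers to use the incremental cooperative rebalance protocol:
- Section 18.5.5, “Upgrading consumers and Kafka Streams applications to cooperative rebalancing”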
18.5.1. Upgrading Kafka brokers to use the new inter-broker protocol version
Manually configure and restart all Kafka brokers to use the new inter-broker protocol version. After performing these steps, data is transmitted between the Kafka brokers using the new inter-broker protocol version.
Messages received are still appended to the message logs in the earlier message format version.
Downgrading AMQ Streams is not possible after completing this procedure.
Prerequisites
- You have updated the ZooKeeper binaries and upgraded all Kafka brokers to AMQ Streams 1.6.
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
2. Set the inter.broker.protocol.version to 2.6.
    inter.broker.protocol.version=2.6
3. On the command line, stop the Kafka broker that you modified:
    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep kafka
4. Restart the Kafka broker that you modified:
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
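The verification step can be partly automated by filtering the output of kafka-topics.sh --describe. The following is a minimal sketch under two assumptions: that the describe output has one line per partition containing Topic:, Partition:, Replicas:, and Isr: fields, and that Isr: is followed by a comma-separated broker list. The function name is hypothetical.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print "topic partition" for every partition whose in-sync replica list is
# shorter than its replica list.
# Usage:
#   /opt/kafka/bin/kafka-topics.sh --bootstrap-server <BrokerAddress> --describe | out_of_sync_partitions
out_of_sync_partitions() {
  awk '
  {
    topic = ""; part = ""; replicas = ""; isr = ""; seen = 0
    for (i = 1; i <= NF; i++) {
      if ($i == "Topic:") topic = $(i + 1)
      else if ($i == "Partition:") part = $(i + 1)
      else if ($i == "Replicas:") { replicas = $(i + 1); seen++ }
      else if ($i == "Isr:") { isr = $(i + 1); seen++ }
    }
    # Only partition lines carry both fields; topic summary lines are skipped.
    if (seen == 2 && split(isr, a, ",") < split(replicas, b, ",")) print topic, part
  }'
}
```

Empty output means that every listed partition has all of its replicas in sync. The kafka-topics.sh tool also provides an --under-replicated-partitions option for --describe, which may be sufficient on its own.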
18.5.2. Strategies for upgrading clients
The best approach to upgrading your client applications (including Kafka Connect connectors) depends on your particular circumstances.
Consuming applications need to receive messages in a message format that they understand. You can ensure that this is the case in one of two ways:
- By upgrading all the consumers for a topic before upgrading any of the producers.
- By having the brokers down-convert messages to an older format.
Using broker down-conversion puts extra load on the brokers, so it is not ideal to rely on down-conversion for all topics for a prolonged period of time. For brokers to perform optimally, they should not be down-converting messages at all.
Broker down-conversion is configured in two ways:
- The topic-level message.format.version configures it for a single topic.
- The broker-level log.message.format.version is the default for topics that do not have the topic-level message.format.version configured.
Messages published to a topic in a new-version format will be visible to consumers, because brokers perform down-conversion when they receive messages from producers, not when they are sent to consumers.
There are a number of strategies you can use to upgrade your clients:
- Consumers first
  1. Upgrade all the consuming applications.
  2. Change the broker-level log.message.format.version to the new version.
  3. Upgrade all the producing applications.
  This strategy is straightforward, and avoids any broker down-conversion. However, it assumes that all consumers in your organization can be upgraded in a coordinated way, and it does not work for applications that are both consumers and producers. There is also a risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log so that you cannot revert to the previous consumer version.
- Per-topic consumers first
  For each topic:
  1. Upgrade all the consuming applications.
  2. Change the topic-level message.format.version to the new version.
  3. Upgrade all the producing applications.
  This strategy avoids any broker down-conversion, and means you can proceed on a topic-by-topic basis. It does not work for applications that are both consumers and producers of the same topic. Again, it has the risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log.
- Per-topic consumers first, with down conversion
  For each topic:
  1. Change the topic-level message.format.version to the old version (or rely on the topic defaulting to the broker-level log.message.format.version).
  2. Upgrade all the consuming and producing applications.
  3. Verify that the upgraded applications function correctly.
  4. Change the topic-level message.format.version to the new version.
  This strategy requires broker down-conversion, but the load on the brokers is minimized because it is only required for a single topic (or small group of topics) at a time. It also works for applications that are both consumers and producers of the same topic. This approach ensures that the upgraded producers and consumers are working correctly before you commit to using the new message format version.
  The main drawback of this approach is that it can be complicated to manage in a cluster with many topics and applications.
Other strategies for upgrading client applications are also possible.
It is also possible to apply multiple strategies. For example, you can use the "per-topic consumers first, with down conversion" strategy for the first few applications and topics. Once this has proved successful, you can switch to another, more efficient strategy.
18.5.3. Upgrading client applications to the new Kafka version
This procedure describes one possible approach to upgrading your client applications to the Kafka version used for AMQ Streams 1.6.
The procedure is based on the "per-topic consumers first, with down conversion" approach outlined in Strategies for upgrading clients.
Client applications include producers, consumers, Kafka Connect, Kafka Streams applications, and MirrorMaker.
Prerequisites
- You have updated the ZooKeeper binaries and upgraded all Kafka brokers to AMQ Streams 1.6.
- You have configured Kafka brokers to use the new inter-broker protocol version.
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each topic:
1. On the command line, set the message.format.version configuration option to 2.5.
    bin/kafka-configs.sh --bootstrap-server <BrokerAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.5
2. Upgrade all the consumers and producers for the topic.
3. Optionally, to upgrade consumers and Kafka Streams applications to use the incremental cooperative rebalance protocol, which was added in Kafka 2.4.0, see Section 18.5.5, “Upgrading consumers and Kafka Streams applications to cooperative rebalancing”.
4. Verify that the upgraded applications function correctly.
5. Change the topic’s message.format.version configuration option to 2.6.
    bin/kafka-configs.sh --bootstrap-server <BrokerAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.6
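When many topics are involved, it can help to generate the kafka-configs.sh commands first and review them before running anything. The following is a minimal sketch that only prints the commands used in this procedure. The function name, the broker address, and the topic names in the usage comment are hypothetical.

```shell
# Hypothetical helper: print one kafka-configs.sh command per topic that sets
# message.format.version to the given version. Nothing is executed.
# Usage: format_version_commands <BrokerAddress> <version> <TopicName>...
#   for example: format_version_commands localhost:9092 2.5 orders payments
format_version_commands() {
  local broker="$1" version="$2" topic
  shift 2
  for topic in "$@"; do
    printf '%s\n' "bin/kafka-configs.sh --bootstrap-server $broker --entity-type topics --entity-name $topic --alter --add-config message.format.version=$version"
  done
}
```

Printing rather than executing keeps the sketch safe to try out: the output can be inspected, saved, and then run one topic at a time as each topic's clients are upgraded and verified.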
18.5.4. Upgrading Kafka brokers to use the new message format version
When client applications have been upgraded, you can update the Kafka brokers to use the new message format version.
If you did not modify topic configurations when you upgraded your client applications to use the Kafka version required for AMQ Streams 1.6, the Kafka brokers are now converting messages down to the previous message format version, which can cause a reduction in performance. Therefore, it is important that you update all Kafka brokers to use the new message format version as soon as possible.
Update and restart the Kafka brokers one-by-one. Before you restart a modified broker, stop the broker you configured and restarted previously.
Prerequisites
- You have updated the ZooKeeper binaries and upgraded all Kafka brokers to AMQ Streams 1.6.
- You have configured Kafka brokers to use the new inter-broker protocol version.
- You have upgraded supported client applications that consume messages from topics for which the message.format.version property is not explicitly configured at the topic level.
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
2. Set the log.message.format.version to 2.6.
    log.message.format.version=2.6
3. On the command line, stop the Kafka broker that you most recently modified and restarted as part of this procedure. If you are modifying the first Kafka broker in this procedure, go to step four.
    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep kafka
4. Restart the Kafka broker whose configuration you modified in step two:
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
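The property edit in this procedure can be made with a small function instead of a text editor. The following is a minimal sketch, assuming a key=value properties file with no spaces around the equals sign. The function name is hypothetical, and a .bak copy of the file is left next to the original whenever an existing line is changed.

```shell
# Hypothetical helper: set key=value in a properties file, replacing an
# existing uncommented line for the key or appending a new one.
# Usage: set_broker_property /opt/kafka/config/server.properties log.message.format.version 2.6
set_broker_property() {
  local file="$1" key="$2" value="$3"
  # Escape the dots in the key so they match literally in the regular expressions.
  local pattern="${key//./\\.}"
  if grep -q "^${pattern}=" "$file"; then
    sed -i.bak "s|^${pattern}=.*|${key}=${value}|" "$file"
  else
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}
```

The same function applies to any single-line broker property, so it is not specific to the message format version.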
18.5.5. Upgrading consumers and Kafka Streams applications to cooperative rebalancing
You can upgrade Kafka consumers and Kafka Streams applications to use the incremental cooperative rebalance protocol for partition rebalances instead of the default eager rebalance protocol. The new protocol was added in Kafka 2.4.0.
Consumers keep their partition assignments in a cooperative rebalance and only revoke them at the end of the process, if needed to achieve a balanced cluster. This reduces the unavailability of the consumer group or Kafka Streams application.
Upgrading to the incremental cooperative rebalance protocol is optional. The eager rebalance protocol is still supported.
Procedure
To upgrade a Kafka consumer to use the incremental cooperative rebalance protocol:
1. Replace the Kafka clients .jar file with the new version.
2. In the consumer configuration, append cooperative-sticky to the partition.assignment.strategy. For example, if the range strategy is set, change the configuration to range, cooperative-sticky.
3. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
4. Reconfigure each consumer in the group by removing the earlier partition.assignment.strategy from the consumer configuration, leaving only the cooperative-sticky strategy.
5. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
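The two configuration changes in the consumer procedure can be sketched as edits to a consumer properties file. This is an illustration under stated assumptions, not the only way to configure a consumer: it assumes the consumer reads its settings from a key=value properties file, and that strategies are listed as assignor class names, where org.apache.kafka.clients.consumer.RangeAssignor corresponds to the range strategy and org.apache.kafka.clients.consumer.CooperativeStickyAssignor to cooperative-sticky. The function names are hypothetical.

```shell
COOPERATIVE=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
RANGE=org.apache.kafka.clients.consumer.RangeAssignor

# Before the first rolling restart: keep the existing strategy and append the
# cooperative one. Run once per file; running it again appends a duplicate.
add_cooperative_assignor() {
  local file="$1"
  if grep -q '^partition\.assignment\.strategy=' "$file"; then
    sed -i.bak "s|^\(partition\.assignment\.strategy=.*\)\$|\1,${COOPERATIVE}|" "$file"
  else
    # Assumption: no explicit setting means the consumer default, RangeAssignor.
    printf 'partition.assignment.strategy=%s,%s\n' "$RANGE" "$COOPERATIVE" >> "$file"
  fi
}

# Before the second rolling restart: leave only the cooperative strategy.
keep_only_cooperative_assignor() {
  local file="$1"
  sed -i.bak "s|^partition\.assignment\.strategy=.*\$|partition.assignment.strategy=${COOPERATIVE}|" "$file"
}
```

Each function is applied to every consumer in the group before the corresponding rolling restart, so that at no point do group members lack a strategy in common.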
To upgrade a Kafka Streams application to use the incremental cooperative rebalance protocol:
1. Replace the Kafka Streams .jar file with the new version.
2. In the Kafka Streams configuration, set the upgrade.from configuration parameter to the Kafka version you are upgrading from (for example, 2.3).
3. Restart each of the stream processors (nodes) in turn.
4. Remove the upgrade.from configuration parameter from the Kafka Streams configuration.
5. Restart each consumer in the group in turn.
Additional resources
- Notable changes in 2.4.0 in the Apache Kafka documentation.