Chapter 13. AMQ Streams and Kafka upgrades
AMQ Streams can be upgraded with no cluster downtime. Each version of AMQ Streams supports one or more versions of Apache Kafka: you can upgrade to a higher Kafka version as long as it is supported by your version of AMQ Streams. In some cases, you can also downgrade to a lower supported Kafka version.
Newer versions of AMQ Streams may support newer versions of Kafka, but you need to upgrade AMQ Streams before you can upgrade to a higher supported Kafka version.
13.1. Upgrade prerequisites
Before you begin the upgrade process, make sure that:
- AMQ Streams is installed. For instructions, see Chapter 2, Getting started.
- You are familiar with any upgrade changes described in the AMQ Streams 1.3 on Red Hat Enterprise Linux Release Notes.
13.2. Upgrade process
Upgrading AMQ Streams is a two-stage process. To upgrade brokers and clients without downtime, you must complete the upgrade procedures in the following order:
1. Upgrade to the latest AMQ Streams version.
2. Upgrade all Kafka brokers and client applications to the latest Kafka version.
13.3. Kafka versions
AMQ Streams is based on a specific version of Apache Kafka.
AMQ Streams version | Kafka version |
---|---|
1.3 | 2.3.0 |
Kafka’s log message format version and inter-broker protocol version specify the version of the format of messages appended to broker logs and the version of protocol used between brokers in a cluster. As a result, the upgrade process involves making configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers) to ensure the correct versions are used.
The following table shows the differences between Kafka versions:
Kafka version | Inter-broker protocol version | Log message format version | Zookeeper version |
---|---|---|---|
2.2.1 | 2.2 | 2.2 | 3.4.14 |
2.3.0 | 2.3 | 2.3 | 3.4.14 |
Although Kafka versions may use the same version of Zookeeper, it is recommended that you update your Zookeeper cluster to use the newest Zookeeper binaries before proceeding with the main AMQ Streams upgrade.
Message format version
When a producer sends a message to a Kafka broker, the message is encoded using a specific format. The format can change between Kafka releases, so messages include a version identifying which version of the format they were encoded with. You can configure a Kafka broker to convert messages from newer format versions to a given older format version before the broker appends the message to the log.
In Kafka, there are two different methods for setting the message format version:
- The message.format.version property is set on topics.
- The log.message.format.version property is set on Kafka brokers.
The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration.
The upgrade tasks in this section assume that the message format version is defined at the broker level by the log.message.format.version.
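To confirm which message format version a broker is configured with, you can read the property from the broker properties file. The following is a minimal sketch and not part of AMQ Streams: the get_prop helper is a hypothetical name, and it assumes the file contains plain key=value lines.

```shell
# get_prop FILE KEY: print the value of the last uncommented KEY=VALUE line
# in FILE. Prints nothing if the key is not set in the file.
# (Hypothetical helper, for illustration only.)
get_prop() {
    # Escape dots so that they match literally in the regular expression.
    gp_key_re=$(printf '%s' "$2" | sed 's/\./\\./g')
    # Strip "KEY =" from matching lines and keep the last match,
    # because a later duplicate entry overrides an earlier one.
    sed -n "s/^[[:space:]]*${gp_key_re}[[:space:]]*=[[:space:]]*//p" "$1" | tail -n 1
}
```

For example, `get_prop /opt/kafka/config/server.properties log.message.format.version` prints the configured value, or nothing if the property is not set in the file.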
13.4. Upgrading to AMQ Streams 1.3
The steps to upgrade your deployment to use AMQ Streams 1.3 are outlined in this section.
The availability of Kafka clusters managed by AMQ Streams is not affected by the upgrade operation.
Refer to the documentation supporting a specific version of AMQ Streams for information on how to upgrade to that version.
13.4.1. Upgrading Zookeeper
This procedure describes how to upgrade Zookeeper on a host machine.
Prerequisites
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Zookeeper node in your AMQ Streams cluster, one at a time:
1. Download the AMQ Streams archive for this release from the Customer Portal.
   Note: If prompted, log in to your Red Hat account.
2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file:
   mkdir /tmp/kafka
   unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
3. Delete the libs, bin, and docs directories from your existing installation:
   rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
4. Copy the libs, bin, and docs directories from the temporary directory:
   cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
5. Delete the temporary directory:
   rm -rf /tmp/kafka
6. Restart Zookeeper:
   /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
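The delete and copy steps can be wrapped in a small function that refuses to delete anything unless the extracted archive contains all three directories. This is a sketch under stated assumptions rather than a supported tool: replace_kafka_dirs is a hypothetical name, and the source and destination paths are passed in by the caller.

```shell
# replace_kafka_dirs SRC DEST: replace DEST/libs, DEST/bin and DEST/docs with
# the copies found in SRC. Nothing is deleted unless all three directories
# exist in SRC. (Hypothetical helper, for illustration only.)
replace_kafka_dirs() {
    rk_src=$1
    rk_dest=$2
    # First pass: check that the extracted archive is complete.
    for rk_dir in libs bin docs; do
        if [ ! -d "$rk_src/$rk_dir" ]; then
            echo "missing directory: $rk_src/$rk_dir" >&2
            return 1
        fi
    done
    # Second pass: remove each old directory and copy in the new one.
    for rk_dir in libs bin docs; do
        rm -rf "${rk_dest:?}/$rk_dir"
        cp -r "$rk_src/$rk_dir" "$rk_dest/"
    done
}
```

With the paths used in this procedure, the call would be `replace_kafka_dirs /tmp/kafka/kafka_y.y-x.x.x /opt/kafka`. Other directories under the destination, such as config, are left untouched.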
13.4.2. Upgrading Kafka brokers
This procedure describes how to upgrade Kafka brokers on a host machine.
Prerequisites
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
1. Download the AMQ Streams archive from the Customer Portal.
   Note: If prompted, log in to your Red Hat account.
2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file:
   mkdir /tmp/kafka
   unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
3. Delete the libs, bin, and docs directories from your existing installation:
   rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
4. Copy the libs, bin, and docs directories from the temporary directory:
   cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
5. Delete the temporary directory:
   rm -r /tmp/kafka
6. In a text editor, open the broker properties file, commonly stored in the /opt/kafka/config/ directory.
7. Temporarily override the default inter-broker protocol and message format versions for Kafka 2.3.0 by adding or updating the following properties in the file:
   inter.broker.protocol.version=2.2
   log.message.format.version=2.2
   This configures the Kafka broker to process data using the previous inter-broker protocol (2.2) and message format versions.
8. On the command line, stop the Kafka broker that you modified:
   /opt/kafka/bin/kafka-server-stop.sh
   jcmd | grep kafka
9. Restart the Kafka broker that you modified:
   /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
   Note: The Kafka broker will start using the binaries for the latest Kafka version.
10. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
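Adding or updating a property by hand in each broker properties file is easy to get wrong when there are many brokers. The following sketch shows one way to script the edit. The set_prop function is a hypothetical helper, and it assumes plain key=value lines and values that contain no characters special to sed, such as | or &.

```shell
# set_prop FILE KEY VALUE: update KEY in FILE if an uncommented entry exists,
# otherwise append "KEY=VALUE" as a new line.
# (Hypothetical helper, for illustration only.)
set_prop() {
    sp_file=$1
    sp_key=$2
    sp_value=$3
    # Escape dots so that they match literally in the regular expression.
    sp_key_re=$(printf '%s' "$sp_key" | sed 's/\./\\./g')
    if grep -q "^[[:space:]]*${sp_key_re}[[:space:]]*=" "$sp_file"; then
        # Rewrite the existing entry through a temporary file, then copy the
        # content back so that the file keeps its owner and permissions.
        sp_tmp=$(mktemp)
        sed "s|^[[:space:]]*${sp_key_re}[[:space:]]*=.*|${sp_key}=${sp_value}|" "$sp_file" > "$sp_tmp" &&
            cat "$sp_tmp" > "$sp_file"
        rm -f "$sp_tmp"
    else
        # Make sure the file ends with a newline before appending.
        if [ -n "$(tail -c 1 "$sp_file")" ]; then
            echo >> "$sp_file"
        fi
        printf '%s=%s\n' "$sp_key" "$sp_value" >> "$sp_file"
    fi
}
```

For the temporary override in this procedure, the calls would be `set_prop /opt/kafka/config/server.properties inter.broker.protocol.version 2.2` and `set_prop /opt/kafka/config/server.properties log.message.format.version 2.2`.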
13.4.3. Upgrading Kafka Connect
This procedure describes how to upgrade a Kafka Connect cluster on a host machine.
Kafka Connect is a client application and should be included in your chosen strategy for upgrading clients. For more information, see Strategies for upgrading clients.
Prerequisites
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka Connect node in your AMQ Streams cluster, one at a time:
1. Download the AMQ Streams archive from the Customer Portal.
   Note: If prompted, log in to your Red Hat account.
2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file:
   mkdir /tmp/kafka
   unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
3. Delete the libs, bin, and docs directories from your existing installation:
   rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
4. Copy the libs, bin, and docs directories from the temporary directory:
   cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
   cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
5. Delete the temporary directory:
   rm -r /tmp/kafka
6. Start Kafka Connect in either standalone or distributed mode.
   - To start in standalone mode, run the connect-standalone.sh script. Specify the Kafka Connect standalone configuration file and the configuration files of your Kafka Connect connectors.
     su - kafka
     /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
   - To start in distributed mode, start the Kafka Connect workers with the /opt/kafka/config/connect-distributed.properties configuration file on all Kafka Connect nodes:
     su - kafka
     /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
7. Verify that Kafka Connect is running:
   - In standalone mode:
     jcmd | grep ConnectStandalone
   - In distributed mode:
     jcmd | grep ConnectDistributed
8. Verify that Kafka Connect is producing and consuming data as expected.
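If you script the verification, the two jcmd checks can be folded into one function that reports which mode is running. This is a minimal sketch: connect_mode is a hypothetical name, and it assumes that jcmd lists the Kafka Connect main class (ConnectStandalone or ConnectDistributed) on the process line, as the grep commands in this procedure do.

```shell
# connect_mode: read jcmd output on stdin and print which Kafka Connect mode
# is running: "standalone", "distributed", or "none".
# (Hypothetical helper, for illustration only.)
connect_mode() {
    cm_out=$(cat)
    case $cm_out in
        *ConnectStandalone*)  echo standalone ;;
        *ConnectDistributed*) echo distributed ;;
        *)                    echo none ;;
    esac
}
```

Run it as `jcmd | connect_mode`. A result of none after the start command suggests that Kafka Connect did not start and that its log output needs checking.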
13.5. Upgrading Kafka
After you have upgraded your binaries to use the latest version of AMQ Streams, you can upgrade your brokers and clients to use a higher supported version of Kafka.
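Take care to follow the steps in the correct order:
1. Upgrading Kafka brokers to use the new inter-broker protocol version
2. Upgrading client applications to the new Kafka version
3. Upgrading Kafka brokers to use the new message format version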
13.5.1. Upgrading Kafka brokers to use the new inter-broker protocol version
Manually configure and restart all Kafka brokers to use the new inter-broker protocol version. After performing these steps, data is transmitted between the Kafka brokers using the new inter-broker protocol version.
Messages received are still appended to the message logs in the earlier message format version.
Downgrading AMQ Streams is not possible after completing this procedure.
Prerequisites
- You have updated the Zookeeper binaries.
- You have upgraded all Kafka brokers to AMQ Streams 1.3.
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
2. Set the inter.broker.protocol.version to 2.3.
   inter.broker.protocol.version=2.3
3. On the command line, stop the Kafka broker that you modified:
   /opt/kafka/bin/kafka-server-stop.sh
   jcmd | grep kafka
4. Restart the Kafka broker that you modified:
   /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
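Before moving on to the next broker, the in-sync check can be automated by polling until no partition is reported as under-replicated. The sketch below is an illustration, not a supported tool. The wait_in_sync function and the KAFKA_TOPICS variable are hypothetical names, the Zookeeper address is passed with --zookeeper as in the other commands in this chapter, and the check covers the whole cluster rather than a single broker.

```shell
# wait_in_sync ZOOKEEPER [ATTEMPTS] [DELAY]: poll until kafka-topics.sh reports
# no under-replicated partitions. Returns 0 when everything is in sync, and
# 1 on timeout or if the tool itself fails.
# KAFKA_TOPICS can point at a different kafka-topics.sh location.
# (Hypothetical helper, for illustration only.)
wait_in_sync() {
    ws_zk=$1
    ws_attempts=${2:-30}
    ws_delay=${3:-10}
    ws_cmd=${KAFKA_TOPICS:-/opt/kafka/bin/kafka-topics.sh}
    ws_i=0
    while [ "$ws_i" -lt "$ws_attempts" ]; do
        # With --under-replicated-partitions, only partitions that have
        # out-of-sync replicas are listed, so empty output means in sync.
        ws_out=$("$ws_cmd" --zookeeper "$ws_zk" --describe --under-replicated-partitions) || return 1
        [ -z "$ws_out" ] && return 0
        ws_i=$((ws_i + 1))
        sleep "$ws_delay"
    done
    return 1
}
```

For example, `wait_in_sync <ZookeeperAddress> 30 10` checks up to 30 times at 10-second intervals. A non-zero return value means it is not yet safe to restart the next broker.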
13.5.2. Strategies for upgrading clients
The best approach to upgrading your client applications depends on your particular circumstances. Client applications might include producers, consumers, Kafka Connect, and Kafka MirrorMaker.
Consuming applications need to receive messages in a message format that they understand. You can ensure that this is the case in one of two ways:
- By upgrading all the consumers for a topic before upgrading any of the producers.
- By having the brokers down-convert messages to an older format.
Using broker down-conversion puts extra load on the brokers, so it is not ideal to rely on down-conversion for all topics for a prolonged period of time. For brokers to perform optimally, they should not be down-converting messages at all.
Broker down-conversion is configured in two ways:
- The topic-level message.format.version configures it for a single topic.
- The broker-level log.message.format.version is the default for topics that do not have the topic-level message.format.version configured.
Another aspect to consider is that once new-version messages have been published to a topic then they will be visible to consumers. This is because brokers perform down-conversion when they receive messages from producers, not when they are sent to consumers.
There are a number of strategies you can use to upgrade your clients:
- Consumers first
  1. Upgrade all the consuming applications.
  2. Change the broker-level log.message.format.version to the new version.
  3. Upgrade all the producing applications.
  This strategy is straightforward, and avoids any broker down-conversion. However, it assumes that all consumers in your organization can be upgraded in a coordinated way, and it does not work for applications that are both consumers and producers. There is also a risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log so that you cannot revert to the previous consumer version.
- Per-topic consumers first
  For each topic:
  1. Upgrade all the consuming applications.
  2. Change the topic-level message.format.version to the new version.
  3. Upgrade all the producing applications.
  This strategy avoids any broker down-conversion, and means you can proceed on a topic-by-topic basis. It does not work for applications that are both consumers and producers of the same topic. Again, it has the risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log.
- Per-topic consumers first, with down conversion
  For each topic:
  1. Change the topic-level message.format.version to the old version (or rely on the topic defaulting to the broker-level log.message.format.version).
  2. Upgrade all the consuming and producing applications.
  3. Verify that the upgraded applications function correctly.
  4. Change the topic-level message.format.version to the new version.
  This strategy requires broker down-conversion, but the load on the brokers is minimized because it is only required for a single topic (or small group of topics) at a time. It also works for applications that are both consumers and producers of the same topic. This approach ensures that the upgraded producers and consumers are working correctly before you commit to using the new message format version.
  The main drawback of this approach is that it can be complicated to manage in a cluster with many topics and applications.
Other strategies for upgrading client applications are also possible.
It is also possible to apply multiple strategies. For example, you can use the "per-topic consumers first, with down conversion" strategy for the first few applications and topics. When this has proved successful, you can switch to another, more efficient strategy.
13.5.3. Upgrading client applications to the new Kafka version
This procedure describes one possible approach to upgrading your client applications to the Kafka version used for AMQ Streams 1.3.
The procedure is based on the "per-topic consumers first, with down conversion" approach outlined in Strategies for upgrading clients.
Client applications might include producers, consumers, Kafka Connect, and MirrorMaker.
Prerequisites
- You have updated the Zookeeper binaries.
- You have upgraded all Kafka brokers to AMQ Streams 1.3.
- You have configured Kafka brokers to use the new inter-broker protocol version.
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each topic:
1. On the command line, set the message.format.version configuration option to 2.2.
   bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.2
2. Upgrade all the consuming and producing applications for the topic.
3. Verify that the upgraded applications function correctly.
4. Change the topic’s message.format.version configuration option to 2.3.
   bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.3
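When many topics are involved, it can help to generate the kafka-configs.sh commands for review before running any of them. The following dry-run sketch only prints commands and changes nothing. The function names format_version_cmd and plan_topics are hypothetical, as are the topic names used in the example call.

```shell
# format_version_cmd ZOOKEEPER TOPIC VERSION: print the kafka-configs.sh
# command that sets message.format.version on one topic.
# (Hypothetical helper, for illustration only.)
format_version_cmd() {
    printf 'bin/kafka-configs.sh --zookeeper %s --entity-type topics --entity-name %s --alter --add-config message.format.version=%s\n' "$1" "$2" "$3"
}

# plan_topics ZOOKEEPER VERSION TOPIC...: print one command per topic.
# Nothing is executed; the output is meant to be reviewed first.
plan_topics() {
    pt_zk=$1
    pt_version=$2
    shift 2
    for pt_topic in "$@"; do
        format_version_cmd "$pt_zk" "$pt_topic" "$pt_version"
    done
}
```

For example, `plan_topics <ZookeeperAddress> 2.2 orders payments` prints two commands, one for each of the hypothetical topics orders and payments. Run the same call with 2.3 once the upgraded applications for those topics are verified.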
13.5.4. Upgrading Kafka brokers to use the new message format version
When client applications have been upgraded, you can update the Kafka brokers to use the new message format version.
If you did not modify topic configurations when you upgraded your client applications to use the Kafka version required for AMQ Streams 1.3, the Kafka brokers are now converting messages down to the previous message format version, which can cause a reduction in performance. Therefore, it is important that you update all Kafka brokers to use the new message format version as soon as possible.
Update and restart the Kafka brokers one-by-one. Before you restart a modified broker, stop the broker you configured and restarted previously.
Prerequisites
- You have updated the Zookeeper binaries.
- You have upgraded all Kafka brokers to AMQ Streams 1.3.
- You have configured Kafka brokers to use the new inter-broker protocol version.
- You have upgraded supported client applications that consume messages from topics for which the message.format.version property is not explicitly configured at the topic level.
- You are logged in to Red Hat Enterprise Linux as the kafka user.
Procedure
For each Kafka broker in your AMQ Streams cluster and one at a time:
1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
2. Set the log.message.format.version to 2.3.
   log.message.format.version=2.3
3. On the command line, stop the Kafka broker that you most recently modified and restarted as part of this procedure. If you are modifying the first Kafka broker in this procedure, go to step four.
   /opt/kafka/bin/kafka-server-stop.sh
   jcmd | grep kafka
4. Restart the Kafka broker whose configuration you modified in step two:
   /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.