Chapter 3. Getting started
Streams for Apache Kafka is distributed in a ZIP file that contains installation artifacts for the Kafka components.
The Kafka Bridge has separate installation files. For information on installing and using the Kafka Bridge, see Using the Streams for Apache Kafka Bridge.
3.1. Installation environment
Streams for Apache Kafka runs on Red Hat Enterprise Linux. The host (node) can be a physical or virtual machine (VM). Use the installation files provided with Streams for Apache Kafka to install Kafka components. You can install Kafka in a single-node or multi-node environment.
- Single-node environment
- A single-node Kafka cluster runs instances of Kafka components on a single host. This configuration is not suitable for a production environment.
- Multi-node environment
- A multi-node Kafka cluster runs instances of Kafka components on multiple hosts.
We recommend that you run Kafka and other Kafka components, such as Kafka Connect, on separate hosts. Running the components in this way makes it easier to maintain and upgrade each component.
Kafka clients establish a connection to the Kafka cluster using the bootstrap.servers configuration property. If you are using Kafka Connect, for example, the Kafka Connect configuration properties must include a bootstrap.servers value that specifies the hostname and port of the hosts where the Kafka brokers are running. If the Kafka cluster is running on more than one host with multiple Kafka brokers, you specify a hostname and port for each broker. Each Kafka broker is identified by a node.id.
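For instance, a Kafka Connect worker connecting to a three-broker cluster might use a bootstrap.servers value like the following sketch (the hostnames are placeholders for illustration):

```properties
# Connection to a three-broker Kafka cluster; replace the hostnames
# with the addresses of your own broker hosts
bootstrap.servers=kafka0.example.com:9092,kafka1.example.com:9092,kafka2.example.com:9092
```

Listing more than one broker means the client can still bootstrap if a single broker is unavailable.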
3.1.1. Storage considerations
Apache Kafka stores records in data logs, which are configured using the log.dirs property. For more information, see Section 5.3.3, “Data logs”.
Efficient data storage is essential for Streams for Apache Kafka to operate effectively. Streams for Apache Kafka has been tested with block storage as the primary storage type for Kafka brokers, and block storage is strongly recommended. File system-based storage (such as NFS) is not guaranteed to work for primary broker storage and may cause stability or performance issues.
Common block storage types include:
- Cloud-based block storage solutions:
  - Amazon EBS (for AWS)
  - Azure Disk Storage (for Microsoft Azure)
  - Persistent Disk (for Google Cloud)
- Persistent storage (for bare metal deployments) using local persistent devices
- Storage Area Network (SAN) volumes accessed by protocols like Fibre Channel or iSCSI
3.1.2. File systems
Kafka uses a file system for storing messages. Streams for Apache Kafka is compatible with the XFS and ext4 file systems, which are commonly used with Kafka. Consider the underlying architecture and requirements of your deployment when choosing and setting up your file system.
For more information, refer to Filesystem Selection in the Kafka documentation.
3.1.3. Tiered storage
Kafka’s tiered storage feature is supported in Streams for Apache Kafka as an optional capability.
With tiered storage enabled:
- Primary broker storage, such as persistent storage, handles recent data.
- Remote tiered storage, such as object storage, is used for historical data.
Streams for Apache Kafka allows users to integrate their own tiered storage plugins to support specific remote storage backends. If using a custom plugin, ensure that it meets performance and compatibility requirements before deploying to production.
3.1.4. Disk usage
Solid-state drives (SSDs), though not essential, can improve the performance of Kafka in large clusters where data is sent to and received from multiple topics asynchronously.
Replicated storage is not required, as Kafka provides built-in data replication.
3.2. Tiered storage
Tiered storage introduces a flexible approach to managing Kafka data whereby log segments are moved to a separate storage system. For example, you can combine the use of block storage on brokers for frequently accessed data and offload older or less frequently accessed data from the block storage to more cost-effective, scalable remote storage solutions, such as Amazon S3, without compromising data accessibility and durability.
Tiered storage is a production-ready feature in Kafka since version 3.9.0, and it is also supported in Streams for Apache Kafka. Before introducing tiered storage to your environment, review the known limitations of this feature.
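The plugin libraries described below handle the remote tier itself, but the feature must also be switched on in Kafka. As a minimal sketch, this is Kafka's standard broker-level setting (per-topic enablement then uses the remote.storage.enable topic configuration):

```properties
# Broker-level switch for Kafka's tiered storage subsystem
remote.log.storage.system.enable=true
```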
3.2.1. Tiered storage plugin libraries
An open-source tiered-storage-for-apache-kafka project from Aiven demonstrates how Apache Kafka can use remote object storage for tiered storage. The project provides an implementation of the RemoteStorageManager interface for Kafka’s tiered storage feature.
Streams for Apache Kafka includes plugin libraries from this project for AWS S3 and file system storage by default in Kafka v4.0 and later.
For more information, see the tiered-storage-for-apache-kafka project on GitHub.
3.2.2. AWS S3 remote storage configuration
Streams for Apache Kafka includes plugin libraries from the open-source tiered-storage-for-apache-kafka project to enable tiered storage using AWS S3.
To configure AWS S3 as the remote storage, specify the following properties in the Kafka node configuration:
- remote.log.storage.manager.class.path=/<path_to_kafka>/libs/tiered-storage/*
- remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
- rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
- Any additional rsm.config.storage.s3.* and rsm.config.storage.aws.* properties as needed for your S3 environment
Before applying the configuration, ensure that the Streams for Apache Kafka components are installed and available on the node.
Example configuration for Amazon S3 tiered storage
The example sets the following values:

- The name of the AWS S3 bucket used for storing and retrieving data.
- The AWS region of the S3 bucket (for example, us-east-1).
- The AWS access key ID used to access the S3 bucket.
- The AWS secret access key used to access the S3 bucket.
- The size (in bytes) of chunks into which segment files are split. It’s recommended to start with 4194304 (4 MiB).
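A minimal sketch of such a configuration, assuming the Aiven plugin's S3 property names (the bucket, credentials, and installation path are placeholders):

```properties
remote.log.storage.manager.class.path=/opt/kafka/libs/tiered-storage/*
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
# S3 backend settings; replace the placeholder values
rsm.config.storage.s3.bucket.name=<bucket_name>
rsm.config.storage.s3.region=us-east-1
rsm.config.storage.aws.access.key.id=<access_key_id>
rsm.config.storage.aws.secret.access.key=<secret_access_key>
# Segment chunk size in bytes; 4194304 (4 MiB) is the recommended starting point
rsm.config.chunk.size=4194304
```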
3.2.3. File system remote storage configuration
Streams for Apache Kafka includes plugin libraries from the open-source tiered-storage-for-apache-kafka project to enable tiered storage using file system paths. The file system storage plugin supports any mountable file system that can be accessed using a local path (for example, /mnt/tiered-storage/).
To configure a file system as the remote storage, specify the following properties in the Kafka node configuration:
- remote.log.storage.manager.class.path=/<path_to_kafka>/libs/tiered-storage/*
- remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
- rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage
- Any additional file system–related configuration properties, such as rsm.config.storage.root
Before applying the configuration, ensure that the Streams for Apache Kafka components are installed and available on the node.
Example configuration for file system (NFS) tiered storage
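A minimal sketch of such a configuration, assuming the plugin's rsm.config.storage.root property for the mount path (the path is a placeholder):

```properties
remote.log.storage.manager.class.path=/opt/kafka/libs/tiered-storage/*
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage
# Root directory on the mounted (for example, NFS) file system
rsm.config.storage.root=/mnt/tiered-storage/
```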
3.2.4. Common tiered storage tuning properties
In addition to the required configuration properties, Streams for Apache Kafka supports a set of optional tuning properties for the tiered storage plugin. These properties can help optimize storage performance, throughput, and resource usage depending on your environment.
Common tuning properties include the following:
- rsm.config.storage.chunk.cache.prefetch.max.size
- Controls the maximum amount of data (in bytes) to prefetch and cache. This setting can improve performance when reading chunked segments. Default: 0
- rsm.config.storage.upload.rate.limit.bytes.per.second
- Sets an upper bound on the upload rate from disk (in bytes per second). The value must be at least 1 MiB/s. Default: No limit
- rsm.config.storage.compression.enabled
- Enables additional compression of segment data to reduce storage usage. Default: false
- rsm.config.storage.s3.multipart.upload.part.size
- Sets the part size (in bytes) used when performing multipart uploads to S3. Tuning this value can improve upload performance and compatibility with S3 limits. Default: 26214400 (25 MiB)
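Put together, a sketch of these tuning properties with illustrative (not recommended) values you would adjust for your own workload:

```properties
# Prefetch up to 16 MiB per chunked segment read (default: 0, disabled)
rsm.config.storage.chunk.cache.prefetch.max.size=16777216
# Cap uploads at 100 MiB/s (default: no limit; must be at least 1 MiB/s)
rsm.config.storage.upload.rate.limit.bytes.per.second=104857600
# Extra compression of segment data (default: false)
rsm.config.storage.compression.enabled=false
# S3 multipart upload part size (default: 26214400, 25 MiB)
rsm.config.storage.s3.multipart.upload.part.size=26214400
```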
For a full list of supported configuration options, see the tiered-storage-for-apache-kafka project documentation.
3.3. Downloading Streams for Apache Kafka
A ZIP file distribution of Streams for Apache Kafka is available for download from the Red Hat website. You can download the latest version of Red Hat Streams for Apache Kafka from the Streams for Apache Kafka software downloads page.
- For Kafka and other Kafka components, download the amq-streams-<version>-kafka-bin.zip file.
- For Kafka Bridge, download the amq-streams-<version>-bridge-bin.zip file. For installation instructions, see Using the Streams for Apache Kafka Bridge.
3.4. Setting up the installation environment
After downloading Streams for Apache Kafka, set up the directory structure and configure a dedicated user to manage the Kafka installation. This provides proper permissions and a consistent environment for running Kafka.
The following setup is assumed throughout the rest of this guide, but you can adapt it to suit your specific requirements.
- Installation directory
- Install Kafka in the /opt/kafka/ directory to maintain a standard location. This guide assumes all Kafka commands are executed from this directory.
- Dedicated Kafka user
- Create a system user, such as kafka, to manage the installation. Change ownership of the /opt/kafka/ directory to this user to ensure secure access. You can use any username that fits your environment.
3.5. Installing Kafka
Use the Streams for Apache Kafka ZIP files to install Kafka on Red Hat Enterprise Linux. You can install Kafka in either a single-node or multi-node environment. This procedure focuses on installing a single Kafka instance on a single host (node). For this setup, Kafka is installed in the /opt/kafka/ directory, and a dedicated Kafka user (kafka) is used to manage the installation.
The Streams for Apache Kafka installation files include the binaries for running other Kafka components, like Kafka Connect, Kafka MirrorMaker 2, and Kafka Bridge. In a single-node environment, you can run these components from the same host where you installed Kafka. However, we recommend that you install and run other Kafka components on separate hosts.
Prerequisites
- You have downloaded the installation files.
- You have reviewed the supported configurations in the Streams for Apache Kafka 3.1 on Red Hat Enterprise Linux Release Notes.
- You are logged in to Red Hat Enterprise Linux as the admin (root) user.
Procedure
Install Kafka on your host.
Add a new Kafka user and group:

groupadd kafka
useradd -g kafka kafka
passwd kafka

This step creates the necessary user and group for managing Kafka.
Extract the contents of the amq-streams-<version>-kafka-bin.zip file and place it in the /opt/kafka directory:

unzip amq-streams-<version>-kafka-bin.zip -d /opt
mv /opt/kafka*redhat* /opt/kafka

Change the ownership of the /opt/kafka directory to the Kafka user:

chown -R kafka:kafka /opt/kafka

Create the /var/lib/kafka directory for storing Kafka data and change its ownership to the Kafka user:

mkdir /var/lib/kafka
chown -R kafka:kafka /var/lib/kafka

You can now run a default configuration of Kafka as a single-node cluster.
(Optional) Run other Kafka components, like Kafka Connect, on the same host.
To run other components, specify the hostname and port to connect to the Kafka broker using the bootstrap.servers property in the component configuration.

Example bootstrap servers configuration pointing to a single Kafka broker on the same host:

bootstrap.servers=localhost:9092

However, we recommend installing and running Kafka components on separate hosts.
(Optional) Install Kafka components on separate hosts.
- Extract the Kafka installation files into the /opt/kafka/ directory on each host.
- Change the ownership of the /opt/kafka/ directory to the Kafka user on each host.
- Update the bootstrap.servers property to connect the components to the Kafka brokers running on different hosts.

Example bootstrap servers configuration pointing to Kafka brokers on different hosts:

bootstrap.servers=kafka0.<host_ip_address>:9092,kafka1.<host_ip_address>:9092,kafka2.<host_ip_address>:9092

You can use this configuration for Kafka Connect, MirrorMaker 2, and the Kafka Bridge.
3.6. Running a Kafka cluster
Configure and run a Kafka cluster. Nodes in the cluster perform the role of broker, controller, or both.
- Broker role
- Manages the storage and passing of messages.
- Controller role
- Coordinates the cluster and manages metadata.
- Combined role
- A single node acts as both broker and controller.
The minimum recommended setup is three brokers and three controllers with topic replication for stability and availability. The internal __cluster_metadata topic stores cluster-wide information.
Prerequisites
- Streams for Apache Kafka is installed on each host, and the configuration files are available.
- The procedure uses the kafka-storage.sh, kafka-server-start.sh, and kafka-metadata-quorum.sh tools.
Procedure
Generate a unique Kafka cluster ID using the kafka-storage tool:

./bin/kafka-storage.sh random-uuid

Save this ID, as it is reused for all nodes.
Create a configuration file for each node.
Base the configuration file on Kafka’s provided examples:

- controller.properties for controller-only nodes
- broker.properties for broker-only nodes
- server.properties for nodes with both roles
Adjust the following properties for each node:
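The exact values depend on the node's role; the following is a minimal sketch of a combined-role node configuration using standard KRaft properties, with placeholder addresses and paths:

```properties
# Combined-role node: acts as both broker and controller
process.roles=broker,controller
node.id=1
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
advertised.listeners=PLAINTEXT://<host_address>:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
controller.quorum.bootstrap.servers=<controller_host>:9093
# Durable location for data and metadata logs
log.dirs=/var/lib/kafka/kraft-combined-logs
```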
Important considerations:
- Each node must have a unique node.id.
- The listeners hostname (0.0.0.0) binds to all interfaces but can be changed to a specific hostname or IP address if needed for each node.
- advertised.listeners must reflect the actual address that clients use to connect to the Kafka node.
- The log.dirs path in the configuration file defines where metadata is stored. If not set, it defaults to a temporary location, which is cleared on reboot and suitable only for development.
Bootstrap the controller quorum (one controller only):

./bin/kafka-storage.sh format --cluster-id <uuid> --standalone --config ./config/controller.properties

This initializes the metadata log and bootstraps the quorum. Use it on one node only. Replace <uuid> with the generated cluster ID.

Note: To bootstrap with multiple controllers, use --initial-controllers instead.

Start the controller:

./bin/kafka-server-start.sh ./config/controller.properties

Add additional controllers to the quorum.
Make sure controller.quorum.bootstrap.servers is correctly set across all nodes and the controllers are started.

Format the remaining nodes (brokers):

./bin/kafka-storage.sh format --cluster-id <uuid> --config ./config/server.properties --no-initial-controllers

Use the appropriate configuration file based on the role of each node. Use the same cluster ID for all nodes. This step prepares each node’s storage without modifying the controller quorum.
Start each broker node:

./bin/kafka-server-start.sh ./config/server.properties

Check that Kafka is running:

jcmd | grep kafka

Returns:

process ID kafka.Kafka ./config/server.properties

Check the logs of each node to ensure that they have successfully joined the KRaft cluster:

tail -f ./logs/server.log
Next steps
You can now create topics and send and receive messages from the brokers.
For brokers passing messages, you can use topic replication across the brokers in a cluster for data durability. Configure topics to have a replication factor of at least three and a minimum number of in-sync replicas set to 1 less than the replication factor (replication.factor=3, with min.insync.replicas=2). For more information, see Section 9.7, “Creating a topic”.
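As an illustrative sketch, equivalent broker-level defaults could be set in the Kafka configuration (topic-level settings override these; the values reflect the recommendation above):

```properties
# Broker defaults matching the recommended durability policy
default.replication.factor=3
min.insync.replicas=2
```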
3.7. Sending and receiving messages from a topic
This procedure describes how to start the Kafka console producer and consumer clients and use them to send and receive several messages.
A new topic is automatically created in step one. Topic auto-creation is controlled using the auto.create.topics.enable configuration property (set to true by default). Alternatively, you can configure and create topics before using the cluster. For more information, see Topics.
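For example, the broker setting that controls auto-creation can be set explicitly in the Kafka configuration (shown here with its default value):

```properties
# Allow Kafka to create topics automatically on first use (default: true)
auto.create.topics.enable=true
```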
Procedure
Start the Kafka console producer and configure it to send messages to a new topic:

./bin/kafka-console-producer.sh --broker-list <bootstrap_address> --topic <topic-name>

For example:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

Enter several messages into the console. Press Enter to send each individual message to your new topic:

>message 1
>message 2
>message 3
>message 4

When Kafka creates a new topic automatically, you might receive a warning that the topic does not exist:

WARN Error while fetching metadata with correlation id 39 : {4-3-16-topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

The warning should not reappear after you send further messages.
In a new terminal window, start the Kafka console consumer and configure it to read messages from the beginning of your new topic:

./bin/kafka-console-consumer.sh --bootstrap-server <bootstrap_address> --topic <topic-name> --from-beginning

For example:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

The incoming messages display in the consumer console.
- Switch to the producer console and send additional messages. Check that they display in the consumer console.
- Stop the Kafka console producer and then the consumer by pressing Ctrl+C.
3.8. Stopping the Streams for Apache Kafka services
You can stop Kafka services by running a script. After running the script, all connections to the Kafka services are terminated.
Procedure
Stop the Kafka node:

./bin/kafka-server-stop.sh

Confirm that the Kafka node is stopped:

jcmd | grep kafka
3.9. Performing a graceful rolling restart of Kafka brokers
This procedure shows how to do a graceful rolling restart of brokers in a multi-node cluster. A rolling restart is usually required following an upgrade or change to the Kafka cluster configuration properties.
Some broker configurations do not need a restart of the broker. For more information, see Updating Broker Configs in the Apache Kafka documentation.
After you perform a restart of a broker, check for under-replicated topic partitions to make sure that replica partitions have caught up.
To achieve a graceful restart with no loss of availability, ensure that you are replicating topics and that at least the minimum number of in-sync replicas (min.insync.replicas) are in sync. The min.insync.replicas configuration determines the minimum number of replicas that must acknowledge a write for the write to be considered successful.
For a multi-node cluster, the standard approach is to have a topic replication factor of at least 3 and a minimum number of in-sync replicas set to 1 less than the replication factor. If you are using acks=all in your producer configuration for data durability, check that the broker you restarted is in sync with all the partitions it’s replicating before restarting the next broker.
Single-node clusters are unavailable during a restart, since all partitions are on the same broker.
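As a sketch, a producer configured for durability during rolling restarts might include the following standard producer properties:

```properties
# Wait for all in-sync replicas to acknowledge each write
acks=all
# Avoid duplicate records if a send is retried during the restart
enable.idempotence=true
```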
Prerequisites
- Streams for Apache Kafka is installed on each host, and the configuration files are available.
- The Kafka cluster is operating as expected. Check for under-replicated partitions or any other issues affecting broker operation. The steps in this procedure describe how to check for under-replicated partitions.
Procedure
Perform the following steps on each Kafka broker. Complete the steps on the first broker before moving on to the next. Perform the steps on the brokers that also act as controllers last. Otherwise, the controllers need to change on more than one restart.
Stop the Kafka broker:

./bin/kafka-server-stop.sh

Make any changes to the broker configuration that require a restart after completion.
Restart the Kafka broker:

./bin/kafka-server-start.sh -daemon ./config/server.properties

Check that Kafka is running:

jcmd | grep kafka

Returns:

process ID kafka.Kafka ./config/server.properties

Check the logs of each node to ensure that they have successfully joined the KRaft cluster:

tail -f ./logs/server.log

Wait until the broker has zero under-replicated partitions. You can check from the command line or use metrics.
Use the kafka-topics.sh command with the --under-replicated-partitions parameter:

./bin/kafka-topics.sh --bootstrap-server <broker_host>:<port> --describe --under-replicated-partitions

For example:

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions

The command provides a list of topics with under-replicated partitions in a cluster.

Topics with under-replicated partitions:

Topic: topic3 Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2
Topic: topic3 Partition: 5 Leader: 3 Replicas: 1,2 Isr: 1
Topic: topic1 Partition: 1 Leader: 3 Replicas: 1,3 Isr: 3
# …

Under-replicated partitions are listed if the ISR (in-sync replica) count is less than the number of replicas. If a list is not returned, there are no under-replicated partitions.
Use the UnderReplicatedPartitions metric:

kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions

The metric provides a count of partitions where replicas have not caught up. Wait until the count is zero.
Tip: Use the Kafka Exporter to create an alert when there are one or more under-replicated partitions for a topic.
Checking logs when restarting
If a broker fails to start, check the application logs for information. You can also check the status of a broker shutdown and restart in the ./logs/server.log application log.