
Chapter 5. Configuring Streams for Apache Kafka


Use the Kafka configuration properties files to configure Streams for Apache Kafka.

The properties file is in Java format, with each property on a separate line in the following format:

<option> = <value>

Lines starting with # or ! are treated as comments and are ignored by Streams for Apache Kafka components.

# This is a comment

Values can be split across multiple lines by placing a backslash (\) directly before the line break:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bobs-password";
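The continuation and comment rules above can be illustrated with a minimal parser sketch in Python. This is a simplified illustration of the format, not how Kafka (or Java's `Properties` class) actually parses files; it handles only `#`/`!` comments, `key = value` pairs, and backslash line continuations:

```python
def parse_properties(text):
    """Minimal parser for Java-style properties files:
    '#'/'!' comment lines, 'key = value' pairs, and
    backslash line continuations. Illustration only."""
    props = {}
    logical_lines = []
    pending = ""
    for raw in text.splitlines():
        line = raw.strip()
        if pending:
            pending += line          # continuation of the previous line
        elif not line or line[0] in "#!":
            continue                 # blank line or comment
        else:
            pending = line
        if pending.endswith("\\"):
            pending = pending[:-1]   # drop the backslash, keep joining
        else:
            logical_lines.append(pending)
            pending = ""
    for logical in logical_lines:
        key, _, value = logical.partition("=")
        props[key.strip()] = value.strip()
    return props

example = """\
# This is a comment
bootstrap.servers = localhost:9092
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\
    username="bob" \\
    password="bobs-password";
"""
props = parse_properties(example)
print(props["bootstrap.servers"])  # localhost:9092
```

Note how the three physical lines of the `sasl.jaas.config` value collapse into a single logical property value once the trailing backslashes are removed.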

After saving the changes in the properties file, you need to restart the Kafka node. In a multi-node environment, repeat the process on each node in the cluster.

5.1. Using standard Kafka configuration properties

Use standard Kafka configuration properties to configure Kafka components.

The properties provide options to control and tune the configuration of the following Kafka components:

  • Brokers
  • Topics
  • Producer, consumer, and management clients
  • Kafka Connect
  • Kafka Streams

Broker and client parameters include options to configure authorization, authentication, and encryption.

For further information on Kafka configuration properties and how to use them to tune your deployment, see the Apache Kafka documentation.

5.2. Loading configuration values from environment variables

Use the Environment Variables Configuration Provider plugin to load configuration data from environment variables, such as certificates or JAAS configuration.

You can use the provider to load configuration data for all Kafka components, including producers and consumers. For example, use it to supply the credentials for a Kafka Connect connector configuration.

Prerequisites

  • Streams for Apache Kafka is installed on the host, and the configuration files are available.

Procedure

  1. Add the Environment Variables Configuration Provider JAR file to the Kafka libs directory.
  2. Initialize the Environment Variables Configuration Provider in the configuration properties file of the Kafka component. For example, to initialize the provider for Kafka, add the configuration to the server.properties file.

    Configuration to enable the Environment Variables Configuration Provider

    config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider

  3. Add configuration to the properties file to load data from environment variables.

    Configuration to load data from an environment variable

    option=${env:<MY_ENV_VAR_NAME>}

    Use upper-case environment variable names, such as MY_ENV_VAR_NAME.

  4. Save the changes.
  5. Restart the Kafka component.

    For information on restarting brokers in a multi-node cluster, see Section 3.9, “Performing a graceful rolling restart of Kafka brokers”.
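The substitution performed in step 3 can be sketched as follows. This is a simplified illustration of what the provider does with `${env:VAR}` placeholders at load time, not the actual EnvVarConfigProvider implementation:

```python
import os
import re

# Matches ${env:VAR_NAME} placeholders in property values
# (upper-case names, per the naming convention above).
ENV_PATTERN = re.compile(r"\$\{env:([A-Z0-9_]+)\}")

def resolve_env_placeholders(value):
    """Replace ${env:VAR} placeholders with values from the
    environment. Simplified illustration only."""
    def substitute(match):
        name = match.group(1)
        if name not in os.environ:
            raise KeyError(f"environment variable {name} is not set")
        return os.environ[name]
    return ENV_PATTERN.sub(substitute, value)

os.environ["MY_ENV_VAR_NAME"] = "secret-password"
print(resolve_env_placeholders("option=${env:MY_ENV_VAR_NAME}"))
# option=secret-password
```

Raising an error for an unset variable mirrors the fact that a missing environment variable leaves the component without the configuration value it needs.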

5.3. Configuring Kafka

Kafka uses properties files to store static configuration. The recommended location for the configuration files is ./config/. The configuration files must be readable by the Kafka user.

Streams for Apache Kafka ships example configuration files that highlight various basic and advanced features of the product. They can be found under config/kraft/ in the Streams for Apache Kafka installation directory as follows:

  • (default) config/kraft/server.properties for nodes running in combined mode
  • config/kraft/broker.properties for nodes running as brokers
  • config/kraft/controller.properties for nodes running as controllers

This chapter explains the most important configuration options.

5.3.1. Listeners

Listeners are used for client connections to Kafka brokers, inter-broker communication, and communication between brokers and controllers.

  • Client listeners enable Kafka clients to connect to brokers.
  • Inter-broker listeners manage communication between brokers.
  • Controller listeners facilitate communication between brokers and controllers.

Each Kafka node can be configured to use multiple listeners, including those dedicated to clients, brokers, or controllers. Each listener requires a different configuration so it can listen on a different port or network interface.

To configure listeners, edit the listeners property in the Kafka configuration properties file. Add listeners to the listeners property as a comma-separated list. Configure each property as follows:

<listener_name>://<hostname>:<port>

If <hostname> is empty, Kafka uses the java.net.InetAddress.getCanonicalHostName() method to resolve the machine’s IP address to a fully qualified domain name (FQDN). Kafka advertises this hostname to clients and brokers. Explicitly setting the hostname is recommended to avoid DNS issues.

Example configuration for multiple listeners

listeners=internal-1://:9092,internal-2://:9093,replication://:9094,controller://:9095
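The `<listener_name>://<hostname>:<port>` format can be sketched with a small parser in Python. This is an illustration of the listener string syntax only, not Kafka's own parsing code; an empty hostname means Kafka falls back to the resolved FQDN as described above:

```python
def parse_listeners(listeners):
    """Split a Kafka listeners string into (name, host, port)
    tuples. An empty host means the default resolved hostname
    is used. Illustration of the syntax only."""
    result = []
    for entry in listeners.split(","):
        name, _, hostport = entry.strip().partition("://")
        host, _, port = hostport.rpartition(":")
        result.append((name, host, int(port)))
    return result

listeners = "internal-1://:9092,internal-2://:9093,replication://:9094,controller://:9095"
for name, host, port in parse_listeners(listeners):
    print(name, host or "(default host)", port)
```

Running this against the example configuration shows four listeners, all binding to the default host on distinct ports.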

When a Kafka client connects to a Kafka cluster, it first contacts the bootstrap server, which is a broker specified in the client’s configuration. The bootstrap server provides the client with metadata, including the addresses of all brokers in the cluster. The client uses this metadata to connect directly to the brokers responsible for the partitions it needs to access. The broker addresses provided in the metadata are based on the advertised.listeners property.

5.3.1.1. Advertised listeners

Optionally, you can use the advertised.listeners property to provide the client with a different set of listener addresses than those given in the listeners property. This is useful if additional network infrastructure, such as a proxy, is between the client and the broker, or an external DNS name is being used instead of an IP address.

The advertised.listeners property is formatted in the same way as the listeners property.

Example configuration for advertised listeners

listeners=internal-1://:9092,internal-2://:9093
advertised.listeners=internal-1://my-broker-1.my-domain.com:1234,internal-2://my-broker-1.my-domain.com:1235

Note

The names of the advertised listeners must match those listed in the listeners property.

5.3.1.2. Inter-broker listeners

Inter-broker listeners are used for communication between Kafka brokers. Inter-broker communication is required for:

  • Coordinating workloads between different brokers
  • Replicating messages between partitions stored on different brokers

The inter-broker listener can be assigned to a port of your choice. When multiple listeners are configured, you can define the name of the inter-broker listener in the inter.broker.listener.name property of your broker configuration.

Here, the inter-broker listener is named REPLICATION:

listeners=REPLICATION://0.0.0.0:9091
inter.broker.listener.name=REPLICATION

5.3.1.3. Controller listeners

Controller configuration is used to connect and communicate with the controller that coordinates the cluster and manages the metadata used to track the status of brokers and partitions.

By default, communication between the controllers and brokers uses a dedicated controller listener. Controllers are responsible for coordinating administrative tasks, such as partition leadership changes, so one or more of these listeners is required.

Specify listeners to use for controllers using the controller.listener.names property. You can specify a dynamic quorum of controllers using the controller.quorum.bootstrap.servers property. The quorum enables a leader-follower structure for administrative tasks, with the leader actively managing operations and followers as hot standbys, maintaining data integrity and enabling failover in case of disruptions.

listeners=CONTROLLER://0.0.0.0:9090
controller.listener.names=CONTROLLER
controller.quorum.bootstrap.servers=localhost:9090

The format for the controller quorum is <hostname>:<port>.

5.3.2. Changing controller listeners

To change controller listener configuration, create and switch to a new controller with the new configuration. This procedure describes how to change controller listeners in a 3-node controller quorum from using PLAINTEXT to SSL security to enhance secure communication between controller nodes. The update involves adding the SSL listener, updating brokers to use the new listener, and removing the old PLAINTEXT listener.

Each controller in the controller quorum has the following configuration:

Example controller configuration before the switch

process.roles=controller
node.id=1
#...

listeners=CONTROLLER_PLAIN://0.0.0.0:9090
advertised.listeners=CONTROLLER_PLAIN://localhost:9090
controller.listener.names=CONTROLLER_PLAIN
listener.security.protocol.map=CONTROLLER_PLAIN:PLAINTEXT
controller.quorum.bootstrap.servers=localhost:9090, localhost:9091, localhost:9092

In this procedure, a new listener named CONTROLLER_SSL is added to each of the three controllers, on ports 9093, 9094, and 9095.

Prerequisites

  • Streams for Apache Kafka is installed on the host, and the configuration files and tools are available.
    This procedure uses the kafka-server-start.sh, kafka-server-stop.sh, and kafka-metadata-quorum.sh tools.
  • Administrative access to the controller nodes.
  • SSL certificates must be generated and configured for the controllers and brokers.

Procedure

  1. For each controller in the controller quorum, update the configuration one at a time:

    1. Stop the controller node:

      ./bin/kafka-server-stop.sh
    2. Add the new listener to the controller configuration:

      process.roles=controller
      node.id=1
      #...
      
      listeners=CONTROLLER_PLAIN://0.0.0.0:9090, CONTROLLER_SSL://0.0.0.0:9093
      advertised.listeners=CONTROLLER_PLAIN://localhost:9090, CONTROLLER_SSL://localhost:9093
      controller.listener.names=CONTROLLER_PLAIN, CONTROLLER_SSL
      listener.security.protocol.map=CONTROLLER_PLAIN:PLAINTEXT, CONTROLLER_SSL:SSL
      controller.quorum.bootstrap.servers=localhost:9090, localhost:9091, localhost:9092, localhost:9093, localhost:9094, localhost:9095

      In this example, a new controller listener named CONTROLLER_SSL is added with configuration for using the SSL security protocol. Keep the existing CONTROLLER_PLAIN listener until the switch to CONTROLLER_SSL is verified.

    3. Restart the controller node:

      ./bin/kafka-server-start.sh ./config/controller.properties

      Monitor the logs to confirm it starts successfully and binds to the new listener.

  2. Verify the new listener is active by checking the status of the controller quorum:

    ./bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9090 describe --status

    The new listener should be listed in the output.

  3. After all controllers have been updated, switch the default listener:

    process.roles=controller
    node.id=1
    #...
    
    listeners=CONTROLLER_PLAIN://0.0.0.0:9090, CONTROLLER_SSL://0.0.0.0:9093
    advertised.listeners=CONTROLLER_PLAIN://localhost:9090, CONTROLLER_SSL://localhost:9093
    controller.listener.names=CONTROLLER_SSL, CONTROLLER_PLAIN
    listener.security.protocol.map=CONTROLLER_PLAIN:PLAINTEXT, CONTROLLER_SSL:SSL
    controller.quorum.bootstrap.servers=localhost:9090, localhost:9091, localhost:9092, localhost:9093, localhost:9094, localhost:9095

    Switching the order of priority in controller.listener.names ensures that Kafka uses the new CONTROLLER_SSL listener first.

  4. Restart the controllers to apply the changes, as before.
  5. Update the brokers to use the new controller listener:

    Example broker configuration using the new controller listener

    node.id=0
    process.roles=broker
    #...
    
    controller.listener.names=CONTROLLER_SSL
    listener.security.protocol.map=CONTROLLER_SSL:SSL
    controller.quorum.bootstrap.servers=localhost:9093, localhost:9094, localhost:9095

  6. Restart the brokers to apply the changes.

    ./bin/kafka-server-stop.sh
    ./bin/kafka-server-start.sh ./config/server.properties
  7. Remove the old listener from the controllers:

    Example controller configuration after removing the old listener

    process.roles=controller
    node.id=1
    #...
    
    listeners=CONTROLLER_SSL://0.0.0.0:9093
    advertised.listeners=CONTROLLER_SSL://localhost:9093
    controller.listener.names=CONTROLLER_SSL
    listener.security.protocol.map=CONTROLLER_SSL:SSL
    controller.quorum.bootstrap.servers=localhost:9093, localhost:9094, localhost:9095

  8. Restart the controllers to apply the changes, as before.
  9. Verify that the quorum is operational and all controllers are connected over CONTROLLER_SSL.

    Use the kafka-metadata-quorum.sh tool to confirm the listener configuration and quorum status.

5.3.3. Data logs

Apache Kafka stores all records it receives from producers in logs. The logs contain the actual data, in the form of records, that Kafka needs to deliver. Note that these records differ from application log files, which detail the broker’s activities.

Log directories

You can configure log directories using the log.dirs property in the server configuration properties file to store logs in one or multiple log directories. Set it to the /var/lib/kafka directory created during installation:

Data log configuration

log.dirs=/var/lib/kafka

For performance reasons, you can configure log.dirs with multiple directories and place each of them on a different physical device to improve disk I/O performance. For example:

Configuration for multiple directories

log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3
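Because every directory listed in log.dirs must exist and be writable by the Kafka user before the broker starts, a simple pre-flight check can catch configuration mistakes early. The helper below is hypothetical (not part of Kafka or Streams for Apache Kafka), shown here as a sketch of such a check:

```python
import os
import tempfile

def check_log_dirs(log_dirs):
    """Hypothetical pre-flight check: verify each directory in a
    comma-separated log.dirs value exists and is writable."""
    problems = []
    for d in (p.strip() for p in log_dirs.split(",")):
        if not os.path.isdir(d):
            problems.append(f"{d}: does not exist")
        elif not os.access(d, os.W_OK):
            problems.append(f"{d}: not writable")
    return problems

# Demonstrate with temporary directories standing in for /var/lib/kafka*.
with tempfile.TemporaryDirectory() as d1, tempfile.TemporaryDirectory() as d2:
    print(check_log_dirs(f"{d1},{d2}"))          # []
    print(check_log_dirs(f"{d1},/no/such/dir"))  # ['/no/such/dir: does not exist']
```

An empty result means all configured directories are usable; any entries in the list point at directories that would prevent the broker from starting cleanly.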

5.3.4. Metadata log

Controllers use a metadata log stored as a single-partition topic (__cluster_metadata) on every node. It records the state of the cluster, storing information on brokers, replicas, topics, and partitions, including the state of in-sync replicas and partition leadership.

Metadata log directory

You can configure the directory for storing the metadata log using the metadata.log.dir property. By default, if this property is not set, Kafka uses the log.dirs property to determine the storage directory for both data logs and metadata logs. The metadata log is placed in the first directory specified for log.dirs.

Isolating metadata operations from data operations can improve manageability and potentially lead to performance gains. To set a specific directory for the metadata log, include the metadata.log.dir property in the server configuration properties file.

For example:

Metadata log configuration

log.dirs=/var/lib/kafka
metadata.log.dir=/var/lib/kafka-metadata

Note

Kafka tools are available for inspecting and debugging the metadata log. For more information, see the Apache Kafka documentation.

5.3.5. Node ID

The node ID is a unique identifier for each node (broker or controller) in the cluster. You can assign any integer greater than or equal to 0 as the node ID. The node ID identifies nodes after restarts or crashes, so it is important that the ID remains stable and does not change over time.

The node ID is configured in the Kafka configuration properties file:

node.id=1

5.4. Transitioning to separate broker and controller roles

If your Kafka cluster uses nodes with dual controller and broker roles, you can transition to using nodes with separate roles. This procedure describes how to do so.

To do this, add new controllers, scale down the controllers on the dual-role nodes, and then switch the dual-role nodes to broker-only.

In this example, we update three dual-role nodes.

Prerequisites

  • Streams for Apache Kafka (minimum 2.9) is installed on each host, and the configuration files are available.
  • The controller quorum is configured for dynamic scaling using the controller.quorum.bootstrap.servers property.
  • Cruise Control is installed.
  • A backup of the cluster is recommended.

Procedure

  1. Add a quorum of three new controller-only nodes.

    Integrate the controllers into the controller quorum by updating the controller.quorum.bootstrap.servers property.

    For more information, see Section 14.6, “Adding new controllers”.

  2. Using the kafka-metadata-quorum.sh tool, remove the dual-role controllers from the controller quorum.

    For more information, see Section 14.7, “Removing controllers”.

  3. For each dual-role node, and one at a time:

    1. Stop the dual-role node:

      ./bin/kafka-server-stop.sh
    2. Configure the dual-role node to serve as a broker only by changing process.roles=broker, controller to process.roles=broker in the node configuration.

      Example broker configuration

      node.id=1
      process.roles=broker
      
      log.dirs=/var/lib/kafka
      listeners=PLAINTEXT://0.0.0.0:9092
      controller.listener.names=CONTROLLER
      listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      controller.quorum.bootstrap.servers=localhost:9090, localhost:9091, localhost:9092
      inter.broker.listener.name=PLAINTEXT
      num.partitions=1
      auto.create.topics.enable=false
      default.replication.factor=3
      min.insync.replicas=2
      #...

    3. Restart the broker node that was previously serving a dual role:

      ./bin/kafka-server-start.sh -daemon ./config/server.properties

5.5. Transitioning to dual-role nodes

This procedure describes how to transition from using separate broker-only and controller-only nodes to using dual-role nodes. If your Kafka cluster uses dedicated broker and controller nodes, you can transition to single nodes that serve both roles.

To do this, add dual-role configuration to the nodes, then rebalance the cluster to move partition replicas to the nodes that previously served as controllers only.

Note

A dual-role configuration is suitable for development or testing. In a typical production environment, use dedicated broker and controller nodes.

Prerequisites

  • Streams for Apache Kafka (minimum 2.9) is installed on each host, and the configuration files are available.
  • The controller quorum is configured for dynamic scaling using the controller.quorum.bootstrap.servers property.
  • Cruise Control is installed.
  • A backup of the cluster is recommended.

Procedure

  1. For each controller node, and one at a time:

    1. Stop the controller node:

      ./bin/kafka-server-stop.sh
    2. Configure the controller-only node to serve as a dual-role node by adding broker-specific configuration.

      At a minimum, do the following:

      • Switch process.roles=controller to process.roles=broker, controller.
      • Add or update the broker log directory using log.dirs.
      • Add a listener for the broker to handle client requests. In this example, PLAINTEXT://:9092 is added.
      • Update mappings between listener names and security protocols using listener.security.protocol.map.
      • Configure a listener for inter-broker communication using inter.broker.listener.name.

      Example dual-role configuration

      process.roles=broker, controller
      node.id=1
      
      log.dirs=/var/lib/kafka
      metadata.log.dir=/var/lib/kafka-metadata
      
      listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9090
      controller.listener.names=CONTROLLER
      listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      controller.quorum.bootstrap.servers=localhost:9090
      inter.broker.listener.name=PLAINTEXT
      
      num.partitions=1
      auto.create.topics.enable=false
      default.replication.factor=3
      min.insync.replicas=2
      # ...

    3. Restart the node that is now operating in a dual role:

      ./bin/kafka-server-start.sh -daemon ./config/dual-role.properties
  2. Use the Cruise Control remove_broker endpoint to reassign partition replicas from broker-only nodes to the nodes that now serve as dual-role nodes.

    The reassignment can take some time depending on the number of topics and partitions in the cluster.

    For more information, see Section 15.7, “Generating optimization proposals”.

  3. Unregister the broker nodes:

    ./bin/kafka-cluster.sh unregister \
      --bootstrap-server <broker_host>:<port> \
      --id <node_id_number>

    For more information, see Section 14.4, “Unregistering brokers after scale-down operations”.

  4. Stop the broker nodes:

    ./bin/kafka-server-stop.sh