Chapter 5. Configuring Streams for Apache Kafka

Use the Kafka and ZooKeeper properties files to configure Streams for Apache Kafka.

ZooKeeper
/opt/kafka/config/zookeeper.properties
Kafka
/opt/kafka/config/server.properties

The properties files are in the Java properties format, with each property on a separate line in the following format:

<option> = <value>

Lines starting with # or ! are treated as comments and ignored by Streams for Apache Kafka components.

# This is a comment

Values can be split across multiple lines by placing a backslash (\) directly before the line break.

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bobs-password";

After you save the changes in the properties files, you need to restart the Kafka broker or ZooKeeper. In a multi-node environment, you will need to repeat the process on each node in the cluster.
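For example, assuming the default /opt/kafka installation path used elsewhere in this guide, a minimal broker restart might look like this:

su - kafka
/opt/kafka/bin/kafka-server-stop.sh
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties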

5.1. Using standard Kafka configuration properties

Use standard Kafka configuration properties to configure Kafka components.

The properties provide options to control and tune the configuration of the following Kafka components:

  • Brokers
  • Topics
  • Producer, consumer, and management clients
  • Kafka Connect
  • Kafka Streams

Broker and client parameters include options to configure authorization, authentication, and encryption.

For further information on Kafka configuration properties and how to use the properties to tune your deployment, see the Kafka documentation.

5.2. Loading configuration values from environment variables

Use the Environment Variables Configuration Provider plugin to load configuration data from environment variables. You can use the Environment Variables Configuration Provider, for example, to load certificates or JAAS configuration from environment variables.

You can use the provider to load configuration data for all Kafka components, including producers and consumers. Use the provider, for example, to provide the credentials for Kafka Connect connector configuration.

Prerequisites

Procedure

  1. Add the Environment Variables Configuration Provider JAR file to the Kafka libs directory.
  2. Initialize the Environment Variables Configuration Provider in the configuration properties file of the Kafka component. For example, to initialize the provider for Kafka, add the configuration to the server.properties file.

    Configuration to enable the Environment Variables Configuration Provider

    config.providers=env
    config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider

  3. Add configuration to the properties file to load data from environment variables.

    Configuration to load data from an environment variable

    option=${env:<MY_ENV_VAR_NAME>}

    Use upper-case environment variable naming conventions, such as MY_ENV_VAR_NAME.

  4. Save the changes.
  5. Restart the Kafka component.

    For information on restarting brokers in a multi-node cluster, see Section 4.3, “Performing a graceful rolling restart of Kafka brokers”.
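As a complete sketch, the following server.properties fragment loads a truststore password from an environment variable. The variable name TRUSTSTORE_PASSWORD is an assumption for illustration; use whatever name matches your environment:

config.providers=env
config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
ssl.truststore.password=${env:TRUSTSTORE_PASSWORD}

The environment variable must be set in the environment of the Kafka process before it starts.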

5.3. Configuring ZooKeeper

Kafka uses ZooKeeper to store configuration data and for cluster coordination. It is strongly recommended to run a cluster of replicated ZooKeeper instances.

5.3.1. Basic configuration

The most important ZooKeeper configuration options are:

tickTime
ZooKeeper’s basic time unit in milliseconds. It is used for heartbeats and session timeouts. For example, the minimum session timeout is two ticks.
dataDir
The directory where ZooKeeper stores its transaction logs and snapshots of its in-memory database. This should be set to the /var/lib/zookeeper/ directory that was created during installation.
clientPort
Port number where clients can connect. Defaults to 2181.

An example ZooKeeper configuration file named config/zookeeper.properties is located in the Streams for Apache Kafka installation directory. It is recommended to place the dataDir directory on a separate disk device to minimize latency in ZooKeeper.

The ZooKeeper configuration file should be located at /opt/kafka/config/zookeeper.properties and must be readable by the kafka user. A basic example of the configuration file follows:

tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181

5.3.2. ZooKeeper cluster configuration

In most production environments, we recommend that you deploy a cluster of replicated ZooKeeper instances. A stable and highly available ZooKeeper cluster is important for a reliable ZooKeeper service. ZooKeeper clusters are also referred to as ensembles.

ZooKeeper clusters usually consist of an odd number of nodes. ZooKeeper requires that a majority of the nodes in the cluster are up and running. For example:

  • In a cluster with three nodes, at least two of the nodes must be up and running. This means it can tolerate one node being down.
  • In a cluster consisting of five nodes, at least three nodes must be available. This means it can tolerate two nodes being down.
  • In a cluster consisting of seven nodes, at least four nodes must be available. This means it can tolerate three nodes being down.

Having more nodes in the ZooKeeper cluster delivers better resiliency and reliability of the whole cluster.

ZooKeeper can run in clusters with an even number of nodes. The additional node, however, does not increase the resiliency of the cluster. A cluster with four nodes requires at least three nodes to be available and can tolerate only one node being down. Therefore it has exactly the same resiliency as a cluster with only three nodes.

Ideally, the different ZooKeeper nodes should be located in different data centers or network segments. Increasing the number of ZooKeeper nodes increases the workload spent on cluster synchronization. For most Kafka use cases, a ZooKeeper cluster with 3, 5 or 7 nodes should be sufficient.

Warning

A ZooKeeper cluster with 3 nodes can tolerate only 1 unavailable node. This means that if a cluster node crashes while you are doing maintenance on another node, your ZooKeeper cluster will be unavailable.

A replicated ZooKeeper configuration supports all the configuration options available in standalone configuration. Additional options are available for clustering:

initLimit
Amount of time to allow followers to connect and sync to the cluster leader. The time is specified as a number of ticks (see the tickTime option for more details).
syncLimit
Amount of time for which followers can be behind the leader. The time is specified as a number of ticks (see the tickTime option for more details).
reconfigEnabled
Enables or disables dynamic reconfiguration. Must be enabled to add or remove servers from a ZooKeeper cluster.
standaloneEnabled
Enables or disables standalone mode, where ZooKeeper runs with only one server.

In addition to the options above, every configuration file should contain a list of the servers that are members of the ZooKeeper cluster. The server records are specified in the format server.id=hostname:port1:port2, where:

id
The ID of the ZooKeeper cluster node.
hostname
The hostname or IP address where the node listens for connections.
port1
The port number used for intra-cluster communication.
port2
The port number used for leader election.

When dynamic reconfiguration is enabled (reconfigEnabled=true), each server record can also specify the server's role and client address, in the format server.id=hostname:port1:port2:role;client_hostname:client_port, as shown in the following example.

The following is an example configuration file of a ZooKeeper cluster with three nodes:

tickTime=2000
dataDir=/var/lib/zookeeper/
initLimit=5
syncLimit=2
reconfigEnabled=true
standaloneEnabled=false

server.1=172.17.0.1:2888:3888:participant;172.17.0.1:2181
server.2=172.17.0.2:2888:3888:participant;172.17.0.2:2181
server.3=172.17.0.3:2888:3888:participant;172.17.0.3:2181
Tip

To use four-letter word commands, specify 4lw.commands.whitelist=* in zookeeper.properties.

myid files

Each node in the ZooKeeper cluster must be assigned a unique ID. Each node's ID must be configured in a myid file stored in the dataDir directory, such as /var/lib/zookeeper/. The myid file must contain only a single line with the ID written as text. The ID can be any integer from 1 to 255. You must manually create this file on each cluster node. Using this file, each ZooKeeper instance uses the configuration from the corresponding server.<id> line in the configuration file to configure its listeners, and uses all other server.<id> lines to identify the other cluster members.

In the above example, there are three nodes, so each one will have a different myid with values 1, 2, and 3 respectively.
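For example, the following commands, run on the first node, create the myid file for node 1; repeat on the other nodes with IDs 2 and 3:

su - kafka
echo "1" > /var/lib/zookeeper/myid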

5.3.3. Authentication

By default, ZooKeeper does not use any form of authentication and allows anonymous connections. However, it supports Java Authentication and Authorization Service (JAAS) which can be used to set up authentication using Simple Authentication and Security Layer (SASL). ZooKeeper supports authentication using the DIGEST-MD5 SASL mechanism with locally stored credentials.

5.3.3.1. Authentication with SASL

JAAS is configured using a separate configuration file. It is recommended to place the JAAS configuration file in the same directory as the ZooKeeper configuration (/opt/kafka/config/). The recommended file name is zookeeper-jaas.conf. When using a ZooKeeper cluster with multiple nodes, the JAAS configuration file has to be created on all cluster nodes.

JAAS is configured using contexts. Separate parts, such as the server and the client, are always configured in separate contexts. Each context has the following format:

ContextName {
       param1
       param2;
};

SASL authentication is configured separately for server-to-server communication (communication between ZooKeeper instances) and client-to-server communication (communication between Kafka and ZooKeeper). Server-to-server authentication is relevant only for ZooKeeper clusters with multiple nodes.

Server-to-Server authentication

For server-to-server authentication, the JAAS configuration file contains two parts:

  • The server configuration
  • The client configuration

When using the DIGEST-MD5 SASL mechanism, the QuorumServer context is used to configure the authentication server. It must contain all usernames that are allowed to connect, together with their passwords, in unencrypted form. The second context, QuorumLearner, has to be configured for the client that is built into ZooKeeper. It also contains the password in unencrypted form. An example JAAS configuration file for the DIGEST-MD5 mechanism follows:

QuorumServer {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_zookeeper="123456";
};

QuorumLearner {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="zookeeper"
       password="123456";
};

In addition to the JAAS configuration file, you must enable server-to-server authentication in the regular ZooKeeper configuration file by specifying the following options:

quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=20

Use the KAFKA_OPTS environment variable to pass the JAAS configuration file to the ZooKeeper server as a Java property:

su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

For more information about server-to-server authentication, see the ZooKeeper wiki.

Client-to-Server authentication

Client-to-server authentication is configured in the same JAAS file as server-to-server authentication. However, unlike server-to-server authentication, the file contains only the server configuration. The client part of the configuration has to be done in the client itself. For information on how to configure a Kafka broker to connect to ZooKeeper using authentication, see the Kafka installation section.
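For illustration, the client side (for example, a Kafka broker) typically provides its credentials in a Client context in its own JAAS configuration file. This is a minimal sketch, assuming the kafka user defined in the Server context below:

Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="kafka"
       password="123456";
};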

Add the Server context to the JAAS configuration file to configure client-to-server authentication. For the DIGEST-MD5 mechanism, it configures all usernames and passwords:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_super="123456"
    user_kafka="123456"
    user_someoneelse="123456";
};

After configuring the JAAS context, enable client-to-server authentication in the ZooKeeper configuration file by adding the following lines:

requireClientAuthScheme=sasl
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

You must add the authProvider.<ID> property for every server that is part of the ZooKeeper cluster.

Use the KAFKA_OPTS environment variable to pass the JAAS configuration file to the ZooKeeper server as a Java property:

su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

For more information about configuring ZooKeeper authentication in Kafka brokers, see Section 6.5, “ZooKeeper authentication”.

5.3.3.2. Enabling server-to-server authentication using DIGEST-MD5

This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism between the nodes of the ZooKeeper cluster.

Prerequisites

  • Streams for Apache Kafka is installed on the host.
  • A ZooKeeper cluster is configured with multiple nodes.

Enabling SASL DIGEST-MD5 authentication

  1. On all ZooKeeper nodes, create or edit the /opt/kafka/config/zookeeper-jaas.conf JAAS configuration file and add the following contexts:

    QuorumServer {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user_<Username>="<Password>";
    };
    
    QuorumLearner {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           username="<Username>"
           password="<Password>";
    };

    The username and password must be the same in both JAAS contexts. For example:

    QuorumServer {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user_zookeeper="123456";
    };
    
    QuorumLearner {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           username="zookeeper"
           password="123456";
    };
  2. On all ZooKeeper nodes, edit the /opt/kafka/config/zookeeper.properties ZooKeeper configuration file and set the following options:

    quorum.auth.enableSasl=true
    quorum.auth.learnerRequireSasl=true
    quorum.auth.serverRequireSasl=true
    quorum.auth.learner.loginContext=QuorumLearner
    quorum.auth.server.loginContext=QuorumServer
    quorum.cnxn.threads.size=20
  3. Restart all ZooKeeper nodes one by one. To pass the JAAS configuration to ZooKeeper, use the KAFKA_OPTS environment variable.

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

5.3.3.3. Enabling client-to-server authentication using DIGEST-MD5

This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism between ZooKeeper clients and ZooKeeper.

Prerequisites

Enabling SASL DIGEST-MD5 authentication

  1. On all ZooKeeper nodes, create or edit the /opt/kafka/config/zookeeper-jaas.conf JAAS configuration file and add the following context:

    Server {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        user_super="<SuperUserPassword>"
        user_<Username1>="<Password1>"
        user_<Username2>="<Password2>";
    };

    The super user automatically has administrator privileges. The file can contain multiple users, but only one additional user is required by the Kafka brokers. The recommended name for the Kafka user is kafka.

    The following example shows the Server context for client-to-server authentication:

    Server {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        user_super="123456"
        user_kafka="123456";
    };
  2. On all ZooKeeper nodes, edit the /opt/kafka/config/zookeeper.properties ZooKeeper configuration file and set the following options:

    requireClientAuthScheme=sasl
    authProvider.<IdOfBroker1>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.<IdOfBroker2>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.<IdOfBroker3>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

    The authProvider.<ID> property has to be added for every node that is part of the ZooKeeper cluster. For example, the configuration for a three-node ZooKeeper cluster looks like the following:

    requireClientAuthScheme=sasl
    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  3. Restart all ZooKeeper nodes one by one. To pass the JAAS configuration to ZooKeeper, use the KAFKA_OPTS environment variable.

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

5.3.4. Authorization

ZooKeeper supports access control lists (ACLs) to protect data stored inside it. Kafka brokers can automatically configure the ACL rights for all ZooKeeper records they create so no other ZooKeeper user can modify them.
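As a minimal sketch, a broker sets ACLs on the znodes it creates when the zookeeper.set.acl option is enabled in its server.properties file; see the section referenced below for details:

zookeeper.set.acl=true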

For more information about enabling ZooKeeper ACLs in Kafka brokers, see Section 6.6, “ZooKeeper authorization”.

5.3.5. TLS

ZooKeeper supports TLS for encryption or authentication.
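As a minimal sketch, assuming keystore and truststore files already exist (the paths and passwords below are placeholders), TLS can be enabled in zookeeper.properties with the Netty connection factory and a secure client port:

serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
secureClientPort=2182
ssl.keyStore.location=/path/to/keystore.jks
ssl.keyStore.password=<keystore_password>
ssl.trustStore.location=/path/to/truststore.jks
ssl.trustStore.password=<truststore_password>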

5.3.6. Additional configuration options

You can set the following additional ZooKeeper configuration options based on your use case:

maxClientCnxns
The maximum number of concurrent client connections to a single member of the ZooKeeper cluster.
autopurge.snapRetainCount
The number of snapshots of ZooKeeper’s in-memory database that are retained. The default value is 3.
autopurge.purgeInterval
The time interval in hours for purging snapshots. The default value is 0, which disables this option.
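For example, the following snippet limits each client address to 60 concurrent connections and purges old snapshots every hour, keeping the three most recent (the values are illustrative):

maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1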

All available configuration options can be found in the ZooKeeper documentation.

5.4. Configuring Kafka

Kafka uses a properties file to store static configuration. The recommended location for the configuration file is /opt/kafka/config/server.properties. The configuration file has to be readable by the kafka user.

Streams for Apache Kafka ships an example configuration file that highlights various basic and advanced features of the product. It can be found under config/server.properties in the Streams for Apache Kafka installation directory.

This chapter explains the most important configuration options.

5.4.1. ZooKeeper

Kafka brokers need ZooKeeper to store some parts of their configuration and to coordinate the cluster (for example, to decide which node is the leader for which partition). Connection details for the ZooKeeper cluster are stored in the configuration file. The zookeeper.connect field contains a comma-separated list of hostnames and ports of the ZooKeeper cluster members.

For example:

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181

Kafka will use these addresses to connect to the ZooKeeper cluster. With this configuration, all Kafka znodes are created directly in the root of the ZooKeeper database, so such a ZooKeeper cluster can be used only for a single Kafka cluster. To configure multiple Kafka clusters to use a single ZooKeeper cluster, specify a base (prefix) path at the end of the ZooKeeper connection string in the Kafka configuration file:

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1

5.4.2. Listeners

Listeners are used to connect to Kafka brokers. Each Kafka broker can be configured to use multiple listeners. Each listener requires a different configuration so it can listen on a different port or network interface.

To configure listeners, edit the listeners property in the Kafka configuration properties file. Add listeners to the listeners property as a comma-separated list. Configure each listener as follows:

<listener_name>://<hostname>:<port>

If <hostname> is empty, Kafka uses the value returned by java.net.InetAddress.getCanonicalHostName() as the hostname.

Example configuration for multiple listeners

listeners=internal-1://:9092,internal-2://:9093,replication://:9094

When a Kafka client wants to connect to a Kafka cluster, it first connects to the bootstrap server, which is one of the cluster nodes. The bootstrap server provides the client with a list of all the brokers in the cluster, and the client connects to each one individually. The list of brokers is based on the configured listeners.
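For example, a client might list two of the brokers in its bootstrap configuration (the hostnames are illustrative):

bootstrap.servers=my-broker-1.my-domain.com:9092,my-broker-2.my-domain.com:9092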

Advertised listeners

Optionally, you can use the advertised.listeners property to provide the client with a different set of listener addresses than those given in the listeners property. This is useful if additional network infrastructure, such as a proxy, is between the client and the broker, or an external DNS name is being used instead of an IP address.

The advertised.listeners property is formatted in the same way as the listeners property.

Example configuration for advertised listeners

listeners=internal-1://:9092,internal-2://:9093
advertised.listeners=internal-1://my-broker-1.my-domain.com:1234,internal-2://my-broker-1.my-domain.com:1235

Note

The names of the advertised listeners must match those listed in the listeners property.

Inter-broker listeners

Inter-broker listeners are used for communication between Kafka brokers. Inter-broker communication is required for:

  • Coordinating workloads between different brokers
  • Replicating messages between partitions stored on different brokers

The inter-broker listener can be assigned to a port of your choice. When multiple listeners are configured, you can define the name of the inter-broker listener in the inter.broker.listener.name property of your broker configuration.

Here, the inter-broker listener is named REPLICATION:

listeners=REPLICATION://0.0.0.0:9091
inter.broker.listener.name=REPLICATION

Controller listeners

Controller configuration is used to connect and communicate with the controller that coordinates the cluster and manages the metadata used to track the status of brokers and partitions.

By default, communication between the controllers and brokers uses a dedicated controller listener. Controllers are responsible for coordinating administrative tasks, such as partition leadership changes, so one or more of these listeners is required.

Specify listeners to use for controllers using the controller.listener.names property. You can specify a quorum of controller voters using the controller.quorum.voters property. The quorum enables a leader-follower structure for administrative tasks, with the leader actively managing operations and followers as hot standbys, ensuring metadata consistency in memory and facilitating failover.

listeners=CONTROLLER://0.0.0.0:9090
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9090

The format for each controller voter is <node_id>@<hostname>:<port>, where <node_id> is the ID of the controller node.
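For example, a quorum of three controllers with node IDs 1, 2, and 3 might be configured as follows (the hostnames are illustrative):

controller.quorum.voters=1@controller-1.my-domain.com:9090,2@controller-2.my-domain.com:9090,3@controller-3.my-domain.com:9090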

5.4.3. Commit logs

Apache Kafka stores all records it receives from producers in commit logs. The commit logs contain the actual data, in the form of records, that Kafka needs to deliver. Note that these records differ from application log files, which detail the broker’s activities.

Log directories

You can configure log directories using the log.dirs property in the configuration properties file to store commit logs in one or multiple log directories. It should be set to the /var/lib/kafka directory created during installation:

log.dirs=/var/lib/kafka

For performance reasons, you can configure log.dirs with multiple directories and place each of them on a different physical device to improve disk I/O performance. For example:

log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3

5.4.4. Broker ID

The broker ID is a unique identifier for each broker in the cluster. You can assign an integer greater than or equal to 0 as the broker ID. The broker ID is used to identify the brokers after restarts or crashes, so it is important that the ID is stable and does not change over time. The broker ID is configured in the broker properties file:

broker.id=1