Chapter 5. Configuring Streams for Apache Kafka
Use the Kafka and ZooKeeper properties files to configure Streams for Apache Kafka.
- ZooKeeper: /kafka/config/zookeeper.properties
- Kafka: /kafka/config/server.properties
The properties files are in the Java format, with each property on a separate line in the following format:
<option> = <value>
Lines starting with # or ! are treated as comments and are ignored by Streams for Apache Kafka components.
# This is a comment
Values can be split into multiple lines by using \ directly before the newline / carriage return.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bobs-password";
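The comment and line-continuation rules described above can be illustrated with a small parser. This is a minimal sketch of a subset of the Java properties format, not the parser that Kafka or ZooKeeper actually use; the function name `parse_properties` and the sample text are assumptions made for illustration, and escape sequences or `:` separators are deliberately not handled.

```python
def parse_properties(text):
    """Parse a subset of the Java properties format into a dict."""
    props = {}
    logical = ""  # holds a value that continues across physical lines
    for raw in text.splitlines():
        line = raw.strip()
        # Blank lines and comment lines (# or !) are skipped, unless we are
        # in the middle of a continued value.
        if not logical and (not line or line[0] in "#!"):
            continue
        if line.endswith("\\"):
            logical += line[:-1]  # drop the trailing backslash, keep reading
            continue
        logical += line
        key, _, value = logical.partition("=")
        props[key.strip()] = value.strip()
        logical = ""
    return props


# Hypothetical sample mirroring the examples in this chapter.
EXAMPLE = r'''
# This is a comment
tickTime = 2000
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bobs-password";
'''

for key, value in parse_properties(EXAMPLE).items():
    print(f"{key} -> {value}")
```

The continued `sasl.jaas.config` value is joined into a single logical line, with the leading whitespace of each continuation line removed.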
After you save the changes in the properties files, you need to restart the Kafka broker or ZooKeeper. In a multi-node environment, you will need to repeat the process on each node in the cluster.
5.1. Using standard Kafka configuration properties
Use standard Kafka configuration properties to configure Kafka components.
The properties provide options to control and tune the configuration of the following Kafka components:
- Brokers
- Topics
- Producer, consumer, and management clients
- Kafka Connect
- Kafka Streams
Broker and client parameters include options to configure authorization, authentication and encryption.
For further information on Kafka configuration properties and how to use the properties to tune your deployment, see the following guides:
5.2. Loading configuration values from environment variables
Use the Environment Variables Configuration Provider plugin to load configuration data from environment variables. You can use the Environment Variables Configuration Provider, for example, to load certificates or JAAS configuration from environment variables.
You can use the provider to load configuration data for all Kafka components, including producers and consumers. Use the provider, for example, to provide the credentials for Kafka Connect connector configuration.
Prerequisites
- Streams for Apache Kafka is installed on each host, and the configuration files are available.
- The Environment Variables Configuration Provider JAR file. The JAR file is available from the Streams for Apache Kafka archive.
Procedure
- Add the Environment Variables Configuration Provider JAR file to the Kafka libs directory.
- Initialize the Environment Variables Configuration Provider in the configuration properties file of the Kafka component. For example, to initialize the provider for Kafka, add the configuration to the server.properties file.
  Configuration to enable the Environment Variables Configuration Provider
  config.providers=env
  config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
- Add configuration to the properties file to load data from environment variables.
  Configuration to load data from an environment variable
  option=${env:<MY_ENV_VAR_NAME>}
  Use capitalized or upper-case environment variable naming conventions, such as MY_ENV_VAR_NAME.
- Save the changes.
- Restart the Kafka component.
  For information on restarting brokers in a multi-node cluster, see Section 4.3, “Performing a graceful rolling restart of Kafka brokers”.
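To make the `${env:<MY_ENV_VAR_NAME>}` placeholder syntax concrete, the following sketch shows the general idea of replacing such placeholders with values from the environment. It is only an illustration of the substitution concept and is not how the provider is implemented in Kafka; the function name `resolve_env_refs` and the choice to raise an error for unset variables are assumptions.

```python
import os
import re

# Matches placeholders of the form ${env:NAME}.
_ENV_REF = re.compile(r"\$\{env:([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_env_refs(value, environ=os.environ):
    """Replace every ${env:NAME} placeholder in value with environ[NAME]."""

    def lookup(match):
        name = match.group(1)
        if name not in environ:
            # Assumption for this sketch: fail loudly on missing variables.
            raise KeyError(f"environment variable {name} is not set")
        return environ[name]

    return _ENV_REF.sub(lookup, value)


# Hypothetical usage with an explicit mapping instead of the real environment.
print(resolve_env_refs("${env:MY_ENV_VAR_NAME}", {"MY_ENV_VAR_NAME": "example"}))
```

Passing the environment as a parameter keeps the sketch easy to test without modifying the real process environment.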
5.3. Configuring ZooKeeper
Kafka uses ZooKeeper to store configuration data and for cluster coordination. It is strongly recommended to run a cluster of replicated ZooKeeper instances.
5.3.1. Basic configuration
The most important ZooKeeper configuration options are:
tickTime
- ZooKeeper’s basic time unit in milliseconds. It is used for heartbeats and session timeouts. For example, the minimum session timeout is two ticks.
dataDir
- The directory where ZooKeeper stores its transaction logs and snapshots of its in-memory database. This should be set to the /var/lib/zookeeper/ directory that was created during installation.
clientPort
- Port number where clients can connect. Defaults to 2181.
An example ZooKeeper configuration file named config/zookeeper.properties is located in the Streams for Apache Kafka installation directory. It is recommended to place the dataDir directory on a separate disk device to minimize the latency in ZooKeeper.
The ZooKeeper configuration file should be located at /opt/kafka/config/zookeeper.properties. A basic example of the configuration file can be found below. The configuration file has to be readable by the kafka user.
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
5.3.2. ZooKeeper cluster configuration
In most production environments, we recommend you deploy a cluster of replicated ZooKeeper instances. A stable and highly available ZooKeeper cluster is important for running a reliable ZooKeeper service. ZooKeeper clusters are also referred to as ensembles.
ZooKeeper clusters usually consist of an odd number of nodes. ZooKeeper requires that a majority of the nodes in the cluster are up and running. For example:
- In a cluster with three nodes, at least two of the nodes must be up and running. This means it can tolerate one node being down.
- In a cluster consisting of five nodes, at least three nodes must be available. This means it can tolerate two nodes being down.
- In a cluster consisting of seven nodes, at least four nodes must be available. This means it can tolerate three nodes being down.
Having more nodes in the ZooKeeper cluster delivers better resiliency and reliability of the whole cluster.
ZooKeeper can run in clusters with an even number of nodes. The additional node, however, does not increase the resiliency of the cluster. A cluster with four nodes requires at least three nodes to be available and can tolerate only one node being down. Therefore it has exactly the same resiliency as a cluster with only three nodes.
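The majority rule behind these numbers is simple integer arithmetic. The following sketch, with hypothetical function names, computes the quorum size and the number of node failures a cluster of a given size can tolerate.

```python
def quorum_size(nodes: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    return nodes // 2 + 1


def tolerated_failures(nodes: int) -> int:
    """Number of nodes that can be down while a majority is still available."""
    return nodes - quorum_size(nodes)


for n in (3, 4, 5, 7):
    print(f"{n} nodes: quorum={quorum_size(n)}, tolerates={tolerated_failures(n)}")
```

For four nodes the quorum is three, so only one failure is tolerated, the same as with three nodes.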
Ideally, the different ZooKeeper nodes should be located in different data centers or network segments. Increasing the number of ZooKeeper nodes increases the workload spent on cluster synchronization. For most Kafka use cases, a ZooKeeper cluster with 3, 5 or 7 nodes should be sufficient.
A ZooKeeper cluster with 3 nodes can tolerate only 1 unavailable node. This means that if a cluster node crashes while you are doing maintenance on another node your ZooKeeper cluster will be unavailable.
Replicated ZooKeeper configuration supports all configuration options supported by the standalone configuration. Additional options are added for the clustering configuration:
initLimit
- Amount of time to allow followers to connect and sync to the cluster leader. The time is specified as a number of ticks (see the tickTime option for more details).
syncLimit
- Amount of time for which followers can be behind the leader. The time is specified as a number of ticks (see the tickTime option for more details).
reconfigEnabled
- Enables or disables dynamic reconfiguration. Must be enabled in order to add or remove servers to a ZooKeeper cluster.
standaloneEnabled
- Enables or disables standalone mode, where ZooKeeper runs with only one server.
In addition to the options above, every configuration file should contain a list of servers which should be members of the ZooKeeper cluster. The server records should be specified in the format server.id=hostname:port1:port2, where:
id
- The ID of the ZooKeeper cluster node.
hostname
- The hostname or IP address where the node listens for connections.
port1
- The port number used for intra-cluster communication.
port2
- The port number used for leader election.
The following is an example configuration file of a ZooKeeper cluster with three nodes:
tickTime=2000
dataDir=/var/lib/zookeeper/
initLimit=5
syncLimit=2
reconfigEnabled=true
standaloneEnabled=false
server.1=172.17.0.1:2888:3888:participant;172.17.0.1:2181
server.2=172.17.0.2:2888:3888:participant;172.17.0.2:2181
server.3=172.17.0.3:2888:3888:participant;172.17.0.3:2181
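Because initLimit and syncLimit are expressed in ticks, the effective timeouts depend on tickTime. The following sketch converts the values from the example configuration into milliseconds; the helper name `ticks_to_ms` is an assumption made for illustration.

```python
def ticks_to_ms(ticks: int, tick_time_ms: int) -> int:
    """Convert a limit expressed in ticks into milliseconds."""
    return ticks * tick_time_ms


TICK_TIME_MS = 2000  # tickTime from the example configuration

print("initLimit:", ticks_to_ms(5, TICK_TIME_MS), "ms")
print("syncLimit:", ticks_to_ms(2, TICK_TIME_MS), "ms")
# The minimum session timeout is two ticks.
print("minimum session timeout:", ticks_to_ms(2, TICK_TIME_MS), "ms")
```

With tickTime=2000, initLimit=5 corresponds to 10000 ms and syncLimit=2 corresponds to 4000 ms.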
To use four letter word commands, specify 4lw.commands.whitelist=* in zookeeper.properties.
myid files
Each node in the ZooKeeper cluster must be assigned a unique ID. Each node’s ID must be configured in a myid file and stored in the dataDir folder, like /var/lib/zookeeper/. The myid file should contain only a single line with the ID written as text. The ID can be any integer from 1 to 255. You must manually create this file on each cluster node. Using this file, each ZooKeeper instance uses the configuration from the corresponding server. line in the configuration file to configure its listeners. It also uses all other server. lines to identify other cluster members.
In the above example, there are three nodes, so each one has a different myid file with the values 1, 2, and 3 respectively.
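The relationship between the myid file and the server. lines can be sketched as follows. This is an illustration of the lookup described above, not ZooKeeper's own code; the function names are hypothetical, and the input is assumed to be a dict of already parsed properties.

```python
def server_entries(props):
    """Collect server.<id> entries from parsed properties, keyed by integer ID."""
    prefix = "server."
    return {
        int(key[len(prefix):]): value
        for key, value in props.items()
        if key.startswith(prefix)
    }


def own_and_peers(props, myid_text):
    """Return (own server entry, dict of peer entries) for the given myid content."""
    my_id = int(myid_text.strip())
    if not 1 <= my_id <= 255:
        raise ValueError(f"myid must be between 1 and 255, got {my_id}")
    servers = server_entries(props)
    if my_id not in servers:
        raise ValueError(f"no server.{my_id} line in the configuration")
    peers = {sid: entry for sid, entry in servers.items() if sid != my_id}
    return servers[my_id], peers


config = {
    "tickTime": "2000",
    "server.1": "172.17.0.1:2888:3888:participant;172.17.0.1:2181",
    "server.2": "172.17.0.2:2888:3888:participant;172.17.0.2:2181",
    "server.3": "172.17.0.3:2888:3888:participant;172.17.0.3:2181",
}
own, peers = own_and_peers(config, "2\n")
print("own entry:", own)
print("peer IDs:", sorted(peers))
```

A check like this can catch a myid value that has no matching server. line before the node is started.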
5.3.3. Authentication
By default, ZooKeeper does not use any form of authentication and allows anonymous connections. However, it supports Java Authentication and Authorization Service (JAAS) which can be used to set up authentication using Simple Authentication and Security Layer (SASL). ZooKeeper supports authentication using the DIGEST-MD5 SASL mechanism with locally stored credentials.
5.3.3.1. Authentication with SASL
JAAS is configured using a separate configuration file. It is recommended to place the JAAS configuration file in the same directory as the ZooKeeper configuration (/opt/kafka/config/). The recommended file name is zookeeper-jaas.conf. When using a ZooKeeper cluster with multiple nodes, the JAAS configuration file has to be created on all cluster nodes.
JAAS is configured using contexts. Separate parts such as the server and client are always configured with a separate context. The context is a configuration option and has the following format:
ContextName {
    param1
    param2;
};
SASL Authentication is configured separately for server-to-server communication (communication between ZooKeeper instances) and client-to-server communication (communication between Kafka and ZooKeeper). Server-to-server authentication is relevant only for ZooKeeper clusters with multiple nodes.
Server-to-Server authentication
For server-to-server authentication, the JAAS configuration file contains two parts:
- The server configuration
- The client configuration
When using the DIGEST-MD5 SASL mechanism, the QuorumServer context is used to configure the authentication server. It must contain all the usernames that are allowed to connect, together with their passwords in an unencrypted form. The second context, QuorumLearner, has to be configured for the client which is built into ZooKeeper. It also contains the password in an unencrypted form. An example of the JAAS configuration file for the DIGEST-MD5 mechanism can be found below:
QuorumServer {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_zookeeper="123456";
};

QuorumLearner {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zookeeper"
    password="123456";
};
In addition to the JAAS configuration file, you must enable the server-to-server authentication in the regular ZooKeeper configuration file by specifying the following options:
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=20
Use the KAFKA_OPTS environment variable to pass the JAAS configuration file to the ZooKeeper server as a Java property:
su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
For more information about server-to-server authentication, see ZooKeeper wiki.
Client-to-Server authentication
Client-to-server authentication is configured in the same JAAS file as the server-to-server authentication. However, unlike the server-to-server authentication, it contains only the server configuration. The client part of the configuration has to be done in the client. For information on how to configure a Kafka broker to connect to ZooKeeper using authentication, see the Kafka installation section.
Add the Server context to the JAAS configuration file to configure client-to-server authentication. For DIGEST-MD5 mechanism it configures all usernames and passwords:
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_super="123456"
    user_kafka="123456"
    user_someoneelse="123456";
};
After configuring the JAAS context, enable the client-to-server authentication in the ZooKeeper configuration file by adding the following lines:
requireClientAuthScheme=sasl
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
You must add the authProvider.<ID> property for every server that is part of the ZooKeeper cluster.
Use the KAFKA_OPTS environment variable to pass the JAAS configuration file to the ZooKeeper server as a Java property:
su - kafka
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
For more information about configuring ZooKeeper authentication in Kafka brokers, see Section 6.5, “ZooKeeper authentication”.
5.3.3.2. Enabling server-to-server authentication using DIGEST-MD5
This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism between the nodes of the ZooKeeper cluster.
Prerequisites
- Streams for Apache Kafka is installed on the host.
- A ZooKeeper cluster is configured with multiple nodes.
Enabling SASL DIGEST-MD5 authentication
- On all ZooKeeper nodes, create or edit the /opt/kafka/config/zookeeper-jaas.conf JAAS configuration file and add the following contexts:
  QuorumServer {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      user_<Username>="<Password>";
  };

  QuorumLearner {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      username="<Username>"
      password="<Password>";
  };
  The username and password must be the same in both JAAS contexts. For example:
  QuorumServer {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      user_zookeeper="123456";
  };

  QuorumLearner {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      username="zookeeper"
      password="123456";
  };
- On all ZooKeeper nodes, edit the /opt/kafka/config/zookeeper.properties ZooKeeper configuration file and set the following options:
  quorum.auth.enableSasl=true
  quorum.auth.learnerRequireSasl=true
  quorum.auth.serverRequireSasl=true
  quorum.auth.learner.loginContext=QuorumLearner
  quorum.auth.server.loginContext=QuorumServer
  quorum.cnxn.threads.size=20
- Restart all ZooKeeper nodes one by one. To pass the JAAS configuration to ZooKeeper, use the KAFKA_OPTS environment variable.
  su - kafka
  export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
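Because the username and password must match in both quorum contexts, generating the file from a single pair of values can help avoid mismatches. The following is a minimal sketch that renders the two contexts as text; the function names are hypothetical, the login module class name is taken from the examples in this section, and the values are not escaped, so it assumes credentials without double quotes.

```python
LOGIN_MODULE = "org.apache.zookeeper.server.auth.DigestLoginModule"


def digest_context(name, options):
    """Render one JAAS context that uses the DIGEST login module."""
    body = "\n".join(f'    {key}="{value}"' for key, value in options.items())
    return f"{name} {{\n    {LOGIN_MODULE} required\n{body};\n}};"


def quorum_jaas(username, password):
    """Render QuorumServer and QuorumLearner contexts from one credential pair."""
    server = digest_context("QuorumServer", {f"user_{username}": password})
    learner = digest_context(
        "QuorumLearner", {"username": username, "password": password}
    )
    return server + "\n\n" + learner + "\n"


# Example credentials copied from this section; do not use them in production.
print(quorum_jaas("zookeeper", "123456"))
```

The output can be written to the zookeeper-jaas.conf file on each node so that every node receives identical contexts.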
5.3.3.3. Enabling Client-to-server authentication using DIGEST-MD5
This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism between ZooKeeper clients and ZooKeeper.
Prerequisites
- Streams for Apache Kafka is installed on the host.
- A ZooKeeper cluster is configured and running.
Enabling SASL DIGEST-MD5 authentication
- On all ZooKeeper nodes, create or edit the /opt/kafka/config/zookeeper-jaas.conf JAAS configuration file and add the following context:
  Server {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      user_super="<SuperUserPassword>"
      user_<Username1>="<Password1>"
      user_<Username2>="<Password2>";
  };
  The super user automatically has administrator privileges. The file can contain multiple users, but only one additional user is required by the Kafka brokers. The recommended name for the Kafka user is kafka.
  The following example shows the Server context for client-to-server authentication:
  Server {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      user_super="123456"
      user_kafka="123456";
  };
- On all ZooKeeper nodes, edit the /opt/kafka/config/zookeeper.properties ZooKeeper configuration file and set the following options:
  requireClientAuthScheme=sasl
  authProvider.<IdOfBroker1>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  authProvider.<IdOfBroker2>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  authProvider.<IdOfBroker3>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  The authProvider.<ID> property has to be added for every node which is part of the ZooKeeper cluster. For example, the configuration for a three-node ZooKeeper cluster looks like the following:
  requireClientAuthScheme=sasl
  authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
- Restart all ZooKeeper nodes one by one. To pass the JAAS configuration to ZooKeeper, use the KAFKA_OPTS environment variable.
  su - kafka
  export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/zookeeper-jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
5.3.4. Authorization
ZooKeeper supports access control lists (ACLs) to protect data stored inside it. Kafka brokers can automatically configure the ACL rights for all ZooKeeper records they create so no other ZooKeeper user can modify them.
For more information about enabling ZooKeeper ACLs in Kafka brokers, see Section 6.6, “ZooKeeper authorization”.
5.3.5. TLS
ZooKeeper supports TLS for encryption or authentication.
5.3.6. Additional configuration options
You can set the following additional ZooKeeper configuration options based on your use case:
maxClientCnxns
- The maximum number of concurrent client connections to a single member of the ZooKeeper cluster.
autopurge.snapRetainCount
- Number of snapshots of ZooKeeper’s in-memory database that are retained. The default value is 3.
autopurge.purgeInterval
- The time interval in hours for purging snapshots. The default value is 0, which disables purging.
All available configuration options can be found in the ZooKeeper documentation.
5.4. Configuring Kafka
Kafka uses a properties file to store static configuration. The recommended location for the configuration file is /opt/kafka/config/server.properties. The configuration file has to be readable by the kafka user.
Streams for Apache Kafka ships an example configuration file that highlights various basic and advanced features of the product. It can be found under config/server.properties in the Streams for Apache Kafka installation directory.
This chapter explains the most important configuration options.
5.4.1. ZooKeeper
Kafka brokers need ZooKeeper to store some parts of their configuration as well as to coordinate the cluster (for example, to decide which node is the leader for which partition). Connection details for the ZooKeeper cluster are stored in the configuration file. The zookeeper.connect property contains a comma-separated list of hostnames and ports of the members of the ZooKeeper cluster.
For example:
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
Kafka uses these addresses to connect to the ZooKeeper cluster. With this configuration, all Kafka znodes are created directly in the root of the ZooKeeper database. Therefore, such a ZooKeeper cluster can be used only for a single Kafka cluster. To configure multiple Kafka clusters to use a single ZooKeeper cluster, specify a base (prefix) path at the end of the ZooKeeper connection string in the Kafka configuration file:
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1
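The structure of the connection string, a comma-separated host list followed by an optional base path, can be illustrated with a small parser. This is a sketch for illustration only and is not how Kafka parses the value; the function name is hypothetical, and it assumes every entry has an explicit port and uses a hostname or IPv4 address.

```python
def parse_zookeeper_connect(value):
    """Split a zookeeper.connect value into [(host, port), ...] and a base path."""
    hosts_part, slash, chroot = value.partition("/")
    hosts = []
    for entry in hosts_part.split(","):
        host, _, port = entry.strip().rpartition(":")
        hosts.append((host, int(port)))
    # Without a base path, znodes are created in the ZooKeeper root.
    return hosts, ("/" + chroot if slash else "/")


hosts, base_path = parse_zookeeper_connect(
    "zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1"
)
print(hosts)
print(base_path)
```

Two Kafka clusters can share the same host list as long as their base paths differ.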
5.4.2. Listeners
Listeners are used to connect to Kafka brokers. Each Kafka broker can be configured to use multiple listeners. Each listener requires a different configuration so it can listen on a different port or network interface.
To configure listeners, edit the listeners property in the Kafka configuration properties file. Add listeners to the listeners property as a comma-separated list. Configure each listener as follows:
<listener_name>://<hostname>:<port>
If <hostname> is empty, Kafka uses the value returned by the java.net.InetAddress.getCanonicalHostName() method as the hostname.
Example configuration for multiple listeners
listeners=internal-1://:9092,internal-2://:9093,replication://:9094
When a Kafka client wants to connect to a Kafka cluster, it first connects to the bootstrap server, which is one of the cluster nodes. The bootstrap server provides the client with a list of all the brokers in the cluster, and the client connects to each one individually. The list of brokers is based on the configured listeners.
Advertised listeners
Optionally, you can use the advertised.listeners property to provide the client with a different set of listener addresses than those given in the listeners property. This is useful if additional network infrastructure, such as a proxy, is between the client and the broker, or an external DNS name is being used instead of an IP address.
The advertised.listeners property is formatted in the same way as the listeners property.
Example configuration for advertised listeners
listeners=internal-1://:9092,internal-2://:9093
advertised.listeners=internal-1://my-broker-1.my-domain.com:1234,internal-2://my-broker-1.my-domain.com:1235
The names of the advertised listeners must match those listed in the listeners property.
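The name-matching rule can be checked before a broker is restarted. The following sketch parses both properties and reports advertised listener names that do not appear in listeners. It is an illustrative check and not part of Kafka; the function names are hypothetical, and the parsing assumes hostnames or IPv4 addresses with an explicit port.

```python
def parse_listeners(value):
    """Parse 'name://host:port,...' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))  # host is "" when omitted
    return listeners


def unknown_advertised_names(listeners, advertised):
    """Return advertised listener names that are missing from listeners."""
    return sorted(set(parse_listeners(advertised)) - set(parse_listeners(listeners)))


listeners = "internal-1://:9092,internal-2://:9093"
advertised = (
    "internal-1://my-broker-1.my-domain.com:1234,"
    "internal-2://my-broker-1.my-domain.com:1235"
)
print(unknown_advertised_names(listeners, advertised))
```

An empty result means every advertised listener has a matching entry in the listeners property.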
Inter-broker listeners
Inter-broker listeners are used for communication between Kafka brokers. Inter-broker communication is required for:
- Coordinating workloads between different brokers
- Replicating messages between partitions stored on different brokers
The inter-broker listener can be assigned to a port of your choice. When multiple listeners are configured, you can define the name of the inter-broker listener in the inter.broker.listener.name property of your broker configuration.
Here, the inter-broker listener is named REPLICATION:
listeners=REPLICATION://0.0.0.0:9091
inter.broker.listener.name=REPLICATION
Controller listeners
Controller configuration is used to connect and communicate with the controller that coordinates the cluster and manages the metadata used to track the status of brokers and partitions.
By default, communication between the controllers and brokers uses a dedicated controller listener. Controllers are responsible for coordinating administrative tasks, such as partition leadership changes, so one or more of these listeners is required.
Specify listeners to use for controllers using the controller.listener.names property. You can specify a quorum of controller voters using the controller.quorum.voters property. The quorum enables a leader-follower structure for administrative tasks, with the leader actively managing operations and followers as hot standbys, ensuring metadata consistency in memory and facilitating failover.
listeners=CONTROLLER://0.0.0.0:9090
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9090
The format for each controller voter is <node_id>@<hostname>:<port>, where <node_id> is the ID of the controller node. Separate multiple voters with commas.
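The voter format can be illustrated with a short parser that splits each entry into the numeric ID before the @ sign and the endpoint after it. This is a sketch for illustration, not Kafka's own parsing; the function name is hypothetical, and it assumes hostnames or IPv4 addresses.

```python
def parse_quorum_voters(value):
    """Parse 'id@host:port,...' into {id: (host, port)}."""
    voters = {}
    for entry in value.split(","):
        voter_id, _, endpoint = entry.strip().partition("@")
        host, _, port = endpoint.rpartition(":")
        voters[int(voter_id)] = (host, int(port))
    return voters


print(parse_quorum_voters("1@localhost:9090"))
```

For a larger quorum, the same function accepts a comma-separated list of entries.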
5.4.3. Commit logs
Apache Kafka stores all records it receives from producers in commit logs. The commit logs contain the actual data, in the form of records, that Kafka needs to deliver. Note that these records differ from application log files, which detail the broker’s activities.
Log directories
You can use the log.dirs property to store commit logs in one or multiple log directories. It should be set to the /var/lib/kafka directory created during installation:
log.dirs=/var/lib/kafka
For performance reasons, you can configure log.dirs to multiple directories and place each of them on a different physical device to improve disk I/O performance. For example:
log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3
5.4.4. Broker ID
The broker ID is a unique identifier for each broker in the cluster. You can assign any integer greater than or equal to 0 as the broker ID. The broker ID is used to identify brokers after restarts or crashes, so it is important that the ID is stable and does not change over time. The broker ID is configured in the broker properties file:
broker.id=1
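The two constraints on broker IDs, uniqueness within the cluster and a value of 0 or greater, can be checked across a set of broker configurations. The following is a minimal sketch; the function name and the host names in the usage example are hypothetical.

```python
def check_broker_ids(ids_by_host):
    """Return a list of problems found in a {host: broker.id} mapping."""
    problems = []
    seen = {}  # broker.id -> first host that used it
    for host, broker_id in ids_by_host.items():
        if broker_id < 0:
            problems.append(f"{host}: broker.id must be >= 0, got {broker_id}")
        elif broker_id in seen:
            problems.append(
                f"{host}: broker.id {broker_id} is already used by {seen[broker_id]}"
            )
        else:
            seen[broker_id] = host
    return problems


# Hypothetical hosts; the second one reuses an ID by mistake.
print(check_broker_ids({"broker-a": 1, "broker-b": 1, "broker-c": 2}))
```

An empty list indicates that the IDs are valid and unique.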