Chapter 6. Securing access to Kafka


Secure your Kafka cluster by managing the access clients have to the Kafka brokers. Specify configuration options to secure both Kafka brokers and clients.

A secure connection between Kafka brokers and clients can encompass the following:

  • Encryption for data exchange
  • Authentication to prove identity
  • Authorization to allow or decline actions executed by users

The authentication and authorization mechanisms specified for a client must match those specified for the Kafka brokers.
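For example, if a broker listener uses the SASL_SSL security protocol with the SCRAM-SHA-512 mechanism, a connecting client must be configured to match. The following is a minimal sketch of a matching client configuration; the truststore path and credentials are placeholders:

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=123456
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user1" \
    password="123456";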

6.1. Listener configuration

Encryption and authentication in Kafka brokers are configured per listener. For more information about Kafka listener configuration, see Section 5.4.2, “Listeners”.

Each listener in the Kafka broker is configured with its own security protocol. The configuration property listener.security.protocol.map defines which listener uses which security protocol. It maps each listener name to its security protocol. Supported security protocols are:

PLAINTEXT
Listener without any encryption or authentication.
SSL
Listener using TLS encryption and, optionally, authentication using TLS client certificates.
SASL_PLAINTEXT
Listener without encryption but with SASL-based authentication.
SASL_SSL
Listener with TLS-based encryption and SASL-based authentication.

Given the following listeners configuration:

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094

the listener.security.protocol.map might look like this:

listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL

This would configure the listener INT1 to use unencrypted connections with SASL authentication, the listener INT2 to use encrypted connections with SASL authentication, and the REPLICATION interface to use TLS encryption (possibly with TLS client authentication). The same security protocol can be used multiple times. The following example is also a valid configuration:

listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL

Such a configuration would use TLS encryption and, optionally, TLS client authentication for all interfaces.

6.2. TLS encryption

Kafka supports TLS for encrypting communication with Kafka clients.

To use TLS encryption and server authentication, a keystore containing private and public keys has to be provided. This is usually done using a file in the Java Keystore (JKS) format. A path to this file is set in the ssl.keystore.location property. The ssl.keystore.password property should be used to set the password protecting the keystore. For example:

ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456

In some cases, an additional password is used to protect the private key. Any such password can be set using the ssl.key.password property.

Kafka is able to use keys signed by certification authorities as well as self-signed keys. Using keys signed by certification authorities should always be the preferred method. In order to allow clients to verify the identity of the Kafka broker they are connecting to, the certificate should always contain the advertised hostname(s) as its Common Name (CN) or in the Subject Alternative Names (SAN).
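For testing purposes, a self-signed key pair with a SAN entry might be generated using the JDK keytool. The following is a minimal sketch; the hostname, alias, and passwords are placeholders, and CA-signed certificates remain preferable for production:

keytool -genkeypair -alias server-1 -keyalg RSA -validity 365 \
  -keystore /path/to/keystore/server-1.jks -storepass 123456 \
  -dname "CN=kafka1.example.com" \
  -ext SAN=DNS:kafka1.example.com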

It is possible to use different SSL configurations for different listeners. All options starting with ssl. can be prefixed with listener.name.<NameOfTheListener>., where the name of the listener must always be in lowercase. This overrides the default SSL configuration for that specific listener. The following example shows how to use different SSL configurations for different listeners:

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL

# Default configuration - will be used for listeners INT1 and INT2
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456

# Different configuration for listener REPLICATION
listener.name.replication.ssl.keystore.location=/path/to/keystore/replication.jks
listener.name.replication.ssl.keystore.password=123456

Additional TLS configuration options

In addition to the main TLS configuration options described above, Kafka supports many options for fine-tuning the TLS configuration. For example, to enable or disable TLS/SSL protocols or cipher suites:

ssl.cipher.suites
List of enabled cipher suites. Each cipher suite is a combination of authentication, encryption, MAC and key exchange algorithms used for the TLS connection. By default, all available cipher suites are enabled.
ssl.enabled.protocols
List of enabled TLS / SSL protocols. Defaults to TLSv1.2,TLSv1.1,TLSv1.
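For example, a broker might be restricted to TLSv1.2 and a single strong cipher suite, as in the following minimal sketch; choose values supported by your JVM and your clients:

ssl.enabled.protocols=TLSv1.2
ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384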

6.2.1. Enabling TLS encryption

This procedure describes how to enable encryption in Kafka brokers.

Prerequisites

Procedure

  1. Generate TLS certificates for all Kafka brokers in your cluster. The certificates should have their advertised and bootstrap addresses in their Common Name or Subject Alternative Name.
  2. Edit the Kafka configuration properties file on all cluster nodes for the following:

    • Change the listener.security.protocol.map field to specify the SSL protocol for the listener where you want to use TLS encryption.
    • Set the ssl.keystore.location option to the path to the JKS keystore with the broker certificate.
    • Set the ssl.keystore.password option to the password you used to protect the keystore.

      For example:

      listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094
      listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT
      ssl.keystore.location=/path/to/keystore/server-1.jks
      ssl.keystore.password=123456
  3. (Re)start the Kafka brokers.
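A client connecting to the ENCRYPTED listener on port 9093 would then typically use a truststore containing the CA certificate that signed the broker certificates. The following is a minimal sketch with placeholder paths:

security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=123456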

6.3. Authentication

To authenticate client connections to your Kafka cluster, the following options are available:

TLS client authentication
TLS (Transport Layer Security) using X.509 certificates on encrypted connections
Kafka SASL
Kafka SASL (Simple Authentication and Security Layer) using supported authentication mechanisms
OAuth 2.0
OAuth 2.0 token-based authentication

SASL authentication supports various mechanisms for both plain unencrypted connections and TLS connections:

  • PLAIN ― Authentication based on usernames and passwords.
  • SCRAM-SHA-256 and SCRAM-SHA-512 ― Authentication using Salted Challenge Response Authentication Mechanism (SCRAM).
  • GSSAPI ― Authentication against a Kerberos server.
Warning

The PLAIN mechanism sends usernames and passwords over the network in an unencrypted format. It should only be used in combination with TLS encryption.

6.3.1. Enabling TLS client authentication

Enable TLS client authentication in Kafka brokers to enhance security for connections to Kafka nodes already using TLS encryption.

Use the ssl.client.auth property to set TLS authentication with one of these values:

  • none ― TLS client authentication is off (default)
  • requested ― Optional TLS client authentication
  • required ― Clients must authenticate using a TLS client certificate

When a client authenticates using TLS client authentication, the authenticated principal name is derived from the distinguished name in the client certificate. For instance, a user with a certificate having a distinguished name CN=someuser will be authenticated with the principal CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown. This principal name provides a unique identifier for the authenticated user or entity. When TLS client authentication is not used, and SASL is disabled, the principal name defaults to ANONYMOUS.

Prerequisites

Procedure

  1. Prepare a JKS (Java Keystore) truststore containing the public key of the CA (Certification Authority) used to sign the user certificates.
  2. Edit the Kafka configuration properties file on all cluster nodes as follows:

    • Specify the path to the JKS truststore using the ssl.truststore.location property.
    • If the truststore is password-protected, set the password using the ssl.truststore.password property.
    • Set the ssl.client.auth property to required.

      TLS client authentication configuration

      ssl.truststore.location=/path/to/truststore.jks
      ssl.truststore.password=123456
      ssl.client.auth=required

  3. (Re)start the Kafka brokers.
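On the client side, a matching configuration supplies a keystore containing the client certificate, in addition to the truststore used to verify the broker. The following is a minimal sketch with placeholder paths; the client certificate is assumed to be signed by the CA in the broker's truststore:

security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/path/to/client-keystore.jks
ssl.keystore.password=123456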

6.3.2. Enabling SASL PLAIN client authentication

Enable SASL PLAIN authentication in Kafka to enhance security for connections to Kafka nodes.

SASL authentication is enabled through the Java Authentication and Authorization Service (JAAS) using the KafkaServer JAAS context. You can define the JAAS configuration in a dedicated file or directly in the Kafka configuration.

The recommended location for the dedicated file is /opt/kafka/config/jaas.conf. Ensure that the file is readable by the kafka user. Keep the JAAS configuration file in sync on all Kafka nodes.

Prerequisites

Procedure

  1. Edit or create the /opt/kafka/config/jaas.conf JAAS configuration file to enable the PlainLoginModule and specify the allowed usernames and passwords.

    Make sure this file is the same on all Kafka brokers.

    JAAS configuration

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        user_admin="123456"
        user_user1="123456"
        user_user2="123456";
    };

  2. Edit the Kafka configuration properties file on all cluster nodes as follows:

    • Enable SASL PLAIN authentication on specific listeners using the listener.security.protocol.map property. Specify SASL_PLAINTEXT or SASL_SSL.
    • Set the sasl.enabled.mechanisms property to PLAIN.

      SASL plain configuration

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=PLAIN

  3. (Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers:

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
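A client authenticating as one of the configured users might then use properties such as the following. This is a minimal sketch; user1 and its password are the example credentials from the JAAS configuration above:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="user1" \
    password="123456";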

6.3.3. Enabling SASL SCRAM client authentication

Enable SASL SCRAM authentication in Kafka to enhance security for connections to Kafka nodes.

SASL authentication is enabled through the Java Authentication and Authorization Service (JAAS) using the KafkaServer JAAS context. You can define the JAAS configuration in a dedicated file or directly in the Kafka configuration.

The recommended location for the dedicated file is /opt/kafka/config/jaas.conf. Ensure that the file is readable by the kafka user. Keep the JAAS configuration file in sync on all Kafka nodes.

Prerequisites

Procedure

  1. Edit or create the /opt/kafka/config/jaas.conf JAAS configuration file to enable the ScramLoginModule.

    Make sure this file is the same on all Kafka brokers.

    JAAS configuration

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required;
    };

  2. Edit the Kafka configuration properties file on all cluster nodes as follows:

    • Enable SASL SCRAM authentication on specific listeners using the listener.security.protocol.map property. Specify SASL_PLAINTEXT or SASL_SSL.
    • Set the sasl.enabled.mechanisms option to SCRAM-SHA-256 or SCRAM-SHA-512.

      For example:

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=SCRAM-SHA-512
  3. (Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers.

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
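A client might then authenticate with SCRAM credentials registered on the cluster (see Section 6.3.6, “Adding SASL SCRAM users”). The following is a minimal sketch with placeholder credentials:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user1" \
    password="123456";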

6.3.4. Enabling multiple SASL mechanisms

When using SASL authentication, you can enable more than one mechanism. Kafka can use multiple SASL mechanisms simultaneously. When multiple mechanisms are enabled, each client can choose which mechanism it uses.

To use more than one mechanism, set up the configuration required for each mechanism. You can add different KafkaServer JAAS configurations to the same context and enable more than one mechanism in the Kafka configuration as a comma-separated list using the sasl.enabled.mechanisms property.

JAAS configuration for more than one SASL mechanism

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";

    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.scram.ScramLoginModule required;
};

SASL mechanisms enabled

sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
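Each client then selects one of the enabled mechanisms through its sasl.mechanism property, with a sasl.jaas.config login module to match. For example, a client opting for SCRAM-SHA-256 from the list above (a minimal sketch with placeholder credentials):

sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user1" \
    password="123456";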

6.3.5. Enabling SASL for inter-broker authentication

Enable SASL SCRAM authentication between Kafka nodes to enhance security for inter-broker connections. As well as using SASL authentication for client connections to a Kafka cluster, you can also use SASL for inter-broker authentication. Unlike SASL for client connections, you can only choose one mechanism for inter-broker communication.

Prerequisites

  • ZooKeeper is installed on each host, and the configuration files are available.
  • If you are using a SCRAM mechanism, register SCRAM credentials on the Kafka cluster.

    For all nodes in the Kafka cluster, add the inter-broker SASL SCRAM user to ZooKeeper. This ensures that the credentials for authentication are updated for bootstrapping before the Kafka cluster is running.

    Registering an inter-broker SASL SCRAM user

    bin/kafka-configs.sh \
    --zookeeper localhost:2181 \
    --alter \
    --add-config 'SCRAM-SHA-512=[password=changeit]' \
    --entity-type users \
    --entity-name kafka

Procedure

  1. Specify an inter-broker SASL mechanism in the Kafka configuration using the sasl.mechanism.inter.broker.protocol property.

    Inter-broker SASL mechanism

    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

  2. (Optional) If you are using a SCRAM mechanism, register SCRAM credentials on the Kafka cluster by adding SCRAM users.

    This ensures that the credentials for authentication are updated for bootstrapping before the Kafka cluster is running.

  3. Specify the username and password for inter-broker communication in the KafkaServer JAAS context using the username and password fields.

    Inter-broker JAAS context

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="admin"
        password="123456"
        # ...
    };

6.3.6. Adding SASL SCRAM users

This procedure outlines the steps to register new users for authentication using SASL SCRAM in Kafka. SASL SCRAM authentication enhances the security of client connections.

Prerequisites

Procedure

  • Use the kafka-configs.sh tool to add new SASL SCRAM users.

    /opt/kafka/bin/kafka-configs.sh \
    --bootstrap-server <broker_host>:<port> \
    --alter \
    --add-config 'SCRAM-SHA-512=[password=<password>]' \
    --entity-type users --entity-name <username>

    For example:

    /opt/kafka/bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --alter \
    --add-config 'SCRAM-SHA-512=[password=123456]' \
    --entity-type users \
    --entity-name user1
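    You can check that the credentials were registered using the --describe option (the exact output varies by Kafka version):

    /opt/kafka/bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --describe \
    --entity-type users \
    --entity-name user1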

6.3.7. Deleting SASL SCRAM users

This procedure outlines the steps to remove users registered for authentication using SASL SCRAM in Kafka.

Prerequisites

Procedure

  • Use the kafka-configs.sh tool to delete SASL SCRAM users.

    /opt/kafka/bin/kafka-configs.sh \
    --bootstrap-server <broker_host>:<port> \
    --alter \
    --delete-config 'SCRAM-SHA-512' \
    --entity-type users \
    --entity-name <username>

    For example:

    /opt/kafka/bin/kafka-configs.sh \
    --bootstrap-server localhost:9092 \
    --alter \
    --delete-config 'SCRAM-SHA-512' \
    --entity-type users \
    --entity-name user1

6.3.8. Enabling Kerberos (GSSAPI) authentication

Streams for Apache Kafka supports the use of the Kerberos (GSSAPI) authentication protocol for secure single sign-on access to your Kafka cluster. GSSAPI is an API wrapper for Kerberos functionality, insulating applications from underlying implementation changes.

Kerberos is a network authentication system that allows clients and servers to authenticate to each other by using symmetric encryption and a trusted third party, the Kerberos Key Distribution Center (KDC).

This procedure shows how to configure Streams for Apache Kafka so that Kafka clients can access Kafka and ZooKeeper using Kerberos (GSSAPI) authentication.

The procedure assumes that a Kerberos krb5 resource server has been set up on a Red Hat Enterprise Linux host.

The procedure shows, with examples, how to configure:

  1. Service principals
  2. Kafka brokers to use the Kerberos login
  3. ZooKeeper to use Kerberos login
  4. Producer and consumer clients to access Kafka using Kerberos authentication

The instructions describe Kerberos set up for a single ZooKeeper and Kafka installation on a single host, with additional configuration for a producer and consumer client.

Prerequisites

To be able to configure Kafka and ZooKeeper to authenticate and authorize Kerberos credentials, you will need:

  • Access to a Kerberos server
  • A Kerberos client on each Kafka broker host

For more information on the steps to set up a Kerberos server, and clients on broker hosts, see the example Kerberos on RHEL set up configuration.

Add service principals for authentication

From your Kerberos server, create service principals (users) for ZooKeeper, Kafka brokers, and Kafka producer and consumer clients.

Service principals must take the form SERVICE-NAME/FULLY-QUALIFIED-HOST-NAME@DOMAIN-REALM.

  1. Create the service principals, and keytabs that store the principal keys, through the Kerberos KDC (see the example kadmin commands after this list).

    Make sure the domain name in the Kerberos principal is in uppercase.

    For example:

    • zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM
    • kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM
    • producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM
    • consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM

      The ZooKeeper service principal must have the same hostname as the zookeeper.connect configuration in the Kafka config/server.properties file:

      zookeeper.connect=node1.example.redhat.com:2181

      If the hostname is not the same, localhost is used and authentication will fail.

  2. Create a directory on the host and add the keytab files:

    For example:

    /opt/kafka/krb5/zookeeper-node1.keytab
    /opt/kafka/krb5/kafka-node1.keytab
    /opt/kafka/krb5/kafka-producer1.keytab
    /opt/kafka/krb5/kafka-consumer1.keytab
  3. Ensure the kafka user can access the directory:

    chown kafka:kafka -R /opt/kafka/krb5
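As an illustration of step 1, the principals and keytabs might be created with MIT Kerberos kadmin commands such as the following. This is a sketch only; the principal names, realm, and output paths are placeholders for your environment:

# On the KDC host, create the service principals with random keys
kadmin.local -q "addprinc -randkey zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"
kadmin.local -q "addprinc -randkey kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM"

# Export the principal keys to keytab files for distribution to the hosts
kadmin.local -q "ktadd -k /tmp/zookeeper-node1.keytab zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"
kadmin.local -q "ktadd -k /tmp/kafka-node1.keytab kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM"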
Configure ZooKeeper to use a Kerberos Login

Configure ZooKeeper to use the Kerberos Key Distribution Center (KDC) for authentication using the user principals and keytabs previously created for zookeeper.

  1. Create or modify the /opt/kafka/config/jaas.conf file to support ZooKeeper client and server operations:

    Client {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true (1)
        storeKey=true (2)
        useTicketCache=false (3)
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" (4)
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; (5)
    };

    Server {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        useTicketCache=false
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab"
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };

    QuorumServer {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab"
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };

    QuorumLearner {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/krb5/zookeeper-node1.keytab"
        principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };

    (1) Set to true to get the principal key from the keytab.
    (2) Set to true to store the principal key.
    (3) Set to true to obtain the Ticket Granting Ticket (TGT) from the ticket cache.
    (4) The keyTab property points to the location of the keytab file copied from the Kerberos KDC. The location and file must be readable by the kafka user.
    (5) The principal property is configured to match the fully-qualified principal name created on the KDC host, which follows the format SERVICE-NAME/FULLY-QUALIFIED-HOST-NAME@DOMAIN-REALM.
  2. Edit /opt/kafka/config/zookeeper.properties to use the updated JAAS configuration:

    # ...

    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000 (1)
    kerberos.removeHostFromPrincipal=false (2)
    kerberos.removeRealmFromPrincipal=false (3)
    quorum.auth.enableSasl=true (4)
    quorum.auth.learnerRequireSasl=true (5)
    quorum.auth.serverRequireSasl=true
    quorum.auth.learner.loginContext=QuorumLearner (6)
    quorum.auth.server.loginContext=QuorumServer
    quorum.auth.kerberos.servicePrincipal=zookeeper/_HOST (7)
    quorum.cnxn.threads.size=20

    (1) Controls the frequency for login renewal in milliseconds, which can be adjusted to suit ticket renewal intervals. Default is one hour.
    (2) Dictates whether the hostname is used as part of the login principal name. If using a single keytab for all nodes in the cluster, this is set to true. However, it is recommended to generate a separate keytab and fully-qualified principal for each broker host for troubleshooting.
    (3) Controls whether the realm name is stripped from the principal name for Kerberos negotiations. It is recommended that this setting is set to false.
    (4) Enables SASL authentication mechanisms for the ZooKeeper server and client.
    (5) The RequireSasl properties control whether SASL authentication is required for quorum events, such as master elections.
    (6) The loginContext properties identify the name of the login context in the JAAS configuration used for authentication configuration of the specified component. The loginContext names correspond to the names of the relevant sections in the /opt/kafka/config/jaas.conf file.
    (7) Controls the naming convention to be used to form the principal name used for identification. The placeholder _HOST is automatically resolved to the hostnames defined by the server.1 properties at runtime.
  3. Start ZooKeeper with JVM parameters to specify the Kerberos login configuration:

    su - kafka
    export EXTRA_ARGS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

    If you are not using the default service name (zookeeper), add the name using the -Dzookeeper.sasl.client.username=NAME parameter.

    Note

    If you are using the /etc/krb5.conf location, you do not need to specify -Djava.security.krb5.conf=/etc/krb5.conf when starting ZooKeeper, Kafka, or the Kafka producer and consumer.

Configure the Kafka broker server to use a Kerberos login

Configure Kafka to use the Kerberos Key Distribution Center (KDC) for authentication using the user principals and keytabs previously created for kafka.

  1. Modify the /opt/kafka/config/jaas.conf file with the following elements:

    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/opt/kafka/krb5/kafka-node1.keytab"
        principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        useTicketCache=false
        keyTab="/opt/kafka/krb5/kafka-node1.keytab"
        principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    };
  2. Configure each broker in the Kafka cluster by modifying the listener configuration in the config/server.properties file so the listeners use the SASL/GSSAPI login.

    Add the SASL protocol to the map of security protocols for the listener, and remove any unwanted protocols.

    For example:

    # ...
    broker.id=0
    # ...
    listeners=SECURE://:9092,REPLICATION://:9094 (1)
    inter.broker.listener.name=REPLICATION
    # ...
    listener.security.protocol.map=SECURE:SASL_PLAINTEXT,REPLICATION:SASL_PLAINTEXT (2)
    # ...
    sasl.enabled.mechanisms=GSSAPI (3)
    sasl.mechanism.inter.broker.protocol=GSSAPI (4)
    sasl.kerberos.service.name=kafka (5)
    # ...

    (1) Two listeners are configured: a secure listener for general-purpose communications with clients (supporting TLS for communications), and a replication listener for inter-broker communications.
    (2) For TLS-enabled listeners, the protocol name is SASL_SSL. For non-TLS listeners, the protocol name is SASL_PLAINTEXT. If SSL is not required, you can remove the ssl.* properties.
    (3) The SASL mechanism for Kerberos authentication is GSSAPI.
    (4) Kerberos authentication for inter-broker communication.
    (5) The name of the service used for authentication requests is specified to distinguish it from other services that may also be using the same Kerberos configuration.
  3. Start the Kafka broker, with JVM parameters to specify the Kerberos login configuration:

    su - kafka
    export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

    If the broker and ZooKeeper cluster were previously configured and working with a non-Kerberos-based authentication system, it is possible to start the ZooKeeper and broker cluster and check for configuration errors in the logs.

    After starting the broker and ZooKeeper instances, the cluster is now configured for Kerberos authentication.

Configure Kafka producer and consumer clients to use Kerberos authentication

Configure Kafka producer and consumer clients to use the Kerberos Key Distribution Center (KDC) for authentication using the user principals and keytabs previously created for producer1 and consumer1.

  1. Add the Kerberos configuration to the producer or consumer configuration file.

    For example:

    /opt/kafka/config/producer.properties

    # ...
    sasl.mechanism=GSSAPI (1)
    security.protocol=SASL_PLAINTEXT (2)
    sasl.kerberos.service.name=kafka (3)
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \ (4)
        useKeyTab=true \
        useTicketCache=false \
        storeKey=true \
        keyTab="/opt/kafka/krb5/kafka-producer1.keytab" \
        principal="producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    # ...

    (1) Configuration for Kerberos (GSSAPI) authentication.
    (2) The SASL_PLAINTEXT security protocol is used for SASL authentication over an unencrypted connection; use SASL_SSL for connections over TLS.
    (3) The service principal (user) for Kafka that was configured in the Kerberos KDC.
    (4) Configuration for JAAS using the same properties defined in jaas.conf.

    /opt/kafka/config/consumer.properties

    # ...
    sasl.mechanism=GSSAPI
    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
        useKeyTab=true  \
        useTicketCache=false \
        storeKey=true  \
        keyTab="/opt/kafka/krb5/consumer1.keytab" \
        principal="consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM";
    # ...

  2. Run the clients to verify that you can send and receive messages from the Kafka brokers.

    Producer client:

    export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-producer.sh --producer.config /opt/kafka/config/producer.properties  --topic topic1 --bootstrap-server node1.example.redhat.com:9094

    Consumer client:

    export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-consumer.sh --consumer.config /opt/kafka/config/consumer.properties  --topic topic1 --bootstrap-server node1.example.redhat.com:9094

6.4. Authorization

Authorization in Kafka brokers is implemented using authorizer plugins.

In this section we describe how to use the AclAuthorizer plugin provided with Kafka.

Alternatively, you can use your own authorization plugins. For example, if you are using OAuth 2.0 token-based authentication, you can use OAuth 2.0 authorization.

6.4.1. Enabling an ACL authorizer

Edit the /opt/kafka/config/server.properties file to add an ACL authorizer. Enable the authorizer by specifying its fully-qualified name in the authorizer.class.name property:

Enabling the authorizer

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

For AclAuthorizer, the fully-qualified name is kafka.security.authorizer.AclAuthorizer.

6.4.1.1. ACL rules

An ACL authorizer uses ACL rules to manage access to Kafka brokers.

ACL rules are defined in the following format:

Principal P is allowed / denied <operation> O on <kafka_resource> R from host H

For example, a rule might be set so that user John can view the topic comments from host 127.0.0.1. Host is the IP address of the machine that John is connecting from.

In most cases, the user is a producer or consumer application:

Consumer01 can write to the consumer group accounts from host 127.0.0.1

If ACL rules are not present for a given resource, all actions are denied. This behavior can be changed by setting the property allow.everyone.if.no.acl.found to true in the Kafka configuration file /opt/kafka/config/server.properties.
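For example (a minimal sketch; this permissive default is generally not recommended for production):

allow.everyone.if.no.acl.found=true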

6.4.1.2. Principals

A principal represents the identity of a user. The format of the ID depends on the authentication mechanism used by clients to connect to Kafka:

  • User:ANONYMOUS when connected without authentication.
  • User:<username> when connected using simple authentication mechanisms, such as PLAIN or SCRAM.

    For example User:admin or User:user1.

  • User:<DistinguishedName> when connected using TLS client authentication.

    For example User:CN=user1,O=MyCompany,L=Prague,C=CZ.

  • User:<Kerberos username> when connected using Kerberos.

The DistinguishedName is the distinguished name from the client certificate.

The Kerberos username is the primary part of the Kerberos principal, which is used by default when connecting using Kerberos. You can use the sasl.kerberos.principal.to.local.rules property to configure how the Kafka principal is built from the Kerberos principal.
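For example, a rule might strip the realm from principals so that kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM maps to a shorter name. This is a sketch only; the rule syntax follows the Kerberos auth_to_local format and the realm is a placeholder:

sasl.kerberos.principal.to.local.rules=RULE:[2:$1@$0](.*@EXAMPLE.REDHAT.COM)s/@.*//,DEFAULT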

6.4.1.3. Authentication of users

To use authorization, you need to have authentication enabled and used by your clients. Otherwise, all connections will have the principal User:ANONYMOUS.

For more information on methods of authentication, see Section 6.3, “Authentication”.

6.4.1.4. Super users

Super users are allowed to take all actions regardless of the ACL rules.

Super users are defined in the Kafka configuration file using the property super.users.

For example:

super.users=User:admin,User:operator

6.4.1.5. Replica broker authentication

When authorization is enabled, it is applied to all listeners and all connections. This includes the inter-broker connections used for replication of data between brokers. If enabling authorization, therefore, ensure that you use authentication for inter-broker connections and give the users used by the brokers sufficient rights. For example, if authentication between brokers uses the kafka-broker user, then super user configuration must include the username super.users=User:kafka-broker.

Note

For more information on the operations on Kafka resources you can control with ACLs, see the Apache Kafka documentation.

6.4.2. Adding ACL rules

When using an ACL authorizer to control access to Kafka based on Access Control Lists (ACLs), you can add new ACL rules using the kafka-acls.sh utility.

Use kafka-acls.sh parameter options to add, list and remove ACL rules, and perform other functions. The parameters require a double-hyphen convention, such as --add.

Prerequisites

Procedure

  • Run kafka-acls.sh with the --add option.

    Examples:

  • Allow user1 and user2 access to read from myTopic using the MyConsumerGroup consumer group.

    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
  • Deny user1 access to read myTopic from IP address host 127.0.0.1.

    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
  • Add user1 as the consumer of myTopic with MyConsumerGroup.

    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1

6.4.3. Listing ACL rules

When using an ACL authorizer to control access to Kafka based on Access Control Lists (ACLs), you can list existing ACL rules using the kafka-acls.sh utility.

Prerequisites

Procedure

  • Run kafka-acls.sh with the --list option.

    For example:

    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic myTopic
    
    Current ACLs for resource `Topic:myTopic`:
    
    User:user1 has Allow permission for operations: Read from hosts: *
    User:user2 has Allow permission for operations: Read from hosts: *
    User:user2 has Deny permission for operations: Read from hosts: 127.0.0.1
    User:user1 has Allow permission for operations: Describe from hosts: *
    User:user2 has Allow permission for operations: Describe from hosts: *
    User:user2 has Deny permission for operations: Describe from hosts: 127.0.0.1

6.4.4. Removing ACL rules

When using an ACL authorizer to control access to Kafka based on Access Control Lists (ACLs), you can remove existing ACL rules using the kafka-acls.sh utility.

Prerequisites

Procedure

  • Run kafka-acls.sh with the --remove option.

    Examples:

  • Remove the ACL allowing user1 and user2 access to read from myTopic using the MyConsumerGroup consumer group.

    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
  • Remove the ACL adding user1 as the consumer of myTopic with MyConsumerGroup.

    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
  • Remove the ACL denying user1 access to read myTopic from IP address host 127.0.0.1.

    /opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1

6.5. ZooKeeper authentication

By default, connections between ZooKeeper and Kafka are not authenticated. However, Kafka and ZooKeeper support Java Authentication and Authorization Service (JAAS) which can be used to set up authentication using Simple Authentication and Security Layer (SASL). ZooKeeper supports authentication using the DIGEST-MD5 SASL mechanism with locally stored credentials.

6.5.1. JAAS configuration

SASL authentication for ZooKeeper connections has to be configured in the JAAS configuration file. By default, Kafka will use the JAAS context named Client for connecting to ZooKeeper. The Client context should be configured in the /opt/kafka/config/jaas.conf file. The context has to enable PLAIN SASL authentication, as in the following example:

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="123456";
};

6.5.2. Enabling ZooKeeper authentication

This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism when connecting to ZooKeeper.

Prerequisites

  • Client-to-server authentication is enabled in ZooKeeper

Enabling SASL DIGEST-MD5 authentication

  1. On all Kafka broker nodes, create or edit the /opt/kafka/config/jaas.conf JAAS configuration file and add the following context:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="<Username>"
        password="<Password>";
    };

    The username and password should be the same as configured in ZooKeeper.

    The following example shows the Client context:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="123456";
    };
  2. Restart all Kafka broker nodes one by one. To pass the JAAS configuration to Kafka brokers, use the KAFKA_OPTS environment variable.

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

    For information on restarting brokers in a multi-node cluster, see Section 4.3, “Performing a graceful rolling restart of Kafka brokers”.

6.6. ZooKeeper authorization

When authentication is enabled between Kafka and ZooKeeper, you can use ZooKeeper Access Control List (ACL) rules to automatically control access to Kafka’s metadata stored in ZooKeeper.

6.6.1. ACL configuration

Enforcement of ZooKeeper ACL rules is controlled by the zookeeper.set.acl property in the config/server.properties Kafka configuration file.

The property is disabled by default; enable it by setting it to true:

zookeeper.set.acl=true

If ACL rules are enabled, when a znode is created in ZooKeeper only the Kafka user who created it can modify or delete it. All other users have read-only access.

Kafka sets ACL rules only for newly created ZooKeeper znodes. If the ACLs are only enabled after the first start of the cluster, the zookeeper-security-migration.sh tool can set ACLs on all existing znodes.

Confidentiality of data in ZooKeeper

Data stored in ZooKeeper includes:

  • Topic names and their configuration
  • Salted and hashed user credentials when SASL SCRAM authentication is used.

However, ZooKeeper does not store any records sent and received using Kafka. The data stored in ZooKeeper is assumed to be non-confidential.

If the data is to be regarded as confidential (for example because topic names contain customer IDs), the only option available for protection is isolating ZooKeeper on the network level and allowing access only to Kafka brokers.

6.6.2. Enabling ZooKeeper ACLs for a new Kafka cluster

This procedure describes how to enable ZooKeeper ACLs in the Kafka configuration for a new Kafka cluster. Use this procedure only before the first start of the Kafka cluster. For enabling ZooKeeper ACLs in a cluster that is already running, see Section 6.6.3, “Enabling ZooKeeper ACLs in an existing Kafka cluster”.

Prerequisites

Procedure

  1. Edit the Kafka configuration properties file to set the zookeeper.set.acl field to true on all cluster nodes.

    zookeeper.set.acl=true
  2. Start the Kafka brokers.

6.6.3. Enabling ZooKeeper ACLs in an existing Kafka cluster

This procedure describes how to enable ZooKeeper ACLs in the Kafka configuration for a Kafka cluster that is running. Use the zookeeper-security-migration.sh tool to set ZooKeeper ACLs on all existing znodes. The zookeeper-security-migration.sh tool is available as part of Streams for Apache Kafka, and can be found in the bin directory.

Prerequisites

Enabling the ZooKeeper ACLs

  1. Edit the Kafka configuration properties file to set the zookeeper.set.acl field to true on all cluster nodes.

    zookeeper.set.acl=true
  2. Restart all Kafka brokers one by one.

    For information on restarting brokers in a multi-node cluster, see Section 4.3, “Performing a graceful rolling restart of Kafka brokers”.

  3. Set the ACLs on all existing ZooKeeper znodes using the zookeeper-security-migration.sh tool.

    su - kafka
    cd /opt/kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=<ZooKeeperURL>
    exit

    For example:

    su - kafka
    cd /opt/kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181
    exit