Chapter 4. Configuring Kafka
Kafka uses a properties file to store static configuration. The recommended location for the configuration file is /opt/kafka/config/server.properties
. The configuration file has to be readable by the kafka
user.
AMQ Streams ships an example configuration file that highlights various basic and advanced features of the product. It can be found under config/server.properties
in the AMQ Streams installation directory.
This chapter explains the most important configuration options. For a complete list of supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.
4.1. Zookeeper
Kafka brokers need Zookeeper to store some parts of their configuration as well as to coordinate the cluster (for example to decide which node is a leader for which partition). Connection details for the Zookeeper cluster are stored in the configuration file. The field zookeeper.connect
contains a comma-separated list of hostnames and ports of members of the zookeeper cluster.
For example:
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
Kafka will use these addresses to connect to the Zookeeper cluster. With this configuration, all Kafka znodes
will be created directly in the root of Zookeeper database. Therefore, such a Zookeeper cluster could be used only for a single Kafka cluster. To configure multiple Kafka clusters to use single Zookeeper cluster, specify a base (prefix) path at the end of the Zookeeper connection string in the Kafka configuration file:
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1
4.2. Listeners
Kafka brokers can be configured to use multiple listeners. Each listener can be used to listen on a different port or network interface and can have different configuration. Listeners are configured in the listeners
property in the configuration file. The listeners
property contains a list of listeners with each listener configured as <listenerName>://<hostname>:_<port>_
. When the hostname value is empty, Kafka will use java.net.InetAddress.getCanonicalHostName()
as hostname. The following example shows how multiple listeners might be configured:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
When a Kafka client wants to connect to a Kafka cluster, it first connects to a bootstrap server. The bootstrap server is one of the cluster nodes. It will provide the client with a list of all other brokers which are part of the cluster and the client will connect to them individually. By default the bootstrap server will provide the client with a list of nodes based on the listeners
field.
Advertised listeners
It is possible to give the client a different set of addresses than given in the listeners property. It is useful in situations when additional network infrastructure, such as a proxy, is between the client and the broker, or when an external DNS name should be used instead of an IP address. Here, the broker allows defining the advertised addresses of the listeners in the advertised.listeners configuration property. This property has the same format as the listeners property. The following example shows how to configure advertised listeners:
listeners=INT1://:9092,INT2://:9093 advertised.listeners=INT1://my-broker-1.my-domain.com:1234,INT2://my-broker-1.my-domain.com:1234:9093
The names of the listeners have to match the names of the listeners from the listeners
property.
Inter-broker listeners
When the cluster has replicated topics, the brokers responsible for such topics need to communicate with each other in order to replicate the messages in those topics. When multiple listeners are configured, the configuration field inter.broker.listener.name
can be used to specify the name of the listener which should be used for replication between brokers. For example:
inter.broker.listener.name=REPLICATION
4.3. Commit logs
Apache Kafka stores all records it receives from producers in commit logs. The commit logs contain the actual data, in the form of records, that Kafka needs to deliver. These are not the application log files which record what the broker is doing.
Log directories
You can configure log directories using the log.dirs
property file to store commit logs in one or multiple log directories. It should be set to /var/lib/kafka
directory created during installation:
log.dirs=/var/lib/kafka
For performance reasons, you can configure log.dirs to multiple directories and place each of them on a different physical device to improve disk I/O performance. For example:
log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3
4.4. Broker ID
Broker ID is a unique identifier for each broker in the cluster. You can assign an integer greater than or equal to 0 as broker ID. The broker ID is used to identify the brokers after restarts or crashes and it is therefore important that the id is stable and does not change over time. The broker ID is configured in the broker properties file:
broker.id=1
4.5. Running a multi-node Kafka cluster
This procedure describes how to configure and run Kafka as a multi-node cluster.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
- A Zookeeper cluster is configured and running.
Running the cluster
For each Kafka broker in your AMQ Streams cluster:
Edit the
/opt/kafka/config/server.properties
Kafka configuration file as follows:-
Set the
broker.id
field to0
for the first broker,1
for the second broker, and so on. -
Configure the details for connecting to Zookeeper in the
zookeeper.connect
option. - Configure the Kafka listeners.
Set the directories where the commit logs should be stored in the
logs.dir
directory.Here we see an example configuration for a Kafka broker:
broker.id=0 zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181 listeners=REPLICATION://:9091,PLAINTEXT://:9092 inter.broker.listener.name=REPLICATION log.dirs=/var/lib/kafka
In a typical installation where each Kafka broker is running on identical hardware, only the
broker.id
configuration property will differ between each broker config.
-
Set the
Start the Kafka broker with the default configuration file.
su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Verify that the Kafka broker is running.
jcmd | grep Kafka
Verifying the brokers
Once all nodes of the clusters are up and running, verify that all nodes are members of the Kafka cluster by sending a dump
command to one of the Zookeeper nodes using the ncat
utility. The command prints all Kafka brokers registered in Zookeeper.
Use ncat stat to check the node status.
echo dump | ncat zoo1.my-domain.com 2181
The output should contain all Kafka brokers you just configured and started.
Example output from the
ncat
command for Kafka cluster with 3 nodes:SessionTracker dump: org.apache.zookeeper.server.quorum.LearnerSessionTracker@28848ab9 ephemeral nodes dump: Sessions with Ephemerals (3): 0x20000015dd00000: /brokers/ids/1 0x10000015dc70000: /controller /brokers/ids/0 0x10000015dc70001: /brokers/ids/2
Additional resources
- For more information about installing AMQ Streams, see Section 2.3, “Installing AMQ Streams”.
- For more information about configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For more information about running a Zookeeper cluster, see Section 3.3, “Running multi-node Zookeeper cluster”.
- For a complete list of supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.
4.6. Zookeeper authentication
By default, connections between Zookeeper and Kafka are not authenticated. However, Kafka and Zookeeper support Java Authentication and Authorization Service (JAAS) which can be used to set up authentication using Simple Authentication and Security Layer (SASL). Zookeeper supports authentication using the DIGEST-MD5 SASL mechanism with locally stored credentials.
4.6.1. JAAS Configuration
SASL authentication for Zookeeper connections has to be configured in the JAAS configuration file. By default, Kafka will use the JAAS context named Client
for connecting to Zookeeper. The Client
context should be configured in the /opt/kafka/config/jass.conf
file. The context has to enable the PLAIN
SASL authentication, as in the following example:
Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456"; };
4.6.2. Enabling Zookeeper authentication
This procedure describes how to enable authentication using the SASL DIGEST-MD5 mechanism when connecting to Zookeeper.
Prerequisites
- Client-to-server authentication is enabled in Zookeeper
Enabling SASL DIGEST-MD5 authentication
On all Kafka broker nodes, create or edit the
/opt/kafka/config/jaas.conf
JAAS configuration file and add the following context:Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="<Username>" password="<Password>"; };
The username and password should be the same as configured in Zookeeper.
Following example shows the
Client
context:Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456"; };
Restart all Kafka broker nodes one by one. To pass the JAAS configuration to Kafka brokers, use the
KAFKA_OPTS
environment variable.su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Additional resources
- For more information about configuring client-to-server authentication in Zookeeper, see Section 3.4, “Authentication”.
4.7. Authorization
Authorization in Kafka brokers is implemented using authorizer plugins.
You can use your own authorization plugins or SimpleAclAuthorizer
, the authorizer plugin provided with Kafka.
In this section we describe how to use SimpleAclAuthorizer
.
4.7.1. Simple ACL authorizer
Authorizer plugins, including SimpleAclAuthorizer
, are enabled through the authorizer.class.name
property:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
A fully-qualified name is required for the chosen authorizer. For SimpleAclAuthorizer
, the fully-qualified name is kafka.security.auth.SimpleAclAuthorizer
.
4.7.1.1. ACL rules
SimpleAclAuthorizer
uses ACL rules to manage access to Kafka brokers.
ACL rules are defined in the format:
Principal P is allowed / denied operation O on Kafka resource R from host H
For example, a rule might be set so that user:
John can view the topic comments from host 127.0.0.1
Host is the IP address of the machine that John is connecting from.
In most cases, the user is a producer or consumer application:
Consumer01 can write to the consumer group accounts from host 127.0.0.1
If ACL rules are not present
If ACL rules are not present for a given resource, all actions are denied. This behavior can be changed by setting the property allow.everyone.if.no.acl.found
to true
in the Kafka configuration file /opt/kafka/config/server.properties
.
4.7.1.2. Principals
A principal represents the identity of a user. The format of the ID depends on the authentication mechanism used by clients to connect to Kafka:
-
User:ANONYMOUS
when connected without authentication. User:<username>
when connected using simple authentication mechanisms, such as PLAIN or SCRAM.For example
User:admin
orUser:user1
.User:<DistinguishedName>
when connected using TLS client authentication.For example
User:CN=user1,O=MyCompany,L=Prague,C=CZ
.-
User:<Kerberos username>
when connected using Kerberos.
The DistinguishedName is the distinguished name from the client certificate.
The Kerberos username is the primary part of the Kerberos principal, which is used by default when connecting using Kerberos. You can use the sasl.kerberos.principal.to.local.rules
property to configure how the Kafka principal is built from the Kerberos principal.
4.7.1.3. Authentication of users
To use authorization, you need to have authentication enabled and used by your clients. Otherwise, all connections will have the principal User:ANONYMOUS
.
For more information on methods of authentication, see Encryption and authentication.
Super users
Super users are allowed to take all actions regardless of the ACL rules.
Super users are defined in the Kafka configuration file using the property super.users
.
For example:
super.users=User:admin,User:operator
Replica broker authentication
When authorization is enabled, it is applied to all listeners and all connections. This includes the inter-broker connections used for replication of data between brokers. If enabling authorization, therefore, ensure that you use authentication for inter-broker connections and give the users used by the brokers sufficient rights. For example, if authentication between brokers uses the kafka-broker
user, then super user configuration must include the username super.users=User:kafka-broker
.
4.7.1.4. Supported resources
You can apply Kafka ACLs to these types of resource:
- Topics
- Consumer groups
- The cluster
- TransactionId
- DelegationToken
4.7.1.5. Supported operations
SimpleAclAuthorizer
authorizes operations on resources.
Fields with X
in the following table mark the supported operations for each resource.
Topics | Consumer Groups | Cluster | |
---|---|---|---|
Read | X | X | |
Write | X | ||
Create | X | ||
Delete | X | ||
Alter | X | ||
Describe | X | X | X |
ClusterAction | X | ||
All | X | X | X |
4.7.1.6. ACL management options
ACL rules are managed using the bin/kafka-acls.sh
utility, which is provided as part of the Kafka distribution package.
Use kafka-acls.sh
parameter options to add, list and remove ACL rules, and perform other functions.
The parameters require a double-hyphen convention, such as --add
.
Option | Type | Description | Default |
---|---|---|---|
| Action | Add ACL rule. | |
| Action | Remove ACL rule. | |
| Action | List ACL rules. | |
| Action | Fully-qualified class name of the authorizer. |
|
| Configuration | Key/value pairs passed to the authorizer for initialization.
For | |
| Resource | Host/port pairs to connect to the Kafka cluster. |
Use this option or the |
| Resource |
Configuration property file to pass to the Admin Client, which is used in conjunction with the | |
| Resource | Specifies a cluster as an ACL resource. | |
| Resource | Specifies a topic name as an ACL resource.
An asterisk (
A single command can specify multiple | |
| Resource | Specifies a consumer group name as an ACL resource.
A single command can specify multiple | |
| Resource | Specifies a transactional ID as an ACL resource. Transactional delivery means that all messages sent by a producer to multiple partitions must be successfully delivered or none of them.
An asterisk ( | |
| Resource | Specifies a delegation token as an ACL resource.
An asterisk ( | |
| Configuration |
Specifies a type of resource pattern for the
Use
Use |
|
| Principal | Principal added to an allow ACL rule.
A single command can specify multiple | |
| Principal | Principal added to a deny ACL rule.
A single command can specify multiple | |
| Principal |
Principal name used with the
A single command can specify multiple | |
| Host |
IP address that allows access to the principals listed in Hostnames or CIDR ranges are not supported. |
If |
| Host |
IP address that denies access to the principals listed in Hostnames or CIDR ranges are not supported. |
if |
| Operation | Allows or denies an operation.
A single command can specify multipleMultiple | All |
| Shortcut | A shortcut to allow or deny all operations needed by a message producer (WRITE and DESCRIBE on topic, CREATE on cluster). | |
| Shortcut | A shortcut to allow or deny all operations needed by a message consumer (READ and DESCRIBE on topic, READ on consumer group). | |
| Shortcut |
A shortcut to enable idempotence when used with the Idepmotence is enabled automatically if the producer is authorized to send messages based on a specific transactional ID. | |
| Shortcut | A shortcut to accept all queries and do not prompt. |
4.7.2. Enabling authorization
This procedure describes how to enable the SimpleAclAuthorizer
plugin for authorization in Kafka brokers.
Prerequisites
- AMQ Streams is installed on all hosts used as Kafka brokers.
Procedure
Edit the
/opt/kafka/config/server.properties
Kafka configuration file to use theSimpleAclAuthorizer
.authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
- (Re)start the Kafka brokers.
Additional resources
- For more information about configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For more information about running a Kafka cluster, see Section 4.5, “Running a multi-node Kafka cluster”.
4.7.3. Adding ACL rules
SimpleAclAuthorizer
uses Access Control Lists (ACLs), which define a set of rules describing what users can and cannot do.
This procedure describes how to add ACL rules when using the SimpleAclAuthorizer
plugin in Kafka brokers.
Rules are added using the kafka-acls.sh
utility and stored in Zookeeper.
Prerequisites
- AMQ Streams is installed on all hosts used as Kafka brokers.
- Authorization is enabled in Kafka brokers.
Procedure
Run
kafka-acls.sh
with the--add
option.Examples:
Allow
user1
anduser2
access to read frommyTopic
using theMyConsumerGroup
consumer group.bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
Deny
user1
access to readmyTopic
from IP address host127.0.0.1
.bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
Add
user1
as the consumer ofmyTopic
withMyConsumerGroup
.bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
Additional resources
-
For a list of all
kafka-acls.sh
options, see Section 4.7.1, “Simple ACL authorizer”.
4.7.4. Listing ACL rules
This procedure describes how to list existing ACL rules when using the SimpleAclAuthorizer
plugin in Kafka brokers.
Rules are listed using the kafka-acls.sh
utility.
Prerequisites
- AMQ Streams is installed on all hosts used as Kafka brokers.
- Authorization is enabled in Kafka brokers
- ACLs have been added.
Procedure
Run
kafka-acls.sh
with the--list
option.For example:
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --list --topic myTopic Current ACLs for resource `Topic:myTopic`: User:user1 has Allow permission for operations: Read from hosts: * User:user2 has Allow permission for operations: Read from hosts: * User:user2 has Deny permission for operations: Read from hosts: 127.0.0.1 User:user1 has Allow permission for operations: Describe from hosts: * User:user2 has Allow permission for operations: Describe from hosts: * User:user2 has Deny permission for operations: Describe from hosts: 127.0.0.1
Additional resources
-
For a list of all
kafka-acls.sh
options, see Section 4.7.1, “Simple ACL authorizer”.
4.7.5. Removing ACL rules
This procedure describes how to remove ACL rules when using the SimpleAclAuthorizer
plugin in Kafka brokers.
Rules are removed using the kafka-acls.sh
utility.
Prerequisites
- AMQ Streams is installed on all hosts used as Kafka brokers.
- Authorization is enabled in Kafka brokers.
- ACLs have been added.
Procedure
Run
kafka-acls.sh
with the--remove
option.Examples:
Remove the ACL allowing Allow
user1
anduser2
access to read frommyTopic
using theMyConsumerGroup
consumer group.bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
Remove the ACL adding
user1
as the consumer ofmyTopic
withMyConsumerGroup
.bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
Remove the ACL denying
user1
access to readmyTopic
from IP address host127.0.0.1
.bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
Additional resources
-
For a list of all
kafka-acls.sh
options, see Section 4.7.1, “Simple ACL authorizer”. - For more information about enabling authorization, see Section 4.7.2, “Enabling authorization”.
4.8. Zookeeper authorization
When authentication is enabled between Kafka and Zookeeper, you can use Zookeeper Access Control List (ACL) rules to automatically control access to Kafka’s metadata stored in Zookeeper.
4.8.1. ACL Configuration
Enforcement of Zookeeper ACL rules is controlled by the zookeeper.set.acl
property in the config/server.properties
Kafka configuration file.
The property is disabled by default and enabled by setting to true
:
zookeeper.set.acl=true
If ACL rules are enabled, when a znode
is created in Zookeeper only the Kafka user who created it can modify or delete it. All other users have read-only access.
Kafka sets ACL rules only for newly created Zookeeper znodes
. If the ACLs are only enabled after the first start of the cluster, the zookeeper-security-migration.sh
tool can set ACLs on all existing znodes
.
Confidentiality of data in Zookeeper
Data stored in Zookeeper includes:
- Topic names and their configuration
- Salted and hashed user credentials when SASL SCRAM authentication is used.
But Zookeeper does not store any records sent and received using Kafka. The data stored in Zookeeper is assumed to be non-confidential.
If the data is to be regarded as confidential (for example because topic names contain customer IDs), the only option available for protection is isolating Zookeeper on the network level and allowing access only to Kafka brokers.
4.8.2. Enabling Zookeeper ACLs for a new Kafka cluster
This procedure describes how to enable Zookeeper ACLs in Kafka configuration for a new Kafka cluster. Use this procedure only before the first start of the Kafka cluster. For enabling Zookeeper ACLs in a cluster that is already running, see Section 4.8.3, “Enabling Zookeeper ACLs in an existing Kafka cluster”.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
- Zookeeper cluster is configured and running.
- Client-to-server authentication is enabled in Zookeeper.
- Zookeeper authentication is enabled in the Kafka brokers.
- Kafka brokers have not yet been started.
Procedure
Edit the
/opt/kafka/config/server.properties
Kafka configuration file to set thezookeeper.set.acl
field totrue
on all cluster nodes.zookeeper.set.acl=true
- Start the Kafka brokers.
4.8.3. Enabling Zookeeper ACLs in an existing Kafka cluster
This procedure describes how to enable Zookeeper ACLs in Kafka configuration for a Kafka cluster that is running. Use the zookeeper-security-migration.sh
tool to set Zookeeper ACLs on all existing znodes
. The zookeeper-security-migration.sh
is available as part of AMQ Streams, and can be found in the bin
directory.
Prerequisites
- Kafka cluster is configured and running.
Enabling the Zookeeper ACLs
Edit the
/opt/kafka/config/server.properties
Kafka configuration file to set thezookeeper.set.acl
field totrue
on all cluster nodes.zookeeper.set.acl=true
- Restart all Kafka brokers one by one.
Set the ACLs on all existing Zookeeper
znodes
using thezookeeper-security-migration.sh
tool.su - kafka cd /opt/kafka KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=_<ZookeeperURL>_ exit
For example:
su - kafka cd /opt/kafka KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181 exit
4.9. Encryption and authentication
Kafka supports TLS for encrypting the communication with Kafka clients. Additionally, it supports two types of authentication:
- TLS client authentication based on X.509 certificates
- SASL Authentication based on a username and password
4.9.1. Listener configuration
Encryption and authentication in Kafka brokers is configured per listener. For more information about Kafka listener configuration, see Section 4.2, “Listeners”.
Each listener in the Kafka broker is configured with its own security protocol. The configuration property listener.security.protocol.map
defines which listener uses which security protocol. It maps each listener name to its security protocol. Supported security protocols are:
PLAINTEXT
- Listener without any encryption or authentication.
SSL
- Listener using TLS encryption and, optionally, authentication using TLS client certificates.
SASL_PLAINTEXT
- Listener without encryption but with SASL-based authentication.
SASL_SSL
- Listener with TLS-based encryption and SASL-based authentication.
Given the following listeners
configuration:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
the listener.security.protocol.map
might look like this:
listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL
This would configure the listener INT1
to use unencrypted connections with SASL authentication, the listener INT2
to use encrypted connections with SASL authentication and the REPLICATION
interface to use TLS encryption (possibly with TLS client authentication). The same security protocol can be used multiple times. The following example is also a valid configuration:
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
Such a configuration would use TLS encryption and TLS authentication for all interfaces. The following chapters will explain in more detail how to configure TLS and SASL.
4.9.2. TLS Encryption
In order to use TLS encryption and server authentication, a keystore containing private and public keys has to be provided. This is usually done using a file in the Java Keystore (JKS) format. A path to this file is set in the ssl.keystore.location
property. The ssl.keystore.password
property should be used to set the password protecting the keystore. For example:
ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456
In some cases, an additional password is used to protect the private key. Any such password can be set using the ssl.key.password
property.
Kafka is able to use keys signed by certification authorities as well as self-signed keys. Using keys signed by certification authorities should always be the preferred method. In order to allow clients to verify the identity of the Kafka broker they are connecting to, the certificate should always contain the advertised hostname(s) as its Common Name (CN) or in the Subject Alternative Names (SAN).
It is possible to use different SSL configurations for different listeners. All options starting with ssl.
can be prefixed with listener.name.<NameOfTheListener>.
, where the name of the listener has to be always in lower case. This will override the default SSL configuration for that specific listener. The following example shows how to use different SSL configurations for different listeners:
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094 listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL # Default configuration - will be used for listeners INT1 and INT2 ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456 # Different configuration for listener REPLICATION listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks listener.name.replication.ssl.keystore.password=123456
Additional TLS configuration options
In addition to the main TLS configuration options described above, Kafka supports many options for fine-tuning the TLS configuration. For example, to enable or disable TLS / SSL protocols or cipher suites:
ssl.cipher.suites
- List of enabled cipher suites. Each cipher suite is a combination of authentication, encryption, MAC and key exchange algorithms used for the TLS connection. By default, all available cipher suites are enabled.
ssl.enabled.protocols
-
List of enabled TLS / SSL protocols. Defaults to
TLSv1.2,TLSv1.1,TLSv1
.
For a complete list of supported Kafka broker configuration options, see Appendix A, Broker configuration parameters.
4.9.3. Enabling TLS encryption
This procedure describes how to enable encryption in Kafka brokers.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
Procedure
- Generate TLS certificates for all Kafka brokers in your cluster. The certificates should have their advertised and bootstrap addresses in their Common Name or Subject Alternative Name.
Edit the
/opt/kafka/config/server.properties
Kafka configuration file on all cluster nodes for the following:-
Change the
listener.security.protocol.map
field to specify theSSL
protocol for the listener where you want to use TLS encryption. -
Set the
ssl.keystore.location
option to the path to the JKS keystore with the broker certificate. Set the
ssl.keystore.password
option to the password you used to protect the keystore.For example:
listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094 listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456
-
Change the
- (Re)start the Kafka brokers
Additional resources
- For more information about configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For more information about running a Kafka cluster, see Section 4.5, “Running a multi-node Kafka cluster”.
For more information about configuring TLS encryption in clients, see:
4.9.4. Authentication
Kafka supports two methods of authentication. On all connections, authentication using one of the supported SASL (Simple Authentication and Security Layer) mechanisms can be used. On encrypted connections, TLS client authentication based on X.509 certificates can be used.
4.9.4.1. TLS client authentication
TLS client authentication can be used only on connections which are already using TLS encryption. To use TLS client authentication, a truststore with public keys can be provided to the broker. These keys can be used to authenticate clients connecting to the broker. The truststore should be provided in Java Keystore (JKS) format and should contain public keys of the certification authorities. All clients with public and private keys signed by one of the certification authorities included in the truststore will be authenticated. The location of the truststore is set using field ssl.truststore.location
. In case the truststore is password protected, the password should be set in the ssl.truststore.password
property. For example:
ssl.truststore.location=/path/to/keystore/server-1.jks ssl.truststore.password=123456
Once the truststore is configured, TLS client authentication has to be enabled using the ssl.client.auth
property. This property can be set to one of three different values:
none
- TLS client authentication is switched off. (Default value)
requested
- TLS client authentication is optional. Clients will be asked to authenticate using TLS client certificate but they can choose not to.
required
- Clients are required to authenticate using TLS client certificate.
When a client authenticates using TLS client authentication, the authenticated principal name is the distinguished name from the authenticated client certificate. For example, a user with a certificate which has a distinguished name CN=someuser
will be authenticated with the following principal CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
. When TLS client authentication is not used and SASL is disabled, the principal name will be ANONYMOUS
.
4.9.4.2. SASL authentication
SASL authentication is configured using Java Authentication and Authorization Service (JAAS). JAAS is also used for authentication of connections between Kafka and Zookeeper. JAAS uses its own configuration file. The recommended location for this file is /opt/kafka/config/jaas.conf
. The file has to be readable by the kafka
user. When running Kafka, the location of this file is specified using Java system property java.security.auth.login.config
. This property has to be passed to Kafka when starting the broker nodes:
KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config"; bin/kafka-server-start.sh
SASL authentication is supported both through plain unencrypted connections as well as through TLS connections. SASL can be enabled individually for each listener. To enable it, the security protocol in listener.security.protocol.map
has to be either SASL_PLAINTEXT
or SASL_SSL
.
SASL authentication in Kafka supports several different mechanisms:
PLAIN
- Implements authentication based on username and passwords. Usernames and passwords are stored locally in Kafka configuration.
SCRAM-SHA-256
andSCRAM-SHA-512
- Implements authentication using Salted Challenge Response Authentication Mechanism (SCRAM). SCRAM credentials are stored centrally in Zookeeper. SCRAM can be used in situations where Zookeeper cluster nodes are running isolated in a private network.
GSSAPI
- Implements authentication against a Kerberos server.
The PLAIN
mechanism sends the username and password over the network in an unencrypted format. It should be therefore only be used in combination with TLS encryption.
The SASL mechanisms are configured via the JAAS configuration file. Kafka uses the JAAS context named KafkaServer
. After they are configured in JAAS, the SASL mechanisms have to be enabled in the Kafka configuration. This is done using the sasl.enabled.mechanisms
property. This property contains a comma-separated list of enabled mechanisms:
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
In case the listener used for inter-broker communication is using SASL, the property sasl.mechanism.inter.broker.protocol
has to be used to specify the SASL mechanism which it should use. For example:
sasl.mechanism.inter.broker.protocol=PLAIN
The username and password which will be used for the inter-broker communication has to be specified in the KafkaServer
JAAS context using the field username
and password
.
SASL PLAIN
To use the PLAIN mechanism, the usernames and password which are allowed to connect are specified directly in the JAAS context. The following example shows the context configured for SASL PLAIN authentication. The example configures three different users:
-
admin
-
user1
-
user2
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
The JAAS configuration file with the user database should be kept in sync on all Kafka brokers.
When SASL PLAIN is also used for inter-broker authentication, the username
and password
properties should be included in the JAAS context:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456" user_admin="123456" user_user1="123456" user_user2="123456"; };
SASL SCRAM
SCRAM authentication in Kafka consists of two mechanisms: SCRAM-SHA-256
and SCRAM-SHA-512
. These mechanisms differ only in the hashing algorithm used - SHA-256 versus stronger SHA-512. To enable SCRAM authentication, the JAAS configuration file has to include the following configuration:
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
When enabling SASL authentication in the Kafka configuration file, both SCRAM mechanisms can be listed. However, only one of them can be chosen for the inter-broker communication. For example:
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
User credentials for the SCRAM mechanism are stored in Zookeeper. The kafka-configs.sh
tool can be used to manage them. For example, run the following command to add user user1 with password 123456:
bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
To delete a user credential use:
bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
SASL GSSAPI
The SASL mechanism used for authentication using Kerberos is called GSSAPI
. To configure Kerberos SASL authentication, the following configuration should be added to the JAAS configuration file:
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; };
The domain name in the Kerberos principal has to be always in upper case.
In addition to the JAAS configuration, the Kerberos service name needs to be specified in the sasl.kerberos.service.name
property in the Kafka configuration:
sasl.enabled.mechanisms=GSSAPI sasl.mechanism.inter.broker.protocol=GSSAPI sasl.kerberos.service.name=kafka
Multiple SASL mechanisms
Kafka can use multiple SASL mechanisms at the same time. The different JAAS configurations can be all added to the same context:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; org.apache.kafka.common.security.scram.ScramLoginModule required; };
When multiple mechanisms are enabled, clients will be able to choose the mechanism which they want to use.
4.9.5. Enabling TLS client authentication
This procedure describes how to enable TLS client authentication in Kafka brokers.
Prerequisites
Procedure
- Prepare a JKS truststore containing the public key of the certification authority used to sign the user certificates.
Edit the
/opt/kafka/config/server.properties
Kafka configuration file on all cluster nodes for the following:-
Set the
ssl.truststore.location
option to the path to the JKS truststore with the certification authority of the user certificates. -
Set the
ssl.truststore.password
option to the password you used to protect the truststore. Set the
ssl.client.auth
option torequired
.For example:
ssl.truststore.location=/path/to/truststore.jks ssl.truststore.password=123456 ssl.client.auth=required
-
Set the
- (Re)start the Kafka brokers
Additional resources
- For more information about configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For more information about running a Kafka cluster, see Section 4.5, “Running a multi-node Kafka cluster”.
For more information about configuring TLS encryption in clients, see:
4.9.6. Enabling SASL PLAIN authentication
This procedure describes how to enable SASL PLAIN authentication in Kafka brokers.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
Procedure
Edit or create the
/opt/kafka/config/jaas.conf
JAAS configuration file. This file should contain all your users and their passwords. Make sure this file is the same on all Kafka brokers.For example:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
Edit the
/opt/kafka/config/server.properties
Kafka configuration file on all cluster nodes for the following:-
Change the
listener.security.protocol.map
field to specify theSASL_PLAINTEXT
orSASL_SSL
protocol for the listener where you want to use SASL PLAIN authentication. Set the
sasl.enabled.mechanisms
option toPLAIN
.For example:
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=PLAIN
-
Change the
(Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers.
su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Additional resources
- For more information about configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For more information about running a Kafka cluster, see Section 4.5, “Running a multi-node Kafka cluster”.
For more information about configuring SASL PLAIN authentication in clients, see:
4.9.7. Enabling SASL SCRAM authentication
This procedure describes how to enable SASL SCRAM authentication in Kafka brokers.
Prerequisites
- AMQ Streams is installed on all hosts which will be used as Kafka brokers.
Procedure
Edit or create the
/opt/kafka/config/jaas.conf
JAAS configuration file. Enable theScramLoginModule
for theKafkaServer
context. Make sure this file is the same on all Kafka brokers.For example:
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
Edit the
/opt/kafka/config/server.properties
Kafka configuration file on all cluster nodes for the following:-
Change the
listener.security.protocol.map
field to specify theSASL_PLAINTEXT
orSASL_SSL
protocol for the listener where you want to use SASL SCRAM authentication. Set the
sasl.enabled.mechanisms
option toSCRAM-SHA-256
orSCRAM-SHA-512
.For example:
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=SCRAM-SHA-512
-
Change the
(Re)start the Kafka brokers using the KAFKA_OPTS environment variable to pass the JAAS configuration to Kafka brokers.
su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Additional resources
- For more information about configuring AMQ Streams, see Section 2.8, “Configuring AMQ Streams”.
- For more information about running a Kafka cluster, see Section 4.5, “Running a multi-node Kafka cluster”.
- For more information about adding SASL SCRAM users, see Section 4.9.8, “Adding SASL SCRAM users”.
- For more information about deleting SASL SCRAM users, see Section 4.9.9, “Deleting SASL SCRAM users”.
For more information about configuring SASL SCRAM authentication in clients, see:
4.9.8. Adding SASL SCRAM users
This procedure describes how to add new users for authentication using SASL SCRAM.
Prerequisites
Procedure
Use the
kafka-configs.sh
tool to add new SASL SCRAM users.bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>
For example:
bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
Additional resources
For more information about configuring SASL SCRAM authentication in clients, see:
4.9.9. Deleting SASL SCRAM users
This procedure describes how to remove users when using SASL SCRAM authentication.
Prerequisites
Procedure
Use the
kafka-configs.sh
tool to delete SASL SCRAM users.bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>
For example:
bin/kafka-configs.sh --zookeeper zoo1.my-domain.com:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
Additional resources
For more information about configuring SASL SCRAM authentication in clients, see:
4.10. Using OAuth 2.0 token based authentication
AMQ Streams supports the use of OAuth 2.0 authentication using the SASL OAUTHBEARER mechanism.
OAuth 2.0 enables standardized token based authentication and authorization between applications, using a central authorization server to issue tokens that grant limited access to resources.
OAuth 2.0 is currently only supported for authentication, with no authorization support. However, OAuth 2.0 authentication can be used in conjunction with ACL-based Kafka authorization.
Using OAuth 2.0 token based authentication, application clients can access resources on application servers (called ‘resource servers’) without exposing account credentials. The client passes an access token as a means of authenticating, which application servers can also use to find more information about the level of access granted. The authorization server handles the granting of access and inquiries about access.
In the context of AMQ Streams:
- Kafka brokers act as resource servers
- Kafka clients act as resource clients
The brokers and clients communicate with the OAuth 2.0 authorization server, as necessary, to obtain or validate access tokens.
For a deployment of AMQ Streams, OAuth 2.0 integration provides:
- Server-side OAuth 2.0 support for Kafka brokers
- Client-side OAuth 2.0 support for Kafka Mirror Maker, Kafka Connect and the Kafka Bridge
Additional resources
4.10.1. OAuth 2.0 authentication mechanism
Support for OAuth 2.0 is based on the Kafka SASL OAUTHBEARER mechanism, which is used to establish authenticated sessions with a Kafka broker.
A Kafka client initiates a session with the Kafka broker using the SASL OAUTHBEARER mechanism for credentials exchange, where credentials take the form of an access token.
Kafka brokers and clients need to be configured to use OAuth 2.0.
Kafka broker configuration
The Kafka broker must be configured to validate the token received during session initiation. The recommended approach is to create a client definition in an authorization server with:
-
Client ID of
kafka-broker
- Client ID and secret as the authentication mechanism
Kafka client configuration
A Kafka client is configured with either:
- Credentials required to obtain a valid access token from an authorization server
- A valid long-lived access token, obtained using tools provided by an authorization server
Credentials are never sent to the Kafka broker. The only information ever sent to the Kafka broker is an access token. When a client obtains an access token, no further communication with the authorization server is needed.
The simplest mechanism, which requires no additional usage of authorization server tools, is authentication with a client ID and secret. Using a long-lived access token, or a long-lived refresh token, is more complex.
If you are using long-lived access tokens, you can set policy in the authorization server to increase the maximum lifetime of the token.
If the Kafka client is not configured with an access token directly, the client exchanges credentials for an access token during Kafka session initiation by contacting the authorization server. The Kafka client uses one of two mechanisms:
- Client id and secret
- Client id, refresh token, and (optionally) a secret
4.10.2. OAuth 2.0 client authentication flow
In this section, we explain and visualize the communication flow between Kafka client, Kafka broker, and authorization server during Kafka session initiation. The flow depends on the client and server configuration.
When a Kafka client sends an access token as credentials to a Kafka broker, the token needs to be validated.
Depending on the authorization server used, and the configuration options available, you may prefer to use:
- Fast local token validation based on JWT signature checking and local token introspection, without contacting the authorization server
- An OAuth 2.0 introspection endpoint provided by the authorization server
Using fast local token validation requires the authorization server to provide a JWKS endpoint with public certificates that are used to validate signatures on the tokens.
An authorization server might only allow the use of opaque access tokens, which means that local token validation is not possible.
Another option is to use an OAuth 2.0 introspection endpoint on the authorization server. Each time a Kafka broker connection is established, the broker sends the access token it receives to the authorization server, and a response confirming whether or not the token is valid is returned.
Kafka client credentials can also be configured for:
- Direct local access using a previously generated long-lived access token
- Contact with the authorization server for a new access token to be issued and sent to the Kafka broker
4.10.2.1. Example client authentication flows
Here you can see the communication flows, for different configurations of Kafka clients and brokers, during Kafka session authentication.
- Client using client ID and secret, with broker delegating validation to authorization server
- Client using client ID and secret, with broker performing fast local token validation
- Client using long-lived access token, with broker delegating validation to authorization server
- Client using long-lived access token, with broker performing fast local validation
Client using client ID and secret, with broker delegating validation to authorization server
- Kafka client requests access token from authorization server, using client ID and secret, and optionally a refresh token.
- Authorization server generates a new access token.
- Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
- Kafka broker validates the access token by calling a token introspection endpoint on authorization server, using its own client ID and secret.
- Kafka client session is established if the token is valid.
Client using client ID and secret, with broker performing fast local token validation
- Kafka client authenticates with authorization server from the token endpoint, using a client ID and secret, and optionally a refresh token.
- Authorization server generates a new access token.
- Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
- Kafka broker validates the access token locally using a JWT token signature check, and local token introspection.
Client using long-lived access token, with broker delegating validation to authorization server
- Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
- Kafka broker validates the access token by calling a token introspection endpoint on authorization server, using its own client ID and secret.
- Kafka client session is established if the token is valid.
Client using long-lived access token, with broker performing fast local validation
- Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
- Kafka broker validates the access token locally using JWT token signature check, and local token introspection.
Fast local JWT token signature validation is suitable only for short-lived tokens as there is no check with the authorization server if a token has been revoked. Token expiration is written into the token, but revocation can happen at any time, so cannot be accounted for without contacting the authorization server. Any issued token would be considered valid until it expires.
4.10.3. Configuring OAuth 2.0 authentication
OAuth 2.0 is used for interaction between Kafka clients and AMQ Streams components.
In order to use OAuth 2.0 for AMQ Streams, you must:
4.10.3.1. Configuring Red Hat Single Sign-On as an OAuth 2.0 authorization server
This procedure describes how to deploy Red Hat Single Sign-On as an authorization server and configure it for integration with AMQ Streams.
The authorization server provides a central point for authentication and authorization, and management of users, clients, and permissions. Red Hat Single Sign-On has a concept of realms where a realm represents a separate set of users, clients, permissions, and other configuration. You can use a default master realm, or create a new one. Each realm exposes its own OAuth 2.0 endpoints, which means that application clients and application servers all need to use the same realm.
To use OAuth 2.0 with AMQ Streams, you need a deployment of an authorization server to be able to create and manage authentication realms.
If you already have Red Hat Single Sign-On deployed, you can skip the deployment step and use your current deployment.
Before you begin
You will need to be familiar with using Red Hat Single Sign-On.
For installation and administration instructions, see:
Prerequisites
- AMQ Streams and Kafka are running
For the Red Hat Single Sign-On deployment:
Procedure
Install Red Hat Single Sign-On.
You can install from a ZIP file or by using an RPM.
Log in to the Red Hat Single Sign-On Admin Console to create the OAuth 2.0 policies for AMQ Streams.
Login details are provided when you deploy Red Hat Single Sign-On.
Create and enable a realm.
You can use an existing master realm.
- Adjust the session and token timeouts for the realm, if required.
-
Create a client called
kafka-broker
. From the
tab, set:-
Access Type to
Confidential
-
Standard Flow Enabled to
OFF
to disable web login for this client -
Service Accounts Enabled to
ON
to allow this client to authenticate in its own name
-
Access Type to
- Click Save before continuing.
- From the tab, take a note of the secret for using in your AMQ Streams Kafka cluster configuration.
Repeat the client creation steps for any application client that will connect to your Kafka brokers.
Create a definition for each new client.
You will use the names as client IDs in your configuration.
What to do next
After deploying and configuring the authorization server, configure the Kafka brokers to use OAuth 2.0.
4.10.3.2. Configuring OAuth 2.0 support for Kafka brokers
This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication using an authorization server.
Before you start
For more information on the configuration and authentication of Kafka broker listeners, see:
Prerequisites
- AMQ Streams and Kafka are running
- An OAuth 2.0 authorization server is deployed
Procedure
Check you have the following JAR files in your Kafka
libs
directory.strimzi-kafka-oauth-common-*.jar strimzi-kafka-oauth-server-*.jar strimzi-kafka-oauth-client-*.jar keycloak-common-*.jar keycloak-core-*.jar bcprov-*.jar
If they are not already present, the files are available from the installation archive.
Update the Kafka
server.properties
file.sasl.enabled.mechanisms=OAUTHBEARER 1 listeners=CLIENTS://kafka:9092 2 listener.security.protocol.map=CLIENTS:SASL_PLAINTEXT 3 listener.name.clients.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; 4 listener.name.clients.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler listener.name.clients.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
- 1
- Enables the OAUTHBEARER as SASL mechanism for credentials exchange over SASL.
- 2
- Configures a listener for client applications, replacing
kafka
with a valid hostname, resolvable by clients. - 3
- Specifies the channel protocol for the listener. SASL_PLAINTEXT means unencrypted connection (no TLS), so there is risk of eavesdropping and interception at the TCP connection layer.
- 4
- The three listener properties enable OAuth 2.0 for AMQ Streams.
To further configure OAuth 2.0 support, use one of the following connection options.
OPTION 1: Add environment variables to enable local token validation using a JWKS endpoint configuration.
export OAUTH_JWKS_ENDPOINT_URI=https://<authorization-server-address>/jwks 1 export OAUTH_JWKS_REFRESH_SECONDS=300 2 export OAUTH_JWKS_EXPIRY_SECONDS=360 3
- 1
- The JWKS endpoint URL. For example,
https://<authorization-server-address>/auth/realms/master/protocol/openid-connect/certs
. - 2
- The period between endpoint refreshes (default 300).
- 3
- The certificate’s expiry time (default 360). If you specify a longer time, consider the risk of allowing access to revoked certificates.
OPTION 2: Add environment variables to enable delegating token validation to the authorization server through the OAuth 2.0 introspection endpoint.
export OAUTH_INTROSPECTION_ENDPOINT_URI=https://<authorization-server-address>/introspection 1
- 1
- The OAuth 2.0 introspection endpoint URL. For example,
https://<authorization-server-address>/auth/realms/master/protocol/openid-connect/token/introspect
.
If required, configure access to the authorization server.
This step is normally required for a production environment, unless a technology like service mesh is used to configure secure channels outside containers.
Provide a custom truststore for connecting to the authorization server. SSL is always required for access to the authorization server.
Set environment variables to configure a truststore for connecting to a secured authorization server:
For example:
export OAUTH_SSL_TRUSTSTORE_LOCATION=</path/to/truststore.p12> 1 export OAUTH_SSL_TRUSTSTORE_PASSWORD=<my-password> 2 export OAUTH_SSL_TRUSTSTORE_TYPE=pkcs12 3
If the certificate hostname does not match the access URL hostname, you can turn off certificate hostname validation:
export OAUTH_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= ""
The check ensures that client connection to the authorization server is authentic. You may wish to turn off the validation in a non-production environment.
Configure the following environment variables according to your chosen authentication flow.
export OAUTH_TOKEN_ENDPOINT_URI=https://<authorization-server-address>/token 1 export OAUTH_VALID_ISSUER_URI=https://<authorization-server-address> 2 export OAUTH_CLIENT_ID=kafka-broker 3 export OAUTH_CLIENT_SECRET=kafka-broker-secret 4 export OAUTH_REFRESH_TOKEN=<my-refresh-token-for-kafka-brokers> 5 export OAUTH_ACCESS_TOKEN=<my-access-token-for-kafka-brokers> 6
- 1
- The OAuth 2.0 token endpoint URL to your authorization server. For production, always use HTTPs.
- 2
- A valid issuer URI. Only access tokens issued by this issuer will be accepted.
- 3
- The configured client ID of the Kafka broker, which is the same for all brokers. This is the client registered with the authorization server as
kafka-broker
. - 4
- The configured secret for Kafka broker, which is the same for all brokers.
- 5
- (Optional) A long-lived refresh token for Kafka brokers.
- 6
- (Optional) A long-lived access token for Kafka brokers.
What to do next
4.10.3.3. Configuring Kafka Java clients to use OAuth 2.0
This procedure describes how to configure Kafka producer and consumer APIs to use OAuth 2.0 for interaction with Kafka brokers.
Add a client callback plugin to your pom.xml file, and configure the system properties.
Prerequisites
- AMQ Streams and Kafka are running
- An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
- Kafka brokers are configured for OAuth 2.0
Procedure
Add the client library with OAuth 2.0 support to the
pom.xml
file for the Kafka client:<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.1.0.redhat-00002</version> </dependency>
Configure the system properties for the callback:
For example:
System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, “https://<authorization-server-address>/auth/realms/master/protocol/openid-connect/token”); 1 System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "<client-name>"); 2 System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "<client-secret>"); 3
Enable the SASL OAUTHBEARER mechanism on a TLS encrypted connection in the Kafka client configuration:
For example:
props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"); props.put("security.protocol", "SASL_SSL"); 1 props.put("sasl.mechanism", "OAUTHBEARER"); props.put("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
- 1
- Here we use
SASL_SSL
for use over TLS connections. UseSASL_PLAINTEXT
over unencrypted connections.
- Verify that the Kafka client can access the Kafka brokers.
4.11. Logging
Kafka brokers use Log4j as their logging infrastructure. Logging configuration is by default read from the log4j.propeties
configuration file which should be placed either in the /opt/kafka/config/
directory or on the classpath. The location and name of the configuration file can be changed using the Java property log4j.configuration
which can be passed to Kafka using the KAFKA_LOG4J_OPTS
environment variable:
su - kafka export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/my/path/to/log4j.config"; /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
For more information about Log4j configurations, see Log4j manual.