Chapter 16. Securing access to a Kafka cluster
Secure connections by configuring Kafka and Kafka users. Through configuration, you can implement encryption, authentication, and authorization mechanisms.
Kafka configuration
To establish secure access to Kafka, configure the Kafka resource to set up the following configurations based on your specific requirements:
- Listeners with specified authentication types to define how clients authenticate
- TLS encryption for communication between Kafka and clients
- Supported TLS versions and cipher suites for additional security
- Authorization for the entire Kafka cluster
- Network policies for restricting access
- Super users for unconstrained access to brokers
Authentication is configured independently for each listener, while authorization is set up for the whole Kafka cluster.
For more information on access configuration for Kafka, see the Kafka schema reference and GenericKafkaListener schema reference.
User (client-side) configuration
To enable secure client access to Kafka, configure KafkaUser resources. These resources represent clients and determine how they authenticate and authorize with the Kafka cluster.
Configure the KafkaUser resource to set up the following configurations based on your specific requirements:
- Authentication that must match the enabled listener authentication
- Supported TLS versions and cipher suites that must match the Kafka configuration
- Simple authorization to apply Access Control List (ACL) rules
- ACLs for fine-grained control over user access to topics and actions
- Quotas to limit client access based on byte rates or CPU utilization
The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.
For more information on access configuration for users, see the KafkaUser schema reference.
16.1. Configuring client authentication on listeners
Configure client authentication for Kafka brokers when creating listeners. Specify the listener authentication type using the Kafka.spec.kafka.listeners.authentication property in the Kafka resource.
For clients inside the OpenShift cluster, you can create plain (without encryption) or tls internal listeners. The internal listener type uses a headless service and the DNS names given to the broker pods. As an alternative to the headless service, you can also create a cluster-ip type of internal listener to expose Kafka using per-broker ClusterIP services. For clients outside the OpenShift cluster, you create external listeners and specify a connection mechanism, which can be nodeport, loadbalancer, ingress (Kubernetes only), or route (OpenShift only).
For more information on the configuration options for connecting an external client, see Chapter 15, Setting up client access to a Kafka cluster.
Supported authentication options:
- mTLS authentication (only on listeners with TLS encryption enabled)
- SCRAM-SHA-512 authentication
- OAuth 2.0 token-based authentication
- Custom authentication
- TLS versions and cipher suites
If you’re using OAuth 2.0 for client access management, user authentication and authorization credentials are handled through the authorization server.
The authentication option you choose depends on how you wish to authenticate client access to Kafka brokers.
Try exploring the standard authentication options before using custom authentication. Custom authentication allows for any type of Kafka-supported authentication. It can provide more flexibility, but also adds complexity.
Figure 16.1. Kafka listener authentication options

The listener authentication property is used to specify an authentication mechanism specific to that listener.
If no authentication property is specified, the listener does not authenticate clients that connect through that listener. The listener will accept all connections without authentication.
Authentication must be configured when using the User Operator to manage KafkaUsers.
The following example shows:
- A plain listener configured for SCRAM-SHA-512 authentication
- A tls listener with mTLS authentication
- An external listener with mTLS authentication
Each listener is configured with a unique name and port within a Kafka cluster.
When configuring listeners for client access to brokers, you can use port 9092 or higher (9093, 9094, and so on), but with a few exceptions. The listeners cannot be configured to use the ports reserved for interbroker communication (9090 and 9091), Prometheus metrics (9404), and JMX (Java Management Extensions) monitoring (9999).
Example listener authentication configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external3
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: tls
    # ...
16.1.1. mTLS authentication
mTLS authentication is always used for the communication between Kafka brokers and ZooKeeper pods.
Streams for Apache Kafka can configure Kafka to use TLS (Transport Layer Security) to provide encrypted communication between Kafka brokers and clients either with or without mutual authentication. For mutual, or two-way, authentication, both the server and the client present certificates. When you configure mTLS authentication, the broker authenticates the client (client authentication) and the client authenticates the broker (server authentication).
mTLS listener configuration in the Kafka resource requires the following:
- tls: true to specify TLS encryption and server authentication
- authentication.type: tls to specify the client authentication
When a Kafka cluster is created by the Cluster Operator, it creates a new secret with the name <cluster_name>-cluster-ca-cert. The secret contains a CA certificate. The CA certificate is in PEM and PKCS #12 format. To verify a Kafka cluster, add the CA certificate to the truststore in your client configuration. To verify a client, add a user certificate and key to the keystore in your client configuration. For more information on configuring a client for mTLS, see Section 16.3.2, “Configuring user authentication”.
TLS authentication is more commonly one-way, with one party authenticating the identity of another. For example, when HTTPS is used between a web browser and a web server, the browser obtains proof of the identity of the web server.
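For reference, a client truststore can be built from that secret. A minimal sketch using oc, assuming the ca.p12 and ca.password keys generated by the Cluster Operator (the same keys referenced in the client configuration example in Section 16.3.2, “Configuring user authentication”):
# Extract the PKCS #12 truststore and its protecting password from the cluster CA secret
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password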
16.1.2. SCRAM-SHA-512 authentication
SCRAM (Salted Challenge Response Authentication Mechanism) is an authentication protocol that can establish mutual authentication using passwords. Streams for Apache Kafka can configure Kafka to use SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 to provide authentication on both unencrypted and encrypted client connections.
When SCRAM-SHA-512 authentication is used with a TLS connection, the TLS protocol provides the encryption, but is not used for authentication.
The following properties of SCRAM make it safe to use SCRAM-SHA-512 even on unencrypted connections:
- The passwords are not sent in the clear over the communication channel. Instead the client and the server are each challenged by the other to offer proof that they know the password of the authenticating user.
- The server and client each generate a new challenge for each authentication exchange. This means that the exchange is resilient against replay attacks.
When KafkaUser.spec.authentication.type is configured with scram-sha-512, the User Operator will generate a random 32-character password consisting of upper and lowercase ASCII letters and numbers.
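As an illustration, a Kafka client might pass the generated credentials as SASL properties. A minimal sketch, assuming a listener on port 9092 with TLS enabled and a user named my-user:
bootstrap.servers=<cluster_name>-kafka-bootstrap:9092
# SASL_SSL assumes the listener has TLS enabled; use SASL_PLAINTEXT on an unencrypted listener
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="my-user" \
  password="<password>";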
16.1.3. Restricting access to listeners with network policies
Control listener access by configuring the networkPolicyPeers property in the Kafka resource.
By default, Streams for Apache Kafka automatically creates a NetworkPolicy resource for every enabled Kafka listener, allowing connections from all namespaces.
To restrict listener access to specific applications or namespaces at the network level, configure the networkPolicyPeers property. Each listener can have its own networkPolicyPeers configuration. For more information on network policy peers, refer to the NetworkPolicyPeer API reference.
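For example, access can be limited to whole namespaces rather than individual pods. A sketch using a namespaceSelector, assuming client namespaces carry the label team: client-apps:
listeners:
  - name: tls
    port: 9093
    type: internal
    tls: true
    authentication:
      type: tls
    networkPolicyPeers:
      - namespaceSelector:
          matchLabels:
            team: client-apps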
If you want to use custom network policies, you can set the STRIMZI_NETWORK_POLICY_GENERATION environment variable to false in the Cluster Operator configuration. For more information, see Section 10.7, “Configuring the Cluster Operator”.
Your configuration of OpenShift must support ingress NetworkPolicies in order to use network policies.
Prerequisites
- An OpenShift cluster with support for Ingress NetworkPolicies.
- The Cluster Operator is running.
Procedure
- Configure the networkPolicyPeers property to define the application pods or namespaces allowed to access the Kafka cluster. This example shows configuration for a tls listener to allow connections only from application pods with the label app set to kafka-client:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
        networkPolicyPeers:
          - podSelector:
              matchLabels:
                app: kafka-client
    # ...
  zookeeper:
    # ...
- Apply the changes to the Kafka resource configuration.
16.1.4. Using custom listener certificates for TLS encryption
This procedure shows how to configure custom server certificates for TLS listeners or external listeners which have TLS encryption enabled.
By default, Kafka listeners use certificates signed by Streams for Apache Kafka’s internal CA (certificate authority). The Cluster Operator automatically generates a CA certificate when creating a Kafka cluster. To configure a client for TLS, the CA certificate is included in its truststore configuration to authenticate the Kafka cluster. Alternatively, you have the option to install and use your own CA certificates.
However, if you prefer more granular control by using your own custom certificates at the listener level, you can configure listeners using brokerCertChainAndKey properties. You create a secret with your own private key and server certificate, then specify them in the brokerCertChainAndKey configuration.
User-provided certificates allow you to leverage existing security infrastructure. You can use a certificate signed by a public (external) CA or a private CA. Kafka clients need to trust the CA which was used to sign the listener certificate. If signed by a public CA, you usually won’t need to add it to a client’s truststore configuration.
Custom certificates are not managed by Streams for Apache Kafka, so you need to renew them manually.
Listener certificates are used for TLS encryption and server authentication only. They are not used for TLS client authentication. If you want to use your own certificate for TLS client authentication as well, you must install and use your own clients CA.
Prerequisites
- The Cluster Operator is running.
Each listener requires the following:
- A compatible server certificate signed by an external CA. (Provide an X.509 certificate in PEM format.) You can use one listener certificate for multiple listeners.
- Subject Alternative Names (SANs) specified in the certificate for each listener. For more information, see Section 16.1.5, “Specifying SANs for custom listener certificates”.
If you are not using a self-signed certificate, you can provide a certificate that includes the whole CA chain.
You can only use the brokerCertChainAndKey properties if TLS encryption (tls: true) is configured for the listener.
Streams for Apache Kafka does not support the use of encrypted private keys for TLS. The private key stored in the secret must be unencrypted for this to work.
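If your key is currently encrypted, you can typically strip the passphrase with openssl before creating the secret. A sketch, with placeholder file names:
# Prompts for the passphrase, then writes an unencrypted copy of the private key
openssl pkey -in <encrypted_listener_key.key> -out <my_listener_key.key>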
Procedure
- Create a Secret containing your private key and server certificate:
oc create secret generic <my_secret> --from-file=<my_listener_key.key> --from-file=<my_listener_certificate.crt>
- Edit the Kafka resource for your cluster. Configure the listener to use your Secret, certificate file, and private key file in the configuration.brokerCertChainAndKey property.
Example configuration for a loadbalancer external listener with TLS encryption enabled
# ...
listeners:
  - name: plain
    port: 9092
    type: internal
    tls: false
  - name: external3
    port: 9094
    type: loadbalancer
    tls: true
    configuration:
      brokerCertChainAndKey:
        secretName: my-secret
        certificate: my-listener-certificate.crt
        key: my-listener-key.key
# ...
Example configuration for a TLS listener
# ...
listeners:
  - name: plain
    port: 9092
    type: internal
    tls: false
  - name: tls
    port: 9093
    type: internal
    tls: true
    configuration:
      brokerCertChainAndKey:
        secretName: my-secret
        certificate: my-listener-certificate.crt
        key: my-listener-key.key
# ...
- Apply the changes to the Kafka resource configuration. The Cluster Operator starts a rolling update of the Kafka cluster, which updates the configuration of the listeners.
Note: A rolling update is also started if you update a Kafka listener certificate in a Secret that is already used by a listener.
16.1.5. Specifying SANs for custom listener certificates
In order to use TLS hostname verification with custom Kafka listener certificates, you must specify the correct Subject Alternative Names (SANs) for each listener.
The certificate SANs must specify hostnames for the following:
- All of the Kafka brokers in your cluster
- The Kafka cluster bootstrap service
You can use wildcard certificates if they are supported by your CA.
16.1.5.1. Examples of SANs for internal listeners
Use the following examples to help you specify hostnames of the SANs in your certificates for your internal listeners.
Replace <cluster_name> with the name of the Kafka cluster and <namespace> with the OpenShift namespace where the cluster is running.
Wildcards example for a type: internal listener
// Kafka brokers
*.<cluster_name>-kafka-brokers
*.<cluster_name>-kafka-brokers.<namespace>.svc

// Bootstrap service
<cluster_name>-kafka-bootstrap
<cluster_name>-kafka-bootstrap.<namespace>.svc
Non-wildcards example for a type: internal listener
// Kafka brokers
<cluster_name>-kafka-0.<cluster_name>-kafka-brokers
<cluster_name>-kafka-0.<cluster_name>-kafka-brokers.<namespace>.svc
<cluster_name>-kafka-1.<cluster_name>-kafka-brokers
<cluster_name>-kafka-1.<cluster_name>-kafka-brokers.<namespace>.svc
# ...

// Bootstrap service
<cluster_name>-kafka-bootstrap
<cluster_name>-kafka-bootstrap.<namespace>.svc
Non-wildcards example for a type: cluster-ip listener
// Kafka brokers
<cluster_name>-kafka-<listener_name>-0
<cluster_name>-kafka-<listener_name>-0.<namespace>.svc
<cluster_name>-kafka-<listener_name>-1
<cluster_name>-kafka-<listener_name>-1.<namespace>.svc
# ...

// Bootstrap service
<cluster_name>-kafka-<listener_name>-bootstrap
<cluster_name>-kafka-<listener_name>-bootstrap.<namespace>.svc
16.1.5.2. Examples of SANs for external listeners
For external listeners which have TLS encryption enabled, the hostnames you need to specify in certificates depend on the external listener type.
External listener type | In the SANs, specify…
---|---
route | Addresses of all Kafka broker Routes. You can use a matching wildcard name.
loadbalancer | Addresses of all Kafka broker load balancer services. You can use a matching wildcard name.
ingress | Addresses of all Kafka broker Ingress endpoints. You can use a matching wildcard name.
nodeport | Addresses of all OpenShift worker nodes that the Kafka broker pods might be scheduled to. You can use a matching wildcard name.
16.2. Configuring authorized access to Kafka
Configure authorized access to a Kafka cluster using the Kafka.spec.kafka.authorization property in the Kafka resource. If the authorization property is missing, no authorization is enabled and clients have no restrictions. When enabled, authorization is applied to all enabled listeners. The authorization method is defined in the type field.
Supported authorization options:
- Simple authorization
- OAuth 2.0 authorization (if you are using OAuth 2.0 token-based authentication)
- Open Policy Agent (OPA) authorization
- Custom authorization
Figure 16.2. Kafka cluster authorization options

16.2.1. Designating super users
Super users can access all resources in your Kafka cluster regardless of any access restrictions, and are supported by all authorization mechanisms.
To designate super users for a Kafka cluster, add a list of user principals to the superUsers property. If a user uses mTLS authentication, the username is the common name from the TLS certificate subject prefixed with CN=. If you are not using the User Operator and using your own certificates for mTLS, the username is the full certificate subject.
A full certificate subject can include the following fields:
- CN=<common_name>
- OU=<organizational_unit>
- O=<organization>
- L=<locality>
- ST=<state>
- C=<country_code>
Omit any fields that are not applicable.
An example configuration with super users
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    authorization:
      type: simple
      superUsers:
        - CN=user-1
        - user-2
        - CN=user-3
        - CN=user-4,OU=my-ou,O=my-org,L=my-location,ST=my-state,C=US
        - CN=user-5,OU=my-ou,O=my-org,C=GB
        - CN=user-6,O=my-org
    # ...
16.3. Configuring user (client-side) security mechanisms
When configuring security mechanisms in clients, the clients are represented as users. Use the KafkaUser resource to configure the authentication, authorization, and access rights for Kafka clients.
Authentication permits user access, and authorization constrains user access to permissible actions. You can also create super users that have unconstrained access to Kafka brokers.
The authentication and authorization mechanisms must match the specification for the listener used to access the Kafka brokers.
For more information on configuring a KafkaUser resource to access Kafka brokers securely, see Section 16.4, “Example: Setting up secure client access”.
16.3.1. Associating users with Kafka clusters
A KafkaUser resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
The label enables the User Operator to identify the KafkaUser resource and create and manage the user.
If the label does not match the Kafka cluster, the User Operator cannot identify the KafkaUser, and the user is not created.
If the status of the KafkaUser resource remains empty, check your label configuration.
16.3.2. Configuring user authentication
Use the KafkaUser custom resource to configure authentication credentials for users (clients) that require access to a Kafka cluster. Configure the credentials using the authentication property in KafkaUser.spec. By specifying a type, you control what credentials are generated.
Supported authentication types:
- tls for mTLS authentication
- tls-external for mTLS authentication using external certificates
- scram-sha-512 for SCRAM-SHA-512 authentication
If tls or scram-sha-512 is specified, the User Operator creates authentication credentials when it creates the user. If tls-external is specified, the user still uses mTLS, but no authentication credentials are created. Use this option when you’re providing your own certificates. When no authentication type is specified, the User Operator does not create the user or its credentials.
You can use tls-external to authenticate with mTLS using a certificate issued outside the User Operator. The User Operator does not generate a TLS certificate or a secret. You can still manage ACL rules and quotas through the User Operator in the same way as when you’re using the tls mechanism. This means that you use the CN=USER-NAME format when specifying ACL rules and quotas. USER-NAME is the common name given in a TLS certificate.
16.3.2.1. mTLS authentication
To use mTLS authentication, you set the type field in the KafkaUser resource to tls.
Example user with mTLS authentication enabled
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  # ...
The authentication type must match the equivalent configuration for the Kafka listener used to access the Kafka cluster.
When the user is created by the User Operator, it creates a new secret with the same name as the KafkaUser resource. The secret contains a private and public key for mTLS. The public key is contained in a user certificate, which is signed by a clients CA (certificate authority) when it is created. All keys are in X.509 format.
If you are using the clients CA generated by the Cluster Operator, the user certificates generated by the User Operator are also renewed when the clients CA is renewed by the Cluster Operator.
The user secret provides keys and certificates in PEM and PKCS #12 formats.
Example secret with user credentials
apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate
  user.crt: <user_certificate> # Public key of the user
  user.key: <user_private_key> # Private key of the user
  user.p12: <store> # PKCS #12 store for user certificates and keys
  user.password: <password_for_store> # Protects the PKCS #12 store
When you configure a client, you specify the following:
- Truststore properties for the public cluster CA certificate to verify the identity of the Kafka cluster
- Keystore properties for the user authentication credentials to verify the client
The configuration depends on the file format (PEM or PKCS #12). This example uses PKCS #12 stores, and the passwords required to access the credentials in the stores.
Example client configuration using mTLS in PKCS #12 format
bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 1
security.protocol=SSL 2
ssl.truststore.location=/tmp/ca.p12 3
ssl.truststore.password=<truststore_password> 4
ssl.keystore.location=/tmp/user.p12 5
ssl.keystore.password=<keystore_password> 6
1. The bootstrap server address to connect to the Kafka cluster.
2. The security protocol option when using TLS for encryption.
3. The truststore location contains the public key certificate (ca.p12) for the Kafka cluster. A cluster CA certificate and password are generated by the Cluster Operator in the <cluster_name>-cluster-ca-cert secret when the Kafka cluster is created.
4. The password (ca.password) for accessing the truststore.
5. The keystore location contains the public key certificate (user.p12) for the Kafka user.
6. The password (user.password) for accessing the keystore.
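For reference, the keystore and its password can be extracted from the user secret in the same way as the cluster CA material. A sketch using oc, assuming a user named my-user:
# Extract the PKCS #12 keystore and its protecting password from the user secret
oc get secret my-user -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
oc get secret my-user -o jsonpath='{.data.user\.password}' | base64 -d > user.password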
16.3.2.2. mTLS authentication using a certificate issued outside the User Operator
To use mTLS authentication using a certificate issued outside the User Operator, you set the type field in the KafkaUser resource to tls-external. A secret and credentials are not created for the user.
Example user with mTLS authentication that uses a certificate issued outside the User Operator
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls-external
  # ...
16.3.2.3. SCRAM-SHA-512 authentication
To use the SCRAM-SHA-512 authentication mechanism, you set the type field in the KafkaUser resource to scram-sha-512.
Example user with SCRAM-SHA-512 authentication enabled
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  # ...
When the user is created by the User Operator, it creates a new secret with the same name as the KafkaUser resource. The secret contains the generated password in the password key, which is encoded with base64. In order to use the password, it must be decoded.
Example secret with user credentials
apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  password: Z2VuZXJhdGVkcGFzc3dvcmQ= 1
  sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK 2
1. The generated password, base64 encoded.
2. The JAAS configuration string for SASL SCRAM-SHA-512 authentication, base64 encoded.
Decoding the generated password:
echo "Z2VuZXJhdGVkcGFzc3dvcmQ=" | base64 --decode
16.3.2.3.1. Custom password configuration
When a user is created, Streams for Apache Kafka generates a random password. You can use your own password instead of the one generated by Streams for Apache Kafka. To do so, create a secret with the password and reference it in the KafkaUser resource.
Example user with a password set for SCRAM-SHA-512 authentication
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
    password:
      valueFrom:
        secretKeyRef:
          name: my-secret 1
          key: my-password 2
  # ...
1. The name of the secret containing the predefined password.
2. The key for the password stored inside the secret.
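The referenced secret can be created ahead of time. A sketch using oc, matching the my-secret name and my-password key in the example above:
oc create secret generic my-secret --from-literal=my-password=<custom_password>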
16.3.3. Configuring user authorization
Use the KafkaUser custom resource to configure authorization rules for users (clients) that require access to a Kafka cluster. Configure the rules using the authorization property in KafkaUser.spec. By specifying a type, you control what rules are used.
To use simple authorization, you set the type property to simple in KafkaUser.spec.authorization. Simple authorization uses the Kafka Admin API to manage the ACL rules inside your Kafka cluster. Whether ACL management in the User Operator is enabled or not depends on your authorization configuration in the Kafka cluster.
- For simple authorization, ACL management is always enabled.
- For OPA authorization, ACL management is always disabled. Authorization rules are configured in the OPA server.
- For Red Hat build of Keycloak authorization, you can manage the ACL rules directly in Red Hat build of Keycloak. You can also delegate authorization to the simple authorizer as a fallback option in the configuration. When delegation to the simple authorizer is enabled, the User Operator will enable management of ACL rules as well.
- For custom authorization using a custom authorization plugin, use the supportsAdminApi property in the .spec.kafka.authorization configuration of the Kafka custom resource to enable or disable the support.
Authorization is cluster-wide. The authorization type must match the equivalent configuration in the Kafka custom resource.
If ACL management is not enabled, Streams for Apache Kafka rejects a resource if it contains any ACL rules.
If you’re using a standalone deployment of the User Operator, ACL management is enabled by default. You can disable it using the STRIMZI_ACLS_ADMIN_API_SUPPORTED environment variable.
If no authorization is specified, the User Operator does not provision any access rights for the user. Whether such a KafkaUser can still access resources depends on the authorizer being used. For example, for simple authorization, this is determined by the allow.everyone.if.no.acl.found configuration in the Kafka cluster.
16.3.3.1. ACL rules
simple authorization uses ACL rules to manage access to Kafka brokers.
ACL rules grant access rights to the user, which you specify in the acls property.
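As an illustration, the acls property might grant a consumer read access to a topic and its consumer group. A sketch in KafkaUser.spec.authorization, using the same rule format as the secure access example later in this chapter:
authorization:
  type: simple
  acls:
    - resource:
        type: topic
        name: my-topic
        patternType: literal
      operations:
        - Describe
        - Read
    - resource:
        type: group
        name: my-group
        patternType: literal
      operations:
        - Read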
For more information about the AclRule object, see the AclRule schema reference.
16.3.3.2. Super user access to Kafka brokers
If a user is added to a list of super users in a Kafka broker configuration, the user is allowed unlimited access to the cluster regardless of any authorization constraints defined in ACLs in KafkaUser.
For more information on configuring super user access to brokers, see Kafka authorization.
16.3.4. Configuring user quotas
Configure the spec for the KafkaUser resource to enforce quotas so that a user does not overload Kafka brokers. Set size-based network usage and time-based CPU utilization thresholds.
You can also add a partition mutation quota to control the rate at which requests to change partitions are accepted. Partition mutations occur in response to the following types of user requests:
- Creating partitions for a new topic
- Adding partitions to an existing topic
- Deleting partitions from a topic
Example KafkaUser with user quotas
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  quotas:
    producerByteRate: 1048576 1
    consumerByteRate: 2097152 2
    requestPercentage: 55 3
    controllerMutationRate: 10 4
1. Byte-per-second quota on the amount of data the user can push to a Kafka broker.
2. Byte-per-second quota on the amount of data the user can fetch from a Kafka broker.
3. CPU utilization limit as a percentage of time for a client group.
4. Number of concurrent partition creation and deletion operations (mutations) allowed per second.
Using quotas for Kafka clients might be useful in a number of situations. Consider a wrongly configured Kafka producer which is sending requests at too high a rate. Such misconfiguration can cause a denial of service to other clients, so the problematic client ought to be blocked. By using a network limiting quota, it is possible to prevent this situation from significantly impacting other clients.
Streams for Apache Kafka supports user-level quotas, but not client-level quotas.
16.4. Example: Setting up secure client access
This procedure shows how to configure client access to a Kafka cluster from outside OpenShift or from another OpenShift cluster. It’s split into two parts:
- Securing Kafka brokers
- Securing user access to Kafka
Resource configuration
Client access to the Kafka cluster is secured with the following configuration:
- An external listener is configured with TLS encryption and mutual TLS (mTLS) authentication in the Kafka resource, as well as simple authorization.
- A KafkaUser is created for the client, utilizing mTLS authentication, and Access Control Lists (ACLs) are defined for simple authorization.
At least one listener supporting the desired authentication must be configured for the KafkaUser.
Listeners can be configured for mutual TLS, SCRAM-SHA-512, or OAuth authentication. While mTLS always uses encryption, encryption is also recommended when using SCRAM-SHA-512 and OAuth 2.0 authentication.
Authorization options for Kafka include simple, OAuth, OPA, or custom. When enabled, authorization is applied to all enabled listeners.
To ensure compatibility between Kafka and clients, configuration of the following authentication and authorization mechanisms must align:
- For type: tls and type: scram-sha-512 authentication types, Kafka.spec.kafka.listeners[*].authentication must match KafkaUser.spec.authentication
- For type: simple authorization, Kafka.spec.kafka.authorization must match KafkaUser.spec.authorization
For example, mTLS authentication for a user is only possible if it’s also enabled in the Kafka configuration.
Automation and certificate management
Streams for Apache Kafka operators automate the configuration process and create the certificates required for authentication:
- The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.
- The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.
You add the certificates to your client configuration.
In this procedure, the CA certificates generated by the Cluster Operator are used. Alternatively, you can replace them by installing your own custom CA certificates. You can also configure listeners to use Kafka listener certificates managed by an external CA.
Certificates are available in PEM (.crt) and PKCS #12 (.p12) formats. This procedure uses PEM certificates. Use PEM certificates with clients that support the X.509 certificate format.
For internal clients in the same OpenShift cluster and namespace, you can mount the cluster CA certificate in the pod specification. For more information, see Configuring internal clients to trust the cluster CA.
Prerequisites
- The Kafka cluster is available for connection by a client running outside the OpenShift cluster
- The Cluster Operator and User Operator are running in the cluster
16.4.1. Securing Kafka brokers
Configure the Kafka cluster with a Kafka listener.
- Define the authentication required to access the Kafka broker through the listener.
- Enable authorization on the Kafka broker.
Example listener configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    listeners: 1
      - name: external1 2
        port: 9094 3
        type: <listener_type> 4
        tls: true 5
        authentication:
          type: tls 6
        configuration: 7
          #...
    authorization: 8
      type: simple
      superUsers:
        - super-user-name 9
    # ...
1. Configuration options for enabling external listeners are described in the Generic Kafka listener schema reference.
2. Name to identify the listener. Must be unique within the Kafka cluster.
3. Port number used by the listener inside Kafka. The port number has to be unique within a given Kafka cluster. Allowed port numbers are 9092 and higher with the exception of ports 9404 and 9999, which are already used for Prometheus and JMX. Depending on the listener type, the port number might not be the same as the port number that connects Kafka clients.
4. External listener type specified as route (OpenShift only), loadbalancer, nodeport, or ingress (Kubernetes only). An internal listener is specified as internal or cluster-ip.
5. Required. TLS encryption on the listener. For route and ingress type listeners it must be set to true. For mTLS authentication, also use the authentication property.
6. Client authentication mechanism on the listener. For server and client authentication using mTLS, you specify tls: true and authentication.type: tls.
7. (Optional) Depending on the requirements of the listener type, you can specify additional listener configuration.
8. Authorization specified as simple, which uses the AclAuthorizer and StandardAuthorizer Kafka plugins.
9. (Optional) Super users can access all brokers regardless of any access restrictions defined in ACLs.
Warning: An OpenShift route address comprises the Kafka cluster name, the listener name, the project name, and the domain of the router. For example, my-cluster-kafka-external1-bootstrap-my-project.domain.com (<cluster_name>-kafka-<listener_name>-bootstrap-<namespace>.<domain>). Each DNS label (between periods “.”) must not exceed 63 characters, and the total length of the address must not exceed 255 characters.
- Apply the changes to the Kafka resource configuration. The Kafka cluster is configured with a Kafka broker listener using mTLS authentication.
A service is created for each Kafka broker pod.
A service is created to serve as the bootstrap address for connection to the Kafka cluster.
A service is also created as the external bootstrap address for external connection to the Kafka cluster using nodeport listeners.
The cluster CA certificate to verify the identity of the Kafka brokers is also created in the secret <cluster_name>-cluster-ca-cert.
Note: If you scale your Kafka cluster while using external listeners, it might trigger a rolling update of all Kafka brokers. This depends on the configuration.
- Retrieve the bootstrap address you can use to access the Kafka cluster from the status of the Kafka resource:
oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'
For example:
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
Use the bootstrap address in your Kafka client to connect to the Kafka cluster.
16.4.2. Securing user access to Kafka
Create or modify a user representing the client that requires access to the Kafka cluster.
- Specify the same authentication type as the Kafka listener.
- Specify the authorization ACLs for simple authorization.
Example user configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster 1
spec:
  authentication:
    type: tls 2
  authorization:
    type: simple
    acls: 3
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - Describe
          - Read
      - resource:
          type: group
          name: my-group
          patternType: literal
        operations:
          - Read
1. The label must match the label of the Kafka cluster.
2. Authentication specified as mTLS (tls).
3. ACL rules for simple authorization.
- Apply the changes to the KafkaUser resource configuration. The user is created, as well as a secret with the same name as the KafkaUser resource. The secret contains a public and private key for mTLS authentication.
Example secret with user credentials
apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate
  user.crt: <user_certificate> # Public key of the user
  user.key: <user_private_key> # Private key of the user
  user.p12: <store> # PKCS #12 store for user certificates and keys
  user.password: <password_for_store> # Protects the PKCS #12 store
- Extract the cluster CA certificate from the <cluster_name>-cluster-ca-cert secret of the Kafka cluster:
oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
- Extract the user certificate from the <user_name> secret:
oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
- Extract the private key of the user from the <user_name> secret:
oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
- Configure your client with the bootstrap address hostname and port for connecting to the Kafka cluster:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
- Configure your client with the truststore credentials to verify the identity of the Kafka cluster. Specify the public cluster CA certificate.
Example truststore configuration
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
SSL is the specified security protocol for mTLS authentication. Specify SASL_SSL for SCRAM-SHA-512 authentication over TLS. PEM is the file format of the truststore.
- Configure your client with the keystore credentials to verify the user when connecting to the Kafka cluster. Specify the public certificate and private key.
Example keystore configuration
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>");
props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
Add the keystore certificate and the private key directly to the configuration in single-line format. Between the BEGIN CERTIFICATE and END CERTIFICATE delimiters, start with a newline character (\n). End each line from the original certificate with \n too.
Example keystore configuration
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
    "-----BEGIN CERTIFICATE-----\n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE-----");
props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
    "-----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
16.5. Troubleshooting TLS hostname verification with node ports
Off-cluster access using node ports with TLS encryption enabled does not support TLS hostname verification. This is because Streams for Apache Kafka does not know the address of the node where the broker pod is scheduled and cannot include it in the broker certificate. Consequently, clients that perform hostname verification will fail to connect.
For example, a Java client will fail with the following exception:
Exception for TLS hostname verification
Caused by: java.security.cert.CertificateException: No subject alternative names matching IP address 168.72.15.231 found ...
To connect, you must disable hostname verification. In the Java client, set the ssl.endpoint.identification.algorithm configuration option to an empty string.
When configuring the client using a properties file, you can do it this way:
ssl.endpoint.identification.algorithm=
When configuring the client directly in Java, set the configuration option to an empty string:
props.put("ssl.endpoint.identification.algorithm", "");
Alternatively, if you know the addresses of the worker nodes where the brokers are scheduled, you can add them as additional SANs (Subject Alternative Names) to the broker certificates manually. For example, this might apply if your cluster is running on a bare-metal deployment with a limited number of available worker nodes. Use the alternativeNames property to specify additional SANs.
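A minimal sketch of a nodeport listener with extra bootstrap SANs, assuming two placeholder worker node hostnames (node-1.example.com and node-2.example.com):
listeners:
  - name: external4
    port: 9095
    type: nodeport
    tls: true
    configuration:
      bootstrap:
        alternativeNames:
          - node-1.example.com
          - node-2.example.com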