このコンテンツは選択した言語では利用できません。

Chapter 16. Securing access to a Kafka cluster


Secure connections by configuring Kafka and Kafka users. Through configuration, you can implement encryption, authentication, and authorization mechanisms.

Kafka configuration

To establish secure access to Kafka, configure the Kafka resource to set up the following configurations based on your specific requirements:

  • Listeners with specified authentication types to define how clients authenticate

    • TLS encryption for communication between Kafka and clients
    • Supported TLS versions and cipher suites for additional security
  • Authorization for the entire Kafka cluster
  • Network policies for restricting access
  • Super users for unconstrained access to brokers

Authentication is configured independently for each listener, while authorization is set up for the whole Kafka cluster.

For more information on access configuration for Kafka, see the Kafka schema reference and GenericKafkaListener schema reference.

User (client-side) configuration

To enable secure client access to Kafka, configure KafkaUser resources. These resources represent clients and determine how they authenticate and authorize with the Kafka cluster.

Configure the KafkaUser resource to set up the following configurations based on your specific requirements:

  • Authentication that must match the enabled listener authentication

    • Supported TLS versions and cipher suites that must match the Kafka configuration
  • Simple authorization to apply Access Control List (ACL) rules

    • ACLs for fine-grained control over user access to topics and actions
  • Quotas to limit client access based on byte rates or CPU utilization

The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.

For more information on access configuration for users, see the KafkaUser schema reference.

16.1. Configuring client authentication on listeners

Configure client authentication for Kafka brokers when creating listeners. Specify the listener authentication type using the Kafka.spec.kafka.listeners.authentication property in the Kafka resource.

For clients inside the OpenShift cluster, you can create plain (without encryption) or tls internal listeners. The internal listener type use a headless service and the DNS names given to the broker pods. As an alternative to the headless service, you can also create a cluster-ip type of internal listener to expose Kafka using per-broker ClusterIP services. For clients outside the OpenShift cluster, you create external listeners and specify a connection mechanism, which can be nodeport, loadbalancer, ingress (Kubernetes only), or route (OpenShift only).

For more information on the configuration options for connecting an external client, see Chapter 15, Setting up client access to a Kafka cluster.

Supported authentication options:

  1. mTLS authentication (only on the listeners with TLS enabled encryption)
  2. SCRAM-SHA-512 authentication
  3. OAuth 2.0 token-based authentication
  4. Custom authentication
  5. TLS versions and cipher suites

If you’re using OAuth 2.0 for client access management, user authentication and authorization credentials are handled through the authorization server.

The authentication option you choose depends on how you wish to authenticate client access to Kafka brokers.

Note

Try exploring the standard authentication options before using custom authentication. Custom authentication allows for any type of Kafka-supported authentication. It can provide more flexibility, but also adds complexity.

Figure 16.1. Kafka listener authentication options

options for listener authentication configuration

The listener authentication property is used to specify an authentication mechanism specific to that listener.

If no authentication property is specified then the listener does not authenticate clients which connect through that listener. The listener will accept all connections without authentication.

Authentication must be configured when using the User Operator to manage KafkaUsers.

The following example shows:

  • A plain listener configured for SCRAM-SHA-512 authentication
  • A tls listener with mTLS authentication
  • An external listener with mTLS authentication

Each listener is configured with a unique name and port within a Kafka cluster.

Important

When configuring listeners for client access to brokers, you can use port 9092 or higher (9093, 9094, and so on), but with a few exceptions. The listeners cannot be configured to use the ports reserved for interbroker communication (9090 and 9091), Prometheus metrics (9404), and JMX (Java Management Extensions) monitoring (9999).

Example listener authentication configuration

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external3
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: tls
# ...

16.1.1. mTLS authentication

mTLS authentication is always used for the communication between Kafka brokers and ZooKeeper pods.

Streams for Apache Kafka can configure Kafka to use TLS (Transport Layer Security) to provide encrypted communication between Kafka brokers and clients either with or without mutual authentication. For mutual, or two-way, authentication, both the server and the client present certificates. When you configure mTLS authentication, the broker authenticates the client (client authentication) and the client authenticates the broker (server authentication).

mTLS listener configuration in the Kafka resource requires the following:

  • tls: true to specify TLS encryption and server authentication
  • authentication.type: tls to specify the client authentication

When a Kafka cluster is created by the Cluster Operator, it creates a new secret with the name <cluster_name>-cluster-ca-cert. The secret contains a CA certificate. The CA certificate is in PEM and PKCS #12 format. To verify a Kafka cluster, add the CA certificate to the truststore in your client configuration. To verify a client, add a user certificate and key to the keystore in your client configuration. For more information on configuring a client for mTLS, see Section 16.3.2, “Configuring user authentication”.

Note

TLS authentication is more commonly one-way, with one party authenticating the identity of another. For example, when HTTPS is used between a web browser and a web server, the browser obtains proof of the identity of the web server.

16.1.2. SCRAM-SHA-512 authentication

SCRAM (Salted Challenge Response Authentication Mechanism) is an authentication protocol that can establish mutual authentication using passwords. Streams for Apache Kafka can configure Kafka to use SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 to provide authentication on both unencrypted and encrypted client connections.

When SCRAM-SHA-512 authentication is used with a TLS connection, the TLS protocol provides the encryption, but is not used for authentication.

The following properties of SCRAM make it safe to use SCRAM-SHA-512 even on unencrypted connections:

  • The passwords are not sent in the clear over the communication channel. Instead the client and the server are each challenged by the other to offer proof that they know the password of the authenticating user.
  • The server and client each generate a new challenge for each authentication exchange. This means that the exchange is resilient against replay attacks.

When KafkaUser.spec.authentication.type is configured with scram-sha-512 the User Operator will generate a random 32-character password consisting of upper and lowercase ASCII letters and numbers.

16.1.3. Restricting access to listeners with network policies

Control listener access by configuring the networkPolicyPeers property in the Kafka resource.

By default, Streams for Apache Kafka automatically creates a NetworkPolicy resource for every enabled Kafka listener, allowing connections from all namespaces.

To restrict listener access to specific applications or namespaces at the network level, configure the networkPolicyPeers property. Each listener can have its own networkPolicyPeers configuration. For more information on network policy peers, refer to the NetworkPolicyPeer API reference.

If you want to use custom network policies, you can set the STRIMZI_NETWORK_POLICY_GENERATION environment variable to false in the Cluster Operator configuration. For more information, see Section 10.7, “Configuring the Cluster Operator”.

Note

Your configuration of OpenShift must support ingress NetworkPolicies in order to use network policies.

Prerequisites

  • An OpenShift cluster with support for Ingress NetworkPolicies.
  • The Cluster Operator is running.

Procedure

  1. Configure the networkPolicyPeers property to define the application pods or namespaces allowed to access the Kafka cluster.

    This example shows configuration for a tls listener to allow connections only from application pods with the label app set to kafka-client:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
            networkPolicyPeers:
              - podSelector:
                  matchLabels:
                    app: kafka-client
        # ...
      zookeeper:
        # ...
  2. Apply the changes to the Kafka resource configuration.

16.1.4. Using custom listener certificates for TLS encryption

This procedure shows how to configure custom server certificates for TLS listeners or external listeners which have TLS encryption enabled.

By default, Kafka listeners use certificates signed by Streams for Apache Kafka’s internal CA (certificate authority). The Cluster Operator automatically generates a CA certificate when creating a Kafka cluster. To configure a client for TLS, the CA certificate is included in its truststore configuration to authenticate the Kafka cluster. Alternatively, you have the option to install and use your own CA certificates.

However, if you prefer more granular control by using your own custom certificates at the listener-level, you can configure listeners using brokerCertChainAndKey properties. You create a secret with your own private key and server certificate, then specify them in the brokerCertChainAndKey configuration.

User-provided certificates allow you to leverage existing security infrastructure. You can use a certificate signed by a public (external) CA or a private CA. Kafka clients need to trust the CA which was used to sign the listener certificate. If signed by a public CA, you usually won’t need to add it to a client’s truststore configuration.

Custom certificates are not managed by Streams for Apache Kafka, so you need to renew them manually.

Note

Listener certificates are used for TLS encryption and server authentication only. They are not used for TLS client authentication. If you want to use your own certificate for TLS client authentication as well, you must install and use your own clients CA.

Prerequisites

  • The Cluster Operator is running.
  • Each listener requires the following:

If you are not using a self-signed certificate, you can provide a certificate that includes the whole CA chain in the certificate.

You can only use the brokerCertChainAndKey properties if TLS encryption (tls: true) is configured for the listener.

Note

Streams for Apache Kafka does not support the use of encrypted private keys for TLS. The private key stored in the secret must be unencrypted for this to work.

Procedure

  1. Create a Secret containing your private key and server certificate:

    oc create secret generic <my_secret> --from-file=<my_listener_key.key> --from-file=<my_listener_certificate.crt>
  2. Edit the Kafka resource for your cluster.

    Configure the listener to use your Secret, certificate file, and private key file in the configuration.brokerCertChainAndKey property.

    Example configuration for a loadbalancer external listener with TLS encryption enabled

    # ...
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: external3
        port: 9094
        type: loadbalancer
        tls: true
        configuration:
          brokerCertChainAndKey:
            secretName: my-secret
            certificate: my-listener-certificate.crt
            key: my-listener-key.key
    # ...

    Example configuration for a TLS listener

    # ...
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        configuration:
          brokerCertChainAndKey:
            secretName: my-secret
            certificate: my-listener-certificate.crt
            key: my-listener-key.key
    # ...

  3. Apply the changes to the Kafka resource configuration.

    The Cluster Operator starts a rolling update of the Kafka cluster, which updates the configuration of the listeners.

    Note

    A rolling update is also started if you update a Kafka listener certificate in a Secret that is already used by a listener.

16.1.5. Specifying SANs for custom listener certificates

In order to use TLS hostname verification with custom Kafka listener certificates, you must specify the correct Subject Alternative Names (SANs) for each listener.

The certificate SANs must specify hostnames for the following:

  • All of the Kafka brokers in your cluster
  • The Kafka cluster bootstrap service

You can use wildcard certificates if they are supported by your CA.

16.1.5.1. Examples of SANs for internal listeners

Use the following examples to help you specify hostnames of the SANs in your certificates for your internal listeners.

Replace <cluster-name> with the name of the Kafka cluster and <namespace> with the OpenShift namespace where the cluster is running.

Wildcards example for a type: internal listener

//Kafka brokers
*.<cluster_name>-kafka-brokers
*.<cluster_name>-kafka-brokers.<namespace>.svc

// Bootstrap service
<cluster_name>-kafka-bootstrap
<cluster_name>-kafka-bootstrap.<namespace>.svc

Non-wildcards example for a type: internal listener

// Kafka brokers
<cluster_name>-kafka-0.<cluster_name>-kafka-brokers
<cluster_name>-kafka-0.<cluster_name>-kafka-brokers.<namespace>.svc
<cluster_name>-kafka-1.<cluster_name>-kafka-brokers
<cluster_name>-kafka-1.<cluster_name>-kafka-brokers.<namespace>.svc
# ...

// Bootstrap service
<cluster_name>-kafka-bootstrap
<cluster_name>-kafka-bootstrap.<namespace>.svc

Non-wildcards example for a type: cluster-ip listener

// Kafka brokers
<cluster_name>-kafka-<listener-name>-0
<cluster_name>-kafka-<listener-name>-0.<namespace>.svc
<cluster_name>-kafka-_listener-name>-1
<cluster_name>-kafka-<listener-name>-1.<namespace>.svc
# ...

// Bootstrap service
<cluster_name>-kafka-<listener-name>-bootstrap
<cluster_name>-kafka-<listener-name>-bootstrap.<namespace>.svc

16.1.5.2. Examples of SANs for external listeners

For external listeners which have TLS encryption enabled, the hostnames you need to specify in certificates depends on the external listener type.

Table 16.1. SANs for each type of external listener
External listener typeIn the SANs, specify…​

ingress

Addresses of all Kafka broker Ingress resources and the address of the bootstrap Ingress.

You can use a matching wildcard name.

route

Addresses of all Kafka broker Routes and the address of the bootstrap Route.

You can use a matching wildcard name.

loadbalancer

Addresses of all Kafka broker loadbalancers and the bootstrap loadbalancer address.

You can use a matching wildcard name.

nodeport

Addresses of all OpenShift worker nodes that the Kafka broker pods might be scheduled to.

You can use a matching wildcard name.

16.2. Configuring authorized access to Kafka

Configure authorized access to a Kafka cluster using the Kafka.spec.kafka.authorization property in the Kafka resource. If the authorization property is missing, no authorization is enabled and clients have no restrictions. When enabled, authorization is applied to all enabled listeners. The authorization method is defined in the type field.

Supported authorization options:

Figure 16.2. Kafka cluster authorization options

options for kafka authorization configuration

16.2.1. Designating super users

Super users can access all resources in your Kafka cluster regardless of any access restrictions, and are supported by all authorization mechanisms.

To designate super users for a Kafka cluster, add a list of user principals to the superUsers property. If a user uses mTLS authentication, the username is the common name from the TLS certificate subject prefixed with CN=. If you are not using the User Operator and using your own certificates for mTLS, the username is the full certificate subject.

A full certificate subject can include the following fields:

  • CN=<common_name>
  • OU=<organizational_unit>
  • O=<organization>
  • L=<locality>
  • ST=<state>
  • C=<country_code>

Omit any fields that are not applicable.

An example configuration with super users

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: myproject
spec:
  kafka:
    # ...
    authorization:
      type: simple
      superUsers:
        - CN=user-1
        - user-2
        - CN=user-3
        - CN=user-4,OU=my-ou,O=my-org,L=my-location,ST=my-state,C=US
        - CN=user-5,OU=my-ou,O=my-org,C=GB
        - CN=user-6,O=my-org
    # ...

16.3. Configuring user (client-side) security mechanisms

When configuring security mechanisms in clients, the clients are represented as users. Use the KafkaUser resource to configure the authentication, authorization, and access rights for Kafka clients.

Authentication permits user access, and authorization constrains user access to permissible actions. You can also create super users that have unconstrained access to Kafka brokers.

The authentication and authorization mechanisms must match the specification for the listener used to access the Kafka brokers.

For more information on configuring a KafkaUser resource to access Kafka brokers securely, see Section 16.4, “Example: Setting up secure client access”.

16.3.1. Associating users with Kafka clusters

A KafkaUser resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster

The label enables the User Operator to identify the KafkaUser resource and create and manager the user.

If the label does not match the Kafka cluster, the User Operator cannot identify the KafkaUser, and the user is not created.

If the status of the KafkaUser resource remains empty, check your label configuration.

16.3.2. Configuring user authentication

Use the KafkaUser custom resource to configure authentication credentials for users (clients) that require access to a Kafka cluster. Configure the credentials using the authentication property in KafkaUser.spec. By specifying a type, you control what credentials are generated.

Supported authentication types:

  • tls for mTLS authentication
  • tls-external for mTLS authentication using external certificates
  • scram-sha-512 for SCRAM-SHA-512 authentication

If tls or scram-sha-512 is specified, the User Operator creates authentication credentials when it creates the user. If tls-external is specified, the user still uses mTLS, but no authentication credentials are created. Use this option when you’re providing your own certificates. When no authentication type is specified, the User Operator does not create the user or its credentials.

You can use tls-external to authenticate with mTLS using a certificate issued outside the User Operator. The User Operator does not generate a TLS certificate or a secret. You can still manage ACL rules and quotas through the User Operator in the same way as when you’re using the tls mechanism. This means that you use the CN=USER-NAME format when specifying ACL rules and quotas. USER-NAME is the common name given in a TLS certificate.

16.3.2.1. mTLS authentication

To use mTLS authentication, you set the type field in the KafkaUser resource to tls.

Example user with mTLS authentication enabled

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  # ...

The authentication type must match the equivalent configuration for the Kafka listener used to access the Kafka cluster.

When the user is created by the User Operator, it creates a new secret with the same name as the KafkaUser resource. The secret contains a private and public key for mTLS. The public key is contained in a user certificate, which is signed by a clients CA (certificate authority) when it is created. All keys are in X.509 format.

Note

If you are using the clients CA generated by the Cluster Operator, the user certificates generated by the User Operator are also renewed when the clients CA is renewed by the Cluster Operator.

The user secret provides keys and certificates in PEM and PKCS #12 formats.

Example secret with user credentials

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate
  user.crt: <user_certificate> # Public key of the user
  user.key: <user_private_key> # Private key of the user
  user.p12: <store> # PKCS #12 store for user certificates and keys
  user.password: <password_for_store> # Protects the PKCS #12 store

When you configure a client, you specify the following:

  • Truststore properties for the public cluster CA certificate to verify the identity of the Kafka cluster
  • Keystore properties for the user authentication credentials to verify the client

The configuration depends on the file format (PEM or PKCS #12). This example uses PKCS #12 stores, and the passwords required to access the credentials in the stores.

Example client configuration using mTLS in PKCS #12 format

bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 1
security.protocol=SSL 2
ssl.truststore.location=/tmp/ca.p12 3
ssl.truststore.password=<truststore_password> 4
ssl.keystore.location=/tmp/user.p12 5
ssl.keystore.password=<keystore_password> 6

1
The bootstrap server address to connect to the Kafka cluster.
2
The security protocol option when using TLS for encryption.
3
The truststore location contains the public key certificate (ca.p12) for the Kafka cluster. A cluster CA certificate and password is generated by the Cluster Operator in the <cluster_name>-cluster-ca-cert secret when the Kafka cluster is created.
4
The password (ca.password) for accessing the truststore.
5
The keystore location contains the public key certificate (user.p12) for the Kafka user.
6
The password (user.password) for accessing the keystore.

16.3.2.2. mTLS authentication using a certificate issued outside the User Operator

To use mTLS authentication using a certificate issued outside the User Operator, you set the type field in the KafkaUser resource to tls-external. A secret and credentials are not created for the user.

Example user with mTLS authentication that uses a certificate issued outside the User Operator

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls-external
  # ...

16.3.2.3. SCRAM-SHA-512 authentication

To use the SCRAM-SHA-512 authentication mechanism, you set the type field in the KafkaUser resource to scram-sha-512.

Example user with SCRAM-SHA-512 authentication enabled

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  # ...

When the user is created by the User Operator, it creates a new secret with the same name as the KafkaUser resource. The secret contains the generated password in the password key, which is encoded with base64. In order to use the password, it must be decoded.

Example secret with user credentials

apiVersion: v1
kind: Secret
metadata:
  name: my-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  password: Z2VuZXJhdGVkcGFzc3dvcmQ= 1
  sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK 2

1
The generated password, base64 encoded.
2
The JAAS configuration string for SASL SCRAM-SHA-512 authentication, base64 encoded.

Decoding the generated password:

echo "Z2VuZXJhdGVkcGFzc3dvcmQ=" | base64 --decode
16.3.2.3.1. Custom password configuration

When a user is created, Streams for Apache Kafka generates a random password. You can use your own password instead of the one generated by Streams for Apache Kafka. To do so, create a secret with the password and reference it in the KafkaUser resource.

Example user with a password set for SCRAM-SHA-512 authentication

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
    password:
      valueFrom:
        secretKeyRef:
          name: my-secret 1
          key: my-password 2
  # ...

1
The name of the secret containing the predefined password.
2
The key for the password stored inside the secret.

16.3.3. Configuring user authorization

Use the KafkaUser custom resource to configure authorization rules for users (clients) that require access to a Kafka cluster. Configure the rules using the authorization property in KafkaUser.spec. By specifying a type, you control what rules are used.

To use simple authorization, you set the type property to simple in KafkaUser.spec.authorization. The simple authorization uses the Kafka Admin API to manage the ACL rules inside your Kafka cluster. Whether ACL management in the User Operator is enabled or not depends on your authorization configuration in the Kafka cluster.

  • For simple authorization, ACL management is always enabled.
  • For OPA authorization, ACL management is always disabled. Authorization rules are configured in the OPA server.
  • For Red Hat build of Keycloak authorization, you can manage the ACL rules directly in Red Hat build of Keycloak. You can also delegate authorization to the simple authorizer as a fallback option in the configuration. When delegation to the simple authorizer is enabled, the User Operator will enable management of ACL rules as well.
  • For custom authorization using a custom authorization plugin, use the supportsAdminApi property in the .spec.kafka.authorization configuration of the Kafka custom resource to enable or disable the support.

Authorization is cluster-wide. The authorization type must match the equivalent configuration in the Kafka custom resource.

If ACL management is not enabled, Streams for Apache Kafka rejects a resource if it contains any ACL rules.

If you’re using a standalone deployment of the User Operator, ACL management is enabled by default. You can disable it using the STRIMZI_ACLS_ADMIN_API_SUPPORTED environment variable.

If no authorization is specified, the User Operator does not provision any access rights for the user. Whether such a KafkaUser can still access resources depends on the authorizer being used. For example, for simple authorization, this is determined by the allow.everyone.if.no.acl.found configuration in the Kafka cluster.

16.3.3.1. ACL rules

simple authorization uses ACL rules to manage access to Kafka brokers.

ACL rules grant access rights to the user, which you specify in the acls property.

For more information about the AclRule object, see the AclRule schema reference.

16.3.3.2. Super user access to Kafka brokers

If a user is added to a list of super users in a Kafka broker configuration, the user is allowed unlimited access to the cluster regardless of any authorization constraints defined in ACLs in KafkaUser.

For more information on configuring super user access to brokers, see Kafka authorization.

16.3.4. Configuring user quotas

Configure the spec for the KafkaUser resource to enforce quotas so that a user does not overload Kafka brokers. Set size-based network usage and time-based CPU utilization thresholds.

Partition mutations occur in response to the following types of user requests:

  • Creating partitions for a new topic
  • Adding partitions to an existing topic
  • Deleting partitions from a topic

You can also add a partition mutation quota to control the rate at which requests to change partitions are accepted.

Example KafkaUser with user quotas

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  quotas:
    producerByteRate: 1048576 1
    consumerByteRate: 2097152 2
    requestPercentage: 55 3
    controllerMutationRate: 10 4

1
Byte-per-second quota on the amount of data the user can push to a Kafka broker.
2
Byte-per-second quota on the amount of data the user can fetch from a Kafka broker.
3
CPU utilization limit as a percentage of time for a client group.
4
Number of concurrent partition creation and deletion operations (mutations) allowed per second.

Using quotas for Kafka clients might be useful in a number of situations. Consider a wrongly configured Kafka producer which is sending requests at too high a rate. Such misconfiguration can cause a denial of service to other clients, so the problematic client ought to be blocked. By using a network limiting quota, it is possible to prevent this situation from significantly impacting other clients.

Note

Streams for Apache Kafka supports user-level quotas, but not client-level quotas.

16.4. Example: Setting up secure client access

This procedure shows how to configure client access to a Kafka cluster from outside OpenShift or from another OpenShift cluster. It’s split into two parts:

  • Securing Kafka brokers
  • Securing user access to Kafka

Resource configuration

Client access to the Kafka cluster is secured with the following configuration:

  1. An external listener is configured with TLS encryption and mutual TLS (mTLS) authentication in the Kafka resource, as well as simple authorization.
  2. A KafkaUser is created for the client, utilizing mTLS authentication, and Access Control Lists (ACLs) are defined for simple authorization.

At least one listener supporting the desired authentication must be configured for the KafkaUser.

Listeners can be configured for mutual TLS, SCRAM-SHA-512, or OAuth authentication. While mTLS always uses encryption, it’s also recommended when using SCRAM-SHA-512 and OAuth 2.0 authentication.

Authorization options for Kafka include simple, OAuth, OPA, or custom. When enabled, authorization is applied to all enabled listeners.

To ensure compatibility between Kafka and clients, configuration of the following authentication and authorization mechanisms must align:

  • For type: tls and type: scram-sha-512 authentication types, Kafka.spec.kafka.listeners[*].authentication must match KafkaUser.spec.authentication
  • For type: simple authorization,Kafka.spec.kafka.authorization must match KafkaUser.spec.authorization

For example, mTLS authentication for a user is only possible if it’s also enabled in the Kafka configuration.

Automation and certificate management

Streams for Apache Kafka operators automate the configuration process and create the certificates required for authentication:

  • The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.
  • The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.

You add the certificates to your client configuration.

In this procedure, the CA certificates generated by the Cluster Operator are used. Alternatively, you can replace them by installing your own custom CA certificates. You can also configure listeners to use Kafka listener certificates managed by an external CA.

Certificates are available in PEM (.crt) and PKCS #12 (.p12) formats. This procedure uses PEM certificates. Use PEM certificates with clients that support the X.509 certificate format.

Note

For internal clients in the same OpenShift cluster and namespace, you can mount the cluster CA certificate in the pod specification. For more information, see Configuring internal clients to trust the cluster CA.

Prerequisites

  • The Kafka cluster is available for connection by a client running outside the OpenShift cluster
  • The Cluster Operator and User Operator are running in the cluster

16.4.1. Securing Kafka brokers

  1. Configure the Kafka cluster with a Kafka listener.

    • Define the authentication required to access the Kafka broker through the listener.
    • Enable authorization on the Kafka broker.

      Example listener configuration

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 1
          - name: external1 2
            port: 9094 3
            type: <listener_type> 4
            tls: true 5
            authentication:
              type: tls 6
            configuration: 7
              #...
          authorization: 8
            type: simple
            superUsers:
              - super-user-name 9
        # ...

      1
      Configuration options for enabling external listeners are described in the Generic Kafka listener schema reference.
      2
      Name to identify the listener. Must be unique within the Kafka cluster.
      3
      Port number used by the listener inside Kafka. The port number has to be unique within a given Kafka cluster. Allowed port numbers are 9092 and higher with the exception of ports 9404 and 9999, which are already used for Prometheus and JMX. Depending on the listener type, the port number might not be the same as the port number that connects Kafka clients.
      4
      External listener type specified as route (OpenShift only), loadbalancer, nodeport or ingress (Kubernetes only). An internal listener is specified as internal or cluster-ip.
      5
      Required. TLS encryption on the listener. For route and ingress type listeners it must be set to true. For mTLS authentication, also use the authentication property.
      6
      Client authentication mechanism on the listener. For server and client authentication using mTLS, you specify tls: true and authentication.type: tls.
      7
      (Optional) Depending on the requirements of the listener type, you can specify additional listener configuration.
      8
      Authorization specified as simple, which uses the AclAuthorizer and StandardAuthorizer Kafka plugins.
      9
      (Optional) Super users can access all brokers regardless of any access restrictions defined in ACLs.
      Warning

      An OpenShift route address comprises the Kafka cluster name, the listener name, the project name, and the domain of the router. For example, my-cluster-kafka-external1-bootstrap-my-project.domain.com (<cluster_name>-kafka-<listener_name>-bootstrap-<namespace>.<domain>). Each DNS label (between periods “.”) must not exceed 63 characters, and the total length of the address must not exceed 255 characters.

  2. Apply the changes to the Kafka resource configuration.

    The Kafka cluster is configured with a Kafka broker listener using mTLS authentication.

    A service is created for each Kafka broker pod.

    A service is created to serve as the bootstrap address for connection to the Kafka cluster.

    A service is also created as the external bootstrap address for external connection to the Kafka cluster using nodeport listeners.

    The cluster CA certificate to verify the identity of the kafka brokers is also created in the secret <cluster_name>-cluster-ca-cert.

    Note

    If you scale your Kafka cluster while using external listeners, it might trigger a rolling update of all Kafka brokers. This depends on the configuration.

  3. Retrieve the bootstrap address you can use to access the Kafka cluster from the status of the Kafka resource.

    oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'

    For example:

    oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'

    Use the bootstrap address in your Kafka client to connect to the Kafka cluster.

16.4.2. Securing user access to Kafka

  1. Create or modify a user representing the client that requires access to the Kafka cluster.

    • Specify the same authentication type as the Kafka listener.
    • Specify the authorization ACLs for simple authorization.

      Example user configuration

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 1
      spec:
        authentication:
          type: tls 2
        authorization:
          type: simple
          acls: 3
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operations:
                - Describe
                - Read
            - resource:
                type: group
                name: my-group
                patternType: literal
              operations:
                - Read

      1
      The label must match the label of the Kafka cluster.
      2
      Authentication specified as mutual tls.
      3
      Simple authorization requires an accompanying list of ACL rules to apply to the user. The rules define the operations allowed on Kafka resources based on the username (my-user).
  2. Apply the changes to the KafkaUser resource configuration.

    The user is created, as well as a secret with the same name as the KafkaUser resource. The secret contains a public and private key for mTLS authentication.

    Example secret with user credentials

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: <public_key> # Public key of the clients CA used to sign this user certificate
      user.crt: <user_certificate> # Public key of the user
      user.key: <user_private_key> # Private key of the user
      user.p12: <store> # PKCS #12 store for user certificates and keys
      user.password: <password_for_store> # Protects the PKCS #12 store

  3. Extract the cluster CA certificate from the <cluster_name>-cluster-ca-cert secret of the Kafka cluster.

    oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
  4. Extract the user CA certificate from the <user_name> secret.

    oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
  5. Extract the private key of the user from the <user_name> secret.

    oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
  6. Configure your client with the bootstrap address hostname and port for connecting to the Kafka cluster:

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
  7. Configure your client with the truststore credentials to verify the identity of the Kafka cluster.

    Specify the public cluster CA certificate.

    Example truststore configuration

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");

    SSL is the specified security protocol for mTLS authentication. Specify SASL_SSL for SCRAM-SHA-512 authentication over TLS. PEM is the file format of the truststore.

  8. Configure your client with the keystore credentials to verify the user when connecting to the Kafka cluster.

    Specify the public certificate and private key.

    Example keystore configuration

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");

    Add the keystore certificate and the private key directly to the configuration. Add as a single-line format. Between the BEGIN CERTIFICATE and END CERTIFICATE delimiters, start with a newline character (\n). End each line from the original certificate with \n too.

    Example keystore configuration

    props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---");
    props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");

16.5. Troubleshooting TLS hostname verification with node ports

Off-cluster access using node ports with TLS encryption enabled does not support TLS hostname verification. This is because Streams for Apache Kafka does not know the address of the node where the broker pod is scheduled and cannot include it in the broker certificate. Consequently, clients that perform hostname verification will fail to connect.

For example, a Java client will fail with the following exception:

Exception for TLS hostname verification

Caused by: java.security.cert.CertificateException: No subject alternative names matching IP address 168.72.15.231 found
 ...

To connect, you must disable hostname verification. In the Java client, set the ssl.endpoint.identification.algorithm configuration option to an empty string.

When configuring the client using a properties file, you can do it this way:

ssl.endpoint.identification.algorithm=

When configuring the client directly in Java, set the configuration option to an empty string:

props.put("ssl.endpoint.identification.algorithm", "");

Alternatively, if you know the addresses of the worker nodes where the brokers are scheduled, you can add them as additional SANs (Subject Alternative Names) to the broker certificates manually. For example, this might apply if your cluster is running on a bare metal deployment with a limited number of available worker nodes. Use the alternativeNames property to specify additional SANS.

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.