此内容没有您所选择的语言版本。

Chapter 17. Enabling OAuth 2.0 token-based access


Streams for Apache Kafka supports OAuth 2.0 for securing Kafka clusters by integrating with an OAUth 2.0 authorization server. Kafka brokers and clients both need to be configured to use OAuth 2.0.

OAuth 2.0 enables standardized token-based authentication and authorization between applications, using a central authorization server to issue tokens that grant limited access to resources. You can define specific scopes for fine-grained access control. Scopes correspond to different levels of access to Kafka topics or operations within the cluster.

OAuth 2.0 also supports single sign-on and integration with identity providers.

17.1. Configuring an OAuth 2.0 authorization server

Before you can use OAuth 2.0 token-based access, you must configure an authorization server for integration with Streams for Apache Kafka. The steps are dependent on the chosen authorization server. Consult the product documentation for the authorization server for information on how to set up OAuth 2.0 access.

Prepare the authorization server to work with Streams for Apache Kafka by defining OAUth 2.0 clients for Kafka and each Kafka client component of your application. In relation to the authorization server, the Kafka cluster and Kafka clients are both regarded as OAuth 2.0 clients.

In general, configure OAuth 2.0 clients in the authorization server with the following client credentials enabled:

  • Client ID (for example, kafka for the Kafka cluster)
  • Client ID and secret as the authentication mechanism
Note

You only need to use a client ID and secret when using a non-public introspection endpoint of the authorization server. The credentials are not typically required when using public authorization server endpoints, as with fast local JWT token validation.

17.2. Using OAuth 2.0 token-based authentication

Streams for Apache Kafka supports the use of OAuth 2.0 for token-based authentication. An OAuth 2.0 authorization server handles the granting of access and inquiries about access. Kafka clients authenticate to Kafka brokers. Brokers and clients communicate with the authorization server, as necessary, to obtain or validate access tokens.

For a deployment of Streams for Apache Kafka, OAuth 2.0 integration provides the following support:

  • Server-side OAuth 2.0 authentication for Kafka brokers
  • Client-side OAuth 2.0 authentication for Kafka MirrorMaker, Kafka Connect, and the Kafka Bridge

17.2.1. Configuring OAuth 2.0 authentication on listeners

To secure Kafka brokers with OAuth 2.0 authentication, configure a listener in the Kafka resource to use OAUth 2.0 authentication and a client authentication mechanism, and add further configuration depending on the authentication mechanism and type of token validation used in the authentication.

Configuring listeners to use oauth authentication

Specify a listener in the Kafka resource with an oauth authentication type. You can configure internal and external listeners. We recommend using OAuth 2.0 authentication together with TLS encryption (tls: true). Without encryption, the connection is vulnerable to network eavesdropping and unauthorized access through token theft.

Example listener configuration with OAuth 2.0 authentication

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
      - name: external3
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: oauth
      #...

Enabling SASL authentication mechanisms

Use one or both of the following SASL mechanisms for clients to exchange credentials and establish authenticated sessions with Kafka.

OAUTHBEARER

Using the OAUTHBEARER authentication mechanism, credentials exchange uses a bearer token provided by an OAuth callback handler. Token provision can be configured to use the following methods:

  • Client ID and secret (using the OAuth 2.0 client credentials mechanism)
  • Client ID and client assertion
  • Long-lived access token or Service account token
  • Long-lived refresh token obtained manually

OAUTHBEARER is recommended as it provides a higher level of security than PLAIN, though it can only be used by Kafka clients that support the OAUTHBEARER mechanism at the protocol level. Client credentials are never shared with Kafka.

PLAIN

PLAIN is a simple authentication mechanism used by all Kafka client tools. Consider using PLAIN only with Kafka clients that do not support OAUTHBEARER. Using the PLAIN authentication mechanism, credentials exchange can be configured to use the following methods:

  • Client ID and secret (using the OAuth 2.0 client credentials mechanism)
  • Long-lived access token
    Regardless of the method used, the client must provide username and password properties to Kafka.

Credentials are handled centrally behind a compliant authorization server, similar to how OAUTHBEARER authentication is used. The username extraction process depends on the authorization server configuration.

OAUTHBEARER is automatically enabled in the oauth listener configuration for the Kafka broker. To use the PLAIN mechanism, you must set the enablePlain property to true.

In the following example, the PLAIN mechanism is enabled, and the OAUTHBEARER mechanism is disabled on a listener using the enableOauthBearer property.

Example listener configuration for the PLAIN mechanism

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
      - name: external3
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: oauth
          enablePlain: true
          enableOauthBearer: false
      #...

When you have defined the type of authentication as OAuth 2.0, you add configuration based on the type of validation, either as fast local JWT validation or token validation using an introspection endpoint.

Configuring fast local JWT token validation

Fast local JWT token validation involves checking a JWT token signature locally to ensure that the token meets the following criteria:

  • Contains a typ (type) or token_type header claim value of Bearer to indicate it is an access token
  • Is currently valid and not expired
  • Has an issuer that matches a validIssuerURI

You specify a validIssuerURI attribute when you configure the listener, so that any tokens not issued by the authorization server are rejected.

The authorization server does not need to be contacted during fast local JWT token validation. You activate fast local JWT token validation by specifying a jwksEndpointUri attribute, the endpoint exposed by the OAuth 2.0 authorization server. The endpoint contains the public keys used to validate signed JWT tokens, which are sent as credentials by Kafka clients.

All communication with the authorization server should be performed using TLS encryption. You can configure a certificate truststore as an OpenShift Secret in your Streams for Apache Kafka project namespace, and use the tlsTrustedCertificates property to point to the OpenShift secret containing the truststore file.

You might want to configure a userNameClaim to properly extract a username from the JWT token. If required, you can use a JsonPath expression like "['user.info'].['user.id']" to retrieve the username from nested JSON attributes within a token.

If you want to use Kafka ACL authorization, identify the user by their username during authentication. (The sub claim in JWT tokens is typically a unique ID, not a username.)

Example configuration for fast local JWT token validation

#...
- name: external3
  port: 9094
  type: loadbalancer
  tls: true
  authentication:
    type: oauth 1
    validIssuerUri: https://<auth_server_address>/<issuer-context> 2
    jwksEndpointUri: https://<auth_server_address>/<path_to_jwks_endpoint> 3
    userNameClaim: preferred_username 4
    maxSecondsWithoutReauthentication: 3600 5
    tlsTrustedCertificates: 6
      - secretName: oauth-server-cert
        pattern: "*.crt"
    disableTlsHostnameVerification: true 7
    jwksExpirySeconds: 360 8
    jwksRefreshSeconds: 300 9
    jwksMinRefreshPauseSeconds: 1 10

1
Listener type set to oauth.
2
URI of the token issuer used for authentication.
3
URI of the JWKS certificate endpoint used for local JWT validation.
4
The token claim (or key) that contains the actual username used to identify the user. Its value depends on the authorization server. If necessary, a JsonPath expression like "['user.info'].['user.id']" can be used to retrieve the username from nested JSON attributes within a token.
5
(Optional) Activates the Kafka re-authentication mechanism that enforces session expiry to the same length of time as the access token. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.
6
(Optional) Certificates stored in X.509 format within the specified secrets for TLS connection to the authorization server.
7
(Optional) Disable TLS hostname verification. Default is false.
8
The duration the JWKS certificates are considered valid before they expire. Default is 360 seconds. If you specify a longer time, consider the risk of allowing access to revoked certificates.
9
The period between refreshes of JWKS certificates. The interval must be at least 60 seconds shorter than the expiry interval. Default is 300 seconds.
10
The minimum pause in seconds between consecutive attempts to refresh JWKS public keys. When an unknown signing key is encountered, the JWKS keys refresh is scheduled outside the regular periodic schedule with at least the specified pause since the last refresh attempt. The refreshing of keys follows the rule of exponential backoff, retrying on unsuccessful refreshes with ever increasing pause, until it reaches jwksRefreshSeconds. The default value is 1.

Configuring fast local JWT token validation with OpenShift service accounts

To configure the listener for OpenShift service accounts, the Kubernetes API server must be used as the authorization server.

Example configuration for fast local JWT token validation using Kubernetes API server as authorization server

#...
- name: external3
  port: 9094
  type: loadbalancer
  tls: true
  authentication:
    type: oauth
    validIssuerUri: https://kubernetes.default.svc.cluster.local 1
    jwksEndpointUri: https://kubernetes.default.svc.cluster.local/openid/v1/jwks 2
    serverBearerTokenLocation: /var/run/secrets/kubernetes.io/serviceaccount/token 3
    checkAccessTokenType: false 4
    includeAcceptHeader: false 5
    tlsTrustedCertificates: 6
      - secretName: oauth-server-cert
        pattern: "*.crt"
    maxSecondsWithoutReauthentication: 3600
    customClaimCheck: "@.['kubernetes.io'] && @.['kubernetes.io'].['namespace'] in ['myproject']" 7

1
URI of the token issuer used for authentication. Must use FQDN, including the .cluster.local extension, which may vary based on the OpenShift cluster configuration.
2
URI of the JWKS certificate endpoint used for local JWT validation. Must use FQDN, including the .cluster.local extension, which may vary based on the OpenShift cluster configuration.
3
Location to the access token used by the Kafka broker to authenticate to the Kubernetes API server in order to access the jwksEndpointUri.
4
Skip the access token type check, as the claim for this is not present in service account tokens.
5
Skip sending Accept header in HTTP requests to the JWKS endpoint, as the Kubernetes API server does not support it.
6
Trusted certificates to connect to authorization server. This should point to a manually created Secret that contains the Kubernetes API server public certificate, which is mounted to the running pods under /var/run/secrets/kubernetes.io/serviceaccount/ca.crt. You can use the following command to create the Secret:
oc get cm kube-root-ca.crt -o jsonpath="{['data']['ca\.crt']}" > /tmp/ca.crt
oc create secret generic oauth-server-cert --from-file=ca.crt=/tmp/ca.crt
7
(Optional) Additional constraints that JWT token has to fulfill in order to be accepted, expressed as JsonPath filter query. In this example the service account has to belong to myproject namespace in order to be allowed to authenticate.

The above configuration uses the sub claim from the service account JWT token as the user ID. For example, the default service account for pods deployed in the myproject namespace has the username: system:serviceaccount:myproject:default.

When configuring ACLs the general form of how to refer to the ServiceAccount user should in that case be: User:system:serviceaccount:<Namespace>:<ServiceAccount-name>

Configuring token validation using an introspection endpoint

Token validation using an OAuth 2.0 introspection endpoint treats a received access token as opaque. The Kafka broker sends an access token to the introspection endpoint, which responds with the token information necessary for validation. Importantly, it returns up-to-date information if the specific access token is valid, and also information about when the token expires.

To configure OAuth 2.0 introspection-based validation, you specify an introspectionEndpointUri attribute rather than the jwksEndpointUri attribute specified for fast local JWT token validation. Depending on the authorization server, you typically have to specify a clientId and clientSecret, because the introspection endpoint is usually protected.

Example token validation configuration using an introspection endpoint

- name: external3
  port: 9094
  type: loadbalancer
  tls: true
  authentication:
    type: oauth
    validIssuerUri: https://<auth_server_address>/<issuer-context>
    introspectionEndpointUri: https://<auth_server_address>/<path_to_introspection_endpoint> 1
    clientId: kafka-broker 2
    clientSecret: 3
      secretName: my-cluster-oauth
      key: clientSecret
    userNameClaim: preferred_username 4
    maxSecondsWithoutReauthentication: 3600 5
    tlsTrustedCertificates:
      - secretName: oauth-server-cert
        pattern: "*.crt"

1
URI of the token introspection endpoint.
2
Client ID to identify the client.
3
Client Secret and client ID is used for authentication.
4
The token claim (or key) that contains the actual username used to identify the user. Its value depends on the authorization server. If necessary, a JsonPath expression like "['user.info'].['user.id']" can be used to retrieve the username from nested JSON attributes within a token.
5
(Optional) Activates the Kafka re-authentication mechanism that enforces session expiry to the same length of time as the access token. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.

Authenticating brokers to the authorization server protected endpoints

Usually, the certificates endpoint of the authorization server (jwksEndpointUri) is publicly accessible, while the introspection endpoint (introspectionEndpointUri) is protected. However, this may vary depending on the authorization server configuration.

The Kafka broker can authenticate to the authorization server’s protected endpoints in one of two ways using HTTP authentication schemes:

  • HTTP Basic authentication uses a client ID and secret.
  • HTTP Bearer authentication uses a bearer token.

To configure HTTP Basic authentication, set the following properties:

  • clientId
  • clientSecret

For HTTP Bearer authentication, set the following property:

  • serverBearerTokenLocation to specify the file path on disk containing the bearer token.

Including additional configuration options

Specify additional settings depending on the authentication requirements and the authorization server you are using. Some of these properties apply only to certain authentication mechanisms or when used in combination with other properties.

For example, when using OAUth over PLAIN, access tokens are passed as password property values with or without an $accessToken: prefix.

  • If you configure a token endpoint (tokenEndpointUri) in the listener configuration, you need the prefix.
  • If you don’t configure a token endpoint in the listener configuration, you don’t need the prefix. The Kafka broker interprets the password as a raw access token.

If the password is set as the access token, the username must be set to the same principal name that the Kafka broker obtains from the access token. You can specify username extraction options in your listener using the userNameClaim, usernamePrefix, fallbackUserNameClaim, fallbackUsernamePrefix, and userInfoEndpointUri properties. The username extraction process also depends on your authorization server; in particular, how it maps client IDs to account names.

Note

The PLAIN mechanism does not support password grant authentication. Use either client credentials (client ID + secret) or an access token for authentication.

Example optional configuration settings

  # ...
  authentication:
    type: oauth
    # ...
    checkIssuer: false 1
    checkAudience: true 2
    usernamePrefix: user- 3
    fallbackUserNameClaim: client_id 4
    fallbackUserNamePrefix: client-account- 5
    serverBearerTokenLocation: path/to/access/token 6
    validTokenType: bearer 7
    userInfoEndpointUri: https://<auth_server_address>/<path_to_userinfo_endpoint> 8
    enableOauthBearer: false 9
    enablePlain: true 10
    tokenEndpointUri: https://<auth_server_address>/<path_to_token_endpoint> 11
    customClaimCheck: "@.custom == 'custom-value'" 12
    clientAudience: audience 13
    clientScope: scope 14
    connectTimeoutSeconds: 60 15
    readTimeoutSeconds: 60 16
    httpRetries: 2 17
    httpRetryPauseMs: 300 18
    groupsClaim: "$.groups" 19
    groupsClaimDelimiter: "," 20
    includeAcceptHeader: false 21

1
If your authorization server does not provide an iss claim, it is not possible to perform an issuer check. In this situation, set checkIssuer to false and do not specify a validIssuerUri. Default is true.
2
If your authorization server provides an aud (audience) claim, and you want to enforce an audience check, set checkAudience to true. Audience checks identify the intended recipients of tokens. As a result, the Kafka broker will reject tokens that do not have its clientId in their aud claim. Default is false.
3
The prefix used when constructing the user ID. This only takes effect if userNameClaim is configured.
4
An authorization server may not provide a single attribute to identify both regular users and clients. When a client authenticates in its own name, the server might provide a client ID. When a user authenticates using a username and password to obtain a refresh token or an access token, the server might provide a username attribute in addition to a client ID. Use this fallback option to specify the username claim (attribute) to use if a primary user ID attribute is not available. If necessary, a JsonPath expression like "['client.info'].['client.id']" can be used to retrieve the fallback username to retrieve the username from nested JSON attributes within a token.
5
In situations where fallbackUserNameClaim is applicable, it may also be necessary to prevent name collisions between the values of the username claim, and those of the fallback username claim. Consider a situation where a client called producer exists, but also a regular user called producer exists. In order to differentiate between the two, you can use this property to add a prefix to the user ID of the client.
6
The location of the access token used by the Kafka broker to authenticate to the Kubernetes API server for accessing protected endpoints. The authorization server must support OAUTHBEARER authentication. This is an alternative to specifying clientId and clientSecret, which uses PLAIN authentication.
7
(Only applicable when using introspectionEndpointUri) Depending on the authorization server you are using, the introspection endpoint may or may not return the token type attribute, or it may contain different values. You can specify a valid token type value that the response from the introspection endpoint has to contain.
8
(Only applicable when using introspectionEndpointUri) The authorization server may be configured or implemented in such a way to not provide any identifiable information in an introspection endpoint response. In order to obtain the user ID, you can configure the URI of the userinfo endpoint as a fallback. The userNameClaim, fallbackUserNameClaim, and fallbackUserNamePrefix settings are applied to the response of userinfo endpoint.
9
Set this to false to disable the OAUTHBEARER mechanism on the listener. At least one of PLAIN or OAUTHBEARER has to be enabled. Default is true.
10
Set to true to enable PLAIN authentication on the listener, which is supported for clients on all platforms.
11
Additional configuration for the PLAIN mechanism. If specified, clients can authenticate over PLAIN by passing an access token as the password using an $accessToken: prefix. For production, always use https:// urls.
12
Additional custom rules can be imposed on the JWT access token during validation by setting this to a JsonPath filter query. If the access token does not contain the necessary data, it is rejected. When using the introspectionEndpointUri, the custom check is applied to the introspection endpoint response JSON.
13
An audience parameter passed to the token endpoint. An audience is used when obtaining an access token for inter-broker authentication. It is also used in the name of a client for OAuth 2.0 over PLAIN client authentication using a clientId and secret. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener.
14
A scope parameter passed to the token endpoint. A scope is used when obtaining an access token for inter-broker authentication. It is also used in the name of a client for OAuth 2.0 over PLAIN client authentication using a clientId and secret. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener.
15
The connect timeout in seconds when connecting to the authorization server. The default value is 60.
16
The read timeout in seconds when connecting to the authorization server. The default value is 60.
17
The maximum number of times to retry a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeout times for the connectTimeoutSeconds and readTimeoutSeconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, it could make the Kafka broker unresponsive.
18
The time to wait before attempting another retry of a failed HTTP request to the authorization server. By default, this time is set to zero, meaning that no pause is applied. This is because many issues that cause failed requests are per-request network glitches or proxy issues that can be resolved quickly. However, if your authorization server is under stress or experiencing high traffic, you may want to set this option to a value of 100 ms or more to reduce the load on the server and increase the likelihood of successful retries.
19
A JsonPath query that is used to extract groups information from either the JWT token or the introspection endpoint response. This option is not set by default. By configuring this option, a custom authorizer can make authorization decisions based on user groups.
20
A delimiter used to parse groups information when it is returned as a single delimited string. The default value is ',' (comma).
21
Some authorization servers have issues with client sending Accept: application/json header. By setting includeAcceptHeader: false the header will not be sent. Default is true.

17.2.2. Configuring OAuth 2.0 on client applications

To configure OAuth 2.0 on client applications, you must specify the following:

  • SASL (Simple Authentication and Security Layer) security protocols
  • SASL mechanisms
  • A JAAS (Java Authentication and Authorization Service) module
  • Authentication properties to access the authorization server

Configuring SASL protocols

Specify SASL protocols in the client configuration:

  • SASL_SSL for authentication over TLS encrypted connections
  • SASL_PLAINTEXT for authentication over unencrypted connections

Use SASL_SSL for production and SASL_PLAINTEXT for local development only.

When using SASL_SSL, additional ssl.truststore configuration is needed. The truststore configuration is required for secure connection (https://) to the OAuth 2.0 authorization server. To verify the OAuth 2.0 authorization server, add the CA certificate for the authorization server to the truststore in your client configuration. You can configure a truststore in PEM or PKCS #12 format.

Configuring SASL authentication mechanisms

Specify SASL mechanisms in the client configuration:

  • OAUTHBEARER for credentials exchange using a bearer token
  • PLAIN to pass client credentials (clientId + secret) or an access token

Configuring a JAAS module

Specify a JAAS module that implements the SASL authentication mechanism as a sasl.jaas.config property value:

  • org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule implements the OAUTHBEARER mechanism
  • org.apache.kafka.common.security.plain.PlainLoginModule implements the PLAIN mechanism
Note

For the OAUTHBEARER mechanism, Streams for Apache Kafka provides a callback handler for clients that use Kafka Client Java libraries to enable credentials exchange. For clients in other languages, custom code may be required to obtain the access token. For the PLAIN mechanism, Streams for Apache Kafka provides server-side callbacks to enable credentials exchange.

To be able to use the OAUTHBEARER mechanism, you must also add the custom io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler class as the callback handler. JaasClientOauthLoginCallbackHandler handles OAuth callbacks to the authorization server for access tokens during client login. This enables automatic token renewal, ensuring continuous authentication without user intervention. Additionally, it handles login credentials for clients using the OAuth 2.0 password grant method.

Configuring authentication properties

Configure the client to use credentials or access tokens for OAuth 2.0 authentication.

Using client credentials
Using client credentials involves configuring the client with the necessary credentials (client ID and secret, or client ID and client assertion) to obtain a valid access token from an authorization server. This is the simplest mechanism.
Using access tokens
Using access tokens, the client is configured with a valid long-lived access token or refresh token obtained from an authorization server. Using access tokens adds more complexity because there is an additional dependency on authorization server tools. If you are using long-lived access tokens, you may need to configure the client in the authorization server to increase the maximum lifetime of the token.

The only information ever sent to Kafka is the access token. The credentials used to obtain the token are never sent to Kafka. When a client obtains an access token, no further communication with the authorization server is needed.

SASL authentication properties support the following authentication methods:

  • OAuth 2.0 client credentials
  • Access token or Service account token
  • Refresh token
  • OAuth 2.0 password grant (deprecated)

Add the authentication properties as JAAS configuration (sasl.jaas.config and sasl.login.callback.handler.class).

If the client application is not configured with an access token directly, the client exchanges one of the following sets of credentials for an access token during Kafka session initiation:

  • Client ID and secret
  • Client ID and client assertion
  • Client ID, refresh token, and (optionally) a secret
  • Username and password, with client ID and (optionally) a secret
Note

You can also specify authentication properties as environment variables, or as Java system properties. For Java system properties, you can set them using setProperty and pass them on the command line using the -D option.

Example client credentials configuration using the client secret

security.protocol=SASL_SSL 1
sasl.mechanism=OAUTHBEARER 2
ssl.truststore.location=/tmp/truststore.p12 3
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \ 4
  oauth.client.id="<client_id>" \ 5
  oauth.client.secret="<client_secret>" \ 6
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7
  oauth.ssl.truststore.password="$STOREPASS" \ 8
  oauth.ssl.truststore.type="PKCS12" \ 9
  oauth.scope="<scope>" \ 10
  oauth.audience="<audience>" ; 11
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
SASL_SSL security protocol for TLS-encrypted connections. Use SASL_PLAINTEXT over unencrypted connections for local development only.
2
The SASL mechanism specified as OAUTHBEARER or PLAIN.
3
The truststore configuration for secure access to the Kafka cluster.
4
URI of the authorization server token endpoint.
5
Client ID, which is the name used when creating the client in the authorization server.
6
Client secret created when creating the client in the authorization server.
7
The location contains the public key certificate (truststore.p12) for the authorization server.
8
The password for accessing the truststore.
9
The truststore type.
10
(Optional) The scope for requesting the token from the token endpoint. An authorization server may require a client to specify the scope.
11
(Optional) The audience for requesting the token from the token endpoint. An authorization server may require a client to specify the audience.

Example client credentials configuration using the client assertion

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \
  oauth.client.assertion.location="<path_to_client_assertion_token_file>" \ 1
  oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \ 2
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.scope="<scope>" \
  oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Path to the client assertion file used for authenticating the client. This file is a private key file as an alternative to the client secret. Alternatively, use the oauth.client.assertion option to specify the client assertion value in clear text.
2
(Optional) Sometimes you may need to specify the client assertion type. In not specified, the default value is urn:ietf:params:oauth:client-assertion-type:jwt-bearer.

Example password grants configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \ 1
  oauth.client.secret="<client_secret>" \ 2
  oauth.password.grant.username="<username>" \ 3
  oauth.password.grant.password="<password>" \ 4
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.scope="<scope>" \
  oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Client ID, which is the name used when creating the client in the authorization server.
2
(Optional) Client secret created when creating the client in the authorization server.
3
Username for password grant authentication. OAuth password grant configuration (username and password) uses the OAuth 2.0 password grant method. To use password grants, create a user account for a client on your authorization server with limited permissions. The account should act like a service account. Use in environments where user accounts are required for authentication, but consider using a refresh token first.
4
Password for password grant authentication.
Note

SASL PLAIN does not support passing a username and password (password grants) using the OAuth 2.0 password grant method.

Example access token configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.access.token="<access_token>" ; 1
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Long-lived access token for Kafka clients. Alternatively, oauth.access.token.location can be used to specify the file that contains the access token.

Example OpenShift service account token configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token";  1
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Location to the service account token on the filesystem (assuming that the client is deployed as an OpenShift pod)

Example refresh token configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \ 1
  oauth.client.secret="<client_secret>" \ 2
  oauth.refresh.token="<refresh_token>" \ 3
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Client ID, which is the name used when creating the client in the authorization server.
2
(Optional) Client secret created when creating the client in the authorization server.
3
Long-lived refresh token for Kafka clients.

SASL extensions for custom OAUTHBEARER implementations

If your Kafka broker uses a custom OAUTHBEARER implementation, you may need to pass additional SASL extension options. These extensions can include attributes or information required as client context by the authorization server. The options are passed as key-value pairs and are sent to the Kafka broker when a new session is started.

Pass SASL extension values using oauth.sasl.extension. as a key prefix.

Example configuration to pass SASL extension values

oauth.sasl.extension.key1="value1"
oauth.sasl.extension.key2="value2"

17.2.3. OAuth 2.0 client authentication flows

OAuth 2.0 authentication flows depend on the underlying Kafka client and Kafka broker configuration. The flows must also be supported by the authorization server used.

The Kafka broker listener configuration determines how clients authenticate using an access token. The client can pass a client ID and secret to request an access token.

If a listener is configured to use PLAIN authentication, the client can authenticate with a client ID and secret or username and access token. These values are passed as the username and password properties of the PLAIN mechanism.

Listener configuration supports the following token validation options:

  • You can use fast local token validation based on JWT signature checking and local token introspection, without contacting an authorization server. The authorization server provides a JWKS endpoint with public certificates that are used to validate signatures on the tokens.
  • You can use a call to a token introspection endpoint provided by an authorization server. Each time a new Kafka broker connection is established, the broker passes the access token received from the client to the authorization server. The Kafka broker checks the response to confirm whether the token is valid.
Note

An authorization server might only allow the use of opaque access tokens, which means that local token validation is not possible.

Kafka client credentials can also be configured for the following types of authentication:

  • Direct local access using a previously generated long-lived access token
  • Contact with the authorization server for a new access token to be issued (using a client ID and credentials, or a refresh token, or a username and a password)

17.2.3.1. Example client authentication flows using the SASL OAUTHBEARER mechanism

You can use the following communication flows for Kafka authentication using the SASL OAUTHBEARER mechanism.

Client using client ID and credentials, with broker delegating validation to authorization server

Client using client ID and secret with broker delegating validation to authorization server

  1. The Kafka client requests an access token from the authorization server using a client ID and credentials, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
  2. The authorization server generates a new access token.
  3. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
  4. The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server using its own client ID and secret.
  5. A Kafka client session is established if the token is valid.

Client using client ID and credentials, with broker performing fast local token validation

Client using client ID and credentials with broker performing fast local token validation

  1. The Kafka client authenticates with the authorization server from the token endpoint, using a client ID and credentials, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
  2. The authorization server generates a new access token.
  3. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
  4. The Kafka broker validates the access token locally using a JWT token signature check, and local token introspection.

Client using long-lived access token, with broker delegating validation to authorization server

Client using long-lived access token with broker delegating validation to authorization server

  1. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
  2. The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server, using its own client ID and secret.
  3. A Kafka client session is established if the token is valid.

Client using long-lived access token, with broker performing fast local validation

Client using long-lived access token with broker performing fast local validation

  1. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
  2. The Kafka broker validates the access token locally using a JWT token signature check and local token introspection.
Warning

Fast local JWT token signature validation is suitable only for short-lived tokens as there is no check with the authorization server if a token has been revoked. Token expiration is written into the token, but revocation can happen at any time, so cannot be accounted for without contacting the authorization server. Any issued token would be considered valid until it expires.

17.2.3.2. Example client authentication flows using the SASL PLAIN mechanism

You can use the following communication flows for Kafka authentication using the OAuth PLAIN mechanism.

Client using a client ID and secret, with the broker obtaining the access token for the client

Client using a client ID and secret with the broker obtaining the access token for the client

  1. The Kafka client passes a clientId as a username and a secret as a password.
  2. The Kafka broker uses a token endpoint to pass the clientId and secret to the authorization server.
  3. The authorization server returns a fresh access token or an error if the client credentials are not valid.
  4. The Kafka broker validates the token in one of the following ways:

    1. If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if the token validation is successful.
    2. If local token introspection is used, a request is not made to the authorization server. The Kafka broker validates the access token locally using a JWT token signature check.

Client using a long-lived access token without a client ID and secret

Client using a long-lived access token without a client ID and secret

  1. The Kafka client passes a username and password. The password provides the value of an access token that was obtained manually and configured before running the client.
  2. The password is passed with or without an $accessToken: string prefix depending on whether or not the Kafka broker listener is configured with a token endpoint for authentication.

    1. If the token endpoint is configured, the password should be prefixed by $accessToken: to let the broker know that the password parameter contains an access token rather than a client secret. The Kafka broker interprets the username as the account username.
    2. If the token endpoint is not configured on the Kafka broker listener (enforcing a no-client-credentials mode), the password should provide the access token without the prefix. The Kafka broker interprets the username as the account username. In this mode, the client doesn’t use a client ID and secret, and the password parameter is always interpreted as a raw access token.
  3. The Kafka broker validates the token in one of the following ways:

    1. If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if token validation is successful.
    2. If local token introspection is used, there is no request made to the authorization server. Kafka broker validates the access token locally using a JWT token signature check.

17.2.4. Re-authenticating sessions

Configure oauth listeners to use Kafka session re-authentication for OAuth 2.0 sessions between Kafka clients and Kafka. This mechanism enforces the expiry of an authenticated session between the client and the broker after a defined period of time. When a session expires, the client immediately starts a new session by reusing the existing connection rather than dropping it.

Session re-authentication is disabled by default. To enable it, you set a time value for maxSecondsWithoutReauthentication in the oauth listener configuration. The same property is used to configure session re-authentication for OAUTHBEARER and PLAIN authentication. For an example configuration, see Section 17.2.1, “Configuring OAuth 2.0 authentication on listeners”.

Session re-authentication must be supported by the Kafka client libraries used by the client.

Session re-authentication can be used with fast local JWT or introspection endpoint token validation.

Client re-authentication

When the broker’s authenticated session expires, the client must re-authenticate to the existing session by sending a new, valid access token to the broker, without dropping the connection.

If token validation is successful, a new client session is started using the existing connection. If the client fails to re-authenticate, the broker will close the connection if further attempts are made to send or receive messages. Java clients that use Kafka client library 2.2 or later automatically re-authenticate if the re-authentication mechanism is enabled on the broker.

Session re-authentication also applies to refresh tokens, if used. When the session expires, the client refreshes the access token by using its refresh token. The client then uses the new access token to re-authenticate to the existing session.

Session expiry

When session re-authentication is configured, session expiry works differently for OAUTHBEARER and PLAIN authentication.

For OAUTHBEARER and PLAIN, using the client ID and secret method:

  • The broker’s authenticated session will expire at the configured maxSecondsWithoutReauthentication.
  • The session will expire earlier if the access token expires before the configured time.

For PLAIN using the long-lived access token method:

  • The broker’s authenticated session will expire at the configured maxSecondsWithoutReauthentication.
  • Re-authentication will fail if the access token expires before the configured time. Although session re-authentication is attempted, PLAIN has no mechanism for refreshing tokens.

If maxSecondsWithoutReauthentication is not configured, OAUTHBEARER and PLAIN clients can remain connected to brokers indefinitely, without needing to re-authenticate. Authenticated sessions do not end with access token expiry. However, this can be considered when configuring authorization, for example, by using keycloak authorization or installing a custom authorizer.

17.2.5. Example: Enabling OAuth 2.0 authentication

This example shows how to configure client access to a Kafka cluster using OAUth 2.0 authentication. The procedures describe the configuration required to set up OAuth 2.0 authentication on Kafka listeners, Kafka Java clients, and Kafka components.

17.2.5.1. Setting up OAuth 2.0 authentication on listeners

Configure Kafka listeners so that they are enabled to use OAuth 2.0 authentication using an authorization server.

We advise using OAuth 2.0 over an encrypted interface through through a listener with tls: true. Plain listeners are not recommended.

If the authorization server is using certificates signed by the trusted CA and matching the OAuth 2.0 server hostname, TLS connection works using the default settings. Otherwise, you may need to configure the truststore with proper certificates or disable the certificate hostname validation.

For more information on the configuration of OAuth 2.0 authentication for Kafka broker listeners, see the KafkaListenerAuthenticationOAuth schema reference.

Prerequisites

  • Streams for Apache Kafka and Kafka are running
  • An OAuth 2.0 authorization server is deployed

Procedure

  1. Specify a listener in the Kafka resource with an oauth authentication type.

    Example listener configuration with OAuth 2.0 authentication

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: oauth
          - name: external3
            port: 9094
            type: loadbalancer
            tls: true
            authentication:
              type: oauth
          #...

  2. Configure the OAuth listener depending on the authorization server and validation type:

  3. Apply the changes to the Kafka configuration.
  4. Check the update in the logs or by watching the pod state transitions:

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    The rolling update configures the brokers to use OAuth 2.0 authentication.

17.2.5.2. Setting up OAuth 2.0 on Kafka Java clients

Configure Kafka producer and consumer APIs to use OAuth 2.0 for interaction with Kafka brokers. Add a callback plugin to your client pom.xml file, then configure your client for OAuth 2.0.

How you configure the authentication properties depends on the authentication method you are using to access the OAuth 2.0 authorization server. In this procedure, the properties are specified in a properties file, then loaded into the client configuration.

Prerequisites

  • Streams for Apache Kafka and Kafka are running
  • An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
  • Kafka brokers are configured for OAuth 2.0

Procedure

  1. Add the client library with OAuth 2.0 support to the pom.xml file for the Kafka client:

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.15.0.redhat-00012</version>
    </dependency>
  2. Configure the client depending on the OAuth 2.0 authentication method:

    For example, specify the properties for the authentication method in a client.properties file.

  3. Input the client properties for OAUTH 2.0 authentication into the Java client code.

    Example showing input of client properties

    Properties props = new Properties();
    try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) {
      props.load(reader);
    }

  4. Verify that the Kafka client can access the Kafka brokers.

17.2.5.3. Setting up OAuth 2.0 on Kafka components

This procedure describes how to set up Kafka components to use OAuth 2.0 authentication using an authorization server.

You can configure OAuth 2.0 authentication for the following components:

  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

In this scenario, the Kafka component and the authorization server are running in the same cluster.

Before you start

For more information on the configuration of OAuth 2.0 authentication for Kafka components, see the KafkaClientAuthenticationOAuth schema reference. The schema reference includes examples of configuration options.

Prerequisites

  • Streams for Apache Kafka and Kafka are running
  • An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
  • Kafka brokers are configured for OAuth 2.0

Procedure

  1. Create a client secret and mount it to the component as an environment variable.

    For example, here we are creating a client Secret for the Kafka Bridge:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Secret
    metadata:
     name: my-bridge-oauth
    type: Opaque
    data:
     clientSecret: MGQ1OTRmMzYtZTllZS00MDY2LWI5OGEtMTM5MzM2NjdlZjQw 1
    1
    The clientSecret key must be in base64 format.
  2. Create or edit the resource for the Kafka component so that OAuth 2.0 authentication is configured for the authentication property.

    For OAuth 2.0 authentication, you can use the following options:

    • Client ID and secret
    • Client ID and client assertion
    • Client ID and refresh token
    • Access token
    • Username and password
    • TLS

    For example, here OAuth 2.0 is assigned to the Kafka Bridge client using a client ID and secret, and TLS:

    Example OAuth 2.0 authentication configuration using the client secret

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      # ...
      authentication:
        type: oauth 1
        tokenEndpointUri: https://<auth_server_address>/<path_to_token_endpoint> 2
        clientId: kafka-bridge
        clientSecret:
          secretName: my-bridge-oauth
          key: clientSecret
        tlsTrustedCertificates: 3
          - secretName: oauth-server-cert
            pattern: "*.crt"

    1
    Authentication type set to oauth.
    2
    URI of the token endpoint for authentication.
    3
    Certificates stored in X.509 format within the specified secrets for TLS connection to the authorization server.

    In this example, OAuth 2.0 is assigned to the Kafka Bridge client using a client ID and the location of a client assertion file, with TLS to connect to the authorization server:

    Example OAuth 2.0 authentication configuration using client assertion

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      # ...
      authentication:
        type: oauth
        tokenEndpointUri: https://<auth_server_address>/<path_to_token_endpoint>
        clientId: kafka-bridge
        clientAssertionLocation: /var/run/secrets/sso/assertion 1
        tlsTrustedCertificates:
          - secretName: oauth-server-cert
            pattern: "*.crt"

    1
    Filesystem path to the client assertion file used for authenticating the client. This file is typically added to the deployed pod by an external operator service. Alternatively, use clientAssertion to refer to a secret containing the client assertion value.

    Here, OAuth 2.0 is assigned to the Kafka Bridge client using a service account token:

    Example OAuth 2.0 authentication configuration using the service account token

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      # ...
      authentication:
        type: oauth
        accessTokenLocation: /var/run/secrets/kubernetes.io/serviceaccount/token 1

    1
    Path to the service account token file location.

    Depending on how you apply OAuth 2.0 authentication, and the type of authorization server, there are additional configuration options you can use:

    Additional configuration options

    # ...
    spec:
      # ...
      authentication:
        # ...
        disableTlsHostnameVerification: true 1
        accessTokenIsJwt: false 2
        scope: any 3
        audience: kafka 4
        connectTimeoutSeconds: 60 5
        readTimeoutSeconds: 60 6
        httpRetries: 2 7
        httpRetryPauseMs: 300 8
        includeAcceptHeader: false 9

    1
    (Optional) Disable TLS hostname verification. Default is false.
    2
    If you are using opaque tokens, you can apply accessTokenIsJwt: false so that access tokens are not treated as JWT tokens.
    3
    (Optional) The scope for requesting the token from the token endpoint. An authorization server may require a client to specify the scope. In this case it is any.
    4
    (Optional) The audience for requesting the token from the token endpoint. An authorization server may require a client to specify the audience. In this case it is kafka.
    5
    (Optional) The connect timeout in seconds when connecting to the authorization server. The default value is 60.
    6
    (Optional) The read timeout in seconds when connecting to the authorization server. The default value is 60.
    7
    (Optional) The maximum number of times to retry a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeout times for the connectTimeoutSeconds and readTimeoutSeconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, it could make the Kafka broker unresponsive.
    8
    (Optional) The time to wait before attempting another retry of a failed HTTP request to the authorization server. By default, this time is set to zero, meaning that no pause is applied. This is because many issues that cause failed requests are per-request network glitches or proxy issues that can be resolved quickly. However, if your authorization server is under stress or experiencing high traffic, you may want to set this option to a value of 100 ms or more to reduce the load on the server and increase the likelihood of successful retries.
    9
    (Optional) Some authorization servers have issues with client sending Accept: application/json header. By setting includeAcceptHeader: false the header will not be sent. Default is true.
  3. Apply the changes to the resource configuration of the component.
  4. Check the update in the logs or by watching the pod state transitions:

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    The rolling updates configure the component for interaction with Kafka brokers using OAuth 2.0 authentication.

17.3. Using OAuth 2.0 token-based authorization

Streams for Apache Kafka supports the use of OAuth 2.0 token-based authorization through Authorization Services, which lets you manage security policies and permissions centrally.

Security policies and permissions defined in Red Hat build of Keycloak grant access to Kafka resources. Users and clients are matched against policies that permit access to perform specific actions on Kafka brokers.

Kafka allows all users full access to brokers by default, but also provides the AclAuthorizer and StandardAuthorizer plugins to configure authorization based on Access Control Lists (ACLs). The ACL rules managed by these plugins are used to grant or deny access to resources based on username, and these rules are stored within the Kafka cluster itself.

However, OAuth 2.0 token-based authorization with Red Hat build of Keycloak offers far greater flexibility on how you wish to implement access control to Kafka brokers. In addition, you can configure your Kafka brokers to use OAuth 2.0 authorization and ACLs.

17.3.1. Example: Enabling OAuth 2.0 authorization

This example procedure shows how to configure Kafka to use OAuth 2.0 authorization using Red Hat build of Keycloak Authorization Services. To enable OAuth 2.0 authorization using Red Hat build of Keycloak, configure the Kafka resource to use keycloak authorization and specify the properties required to access the authorization server and Red Hat build of Keycloak Authorization Services.

Red Hat build of Keycloak server Authorization Services REST endpoints extend token-based authentication with Red Hat build of Keycloak by applying defined security policies on a particular user, and providing a list of permissions granted on different resources for that user. Policies use roles and groups to match permissions to users. OAuth 2.0 authorization enforces permissions locally based on the received list of grants for the user from Red Hat build of Keycloak Authorization Services.

A Red Hat build of Keycloak authorizer (KeycloakAuthorizer) is provided with Streams for Apache Kafka. The authorizer fetches a list of granted permissions from the authorization server as needed, and enforces authorization locally on Kafka, making rapid authorization decisions for each client request.

Before you begin

Consider the access you require or want to limit for certain users. You can use a combination of Red Hat build of Keycloak groups, roles, clients, and users to configure access in Red Hat build of Keycloak.

Typically, groups are used to match users based on organizational departments or geographical locations. And roles are used to match users based on their function.

With Red Hat build of Keycloak, you can store users and groups in LDAP, whereas clients and roles cannot be stored this way. Storage and access to user data may be a factor in how you choose to configure authorization policies.

Note

Super users always have unconstrained access to Kafka regardless of the authorization implemented.

Prerequisites

  • Streams for Apache Kafka must be configured to use OAuth 2.0 with Red Hat build of Keycloak for token-based authentication. You use the same Red Hat build of Keycloak server endpoint when you set up authorization.
  • OAuth 2.0 authentication must be configured with the maxSecondsWithoutReauthentication option to enable re-authentication.

Procedure

  1. Access the Red Hat build of Keycloak Admin Console or use the Red Hat build of Keycloak Admin CLI to enable Authorization Services for the OAuth 2.0 client for Kafka you created when setting up OAuth 2.0 authentication.
  2. Use Authorization Services to define resources, authorization scopes, policies, and permissions for the client.
  3. Bind the permissions to users and clients by assigning them roles and groups.
  4. Configure the kafka resource to use keycloak authorization, and to be able to access the authorization server and Authorization Services.

    Example OAuth 2.0 authorization configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        authorization:
          type: keycloak 1
          tokenEndpointUri: <https://<auth-server-address>/realms/external/protocol/openid-connect/token> 2
          clientId: kafka 3
          delegateToKafkaAcls: false 4
          disableTlsHostnameVerification: false 5
          superUsers: 6
            - CN=user-1
            - user-2
            - CN=user-3
          tlsTrustedCertificates: 7
            - secretName: oauth-server-cert
              pattern: "*.crt"
          grantsRefreshPeriodSeconds: 60 8
          grantsRefreshPoolSize: 5 9
          grantsMaxIdleSeconds: 300 10
          grantsGcPeriodSeconds: 300 11
          grantsAlwaysLatest: false 12
          connectTimeoutSeconds: 60 13
          readTimeoutSeconds: 60 14
          httpRetries: 2 15
          enableMetrics: false 16
          includeAcceptHeader: false 17
        #...

    1
    Type keycloak enables Red Hat build of Keycloak authorization.
    2
    URI of the Red Hat build of Keycloak token endpoint. For production, always use https:// urls. When you configure token-based oauth authentication, you specify a jwksEndpointUri as the URI for local JWT validation. The hostname for the tokenEndpointUri URI must be the same.
    3
    The client ID of the OAuth 2.0 client definition in Red Hat build of Keycloak that has Authorization Services enabled. Typically, kafka is used as the ID.
    4
    (Optional) Delegate authorization to Kafka AclAuthorizer and StandardAuthorizer if access is denied by Red Hat build of Keycloak Authorization Services policies. Default is false.
    5
    (Optional) Disable TLS hostname verification. Default is false.
    6
    (Optional) Designated super users.
    7
    (Optional) Certificates stored in X.509 format within the specified secrets for TLS connection to the authorization server.
    8
    (Optional) The time between two consecutive grants refresh runs. That is the maximum time for active sessions to detect any permissions changes for the user on Red Hat build of Keycloak. The default value is 60.
    9
    (Optional) The number of threads to use to refresh (in parallel) the grants for the active sessions. The default value is 5.
    10
    (Optional) The time, in seconds, after which an idle grant in the cache can be evicted. The default value is 300.
    11
    (Optional) The time, in seconds, between consecutive runs of a job that cleans stale grants from the cache. The default value is 300.
    12
    (Optional) Controls whether the latest grants are fetched for a new session. When enabled, grants are retrieved from Red Hat build of Keycloak and cached for the user. The default value is false.
    13
    (Optional) The connect timeout in seconds when connecting to the Red Hat build of Keycloak token endpoint. The default value is 60.
    14
    (Optional) The read timeout in seconds when connecting to the Red Hat build of Keycloak token endpoint. The default value is 60.
    15
    (Optional) The maximum number of times to retry (without pausing) a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeout times for the connectTimeoutSeconds and readTimeoutSeconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, it could make Kafka unresponsive.
    16
    (Optional) Enable or disable OAuth metrics. The default value is false.
    17
    (Optional) Some authorization servers have issues with client sending Accept: application/json header. By setting includeAcceptHeader: false the header will not be sent. Default is true.
  5. Apply the changes to the Kafka configuration.
  6. Check the update in the logs or by watching the pod state transitions:

    oc logs -f ${POD_NAME} -c kafka
    oc get pod -w

    The rolling update configures the brokers to use OAuth 2.0 authorization.

  7. Verify the configured permissions by accessing Kafka brokers as clients or users with specific roles, ensuring they have the necessary access and do not have unauthorized access.

17.4. Setting up permissions in Red Hat build of Keycloak

When using Red Hat build of Keycloak as the OAuth 2.0 authorization server, Kafka permissions are granted to user accounts or service accounts using authorization permissions. To grant permissions to access Kafka, create an OAuth client specification in Red Hat build of Keycloak that maps the authorization models of Red Hat build of Keycloak Authorization Services and Kafka.

17.4.1. Kafka and Red Hat build of Keycloak authorization models

Kafka and Red Hat build of Keycloak use different authorization models.

Kafka authorization model

Kafka’s authorization model uses resource types and operations to describe ACLs for a user. When a Kafka client performs an action on a broker, the broker uses the configured KeycloakAuthorizer to check the client’s permissions, based on the action and resource type.

Each resource type has a set of available permissions for operations. For example, the Topic resource type has Create and Write permissions among others.

Refer to the Kafka authorization model in the Kafka documentation for the full list of resources and permissions.

Red Hat build of Keycloak authorization model

Red Hat build of Keycloak’s authorization services model has four concepts for defining and granting permissions:

  • Resources
  • Scopes
  • Policies
  • Permissions

For information on these concepts, see the guide to Authorization Services.

17.4.2. Mapping authorization models

The Kafka authorization model is used as a basis for defining the Red Hat build of Keycloak roles and resources that control access to Kafka.

To grant Kafka permissions to user accounts or service accounts, you first create an OAuth client specification in Red Hat build of Keycloak for the Kafka cluster. You then specify Red Hat build of Keycloak Authorization Services rules on the client. Typically, the client ID of the OAuth client that represents the Kafka cluster is kafka. The example configuration files provided with Streams for Apache Kafka use kafka as the OAuth client id.

Note

If you have multiple Kafka clusters, you can use a single OAuth client (kafka) for all of them. This gives you a single, unified space in which to define and manage authorization rules. However, you can also use different OAuth client ids (for example, my-cluster-kafka or cluster-dev-kafka) and define authorization rules for each cluster within each client configuration.

The kafka client definition must have the Authorization Enabled option enabled in the Red Hat build of Keycloak Admin Console. For information on enabling authorization services, see the guide to Authorization Services.

All permissions exist within the scope of the kafka client. If you have different Kafka clusters configured with different OAuth client IDs, they each need a separate set of permissions even though they’re part of the same Red Hat build of Keycloak realm.

When the Kafka client uses OAUTHBEARER authentication, the Red Hat build of Keycloak authorizer (KeycloakAuthorizer) uses the access token of the current session to retrieve a list of grants from the Red Hat build of Keycloak server. To grant permissions, the authorizer evaluates the grants list (received and cached) from Red Hat build of Keycloak Authorization Services based on the access token owner’s policies and permissions.

Uploading authorization scopes for Kafka permissions

An initial Red Hat build of Keycloak configuration usually involves uploading authorization scopes to create a list of all possible actions that can be performed on each Kafka resource type. This step is performed once only, before defining any permissions. You can add authorization scopes manually instead of uploading them.

Authorization scopes should contain the following Kafka permissions regardless of the resource type:

  • Create
  • Write
  • Read
  • Delete
  • Describe
  • Alter
  • DescribeConfigs
  • AlterConfigs
  • ClusterAction
  • IdempotentWrite

If you’re certain you won’t need a permission (for example, IdempotentWrite), you can omit it from the list of authorization scopes. However, that permission won’t be available to target on Kafka resources.

Note

The All permission is not supported.

Resource patterns for permissions checks

Resource patterns are used for pattern matching against the targeted resources when performing permission checks. The general pattern format is <resource_type>:<pattern_name>.

The resource types mirror the Kafka authorization model. The pattern allows for two matching options:

  • Exact matching (when the pattern does not end with *)
  • Prefix matching (when the pattern ends with *)

Example patterns for resources

Topic:my-topic
Topic:orders-*
Group:orders-*
Cluster:*

Additionally, the general pattern format can be prefixed by kafka-cluster:<cluster_name> followed by a comma, where <cluster_name> refers to the metadata.name in the Kafka custom resource.

Example patterns for resources with cluster prefix

kafka-cluster:my-cluster,Topic:*
kafka-cluster:*,Group:b_*

When the kafka-cluster prefix is missing, it is assumed to be kafka-cluster:*.

When defining a resource, you can associate it with a list of possible authorization scopes which are relevant to the resource. Set whatever actions make sense for the targeted resource type.

Though you may add any authorization scope to any resource, only the scopes supported by the resource type are considered for access control.

Policies for applying access permission

Policies are used to target permissions to one or more user accounts or service accounts. Targeting can refer to:

  • Specific user or service accounts
  • Realm roles or client roles
  • User groups

A policy is given a unique name and can be reused to target multiple permissions to multiple resources.

Permissions to grant access

Use fine-grained permissions to pull together the policies, resources, and authorization scopes that grant access to users.

The name of each permission should clearly define which permissions it grants to which users. For example, Dev Team B can read from topics starting with x.

17.4.3. Permissions for common Kafka operations

The following examples demonstrate the user permissions required for performing common operations on Kafka.

Create a topic

To create a topic, the Create permission is required for the specific topic, or for Cluster:kafka-cluster.

bin/kafka-topics.sh --create --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

List topics

If a user has the Describe permission on a specified topic, the topic is listed.

bin/kafka-topics.sh --list \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Display topic details

To display a topic’s details, Describe and DescribeConfigs permissions are required on the topic.

bin/kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Produce messages to a topic

To produce messages to a topic, Describe and Write permissions are required on the topic.

If the topic hasn’t been created yet, and topic auto-creation is enabled, the permissions to create a topic are required.

bin/kafka-console-producer.sh  --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties

Consume messages from a topic

To consume messages from a topic, Describe and Read permissions are required on the topic. Consuming from the topic normally relies on storing the consumer offsets in a consumer group, which requires additional Describe and Read permissions on the consumer group.

Two resources are needed for matching. For example:

Topic:my-topic
Group:my-group-*
bin/kafka-console-consumer.sh --topic my-topic --group my-group-1 --from-beginning \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --consumer.config /tmp/config.properties

Produce messages to a topic using an idempotent producer

As well as the permissions for producing to a topic, an additional IdempotentWrite permission is required on the Cluster:kafka-cluster resource.

Two resources are needed for matching. For example:

Topic:my-topic
Cluster:kafka-cluster
bin/kafka-console-producer.sh  --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties --producer-property enable.idempotence=true --request-required-acks -1

List consumer groups

When listing consumer groups, only the groups on which the user has the Describe permissions are returned. Alternatively, if the user has the Describe permission on the Cluster:kafka-cluster, all the consumer groups are returned.

bin/kafka-consumer-groups.sh --list \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Display consumer group details

To display a consumer group’s details, the Describe permission is required on the group and the topics associated with the group.

bin/kafka-consumer-groups.sh --describe --group my-group-1 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Change topic configuration

To change a topic’s configuration, the Describe and Alter permissions are required on the topic.

bin/kafka-topics.sh --alter --topic my-topic --partitions 2 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Display Kafka broker configuration

In order to use kafka-configs.sh to get a broker’s configuration, the DescribeConfigs permission is required on the Cluster:kafka-cluster.

bin/kafka-configs.sh --entity-type brokers --entity-name 0 --describe --all \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Change Kafka broker configuration

To change a Kafka broker’s configuration, DescribeConfigs and AlterConfigs permissions are required on Cluster:kafka-cluster.

bin/kafka-configs --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Delete a topic

To delete a topic, the Describe and Delete permissions are required on the topic.

bin/kafka-topics.sh --delete --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Select a lead partition

To run leader selection for topic partitions, the Alter permission is required on the Cluster:kafka-cluster.

bin/kafka-leader-election.sh --topic my-topic --partition 0 --election-type PREFERRED  /
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --admin.config /tmp/config.properties

Reassign partitions

To generate a partition reassignment file, Describe permissions are required on the topics involved.

bin/kafka-reassign-partitions.sh --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "0,1" --generate \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties > /tmp/partition-reassignment.json

To execute the partition reassignment, Describe and Alter permissions are required on Cluster:kafka-cluster. Also, Describe permissions are required on the topics involved.

bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --execute \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties

To verify partition reassignment, Describe, and AlterConfigs permissions are required on Cluster:kafka-cluster, and on each of the topics involved.

bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --verify \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties

17.4.4. Example: Setting up Red Hat build of Keycloak Authorization Services

If you are using OAuth 2.0 with Red Hat build of Keycloak for token-based authentication, you can also use Red Hat build of Keycloak to configure authorization rules to constrain client access to Kafka brokers. This example explains how to use Red Hat build of Keycloak Authorization Services with keycloak authorization. Set up Red Hat build of Keycloak Authorization Services to enforce access restrictions on Kafka clients. Red Hat build of Keycloak Authorization Services use authorization scopes, policies and permissions to define and apply access control to resources.

Red Hat build of Keycloak Authorization Services REST endpoints provide a list of granted permissions on resources for authenticated users. The list of grants (permissions) is fetched from the Red Hat build of Keycloak server as the first action after an authenticated session is established by the Kafka client. The list is refreshed in the background so that changes to the grants are detected. Grants are cached and enforced locally on the Kafka broker for each user session to provide fast authorization decisions.

Streams for Apache Kafka provides example configuration files. These include the following example files for setting up Red Hat build of Keycloak:

kafka-ephemeral-oauth-single-keycloak-authz.yaml
An example Kafka custom resource configured for OAuth 2.0 token-based authorization using Red Hat build of Keycloak. You can use the custom resource to deploy a Kafka cluster that uses keycloak authorization and token-based oauth authentication.
kafka-authz-realm.json
An example Red Hat build of Keycloak realm configured with sample groups, users, roles and clients. You can import the realm into a Red Hat build of Keycloak instance to set up fine-grained permissions to access Kafka.

If you want to try the example with Red Hat build of Keycloak, use these files to perform the tasks outlined in this section in the order shown.

Authentication

When you configure token-based oauth authentication, you specify a jwksEndpointUri as the URI for local JWT validation. When you configure keycloak authorization, you specify a tokenEndpointUri as the URI of the Red Hat build of Keycloak token endpoint. The hostname for both URIs must be the same.

Targeted permissions with group or role policies

In Red Hat build of Keycloak, confidential clients with service accounts enabled can authenticate to the server in their own name using a client ID and a secret. This is convenient for microservices that typically act in their own name, and not as agents of a particular user (like a web site). Service accounts can have roles assigned like regular users. They cannot, however, have groups assigned. As a consequence, if you want to target permissions to microservices using service accounts, you cannot use group policies, and should instead use role policies. Conversely, if you want to limit certain permissions only to regular user accounts where authentication with a username and password is required, you can achieve that as a side effect of using the group policies rather than the role policies. This is what is used in this example for permissions that start with ClusterManager. Performing cluster management is usually done interactively using CLI tools. It makes sense to require the user to log in before using the resulting access token to authenticate to the Kafka broker. In this case, the access token represents the specific user, rather than the client application.

17.4.4.1. Setting up permissions in Red Hat build of Keycloak

Set up Red Hat build of Keycloak, then connect to its Admin Console and add the preconfigured realm. Use the example kafka-authz-realm.json file to import the realm. You can check the authorization rules defined for the realm in the Admin Console. The rules grant access to the resources on the Kafka cluster configured to use the example Red Hat build of Keycloak realm.

Prerequisites

  • A running OpenShift cluster.
  • The Streams for Apache Kafka examples/security/keycloak-authorization/kafka-authz-realm.json file that contains the preconfigured realm.

Procedure

  1. Install the Red Hat build of Keycloak server using the Red Hat build of Keycloak Operator as described in Getting Started in the Red Hat build of Keycloak documentation.
  2. Wait until the Red Hat build of Keycloak instance is running.
  3. Get the external hostname to be able to access the Admin Console.

    NS=sso
    oc get ingress keycloak -n $NS

    In this example, we assume the Red Hat build of Keycloak server is running in the sso namespace.

  4. Get the password for the admin user.

    oc get -n $NS pod keycloak-0 -o yaml | less

    The password is stored as a secret, so get the configuration YAML file for the Red Hat build of Keycloak instance to identify the name of the secret (secretKeyRef.name).

  5. Use the name of the secret to obtain the clear text password.

    SECRET_NAME=credential-keycloak
    oc get -n $NS secret $SECRET_NAME -o yaml | grep PASSWORD | awk '{print $2}' | base64 -D

    In this example, we assume the name of the secret is credential-keycloak.

  6. Log in to the Admin Console with the username admin and the password you obtained.

    Use https://HOSTNAME to access the Kubernetes Ingress.

    You can now upload the example realm to Red Hat build of Keycloak using the Admin Console.

  7. Click Add Realm to import the example realm.
  8. Add the examples/security/keycloak-authorization/kafka-authz-realm.json file, and then click Create.

    You now have kafka-authz as your current realm in the Admin Console.

    The default view displays the Master realm.

  9. In the Red Hat build of Keycloak Admin Console, go to Clients > kafka > Authorization > Settings and check that Decision Strategy is set to Affirmative.

    An affirmative policy means that at least one policy must be satisfied for a client to access the Kafka cluster.

  10. In the Red Hat build of Keycloak Admin Console, go to Groups, Users, Roles and Clients to view the realm configuration.

    Groups
    Groups organize users and set permissions. Groups can be linked to an LDAP identity provider and used to compartmentalize users, such as by region or department. Users can be added to groups through a custom LDAP interface to manage permissions for Kafka resources. For more information on using KeyCloak’s LDAP provider, see the guide to Keycloak Server Administration.
    Users
    Users define individual users. In this example, alice (in the ClusterManager group) and bob (in ClusterManager-my-cluster) are created. Users can also be stored in an LDAP identity provider.
    Roles
    Roles assign specific permissions to users or clients. Roles function like tags to give users certain rights. While roles cannot be stored in LDAP, you can add Red Hat build of Keycloak roles to groups to combine both roles and permissions.
    Clients

    Clients define configurations for Kafka interactions.

    • The kafka client handles OAuth 2.0 token validation for brokers and contains authorization policies (which require authorization to be enabled).
    • The kafka-cli client is used by command line tools to obtain access or refresh tokens.
    • team-a-client and team-b-client represent services with partial access to specific Kafka topics.
  11. In the Red Hat build of Keycloak Admin Console, go to Authorization > Permissions to see the granted permissions that use the resources and policies defined for the realm.

    For example, the kafka client has the following permissions:

    Dev Team A can write to topics that start with x_ on any cluster
    Dev Team B can read from topics that start with x_ on any cluster
    Dev Team B can update consumer group offsets that start with x_ on any cluster
    ClusterManager of my-cluster Group has full access to cluster config on my-cluster
    ClusterManager of my-cluster Group has full access to consumer groups on my-cluster
    ClusterManager of my-cluster Group has full access to topics on my-cluster
    Dev Team A
    The Dev Team A realm role can write to topics that start with x_ on any cluster. This combines a resource called Topic:x_*, Describe and Write scopes, and the Dev Team A policy. The Dev Team A policy matches all users that have a realm role called Dev Team A.
    Dev Team B
    The Dev Team B realm role can read from topics that start with x_ on any cluster. This combines Topic:x_*, Group:x_* resources, Describe and Read scopes, and the Dev Team B policy. The Dev Team B policy matches all users that have a realm role called Dev Team B. Matching users and clients have the ability to read from topics, and update the consumed offsets for topics and consumer groups that have names starting with x_.

17.4.4.2. Deploying a Kafka cluster with Red Hat build of Keycloak authorization

Deploy a Kafka cluster configured to connect to the Red Hat build of Keycloak server. Use the example kafka-ephemeral-oauth-single-keycloak-authz.yaml file to deploy the Kafka cluster as a Kafka custom resource. The example deploys a single-node Kafka cluster with keycloak authorization and oauth authentication.

Prerequisites

  • The Red Hat build of Keycloak authorization server is deployed to your OpenShift cluster and loaded with the example realm.
  • The Cluster Operator is deployed to your OpenShift cluster.
  • The Streams for Apache Kafka examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml custom resource.

Procedure

  1. Use the hostname of the Red Hat build of Keycloak instance you deployed to prepare a truststore certificate for Kafka brokers to communicate with the Red Hat build of Keycloak server.

    SSO_HOST=SSO-HOSTNAME
    SSO_HOST_PORT=$SSO_HOST:443
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem

    The certificate is required as Kubernetes Ingress is used to make a secure (HTTPS) connection.

    Usually there is not one single certificate, but a certificate chain. You only have to provide the top-most issuer CA, which is listed last in the /tmp/sso.pem file. You can extract it manually or using the following commands:

    Example command to extract the top CA certificate in a certificate chain

    split -p "-----BEGIN CERTIFICATE-----" sso.pem sso-
    for f in $(ls sso-*); do mv $f $f.pem; done
    cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt

    Note

    A trusted CA certificate is normally obtained from a trusted source, and not by using the openssl command.

  2. Deploy the certificate to OpenShift as a secret.

    oc create secret generic oauth-server-cert --from-file=/tmp/sso-ca.crt -n $NS
  3. Set the hostname as an environment variable

    SSO_HOST=SSO-HOSTNAME
  4. Create and deploy the example Kafka cluster.

    cat examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml | sed -E 's#\${SSO_HOST}'"#$SSO_HOST#" | oc create -n $NS -f -

17.4.4.3. Preparing TLS connectivity for a CLI Kafka client session

Create a new pod for an interactive CLI session. Set up a truststore with a Red Hat build of Keycloak certificate for TLS connectivity. The truststore is to connect to Red Hat build of Keycloak and the Kafka broker.

Prerequisites

  • The Red Hat build of Keycloak authorization server is deployed to your OpenShift cluster and loaded with the example realm.

    In the Red Hat build of Keycloak Admin Console, check the roles assigned to the clients are displayed in Clients > Service Account Roles.

  • The Kafka cluster configured to connect with Red Hat build of Keycloak is deployed to your OpenShift cluster.

Procedure

  1. Run a new interactive pod container using the Kafka image to connect to a running Kafka broker.

    NS=sso
    oc run -ti --restart=Never --image=registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0 kafka-cli -n $NS -- /bin/sh
    Note

    If oc times out waiting on the image download, subsequent attempts may result in an AlreadyExists error.

  2. Attach to the pod container.

    oc attach -ti kafka-cli -n $NS
  3. Use the hostname of the Red Hat build of Keycloak instance to prepare a certificate for client connection using TLS.

    SSO_HOST=SSO-HOSTNAME
    SSO_HOST_PORT=$SSO_HOST:443
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem

    Usually there is not one single certificate, but a certificate chain. You only have to provide the top-most issuer CA, which is listed last in the /tmp/sso.pem file. You can extract it manually or using the following command:

    Example command to extract the top CA certificate in a certificate chain

    split -p "-----BEGIN CERTIFICATE-----" sso.pem sso-
    for f in $(ls sso-*); do mv $f $f.pem; done
    cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt

    Note

    A trusted CA certificate is normally obtained from a trusted source, and not by using the openssl command.

  4. Create a truststore for TLS connection to the Kafka brokers.

    keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias sso -storepass $STOREPASS -import -file /tmp/sso-ca.crt -noprompt
  5. Use the Kafka bootstrap address as the hostname of the Kafka broker and the tls listener port (9093) to prepare a certificate for the Kafka broker.

    KAFKA_HOST_PORT=my-cluster-kafka-bootstrap:9093
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $KAFKA_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/my-cluster-kafka.pem

    The obtained .pem file is usually not one single certificate, but a certificate chain. You only have to provide the top-most issuer CA, which is listed last in the /tmp/my-cluster-kafka.pem file. You can extract it manually or using the following command:

    Example command to extract the top CA certificate in a certificate chain

    split -p "-----BEGIN CERTIFICATE-----" /tmp/my-cluster-kafka.pem kafka-
    for f in $(ls kafka-*); do mv $f $f.pem; done
    cp $(ls kafka-* | sort -r | head -n 1) my-cluster-kafka-ca.crt

    Note

    A trusted CA certificate is normally obtained from a trusted source, and not by using the openssl command. For this example we assume the client is running in a pod in the same namespace where the Kafka cluster was deployed. If the client is accessing the Kafka cluster from outside the OpenShift cluster, you would have to first determine the bootstrap address. In that case you can also get the cluster certificate directly from the OpenShift secret, and there is no need for openssl. For more information, see Chapter 15, Setting up client access to a Kafka cluster.

  6. Add the certificate for the Kafka broker to the truststore.

    keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias my-cluster-kafka -storepass $STOREPASS -import -file /tmp/my-cluster-kafka-ca.crt -noprompt

    Keep the session open to check authorized access.

17.4.4.4. Checking authorized access to Kafka using a CLI Kafka client session

Check the authorization rules applied through the Red Hat build of Keycloak realm using an interactive CLI session. Apply the checks using Kafka’s example producer and consumer clients to create topics with user and service accounts that have different levels of access.

Use the team-a-client and team-b-client clients to check the authorization rules. Use the alice admin user to perform additional administrative tasks on Kafka.

The Kafka image used in this example contains Kafka producer and consumer binaries.

Prerequisites

Setting up client and admin user configuration

  1. Prepare a Kafka configuration file with authentication properties for the team-a-client client.

    SSO_HOST=SSO-HOSTNAME
    
    cat > /tmp/team-a-client.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.client.id="team-a-client" \
      oauth.client.secret="team-a-client-secret" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF

    The SASL OAUTHBEARER mechanism is used. This mechanism requires a client ID and client secret, which means the client first connects to the Red Hat build of Keycloak server to obtain an access token. The client then connects to the Kafka broker and uses the access token to authenticate.

  2. Prepare a Kafka configuration file with authentication properties for the team-b-client client.

    cat > /tmp/team-b-client.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.client.id="team-b-client" \
      oauth.client.secret="team-b-client-secret" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF
  3. Authenticate admin user alice by using curl and performing a password grant authentication to obtain a refresh token.

    USERNAME=alice
    PASSWORD=alice-password
    
    GRANT_RESPONSE=$(curl -X POST "https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" -H 'Content-Type: application/x-www-form-urlencoded' -d "grant_type=password&username=$USERNAME&password=$PASSWORD&client_id=kafka-cli&scope=offline_access" -s -k)
    
    REFRESH_TOKEN=$(echo $GRANT_RESPONSE | awk -F "refresh_token\":\"" '{printf $2}' | awk -F "\"" '{printf $1}')

    The refresh token is an offline token that is long-lived and does not expire.

  4. Prepare a Kafka configuration file with authentication properties for the admin user alice.

    cat > /tmp/alice.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.refresh.token="$REFRESH_TOKEN" \
      oauth.client.id="kafka-cli" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF

    The kafka-cli public client is used for the oauth.client.id in the sasl.jaas.config. Since it’s a public client it does not require a secret. The client authenticates with the refresh token that was authenticated in the previous step. The refresh token requests an access token behind the scenes, which is then sent to the Kafka broker for authentication.

Producing messages with authorized access

Use the team-a-client configuration to check that you can produce messages to topics that start with a_ or x_.

  1. Write to topic my-topic.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic my-topic \
      --producer.config=/tmp/team-a-client.properties
    First message

    This request returns a Not authorized to access topics: [my-topic] error.

    team-a-client has a Dev Team A role that gives it permission to perform any supported actions on topics that start with a_, but can only write to topics that start with x_. The topic named my-topic matches neither of those rules.

  2. Write to topic a_messages.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --producer.config /tmp/team-a-client.properties
    First message
    Second message

    Messages are produced to Kafka successfully.

  3. Press CTRL+C to exit the CLI application.
  4. Check the Kafka container log for a debug log of Authorization GRANTED for the request.

    oc logs my-cluster-kafka-0 -f -n $NS

Consuming messages with authorized access

Use the team-a-client configuration to consume messages from topic a_messages.

  1. Fetch messages from topic a_messages.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties

    The request returns an error because the Dev Team A role for team-a-client only has access to consumer groups that have names starting with a_.

  2. Update the team-a-client properties to specify the custom consumer group it is permitted to use.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_1

    The consumer receives all the messages from the a_messages topic.

Administering Kafka with authorized access

The team-a-client is an account without any cluster-level access, but it can be used with some administrative operations.

  1. List topics.

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list

    The a_messages topic is returned.

  2. List consumer groups.

    bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list

    The a_consumer_group_1 consumer group is returned.

    Fetch details on the cluster configuration.

    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties \
      --entity-type brokers --describe --entity-default

    The request returns an error because the operation requires cluster level permissions that team-a-client does not have.

Using clients with different permissions

Use the team-b-client configuration to produce messages to topics that start with b_.

  1. Write to topic a_messages.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1

    This request returns a Not authorized to access topics: [a_messages] error.

  2. Write to topic b_messages.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic b_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1
    Message 2
    Message 3

    Messages are produced to Kafka successfully.

  3. Write to topic x_messages.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1

    A Not authorized to access topics: [x_messages] error is returned, The team-b-client can only read from topic x_messages.

  4. Write to topic x_messages using team-a-client.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-a-client.properties
    Message 1

    This request returns a Not authorized to access topics: [x_messages] error. The team-a-client can write to the x_messages topic, but it does not have a permission to create a topic if it does not yet exist. Before team-a-client can write to the x_messages topic, an admin power user must create it with the correct configuration, such as the number of partitions and replicas.

Managing Kafka with an authorized admin user

Use admin user alice to manage Kafka. alice has full access to manage everything on any Kafka cluster.

  1. Create the x_messages topic as alice.

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
      --topic x_messages --create --replication-factor 1 --partitions 1

    The topic is created successfully.

  2. List all topics as alice.

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties --list
    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-b-client.properties --list

    Admin user alice can list all the topics, whereas team-a-client and team-b-client can only list the topics they have access to.

    The Dev Team A and Dev Team B roles both have Describe permission on topics that start with x_, but they cannot see the other team’s topics because they do not have Describe permissions on them.

  3. Use the team-a-client to produce messages to the x_messages topic:

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-a-client.properties
    Message 1
    Message 2
    Message 3

    As alice created the x_messages topic, messages are produced to Kafka successfully.

  4. Use the team-b-client to produce messages to the x_messages topic.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-b-client.properties
    Message 4
    Message 5

    This request returns a Not authorized to access topics: [x_messages] error.

  5. Use the team-b-client to consume messages from the x_messages topic:

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-b-client.properties --group x_consumer_group_b

    The consumer receives all the messages from the x_messages topic.

  6. Use the team-a-client to consume messages from the x_messages topic.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group x_consumer_group_a

    This request returns a Not authorized to access topics: [x_messages] error.

  7. Use the team-a-client to consume messages from a consumer group that begins with a_.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_a

    This request returns a Not authorized to access topics: [x_messages] error.

    Dev Team A has no Read access on topics that start with a x_.

  8. Use alice to produce messages to the x_messages topic.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/alice.properties

    Messages are produced to Kafka successfully.

    alice can read from or write to any topic.

  9. Use alice to read the cluster configuration.

    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
      --entity-type brokers --describe --entity-default

    The cluster configuration for this example is empty.

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.