Chapter 7. Enabling OAuth 2.0 token-based access


Streams for Apache Kafka supports OAuth 2.0 for securing Kafka clusters by integrating with an OAuth 2.0 authorization server. Kafka brokers and clients both need to be configured to use OAuth 2.0.

OAuth 2.0 enables standardized token-based authentication and authorization between applications, using a central authorization server to issue tokens that grant limited access to resources. You can define specific scopes for fine-grained access control. Scopes correspond to different levels of access to Kafka topics or operations within the cluster.

OAuth 2.0 also supports single sign-on and integration with identity providers.

7.1. Configuring an OAuth 2.0 authorization server

Before you can use OAuth 2.0 token-based access, you must configure an authorization server for integration with Streams for Apache Kafka. The steps are dependent on the chosen authorization server. Consult the product documentation for the authorization server for information on how to set up OAuth 2.0 access.

Prepare the authorization server to work with Streams for Apache Kafka by defining OAuth 2.0 clients for Kafka and each Kafka client component of your application. In relation to the authorization server, the Kafka cluster and Kafka clients are both regarded as OAuth 2.0 clients.

In general, configure OAuth 2.0 clients in the authorization server with the following client credentials enabled:

  • Client ID (for example, kafka for the Kafka cluster)
  • Client ID and secret as the authentication mechanism
Note

You only need to use a client ID and secret when using a non-public introspection endpoint of the authorization server. The credentials are not typically required when using public authorization server endpoints, as with fast local JWT token validation.

7.2. Using OAuth 2.0 token-based authentication

Streams for Apache Kafka supports the use of OAuth 2.0 for token-based authentication. An OAuth 2.0 authorization server handles the granting of access and inquiries about access. Kafka clients authenticate to Kafka brokers. Brokers and clients communicate with the authorization server, as necessary, to obtain or validate access tokens.

For a deployment of Streams for Apache Kafka, OAuth 2.0 integration provides the following support:

  • Server-side OAuth 2.0 authentication for Kafka brokers
  • Client-side OAuth 2.0 authentication for Kafka MirrorMaker, Kafka Connect, and the Kafka Bridge

Streams for Apache Kafka on RHEL includes two OAuth 2.0 libraries:

kafka-oauth-client
Provides a custom login callback handler class named io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler. To handle the OAUTHBEARER authentication mechanism, use the login callback handler with the OAuthBearerLoginModule provided by Apache Kafka.
kafka-oauth-common
A helper library that provides some of the functionality needed by the kafka-oauth-client library.

The provided client libraries also have dependencies on some additional third-party libraries, such as keycloak-core, jackson-databind, and slf4j-api.

We recommend using a Maven project to package your client to ensure that all the dependency libraries are included. Dependency libraries might change in future versions.

7.2.1. Configuring OAuth 2.0 authentication on listeners

To secure Kafka brokers with OAuth 2.0 authentication, configure a Kafka listener to use OAuth 2.0 authentication and a client authentication mechanism in the Kafka server.properties file, and add further configuration depending on the authentication mechanism and the type of token validation used.

A minimum configuration is required. You can also configure a TLS listener, where TLS is used for inter-broker communication. We recommend using OAuth 2.0 authentication together with TLS encryption. Without encryption, the connection is vulnerable to network eavesdropping and unauthorized access through token theft.

When you have defined the type of authentication as OAuth 2.0, you add configuration based on the type of validation, either as fast local JWT validation or token validation using an introspection endpoint.

Enabling SASL authentication mechanisms

Use one or both of the following SASL mechanisms for clients to exchange credentials and establish authenticated sessions with Kafka.

OAUTHBEARER

Using the OAUTHBEARER authentication mechanism, credentials exchange uses a bearer token provided by an OAuth callback handler. Token provision can be configured to use the following methods:

  • Client ID and secret (using the OAuth 2.0 client credentials mechanism)
  • Client ID and client assertion
  • Long-lived access token
  • Long-lived refresh token obtained manually

OAUTHBEARER is recommended as it provides a higher level of security than PLAIN, though it can only be used by Kafka clients that support the OAUTHBEARER mechanism at the protocol level. Client credentials are never shared with Kafka.

PLAIN

PLAIN is a simple authentication mechanism used by all Kafka client tools. Consider using PLAIN only with Kafka clients that do not support OAUTHBEARER. Using the PLAIN authentication mechanism, credentials exchange can be configured to use the following methods:

  • Client ID and secret (using the OAuth 2.0 client credentials mechanism)
  • Long-lived access token
    Regardless of the method used, the client must provide username and password properties to Kafka.

Credentials are handled centrally behind a compliant authorization server, similar to how OAUTHBEARER authentication is used. The username extraction process depends on the authorization server configuration.
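
For reference, the following is a minimal client-side sketch of PLAIN authentication using a client ID and secret as the username and password. The placeholder values are illustrative only.

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<client_id>" \
  password="<client_secret>" ;

When a long-lived access token is used instead, the username is the account name and the password is the access token, with or without the $accessToken: prefix, as described in Section 7.2.3.2, “Example client authentication flows using the SASL PLAIN mechanism”.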

Example listener configuration for the OAUTHBEARER mechanism

sasl.enabled.mechanisms=OAUTHBEARER 1
listeners=CLIENT://0.0.0.0:9092 2
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 3
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 4
sasl.mechanism.inter.broker.protocol=OAUTHBEARER 5
inter.broker.listener.name=CLIENT 6
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 7
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
  # ...

1
Enables the OAUTHBEARER mechanism for credentials exchange over SASL.
2
Configures a listener for client applications to connect to. The system hostname is used as an advertised hostname, which clients must resolve in order to reconnect. The listener is named CLIENT in this example.
3
Specifies the channel protocol for the listener. SASL_SSL is for TLS. SASL_PLAINTEXT is used for an unencrypted connection (no TLS), but there is risk of eavesdropping and interception at the TCP connection layer.
4
Specifies the OAUTHBEARER mechanism for the CLIENT listener. The listener name (CLIENT) is specified in uppercase in the listeners property and in lowercase when part of a listener.name.* property (for example, listener.name.client.sasl.enabled.mechanisms).
5
Specifies the OAUTHBEARER mechanism for inter-broker communication.
6
Specifies the listener for inter-broker communication. This setting is required for the configuration to be valid.
7
Configures OAuth 2.0 authentication on the client listener.

Configuring OAuth 2.0 with properties or variables

Configure OAuth 2.0 settings using Java Authentication and Authorization Service (JAAS) properties or environment variables.

  • JAAS properties are configured in the server.properties configuration file, and passed as key-value pairs of the listener.name.<listener_name>.oauthbearer.sasl.jaas.config property.
  • If using environment variables, you still need to provide the listener.name.<listener_name>.oauthbearer.sasl.jaas.config property in the server.properties file, but you can omit the other JAAS properties.

    You can use capitalized or upper-case environment variable naming conventions.
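
For example, the oauth.client.id and oauth.client.secret JAAS properties could instead be supplied as environment variables. This sketch assumes the usual convention of upper-casing the property name and replacing dots with underscores:

OAUTH_CLIENT_ID=kafka-broker
OAUTH_CLIENT_SECRET=kafka-broker-secret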

The Streams for Apache Kafka OAuth 2.0 libraries use properties that start with:

  • oauth. to configure authentication
  • strimzi. to configure OAuth 2.0 authorization

Configuring fast local JWT token validation

Fast local JWT token validation involves checking a JWT token signature locally to ensure that the token meets the following criteria:

  • Contains a typ (type) or token_type header claim value of Bearer to indicate it is an access token
  • Is currently valid and not expired
  • Has an issuer that matches a validIssuerURI

You specify a validIssuerURI attribute when you configure the listener, so that any tokens not issued by the authorization server are rejected.

The authorization server does not need to be contacted during fast local JWT token validation. You activate fast local JWT token validation by specifying a jwksEndpointUri attribute, the endpoint exposed by the OAuth 2.0 authorization server. The endpoint contains the public keys used to validate signed JWT tokens, which are sent as credentials by Kafka clients.

All communication with the authorization server should be performed using TLS encryption. You can configure a certificate truststore and point to the truststore file.

You might want to configure a userNameClaim to properly extract a username from the JWT token. If required, you can use a JsonPath expression like "['user.info'].['user.id']" to retrieve the username from nested JSON attributes within a token.

If you want to use Kafka ACL authorization, identify the user by their username during authentication. (The sub claim in JWT tokens is typically a unique ID, not a username.)

Example configuration for fast local JWT token validation

# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 1
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \ 2
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \ 3
  oauth.jwks.refresh.seconds="300" \ 4
  oauth.jwks.refresh.min.pause.seconds="1" \ 5
  oauth.jwks.expiry.seconds="360" \ 6
  oauth.username.claim="preferred_username" \ 7
  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 8
  oauth.ssl.truststore.password="<truststore_password>" \ 9
  oauth.ssl.truststore.type="PKCS12" ; 10
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 11

1
Configures the CLIENT listener for OAuth 2.0. Connectivity with the authorization server should use secure HTTPS connections.
2
A valid issuer URI. Only access tokens issued by this issuer will be accepted. (Always required.)
3
The JWKS endpoint URL.
4
The period between endpoint refreshes (default 300).
5
The minimum pause in seconds between consecutive attempts to refresh JWKS public keys. When an unknown signing key is encountered, the JWKS keys refresh is scheduled outside the regular periodic schedule, with at least the specified pause since the last refresh attempt. Key refreshes follow an exponential backoff, retrying unsuccessful refreshes with an ever-increasing pause until the interval reaches oauth.jwks.refresh.seconds. The default value is 1.
6
The duration the JWKS certificates are considered valid before they expire. Default is 360 seconds. If you specify a longer time, consider the risk of allowing access to revoked certificates.
7
The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The value will depend on the authentication flow and the authorization server used. If required, you can use a JsonPath expression like "['user.info'].['user.id']" to retrieve the username from nested JSON attributes within a token.
8
The location of the truststore used in the TLS configuration.
9
Password to access the truststore.
10
The truststore type in PKCS #12 format.
11
(Optional) Enforces session expiry when a token expires, and also activates the Kafka re-authentication mechanism. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.

Configuring token validation using an introspection endpoint

Token validation using an OAuth 2.0 introspection endpoint treats a received access token as opaque. The Kafka broker sends an access token to the introspection endpoint, which responds with the token information necessary for validation. Importantly, it returns up-to-date information about whether the specific access token is valid, and also about when the token expires.

To configure OAuth 2.0 introspection-based validation, you specify an introspection endpoint URI rather than the JWKs endpoint URI specified for fast local JWT token validation. Depending on the authorization server, you typically have to specify a client ID and client secret, because the introspection endpoint is usually protected.

Example token validation configuration using an introspection endpoint

# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://<oauth_server_address>/<introspection_endpoint>" \ 1
  oauth.client.id="kafka-broker" \ 2
  oauth.client.secret="kafka-broker-secret" \ 3
  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 4
  oauth.ssl.truststore.password="<truststore_password>" \ 5
  oauth.ssl.truststore.type="PKCS12" \ 6
  oauth.username.claim="preferred_username" ; 7

1
URI of the token introspection endpoint.
2
Client ID of the Kafka broker.
3
Secret for the Kafka broker.
4
The location of the truststore used in the TLS configuration.
5
Password to access the truststore.
6
The truststore type in PKCS #12 format.
7
The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The value will depend on the authentication flow and the authorization server used. If required, you can use a JsonPath expression like "['user.info'].['user.id']" to retrieve the username from nested JSON attributes within a token.

Authenticating brokers to the authorization server protected endpoints

Usually, the certificates endpoint of the authorization server (oauth.jwks.endpoint.uri) is publicly accessible, while the introspection endpoint (oauth.introspection.endpoint.uri) is protected. However, this may vary depending on the authorization server configuration.

The Kafka broker can authenticate to the authorization server’s protected endpoints in one of two ways using HTTP authentication schemes:

  • HTTP Basic authentication uses a client ID and secret.
  • HTTP Bearer authentication uses a bearer token.

To configure HTTP Basic authentication, set the following properties:

  • oauth.client.id
  • oauth.client.secret

For HTTP Bearer authentication, set one of the following properties:

  • oauth.server.bearer.token.location to specify the file path on disk containing the bearer token.
  • oauth.server.bearer.token to specify the bearer token in clear text.
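
For example, the following sketch shows a listener that uses introspection-based validation and authenticates to the protected endpoint with a bearer token read from a file. The placeholder values are illustrative only.

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://<oauth_server_address>/<introspection_endpoint>" \
  oauth.server.bearer.token.location="<path_to_bearer_token_file>" \
  oauth.username.claim="preferred_username" ;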

Including additional configuration options

Specify additional settings depending on the authentication requirements and the authorization server you are using. Some of these properties apply only to certain authentication mechanisms or when used in combination with other properties.

For example, when using OAuth over PLAIN, access tokens are passed as password property values with or without an $accessToken: prefix.

  • If you configure a token endpoint (oauth.token.endpoint.uri) in the listener configuration, you need the prefix.
  • If you don’t configure a token endpoint in the listener configuration, you don’t need the prefix. The Kafka broker interprets the password as a raw access token.

If the password is set as the access token, the username must be set to the same principal name that the Kafka broker obtains from the access token. You can specify username extraction options in your listener using the oauth.username.claim, oauth.username.prefix, oauth.fallback.username.claim, oauth.fallback.username.prefix, and oauth.userinfo.endpoint.uri properties. The username extraction process also depends on your authorization server; in particular, how it maps client IDs to account names.

Note

The PLAIN mechanism does not support password grant authentication. Use either client credentials (client ID + secret) or an access token for authentication.

Example additional configuration settings

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  # ...
  oauth.token.endpoint.uri="https://<auth_server_address>/<path_to_token_endpoint>" \ 1
  oauth.custom.claim.check="@.custom == 'custom-value'" \ 2
  oauth.scope="<scope>" \ 3
  oauth.check.audience="true" \ 4
  oauth.audience="<audience>" \ 5
  oauth.client.id="kafka-broker" \ 6
  oauth.client.secret="kafka-broker-secret" \ 7
  oauth.connect.timeout.seconds=60 \ 8
  oauth.read.timeout.seconds=60 \ 9
  oauth.http.retries=2 \ 10
  oauth.http.retry.pause.millis=300 \ 11
  oauth.groups.claim="$.groups" \ 12
  oauth.groups.claim.delimiter="," \ 13
  oauth.include.accept.header="false" \ 14
  oauth.check.issuer=false \ 15
  oauth.username.prefix="user-account-" \ 16
  oauth.fallback.username.claim="client_id" \ 17
  oauth.fallback.username.prefix="service-account-" \ 18
  oauth.valid.token.type="bearer" \ 19
  oauth.userinfo.endpoint.uri="https://<auth_server_address>/<path_to_userinfo_endpoint>" ; 20

1
The OAuth 2.0 token endpoint URL to your authorization server. For production, always use https:// URLs. Required when KeycloakAuthorizer is used, or an OAuth 2.0 enabled listener is used for inter-broker communication.
2
(Optional) Custom claim checking. A JsonPath filter query that applies additional custom rules to the JWT access token during validation. If the access token does not contain the necessary data, it is rejected. When using the introspection endpoint method, the custom check is applied to the introspection endpoint response JSON.
3
(Optional) A scope parameter passed to the token endpoint. A scope is used when obtaining an access token for inter-broker authentication. It is also used in the name of a client for OAuth 2.0 over PLAIN client authentication using a clientId and secret. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener.
4
(Optional) Audience checking. If your authorization server provides an aud (audience) claim, and you want to enforce an audience check, set oauth.check.audience to true. Audience checks identify the intended recipients of tokens. As a result, the Kafka broker will reject tokens that do not have its clientId in their aud claims. Default is false.
5
(Optional) An audience parameter passed to the token endpoint. An audience is used when obtaining an access token for inter-broker authentication. It is also used in the name of a client for OAuth 2.0 over PLAIN client authentication using a clientId and secret. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener.
6
The configured client ID of the Kafka broker, which is the same for all brokers. This is the client registered with the authorization server as kafka-broker. Required when an introspection endpoint is used for token validation, or when KeycloakAuthorizer is used.
7
The configured secret for the Kafka broker, which is the same for all brokers. When the broker must authenticate to the authorization server, either a client secret, an access token, or a refresh token has to be specified.
8
(Optional) The connect timeout in seconds when connecting to the authorization server. The default value is 60.
9
(Optional) The read timeout in seconds when connecting to the authorization server. The default value is 60.
10
The maximum number of times to retry a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeout times for the oauth.connect.timeout.seconds and oauth.read.timeout.seconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, it could make the Kafka broker unresponsive.
11
The time to wait before attempting another retry of a failed HTTP request to the authorization server. By default, this time is set to zero, meaning that no pause is applied. This is because many issues that cause failed requests are per-request network glitches or proxy issues that can be resolved quickly. However, if your authorization server is under stress or experiencing high traffic, you may want to set this option to a value of 100 ms or more to reduce the load on the server and increase the likelihood of successful retries.
12
A JsonPath query used to extract groups information from JWT token or introspection endpoint response. Not set by default. This can be used by a custom authorizer to make authorization decisions based on user groups.
13
A delimiter used to parse groups information when returned as a single delimited string. The default value is ',' (comma).
14
(Optional) Set oauth.include.accept.header to false to remove the Accept header from requests. You can use this setting if including the header is causing issues when communicating with the authorization server.
15
If your authorization server does not provide an iss claim, it is not possible to perform an issuer check. In this situation, set oauth.check.issuer to false and do not specify an oauth.valid.issuer.uri. Default is true.
16
The prefix used when constructing the user ID. This only takes effect if oauth.username.claim is configured.
17
An authorization server may not provide a single attribute to identify both regular users and clients. When a client authenticates in its own name, the server might provide a client ID attribute. When a user authenticates using a username and password, to obtain a refresh token or an access token, the server might provide a username attribute in addition to a client ID. Use this fallback option to specify the username claim (attribute) to use if a primary user ID attribute is not available. If required, you can use a JsonPath expression like "['client.info'].['client.id']" to retrieve the fallback username from nested JSON attributes within a token.
18
In situations where oauth.fallback.username.claim is applicable, it may also be necessary to prevent name collisions between the values of the username claim, and those of the fallback username claim. Consider a situation where a client called producer exists, but also a regular user called producer exists. In order to differentiate between the two, you can use this property to add a prefix to the user ID of the client.
19
(Only applicable when using oauth.introspection.endpoint.uri) Depending on the authorization server you are using, the introspection endpoint may or may not return the token type attribute, or it may contain different values. You can specify a valid token type value that the response from the introspection endpoint has to contain.
20
(Only applicable when using oauth.introspection.endpoint.uri) The authorization server may be configured or implemented in such a way that it does not provide any identifiable information in an introspection endpoint response. In order to obtain the user ID, you can configure the URI of the userinfo endpoint as a fallback. The oauth.username.claim, oauth.username.prefix, oauth.fallback.username.claim, and oauth.fallback.username.prefix settings are also applied to the response of the userinfo endpoint.

Configuring listeners for inter-broker communication

The following example uses the OAUTHBEARER mechanism for fast token validation in a minimum configuration where inter-broker communication goes through the same listener as application clients.

The oauth.client.id, oauth.client.secret, and oauth.token.endpoint.uri properties relate to inter-broker communication.

Example inter-broker configuration using the OAUTHBEARER mechanism

sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 1
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username"  \
  oauth.client.id="kafka-broker" \ 2
  oauth.client.secret="kafka-secret" \ 3
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; 4
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 5
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000

1
Configures authentication settings for client and inter-broker communication.
2
Client ID of the Kafka broker, which is the same for all brokers. This is the client registered with the authorization server as kafka-broker.
3
Secret for the Kafka broker, which is the same for all brokers.
4
The OAuth 2.0 token endpoint URL to your authorization server. For production, always use https:// URLs.
5
Enables (and is only required for) OAuth 2.0 authentication for inter-broker communication.

The following example shows a minimum configuration for a TLS listener used for inter-broker communication.

Example inter-broker configuration with TLS

sasl.enabled.mechanisms=OAUTHBEARER
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 1
listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 2
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=<keystore_password> 3
listener.name.replication.ssl.truststore.password=<truststore_password>
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 4
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 5
listener.name.replication.ssl.keystore.location=<path_to_keystore> 6
listener.name.replication.ssl.truststore.location=<path_to_truststore> 7
listener.name.replication.ssl.client.auth=required 8
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username" ;

1
Separate configurations are required for inter-broker communication and client applications.
2
Configures the REPLICATION listener to use TLS, and the CLIENT listener to use SASL over an unencrypted channel. In a production environment, clients should use an encrypted channel (SASL_SSL).
3
The ssl. properties define the TLS configuration.
4
Random number generator implementation. If not set, the Java platform SDK default is used.
5
Hostname verification. If set to an empty string, the hostname verification is turned off. If not set, the default value is HTTPS, which enforces hostname verification for server certificates.
6
Path to the keystore for the listener.
7
Path to the truststore for the listener.
8
Specifies that clients of the REPLICATION listener have to authenticate with a client certificate when establishing a TLS connection (used for inter-broker connectivity).

The following example uses the PLAIN mechanism for fast token validation in a minimum configuration where inter-broker communication goes through the same listener as application clients.

Example inter-broker configuration using the PLAIN mechanism

listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https:<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<auth_server>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username"  \
  oauth.client.id="kafka-broker" \
  oauth.client.secret="kafka-secret" \
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 1
listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 2
listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 3
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username"  \
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; 4
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000

1
Enables OAuth 2.0 authentication for inter-broker communication.
2
Configures the server callback handler for PLAIN authentication.
3
Configures authentication settings for client communication using PLAIN authentication. oauth.token.endpoint.uri is an optional property that enables OAuth 2.0 over PLAIN using the OAuth 2.0 client credentials mechanism.
4
The OAuth 2.0 token endpoint URL to your authorization server. If specified, clients can authenticate over PLAIN by passing an access token as the password using an $accessToken: prefix.

7.2.2. Configuring OAuth 2.0 on client applications

To configure OAuth 2.0 on client applications, you must specify the following:

  • SASL (Simple Authentication and Security Layer) security protocols
  • SASL mechanisms
  • A JAAS (Java Authentication and Authorization Service) module
  • Authentication properties to access the authorization server

Configuring SASL protocols

Specify SASL protocols in the client configuration:

  • SASL_SSL for authentication over TLS encrypted connections
  • SASL_PLAINTEXT for authentication over unencrypted connections

Use SASL_SSL for production and SASL_PLAINTEXT for local development only.

When using SASL_SSL, additional ssl.truststore configuration is needed. The truststore configuration is required for a secure connection (https://) to the OAuth 2.0 authorization server. To verify the OAuth 2.0 authorization server, add the CA certificate for the authorization server to the truststore in your client configuration. You can configure a truststore in PEM or PKCS #12 format.
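
As a sketch, the PKCS #12 truststore shown in the later examples could be replaced with a PEM truststore for the connection to the Kafka cluster, assuming the CA certificate has been saved to /tmp/ca.pem (PEM truststores are not password protected):

security.protocol=SASL_SSL
ssl.truststore.location=/tmp/ca.pem
ssl.truststore.type=PEM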

Configuring SASL authentication mechanisms

Specify SASL mechanisms in the client configuration:

  • OAUTHBEARER for credentials exchange using a bearer token
  • PLAIN to pass client credentials (clientId + secret) or an access token

Configuring a JAAS module

Specify a JAAS module that implements the SASL authentication mechanism as a sasl.jaas.config property value:

  • org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule implements the OAUTHBEARER mechanism
  • org.apache.kafka.common.security.plain.PlainLoginModule implements the PLAIN mechanism
Note

For the OAUTHBEARER mechanism, Streams for Apache Kafka provides a callback handler for clients that use Kafka Client Java libraries to enable credentials exchange. For clients in other languages, custom code may be required to obtain the access token. For the PLAIN mechanism, Streams for Apache Kafka provides server-side callbacks to enable credentials exchange.

To be able to use the OAUTHBEARER mechanism, you must also add the custom io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler class as the callback handler. JaasClientOauthLoginCallbackHandler handles OAuth callbacks to the authorization server for access tokens during client login. This enables automatic token renewal, ensuring continuous authentication without user intervention. Additionally, it handles login credentials for clients using the OAuth 2.0 password grant method.

Configuring authentication properties

Configure the client to use credentials or access tokens for OAuth 2.0 authentication.

Using client credentials
Using client credentials involves configuring the client with the necessary credentials (client ID and secret, or client ID and client assertion) to obtain a valid access token from an authorization server. This is the simplest mechanism.
Using access tokens
Using access tokens, the client is configured with a valid long-lived access token or refresh token obtained from an authorization server. Using access tokens adds more complexity because there is an additional dependency on authorization server tools. If you are using long-lived access tokens, you may need to configure the client in the authorization server to increase the maximum lifetime of the token.

The only information ever sent to Kafka is the access token. The credentials used to obtain the token are never sent to Kafka. When a client obtains an access token, no further communication with the authorization server is needed.

SASL authentication properties support the following authentication methods:

  • OAuth 2.0 client credentials
  • Access token or service account token
  • Refresh token
  • OAuth 2.0 password grant (deprecated)

Add the authentication properties as JAAS configuration (sasl.jaas.config and sasl.login.callback.handler.class).

If the client application is not configured with an access token directly, the client exchanges one of the following sets of credentials for an access token during Kafka session initiation:

  • Client ID and secret
  • Client ID and client assertion
  • Client ID, refresh token, and (optionally) a secret
  • Username and password, with client ID and (optionally) a secret
Note

You can also specify authentication properties as environment variables, or as Java system properties. For Java system properties, you can set them using setProperty and pass them on the command line using the -D option.

Example client credentials configuration using the client secret

security.protocol=SASL_SSL 1
sasl.mechanism=OAUTHBEARER 2
ssl.truststore.location=/tmp/truststore.p12 3
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \ 4
  oauth.client.id="<client_id>" \ 5
  oauth.client.secret="<client_secret>" \ 6
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7
  oauth.ssl.truststore.password="$STOREPASS" \ 8
  oauth.ssl.truststore.type="PKCS12" \ 9
  oauth.scope="<scope>" \ 10
  oauth.audience="<audience>" ; 11
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
SASL_SSL security protocol for TLS-encrypted connections. Use SASL_PLAINTEXT over unencrypted connections for local development only.
2
The SASL mechanism specified as OAUTHBEARER or PLAIN.
3
The truststore configuration for secure access to the Kafka cluster.
4
URI of the authorization server token endpoint.
5
Client ID, which is the name used when creating the client in the authorization server.
6
Client secret created when creating the client in the authorization server.
7
The location contains the public key certificate (truststore.p12) for the authorization server.
8
The password for accessing the truststore.
9
The truststore type.
10
(Optional) The scope for requesting the token from the token endpoint. An authorization server may require a client to specify the scope.
11
(Optional) The audience for requesting the token from the token endpoint. An authorization server may require a client to specify the audience.

Example client credentials configuration using the client assertion

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \
  oauth.client.assertion.location="<path_to_client_assertion_token_file>" \ 1
  oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \ 2
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.scope="<scope>" \
  oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Path to the client assertion file used for authenticating the client. This file is a private key file as an alternative to the client secret. Alternatively, use the oauth.client.assertion option to specify the client assertion value in clear text.
2
(Optional) Sometimes you may need to specify the client assertion type. If not specified, the default value is urn:ietf:params:oauth:client-assertion-type:jwt-bearer.

Example password grants configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \ 1
  oauth.client.secret="<client_secret>" \ 2
  oauth.password.grant.username="<username>" \ 3
  oauth.password.grant.password="<password>" \ 4
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.scope="<scope>" \
  oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Client ID, which is the name used when creating the client in the authorization server.
2
(Optional) Client secret created when creating the client in the authorization server.
3
Username for password grant authentication. OAuth password grant configuration (username and password) uses the OAuth 2.0 password grant method. To use password grants, create a user account for a client on your authorization server with limited permissions. The account should act like a service account. Use in environments where user accounts are required for authentication, but consider using a refresh token first.
4
Password for password grant authentication.
Note

SASL PLAIN does not support passing a username and password (password grants) using the OAuth 2.0 password grant method.

Example access token configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.access.token="<access_token>" ; 1
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Long-lived access token for Kafka clients. Alternatively, oauth.access.token.location can be used to specify the file that contains the access token.

Example OpenShift service account token configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token";  1
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Location of the service account token on the filesystem (assuming the client is deployed as an OpenShift pod).

Example refresh token configuration

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \ 1
  oauth.client.secret="<client_secret>" \ 2
  oauth.refresh.token="<refresh_token>" \ 3
  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

1
Client ID, which is the name used when creating the client in the authorization server.
2
(Optional) Client secret created when creating the client in the authorization server.
3
Long-lived refresh token for Kafka clients.

SASL extensions for custom OAUTHBEARER implementations

If your Kafka broker uses a custom OAUTHBEARER implementation, you may need to pass additional SASL extension options. These extensions can include attributes or information required as client context by the authorization server. The options are passed as key-value pairs and are sent to the Kafka broker when a new session is started.

Pass SASL extension values using oauth.sasl.extension. as a key prefix.

Example configuration to pass SASL extension values

oauth.sasl.extension.key1="value1"
oauth.sasl.extension.key2="value2"
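
As a sketch, and assuming the extension options are set in the JAAS configuration alongside the other oauth.* client options, the result might look like the following when combined with the client credentials settings shown earlier:

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \
  oauth.client.secret="<client_secret>" \
  oauth.sasl.extension.key1="value1" \
  oauth.sasl.extension.key2="value2" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler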

7.2.3. OAuth 2.0 client authentication flows

OAuth 2.0 authentication flows depend on the underlying Kafka client and Kafka broker configuration. The flows must also be supported by the authorization server used.

The Kafka broker listener configuration determines how clients authenticate using an access token. The client can pass a client ID and secret to request an access token.

If a listener is configured to use PLAIN authentication, the client can authenticate with a client ID and secret or username and access token. These values are passed as the username and password properties of the PLAIN mechanism.

Listener configuration supports the following token validation options:

  • You can use fast local token validation based on JWT signature checking and local token introspection, without contacting an authorization server. The authorization server provides a JWKS endpoint with public certificates that are used to validate signatures on the tokens.
  • You can use a call to a token introspection endpoint provided by an authorization server. Each time a new Kafka broker connection is established, the broker passes the access token received from the client to the authorization server. The Kafka broker checks the response to confirm whether the token is valid.
Note

An authorization server might only allow the use of opaque access tokens, which means that local token validation is not possible.

Kafka client credentials can also be configured for the following types of authentication:

  • Direct local access using a previously generated long-lived access token
  • Contact with the authorization server for a new access token to be issued (using a client ID and credentials, or a refresh token, or a username and a password)

7.2.3.1. Example client authentication flows using the SASL OAUTHBEARER mechanism

You can use the following communication flows for Kafka authentication using the SASL OAUTHBEARER mechanism.

Client using client ID and credentials, with broker delegating validation to authorization server

  1. The Kafka client requests an access token from the authorization server using a client ID and credentials, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
  2. The authorization server generates a new access token.
  3. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
  4. The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server using its own client ID and secret.
  5. A Kafka client session is established if the token is valid.

Client using client ID and credentials, with broker performing fast local token validation

  1. The Kafka client authenticates with the token endpoint of the authorization server, using a client ID and credentials, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
  2. The authorization server generates a new access token.
  3. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
  4. The Kafka broker validates the access token locally using a JWT token signature check, and local token introspection.

Client using long-lived access token, with broker delegating validation to authorization server

  1. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
  2. The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server, using its own client ID and secret.
  3. A Kafka client session is established if the token is valid.

Client using long-lived access token, with broker performing fast local validation

  1. The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
  2. The Kafka broker validates the access token locally using a JWT token signature check and local token introspection.
Warning

Fast local JWT token signature validation is suitable only for short-lived tokens as there is no check with the authorization server if a token has been revoked. Token expiration is written into the token, but revocation can happen at any time, so cannot be accounted for without contacting the authorization server. Any issued token would be considered valid until it expires.

7.2.3.2. Example client authentication flows using the SASL PLAIN mechanism

You can use the following communication flows for Kafka authentication using the SASL PLAIN mechanism.

Client using a client ID and secret, with the broker obtaining the access token for the client

  1. The Kafka client passes a clientId as a username and a secret as a password.
  2. The Kafka broker uses a token endpoint to pass the clientId and secret to the authorization server.
  3. The authorization server returns a fresh access token or an error if the client credentials are not valid.
  4. The Kafka broker validates the token in one of the following ways:

    1. If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if the token validation is successful.
    2. If local token introspection is used, a request is not made to the authorization server. The Kafka broker validates the access token locally using a JWT token signature check.

Client using a long-lived access token without a client ID and secret

  1. The Kafka client passes a username and password. The password provides the value of an access token that was obtained manually and configured before running the client.
  2. The password is passed with or without an $accessToken: string prefix depending on whether or not the Kafka broker listener is configured with a token endpoint for authentication.

    1. If the token endpoint is configured, the password should be prefixed by $accessToken: to let the broker know that the password parameter contains an access token rather than a client secret. The Kafka broker interprets the username as the account username.
    2. If the token endpoint is not configured on the Kafka broker listener (enforcing a no-client-credentials mode), the password should provide the access token without the prefix. The Kafka broker interprets the username as the account username. In this mode, the client doesn’t use a client ID and secret, and the password parameter is always interpreted as a raw access token.
  3. The Kafka broker validates the token in one of the following ways:

    1. If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if token validation is successful.
    2. If local token introspection is used, there is no request made to the authorization server. The Kafka broker validates the access token locally using a JWT token signature check.

7.2.4. Re-authenticating sessions

You can configure OAuth listeners to use Kafka session re-authentication for OAuth 2.0 sessions between Kafka clients and Kafka brokers. This mechanism enforces the expiry of an authenticated session between the client and the broker after a defined period of time. When a session expires, the client immediately starts a new session by reusing the existing connection rather than dropping it.

Session re-authentication is disabled by default. To enable it, set a time value for the connections.max.reauth.ms property in the server.properties file. For an example configuration, see Section 7.2.1, “Configuring OAuth 2.0 authentication on listeners”.
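
For example, to expire sessions on the CLIENT listener used in the earlier examples after one hour (3600000 milliseconds):

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000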

Session re-authentication must be supported by the Kafka client libraries used by the client.

Session re-authentication can be used with fast local JWT or introspection endpoint token validation.

Client re-authentication

When the broker’s authenticated session expires, the client must re-authenticate to the existing session by sending a new, valid access token to the broker, without dropping the connection.

If token validation is successful, a new client session is started using the existing connection. If the client fails to re-authenticate, the broker will close the connection if further attempts are made to send or receive messages. Java clients that use Kafka client library 2.2 or later automatically re-authenticate if the re-authentication mechanism is enabled on the broker.

Session re-authentication also applies to refresh tokens, if used. When the session expires, the client refreshes the access token by using its refresh token. The client then uses the new access token to re-authenticate over the existing connection.

Session expiry for OAUTHBEARER and PLAIN

When session re-authentication is configured, session expiry works differently for OAUTHBEARER and PLAIN authentication.

For OAUTHBEARER and PLAIN, using the client ID and secret method:

  • The broker’s authenticated session will expire at the configured connections.max.reauth.ms.
  • The session will expire earlier if the access token expires before the configured time.

For PLAIN using the long-lived access token method:

  • The broker’s authenticated session will expire at the configured connections.max.reauth.ms.
  • Re-authentication will fail if the access token expires before the configured time. Although session re-authentication is attempted, PLAIN has no mechanism for refreshing tokens.

If connections.max.reauth.ms is not configured, OAUTHBEARER and PLAIN clients can remain connected to brokers indefinitely, without needing to re-authenticate. Authenticated sessions do not end with access token expiry.

However, you can take this into account when configuring authorization, for example, by using Keycloak authorization or installing a custom authorizer.

7.2.5. Example: Enabling OAuth 2.0 authentication

This example shows how to configure client access to a Kafka cluster using OAuth 2.0 authentication. The procedures describe the configuration required to set up OAuth 2.0 authentication on Kafka listeners and Kafka Java clients.

7.2.5.1. Configuring OAuth 2.0 support for Kafka brokers

This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication using an authorization server.

We advise using OAuth 2.0 over an encrypted interface by configuring TLS listeners. Plain listeners are not recommended.

Configure the Kafka brokers using properties that support your chosen authorization server, and the type of authorization you are implementing.

Prerequisites

  • Streams for Apache Kafka is installed on each host, and the configuration files are available.
  • An OAuth 2.0 authorization server is deployed.

Procedure

  1. Configure the Kafka broker listener configuration in the server.properties file.

    For example, using the OAUTHBEARER mechanism:

    sasl.enabled.mechanisms=OAUTHBEARER
    listeners=CLIENT://0.0.0.0:9092
    listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
    listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
    inter.broker.listener.name=CLIENT
    listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
    listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
  2. Configure broker connection settings as part of the listener.name.client.oauthbearer.sasl.jaas.config.
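
    For example, using fast local JWT token validation with placeholder values (see Section 7.2.1, “Configuring OAuth 2.0 authentication on listeners” for the full set of options):

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
      oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
      oauth.username.claim="preferred_username" ;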

  3. If required, configure access to the authorization server.

    This step is normally required for a production environment, unless a technology like service mesh is used to configure secure channels outside containers.

    1. Provide a custom truststore for connecting to a secured authorization server. SSL is always required for access to the authorization server.

      Set properties to configure the truststore.

      For example:

      listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        # ...
        oauth.client.id="kafka-broker" \
        oauth.client.secret="kafka-broker-secret" \
        oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
        oauth.ssl.truststore.password="<truststore_password>" \
        oauth.ssl.truststore.type="PKCS12" ;
    2. If the certificate hostname does not match the access URL hostname, you can turn off certificate hostname validation:

      oauth.ssl.endpoint.identification.algorithm=""

      The hostname check ensures that the connection to the authorization server is authentic. You may wish to turn off the validation in a non-production environment.

7.2.5.2. Setting up OAuth 2.0 on Kafka Java clients

Configure Kafka producer and consumer APIs to use OAuth 2.0 for interaction with Kafka brokers. Add a callback plugin to your client pom.xml file, then configure your client for OAuth 2.0.

How you configure the authentication properties depends on the authentication method you are using to access the OAuth 2.0 authorization server. In this procedure, the properties are specified in a properties file, then loaded into the client configuration.

Prerequisites

  • Streams for Apache Kafka and Kafka are running
  • An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
  • Kafka brokers are configured for OAuth 2.0

Procedure

  1. Add the client library with OAuth 2.0 support to the pom.xml file for the Kafka client:

    <dependency>
        <groupId>io.strimzi</groupId>
        <artifactId>kafka-oauth-client</artifactId>
        <version>0.15.0.redhat-00012</version>
    </dependency>
  2. Configure the client depending on the OAuth 2.0 authentication method:

    For example, specify the properties for the authentication method in a client.properties file.
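    A minimal client.properties sketch for the OAUTHBEARER mechanism with client credentials; the token endpoint, client ID, and secret are placeholders for values from your authorization server:

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.token.endpoint.uri="https://<auth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/token" \
      oauth.client.id="<client_id>" \
      oauth.client.secret="<client_secret>" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler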

  3. Load the client properties for OAuth 2.0 authentication into the Java client code.

    Example showing input of client properties

    import java.io.FileReader;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    Properties props = new Properties();
    try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) {
      props.load(reader);
    }

  4. Verify that the Kafka client can access the Kafka brokers.
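    For example, a quick producer check, shown as a sketch; it assumes that bootstrap.servers and serializer settings are also present in client.properties and that a my-topic topic exists (both are assumptions, not part of this procedure):

    // Requires the org.apache.kafka.clients.producer.KafkaProducer and ProducerRecord imports
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // A successful send indicates that OAuth 2.0 authentication to the broker worked
      producer.send(new ProducerRecord<>("my-topic", "key", "value"));
      producer.flush();
    }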

7.3. Using OAuth 2.0 token-based authorization

Streams for Apache Kafka supports the use of OAuth 2.0 token-based authorization through Red Hat build of Keycloak Authorization Services, which lets you manage security policies and permissions centrally.

Security policies and permissions defined in Red Hat build of Keycloak grant access to Kafka resources. Users and clients are matched against policies that permit access to perform specific actions on Kafka brokers.

Kafka allows all users full access to brokers by default, but also provides the AclAuthorizer and StandardAuthorizer plugins to configure authorization based on Access Control Lists (ACLs). The ACL rules managed by these plugins are used to grant or deny access to resources based on username, and these rules are stored within the Kafka cluster itself.
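For illustration only, a minimal sketch of how ACL-based authorization is typically enabled in server.properties; the User:admin superuser is a placeholder, and this is the approach that Keycloak-based authorization can replace or complement:

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=false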

However, OAuth 2.0 token-based authorization with Red Hat build of Keycloak offers far greater flexibility in how you implement access control for Kafka brokers. In addition, you can configure your Kafka brokers to use OAuth 2.0 authorization together with ACLs.

7.3.1. Example: Enabling OAuth 2.0 authorization

This procedure describes how to configure Kafka brokers to use OAuth 2.0 authorization using Red Hat build of Keycloak Authorization Services.

The Authorization Services REST endpoints of Red Hat build of Keycloak extend token-based authentication by applying defined security policies to a particular user and providing a list of the permissions granted on different resources for that user. Policies use roles and groups to match permissions to users. OAuth 2.0 authorization enforces permissions locally, based on the list of grants received for the user from Red Hat build of Keycloak Authorization Services.

A Red Hat build of Keycloak authorizer (KeycloakAuthorizer) is provided with Streams for Apache Kafka. The authorizer fetches a list of granted permissions from the authorization server as needed, and enforces authorization locally on Kafka, making rapid authorization decisions for each client request.

Before you begin

Consider the access you require or want to limit for certain users. You can use a combination of Red Hat build of Keycloak groups, roles, clients, and users to configure access in Red Hat build of Keycloak.

Typically, groups are used to match users based on organizational departments or geographical locations, and roles are used to match users based on their function.

With Red Hat build of Keycloak, you can store users and groups in LDAP, whereas clients and roles cannot be stored this way. Storage and access to user data may be a factor in how you choose to configure authorization policies.

Note

Super users always have unconstrained access to a Kafka broker regardless of the authorization implemented on the Kafka broker.

Prerequisites

  • Streams for Apache Kafka must be configured to use OAuth 2.0 with Red Hat build of Keycloak token-based authentication. You use the same Red Hat build of Keycloak endpoint when you set up authorization.
  • You need to understand how to manage policies and permissions for Red Hat build of Keycloak Authorization Services, as described in the Red Hat build of Keycloak documentation.

Procedure

  1. Access the Red Hat build of Keycloak Admin Console or use the Red Hat build of Keycloak Admin CLI to enable Authorization Services for the OAuth 2.0 client for Kafka you created when setting up OAuth 2.0 authentication.
  2. Use Authorization Services to define resources, authorization scopes, policies, and permissions for the client.
  3. Bind the permissions to users and clients by assigning them roles and groups.
  4. Configure the Kafka brokers to use Red Hat build of Keycloak authorization.

    Add the following to the Kafka server.properties configuration file to install the authorizer in Kafka:

    authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer
    principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
  5. Add configuration for the Kafka brokers to access the authorization server and Authorization Services.

    The following example shows configuration added as additional properties to server.properties, but you can also define the settings as environment variables using an upper-case naming convention.

    strimzi.authorization.token.endpoint.uri="https://<auth_server_address>/auth/realms/REALM-NAME/protocol/openid-connect/token" 1
    strimzi.authorization.client.id="kafka" 2
    1
    The OAuth 2.0 token endpoint URL for Red Hat build of Keycloak. For production, always use https:// URLs.
    2
    The client ID of the OAuth 2.0 client definition in Red Hat build of Keycloak that has Authorization Services enabled. Typically, kafka is used as the ID.
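
    For example, the token endpoint property might be expressed as an environment variable as follows, assuming the convention of upper-casing the property name and replacing dots with underscores (verify the naming convention for your deployment):

    STRIMZI_AUTHORIZATION_TOKEN_ENDPOINT_URI=https://<auth_server_address>/auth/realms/REALM-NAME/protocol/openid-connect/token
    STRIMZI_AUTHORIZATION_CLIENT_ID=kafka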
  6. (Optional) Add configuration for specific Kafka clusters.

    For example:

    strimzi.authorization.kafka.cluster.name="kafka-cluster" 1
    1
    The name of a specific Kafka cluster. Names are used to target permissions, making it possible to manage multiple clusters within the same Red Hat build of Keycloak realm. The default value is kafka-cluster.
  7. (Optional) Delegate to simple authorization:

    strimzi.authorization.delegate.to.kafka.acl="true" 1
    1
    Delegate authorization to Kafka AclAuthorizer if access is denied by Red Hat build of Keycloak Authorization Services policies. The default is false.
  8. (Optional) Add configuration for TLS connection to the authorization server.

    For example:

    strimzi.authorization.ssl.truststore.location=<path_to_truststore> 1
    strimzi.authorization.ssl.truststore.password=<my_truststore_password> 2
    strimzi.authorization.ssl.truststore.type=JKS 3
    strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 4
    strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 5
    1
    The path to the truststore that contains the certificates.
    2
    The password for the truststore.
    3
    The truststore type. If not set, the default Java keystore type is used.
    4
    Random number generator implementation. If not set, the Java platform SDK default is used.
    5
    Hostname verification. If set to an empty string, the hostname verification is turned off. If not set, the default value is HTTPS, which enforces hostname verification for server certificates.
  9. (Optional) Configure the refresh of grants from the authorization server. The grants refresh job works by enumerating the active tokens and requesting the latest grants for each.

    For example:

    strimzi.authorization.grants.refresh.period.seconds="120" 1
    strimzi.authorization.grants.refresh.pool.size="10" 2
    strimzi.authorization.grants.max.idle.time.seconds="300" 3
    strimzi.authorization.grants.gc.period.seconds="300" 4
    strimzi.authorization.reuse.grants="false" 5
    1
    Specifies how often the list of grants from the authorization server is refreshed (once per minute by default). To turn grants refresh off for debugging purposes, set to "0".
    2
    Specifies the size of the thread pool (the degree of parallelism) used by the grants refresh job. The default value is "5".
    3
    The time, in seconds, after which an idle grant in the cache can be evicted. The default value is 300.
    4
    The time, in seconds, between consecutive runs of a job that cleans stale grants from the cache. The default value is 300.
    5
    Controls whether the latest grants are fetched for a new session. When disabled, grants are retrieved from Red Hat build of Keycloak and cached for the user. The default value is true.
  10. (Optional) Configure network timeouts when communicating with the authorization server.

    For example:

    strimzi.authorization.connect.timeout.seconds="60" 1
    strimzi.authorization.read.timeout.seconds="60" 2
    strimzi.authorization.http.retries="2" 3
    1
    The connect timeout in seconds when connecting to the Red Hat build of Keycloak token endpoint. The default value is 60.
    2
    The read timeout in seconds when connecting to the Red Hat build of Keycloak token endpoint. The default value is 60.
    3
    The maximum number of times to retry (without pausing) a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeouts set by the strimzi.authorization.connect.timeout.seconds and strimzi.authorization.read.timeout.seconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, the Kafka broker can become unresponsive.
  11. (Optional) Enable OAuth 2.0 metrics for token validation and authorization:

    oauth.enable.metrics="true" 1
    1
    Controls whether to enable or disable OAuth metrics. The default value is false.
  12. (Optional) Remove the Accept header from requests:

    oauth.include.accept.header="false" 1
    1
    Set to false if including the header is causing issues when communicating with the authorization server. The default value is true.
  13. Verify the configured permissions by accessing Kafka brokers as clients or users with specific roles, ensuring they have the necessary access and do not have unauthorized access.
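    For example, a sketch of a quick check using the Kafka console tools; the topic name, broker address, and the client.properties files (each configured for a specific user or client) are placeholders:

    bin/kafka-console-producer.sh --bootstrap-server <broker_address>:9092 \
      --topic my-topic --producer.config client.properties
    bin/kafka-console-consumer.sh --bootstrap-server <broker_address>:9092 \
      --topic my-topic --from-beginning --consumer.config client.properties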