Chapter 7. Enabling OAuth 2.0 token-based access
Streams for Apache Kafka supports OAuth 2.0 for securing Kafka clusters by integrating with an OAuth 2.0 authorization server. Kafka brokers and clients both need to be configured to use OAuth 2.0.
OAuth 2.0 enables standardized token-based authentication and authorization between applications, using a central authorization server to issue tokens that grant limited access to resources. You can define specific scopes for fine-grained access control. Scopes correspond to different levels of access to Kafka topics or operations within the cluster.
OAuth 2.0 also supports single sign-on and integration with identity providers.
7.1. Configuring an OAuth 2.0 authorization server
Before you can use OAuth 2.0 token-based access, you must configure an authorization server for integration with Streams for Apache Kafka. The steps are dependent on the chosen authorization server. Consult the product documentation for the authorization server for information on how to set up OAuth 2.0 access.
Prepare the authorization server to work with Streams for Apache Kafka by defining OAuth 2.0 clients for Kafka and each Kafka client component of your application. In relation to the authorization server, the Kafka cluster and Kafka clients are both regarded as OAuth 2.0 clients.
In general, configure OAuth 2.0 clients in the authorization server with the following client credentials enabled:
- Client ID (for example, kafka for the Kafka cluster)
- Client ID and secret as the authentication mechanism
You only need to use a client ID and secret when using a non-public introspection endpoint of the authorization server. The credentials are not typically required when using public authorization server endpoints, as with fast local JWT token validation.
7.2. Using OAuth 2.0 token-based authentication
Streams for Apache Kafka supports the use of OAuth 2.0 for token-based authentication. An OAuth 2.0 authorization server handles the granting of access and inquiries about access. Kafka clients authenticate to Kafka brokers. Brokers and clients communicate with the authorization server, as necessary, to obtain or validate access tokens.
For a deployment of Streams for Apache Kafka, OAuth 2.0 integration provides the following support:
- Server-side OAuth 2.0 authentication for Kafka brokers
- Client-side OAuth 2.0 authentication for Kafka MirrorMaker, Kafka Connect, and the Kafka Bridge
Streams for Apache Kafka on RHEL includes two OAuth 2.0 libraries:
- kafka-oauth-client
- Provides a custom login callback handler class named io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler. To handle the OAUTHBEARER authentication mechanism, use the login callback handler with the OAuthBearerLoginModule provided by Apache Kafka.
- kafka-oauth-common
- A helper library that provides some of the functionality needed by the kafka-oauth-client library.
The provided client libraries also have dependencies on some additional third-party libraries, such as keycloak-core, jackson-databind, and slf4j-api.
We recommend using a Maven project to package your client to ensure that all the dependency libraries are included. Dependency libraries might change in future versions.
7.2.1. Configuring OAuth 2.0 authentication on listeners
To secure Kafka brokers with OAuth 2.0 authentication, configure a Kafka listener to use OAuth 2.0 authentication and a client authentication mechanism in the Kafka server.properties file, and add further configuration depending on the authentication mechanism and type of token validation used in the authentication.
A minimum configuration is required. You can also configure a TLS listener, where TLS is used for inter-broker communication. We recommend using OAuth 2.0 authentication together with TLS encryption. Without encryption, the connection is vulnerable to network eavesdropping and unauthorized access through token theft.
When you have defined the type of authentication as OAuth 2.0, you add configuration based on the type of validation, either as fast local JWT validation or token validation using an introspection endpoint.
Enabling SASL authentication mechanisms
Use one or both of the following SASL mechanisms for clients to exchange credentials and establish authenticated sessions with Kafka.
- OAUTHBEARER
- Using the OAUTHBEARER authentication mechanism, credentials exchange uses a bearer token provided by an OAuth callback handler. Token provision can be configured to use the following methods:
  - Client ID and secret (using the OAuth 2.0 client credentials mechanism)
  - Client ID and client assertion
  - Long-lived access token
  - Long-lived refresh token obtained manually
  OAUTHBEARER is recommended as it provides a higher level of security than PLAIN, though it can only be used by Kafka clients that support the OAUTHBEARER mechanism at the protocol level. Client credentials are never shared with Kafka.
- PLAIN
- PLAIN is a simple authentication mechanism used by all Kafka client tools. Consider using PLAIN only with Kafka clients that do not support OAUTHBEARER. Using the PLAIN authentication mechanism, credentials exchange can be configured to use the following methods:
  - Client ID and secret (using the OAuth 2.0 client credentials mechanism)
  - Long-lived access token
  Regardless of the method used, the client must provide username and password properties to Kafka.
  Credentials are handled centrally behind a compliant authorization server, similar to how OAUTHBEARER authentication is used. The username extraction process depends on the authorization server configuration.
Example listener configuration for the OAUTHBEARER mechanism
sasl.enabled.mechanisms=OAUTHBEARER (1)
listeners=CLIENT://0.0.0.0:9092 (2)
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT (3)
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER (4)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER (5)
inter.broker.listener.name=CLIENT (6)
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler (7)
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
# ...
- 1
- Enables the OAUTHBEARER mechanism for credentials exchange over SASL.
- 2
- Configures a listener for client applications to connect to. The system hostname is used as an advertised hostname, which clients must resolve in order to reconnect. The listener is named CLIENT in this example.
- 3
- Specifies the channel protocol for the listener. SASL_SSL is for TLS. SASL_PLAINTEXT is used for an unencrypted connection (no TLS), but there is risk of eavesdropping and interception at the TCP connection layer.
- 4
- Specifies the OAUTHBEARER mechanism for the CLIENT listener. The listener name (CLIENT) is usually specified in uppercase in the listeners property, and in lowercase when part of a listener.name.* property (listener.name.client).
- 5
- Specifies the OAUTHBEARER mechanism for inter-broker communication.
- 6
- Specifies the listener for inter-broker communication. The specification is required for the configuration to be valid.
- 7
- Configures OAuth 2.0 authentication on the client listener.
Configuring OAuth 2.0 with properties or variables
Configure OAuth 2.0 settings using Java Authentication and Authorization Service (JAAS) properties or environment variables.
- JAAS properties are configured in the server.properties configuration file, and passed as key-value pairs of the listener.name.<listener_name>.oauthbearer.sasl.jaas.config property.
- If using environment variables, you still need to provide the listener.name.<listener_name>.oauthbearer.sasl.jaas.config property in the server.properties file, but you can omit the other JAAS properties. You can use capitalized or upper-case environment variable naming conventions.
The Streams for Apache Kafka OAuth 2.0 libraries use properties that start with:
- oauth. to configure authentication
- strimzi. to configure OAuth 2.0 authorization
Configuring fast local JWT token validation
Fast local JWT token validation involves checking a JWT token signature locally to ensure that the token meets the following criteria:
- Contains a typ (type) or token_type header claim value of Bearer to indicate it is an access token
- Is currently valid and not expired
- Has an issuer that matches a validIssuerURI
You specify a validIssuerURI attribute when you configure the listener, so that any tokens not issued by the authorization server are rejected.
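The claim checks listed above can be illustrated with a minimal Python sketch. It only decodes a JWT and checks the token type, expiry, and issuer. It deliberately omits signature verification against the JWKS public keys, which a real broker performs, and it reads the type claim from the token payload, which is an assumption of this sketch. The names check_claims and make_unsigned_token are hypothetical and are not part of the Streams for Apache Kafka libraries.

```python
import base64
import json
import time
from typing import Optional


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url-encoded without padding; restore it first.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def check_claims(token: str, valid_issuer_uri: str, now: Optional[float] = None) -> bool:
    """Return True if the token payload passes the type, expiry, and issuer checks.

    NOTE: signature verification is intentionally NOT performed in this sketch.
    """
    now = time.time() if now is None else now
    try:
        _header, payload_b64, _signature = token.split(".")
        claims = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        # Covers a wrong segment count, bad base64, and invalid JSON.
        return False
    if not isinstance(claims, dict):
        return False

    # 1. Token type must indicate an access token (payload lookup is an assumption).
    token_type = claims.get("typ") or claims.get("token_type")
    if token_type != "Bearer":
        return False

    # 2. Token must not be expired.
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp <= now:
        return False

    # 3. Issuer must match the configured valid issuer URI.
    return claims.get("iss") == valid_issuer_uri


def make_unsigned_token(claims: dict) -> str:
    """Build a JWT-shaped string with a dummy signature, for illustration only."""
    def enc(obj: dict) -> str:
        raw = json.dumps(obj).encode("utf-8")
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
    return f"{enc({'alg': 'none'})}.{enc(claims)}.sig"
```

For example, a token built with make_unsigned_token({"typ": "Bearer", "iss": issuer, "exp": 2000}) passes check_claims(token, issuer, now=1000) and fails once now is later than the exp value.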
The authorization server does not need to be contacted during fast local JWT token validation. You activate fast local JWT token validation by specifying a jwksEndpointUri attribute, the endpoint exposed by the OAuth 2.0 authorization server. The endpoint contains the public keys used to validate signed JWT tokens, which are sent as credentials by Kafka clients.
All communication with the authorization server should be performed using TLS encryption. You can configure a certificate truststore and point to the truststore file.
You might want to configure a userNameClaim to properly extract a username from the JWT token. If required, you can use a JsonPath expression like "['user.info'].['user.id']" to retrieve the username from nested JSON attributes within a token.
If you want to use Kafka ACL authorization, identify the user by their username during authentication. (The sub claim in JWT tokens is typically a unique ID, not a username.)
Example configuration for fast local JWT token validation
# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ (1)
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \ (2)
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \ (3)
  oauth.jwks.refresh.seconds="300" \ (4)
  oauth.jwks.refresh.min.pause.seconds="1" \ (5)
  oauth.jwks.expiry.seconds="360" \ (6)
  oauth.username.claim="preferred_username" \ (7)
  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ (8)
  oauth.ssl.truststore.password="<truststore_password>" \ (9)
  oauth.ssl.truststore.type="PKCS12" ; (10)
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 (11)
- 1
- Configures the CLIENT listener for OAuth 2.0. Connectivity with the authorization server should use secure HTTPS connections.
- 2
- A valid issuer URI. Only access tokens issued by this issuer will be accepted. (Always required.)
- 3
- The JWKS endpoint URL.
- 4
- The period in seconds between endpoint refreshes (default 300).
- 5
- The minimum pause in seconds between consecutive attempts to refresh JWKS public keys. When an unknown signing key is encountered, the JWKS keys refresh is scheduled outside the regular periodic schedule with at least the specified pause since the last refresh attempt. The refreshing of keys follows the rule of exponential backoff, retrying on unsuccessful refreshes with ever increasing pause, until it reaches oauth.jwks.refresh.seconds. The default value is 1.
- 6
- The duration the JWKS certificates are considered valid before they expire. Default is 360 seconds. If you specify a longer time, consider the risk of allowing access to revoked certificates.
- 7
- The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The value will depend on the authentication flow and the authorization server used. If required, you can use a JsonPath expression like "['user.info'].['user.id']" to retrieve the username from nested JSON attributes within a token.
- 8
- The location of the truststore used in the TLS configuration.
- 9
- Password to access the truststore.
- 10
- The truststore type in PKCS #12 format.
- 11
- (Optional) Enforces session expiry when a token expires, and also activates the Kafka re-authentication mechanism. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.
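The exponential backoff described for oauth.jwks.refresh.min.pause.seconds can be pictured with a short Python sketch. It generates the sequence of pauses between unsuccessful refresh attempts, starting at the minimum pause and growing until it reaches the regular refresh period. The doubling factor and the function name backoff_pauses are assumptions made for illustration; the actual growth rule used by the library is not stated here.

```python
from typing import Iterator


def backoff_pauses(min_pause_seconds: int = 1,
                   refresh_seconds: int = 300,
                   factor: int = 2) -> Iterator[int]:
    """Yield successive retry pauses, capped at the regular refresh period.

    The growth factor of 2 is an assumption of this sketch.
    """
    if min_pause_seconds <= 0 or factor <= 1:
        raise ValueError("min_pause_seconds must be > 0 and factor must be > 1")
    pause = min_pause_seconds
    while pause < refresh_seconds:
        yield pause
        pause *= factor
    # Once the pause would reach or exceed the periodic refresh interval,
    # the regular schedule takes over.
    yield refresh_seconds
```

With the defaults shown in the example configuration (a minimum pause of 1 second and a refresh period of 300 seconds), the sketch yields pauses of 1, 2, 4, 8, 16, 32, 64, 128, and 256 seconds, followed by 300.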
Configuring token validation using an introspection endpoint
Token validation using an OAuth 2.0 introspection endpoint treats a received access token as opaque. The Kafka broker sends an access token to the introspection endpoint, which responds with the token information necessary for validation. Importantly, it returns up-to-date information if the specific access token is valid, and also information about when the token expires.
To configure OAuth 2.0 introspection-based validation, you specify an introspection endpoint URI rather than the JWKS endpoint URI specified for fast local JWT token validation. Depending on the authorization server, you typically have to specify a client ID and client secret, because the introspection endpoint is usually protected.
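As a rough illustration of how an introspection response can be evaluated, the following Python sketch checks the active flag and the optional exp attribute defined by the OAuth 2.0 token introspection standard (RFC 7662). The function name is_token_active is hypothetical, and accepting a response without an exp attribute is an assumption of this sketch rather than documented broker behavior.

```python
import time
from typing import Optional


def is_token_active(introspection_response: dict, now: Optional[float] = None) -> bool:
    """Decide whether a token is usable, based on an introspection response."""
    now = time.time() if now is None else now

    # RFC 7662 requires a boolean "active" member in every response.
    if introspection_response.get("active") is not True:
        return False

    # "exp" is optional; when present, it must lie in the future.
    exp = introspection_response.get("exp")
    return exp is None or exp > now
```

Because the check runs against a fresh response for every validation, a token that has been revoked at the authorization server is reported as inactive even if its expiry time has not yet passed.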
Example token validation configuration using an introspection endpoint
# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://<oauth_server_address>/<introspection_endpoint>" \ (1)
  oauth.client.id="kafka-broker" \ (2)
  oauth.client.secret="kafka-broker-secret" \ (3)
  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ (4)
  oauth.ssl.truststore.password="<truststore_password>" \ (5)
  oauth.ssl.truststore.type="PKCS12" \ (6)
  oauth.username.claim="preferred_username" ; (7)
- 1
- URI of the token introspection endpoint.
- 2
- Client ID of the Kafka broker.
- 3
- Secret for the Kafka broker.
- 4
- The location of the truststore used in the TLS configuration.
- 5
- Password to access the truststore.
- 6
- The truststore type in PKCS #12 format.
- 7
- The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The value will depend on the authentication flow and the authorization server used. If required, you can use a JsonPath expression like "['user.info'].['user.id']" to retrieve the username from nested JSON attributes within a token.
Authenticating brokers to the authorization server protected endpoints
Usually, the certificates endpoint of the authorization server (oauth.jwks.endpoint.uri) is publicly accessible, while the introspection endpoint (oauth.introspection.endpoint.uri) is protected. However, this may vary depending on the authorization server configuration.
The Kafka broker can authenticate to the authorization server’s protected endpoints in one of two ways using HTTP authentication schemes:
- HTTP Basic authentication uses a client ID and secret.
- HTTP Bearer authentication uses a bearer token.
To configure HTTP Basic authentication, set the following properties:
- oauth.client.id
- oauth.client.secret
For HTTP Bearer authentication, set one of the following properties:
- oauth.server.bearer.token.location to specify the file path on disk containing the bearer token.
- oauth.server.bearer.token to specify the bearer token in clear text.
Including additional configuration options
Specify additional settings depending on the authentication requirements and the authorization server you are using. Some of these properties apply only to certain authentication mechanisms or when used in combination with other properties.
For example, when using OAuth over PLAIN, access tokens are passed as password property values with or without an $accessToken: prefix.
- If you configure a token endpoint (oauth.token.endpoint.uri) in the listener configuration, you need the prefix.
- If you don’t configure a token endpoint in the listener configuration, you don’t need the prefix. The Kafka broker interprets the password as a raw access token.
If the password is set as the access token, the username must be set to the same principal name that the Kafka broker obtains from the access token. You can specify username extraction options in your listener using the oauth.username.claim, oauth.username.prefix, oauth.fallback.username.claim, oauth.fallback.username.prefix, and oauth.userinfo.endpoint.uri properties. The username extraction process also depends on your authorization server; in particular, how it maps client IDs to account names.
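To make the interplay of the username extraction options easier to follow, here is a simplified Python sketch. It tries the primary claim first, then the fallback claim, and applies the matching prefix. The function name extract_principal is hypothetical, only top-level claims are supported (no JsonPath), and falling back to the sub claim when nothing else matches is an assumption of this sketch.

```python
from typing import Optional


def extract_principal(
    claims: dict,
    username_claim: Optional[str] = None,
    username_prefix: str = "",
    fallback_claim: Optional[str] = None,
    fallback_prefix: str = "",
) -> Optional[str]:
    """Derive the principal name from token claims."""
    # Primary claim, for example "preferred_username" for regular users.
    if username_claim and claims.get(username_claim) is not None:
        return f"{username_prefix}{claims[username_claim]}"

    # Fallback claim, for example "client_id" for service accounts.
    if fallback_claim and claims.get(fallback_claim) is not None:
        return f"{fallback_prefix}{claims[fallback_claim]}"

    # Assumption: use the subject claim when no configured claim is present.
    return claims.get("sub")
```

With oauth.username.claim set to preferred_username and oauth.fallback.username.claim set to client_id, a user token containing preferred_username=alice maps to alice (plus any configured prefix), while a client token that only carries client_id=producer maps to producer with the fallback prefix applied.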
The PLAIN mechanism does not support password grant authentication. Use either client credentials (client ID + secret) or an access token for authentication.
Example additional configuration settings
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  # ...
  oauth.token.endpoint.uri="https://<auth_server_address>/<path_to_token_endpoint>" \ (1)
  oauth.custom.claim.check="@.custom == 'custom-value'" \ (2)
  oauth.scope="<scope>" \ (3)
  oauth.check.audience="true" \ (4)
  oauth.audience="<audience>" \ (5)
  oauth.client.id="kafka-broker" \ (6)
  oauth.client.secret="kafka-broker-secret" \ (7)
  oauth.connect.timeout.seconds="60" \ (8)
  oauth.read.timeout.seconds="60" \ (9)
  oauth.http.retries="2" \ (10)
  oauth.http.retry.pause.millis="300" \ (11)
  oauth.groups.claim="$.groups" \ (12)
  oauth.groups.claim.delimiter="," \ (13)
  oauth.include.accept.header="false" \ (14)
  oauth.check.issuer="false" \ (15)
  oauth.username.prefix="user-account-" \ (16)
  oauth.fallback.username.claim="client_id" \ (17)
  oauth.fallback.username.prefix="service-account-" \ (18)
  oauth.valid.token.type="bearer" \ (19)
  oauth.userinfo.endpoint.uri="https://<auth_server_address>/<path_to_userinfo_endpoint>" ; (20)
- 1
- The OAuth 2.0 token endpoint URL to your authorization server. For production, always use https:// URLs. Required when KeycloakAuthorizer is used, or an OAuth 2.0 enabled listener is used for inter-broker communication.
- 2
- (Optional) Custom claim checking. A JsonPath filter query that applies additional custom rules to the JWT access token during validation. If the access token does not contain the necessary data, it is rejected. When using the introspection endpoint method, the custom check is applied to the introspection endpoint response JSON.
- 3
- (Optional) A scope parameter passed to the token endpoint. A scope is used when obtaining an access token for inter-broker authentication. It is also used on behalf of a client for OAuth 2.0 over PLAIN client authentication using a clientId and secret. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener.
- 4
- (Optional) Audience checking. If your authorization server provides an aud (audience) claim, and you want to enforce an audience check, set oauth.check.audience to true. Audience checks identify the intended recipients of tokens. As a result, the Kafka broker will reject tokens that do not have its clientId in their aud claims. Default is false.
- 5
- (Optional) An audience parameter passed to the token endpoint. An audience is used when obtaining an access token for inter-broker authentication. It is also used on behalf of a client for OAuth 2.0 over PLAIN client authentication using a clientId and secret. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener.
- 6
- The configured client ID of the Kafka broker, which is the same for all brokers. This is the client registered with the authorization server as kafka-broker. Required when an introspection endpoint is used for token validation, or when KeycloakAuthorizer is used.
- 7
- The configured secret for the Kafka broker, which is the same for all brokers. When the broker must authenticate to the authorization server, either a client secret, access token or a refresh token has to be specified.
- 8
- (Optional) The connect timeout in seconds when connecting to the authorization server. The default value is 60.
- 9
- (Optional) The read timeout in seconds when connecting to the authorization server. The default value is 60.
- 10
- The maximum number of times to retry a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeout times for the oauth.connect.timeout.seconds and oauth.read.timeout.seconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, it could make the Kafka broker unresponsive.
- 11
- The time to wait before attempting another retry of a failed HTTP request to the authorization server. By default, this time is set to zero, meaning that no pause is applied. This is because many issues that cause failed requests are per-request network glitches or proxy issues that can be resolved quickly. However, if your authorization server is under stress or experiencing high traffic, you may want to set this option to a value of 100 ms or more to reduce the load on the server and increase the likelihood of successful retries.
- 12
- A JsonPath query used to extract groups information from JWT token or introspection endpoint response. Not set by default. This can be used by a custom authorizer to make authorization decisions based on user groups.
- 13
- A delimiter used to parse groups information when returned as a single delimited string. The default value is ',' (comma).
- 14
- (Optional) Sets oauth.include.accept.header to false to remove the Accept header from requests. You can use this setting if including the header is causing issues when communicating with the authorization server.
- 15
- If your authorization server does not provide an iss claim, it is not possible to perform an issuer check. In this situation, set oauth.check.issuer to false and do not specify an oauth.valid.issuer.uri. Default is true.
- 16
- The prefix used when constructing the user ID. This only takes effect if oauth.username.claim is configured.
- 17
- An authorization server may not provide a single attribute to identify both regular users and clients. When a client authenticates in its own name, the server might provide a client ID attribute. When a user authenticates using a username and password, to obtain a refresh token or an access token, the server might provide a username attribute in addition to a client ID. Use this fallback option to specify the username claim (attribute) to use if a primary user ID attribute is not available. If required, you can use a JsonPath expression like "['client.info'].['client.id']" to retrieve the fallback username from nested JSON attributes within a token.
- 18
- In situations where oauth.fallback.username.claim is applicable, it may also be necessary to prevent name collisions between the values of the username claim, and those of the fallback username claim. Consider a situation where a client called producer exists, but also a regular user called producer exists. In order to differentiate between the two, you can use this property to add a prefix to the user ID of the client.
- 19
- (Only applicable when using oauth.introspection.endpoint.uri) Depending on the authorization server you are using, the introspection endpoint may or may not return the token type attribute, or it may contain different values. You can specify a valid token type value that the response from the introspection endpoint has to contain.
- 20
- (Only applicable when using oauth.introspection.endpoint.uri) The authorization server may be configured or implemented in such a way to not provide any identifiable information in an introspection endpoint response. In order to obtain the user ID, you can configure the URI of the userinfo endpoint as a fallback. The oauth.username.claim, oauth.username.prefix, oauth.fallback.username.claim, and oauth.fallback.username.prefix settings are also applied to the response of the userinfo endpoint.
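The audience check enabled by oauth.check.audience can be sketched in a few lines of Python. The sketch rejects a token unless the broker's client ID appears in its aud claim, which the JWT standard (RFC 7519) allows to be either a single string or an array of strings. The function name audience_matches is hypothetical and the code is an illustration of the rule, not the library's implementation.

```python
def audience_matches(claims: dict, broker_client_id: str) -> bool:
    """Return True if the broker's client ID is listed in the token's aud claim."""
    aud = claims.get("aud")
    if isinstance(aud, str):
        # Single-valued audience.
        return aud == broker_client_id
    if isinstance(aud, (list, tuple)):
        # Multi-valued audience.
        return broker_client_id in aud
    # A missing or malformed aud claim fails the check.
    return False
```

With oauth.client.id set to kafka-broker, a token carrying aud=["kafka-broker", "other-service"] passes this check, while a token issued only for other-service does not.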
Configuring listeners for inter-broker communication
The following example uses the OAUTHBEARER mechanism for fast token validation in a minimum configuration where inter-broker communication goes through the same listener as application clients.
The oauth.client.id, oauth.client.secret, and oauth.token.endpoint.uri properties relate to inter-broker communication.
Example inter-broker configuration using the OAUTHBEARER mechanism
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ (1)
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username" \
  oauth.client.id="kafka-broker" \ (2)
  oauth.client.secret="kafka-secret" \ (3)
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; (4)
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler (5)
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
- 1
- Configures authentication settings for client and inter-broker communication.
- 2
- Client ID of the Kafka broker, which is the same for all brokers. This is the client registered with the authorization server as kafka-broker.
- 3
- Secret for the Kafka broker, which is the same for all brokers.
- 4
- The OAuth 2.0 token endpoint URL to your authorization server. For production, always use https:// URLs.
- 5
- Enables (and is only required for) OAuth 2.0 authentication for inter-broker communication.
The following example shows a minimum configuration for a TLS listener used for inter-broker communication.
Example inter-broker configuration with TLS
sasl.enabled.mechanisms=OAUTHBEARER
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 (1)
listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT (2)
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=<keystore_password> (3)
listener.name.replication.ssl.truststore.password=<truststore_password>
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG (4)
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS (5)
listener.name.replication.ssl.keystore.location=<path_to_keystore> (6)
listener.name.replication.ssl.truststore.location=<path_to_truststore> (7)
listener.name.replication.ssl.client.auth=required (8)
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username" ;
- 1
- Separate configurations are required for inter-broker communication and client applications.
- 2
- Configures the REPLICATION listener to use TLS, and the CLIENT listener to use SASL over an unencrypted channel. The client could use an encrypted channel (SASL_SSL) in a production environment.
- 3
- The ssl. properties define the TLS configuration.
- 4
- Random number generator implementation. If not set, the Java platform SDK default is used.
- 5
- Hostname verification. If set to an empty string, the hostname verification is turned off. If not set, the default value is HTTPS, which enforces hostname verification for server certificates.
- 6
- Path to the keystore for the listener.
- 7
- Path to the truststore for the listener.
- 8
- Specifies that clients of the REPLICATION listener have to authenticate with a client certificate when establishing a TLS connection (used for inter-broker connectivity).
The following example uses the PLAIN mechanism for fast token validation in a minimum configuration where inter-broker communication goes through the same listener as application clients.
Example inter-broker configuration using the PLAIN mechanism
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<auth_server>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username" \
  oauth.client.id="kafka-broker" \
  oauth.client.secret="kafka-secret" \
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler (1)
listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler (2)
listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ (3)
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username" \
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; (4)
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
- 1
- Enables OAuth 2.0 authentication for inter-broker communication.
- 2
- Configures the server callback handler for PLAIN authentication.
- 3
- Configures authentication settings for client communication using PLAIN authentication. oauth.token.endpoint.uri is an optional property that enables OAuth 2.0 over PLAIN using the OAuth 2.0 client credentials mechanism.
- 4
- The OAuth 2.0 token endpoint URL to your authorization server. If specified, clients can authenticate over PLAIN by passing an access token as the password using an $accessToken: prefix.
7.2.2. Configuring OAuth 2.0 on client applications
To configure OAuth 2.0 on client applications, you must specify the following:
- SASL (Simple Authentication and Security Layer) security protocols
- SASL mechanisms
- A JAAS (Java Authentication and Authorization Service) module
- Authentication properties to access the authorization server
Configuring SASL protocols
Specify SASL protocols in the client configuration:
- SASL_SSL for authentication over TLS encrypted connections
- SASL_PLAINTEXT for authentication over unencrypted connections
Use SASL_SSL for production and SASL_PLAINTEXT for local development only.
When using SASL_SSL, additional ssl.truststore configuration is needed. The truststore configuration is required for secure connection (https://) to the OAuth 2.0 authorization server. To verify the OAuth 2.0 authorization server, add the CA certificate for the authorization server to the truststore in your client configuration. You can configure a truststore in PEM or PKCS #12 format.
Configuring SASL authentication mechanisms
Specify SASL mechanisms in the client configuration:
- OAUTHBEARER for credentials exchange using a bearer token
- PLAIN to pass client credentials (clientId + secret) or an access token
Configuring a JAAS module
Specify a JAAS module that implements the SASL authentication mechanism as a sasl.jaas.config property value:
- org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule implements the OAUTHBEARER mechanism
- org.apache.kafka.common.security.plain.PlainLoginModule implements the PLAIN mechanism
For the OAUTHBEARER mechanism, Streams for Apache Kafka provides a callback handler for clients that use Kafka Client Java libraries to enable credentials exchange. For clients in other languages, custom code may be required to obtain the access token. For the PLAIN mechanism, Streams for Apache Kafka provides server-side callbacks to enable credentials exchange.
To be able to use the OAUTHBEARER mechanism, you must also add the custom io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler class as the callback handler. JaasClientOauthLoginCallbackHandler handles OAuth callbacks to the authorization server for access tokens during client login. This enables automatic token renewal, ensuring continuous authentication without user intervention. Additionally, it handles login credentials for clients using the OAuth 2.0 password grant method.
Configuring authentication properties
Configure the client to use credentials or access tokens for OAuth 2.0 authentication.
- Using client credentials
- Using client credentials involves configuring the client with the necessary credentials (client ID and secret, or client ID and client assertion) to obtain a valid access token from an authorization server. This is the simplest mechanism.
- Using access tokens
- Using access tokens, the client is configured with a valid long-lived access token or refresh token obtained from an authorization server. Using access tokens adds more complexity because there is an additional dependency on authorization server tools. If you are using long-lived access tokens, you may need to configure the client in the authorization server to increase the maximum lifetime of the token.
The only information ever sent to Kafka is the access token. The credentials used to obtain the token are never sent to Kafka. When a client obtains an access token, no further communication with the authorization server is needed.
SASL authentication properties support the following authentication methods:
- OAuth 2.0 client credentials
- Access token or Service account token
- Refresh token
- OAuth 2.0 password grant (deprecated)
Add the authentication properties as JAAS configuration (sasl.jaas.config and sasl.login.callback.handler.class).
If the client application is not configured with an access token directly, the client exchanges one of the following sets of credentials for an access token during Kafka session initiation:
- Client ID and secret
- Client ID and client assertion
- Client ID, refresh token, and (optionally) a secret
- Username and password, with client ID and (optionally) a secret
You can also specify authentication properties as environment variables, or as Java system properties. For Java system properties, you can set them in code using setProperty or pass them on the command line using the -D option.
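As a rough illustration of the system property route, the following sketch sets two OAuth options with System.setProperty before any Kafka client is created. It assumes that the system property keys match the JAAS option names used in this section (oauth.client.id and oauth.token.endpoint.uri). The class name, client ID, and URL are hypothetical values chosen for the example.

```java
class OAuthSystemProperties {

    // Assumption: the OAuth client library reads options from system
    // properties that use the same names as the JAAS options.
    static void configure(String clientId, String tokenEndpointUri) {
        System.setProperty("oauth.client.id", clientId);
        System.setProperty("oauth.token.endpoint.uri", tokenEndpointUri);
    }

    public static void main(String[] args) {
        // Hypothetical values for illustration only.
        configure("my-client", "https://auth.example.com/token");
        System.out.println(System.getProperty("oauth.client.id"));
    }
}
```

Under the same assumption, the command-line equivalent would be to start the JVM with options such as -Doauth.client.id=my-client.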
Example client credentials configuration using the client secret
security.protocol=SASL_SSL 1
sasl.mechanism=OAUTHBEARER 2
ssl.truststore.location=/tmp/truststore.p12 3
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \ 4
oauth.client.id="<client_id>" \ 5
oauth.client.secret="<client_secret>" \ 6
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7
oauth.ssl.truststore.password="$STOREPASS" \ 8
oauth.ssl.truststore.type="PKCS12" \ 9
oauth.scope="<scope>" \ 10
oauth.audience="<audience>" ; 11
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- SASL_SSL security protocol for TLS-encrypted connections. Use SASL_PLAINTEXT over unencrypted connections for local development only.
- 2
- The SASL mechanism specified as OAUTHBEARER or PLAIN.
- 3
- The truststore configuration for secure access to the Kafka cluster.
- 4
- URI of the authorization server token endpoint.
- 5
- Client ID, which is the name used when creating the client in the authorization server.
- 6
- Client secret created when creating the client in the authorization server.
- 7
- The location of the truststore (oauth-truststore.p12) that contains the public key certificate for the authorization server.
- 8
- The password for accessing the truststore.
- 9
- The truststore type.
- 10
- (Optional) The scope for requesting the token from the token endpoint. An authorization server may require a client to specify the scope.
- 11
- (Optional) The audience for requesting the token from the token endpoint. An authorization server may require a client to specify the audience.
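If you prefer to build the same settings in code rather than in a properties file, the following sketch assembles the sasl.jaas.config value from a map of options and returns a Properties object. The property names and class names are taken from the example above. The helper class, method names, and sample values are hypothetical. The sketch omits the truststore settings for brevity and does not escape double quotes inside option values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

class OAuthClientConfig {

    static final String LOGIN_MODULE =
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";
    static final String CALLBACK_HANDLER =
            "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler";

    // Renders the options as: <module> required key="value" key="value" ;
    static String jaasConfig(Map<String, String> options) {
        String rendered = options.entrySet().stream()
                .map(e -> e.getKey() + "=\"" + e.getValue() + "\"")
                .collect(Collectors.joining(" "));
        return LOGIN_MODULE + " required " + rendered + " ;";
    }

    // Builds client properties for the client ID and secret method.
    static Properties clientCredentials(String tokenEndpoint, String clientId, String secret) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("oauth.token.endpoint.uri", tokenEndpoint);
        options.put("oauth.client.id", clientId);
        options.put("oauth.client.secret", secret);

        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        props.setProperty("sasl.jaas.config", jaasConfig(options));
        props.setProperty("sasl.login.callback.handler.class", CALLBACK_HANDLER);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical endpoint and credentials for illustration only.
        Properties props = clientCredentials(
                "https://auth.example.com/token", "my-client", "my-secret");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

Optional settings such as oauth.scope and oauth.audience can be added to the options map in the same way.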
Example client credentials configuration using the client assertion
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.assertion.location="<path_to_client_assertion_token_file>" \ 1
oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \ 2
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Path to the client assertion file used for authenticating the client. This file is a private key file as an alternative to the client secret. Alternatively, use the oauth.client.assertion option to specify the client assertion value in clear text.
- 2
- (Optional) Sometimes you may need to specify the client assertion type. If not specified, the default value is urn:ietf:params:oauth:client-assertion-type:jwt-bearer.
Example password grants configuration
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \ 1
oauth.client.secret="<client_secret>" \ 2
oauth.password.grant.username="<username>" \ 3
oauth.password.grant.password="<password>" \ 4
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Client ID, which is the name used when creating the client in the authorization server.
- 2
- (Optional) Client secret created when creating the client in the authorization server.
- 3
- Username for password grant authentication. OAuth password grant configuration (username and password) uses the OAuth 2.0 password grant method. To use password grants, create a user account for a client on your authorization server with limited permissions. The account should act like a service account. Use in environments where user accounts are required for authentication, but consider using a refresh token first.
- 4
- Password for password grant authentication.
Note
SASL PLAIN does not support passing a username and password (password grants) using the OAuth 2.0 password grant method.
Example access token configuration
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token="<access_token>" ; 1
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Long-lived access token for Kafka clients. Alternatively, oauth.access.token.location can be used to specify the file that contains the access token.
Example OpenShift service account token configuration
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token"; 1
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Location of the service account token on the filesystem (assuming that the client is deployed as an OpenShift pod).
Example refresh token configuration
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \ 1
oauth.client.secret="<client_secret>" \ 2
oauth.refresh.token="<refresh_token>" \ 3
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
SASL extensions for custom OAUTHBEARER implementations
If your Kafka broker uses a custom OAUTHBEARER implementation, you may need to pass additional SASL extension options. These extensions can include attributes or information required as client context by the authorization server. The options are passed as key-value pairs and are sent to the Kafka broker when a new session is started.
Pass SASL extension values using oauth.sasl.extension. as a key prefix.
Example configuration to pass SASL extension values
oauth.sasl.extension.key1="value1"
oauth.sasl.extension.key2="value2"
7.2.3. OAuth 2.0 client authentication flows
OAuth 2.0 authentication flows depend on the underlying Kafka client and Kafka broker configuration. The flows must also be supported by the authorization server used.
The Kafka broker listener configuration determines how clients authenticate using an access token. The client can pass a client ID and secret to request an access token.
If a listener is configured to use PLAIN authentication, the client can authenticate with a client ID and secret or username and access token. These values are passed as the username and password properties of the PLAIN mechanism.
Listener configuration supports the following token validation options:
- You can use fast local token validation based on JWT signature checking and local token introspection, without contacting an authorization server. The authorization server provides a JWKS endpoint with public certificates that are used to validate signatures on the tokens.
- You can use a call to a token introspection endpoint provided by an authorization server. Each time a new Kafka broker connection is established, the broker passes the access token received from the client to the authorization server. The Kafka broker checks the response to confirm whether the token is valid.
An authorization server might only allow the use of opaque access tokens, which means that local token validation is not possible.
Kafka client credentials can also be configured for the following types of authentication:
- Direct local access using a previously generated long-lived access token
- Contact with the authorization server for a new access token to be issued (using a client ID and credentials, or a refresh token, or a username and a password)
7.2.3.1. Example client authentication flows using the SASL OAUTHBEARER mechanism
You can use the following communication flows for Kafka authentication using the SASL OAUTHBEARER mechanism.
Client using client ID and credentials, with broker delegating validation to authorization server
- The Kafka client requests an access token from the authorization server using a client ID and credentials, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
- The authorization server generates a new access token.
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
- The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server using its own client ID and secret.
- A Kafka client session is established if the token is valid.
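To make the first step of this flow more concrete, the following sketch builds the kind of HTTP request a client sends to a token endpoint for the standard OAuth 2.0 client credentials grant. Java clients that use the callback handler do not need to write this code. It is shown only to illustrate what the exchange involves. The class name, endpoint URL, and credentials are hypothetical, and the sketch assumes that the authorization server accepts client authentication through an HTTP Basic Authorization header.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class ClientCredentialsRequest {

    // Builds (but does not send) a client credentials token request.
    static HttpRequest build(String tokenEndpoint, String clientId, String clientSecret) {
        // Form-encode the ID and secret, then Base64-encode "id:secret".
        String credentials = URLEncoder.encode(clientId, StandardCharsets.UTF_8)
                + ":" + URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
        String basic = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));

        return HttpRequest.newBuilder(URI.create(tokenEndpoint))
                .header("Authorization", "Basic " + basic)
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString("grant_type=client_credentials"))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical endpoint and credentials for illustration only.
        HttpRequest request = build("https://auth.example.com/token", "my-client", "my-secret");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with java.net.http.HttpClient would return a JSON response from which the client reads the access token that it then passes to the Kafka broker.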
Client using client ID and credentials, with broker performing fast local token validation
- The Kafka client authenticates with the authorization server at the token endpoint, using a client ID and credentials, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
- The authorization server generates a new access token.
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
- The Kafka broker validates the access token locally using a JWT token signature check and local token introspection.
Client using long-lived access token, with broker delegating validation to authorization server
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
- The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server, using its own client ID and secret.
- A Kafka client session is established if the token is valid.
Client using long-lived access token, with broker performing fast local validation
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
- The Kafka broker validates the access token locally using a JWT token signature check and local token introspection.
Fast local JWT token signature validation is suitable only for short-lived tokens as there is no check with the authorization server if a token has been revoked. Token expiration is written into the token, but revocation can happen at any time, so cannot be accounted for without contacting the authorization server. Any issued token would be considered valid until it expires.
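The following sketch illustrates why expiry can be checked locally while revocation cannot: the exp claim is part of the token payload, so it can be read without contacting the authorization server. The class and method names are hypothetical. The sketch deliberately skips signature validation and uses a simple regular expression instead of a JSON parser, so it is not a substitute for the broker-side validation described above.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class JwtExpirySketch {

    // Matches a numeric "exp" claim in the decoded JSON payload.
    private static final Pattern EXP = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    // Reads the expiry time (seconds since the epoch) from the JWT payload.
    static long expiryEpochSeconds(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Not a JWT: expected header.payload.signature");
        }
        String payload = new String(
                Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
        Matcher matcher = EXP.matcher(payload);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Token payload has no exp claim");
        }
        return Long.parseLong(matcher.group(1));
    }

    // Local check only: a revoked token still passes until it expires.
    static boolean isExpired(String jwt, Instant now) {
        return now.getEpochSecond() >= expiryEpochSeconds(jwt);
    }

    public static void main(String[] args) {
        // Hypothetical unsigned token built for the demonstration.
        String payload = Base64.getUrlEncoder().withoutPadding().encodeToString(
                "{\"sub\":\"alice\",\"exp\":1700000000}".getBytes(StandardCharsets.UTF_8));
        String jwt = "e30." + payload + ".signature";
        System.out.println("Expired now? " + isExpired(jwt, Instant.now()));
    }
}
```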
7.2.3.2. Example client authentication flows using the SASL PLAIN mechanism
You can use the following communication flows for Kafka authentication using the OAuth PLAIN mechanism.
Client using a client ID and secret, with the broker obtaining the access token for the client
- The Kafka client passes a clientId as a username and a secret as a password.
- The Kafka broker uses a token endpoint to pass the clientId and secret to the authorization server.
- The authorization server returns a fresh access token or an error if the client credentials are not valid.
The Kafka broker validates the token in one of the following ways:
- If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if the token validation is successful.
- If local token introspection is used, a request is not made to the authorization server. The Kafka broker validates the access token locally using a JWT token signature check.
Client using a long-lived access token without a client ID and secret
- The Kafka client passes a username and password. The password provides the value of an access token that was obtained manually and configured before running the client.
The password is passed with or without an $accessToken: string prefix depending on whether or not the Kafka broker listener is configured with a token endpoint for authentication.
- If the token endpoint is configured, the password should be prefixed by $accessToken: to let the broker know that the password parameter contains an access token rather than a client secret. The Kafka broker interprets the username as the account username.
- If the token endpoint is not configured on the Kafka broker listener (enforcing a no-client-credentials mode), the password should provide the access token without the prefix. The Kafka broker interprets the username as the account username. In this mode, the client doesn’t use a client ID and secret, and the password parameter is always interpreted as a raw access token.
The Kafka broker validates the token in one of the following ways:
- If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if token validation is successful.
- If local token introspection is used, no request is made to the authorization server. The Kafka broker validates the access token locally using a JWT token signature check.
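The rules for interpreting the PLAIN password in the two flows above can be summarized as a small decision function. The following is a sketch of that logic only, not the broker's actual implementation. The class, enum, and method names are hypothetical.

```java
class PlainPasswordInterpreter {

    static final String TOKEN_PREFIX = "$accessToken:";

    enum Kind { CLIENT_SECRET, ACCESS_TOKEN }

    static final class Credential {
        final Kind kind;
        final String value;

        Credential(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // tokenEndpointConfigured reflects whether the listener sets
    // oauth.token.endpoint.uri for PLAIN authentication.
    static Credential interpret(String password, boolean tokenEndpointConfigured) {
        if (!tokenEndpointConfigured) {
            // No-client-credentials mode: the password is always a raw access token.
            return new Credential(Kind.ACCESS_TOKEN, password);
        }
        if (password.startsWith(TOKEN_PREFIX)) {
            // The prefix marks the password as an access token; strip it.
            return new Credential(Kind.ACCESS_TOKEN, password.substring(TOKEN_PREFIX.length()));
        }
        // Otherwise the username and password are treated as client ID and secret.
        return new Credential(Kind.CLIENT_SECRET, password);
    }

    public static void main(String[] args) {
        Credential credential = interpret("$accessToken:abc123", true);
        System.out.println(credential.kind + " " + credential.value);
    }
}
```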
7.2.4. Re-authenticating sessions
You can configure OAuth listeners to use Kafka session re-authentication for OAuth 2.0 sessions between Kafka clients and Kafka brokers. This mechanism enforces the expiry of an authenticated session between the client and the broker after a defined period of time. When a session expires, the client immediately starts a new session by reusing the existing connection rather than dropping it.
Session re-authentication is disabled by default. To enable it, set a time value for the connections.max.reauth.ms property in the server.properties file. For an example configuration, see Section 7.2.1, “Configuring OAuth 2.0 authentication on listeners”.
Session re-authentication must be supported by the Kafka client libraries used by the client.
Session re-authentication can be used with fast local JWT or introspection endpoint token validation.
Client re-authentication
When the broker’s authenticated session expires, the client must re-authenticate to the existing session by sending a new, valid access token to the broker, without dropping the connection.
If token validation is successful, a new client session is started using the existing connection. If the client fails to re-authenticate, the broker will close the connection if further attempts are made to send or receive messages. Java clients that use Kafka client library 2.2 or later automatically re-authenticate if the re-authentication mechanism is enabled on the broker.
Session re-authentication also applies to refresh tokens, if used. When the session expires, the client refreshes the access token by using its refresh token. The client then uses the new access token to re-authenticate over the existing connection.
Session expiry for OAUTHBEARER and PLAIN
When session re-authentication is configured, session expiry works differently for OAUTHBEARER and PLAIN authentication.
For OAUTHBEARER and PLAIN, using the client ID and secret method:
- The broker’s authenticated session will expire at the configured connections.max.reauth.ms.
- The session will expire earlier if the access token expires before the configured time.
For PLAIN using the long-lived access token method:
- The broker’s authenticated session will expire at the configured connections.max.reauth.ms.
- Re-authentication will fail if the access token expires before the configured time. Although session re-authentication is attempted, PLAIN has no mechanism for refreshing tokens.
If connections.max.reauth.ms is not configured, OAUTHBEARER and PLAIN clients can remain connected to brokers indefinitely, without needing to re-authenticate. Authenticated sessions do not end with access token expiry.
However, token expiry can be taken into account when configuring authorization, for example, by using keycloak authorization or installing a custom authorizer.
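For the client ID and secret method, the session expiry rules above amount to taking the smaller of the configured re-authentication period and the remaining token lifetime. The following sketch expresses that arithmetic. The class and method names are hypothetical, and a value of 0 or less is used here to represent connections.max.reauth.ms not being configured.

```java
import java.util.OptionalLong;

class SessionExpirySketch {

    // Returns the session lifetime in milliseconds, or empty when
    // re-authentication is not configured and no expiry is enforced.
    static OptionalLong sessionLifetimeMs(long maxReauthMs, long tokenRemainingMs) {
        if (maxReauthMs <= 0) {
            return OptionalLong.empty();
        }
        // The session ends at whichever comes first.
        return OptionalLong.of(Math.min(maxReauthMs, tokenRemainingMs));
    }

    public static void main(String[] args) {
        // One-hour re-authentication period, token valid for five more minutes.
        System.out.println(sessionLifetimeMs(3_600_000L, 300_000L));
    }
}
```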
7.2.5. Example: Enabling OAuth 2.0 authentication
This example shows how to configure client access to a Kafka cluster using OAuth 2.0 authentication. The procedures describe the configuration required to set up OAuth 2.0 authentication on Kafka listeners and Kafka Java clients.
7.2.5.1. Configuring OAuth 2.0 support for Kafka brokers
This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication using an authorization server.
We advise use of OAuth 2.0 over an encrypted interface through configuration of TLS listeners. Plain listeners are not recommended.
Configure the Kafka brokers using properties that support your chosen authorization server, and the type of authorization you are implementing.
Prerequisites
- Streams for Apache Kafka is installed on each host, and the configuration files are available.
- An OAuth 2.0 authorization server is deployed.
Procedure
Configure the Kafka broker listener configuration in the server.properties file.
For example, using the OAUTHBEARER mechanism:
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Configure broker connection settings as part of the listener.name.client.oauthbearer.sasl.jaas.config.
If required, configure access to the authorization server.
This step is normally required for a production environment, unless a technology like service mesh is used to configure secure channels outside containers.
Provide a custom truststore for connecting to a secured authorization server. SSL is always required for access to the authorization server.
Set properties to configure the truststore.
For example:
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
# ...
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-broker-secret" \
oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
oauth.ssl.truststore.password="<truststore_password>" \
oauth.ssl.truststore.type="PKCS12" ;
If the certificate hostname does not match the access URL hostname, you can turn off certificate hostname validation:
oauth.ssl.endpoint.identification.algorithm=""
The check ensures that client connection to the authorization server is authentic. You may wish to turn off the validation in a non-production environment.
7.2.5.2. Setting up OAuth 2.0 on Kafka Java clients
Configure Kafka producer and consumer APIs to use OAuth 2.0 for interaction with Kafka brokers. Add a callback plugin to your client pom.xml file, then configure your client for OAuth 2.0.
How you configure the authentication properties depends on the authentication method you are using to access the OAuth 2.0 authorization server. In this procedure, the properties are specified in a properties file, then loaded into the client configuration.
Prerequisites
- Streams for Apache Kafka and Kafka are running
- An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
- Kafka brokers are configured for OAuth 2.0
Procedure
Add the client library with OAuth 2.0 support to the pom.xml file for the Kafka client:
<dependency>
  <groupId>io.strimzi</groupId>
  <artifactId>kafka-oauth-client</artifactId>
  <version>0.15.0.redhat-00012</version>
</dependency>
Configure the client depending on the OAuth 2.0 authentication method:
For example, specify the properties for the authentication method in a client.properties file.
Input the client properties for OAuth 2.0 authentication into the Java client code.
Example showing input of client properties
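import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
// Load the OAuth 2.0 client settings from client.properties
Properties props = new Properties();
try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) {
    props.load(reader);
}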
- Verify that the Kafka client can access the Kafka brokers.
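Before passing the loaded properties to a producer or consumer, it can help to check that the settings this section requires for the OAUTHBEARER mechanism are present. The following sketch performs that check using only the JDK. The class and method names are hypothetical, and the list of required keys is an assumption based on the client examples in this chapter.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ClientPropertiesCheck {

    // Keys used by every OAUTHBEARER client example in this chapter.
    static final List<String> REQUIRED = List.of(
            "security.protocol",
            "sasl.mechanism",
            "sasl.jaas.config",
            "sasl.login.callback.handler.class");

    // Returns the required keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Parses properties from text, in the same format as client.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("security.protocol=SASL_SSL\nsasl.mechanism=OAUTHBEARER\n");
        System.out.println("Missing: " + missingKeys(props));
    }
}
```

If the returned list is empty, the properties can be passed to the Kafka client constructor as usual.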
7.3. Using OAuth 2.0 token-based authorization
Streams for Apache Kafka supports the use of OAuth 2.0 token-based authorization through Red Hat build of Keycloak Authorization Services, which lets you manage security policies and permissions centrally.
Security policies and permissions defined in Red Hat build of Keycloak grant access to Kafka resources. Users and clients are matched against policies that permit access to perform specific actions on Kafka brokers.
Kafka allows all users full access to brokers by default, but also provides the AclAuthorizer and StandardAuthorizer plugins to configure authorization based on Access Control Lists (ACLs). The ACL rules managed by these plugins are used to grant or deny access to resources based on username, and these rules are stored within the Kafka cluster itself.
However, OAuth 2.0 token-based authorization with Red Hat build of Keycloak offers far greater flexibility on how you wish to implement access control to Kafka brokers. In addition, you can configure your Kafka brokers to use OAuth 2.0 authorization and ACLs.
7.3.1. Example: Enabling OAuth 2.0 authorization
This procedure describes how to configure Kafka brokers to use OAuth 2.0 authorization using Red Hat build of Keycloak Authorization Services.
Red Hat build of Keycloak server Authorization Services REST endpoints extend token-based authentication with Red Hat build of Keycloak by applying defined security policies on a particular user, and providing a list of permissions granted on different resources for that user. Policies use roles and groups to match permissions to users. OAuth 2.0 authorization enforces permissions locally based on the received list of grants for the user from Red Hat build of Keycloak Authorization Services.
A Red Hat build of Keycloak authorizer (KeycloakAuthorizer) is provided with Streams for Apache Kafka. The authorizer fetches a list of granted permissions from the authorization server as needed, and enforces authorization locally on Kafka, making rapid authorization decisions for each client request.
Before you begin
Consider the access you require or want to limit for certain users. You can use a combination of Red Hat build of Keycloak groups, roles, clients, and users to configure access in Red Hat build of Keycloak.
Typically, groups are used to match users based on organizational departments or geographical locations, and roles are used to match users based on their function.
With Red Hat build of Keycloak, you can store users and groups in LDAP, whereas clients and roles cannot be stored this way. Storage and access to user data may be a factor in how you choose to configure authorization policies.
Super users always have unconstrained access to a Kafka broker regardless of the authorization implemented on the Kafka broker.
Prerequisites
- Streams for Apache Kafka must be configured to use OAuth 2.0 with Red Hat build of Keycloak token-based authentication. You use the same Red Hat build of Keycloak endpoint when you set up authorization.
- You need to understand how to manage policies and permissions for Red Hat build of Keycloak Authorization Services, as described in the Red Hat build of Keycloak documentation.
Procedure
- Access the Red Hat build of Keycloak Admin Console or use the Red Hat build of Keycloak Admin CLI to enable Authorization Services for the OAuth 2.0 client for Kafka you created when setting up OAuth 2.0 authentication.
- Use Authorization Services to define resources, authorization scopes, policies, and permissions for the client.
- Bind the permissions to users and clients by assigning them roles and groups.
Configure the Kafka brokers to use Red Hat build of Keycloak authorization.
Add the following to the Kafka server.properties configuration file to install the authorizer in Kafka:
authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer
principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
Add configuration for the Kafka brokers to access the authorization server and Authorization Services.
Here we show example configuration added as additional properties to server.properties, but you can also define them as environment variables using capitalized or upper-case naming conventions.
strimzi.authorization.token.endpoint.uri="https://<auth_server_address>/auth/realms/REALM-NAME/protocol/openid-connect/token" 1
strimzi.authorization.client.id="kafka" 2
(Optional) Add configuration for specific Kafka clusters.
For example:
strimzi.authorization.kafka.cluster.name="kafka-cluster" 1
- 1
- The name of a specific Kafka cluster. Names are used to target permissions, making it possible to manage multiple clusters within the same Red Hat build of Keycloak realm. The default value is kafka-cluster.
(Optional) Delegate to simple authorization:
strimzi.authorization.delegate.to.kafka.acl="true" 1
- 1
- Delegate authorization to Kafka AclAuthorizer if access is denied by Red Hat build of Keycloak Authorization Services policies. The default is false.
(Optional) Add configuration for TLS connection to the authorization server.
For example:
strimzi.authorization.ssl.truststore.location=<path_to_truststore> 1
strimzi.authorization.ssl.truststore.password=<my_truststore_password> 2
strimzi.authorization.ssl.truststore.type=JKS 3
strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 4
strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 5
- 1
- The path to the truststore that contains the certificates.
- 2
- The password for the truststore.
- 3
- The truststore type. If not set, the default Java keystore type is used.
- 4
- Random number generator implementation. If not set, the Java platform SDK default is used.
- 5
- Hostname verification. If set to an empty string, the hostname verification is turned off. If not set, the default value is HTTPS, which enforces hostname verification for server certificates.
(Optional) Configure the refresh of grants from the authorization server. The grants refresh job works by enumerating the active tokens and requesting the latest grants for each.
For example:
strimzi.authorization.grants.refresh.period.seconds="120" 1
strimzi.authorization.grants.refresh.pool.size="10" 2
strimzi.authorization.grants.max.idle.time.seconds="300" 3
strimzi.authorization.grants.gc.period.seconds="300" 4
strimzi.authorization.reuse.grants="false" 5
- 1
- Specifies how often the list of grants from the authorization server is refreshed (once per minute by default). To turn grants refresh off for debugging purposes, set to "0".
- 2
- Specifies the size of the thread pool (the degree of parallelism) used by the grants refresh job. The default value is "5".
- 3
- The time, in seconds, after which an idle grant in the cache can be evicted. The default value is 300.
- 4
- The time, in seconds, between consecutive runs of a job that cleans stale grants from the cache. The default value is 300.
- 5
- Controls whether the latest grants are fetched for a new session. When disabled, grants are retrieved from Red Hat build of Keycloak and cached for the user. The default value is true.
(Optional) Configure network timeouts when communicating with the authorization server.
For example:
strimzi.authorization.connect.timeout.seconds="60" 1
strimzi.authorization.read.timeout.seconds="60" 2
strimzi.authorization.http.retries="2" 3
- 1
- The connect timeout in seconds when connecting to the Red Hat build of Keycloak token endpoint. The default value is 60.
- 2
- The read timeout in seconds when connecting to the Red Hat build of Keycloak token endpoint. The default value is 60.
- 3
- The maximum number of times to retry (without pausing) a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeouts for the strimzi.authorization.connect.timeout.seconds and strimzi.authorization.read.timeout.seconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, it could make Kafka unresponsive.
(Optional) Enable OAuth 2.0 metrics for token validation and authorization:
oauth.enable.metrics="true" 1
- 1
- Controls whether to enable or disable OAuth metrics. The default value is false.
(Optional) Remove the Accept header from requests:
oauth.include.accept.header="false" 1
- 1
- Set to false if including the header is causing issues when communicating with the authorization server. The default value is true.
- Verify the configured permissions by accessing Kafka brokers as clients or users with specific roles, ensuring they have the necessary access and do not have unauthorized access.
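The procedure notes that the authorizer properties can also be defined as environment variables using upper-case names. The following sketch shows one plausible mapping from a property key to an environment variable name. It assumes that dots and hyphens are replaced with underscores and that the result is upper-cased. Confirm the exact convention against the OAuth library documentation before relying on it. The class and method names are hypothetical.

```java
import java.util.Locale;

class EnvNameSketch {

    // Assumption: upper-case the key and replace '.' and '-' with '_'.
    static String toEnvName(String propertyKey) {
        return propertyKey.toUpperCase(Locale.ROOT)
                .replace('.', '_')
                .replace('-', '_');
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("strimzi.authorization.client.id"));
    }
}
```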