Chapter 6. Managing secure access to Kafka
You can secure your Kafka cluster by managing the access each client has to the Kafka brokers.
A secure connection between Kafka brokers and clients can encompass:
- Encryption for data exchange
- Authentication to prove identity
- Authorization to allow or decline actions executed by users
This chapter explains how to set up secure connections between Kafka brokers and clients, with sections describing:
- Security options for Kafka clusters and clients
- How to secure Kafka brokers
- How to use an authorization server for OAuth 2.0 token-based authentication and authorization
6.1. Security options for Kafka
Use the Kafka
resource to configure the mechanisms used for Kafka authentication and authorization.
6.1.1. Listener authentication
Configure client authentication for Kafka brokers by creating listeners. Specify the listener authentication type using the Kafka.spec.kafka.listeners.authentication
property in the Kafka
resource.
For clients inside the OpenShift cluster, you can create plain
(without encryption) or tls
internal listeners. The internal
listener type use a headless service and the DNS names given to the broker pods. As an alternative to the headless service, you can also create a cluster-ip
type of internal listener to expose Kafka using per-broker ClusterIP
services. For clients outside the OpenShift cluster, you create external listeners and specify a connection mechanism, which can be nodeport
, loadbalancer
, ingress
, or route
(on OpenShift).
For more information on the configuration options for connecting an external client, see Accessing Kafka outside of the OpenShift cluster.
Supported authentication options:
- mTLS authentication (only on the listeners with TLS enabled encryption)
- SCRAM-SHA-512 authentication
- OAuth 2.0 token-based authentication
- Custom authentication
The authentication option you choose depends on how you wish to authenticate client access to Kafka brokers.
Try exploring the standard authentication options before using custom authentication. Custom authentication allows for any type of kafka-supported authentication. It can provide more flexibility, but also adds complexity.
Figure 6.1. Kafka listener authentication options
The listener authentication
property is used to specify an authentication mechanism specific to that listener.
If no authentication
property is specified then the listener does not authenticate clients which connect through that listener. The listener will accept all connections without authentication.
Authentication must be configured when using the User Operator to manage KafkaUsers
.
The following example shows:
-
A
plain
listener configured for SCRAM-SHA-512 authentication -
A
tls
listener with mTLS authentication -
An
external
listener with mTLS authentication
Each listener is configured with a unique name and port within a Kafka cluster.
Listeners cannot be configured to use the ports reserved for inter-broker communication (9091 or 9090) and metrics (9404).
Example listener authentication configuration
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... listeners: - name: plain port: 9092 type: internal tls: true authentication: type: scram-sha-512 - name: tls port: 9093 type: internal tls: true authentication: type: tls - name: external port: 9094 type: loadbalancer tls: true authentication: type: tls # ...
6.1.1.1. mTLS authentication
mTLS authentication is always used for the communication between Kafka brokers and ZooKeeper pods.
AMQ Streams can configure Kafka to use TLS (Transport Layer Security) to provide encrypted communication between Kafka brokers and clients either with or without mutual authentication. For mutual, or two-way, authentication, both the server and the client present certificates. When you configure mTLS authentication, the broker authenticates the client (client authentication) and the client authenticates the broker (server authentication).
mTLS listener configuration in the Kafka
resource requires the following:
-
tls: true
to specify TLS encryption and server authentication -
authentication.type: tls
to specify the client authentication
When a Kafka cluster is created by the Cluster Operator, it creates a new secret with the name <cluster_name>-cluster-ca-cert
. The secret contains a CA certificate. The CA certificate is in PEM and PKCS #12 format. To verify a Kafka cluster, add the CA certificate to the truststore in your client configuration. To verify a client, add a user certificate and key to the keystore in your client configuration. For more information on configuring a client for mTLS, see Section 6.2.2, “User authentication”.
TLS authentication is more commonly one-way, with one party authenticating the identity of another. For example, when HTTPS is used between a web browser and a web server, the browser obtains proof of the identity of the web server.
6.1.1.2. SCRAM-SHA-512 authentication
SCRAM (Salted Challenge Response Authentication Mechanism) is an authentication protocol that can establish mutual authentication using passwords. AMQ Streams can configure Kafka to use SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 to provide authentication on both unencrypted and encrypted client connections.
When SCRAM-SHA-512 authentication is used with a TLS connection, the TLS protocol provides the encryption, but is not used for authentication.
The following properties of SCRAM make it safe to use SCRAM-SHA-512 even on unencrypted connections:
- The passwords are not sent in the clear over the communication channel. Instead the client and the server are each challenged by the other to offer proof that they know the password of the authenticating user.
- The server and client each generate a new challenge for each authentication exchange. This means that the exchange is resilient against replay attacks.
When KafkaUser.spec.authentication.type
is configured with scram-sha-512
the User Operator will generate a random 12-character password consisting of upper and lowercase ASCII letters and numbers.
6.1.1.3. Network policies
By default, AMQ Streams automatically creates a NetworkPolicy
resource for every listener that is enabled on a Kafka broker. This NetworkPolicy
allows applications to connect to listeners in all namespaces. Use network policies as part of the listener configuration.
If you want to restrict access to a listener at the network level to only selected applications or namespaces, use the networkPolicyPeers
property. Each listener can have a different networkPolicyPeers
configuration. For more information on network policy peers, refer to the NetworkPolicyPeer API reference.
If you want to use custom network policies, you can set the STRIMZI_NETWORK_POLICY_GENERATION
environment variable to false
in the Cluster Operator configuration. For more information, see Cluster Operator configuration.
Your configuration of OpenShift must support ingress NetworkPolicies
in order to use network policies in AMQ Streams.
6.1.1.4. Additional listener configuration options
You can use the properties of the GenericKafkaListenerConfiguration schema to add further configuration to listeners.
6.1.2. Kafka authorization
Configure authorization for Kafka brokers using the Kafka.spec.kafka.authorization
property in the Kafka
resource. If the authorization
property is missing, no authorization is enabled and clients have no restrictions. When enabled, authorization is applied to all enabled listeners. The authorization method is defined in the type
field.
Supported authorization options:
- Simple authorization
- OAuth 2.0 authorization (if you are using OAuth 2.0 token based authentication)
- Open Policy Agent (OPA) authorization
- Custom authorization
Figure 6.2. Kafka cluster authorization options
6.1.2.1. Super users
Super users can access all resources in your Kafka cluster regardless of any access restrictions, and are supported by all authorization mechanisms.
To designate super users for a Kafka cluster, add a list of user principals to the superUsers
property. If a user uses mTLS authentication, the username is the common name from the TLS certificate subject prefixed with CN=
. If you are not using the User Operator and using your own certificates for mTLS, the username is the full certificate subject. A full certificate subject can have the following fields: CN=user,OU=my_ou,O=my_org,L=my_location,ST=my_state,C=my_country_code
. Omit any fields that are not present.
An example configuration with super users
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... authorization: type: simple superUsers: - CN=client_1 - user_2 - CN=client_3 - CN=client_4,OU=my_ou,O=my_org,L=my_location,ST=my_state,C=US - CN=client_5,OU=my_ou,O=my_org,C=GB - CN=client_6,O=my_org # ...
6.2. Security options for Kafka clients
Use the KafkaUser
resource to configure the authentication mechanism, authorization mechanism, and access rights for Kafka clients. In terms of configuring security, clients are represented as users.
You can authenticate and authorize user access to Kafka brokers. Authentication permits access, and authorization constrains the access to permissible actions.
You can also create super users that have unconstrained access to Kafka brokers.
The authentication and authorization mechanisms must match the specification for the listener used to access the Kafka brokers.
Configuring users for secure access to Kafka brokers
For more information on configuring a KafkaUser
resource to access Kafka brokers securely, see the following sections:
6.2.1. Identifying a Kafka cluster for user handling
A KafkaUser
resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka
resource) to which it belongs.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster
The label is used by the User Operator to identify the KafkaUser
resource and create a new user, and also in subsequent handling of the user.
If the label does not match the Kafka cluster, the User Operator cannot identify the KafkaUser
and the user is not created.
If the status of the KafkaUser
resource remains empty, check your label.
6.2.2. User authentication
Use the KafkaUser
custom resource to configure authentication credentials for users (clients) that require access to a Kafka cluster. Configure the credentials using the authentication
property in KafkaUser.spec
. By specifying a type
, you control what credentials are generated.
Supported authentication types:
-
tls
for mTLS authentication -
tls-external
for mTLS authentication using external certificates -
scram-sha-512
for SCRAM-SHA-512 authentication
If tls
or scram-sha-512
is specified, the User Operator creates authentication credentials when it creates the user. If tls-external
is specified, the user still uses mTLS, but no authentication credentials are created. Use this option when you’re providing your own certificates. When no authentication type is specified, the User Operator does not create the user or its credentials.
You can use tls-external
to authenticate with mTLS using a certificate issued outside the User Operator. The User Operator does not generate a TLS certificate or a secret. You can still manage ACL rules and quotas through the User Operator in the same way as when you’re using the tls
mechanism. This means that you use the CN=USER-NAME
format when specifying ACL rules and quotas. USER-NAME is the common name given in a TLS certificate.
6.2.2.1. mTLS authentication
To use mTLS authentication, you set the type
field in the KafkaUser
resource to tls
.
Example user with mTLS authentication enabled
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: tls # ...
The authentication type must match the equivalent configuration for the Kafka
listener used to access the Kafka cluster.
When the user is created by the User Operator, it creates a new secret with the same name as the KafkaUser
resource. The secret contains a private and public key for mTLS. The public key is contained in a user certificate, which is signed by a clients CA (certificate authority) when it is created. All keys are in X.509 format.
If you are using the clients CA generated by the Cluster Operator, the user certificates generated by the User Operator are also renewed when the clients CA is renewed by the Cluster Operator.
The user secret provides keys and certificates in PEM and PKCS #12 formats.
Example secret with user credentials
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: ca.crt: <public_key> # Public key of the clients CA user.crt: <user_certificate> # Public key of the user user.key: <user_private_key> # Private key of the user user.p12: <store> # PKCS #12 store for user certificates and keys user.password: <password_for_store> # Protects the PKCS #12 store
When you configure a client, you specify the following:
- Truststore properties for the public cluster CA certificate to verify the identity of the Kafka cluster
- Keystore properties for the user authentication credentials to verify the client
The configuration depends on the file format (PEM or PKCS #12). This example uses PKCS #12 stores, and the passwords required to access the credentials in the stores.
Example client configuration using mTLS in PKCS #12 format
bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:9093 1 security.protocol=SSL 2 ssl.truststore.location=/tmp/ca.p12 3 ssl.truststore.password=<truststore_password> 4 ssl.keystore.location=/tmp/user.p12 5 ssl.keystore.password=<keystore_password> 6
- 1
- The bootstrap server address to connect to the Kafka cluster.
- 2
- The security protocol option when using TLS for encryption.
- 3
- The truststore location contains the public key certificate (
ca.p12
) for the Kafka cluster. A cluster CA certificate and password is generated by the Cluster Operator in the<cluster_name>-cluster-ca-cert
secret when the Kafka cluster is created. - 4
- The password (
ca.password
) for accessing the truststore. - 5
- The keystore location contains the public key certificate (
user.p12
) for the Kafka user. - 6
- The password (
user.password
) for accessing the keystore.
6.2.2.2. mTLS authentication using a certificate issued outside the User Operator
To use mTLS authentication using a certificate issued outside the User Operator, you set the type
field in the KafkaUser
resource to tls-external
. A secret and credentials are not created for the user.
Example user with mTLS authentication that uses a certificate issued outside the User Operator
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: tls-external # ...
6.2.2.3. SCRAM-SHA-512 authentication
To use the SCRAM-SHA-512 authentication mechanism, you set the type
field in the KafkaUser
resource to scram-sha-512
.
Example user with SCRAM-SHA-512 authentication enabled
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: scram-sha-512 # ...
When the user is created by the User Operator, it creates a new secret with the same name as the KafkaUser
resource. The secret contains the generated password in the password
key, which is encoded with base64. In order to use the password, it must be decoded.
Example secret with user credentials
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: password: Z2VuZXJhdGVkcGFzc3dvcmQ= 1 sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK 2
Decoding the generated password:
echo "Z2VuZXJhdGVkcGFzc3dvcmQ=" | base64 --decode
6.2.2.3.1. Custom password configuration
When a user is created, AMQ Streams generates a random password. You can use your own password instead of the one generated by AMQ Streams. To do so, create a secret with the password and reference it in the KafkaUser
resource.
Example user with a password set for SCRAM-SHA-512 authentication
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: scram-sha-512 password: valueFrom: secretKeyRef: name: my-secret 1 key: my-password 2 # ...
6.2.3. User authorization
Use the KafkaUser
custom resource to configure authorization rules for users (clients) that require access to a Kafka cluster. Configure the rules using the authorization
property in KafkaUser.spec
. By specifying a type
, you control what rules are used.
To use simple authorization, you set the type
property to simple
in KafkaUser.spec.authorization
. The simple authorization uses the Kafka Admin API to manage the ACL rules inside your Kafka cluster. Whether ACL management in the User Operator is enabled or not depends on your authorization configuration in the Kafka cluster.
- For simple authorization, ACL management is always enabled.
- For OPA authorization, ACL management is always disabled. Authorization rules are configured in the OPA server.
- For Red Hat Single Sign-On authorization, you can manage the ACL rules directly in Red Hat Single Sign-On. You can also delegate authorization to the simple authorizer as a fallback option in the configuration. When delegation to the simple authorizer is enabled, the User Operator will enable management of ACL rules as well.
-
For custom authorization using a custom authorization plugin, use the
supportsAdminApi
property in the.spec.kafka.authorization
configuration of theKafka
custom resource to enable or disable the support.
Authorization is cluster-wide. The authorization type must match the equivalent configuration in the Kafka
custom resource.
If ACL management is not enabled, AMQ Streams rejects a resource if it contains any ACL rules.
If you’re using a standalone deployment of the User Operator, ACL management is enabled by default. You can disable it using the STRIMZI_ACLS_ADMIN_API_SUPPORTED
environment variable.
If no authorization is specified, the User Operator does not provision any access rights for the user. Whether such a KafkaUser
can still access resources depends on the authorizer being used. For example, for the AclAuthorizer
this is determined by its allow.everyone.if.no.acl.found
configuration.
6.2.3.1. ACL rules
AclAuthorizer
uses ACL rules to manage access to Kafka brokers.
ACL rules grant access rights to the user, which you specify in the acls
property.
For more information about the AclRule
object, see the AclRule
schema reference.
6.2.3.2. Super user access to Kafka brokers
If a user is added to a list of super users in a Kafka broker configuration, the user is allowed unlimited access to the cluster regardless of any authorization constraints defined in ACLs in KafkaUser
.
For more information on configuring super user access to brokers, see Kafka authorization.
6.2.3.3. User quotas
You can configure the spec
for the KafkaUser
resource to enforce quotas so that a user does not exceed a configured level of access to Kafka brokers. You can set size-based network usage and time-based CPU utilization thresholds. You can also add a partition mutation quota to control the rate at which requests to change partitions are accepted for user requests.
An example KafkaUser
with user quotas
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: # ... quotas: producerByteRate: 1048576 1 consumerByteRate: 2097152 2 requestPercentage: 55 3 controllerMutationRate: 10 4
- 1
- Byte-per-second quota on the amount of data the user can push to a Kafka broker
- 2
- Byte-per-second quota on the amount of data the user can fetch from a Kafka broker
- 3
- CPU utilization limit as a percentage of time for a client group
- 4
- Number of concurrent partition creation and deletion operations (mutations) allowed per second
For more information on these properties, see the KafkaUserQuotas
schema reference.
6.3. Securing access to Kafka brokers
To establish secure access to Kafka brokers, you configure and apply:
A
Kafka
resource to:- Create listeners with a specified authentication type
- Configure authorization for the whole Kafka cluster
-
A
KafkaUser
resource to access the Kafka brokers securely through the listeners
Configure the Kafka
resource to set up:
- Listener authentication
- Network policies that restrict access to Kafka listeners
- Kafka authorization
- Super users for unconstrained access to brokers
Authentication is configured independently for each listener. Authorization is always configured for the whole Kafka cluster.
The Cluster Operator creates the listeners and sets up the cluster and client certificate authority (CA) certificates to enable authentication within the Kafka cluster.
You can replace the certificates generated by the Cluster Operator by installing your own certificates. You can also configure your listener to use a Kafka listener certificate managed by an external CA (certificate authority). Certificates are available in PKCS #12 format (.p12) and PEM (.crt) formats.
Use KafkaUser
to enable the authentication and authorization mechanisms that a specific client uses to access Kafka.
Configure the KafkaUser
resource to set up:
- Authentication to match the enabled listener authentication
- Authorization to match the enabled Kafka authorization
- Quotas to control the use of resources by clients
The User Operator creates the user representing the client and the security credentials used for client authentication, based on the chosen authentication type.
Refer to the schema reference for more information on access configuration properties:
6.3.1. Securing Kafka brokers
This procedure shows the steps involved in securing Kafka brokers when running AMQ Streams.
The security implemented for Kafka brokers must be compatible with the security implemented for the clients requiring access.
-
Kafka.spec.kafka.listeners[*].authentication
matchesKafkaUser.spec.authentication
-
Kafka.spec.kafka.authorization
matchesKafkaUser.spec.authorization
The steps show the configuration for simple authorization and a listener using mTLS authentication. For more information on listener configuration, see GenericKafkaListener
schema reference.
Alternatively, you can use SCRAM-SHA or OAuth 2.0 for listener authentication, and OAuth 2.0 or OPA for Kafka authorization.
Procedure
Configure the
Kafka
resource.-
Configure the
authorization
property for authorization. Configure the
listeners
property to create a listener with authentication.For example:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: # ... authorization: 1 type: simple superUsers: 2 - CN=client_1 - user_2 - CN=client_3 listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls 3 # ... zookeeper: # ...
- 1
- 2
- List of user principals with unlimited access to Kafka. CN is the common name from the client certificate when mTLS authentication is used.
- 3
- Listener authentication mechanisms may be configured for each listener, and specified as mTLS, SCRAM-SHA-512, or token-based OAuth 2.0.
If you are configuring an external listener, the configuration is dependent on the chosen connection mechanism.
-
Configure the
Create or update the
Kafka
resource.oc apply -f <kafka_configuration_file>
The Kafka cluster is configured with a Kafka broker listener using mTLS authentication.
A service is created for each Kafka broker pod.
A service is created to serve as the bootstrap address for connection to the Kafka cluster.
The cluster CA certificate to verify the identity of the kafka brokers is also created in the secret
<cluster_name>-cluster-ca-cert
.
6.3.2. Securing user access to Kafka
Create or modify a KafkaUser
to represent a client that requires secure access to the Kafka cluster.
When you configure the KafkaUser
authentication and authorization mechanisms, ensure they match the equivalent Kafka
configuration:
-
KafkaUser.spec.authentication
matchesKafka.spec.kafka.listeners[*].authentication
-
KafkaUser.spec.authorization
matchesKafka.spec.kafka.authorization
This procedure shows how a user is created with mTLS authentication. You can also create a user with SCRAM-SHA authentication.
The authentication required depends on the type of authentication configured for the Kafka broker listener.
Authentication between Kafka users and Kafka brokers depends on the authentication settings for each. For example, it is not possible to authenticate a user with mTLS if it is not also enabled in the Kafka configuration.
Prerequisites
- A running Kafka cluster configured with a Kafka broker listener using mTLS authentication and TLS encryption.
- A running User Operator (typically deployed with the Entity Operator).
The authentication type in KafkaUser
should match the authentication configured in Kafka
brokers.
Procedure
Configure the
KafkaUser
resource.For example:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: 1 type: tls authorization: type: simple 2 acls: - resource: type: topic name: my-topic patternType: literal operations: - Describe - Read - resource: type: group name: my-group patternType: literal operations: - Read
Create or update the
KafkaUser
resource.oc apply -f <user_config_file>
The user is created, as well as a Secret with the same name as the
KafkaUser
resource. The Secret contains a private and public key for mTLS authentication.
For information on configuring a Kafka client with properties for secure connection to Kafka brokers, see Setting up client access to a Kafka cluster using listeners.
6.3.3. Restricting access to Kafka listeners using network policies
You can restrict access to a listener to only selected applications by using the networkPolicyPeers
property.
Prerequisites
- An OpenShift cluster with support for Ingress NetworkPolicies.
- The Cluster Operator is running.
Procedure
-
Open the
Kafka
resource. In the
networkPolicyPeers
property, define the application pods or namespaces that will be allowed to access the Kafka cluster.For example, to configure a
tls
listener to allow connections only from application pods with the labelapp
set tokafka-client
:apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: # ... listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls networkPolicyPeers: - podSelector: matchLabels: app: kafka-client # ... zookeeper: # ...
Create or update the resource.
Use
oc apply
:oc apply -f your-file
Additional resources
6.4. Using OAuth 2.0 token-based authentication
AMQ Streams supports the use of OAuth 2.0 authentication using the OAUTHBEARER and PLAIN mechanisms.
OAuth 2.0 enables standardized token-based authentication and authorization between applications, using a central authorization server to issue tokens that grant limited access to resources.
You can configure OAuth 2.0 authentication, then OAuth 2.0 authorization.
Kafka brokers and clients both need to be configured to use OAuth 2.0. OAuth 2.0 authentication can also be used in conjunction with simple
or OPA-based Kafka authorization.
Using OAuth 2.0 token-based authentication, application clients can access resources on application servers (called resource servers) without exposing account credentials.
The application client passes an access token as a means of authenticating, which application servers can also use to determine the level of access to grant. The authorization server handles the granting of access and inquiries about access.
In the context of AMQ Streams:
- Kafka brokers act as OAuth 2.0 resource servers
- Kafka clients act as OAuth 2.0 application clients
Kafka clients authenticate to Kafka brokers. The brokers and clients communicate with the OAuth 2.0 authorization server, as necessary, to obtain or validate access tokens.
For a deployment of AMQ Streams, OAuth 2.0 integration provides:
- Server-side OAuth 2.0 support for Kafka brokers
- Client-side OAuth 2.0 support for Kafka MirrorMaker, Kafka Connect and the Kafka Bridge
6.4.1. OAuth 2.0 authentication mechanisms
AMQ Streams supports the OAUTHBEARER and PLAIN mechanisms for OAuth 2.0 authentication. Both mechanisms allow Kafka clients to establish authenticated sessions with Kafka brokers. The authentication flow between clients, the authorization server, and Kafka brokers is different for each mechanism.
We recommend that you configure clients to use OAUTHBEARER whenever possible. OAUTHBEARER provides a higher level of security than PLAIN because client credentials are never shared with Kafka brokers. Consider using PLAIN only with Kafka clients that do not support OAUTHBEARER.
You configure Kafka broker listeners to use OAuth 2.0 authentication for connecting clients. If necessary, you can use the OAUTHBEARER and PLAIN mechanisms on the same oauth
listener. The properties to support each mechanism must be explicitly specified in the oauth
listener configuration.
OAUTHBEARER overview
OAUTHBEARER is automatically enabled in the oauth
listener configuration for the Kafka broker. You can set the enableOauthBearer
property to true
, though this is not required.
# ... authentication: type: oauth # ... enableOauthBearer: true
Many Kafka client tools use libraries that provide basic support for OAUTHBEARER at the protocol level. To support application development, AMQ Streams provides an OAuth callback handler for the upstream Kafka Client Java libraries (but not for other libraries). Therefore, you do not need to write your own callback handlers. An application client can use the callback handler to provide the access token. Clients written in other languages, such as Go, must use custom code to connect to the authorization server and obtain the access token.
With OAUTHBEARER, the client initiates a session with the Kafka broker for credentials exchange, where credentials take the form of a bearer token provided by the callback handler. Using the callbacks, you can configure token provision in one of three ways:
- Client ID and Secret (by using the OAuth 2.0 client credentials mechanism)
- A long-lived access token, obtained manually at configuration time
- A long-lived refresh token, obtained manually at configuration time
OAUTHBEARER authentication can only be used by Kafka clients that support the OAUTHBEARER mechanism at the protocol level.
PLAIN overview
To use PLAIN, you must enable it in the oauth
listener configuration for the Kafka broker.
In the following example, PLAIN is enabled in addition to OAUTHBEARER, which is enabled by default. If you want to use PLAIN only, you can disable OAUTHBEARER by setting enableOauthBearer
to false
.
# ...
authentication:
type: oauth
# ...
enablePlain: true
tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token
PLAIN is a simple authentication mechanism used by all Kafka client tools. To enable PLAIN to be used with OAuth 2.0 authentication, AMQ Streams provides OAuth 2.0 over PLAIN server-side callbacks.
With the AMQ Streams implementation of PLAIN, the client credentials are not stored in ZooKeeper. Instead, client credentials are handled centrally behind a compliant authorization server, similar to when OAUTHBEARER authentication is used.
When used with the OAuth 2.0 over PLAIN callbacks, Kafka clients authenticate with Kafka brokers using either of the following methods:
- Client ID and secret (by using the OAuth 2.0 client credentials mechanism)
- A long-lived access token, obtained manually at configuration time
For both methods, the client must provide the PLAIN username
and password
properties to pass credentials to the Kafka broker. The client uses these properties to pass a client ID and secret or username and access token.
Client IDs and secrets are used to obtain access tokens.
Access tokens are passed as password
property values. You pass the access token with or without an $accessToken:
prefix.
-
If you configure a token endpoint (
tokenEndpointUri
) in the listener configuration, you need the prefix. -
If you don’t configure a token endpoint (
tokenEndpointUri
) in the listener configuration, you don’t need the prefix. The Kafka broker interprets the password as a raw access token.
If the password
is set as the access token, the username
must be set to the same principal name that the Kafka broker obtains from the access token. You can specify username extraction options in your listener using the userNameClaim
, fallbackUserNameClaim
, fallbackUsernamePrefix
, and userInfoEndpointUri
properties. The username extraction process also depends on your authorization server; in particular, how it maps client IDs to account names.
OAuth over PLAIN does not support password grant
mechanism. You can only 'proxy' through SASL PLAIN mechanism the client credentials
(clientId + secret) or the access token as described above.
Additional resources
6.4.2. OAuth 2.0 Kafka broker configuration
Kafka broker configuration for OAuth 2.0 involves:
- Creating the OAuth 2.0 client in the authorization server
- Configuring OAuth 2.0 authentication in the Kafka custom resource
In relation to the authorization server, Kafka brokers and Kafka clients are both regarded as OAuth 2.0 clients.
6.4.2.1. OAuth 2.0 client configuration on an authorization server
To configure a Kafka broker to validate the token received during session initiation, the recommended approach is to create an OAuth 2.0 client definition in an authorization server, configured as confidential, with the following client credentials enabled:
-
Client ID of
kafka
(for example) - Client ID and Secret as the authentication mechanism
You only need to use a client ID and secret when using a non-public introspection endpoint of the authorization server. The credentials are not typically required when using public authorization server endpoints, as with fast local JWT token validation.
6.4.2.2. OAuth 2.0 authentication configuration in the Kafka cluster
To use OAuth 2.0 authentication in the Kafka cluster, you specify, for example, a tls
listener configuration for your Kafka cluster custom resource with the authentication method oauth
:
Assigining the authentication method type for OAuth 2.0
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
# ...
listeners:
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: oauth
#...
You can configure OAuth 2.0 authentication in your listeners. We recommend using OAuth 2.0 authentication together with TLS encryption (tls: true
). Without encryption, the connection is vulnerable to network eavesdropping and unauthorized access through token theft.
You configure an external
listener with type: oauth
for a secure transport layer to communicate with the client.
Using OAuth 2.0 with an external listener
# ...
listeners:
- name: external
port: 9094
type: loadbalancer
tls: true
authentication:
type: oauth
#...
The tls
property is false by default, so it must be enabled.
When you have defined the type of authentication as OAuth 2.0, you add configuration based on the type of validation, either as fast local JWT validation or token validation using an introspection endpoint.
The procedure to configure OAuth 2.0 for listeners, with descriptions and examples, is described in Configuring OAuth 2.0 support for Kafka brokers.
6.4.2.3. Fast local JWT token validation configuration
Fast local JWT token validation checks a JWT token signature locally.
The local check ensures that a token:
-
Conforms to type by containing a (typ) claim value of
Bearer
for an access token - Is valid (not expired)
-
Has an issuer that matches a
validIssuerURI
You specify a validIssuerURI
attribute when you configure the listener, so that any tokens not issued by the authorization server are rejected.
The authorization server does not need to be contacted during fast local JWT token validation. You activate fast local JWT token validation by specifying a jwksEndpointUri
attribute, the endpoint exposed by the OAuth 2.0 authorization server. The endpoint contains the public keys used to validate signed JWT tokens, which are sent as credentials by Kafka clients.
All communication with the authorization server should be performed using TLS encryption.
You can configure a certificate truststore as an OpenShift Secret in your AMQ Streams project namespace, and use a tlsTrustedCertificates
attribute to point to the OpenShift Secret containing the truststore file.
You might want to configure a userNameClaim
to properly extract a username from the JWT token. If you want to use Kafka ACL authorization, you need to identify the user by their username during authentication. (The sub
claim in JWT tokens is typically a unique ID, not a username.)
Example configuration for fast local JWT token validation
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: #... listeners: - name: tls port: 9093 type: internal tls: true authentication: type: oauth validIssuerUri: <https://<auth-server-address>/auth/realms/tls> jwksEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/certs> userNameClaim: preferred_username maxSecondsWithoutReauthentication: 3600 tlsTrustedCertificates: - secretName: oauth-server-cert certificate: ca.crt #...
6.4.2.4. OAuth 2.0 introspection endpoint configuration
Token validation using an OAuth 2.0 introspection endpoint treats a received access token as opaque. The Kafka broker sends an access token to the introspection endpoint, which responds with the token information necessary for validation. Importantly, it returns up-to-date information if the specific access token is valid, and also information about when the token expires.
To configure OAuth 2.0 introspection-based validation, you specify an introspectionEndpointUri
attribute rather than the jwksEndpointUri
attribute specified for fast local JWT token validation. Depending on the authorization server, you typically have to specify a clientId
and clientSecret
, because the introspection endpoint is usually protected.
Example configuration for an introspection endpoint
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: listeners: - name: tls port: 9093 type: internal tls: true authentication: type: oauth clientId: kafka-broker clientSecret: secretName: my-cluster-oauth key: clientSecret validIssuerUri: <https://<auth-server-address>/auth/realms/tls> introspectionEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/token/introspect> userNameClaim: preferred_username maxSecondsWithoutReauthentication: 3600 tlsTrustedCertificates: - secretName: oauth-server-cert certificate: ca.crt
6.4.3. Session re-authentication for Kafka brokers
You can configure oauth
listeners to use Kafka session re-authentication for OAuth 2.0 sessions between Kafka clients and Kafka brokers. This mechanism enforces the expiry of an authenticated session between the client and the broker after a defined period of time. When a session expires, the client immediately starts a new session by reusing the existing connection rather than dropping it.
Session re-authentication is disabled by default. To enable it, you set a time value for maxSecondsWithoutReauthentication
in the oauth
listener configuration. The same property is used to configure session re-authentication for OAUTHBEARER and PLAIN authentication. For an example configuration, see Section 6.4.6.2, “Configuring OAuth 2.0 support for Kafka brokers”.
Session re-authentication must be supported by the Kafka client libraries used by the client.
Session re-authentication can be used with fast local JWT or introspection endpoint token validation.
Client re-authentication
When the broker’s authenticated session expires, the client must re-authenticate to the existing session by sending a new, valid access token to the broker, without dropping the connection.
If token validation is successful, a new client session is started using the existing connection. If the client fails to re-authenticate, the broker will close the connection if further attempts are made to send or receive messages. Java clients that use Kafka client library 2.2 or later automatically re-authenticate if the re-authentication mechanism is enabled on the broker.
Session re-authentication also applies to refresh tokens, if used. When the session expires, the client refreshes the access token by using its refresh token. The client then uses the new access token to re-authenticate to the existing session.
Session expiry for OAUTHBEARER and PLAIN
When session re-authentication is configured, session expiry works differently for OAUTHBEARER and PLAIN authentication.
For OAUTHBEARER and PLAIN, using the client ID and secret method:
-
The broker’s authenticated session will expire at the configured
maxSecondsWithoutReauthentication
. - The session will expire earlier if the access token expires before the configured time.
For PLAIN using the long-lived access token method:
-
The broker’s authenticated session will expire at the configured
maxSecondsWithoutReauthentication
. - Re-authentication will fail if the access token expires before the configured time. Although session re-authentication is attempted, PLAIN has no mechanism for refreshing tokens.
If maxSecondsWithoutReauthentication
is not configured, OAUTHBEARER and PLAIN clients can remain connected to brokers indefinitely, without needing to re-authenticate. Authenticated sessions do not end with access token expiry. However, this can be considered when configuring authorization, for example, by using keycloak
authorization or installing a custom authorizer.
6.4.4. OAuth 2.0 Kafka client configuration
A Kafka client is configured with either:
- The credentials required to obtain a valid access token from an authorization server (client ID and Secret)
- A valid long-lived access token or refresh token, obtained using tools provided by an authorization server
The only information ever sent to the Kafka broker is an access token. The credentials used to authenticate with the authorization server to obtain the access token are never sent to the broker.
When a client obtains an access token, no further communication with the authorization server is needed.
The simplest mechanism is authentication with a client ID and Secret. Using a long-lived access token, or a long-lived refresh token, adds more complexity because there is an additional dependency on authorization server tools.
If you are using long-lived access tokens, you may need to configure the client in the authorization server to increase the maximum lifetime of the token.
If the Kafka client is not configured with an access token directly, the client exchanges credentials for an access token during Kafka session initiation by contacting the authorization server. The Kafka client exchanges either:
- Client ID and Secret
- Client ID, refresh token, and (optionally) a secret
- Username and password, with client ID and (optionally) a secret
6.4.5. OAuth 2.0 client authentication flows
OAuth 2.0 authentication flows depend on the underlying Kafka client and Kafka broker configuration. The flows must also be supported by the authorization server used.
The Kafka broker listener configuration determines how clients authenticate using an access token. The client can pass a client ID and secret to request an access token.
If a listener is configured to use PLAIN authentication, the client can authenticate with a client ID and secret or username and access token. These values are passed as the username
and password
properties of the PLAIN mechanism.
Listener configuration supports the following token validation options:
- You can use fast local token validation based on JWT signature checking and local token introspection, without contacting an authorization server. The authorization server provides a JWKS endpoint with public certificates that are used to validate signatures on the tokens.
- You can use a call to a token introspection endpoint provided by an authorization server. Each time a new Kafka broker connection is established, the broker passes the access token received from the client to the authorization server. The Kafka broker checks the response to confirm whether or not the token is valid.
An authorization server might only allow the use of opaque access tokens, which means that local token validation is not possible.
Kafka client credentials can also be configured for the following types of authentication:
- Direct local access using a previously generated long-lived access token
- Contact with the authorization server for a new access token to be issued (using a client ID and a secret, or a refresh token, or a username and a password)
6.4.5.1. Example client authentication flows using the SASL OAUTHBEARER mechanism
You can use the following communication flows for Kafka authentication using the SASL OAUTHBEARER mechanism.
- Client using client ID and secret, with broker delegating validation to authorization server
- Client using client ID and secret, with broker performing fast local token validation
- Client using long-lived access token, with broker delegating validation to authorization server
- Client using long-lived access token, with broker performing fast local validation
Client using client ID and secret, with broker delegating validation to authorization server
- The Kafka client requests an access token from the authorization server using a client ID and secret, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
- The authorization server generates a new access token.
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
- The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server using its own client ID and secret.
- A Kafka client session is established if the token is valid.
Client using client ID and secret, with broker performing fast local token validation
- The Kafka client authenticates with the authorization server from the token endpoint, using a client ID and secret, and optionally a refresh token. Alternatively, the client may authenticate using a username and a password.
- The authorization server generates a new access token.
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the access token.
- The Kafka broker validates the access token locally using a JWT token signature check, and local token introspection.
Client using long-lived access token, with broker delegating validation to authorization server
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
- The Kafka broker validates the access token by calling a token introspection endpoint on the authorization server, using its own client ID and secret.
- A Kafka client session is established if the token is valid.
Client using long-lived access token, with broker performing fast local validation
- The Kafka client authenticates with the Kafka broker using the SASL OAUTHBEARER mechanism to pass the long-lived access token.
- The Kafka broker validates the access token locally using a JWT token signature check and local token introspection.
Fast local JWT token signature validation is suitable only for short-lived tokens as there is no check with the authorization server if a token has been revoked. Token expiration is written into the token, but revocation can happen at any time, so cannot be accounted for without contacting the authorization server. Any issued token would be considered valid until it expires.
6.4.5.2. Example client authentication flows using the SASL PLAIN mechanism
You can use the following communication flows for Kafka authentication using the OAuth PLAIN mechanism.
Client using a client ID and secret, with the broker obtaining the access token for the client
-
The Kafka client passes a
clientId
as a username and asecret
as a password. -
The Kafka broker uses a token endpoint to pass the
clientId
andsecret
to the authorization server. - The authorization server returns a fresh access token or an error if the client credentials are not valid.
The Kafka broker validates the token in one of the following ways:
- If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if the token validation is successful.
- If local token introspection is used, a request is not made to the authorization server. The Kafka broker validates the access token locally using a JWT token signature check.
Client using a long-lived access token without a client ID and secret
- The Kafka client passes a username and password. The password provides the value of an access token that was obtained manually and configured before running the client.
The password is passed with or without an
$accessToken:
string prefix depending on whether or not the Kafka broker listener is configured with a token endpoint for authentication.-
If the token endpoint is configured, the password should be prefixed by
$accessToken:
to let the broker know that the password parameter contains an access token rather than a client secret. The Kafka broker interprets the username as the account username. -
If the token endpoint is not configured on the Kafka broker listener (enforcing a
no-client-credentials mode
), the password should provide the access token without the prefix. The Kafka broker interprets the username as the account username. In this mode, the client doesn’t use a client ID and secret, and thepassword
parameter is always interpreted as a raw access token.
-
If the token endpoint is configured, the password should be prefixed by
The Kafka broker validates the token in one of the following ways:
- If a token introspection endpoint is specified, the Kafka broker validates the access token by calling the endpoint on the authorization server. A session is established if token validation is successful.
- If local token introspection is used, there is no request made to the authorization server. Kafka broker validates the access token locally using a JWT token signature check.
6.4.6. Configuring OAuth 2.0 authentication
OAuth 2.0 is used for interaction between Kafka clients and AMQ Streams components.
In order to use OAuth 2.0 for AMQ Streams, you must:
6.4.6.1. Configuring an OAuth 2.0 authorization server
This procedure describes in general what you need to do to configure an authorization server for integration with AMQ Streams.
These instructions are not product specific.
The steps are dependent on the chosen authorization server. Consult the product documentation for the authorization server for information on how to set up OAuth 2.0 access.
If you already have an authorization server deployed, you can skip the deployment step and use your current deployment.
Procedure
- Deploy the authorization server to your cluster.
Access the CLI or admin console for the authorization server to configure OAuth 2.0 for AMQ Streams.
Now prepare the authorization server to work with AMQ Streams.
-
Configure a
kafka-broker
client. - Configure clients for each Kafka client component of your application.
What to do next
After deploying and configuring the authorization server, configure the Kafka brokers to use OAuth 2.0.
6.4.6.2. Configuring OAuth 2.0 support for Kafka brokers
This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication using an authorization server.
We advise use of OAuth 2.0 over an encrypted interface through through a listener with tls: true
. Plain listeners are not recommended.
If the authorization server is using certificates signed by the trusted CA and matching the OAuth 2.0 server hostname, TLS connection works using the default settings. Otherwise, you may need to configure the truststore with proper certificates or disable the certificate hostname validation.
When configuring the Kafka broker you have two options for the mechanism used to validate the access token during OAuth 2.0 authentication of the newly connected Kafka client:
Before you start
For more information on the configuration of OAuth 2.0 authentication for Kafka broker listeners, see:
Prerequisites
- AMQ Streams and Kafka are running
- An OAuth 2.0 authorization server is deployed
Procedure
Update the Kafka broker configuration (
Kafka.spec.kafka
) of yourKafka
resource in an editor.oc edit kafka my-cluster
Configure the Kafka broker
listeners
configuration.The configuration for each type of listener does not have to be the same, as they are independent.
The examples here show the configuration options as configured for external listeners.
Example 1: Configuring fast local JWT token validation
#... - name: external port: 9094 type: loadbalancer tls: true authentication: type: oauth 1 validIssuerUri: <https://<auth-server-address>/auth/realms/external> 2 jwksEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/certs> 3 userNameClaim: preferred_username 4 maxSecondsWithoutReauthentication: 3600 5 tlsTrustedCertificates: 6 - secretName: oauth-server-cert certificate: ca.crt disableTlsHostnameVerification: true 7 jwksExpirySeconds: 360 8 jwksRefreshSeconds: 300 9 jwksMinRefreshPauseSeconds: 1 10
- 1
- Listener type set to
oauth
. - 2
- URI of the token issuer used for authentication.
- 3
- URI of the JWKS certificate endpoint used for local JWT validation.
- 4
- The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The
userNameClaim
value will depend on the authentication flow and the authorization server used. - 5
- (Optional) Activates the Kafka re-authentication mechanism that enforces session expiry to the same length of time as the access token. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.
- 6
- (Optional) Trusted certificates for TLS connection to the authorization server.
- 7
- (Optional) Disable TLS hostname verification. Default is
false
. - 8
- The duration the JWKS certificates are considered valid before they expire. Default is
360
seconds. If you specify a longer time, consider the risk of allowing access to revoked certificates. - 9
- The period between refreshes of JWKS certificates. The interval must be at least 60 seconds shorter than the expiry interval. Default is
300
seconds. - 10
- The minimum pause in seconds between consecutive attempts to refresh JWKS public keys. When an unknown signing key is encountered, the JWKS keys refresh is scheduled outside the regular periodic schedule with at least the specified pause since the last refresh attempt. The refreshing of keys follows the rule of exponential backoff, retrying on unsuccessful refreshes with ever increasing pause, until it reaches
jwksRefreshSeconds
. The default value is 1.
Example 2: Configuring token validation using an introspection endpoint
- name: external port: 9094 type: loadbalancer tls: true authentication: type: oauth validIssuerUri: <https://<auth-server-address>/auth/realms/external> introspectionEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token/introspect> 1 clientId: kafka-broker 2 clientSecret: 3 secretName: my-cluster-oauth key: clientSecret userNameClaim: preferred_username 4 maxSecondsWithoutReauthentication: 3600 5
- 1
- URI of the token introspection endpoint.
- 2
- Client ID to identify the client.
- 3
- Client Secret and client ID is used for authentication.
- 4
- The token claim (or key) that contains the actual user name in the token. The user name is the principal used to identify the user. The
userNameClaim
value will depend on the authorization server used. - 5
- (Optional) Activates the Kafka re-authentication mechanism that enforces session expiry to the same length of time as the access token. If the specified value is less than the time left for the access token to expire, then the client will have to re-authenticate before the actual token expiry. By default, the session does not expire when the access token expires, and the client does not attempt re-authentication.
Depending on how you apply OAuth 2.0 authentication, and the type of authorization server, there are additional (optional) configuration settings you can use:
# ... authentication: type: oauth # ... checkIssuer: false 1 checkAudience: true 2 fallbackUserNameClaim: client_id 3 fallbackUserNamePrefix: client-account- 4 validTokenType: bearer 5 userInfoEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/userinfo 6 enableOauthBearer: false 7 enablePlain: true 8 tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token 9 customClaimCheck: "@.custom == 'custom-value'" 10 clientAudience: AUDIENCE 11 clientScope: SCOPE 12 connectTimeoutSeconds: 60 13 readTimeoutSeconds: 60 14 groupsClaim: "$.groups" 15 groupsClaimDelimiter: "," 16
- 1
- If your authorization server does not provide an
iss
claim, it is not possible to perform an issuer check. In this situation, setcheckIssuer
tofalse
and do not specify avalidIssuerUri
. Default istrue
. - 2
- If your authorization server provides an
aud
(audience) claim, and you want to enforce an audience check, setcheckAudience
totrue
. Audience checks identify the intended recipients of tokens. As a result, the Kafka broker will reject tokens that do not have itsclientId
in theiraud
claim. Default isfalse
. - 3
- An authorization server may not provide a single attribute to identify both regular users and clients. When a client authenticates in its own name, the server might provide a client ID. When a user authenticates using a username and password, to obtain a refresh token or an access token, the server might provide a username attribute in addition to a client ID. Use this fallback option to specify the username claim (attribute) to use if a primary user ID attribute is not available.
- 4
- In situations where
fallbackUserNameClaim
is applicable, it may also be necessary to prevent name collisions between the values of the username claim, and those of the fallback username claim. Consider a situation where a client calledproducer
exists, but also a regular user calledproducer
exists. In order to differentiate between the two, you can use this property to add a prefix to the user ID of the client. - 5
- (Only applicable when using
introspectionEndpointUri
) Depending on the authorization server you are using, the introspection endpoint may or may not return the token type attribute, or it may contain different values. You can specify a valid token type value that the response from the introspection endpoint has to contain. - 6
- (Only applicable when using
introspectionEndpointUri
) The authorization server may be configured or implemented in such a way to not provide any identifiable information in an Introspection Endpoint response. In order to obtain the user ID, you can configure the URI of theuserinfo
endpoint as a fallback. TheuserNameClaim
,fallbackUserNameClaim
, andfallbackUserNamePrefix
settings are applied to the response ofuserinfo
endpoint. - 7
- Set this to
false
to disable the OAUTHBEARER mechanism on the listener. At least one of PLAIN or OAUTHBEARER has to be enabled. Default istrue
. - 8
- Set to
true
to enable PLAIN authentication on the listener, which is supported for clients on all platforms. - 9
- Additional configuration for the PLAIN mechanism. If specified, clients can authenticate over PLAIN by passing an access token as the
password
using an$accessToken:
prefix. For production, always usehttps://
urls. - 10
- Additional custom rules can be imposed on the JWT access token during validation by setting this to a JsonPath filter query. If the access token does not contain the necessary data, it is rejected. When using the
introspectionEndpointUri
, the custom check is applied to the introspection endpoint response JSON. - 11
- An
audience
parameter passed to the token endpoint. An audience is used when obtaining an access token for inter-broker authentication. It is also used in the name of a client for OAuth 2.0 over PLAIN client authentication using aclientId
andsecret
. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener. - 12
- A
scope
parameter passed to the token endpoint. A scope is used when obtaining an access token for inter-broker authentication. It is also used in the name of a client for OAuth 2.0 over PLAIN client authentication using aclientId
andsecret
. This only affects the ability to obtain the token, and the content of the token, depending on the authorization server. It does not affect token validation rules by the listener. - 13
- The connect timeout in seconds when connecting to the authorization server. The default value is 60.
- 14
- The read timeout in seconds when connecting to the authorization server. The default value is 60.
- 15
- A JsonPath query used to extract groups information from JWT token or introspection endpoint response. Not set by default. This can be used by a custom authorizer to make authorization decisions based on user groups.
- 16
- A delimiter used to parse groups information when returned as a single delimited string. The default value is ',' (comma).
- Save and exit the editor, then wait for rolling updates to complete.
Check the update in the logs or by watching the pod state transitions:
oc logs -f ${POD_NAME} -c ${CONTAINER_NAME} oc get pod -w
The rolling update configures the brokers to use OAuth 2.0 authentication.
What to do next
6.4.6.3. Configuring Kafka Java clients to use OAuth 2.0
Configure Kafka producer and consumer APIs to use OAuth 2.0 for interaction with Kafka brokers. Add a callback plugin to your client pom.xml
file, then configure your client for OAuth 2.0.
Specify the following in your client configuration:
A SASL (Simple Authentication and Security Layer) security protocol:
-
SASL_SSL
for authentication over TLS encrypted connections SASL_PLAINTEXT
for authentication over unencrypted connectionsUse
SASL_SSL
for production andSASL_PLAINTEXT
for local development only. When usingSASL_SSL
, additionalssl.truststore
configuration is needed. The truststore configuration is required for secure connection (https://
) to the OAuth 2.0 authorization server. To verify the OAuth 2.0 authorization server, add the CA certificate for the authorization server to the truststore in your client configuration. You can configure a truststore in PEM or PKCS #12 format.
-
A Kafka SASL mechanism:
-
OAUTHBEARER
for credentials exchange using a bearer token -
PLAIN
to pass client credentials (clientId + secret) or an access token
-
A JAAS (Java Authentication and Authorization Service) module that implements the SASL mechanism:
-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
implements the OAUTHBEARER mechanism -
org.apache.kafka.common.security.plain.PlainLoginModule
implements the PLAIN mechanism
-
SASL authentication properties, which support the following authentication methods:
- OAuth 2.0 client credentials
- OAuth 2.0 password grant (deprecated)
- Access token
- Refresh token
Add the SASL authentication properties as JAAS configuration (sasl.jaas.config
). How you configure the authentication properties depends on the authentication method you are using to access the OAuth 2.0 authorization server. In this procedure, the properties are specified in a properties file, then loaded into the client configuration.
You can also specify authentication properties as environment variables, or as Java system properties. For Java system properties, you can set them using setProperty
and pass them on the command line using the -D
option.
Prerequisites
- AMQ Streams and Kafka are running
- An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
- Kafka brokers are configured for OAuth 2.0
Procedure
Add the client library with OAuth 2.0 support to the
pom.xml
file for the Kafka client:<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.11.0.redhat-00003</version> </dependency>
Configure the client properties by specifying the following configuration in a properties file:
- The security protocol
- The SASL mechanism
The JAAS module and authentication properties according to the method being used
For example, we can add the following to a
client.properties
file:Client credentials mechanism properties
security.protocol=SASL_SSL 1 sasl.mechanism=OAUTHBEARER 2 ssl.truststore.location=/tmp/truststore.p12 3 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ 4 oauth.client.id="<client_id>" \ 5 oauth.client.secret="<client_secret>" \ 6 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7 oauth.ssl.truststore.password="$STOREPASS" \ 8 oauth.ssl.truststore.type="PKCS12" \ 9 oauth.scope="<scope>" \ 10 oauth.audience="<audience>" ; 11
- 1
SASL_SSL
security protocol for TLS-encrypted connections. UseSASL_PLAINTEXT
over unencrypted connections for local development only.- 2
- The SASL mechanism specified as
OAUTHBEARER
orPLAIN
. - 3
- The truststore configuration for secure access to the Kafka cluster.
- 4
- URI of the authorization server token endpoint.
- 5
- Client ID, which is the name used when creating the client in the authorization server.
- 6
- Client secret created when creating the client in the authorization server.
- 7
- The location contains the public key certificate (
truststore.p12
) for the authorization server. - 8
- The password for accessing the truststore.
- 9
- The truststore type.
- 10
- (Optional) The
scope
for requesting the token from the token endpoint. An authorization server may require a client to specify the scope. - 11
- (Optional) The
audience
for requesting the token from the token endpoint. An authorization server may require a client to specify the audience.
Password grants mechanism properties
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ 1 oauth.client.secret="<client_secret>" \ 2 oauth.password.grant.username="<username>" \ 3 oauth.password.grant.password="<password>" \ 4 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.scope="<scope>" \ oauth.audience="<audience>" ;
- 1
- Client ID, which is the name used when creating the client in the authorization server.
- 2
- (Optional) Client secret created when creating the client in the authorization server.
- 3
- Username for password grant authentication. OAuth password grant configuration (username and password) uses the OAuth 2.0 password grant method. To use password grants, create a user account for a client on your authorization server with limited permissions. The account should act like a service account. Use in environments where user accounts are required for authentication, but consider using a refresh token first.
- 4
- Password for password grant authentication.Note
SASL PLAIN does not support passing a username and password (password grants) using the OAuth 2.0 password grant method.
Access token properties
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.access.token="<access_token>" ; 1 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \
- 1
- Long-lived access token for Kafka clients.
Refresh token properties
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ 1 oauth.client.secret="<client_secret>" \ 2 oauth.refresh.token="<refresh_token>" ; 3 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \
Input the client properties for OAUTH 2.0 authentication into the Java client code.
Example showing input of client properties
Properties props = new Properties(); try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) { props.load(reader); }
- Verify that the Kafka client can access the Kafka brokers.
6.4.6.4. Configuring OAuth 2.0 for Kafka components
This procedure describes how to configure Kafka components to use OAuth 2.0 authentication using an authorization server.
You can configure authentication for:
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
In this scenario, the Kafka component and the authorization server are running in the same cluster.
Before you start
For more information on the configuration of OAuth 2.0 authentication for Kafka components, see:
Prerequisites
- AMQ Streams and Kafka are running
- An OAuth 2.0 authorization server is deployed and configured for OAuth access to Kafka brokers
- Kafka brokers are configured for OAuth 2.0
Procedure
Create a client secret and mount it to the component as an environment variable.
For example, here we are creating a client
Secret
for the Kafka Bridge:apiVersion: kafka.strimzi.io/v1beta2 kind: Secret metadata: name: my-bridge-oauth type: Opaque data: clientSecret: MGQ1OTRmMzYtZTllZS00MDY2LWI5OGEtMTM5MzM2NjdlZjQw 1
- 1
- The
clientSecret
key must be in base64 format.
Create or edit the resource for the Kafka component so that OAuth 2.0 authentication is configured for the authentication property.
For OAuth 2.0 authentication, you can use:
- Client ID and secret
- Client ID and refresh token
- Access token
- Username and password
- TLS
KafkaClientAuthenticationOAuth schema reference provides examples of each.
For example, here OAuth 2.0 is assigned to the Kafka Bridge client using a client ID and secret, and TLS:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: # ... authentication: type: oauth 1 tokenEndpointUri: https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token 2 clientId: kafka-bridge clientSecret: secretName: my-bridge-oauth key: clientSecret tlsTrustedCertificates: 3 - secretName: oauth-server-cert certificate: tls.crt
Depending on how you apply OAuth 2.0 authentication, and the type of authorization server, there are additional configuration options you can use:
# ... spec: # ... authentication: # ... disableTlsHostnameVerification: true 1 checkAccessTokenType: false 2 accessTokenIsJwt: false 3 scope: any 4 audience: kafka 5 connectTimeoutSeconds: 60 6 readTimeoutSeconds: 60 7
- 1
- (Optional) Disable TLS hostname verification. Default is
false
. - 2
- If the authorization server does not return a
typ
(type) claim inside the JWT token, you can applycheckAccessTokenType: false
to skip the token type check. Default istrue
. - 3
- If you are using opaque tokens, you can apply
accessTokenIsJwt: false
so that access tokens are not treated as JWT tokens. - 4
- (Optional) The
scope
for requesting the token from the token endpoint. An authorization server may require a client to specify the scope. In this case it isany
. - 5
- (Optional) The
audience
for requesting the token from the token endpoint. An authorization server may require a client to specify the audience. In this case it iskafka
. - 6
- (Optional) The connect timeout in seconds when connecting to the authorization server. The default value is 60.
- 7
- (Optional) The read timeout in seconds when connecting to the authorization server. The default value is 60.
Apply the changes to the deployment of your Kafka resource.
oc apply -f your-file
Check the update in the logs or by watching the pod state transitions:
oc logs -f ${POD_NAME} -c ${CONTAINER_NAME} oc get pod -w
The rolling updates configure the component for interaction with Kafka brokers using OAuth 2.0 authentication.
6.5. Using OAuth 2.0 token-based authorization
AMQ Streams supports the use of OAuth 2.0 token-based authorization through Red Hat Single Sign-On Authorization Services, which allows you to manage security policies and permissions centrally.
Security policies and permissions defined in Red Hat Single Sign-On are used to grant access to resources on Kafka brokers. Users and clients are matched against policies that permit access to perform specific actions on Kafka brokers.
Kafka allows all users full access to brokers by default, and also provides the AclAuthorizer
plugin to configure authorization based on Access Control Lists (ACLs).
ZooKeeper stores ACL rules that grant or deny access to resources based on username. However, OAuth 2.0 token-based authorization with Red Hat Single Sign-On offers far greater flexibility on how you wish to implement access control to Kafka brokers. In addition, you can configure your Kafka brokers to use OAuth 2.0 authorization and ACLs.
Additional resources
6.5.1. OAuth 2.0 authorization mechanism
OAuth 2.0 authorization in AMQ Streams uses Red Hat Single Sign-On server Authorization Services REST endpoints to extend token-based authentication with Red Hat Single Sign-On by applying defined security policies on a particular user, and providing a list of permissions granted on different resources for that user. Policies use roles and groups to match permissions to users. OAuth 2.0 authorization enforces permissions locally based on the received list of grants for the user from Red Hat Single Sign-On Authorization Services.
6.5.1.1. Kafka broker custom authorizer
A Red Hat Single Sign-On authorizer (KeycloakRBACAuthorizer
) is provided with AMQ Streams. To be able to use the Red Hat Single Sign-On REST endpoints for Authorization Services provided by Red Hat Single Sign-On, you configure a custom authorizer on the Kafka broker.
The authorizer fetches a list of granted permissions from the authorization server as needed, and enforces authorization locally on the Kafka Broker, making rapid authorization decisions for each client request.
6.5.2. Configuring OAuth 2.0 authorization support
This procedure describes how to configure Kafka brokers to use OAuth 2.0 authorization using Red Hat Single Sign-On Authorization Services.
Before you begin
Consider the access you require or want to limit for certain users. You can use a combination of Red Hat Single Sign-On groups, roles, clients, and users to configure access in Red Hat Single Sign-On.
Typically, groups are used to match users based on organizational departments or geographical locations. And roles are used to match users based on their function.
With Red Hat Single Sign-On, you can store users and groups in LDAP, whereas clients and roles cannot be stored this way. Storage and access to user data may be a factor in how you choose to configure authorization policies.
Super users always have unconstrained access to a Kafka broker regardless of the authorization implemented on the Kafka broker.
Prerequisites
- AMQ Streams must be configured to use OAuth 2.0 with Red Hat Single Sign-On for token-based authentication. You use the same Red Hat Single Sign-On server endpoint when you set up authorization.
-
OAuth 2.0 authentication must be configured with the
maxSecondsWithoutReauthentication
option to enable re-authentication.
Procedure
- Access the Red Hat Single Sign-On Admin Console or use the Red Hat Single Sign-On Admin CLI to enable Authorization Services for the Kafka broker client you created when setting up OAuth 2.0 authentication.
- Use Authorization Services to define resources, authorization scopes, policies, and permissions for the client.
- Bind the permissions to users and clients by assigning them roles and groups.
Configure the Kafka brokers to use Red Hat Single Sign-On authorization by updating the Kafka broker configuration (
Kafka.spec.kafka
) of yourKafka
resource in an editor.oc edit kafka my-cluster
Configure the Kafka broker
kafka
configuration to usekeycloak
authorization, and to be able to access the authorization server and Authorization Services.For example:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... authorization: type: keycloak 1 tokenEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token> 2 clientId: kafka 3 delegateToKafkaAcls: false 4 disableTlsHostnameVerification: false 5 superUsers: 6 - CN=fred - sam - CN=edward tlsTrustedCertificates: 7 - secretName: oauth-server-cert certificate: ca.crt grantsRefreshPeriodSeconds: 60 8 grantsRefreshPoolSize: 5 9 connectTimeoutSeconds: 60 10 readTimeoutSeconds: 60 11 #...
- 1
- Type
keycloak
enables Red Hat Single Sign-On authorization. - 2
- URI of the Red Hat Single Sign-On token endpoint. For production, always use
https://
urls. When you configure token-basedoauth
authentication, you specify ajwksEndpointUri
as the URI for local JWT validation. The hostname for thetokenEndpointUri
URI must be the same. - 3
- The client ID of the OAuth 2.0 client definition in Red Hat Single Sign-On that has Authorization Services enabled. Typically,
kafka
is used as the ID. - 4
- (Optional) Delegate authorization to Kafka
AclAuthorizer
if access is denied by Red Hat Single Sign-On Authorization Services policies. Default isfalse
. - 5
- (Optional) Disable TLS hostname verification. Default is
false
. - 6
- (Optional) Designated super users.
- 7
- (Optional) Trusted certificates for TLS connection to the authorization server.
- 8
- (Optional) The time between two consecutive grants refresh runs. That is the maximum time for active sessions to detect any permissions changes for the user on Red Hat Single Sign-On. The default value is 60.
- 9
- (Optional) The number of threads to use to refresh (in parallel) the grants for the active sessions. The default value is 5.
- 10
- (Optional) The connect timeout in seconds when connecting to the Red Hat Single Sign-On token endpoint. The default value is 60.
- 11
- (Optional) The read timeout in seconds when connecting to the Red Hat Single Sign-On token endpoint. The default value is 60.
- Save and exit the editor, then wait for rolling updates to complete.
Check the update in the logs or by watching the pod state transitions:
oc logs -f ${POD_NAME} -c kafka oc get pod -w
The rolling update configures the brokers to use OAuth 2.0 authorization.
- Verify the configured permissions by accessing Kafka brokers as clients or users with specific roles, making sure they have the necessary access, or do not have the access they are not supposed to have.
6.5.3. Managing policies and permissions in Red Hat Single Sign-On Authorization Services
This section describes the authorization models used by Red Hat Single Sign-On Authorization Services and Kafka, and defines the important concepts in each model.
To grant permissions to access Kafka, you can map Red Hat Single Sign-On Authorization Services objects to Kafka resources by creating an OAuth client specification in Red Hat Single Sign-On. Kafka permissions are granted to user accounts or service accounts using Red Hat Single Sign-On Authorization Services rules.
Examples are shown of the different user permissions required for common Kafka operations, such as creating and listing topics.
6.5.3.1. Kafka and Red Hat Single Sign-On authorization models overview
Kafka and Red Hat Single Sign-On Authorization Services use different authorization models.
Kafka authorization model
Kafka’s authorization model uses resource types. When a Kafka client performs an action on a broker, the broker uses the configured KeycloakRBACAuthorizer
to check the client’s permissions, based on the action and resource type.
Kafka uses five resource types to control access: Topic
, Group
, Cluster
, TransactionalId
, and DelegationToken
. Each resource type has a set of available permissions.
Topic
-
Create
-
Write
-
Read
-
Delete
-
Describe
-
DescribeConfigs
-
Alter
-
AlterConfigs
Group
-
Read
-
Describe
-
Delete
Cluster
-
Create
-
Describe
-
Alter
-
DescribeConfigs
-
AlterConfigs
-
IdempotentWrite
-
ClusterAction
TransactionalId
-
Describe
-
Write
DelegationToken
-
Describe
Red Hat Single Sign-On Authorization Services model
The Red Hat Single Sign-On Authorization Services model has four concepts for defining and granting permissions: resources, authorization scopes, policies, and permissions.
- Resources
- A resource is a set of resource definitions that are used to match resources with permitted actions. A resource might be an individual topic, for example, or all topics with names starting with the same prefix. A resource definition is associated with a set of available authorization scopes, which represent a set of all actions available on the resource. Often, only a subset of these actions is actually permitted.
- Authorization scopes
- An authorization scope is a set of all the available actions on a specific resource definition. When you define a new resource, you add scopes from the set of all scopes.
- Policies
A policy is an authorization rule that uses criteria to match against a list of accounts. Policies can match:
- Service accounts based on client ID or roles
- User accounts based on username, groups, or roles.
- Permissions
- A permission grants a subset of authorization scopes on a specific resource definition to a set of users.
Additional resources
6.5.3.2. Map Red Hat Single Sign-On Authorization Services to the Kafka authorization model
The Kafka authorization model is used as a basis for defining the Red Hat Single Sign-On roles and resources that will control access to Kafka.
To grant Kafka permissions to user accounts or service accounts, you first create an OAuth client specification in Red Hat Single Sign-On for the Kafka broker. You then specify Red Hat Single Sign-On Authorization Services rules on the client. Typically, the client id of the OAuth client that represents the broker is kafka
. The example configuration files provided with AMQ Streams use kafka
as the OAuth client id.
If you have multiple Kafka clusters, you can use a single OAuth client (kafka
) for all of them. This gives you a single, unified space in which to define and manage authorization rules. However, you can also use different OAuth client ids (for example, my-cluster-kafka
or cluster-dev-kafka
) and define authorization rules for each cluster within each client configuration.
The kafka
client definition must have the Authorization Enabled option enabled in the Red Hat Single Sign-On Admin Console.
All permissions exist within the scope of the kafka
client. If you have different Kafka clusters configured with different OAuth client IDs, they each need a separate set of permissions even though they’re part of the same Red Hat Single Sign-On realm.
When the Kafka client uses OAUTHBEARER authentication, the Red Hat Single Sign-On authorizer (KeycloakRBACAuthorizer
) uses the access token of the current session to retrieve a list of grants from the Red Hat Single Sign-On server. To retrieve the grants, the authorizer evaluates the Red Hat Single Sign-On Authorization Services policies and permissions.
Authorization scopes for Kafka permissions
An initial Red Hat Single Sign-On configuration usually involves uploading authorization scopes to create a list of all possible actions that can be performed on each Kafka resource type. This step is performed once only, before defining any permissions. You can add authorization scopes manually instead of uploading them.
Authorization scopes must contain all the possible Kafka permissions regardless of the resource type:
-
Create
-
Write
-
Read
-
Delete
-
Describe
-
Alter
-
DescribeConfig
-
AlterConfig
-
ClusterAction
-
IdempotentWrite
If you’re certain you won’t need a permission (for example, IdempotentWrite
), you can omit it from the list of authorization scopes. However, that permission won’t be available to target on Kafka resources.
Resource patterns for permissions checks
Resource patterns are used for pattern matching against the targeted resources when performing permission checks. The general pattern format is RESOURCE-TYPE:PATTERN-NAME
.
The resource types mirror the Kafka authorization model. The pattern allows for two matching options:
-
Exact matching (when the pattern does not end with
*
) -
Prefix matching (when the pattern ends with
*
)
Example patterns for resources
Topic:my-topic Topic:orders-* Group:orders-* Cluster:*
Additionally, the general pattern format can be prefixed by kafka-cluster:CLUSTER-NAME
followed by a comma, where CLUSTER-NAME refers to the metadata.name
in the Kafka custom resource.
Example patterns for resources with cluster prefix
kafka-cluster:my-cluster,Topic:* kafka-cluster:*,Group:b_*
When the kafka-cluster
prefix is missing, it is assumed to be kafka-cluster:*
.
When defining a resource, you can associate it with a list of possible authorization scopes which are relevant to the resource. Set whatever actions make sense for the targeted resource type.
Though you may add any authorization scope to any resource, only the scopes supported by the resource type are considered for access control.
Policies for applying access permission
Policies are used to target permissions to one or more user accounts or service accounts. Targeting can refer to:
- Specific user or service accounts
- Realm roles or client roles
- User groups
- JavaScript rules to match a client IP address
A policy is given a unique name and can be reused to target multiple permissions to multiple resources.
Permissions to grant access
Use fine-grained permissions to pull together the policies, resources, and authorization scopes that grant access to users.
The name of each permission should clearly define which permissions it grants to which users. For example, Dev Team B can read from topics starting with x
.
Additional resources
- For more information about how to configure permissions through Red Hat Single Sign-On Authorization Services, see Section 6.5.4, “Trying Red Hat Single Sign-On Authorization Services”.
6.5.3.3. Example permissions required for Kafka operations
The following examples demonstrate the user permissions required for performing common operations on Kafka.
Create a topic
To create a topic, the Create
permission is required for the specific topic, or for Cluster:kafka-cluster
.
bin/kafka-topics.sh --create --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
List topics
If a user has the Describe
permission on a specified topic, the topic is listed.
bin/kafka-topics.sh --list \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Display topic details
To display a topic’s details, Describe
and DescribeConfigs
permissions are required on the topic.
bin/kafka-topics.sh --describe --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Produce messages to a topic
To produce messages to a topic, Describe
and Write
permissions are required on the topic.
If the topic hasn’t been created yet, and topic auto-creation is enabled, the permissions to create a topic are required.
bin/kafka-console-producer.sh --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties
Consume messages from a topic
To consume messages from a topic, Describe
and Read
permissions are required on the topic. Consuming from the topic normally relies on storing the consumer offsets in a consumer group, which requires additional Describe
and Read
permissions on the consumer group.
Two resources
are needed for matching. For example:
Topic:my-topic Group:my-group-*
bin/kafka-console-consumer.sh --topic my-topic --group my-group-1 --from-beginning \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --consumer.config /tmp/config.properties
Produce messages to a topic using an idempotent producer
As well as the permissions for producing to a topic, an additional IdempotentWrite
permission is required on the Cluster:kafka-cluster
resource.
Two resources
are needed for matching. For example:
Topic:my-topic Cluster:kafka-cluster
bin/kafka-console-producer.sh --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties --producer-property enable.idempotence=true --request-required-acks -1
List consumer groups
When listing consumer groups, only the groups on which the user has the Describe
permissions are returned. Alternatively, if the user has the Describe
permission on the Cluster:kafka-cluster
, all the consumer groups are returned.
bin/kafka-consumer-groups.sh --list \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Display consumer group details
To display a consumer group’s details, the Describe
permission is required on the group and the topics associated with the group.
bin/kafka-consumer-groups.sh --describe --group my-group-1 \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Change topic configuration
To change a topic’s configuration, the Describe
and Alter
permissions are required on the topic.
bin/kafka-topics.sh --alter --topic my-topic --partitions 2 \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Display Kafka broker configuration
In order to use kafka-configs.sh
to get a broker’s configuration, the DescribeConfigs
permission is required on the Cluster:kafka-cluster
.
bin/kafka-configs.sh --entity-type brokers --entity-name 0 --describe --all \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Change Kafka broker configuration
To change a Kafka broker’s configuration, DescribeConfigs
and AlterConfigs
permissions are required on Cluster:kafka-cluster
.
bin/kafka-configs --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Delete a topic
To delete a topic, the Describe
and Delete
permissions are required on the topic.
bin/kafka-topics.sh --delete --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Select a lead partition
To run leader selection for topic partitions, the Alter
permission is required on the Cluster:kafka-cluster
.
bin/kafka-leader-election.sh --topic my-topic --partition 0 --election-type PREFERRED / --bootstrap-server my-cluster-kafka-bootstrap:9092 --admin.config /tmp/config.properties
Reassign partitions
To generate a partition reassignment file, Describe
permissions are required on the topics involved.
bin/kafka-reassign-partitions.sh --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "0,1" --generate \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties > /tmp/partition-reassignment.json
To execute the partition reassignment, Describe
and Alter
permissions are required on Cluster:kafka-cluster
. Also, Describe
permissions are required on the topics involved.
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --execute \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
To verify partition reassignment, Describe
, and AlterConfigs
permissions are required on Cluster:kafka-cluster
, and on each of the topics involved.
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --verify \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
6.5.4. Trying Red Hat Single Sign-On Authorization Services
This example explains how to use Red Hat Single Sign-On Authorization Services with keycloak
authorization. Use Red Hat Single Sign-On Authorization Services to enforce access restrictions on Kafka clients. Red Hat Single Sign-On Authorization Services use authorization scopes, policies and permissions to define and apply access control to resources.
Red Hat Single Sign-On Authorization Services REST endpoints provide a list of granted permissions on resources for authenticated users. The list of grants (permissions) is fetched from the Red Hat Single Sign-On server as the first action after an authenticated session is established by the Kafka client. The list is refreshed in the background so that changes to the grants are detected. Grants are cached and enforced locally on the Kafka broker for each user session to provide fast authorization decisions.
AMQ Streams provides example configuration files. These include the following example files for setting up Red Hat Single Sign-On:
kafka-ephemeral-oauth-single-keycloak-authz.yaml
-
An example
Kafka
custom resource configured for OAuth 2.0 token-based authorization using Red Hat Single Sign-On. You can use the custom resource to deploy a Kafka cluster that useskeycloak
authorization and token-basedoauth
authentication. kafka-authz-realm.json
- An example Red Hat Single Sign-On realm configured with sample groups, users, roles and clients. You can import the realm into a Red Hat Single Sign-On instance to set up fine-grained permissions to access Kafka.
If you want to try the example with Red Hat Single Sign-On, use these files to perform the tasks outlined in this section in the order shown.
Authentication
When you configure token-based oauth
authentication, you specify a jwksEndpointUri
as the URI for local JWT validation. When you configure keycloak
authorization, you specify a tokenEndpointUri
as the URI of the Red Hat Single Sign-On token endpoint. The hostname for both URIs must be the same.
Targeted permissions with group or role policies
In Red Hat Single Sign-On, confidential clients with service accounts enabled can authenticate to the server in their own name using a client ID and a secret. This is convenient for microservices that typically act in their own name, and not as agents of a particular user (like a web site). Service accounts can have roles assigned like regular users. They cannot, however, have groups assigned. As a consequence, if you want to target permissions to microservices using service accounts, you cannot use group policies, and should instead use role policies. Conversely, if you want to limit certain permissions only to regular user accounts where authentication with a username and password is required, you can achieve that as a side effect of using the group policies rather than the role policies. This is what is used in this example for permissions that start with ClusterManager
. Performing cluster management is usually done interactively using CLI tools. It makes sense to require the user to log in before using the resulting access token to authenticate to the Kafka broker. In this case, the access token represents the specific user, rather than the client application.
6.5.4.1. Accessing the Red Hat Single Sign-On Admin Console
Set up Red Hat Single Sign-On, then connect to its Admin Console and add the preconfigured realm. Use the example kafka-authz-realm.json
file to import the realm. You can check the authorization rules defined for the realm in the Admin Console. The rules grant access to the resources on the Kafka cluster configured to use the example Red Hat Single Sign-On realm.
Prerequisites
- A running OpenShift cluster.
-
The AMQ Streams
examples/security/keycloak-authorization/kafka-authz-realm.json
file that contains the preconfigured realm.
Procedure
- Install the Red Hat Single Sign-On server using the Red Hat Single Sign-On Operator as described in Server Installation and Configuration in the Red Hat Single Sign-On documentation.
- Wait until the Red Hat Single Sign-On instance is running.
Get the external hostname to be able to access the Admin Console.
NS=sso oc get ingress keycloak -n $NS
In this example, we assume the Red Hat Single Sign-On server is running in the
sso
namespace.Get the password for the
admin
user.oc get -n $NS pod keycloak-0 -o yaml | less
The password is stored as a secret, so get the configuration YAML file for the Red Hat Single Sign-On instance to identify the name of the secret (
secretKeyRef.name
).Use the name of the secret to obtain the clear text password.
SECRET_NAME=credential-keycloak oc get -n $NS secret $SECRET_NAME -o yaml | grep PASSWORD | awk '{print $2}' | base64 -D
In this example, we assume the name of the secret is
credential-keycloak
.Log in to the Admin Console with the username
admin
and the password you obtained.Use
https://HOSTNAME
to access the OpenShift ingress.You can now upload the example realm to Red Hat Single Sign-On using the Admin Console.
- Click Add Realm to import the example realm.
Add the
examples/security/keycloak-authorization/kafka-authz-realm.json
file, and then click Create.You now have
kafka-authz
as your current realm in the Admin Console.The default view displays the Master realm.
In the Red Hat Single Sign-On Admin Console, go to Clients > kafka > Authorization > Settings and check that Decision Strategy is set to Affirmative.
An affirmative policy means that at least one policy must be satisfied for a client to access the Kafka cluster.
In the Red Hat Single Sign-On Admin Console, go to Groups, Users, Roles and Clients to view the realm configuration.
- Groups
-
Groups
are used to create user groups and set user permissions. Groups are sets of users with a name assigned. They are used to compartmentalize users into geographical, organizational or departmental units. Groups can be linked to an LDAP identity provider. You can make a user a member of a group through a custom LDAP server admin user interface, for example, to grant permissions on Kafka resources. - Users
-
Users
are used to create users. For this example,alice
andbob
are defined.alice
is a member of theClusterManager
group andbob
is a member ofClusterManager-my-cluster
group. Users can be stored in an LDAP identity provider. - Roles
-
Roles
mark users or clients as having certain permissions. Roles are a concept analogous to groups. They are usually used to tag users with organizational roles and have the requisite permissions. Roles cannot be stored in an LDAP identity provider. If LDAP is a requirement, you can use groups instead, and add Red Hat Single Sign-On roles to the groups so that when users are assigned a group they also get a corresponding role. - Clients
Clients
can have specific configurations. For this example,kafka
,kafka-cli
,team-a-client
, andteam-b-client
clients are configured.-
The
kafka
client is used by Kafka brokers to perform the necessary OAuth 2.0 communication for access token validation. This client also contains the authorization services resource definitions, policies, and authorization scopes used to perform authorization on the Kafka brokers. The authorization configuration is defined in thekafka
client from the Authorization tab, which becomes visible when Authorization Enabled is switched on from the Settings tab. -
The
kafka-cli
client is a public client that is used by the Kafka command line tools when authenticating with username and password to obtain an access token or a refresh token. -
The
team-a-client
andteam-b-client
clients are confidential clients representing services with partial access to certain Kafka topics.
-
The
In the Red Hat Single Sign-On Admin Console, go to Authorization > Permissions to see the granted permissions that use the resources and policies defined for the realm.
For example, the
kafka
client has the following permissions:Dev Team A can write to topics that start with x_ on any cluster Dev Team B can read from topics that start with x_ on any cluster Dev Team B can update consumer group offsets that start with x_ on any cluster ClusterManager of my-cluster Group has full access to cluster config on my-cluster ClusterManager of my-cluster Group has full access to consumer groups on my-cluster ClusterManager of my-cluster Group has full access to topics on my-cluster
- Dev Team A
-
The Dev Team A realm role can write to topics that start with
x_
on any cluster. This combines a resource calledTopic:x_*
,Describe
andWrite
scopes, and theDev Team A
policy. TheDev Team A
policy matches all users that have a realm role calledDev Team A
. - Dev Team B
-
The Dev Team B realm role can read from topics that start with
x_
on any cluster. This combinesTopic:x_*
,Group:x_*
resources,Describe
andRead
scopes, and theDev Team B
policy. TheDev Team B
policy matches all users that have a realm role calledDev Team B
. Matching users and clients have the ability to read from topics, and update the consumed offsets for topics and consumer groups that have names starting withx_
.
6.5.4.2. Deploying a Kafka cluster with Red Hat Single Sign-On authorization
Deploy a Kafka cluster configured to connect to the Red Hat Single Sign-On server. Use the example kafka-ephemeral-oauth-single-keycloak-authz.yaml
file to deploy the Kafka cluster as a Kafka
custom resource. The example deploys a single-node Kafka cluster with keycloak
authorization and oauth
authentication.
Prerequisites
- The Red Hat Single Sign-On authorization server is deployed to your OpenShift cluster and loaded with the example realm.
- The Cluster Operator is deployed to your OpenShift cluster.
-
The AMQ Streams
examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml
custom resource.
Procedure
Use the hostname of the Red Hat Single Sign-On instance you deployed to prepare a truststore certificate for Kafka brokers to communicate with the Red Hat Single Sign-On server.
SSO_HOST=SSO-HOSTNAME SSO_HOST_PORT=$SSO_HOST:443 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.crt
The certificate is required as OpenShift ingress is used to make a secure (HTTPS) connection.
Deploy the certificate to OpenShift as a secret.
oc create secret generic oauth-server-cert --from-file=/tmp/sso.crt -n $NS
Set the hostname as an environment variable
SSO_HOST=SSO-HOSTNAME
Create and deploy the example Kafka cluster.
cat examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml | sed -E 's#\${SSO_HOST}'"#$SSO_HOST#" | oc create -n $NS -f -
6.5.4.3. Preparing TLS connectivity for a CLI Kafka client session
Create a new pod for an interactive CLI session. Set up a truststore with a Red Hat Single Sign-On certificate for TLS connectivity. The truststore is to connect to Red Hat Single Sign-On and the Kafka broker.
Prerequisites
The Red Hat Single Sign-On authorization server is deployed to your OpenShift cluster and loaded with the example realm.
In the Red Hat Single Sign-On Admin Console, check the roles assigned to the clients are displayed in Clients > Service Account Roles.
- The Kafka cluster configured to connect with Red Hat Single Sign-On is deployed to your OpenShift cluster.
Procedure
Run a new interactive pod container using the AMQ Streams Kafka image to connect to a running Kafka broker.
NS=sso oc run -ti --restart=Never --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 kafka-cli -n $NS -- /bin/sh
NoteIf
oc
times out waiting on the image download, subsequent attempts may result in an AlreadyExists error.Attach to the pod container.
oc attach -ti kafka-cli -n $NS
Use the hostname of the Red Hat Single Sign-On instance to prepare a certificate for client connection using TLS.
SSO_HOST=SSO-HOSTNAME SSO_HOST_PORT=$SSO_HOST:443 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.crt
Create a truststore for TLS connection to the Kafka brokers.
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias sso -storepass $STOREPASS -import -file /tmp/sso.crt -noprompt
Use the Kafka bootstrap address as the hostname of the Kafka broker and the
tls
listener port (9093) to prepare a certificate for the Kafka broker.KAFKA_HOST_PORT=my-cluster-kafka-bootstrap:9093 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $KAFKA_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/my-cluster-kafka.crt
Add the certificate for the Kafka broker to the truststore.
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias my-cluster-kafka -storepass $STOREPASS -import -file /tmp/my-cluster-kafka.crt -noprompt
Keep the session open to check authorized access.
6.5.4.4. Checking authorized access to Kafka using a CLI Kafka client session
Check the authorization rules applied through the Red Hat Single Sign-On realm using an interactive CLI session. Apply the checks using Kafka’s example producer and consumer clients to create topics with user and service accounts that have different levels of access.
Use the team-a-client
and team-b-client
clients to check the authorization rules. Use the alice
admin user to perform additional administrative tasks on Kafka.
The AMQ Streams Kafka image used in this example contains Kafka producer and consumer binaries.
Prerequisites
- ZooKeeper and Kafka are running in the OpenShift cluster to be able to send and receive messages.
The interactive CLI Kafka client session is started.
Setting up client and admin user configuration
Prepare a Kafka configuration file with authentication properties for the
team-a-client
client.SSO_HOST=SSO-HOSTNAME cat > /tmp/team-a-client.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.client.id="team-a-client" \ oauth.client.secret="team-a-client-secret" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
The SASL OAUTHBEARER mechanism is used. This mechanism requires a client ID and client secret, which means the client first connects to the Red Hat Single Sign-On server to obtain an access token. The client then connects to the Kafka broker and uses the access token to authenticate.
Prepare a Kafka configuration file with authentication properties for the
team-b-client
client.cat > /tmp/team-b-client.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.client.id="team-b-client" \ oauth.client.secret="team-b-client-secret" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
Authenticate admin user
alice
by usingcurl
and performing a password grant authentication to obtain a refresh token.USERNAME=alice PASSWORD=alice-password GRANT_RESPONSE=$(curl -X POST "https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" -H 'Content-Type: application/x-www-form-urlencoded' -d "grant_type=password&username=$USERNAME&password=$PASSWORD&client_id=kafka-cli&scope=offline_access" -s -k) REFRESH_TOKEN=$(echo $GRANT_RESPONSE | awk -F "refresh_token\":\"" '{printf $2}' | awk -F "\"" '{printf $1}')
The refresh token is an offline token that is long-lived and does not expire.
Prepare a Kafka configuration file with authentication properties for the admin user
alice
.cat > /tmp/alice.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.refresh.token="$REFRESH_TOKEN" \ oauth.client.id="kafka-cli" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
The
kafka-cli
public client is used for theoauth.client.id
in thesasl.jaas.config
. Since it’s a public client it does not require a secret. The client authenticates with the refresh token that was authenticated in the previous step. The refresh token requests an access token behind the scenes, which is then sent to the Kafka broker for authentication.
Producing messages with authorized access
Use the team-a-client
configuration to check that you can produce messages to topics that start with a_
or x_
.
Write to topic
my-topic
.bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic my-topic \ --producer.config=/tmp/team-a-client.properties First message
This request returns a
Not authorized to access topics: [my-topic]
error.team-a-client
has aDev Team A
role that gives it permission to perform any supported actions on topics that start witha_
, but can only write to topics that start withx_
. The topic namedmy-topic
matches neither of those rules.Write to topic
a_messages
.bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --producer.config /tmp/team-a-client.properties First message Second message
Messages are produced to Kafka successfully.
- Press CTRL+C to exit the CLI application.
Check the Kafka container log for a debug log of
Authorization GRANTED
for the request.oc logs my-cluster-kafka-0 -f -n $NS
Consuming messages with authorized access
Use the team-a-client
configuration to consume messages from topic a_messages
.
Fetch messages from topic
a_messages
.bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties
The request returns an error because the
Dev Team A
role forteam-a-client
only has access to consumer groups that have names starting witha_
.Update the
team-a-client
properties to specify the custom consumer group it is permitted to use.bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_1
The consumer receives all the messages from the
a_messages
topic.
Administering Kafka with authorized access
The team-a-client
is an account without any cluster-level access, but it can be used with some administrative operations.
List topics.
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
The
a_messages
topic is returned.List consumer groups.
bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
The
a_consumer_group_1
consumer group is returned.Fetch details on the cluster configuration.
bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties \ --entity-type brokers --describe --entity-default
The request returns an error because the operation requires cluster level permissions that
team-a-client
does not have.
Using clients with different permissions
Use the team-b-client
configuration to produce messages to topics that start with b_
.
Write to topic
a_messages
.bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --producer.config /tmp/team-b-client.properties Message 1
This request returns a
Not authorized to access topics: [a_messages]
error.Write to topic
b_messages
.bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic b_messages \ --producer.config /tmp/team-b-client.properties Message 1 Message 2 Message 3
Messages are produced to Kafka successfully.
Write to topic
x_messages
.bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-b-client.properties Message 1
A
Not authorized to access topics: [x_messages]
error is returned, Theteam-b-client
can only read from topicx_messages
.Write to topic
x_messages
usingteam-a-client
.bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-a-client.properties Message 1
This request returns a
Not authorized to access topics: [x_messages]
error. Theteam-a-client
can write to thex_messages
topic, but it does not have a permission to create a topic if it does not yet exist. Beforeteam-a-client
can write to thex_messages
topic, an admin power user must create it with the correct configuration, such as the number of partitions and replicas.
Managing Kafka with an authorized admin user
Use admin user alice
to manage Kafka. alice
has full access to manage everything on any Kafka cluster.
Create the
x_messages
topic asalice
.bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \ --topic x_messages --create --replication-factor 1 --partitions 1
The topic is created successfully.
List all topics as
alice
.bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties --list bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-b-client.properties --list
Admin user
alice
can list all the topics, whereasteam-a-client
andteam-b-client
can only list the topics they have access to.The
Dev Team A
andDev Team B
roles both haveDescribe
permission on topics that start withx_
, but they cannot see the other team’s topics because they do not haveDescribe
permissions on them.Use the
team-a-client
to produce messages to thex_messages
topic:bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-a-client.properties Message 1 Message 2 Message 3
As
alice
created thex_messages
topic, messages are produced to Kafka successfully.Use the
team-b-client
to produce messages to thex_messages
topic.bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-b-client.properties Message 4 Message 5
This request returns a
Not authorized to access topics: [x_messages]
error.Use the
team-b-client
to consume messages from thex_messages
topic:bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-b-client.properties --group x_consumer_group_b
The consumer receives all the messages from the
x_messages
topic.Use the
team-a-client
to consume messages from thex_messages
topic.bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group x_consumer_group_a
This request returns a
Not authorized to access topics: [x_messages]
error.Use the
team-a-client
to consume messages from a consumer group that begins witha_
.bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_a
This request returns a
Not authorized to access topics: [x_messages]
error.Dev Team A
has noRead
access on topics that start with ax_
.Use
alice
to produce messages to thex_messages
topic.bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/alice.properties
Messages are produced to Kafka successfully.
alice
can read from or write to any topic.Use
alice
to read the cluster configuration.bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \ --entity-type brokers --describe --entity-default
The cluster configuration for this example is empty.