15.5. Using OAuth 2.0 token-based authorization
Streams for Apache Kafka supports the use of OAuth 2.0 token-based authorization through Red Hat Single Sign-On Authorization Services, which allows you to manage security policies and permissions centrally.
Security policies and permissions defined in Red Hat Single Sign-On are used to grant access to resources on Kafka brokers. Users and clients are matched against policies that permit access to perform specific actions on Kafka brokers.
Kafka allows all users full access to brokers by default, and also provides the AclAuthorizer and StandardAuthorizer plugins to configure authorization based on Access Control Lists (ACLs). The ACL rules managed by these plugins are used to grant or deny access to resources based on the username, and these rules are stored within the Kafka cluster itself. However, OAuth 2.0 token-based authorization with Red Hat Single Sign-On offers far greater flexibility in how you implement access control to Kafka brokers. In addition, you can configure your Kafka brokers to use both OAuth 2.0 authorization and ACLs.
15.5.1. OAuth 2.0 authorization mechanism
OAuth 2.0 authorization in Streams for Apache Kafka uses Red Hat Single Sign-On server Authorization Services REST endpoints to extend token-based authentication with Red Hat Single Sign-On by applying defined security policies on a particular user, and providing a list of permissions granted on different resources for that user. Policies use roles and groups to match permissions to users. OAuth 2.0 authorization enforces permissions locally based on the received list of grants for the user from Red Hat Single Sign-On Authorization Services.
15.5.1.1. Kafka broker custom authorizer
A Red Hat Single Sign-On authorizer (KeycloakAuthorizer) is provided with Streams for Apache Kafka. To use the Authorization Services REST endpoints provided by Red Hat Single Sign-On, you configure this custom authorizer on the Kafka broker.
The authorizer fetches a list of granted permissions from the authorization server as needed, and enforces authorization locally on the Kafka Broker, making rapid authorization decisions for each client request.
15.5.2. Configuring OAuth 2.0 authorization support
This procedure describes how to configure Kafka brokers to use OAuth 2.0 authorization using Red Hat Single Sign-On Authorization Services.
Before you begin
Consider the access you require or want to limit for certain users. You can use a combination of Red Hat Single Sign-On groups, roles, clients, and users to configure access in Red Hat Single Sign-On.
Typically, groups are used to match users based on organizational departments or geographical locations, and roles are used to match users based on their function.
With Red Hat Single Sign-On, you can store users and groups in LDAP, whereas clients and roles cannot be stored this way. Storage and access to user data may be a factor in how you choose to configure authorization policies.
Super users always have unconstrained access to a Kafka broker regardless of the authorization implemented on the Kafka broker.
Prerequisites
- Streams for Apache Kafka must be configured to use OAuth 2.0 with Red Hat Single Sign-On for token-based authentication. You use the same Red Hat Single Sign-On server endpoint when you set up authorization.
- OAuth 2.0 authentication must be configured with the maxSecondsWithoutReauthentication option to enable re-authentication.
Procedure
- Access the Red Hat Single Sign-On Admin Console or use the Red Hat Single Sign-On Admin CLI to enable Authorization Services for the Kafka broker client you created when setting up OAuth 2.0 authentication.
- Use Authorization Services to define resources, authorization scopes, policies, and permissions for the client.
- Bind the permissions to users and clients by assigning them roles and groups.
Configure the Kafka brokers to use Red Hat Single Sign-On authorization by updating the Kafka broker configuration (Kafka.spec.kafka) of your Kafka resource in an editor.
oc edit kafka my-cluster
Configure the Kafka broker kafka configuration to use keycloak authorization, and to be able to access the authorization server and Authorization Services.
For example:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    authorization:
      type: keycloak # (1)
      tokenEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token> # (2)
      clientId: kafka # (3)
      delegateToKafkaAcls: false # (4)
      disableTlsHostnameVerification: false # (5)
      superUsers: # (6)
        - CN=fred
        - sam
        - CN=edward
      tlsTrustedCertificates: # (7)
        - secretName: oauth-server-cert
          certificate: ca.crt
      grantsRefreshPeriodSeconds: 60 # (8)
      grantsRefreshPoolSize: 5 # (9)
      grantsMaxIdleSeconds: 300 # (10)
      grantsGcPeriodSeconds: 300 # (11)
      grantsAlwaysLatest: false # (12)
      connectTimeoutSeconds: 60 # (13)
      readTimeoutSeconds: 60 # (14)
      httpRetries: 2 # (15)
      enableMetrics: false # (16)
      includeAcceptHeader: false # (17)
    #...
(1) Type keycloak enables Red Hat Single Sign-On authorization.
(2) URI of the Red Hat Single Sign-On token endpoint. For production, always use https:// URLs. When you configure token-based oauth authentication, you specify a jwksEndpointUri as the URI for local JWT validation. The hostname for the tokenEndpointUri URI must be the same.
(3) The client ID of the OAuth 2.0 client definition in Red Hat Single Sign-On that has Authorization Services enabled. Typically, kafka is used as the ID.
(4) (Optional) Delegate authorization to Kafka AclAuthorizer and StandardAuthorizer if access is denied by Red Hat Single Sign-On Authorization Services policies. Default is false.
(5) (Optional) Disable TLS hostname verification. Default is false.
(6) (Optional) Designated super users.
(7) (Optional) Trusted certificates for TLS connection to the authorization server.
(8) (Optional) The time, in seconds, between two consecutive grants refresh runs. That is the maximum time for active sessions to detect any permissions changes for the user on Red Hat Single Sign-On. The default value is 60.
(9) (Optional) The number of threads to use to refresh (in parallel) the grants for the active sessions. The default value is 5.
(10) (Optional) The time, in seconds, after which an idle grant in the cache can be evicted. The default value is 300.
(11) (Optional) The time, in seconds, between consecutive runs of a job that cleans stale grants from the cache. The default value is 300.
(12) (Optional) Controls whether the latest grants are fetched for a new session. When enabled, grants are retrieved from Red Hat Single Sign-On and cached for the user. The default value is false.
(13) (Optional) The connect timeout in seconds when connecting to the Red Hat Single Sign-On token endpoint. The default value is 60.
(14) (Optional) The read timeout in seconds when connecting to the Red Hat Single Sign-On token endpoint. The default value is 60.
(15) (Optional) The maximum number of times to retry (without pausing) a failed HTTP request to the authorization server. The default value is 0, meaning that no retries are performed. To use this option effectively, consider reducing the timeout times for the connectTimeoutSeconds and readTimeoutSeconds options. However, note that retries may prevent the current worker thread from being available to other requests, and if too many requests stall, it could make the Kafka broker unresponsive.
(16) (Optional) Enable or disable OAuth metrics. The default value is false.
(17) (Optional) Some authorization servers have issues with clients sending the Accept: application/json header. By setting includeAcceptHeader: false, the header is not sent. Default is true.
- Save and exit the editor, then wait for rolling updates to complete.
Check the update in the logs or by watching the pod state transitions:
oc logs -f ${POD_NAME} -c kafka
oc get pod -w
The rolling update configures the brokers to use OAuth 2.0 authorization.
- Verify the configured permissions by accessing Kafka brokers as clients or users with specific roles, making sure they have the necessary access, or do not have the access they are not supposed to have.
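The way superUsers, Red Hat Single Sign-On grants, and delegateToKafkaAcls interact can be pictured with a short Python sketch. It only illustrates the decision order described in this procedure and is not the KeycloakAuthorizer implementation; the function name authorize and the two callables passed to it are hypothetical.

```python
from typing import Callable


def authorize(
    principal: str,
    super_users: set[str],
    granted_by_sso: Callable[[str], bool],
    allowed_by_acl: Callable[[str], bool],
    delegate_to_kafka_acls: bool = False,
) -> bool:
    """Return True if the request is allowed (illustrative decision order only)."""
    # Super users bypass every other check.
    if principal in super_users:
        return True
    # Grants obtained from the authorization server are checked next.
    if granted_by_sso(principal):
        return True
    # Kafka ACLs are consulted only when delegation is enabled.
    if delegate_to_kafka_acls:
        return allowed_by_acl(principal)
    return False
```

With delegation left at its default of false, a principal that is denied by the Authorization Services policies stays denied even if an ACL rule would allow the action.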
This section describes the authorization models used by Red Hat Single Sign-On Authorization Services and Kafka, and defines the important concepts in each model.
To grant permissions to access Kafka, you can map Red Hat Single Sign-On Authorization Services objects to Kafka resources by creating an OAuth client specification in Red Hat Single Sign-On. Kafka permissions are granted to user accounts or service accounts using Red Hat Single Sign-On Authorization Services rules.
Examples are shown of the different user permissions required for common Kafka operations, such as creating and listing topics.
Kafka and Red Hat Single Sign-On Authorization Services use different authorization models.
Kafka authorization model
Kafka’s authorization model uses resource types. When a Kafka client performs an action on a broker, the broker uses the configured KeycloakAuthorizer to check the client’s permissions, based on the action and resource type.
Kafka uses five resource types to control access: Topic, Group, Cluster, TransactionalId, and DelegationToken. Each resource type has a set of available permissions.
Topic
- Create
- Write
- Read
- Delete
- Describe
- DescribeConfigs
- Alter
- AlterConfigs
Group
- Read
- Describe
- Delete
Cluster
- Create
- Describe
- Alter
- DescribeConfigs
- AlterConfigs
- IdempotentWrite
- ClusterAction
TransactionalId
- Describe
- Write
DelegationToken
- Describe
Red Hat Single Sign-On Authorization Services model
The Red Hat Single Sign-On Authorization Services model has four concepts for defining and granting permissions: resources, authorization scopes, policies, and permissions.
- Resources
- A resource is a set of resource definitions that are used to match resources with permitted actions. A resource might be an individual topic, for example, or all topics with names starting with the same prefix. A resource definition is associated with a set of available authorization scopes, which represent a set of all actions available on the resource. Often, only a subset of these actions is actually permitted.
- Authorization scopes
- An authorization scope is a set of all the available actions on a specific resource definition. When you define a new resource, you add scopes from the set of all scopes.
- Policies
A policy is an authorization rule that uses criteria to match against a list of accounts. Policies can match:
- Service accounts based on client ID or roles
- User accounts based on username, groups, or roles.
- Permissions
- A permission grants a subset of authorization scopes on a specific resource definition to a set of users.
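As a rough mental model, the four concepts can be written down as plain data. The following Python sketch is an assumption-laden illustration, not the Red Hat Single Sign-On data model: the class names, the grants_for helper, and the use of a single required role as a stand-in for a policy are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Account:
    name: str
    roles: frozenset = frozenset()


@dataclass(frozen=True)
class Permission:
    name: str
    resources: tuple    # resource definitions, for example ("Topic:x_*",)
    scopes: tuple       # subset of the authorization scopes, for example ("Describe", "Read")
    required_role: str  # simplified stand-in for a role policy


def grants_for(account, permissions):
    """Collect the (resource, scope) pairs granted to an account."""
    grants = set()
    for permission in permissions:
        # The policy matches when the account holds the required role.
        if permission.required_role in account.roles:
            for resource in permission.resources:
                for scope in permission.scopes:
                    grants.add((resource, scope))
    return grants
```

A permission therefore ties together three things: which resources, which scopes on them, and which accounts (through the policy).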
The Kafka authorization model is used as a basis for defining the Red Hat Single Sign-On roles and resources that will control access to Kafka.
To grant Kafka permissions to user accounts or service accounts, you first create an OAuth client specification in Red Hat Single Sign-On for the Kafka broker. You then specify Red Hat Single Sign-On Authorization Services rules on the client. Typically, the client ID of the OAuth client that represents the broker is kafka. The example configuration files provided with Streams for Apache Kafka use kafka as the OAuth client ID.
If you have multiple Kafka clusters, you can use a single OAuth client (kafka) for all of them. This gives you a single, unified space in which to define and manage authorization rules. However, you can also use different OAuth client IDs (for example, my-cluster-kafka or cluster-dev-kafka) and define authorization rules for each cluster within each client configuration.
The kafka client definition must have the Authorization Enabled option enabled in the Red Hat Single Sign-On Admin Console.
All permissions exist within the scope of the kafka client. If you have different Kafka clusters configured with different OAuth client IDs, they each need a separate set of permissions even though they’re part of the same Red Hat Single Sign-On realm.
When the Kafka client uses OAUTHBEARER authentication, the Red Hat Single Sign-On authorizer (KeycloakAuthorizer) uses the access token of the current session to retrieve a list of grants from the Red Hat Single Sign-On server. To retrieve the grants, the authorizer evaluates the Red Hat Single Sign-On Authorization Services policies and permissions.
Authorization scopes for Kafka permissions
An initial Red Hat Single Sign-On configuration usually involves uploading authorization scopes to create a list of all possible actions that can be performed on each Kafka resource type. This step is performed once only, before defining any permissions. You can add authorization scopes manually instead of uploading them.
Authorization scopes must contain all the possible Kafka permissions regardless of the resource type:
- Create
- Write
- Read
- Delete
- Describe
- Alter
- DescribeConfigs
- AlterConfigs
- ClusterAction
- IdempotentWrite
If you’re certain you won’t need a permission (for example, IdempotentWrite), you can omit it from the list of authorization scopes. However, that permission won’t be available to target on Kafka resources.
Resource patterns for permissions checks
Resource patterns are used for pattern matching against the targeted resources when performing permission checks. The general pattern format is RESOURCE-TYPE:PATTERN-NAME.
The resource types mirror the Kafka authorization model. The pattern allows for two matching options:
- Exact matching (when the pattern does not end with *)
- Prefix matching (when the pattern ends with *)
Example patterns for resources
Topic:my-topic
Topic:orders-*
Group:orders-*
Cluster:*
Additionally, the general pattern format can be prefixed by kafka-cluster:CLUSTER-NAME followed by a comma, where CLUSTER-NAME refers to the metadata.name in the Kafka custom resource.
Example patterns for resources with cluster prefix
kafka-cluster:my-cluster,Topic:*
kafka-cluster:*,Group:b_*
When the kafka-cluster prefix is missing, it is assumed to be kafka-cluster:*.
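The matching rules above can be sketched in a few lines of Python. This is a minimal illustration of exact and prefix matching with an optional kafka-cluster prefix, not the authorizer's actual code; the function names are hypothetical, and applying the same exact/prefix rule to the cluster name is an assumption.

```python
def _name_matches(pattern: str, name: str) -> bool:
    # A trailing "*" means prefix matching; otherwise the match must be exact.
    if pattern.endswith("*"):
        return name.startswith(pattern[:-1])
    return pattern == name


def resource_matches(spec: str, cluster: str, resource_type: str, name: str) -> bool:
    """Check a resource spec such as 'kafka-cluster:my-cluster,Topic:orders-*'."""
    cluster_pattern = "*"  # a missing prefix is treated as kafka-cluster:*
    resource_part = spec
    if spec.startswith("kafka-cluster:"):
        prefix, _, resource_part = spec.partition(",")
        cluster_pattern = prefix[len("kafka-cluster:"):]
    spec_type, _, name_pattern = resource_part.partition(":")
    return (
        _name_matches(cluster_pattern, cluster)
        and spec_type == resource_type
        and _name_matches(name_pattern, name)
    )
```

For example, under these assumptions Topic:orders-* matches a topic named orders-eu on any cluster, whereas kafka-cluster:my-cluster,Topic:* matches no topic on a cluster with a different name.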
When defining a resource, you can associate it with a list of possible authorization scopes which are relevant to the resource. Set whatever actions make sense for the targeted resource type.
Though you may add any authorization scope to any resource, only the scopes supported by the resource type are considered for access control.
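To make that last point concrete, the following Python sketch filters the scopes attached to a resource down to those that the Kafka resource type supports, using the permission lists from the Kafka authorization model earlier in this section. The dictionary and function names are hypothetical, and the sketch is illustrative rather than a description of the authorizer internals.

```python
# Permissions available for each Kafka resource type, as listed earlier in this section.
SUPPORTED_SCOPES = {
    "Topic": {"Create", "Write", "Read", "Delete", "Describe",
              "DescribeConfigs", "Alter", "AlterConfigs"},
    "Group": {"Read", "Describe", "Delete"},
    "Cluster": {"Create", "Describe", "Alter", "DescribeConfigs",
                "AlterConfigs", "IdempotentWrite", "ClusterAction"},
    "TransactionalId": {"Describe", "Write"},
    "DelegationToken": {"Describe"},
}


def effective_scopes(resource_type, scopes):
    """Keep only the scopes that are meaningful for the given resource type."""
    return set(scopes) & SUPPORTED_SCOPES.get(resource_type, set())
```

Adding Write or IdempotentWrite to a Group resource, for instance, has no effect in this model because neither is a Group permission.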
Policies for applying access permission
Policies are used to target permissions to one or more user accounts or service accounts. Targeting can refer to:
- Specific user or service accounts
- Realm roles or client roles
- User groups
- JavaScript rules to match a client IP address
A policy is given a unique name and can be reused to target multiple permissions to multiple resources.
Permissions to grant access
Use fine-grained permissions to pull together the policies, resources, and authorization scopes that grant access to users.
The name of each permission should clearly define which permissions it grants to which users. For example, Dev Team B can read from topics starting with x.
The following examples demonstrate the user permissions required for performing common operations on Kafka.
Create a topic
To create a topic, the Create permission is required for the specific topic, or for Cluster:kafka-cluster.
bin/kafka-topics.sh --create --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
List topics
If a user has the Describe permission on a specified topic, the topic is listed.
bin/kafka-topics.sh --list \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Display topic details
To display a topic’s details, Describe and DescribeConfigs permissions are required on the topic.
bin/kafka-topics.sh --describe --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Produce messages to a topic
To produce messages to a topic, Describe and Write permissions are required on the topic.
If the topic hasn’t been created yet, and topic auto-creation is enabled, the permissions to create a topic are required.
bin/kafka-console-producer.sh --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties
Consume messages from a topic
To consume messages from a topic, Describe and Read permissions are required on the topic. Consuming from the topic normally relies on storing the consumer offsets in a consumer group, which requires additional Describe and Read permissions on the consumer group.
Two resources are needed for matching. For example:
Topic:my-topic
Group:my-group-*
bin/kafka-console-consumer.sh --topic my-topic --group my-group-1 --from-beginning \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --consumer.config /tmp/config.properties
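The need for two matching resources can be checked with a small Python sketch. It assumes grants are represented as a mapping from resource pattern to scopes, which is a simplification chosen for illustration; the function names are hypothetical.

```python
def _matches(pattern: str, resource: str) -> bool:
    # Trailing "*" means prefix matching, otherwise exact matching.
    if pattern.endswith("*"):
        return resource.startswith(pattern[:-1])
    return pattern == resource


def has_scopes(grants, resource, needed):
    """True if the grants matching the resource cover all needed scopes."""
    granted = set()
    for pattern, scopes in grants.items():
        if _matches(pattern, resource):
            granted |= set(scopes)
    return set(needed) <= granted


def can_consume(grants, topic, group):
    # Consuming needs Describe and Read on both the topic and the consumer group.
    needed = {"Describe", "Read"}
    return (has_scopes(grants, f"Topic:{topic}", needed)
            and has_scopes(grants, f"Group:{group}", needed))
```

A client that holds Describe and Read on Topic:my-topic but has no grant matching its consumer group fails this check, which mirrors the authorization failure it would see when committing offsets.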
Produce messages to a topic using an idempotent producer
As well as the permissions for producing to a topic, an additional IdempotentWrite permission is required on the Cluster:kafka-cluster resource.
Two resources are needed for matching. For example:
Topic:my-topic
Cluster:kafka-cluster
bin/kafka-console-producer.sh --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties --producer-property enable.idempotence=true --request-required-acks -1
List consumer groups
When listing consumer groups, only the groups on which the user has the Describe permission are returned. Alternatively, if the user has the Describe permission on Cluster:kafka-cluster, all the consumer groups are returned.
bin/kafka-consumer-groups.sh --list \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
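The filtering behavior for listing consumer groups can be sketched as follows. This Python function is a hypothetical illustration of the rule stated above (cluster-level Describe returns everything, otherwise only groups matching a Describe grant are returned); it is not how the broker is implemented.

```python
def visible_groups(all_groups, describable_patterns, has_cluster_describe):
    """Return the consumer groups a user would see when listing groups."""
    # Describe on the cluster resource makes every group visible.
    if has_cluster_describe:
        return sorted(all_groups)

    def allowed(group):
        # Trailing "*" means prefix matching, otherwise exact matching.
        return any(
            group.startswith(p[:-1]) if p.endswith("*") else group == p
            for p in describable_patterns
        )

    return sorted(g for g in all_groups if allowed(g))
```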
Display consumer group details
To display a consumer group’s details, the Describe permission is required on the group and the topics associated with the group.
bin/kafka-consumer-groups.sh --describe --group my-group-1 \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Change topic configuration
To change a topic’s configuration, the Describe and Alter permissions are required on the topic.
bin/kafka-topics.sh --alter --topic my-topic --partitions 2 \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Display Kafka broker configuration
In order to use kafka-configs.sh to get a broker’s configuration, the DescribeConfigs permission is required on the Cluster:kafka-cluster.
bin/kafka-configs.sh --entity-type brokers --entity-name 0 --describe --all \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Change Kafka broker configuration
To change a Kafka broker’s configuration, DescribeConfigs and AlterConfigs permissions are required on Cluster:kafka-cluster.
bin/kafka-configs.sh --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Delete a topic
To delete a topic, the Describe and Delete permissions are required on the topic.
bin/kafka-topics.sh --delete --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Select a lead partition
To run leader selection for topic partitions, the Alter permission is required on the Cluster:kafka-cluster.
bin/kafka-leader-election.sh --topic my-topic --partition 0 --election-type PREFERRED \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --admin.config /tmp/config.properties
Reassign partitions
To generate a partition reassignment file, Describe permissions are required on the topics involved.
bin/kafka-reassign-partitions.sh --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "0,1" --generate \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties > /tmp/partition-reassignment.json
To execute the partition reassignment, Describe and Alter permissions are required on Cluster:kafka-cluster. Also, Describe permissions are required on the topics involved.
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --execute \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
To verify partition reassignment, Describe and AlterConfigs permissions are required on Cluster:kafka-cluster, and on each of the topics involved.
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --verify \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
This example explains how to use Red Hat Single Sign-On Authorization Services with keycloak authorization. Use Red Hat Single Sign-On Authorization Services to enforce access restrictions on Kafka clients. Red Hat Single Sign-On Authorization Services use authorization scopes, policies and permissions to define and apply access control to resources.
Red Hat Single Sign-On Authorization Services REST endpoints provide a list of granted permissions on resources for authenticated users. The list of grants (permissions) is fetched from the Red Hat Single Sign-On server as the first action after an authenticated session is established by the Kafka client. The list is refreshed in the background so that changes to the grants are detected. Grants are cached and enforced locally on the Kafka broker for each user session to provide fast authorization decisions.
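The caching behavior can be pictured with a minimal Python sketch of a grants cache that evicts idle entries, loosely corresponding to the grantsMaxIdleSeconds and grantsGcPeriodSeconds options. The class and method names are hypothetical, the background refresh is left out, and the real authorizer may track idleness differently.

```python
import time


class GrantsCache:
    """Per-user grants cache with idle eviction (illustrative only)."""

    def __init__(self, max_idle_seconds=300, clock=time.monotonic):
        self._max_idle = max_idle_seconds
        self._clock = clock
        self._entries = {}  # user -> (grants, last_used_timestamp)

    def put(self, user, grants):
        self._entries[user] = (grants, self._clock())

    def get(self, user):
        entry = self._entries.get(user)
        if entry is None:
            return None
        grants, _ = entry
        # Using an entry resets its idle timer.
        self._entries[user] = (grants, self._clock())
        return grants

    def evict_idle(self):
        """Remove entries idle for longer than max_idle_seconds; return their users."""
        now = self._clock()
        stale = [user for user, (_, last_used) in self._entries.items()
                 if now - last_used > self._max_idle]
        for user in stale:
            del self._entries[user]
        return stale
```

In this model a periodic job would call evict_idle, so grants for users without active sessions do not accumulate on the broker.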
Streams for Apache Kafka provides example configuration files. These include the following example files for setting up Red Hat Single Sign-On:
kafka-ephemeral-oauth-single-keycloak-authz.yaml
- An example Kafka custom resource configured for OAuth 2.0 token-based authorization using Red Hat Single Sign-On. You can use the custom resource to deploy a Kafka cluster that uses keycloak authorization and token-based oauth authentication.
kafka-authz-realm.json
- An example Red Hat Single Sign-On realm configured with sample groups, users, roles and clients. You can import the realm into a Red Hat Single Sign-On instance to set up fine-grained permissions to access Kafka.
If you want to try the example with Red Hat Single Sign-On, use these files to perform the tasks outlined in this section in the order shown.
Authentication
When you configure token-based oauth authentication, you specify a jwksEndpointUri as the URI for local JWT validation. When you configure keycloak authorization, you specify a tokenEndpointUri as the URI of the Red Hat Single Sign-On token endpoint. The hostname for both URIs must be the same.
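A quick way to check this requirement before applying a configuration is to compare the hostnames of the two URIs. The following Python sketch uses the standard library only; the function name and the example URLs in the usage note are hypothetical.

```python
from urllib.parse import urlparse


def same_host(jwks_endpoint_uri: str, token_endpoint_uri: str) -> bool:
    """Return True if both endpoint URIs point at the same hostname."""
    return urlparse(jwks_endpoint_uri).hostname == urlparse(token_endpoint_uri).hostname
```

For instance, same_host("https://sso.example.com/certs", "https://sso.example.com/token") returns True, while URIs on different hosts return False.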
Targeted permissions with group or role policies
In Red Hat Single Sign-On, confidential clients with service accounts enabled can authenticate to the server in their own name using a client ID and a secret. This is convenient for microservices that typically act in their own name, and not as agents of a particular user (like a web site). Service accounts can have roles assigned like regular users. They cannot, however, have groups assigned. As a consequence, if you want to target permissions to microservices using service accounts, you cannot use group policies, and should instead use role policies. Conversely, if you want to limit certain permissions only to regular user accounts where authentication with a username and password is required, you can achieve that as a side effect of using the group policies rather than the role policies. This is what is used in this example for permissions that start with ClusterManager. Performing cluster management is usually done interactively using CLI tools. It makes sense to require the user to log in before using the resulting access token to authenticate to the Kafka broker. In this case, the access token represents the specific user, rather than the client application.
Set up Red Hat Single Sign-On, then connect to its Admin Console and add the preconfigured realm. Use the example kafka-authz-realm.json file to import the realm. You can check the authorization rules defined for the realm in the Admin Console. The rules grant access to the resources on the Kafka cluster configured to use the example Red Hat Single Sign-On realm.
Prerequisites
- A running OpenShift cluster.
- The Streams for Apache Kafka examples/security/keycloak-authorization/kafka-authz-realm.json file that contains the preconfigured realm.
Procedure
- Install the Red Hat Single Sign-On server using the Red Hat Single Sign-On Operator as described in Server Installation and Configuration in the Red Hat Single Sign-On documentation.
- Wait until the Red Hat Single Sign-On instance is running.
Get the external hostname to be able to access the Admin Console.
NS=sso
oc get ingress keycloak -n $NS
In this example, we assume the Red Hat Single Sign-On server is running in the sso namespace.
Get the password for the admin user.
oc get -n $NS pod keycloak-0 -o yaml | less
The password is stored as a secret, so get the configuration YAML file for the Red Hat Single Sign-On instance to identify the name of the secret (secretKeyRef.name).
Use the name of the secret to obtain the clear text password.
SECRET_NAME=credential-keycloak
oc get -n $NS secret $SECRET_NAME -o yaml | grep PASSWORD | awk '{print $2}' | base64 -D
In this example, we assume the name of the secret is credential-keycloak.
Log in to the Admin Console with the username admin and the password you obtained.
Use https://HOSTNAME to access the Kubernetes Ingress.
You can now upload the example realm to Red Hat Single Sign-On using the Admin Console.
- Click Add Realm to import the example realm.
Add the examples/security/keycloak-authorization/kafka-authz-realm.json file, and then click Create.
You now have kafka-authz as your current realm in the Admin Console.
The default view displays the Master realm.
In the Red Hat Single Sign-On Admin Console, go to Clients > kafka > Authorization > Settings and check that Decision Strategy is set to Affirmative.
An affirmative policy means that at least one policy must be satisfied for a client to access the Kafka cluster.
In the Red Hat Single Sign-On Admin Console, go to Groups, Users, Roles and Clients to view the realm configuration.
- Groups
- Groups are used to create user groups and set user permissions. Groups are sets of users with a name assigned. They are used to compartmentalize users into geographical, organizational or departmental units. Groups can be linked to an LDAP identity provider. You can make a user a member of a group through a custom LDAP server admin user interface, for example, to grant permissions on Kafka resources.
- Users
- Users are used to create users. For this example, alice and bob are defined. alice is a member of the ClusterManager group and bob is a member of the ClusterManager-my-cluster group. Users can be stored in an LDAP identity provider.
- Roles
- Roles mark users or clients as having certain permissions. Roles are a concept analogous to groups. They are usually used to tag users with organizational roles and have the requisite permissions. Roles cannot be stored in an LDAP identity provider. If LDAP is a requirement, you can use groups instead, and add Red Hat Single Sign-On roles to the groups so that when users are assigned a group they also get a corresponding role.
- Clients
- Clients can have specific configurations. For this example, kafka, kafka-cli, team-a-client, and team-b-client clients are configured.
  - The kafka client is used by Kafka brokers to perform the necessary OAuth 2.0 communication for access token validation. This client also contains the authorization services resource definitions, policies, and authorization scopes used to perform authorization on the Kafka brokers. The authorization configuration is defined in the kafka client from the Authorization tab, which becomes visible when Authorization Enabled is switched on from the Settings tab.
  - The kafka-cli client is a public client that is used by the Kafka command line tools when authenticating with username and password to obtain an access token or a refresh token.
  - The team-a-client and team-b-client clients are confidential clients representing services with partial access to certain Kafka topics.
In the Red Hat Single Sign-On Admin Console, go to Authorization > Permissions to see the granted permissions that use the resources and policies defined for the realm.
For example, the kafka client has the following permissions:
Dev Team A can write to topics that start with x_ on any cluster
Dev Team B can read from topics that start with x_ on any cluster
Dev Team B can update consumer group offsets that start with x_ on any cluster
ClusterManager of my-cluster Group has full access to cluster config on my-cluster
ClusterManager of my-cluster Group has full access to consumer groups on my-cluster
ClusterManager of my-cluster Group has full access to topics on my-cluster
- Dev Team A
- The Dev Team A realm role can write to topics that start with x_ on any cluster. This combines a resource called Topic:x_*, Describe and Write scopes, and the Dev Team A policy. The Dev Team A policy matches all users that have a realm role called Dev Team A.
- Dev Team B
- The Dev Team B realm role can read from topics that start with x_ on any cluster. This combines Topic:x_*, Group:x_* resources, Describe and Read scopes, and the Dev Team B policy. The Dev Team B policy matches all users that have a realm role called Dev Team B. Matching users and clients have the ability to read from topics, and update the consumed offsets for topics and consumer groups that have names starting with x_.
Deploy a Kafka cluster configured to connect to the Red Hat Single Sign-On server. Use the example kafka-ephemeral-oauth-single-keycloak-authz.yaml file to deploy the Kafka cluster as a Kafka custom resource. The example deploys a single-node Kafka cluster with keycloak authorization and oauth authentication.
Prerequisites
- The Red Hat Single Sign-On authorization server is deployed to your OpenShift cluster and loaded with the example realm.
- The Cluster Operator is deployed to your OpenShift cluster.
- The Streams for Apache Kafka examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml custom resource.
Procedure
Use the hostname of the Red Hat Single Sign-On instance you deployed to prepare a truststore certificate for Kafka brokers to communicate with the Red Hat Single Sign-On server.
SSO_HOST=SSO-HOSTNAME
SSO_HOST_PORT=$SSO_HOST:443
STOREPASS=storepass
echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
The certificate is required as Kubernetes Ingress is used to make a secure (HTTPS) connection.
Usually there is not one single certificate, but a certificate chain. You only have to provide the top-most issuer CA, which is listed last in the /tmp/sso.pem file. You can extract it manually or using the following commands:
Example command to extract the top CA certificate in a certificate chain
split -p "-----BEGIN CERTIFICATE-----" sso.pem sso-
for f in $(ls sso-*); do mv $f $f.pem; done
cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt
Note
A trusted CA certificate is normally obtained from a trusted source, and not by using the openssl command.
Deploy the certificate to OpenShift as a secret.
oc create secret generic oauth-server-cert --from-file=/tmp/sso-ca.crt -n $NS
Set the hostname as an environment variable.
SSO_HOST=SSO-HOSTNAME
Create and deploy the example Kafka cluster.
cat examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml | sed -E 's#\${SSO_HOST}'"#$SSO_HOST#" | oc create -n $NS -f -
Create a new pod for an interactive CLI session. Set up a truststore with a Red Hat Single Sign-On certificate for TLS connectivity. The truststore is to connect to Red Hat Single Sign-On and the Kafka broker.
Prerequisites
- The Red Hat Single Sign-On authorization server is deployed to your OpenShift cluster and loaded with the example realm.
  In the Red Hat Single Sign-On Admin Console, check that the roles assigned to the clients are displayed in Clients > Service Account Roles.
- The Kafka cluster configured to connect with Red Hat Single Sign-On is deployed to your OpenShift cluster.
Procedure
Run a new interactive pod container using the Streams for Apache Kafka image to connect to a running Kafka broker.
NS=sso
oc run -ti --restart=Never --image=registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 kafka-cli -n $NS -- /bin/sh
Note
If oc times out waiting on the image download, subsequent attempts may result in an AlreadyExists error.
Attach to the pod container.
oc attach -ti kafka-cli -n $NS
Use the hostname of the Red Hat Single Sign-On instance to prepare a certificate for client connection using TLS.
SSO_HOST=SSO-HOSTNAME
SSO_HOST_PORT=$SSO_HOST:443
STOREPASS=storepass
echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
Usually there is not one single certificate, but a certificate chain. You only have to provide the top-most issuer CA, which is listed last in the /tmp/sso.pem file. You can extract it manually or using the following command:
Example command to extract the top CA certificate in a certificate chain
split -p "-----BEGIN CERTIFICATE-----" /tmp/sso.pem /tmp/sso-
for f in $(ls /tmp/sso-*); do mv $f $f.pem; done
cp $(ls /tmp/sso-* | sort -r | head -n 1) /tmp/sso-ca.crt
Note
A trusted CA certificate is normally obtained from a trusted source, and not by using the openssl command.
Create a truststore for TLS connection to the Kafka brokers.
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias sso -storepass $STOREPASS -import -file /tmp/sso-ca.crt -noprompt
Use the Kafka bootstrap address as the hostname of the Kafka broker and the tls listener port (9093) to prepare a certificate for the Kafka broker.
KAFKA_HOST_PORT=my-cluster-kafka-bootstrap:9093
STOREPASS=storepass
echo "Q" | openssl s_client -showcerts -connect $KAFKA_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/my-cluster-kafka.pem
The obtained .pem file is usually not one single certificate, but a certificate chain. You only have to provide the top-most issuer CA, which is listed last in the /tmp/my-cluster-kafka.pem file. You can extract it manually or using the following command:
Example command to extract the top CA certificate in a certificate chain
split -p "-----BEGIN CERTIFICATE-----" /tmp/my-cluster-kafka.pem /tmp/kafka-
for f in $(ls /tmp/kafka-*); do mv $f $f.pem; done
cp $(ls /tmp/kafka-* | sort -r | head -n 1) /tmp/my-cluster-kafka-ca.crt
Note
A trusted CA certificate is normally obtained from a trusted source, and not by using the openssl command. For this example we assume the client is running in a pod in the same namespace where the Kafka cluster was deployed. If the client is accessing the Kafka cluster from outside the OpenShift cluster, you would have to first determine the bootstrap address. In that case you can also get the cluster certificate directly from the OpenShift secret, and there is no need for openssl. For more information, see Chapter 14, Setting up client access to a Kafka cluster.
Add the certificate for the Kafka broker to the truststore.
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias my-cluster-kafka -storepass $STOREPASS -import -file /tmp/my-cluster-kafka-ca.crt -noprompt
Keep the session open to check authorized access.
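The extraction commands in this procedure rely on the -p option of split, which some split implementations (GNU split, for example) do not provide. As a possible alternative, the following sketch uses awk to print only the last certificate in a PEM chain file, which is the top-most issuer CA according to the procedure. The function name last_cert is hypothetical and is not part of the product.

```shell
# Print the last certificate block found in the PEM file given as $1.
last_cert() {
  awk '
    /-----BEGIN CERTIFICATE-----/ { block = ""; inside = 1 }  # start a new block
    inside                        { block = block $0 "\n" }   # collect its lines
    /-----END CERTIFICATE-----/   { inside = 0; last = block } # remember the finished block
    END                           { printf "%s", last }        # emit only the final one
  ' "$1"
}

# Usage, assuming the chain was saved as in the steps above:
# last_cert /tmp/sso.pem > /tmp/sso-ca.crt
# last_cert /tmp/my-cluster-kafka.pem > /tmp/my-cluster-kafka-ca.crt
```

Because the function writes to standard output, the result can be redirected directly to the file name that the keytool commands expect.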
Check the authorization rules applied through the Red Hat Single Sign-On realm using an interactive CLI session. Apply the checks using Kafka’s example producer and consumer clients to create topics with user and service accounts that have different levels of access.
Use the team-a-client and team-b-client clients to check the authorization rules. Use the alice admin user to perform additional administrative tasks on Kafka.
The Streams for Apache Kafka image used in this example contains Kafka producer and consumer binaries.
Prerequisites
- ZooKeeper and Kafka are running in the OpenShift cluster to be able to send and receive messages.
- The interactive CLI Kafka client session is started.
Setting up client and admin user configuration
Prepare a Kafka configuration file with authentication properties for the team-a-client client.
SSO_HOST=SSO-HOSTNAME
cat > /tmp/team-a-client.properties << EOF
security.protocol=SASL_SSL
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.ssl.truststore.location="/tmp/truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
EOF
The SASL OAUTHBEARER mechanism is used. This mechanism requires a client ID and client secret, which means the client first connects to the Red Hat Single Sign-On server to obtain an access token. The client then connects to the Kafka broker and uses the access token to authenticate.
Prepare a Kafka configuration file with authentication properties for the team-b-client client.
cat > /tmp/team-b-client.properties << EOF
security.protocol=SASL_SSL
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-b-client" \
  oauth.client.secret="team-b-client-secret" \
  oauth.ssl.truststore.location="/tmp/truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
EOF
Authenticate admin user alice by using curl and performing a password grant authentication to obtain a refresh token.
USERNAME=alice
PASSWORD=alice-password
GRANT_RESPONSE=$(curl -X POST "https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" -H 'Content-Type: application/x-www-form-urlencoded' -d "grant_type=password&username=$USERNAME&password=$PASSWORD&client_id=kafka-cli&scope=offline_access" -s -k)
REFRESH_TOKEN=$(echo $GRANT_RESPONSE | awk -F "refresh_token\":\"" '{printf $2}' | awk -F "\"" '{printf $1}')
The refresh token is an offline token that is long-lived and does not expire.
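The refresh token is cut out of the JSON grant response with text tools. As an illustrative alternative to the awk pipeline, the sketch below uses a single sed expression that prints the token only when the field is present. The function name extract_refresh_token is hypothetical, and the sketch assumes the response is compact JSON on one line with no whitespace around the colon.

```shell
# Read a token endpoint JSON response on stdin and print the value of
# its "refresh_token" field. Prints nothing if the field is absent
# (for example, when the server returned an error response).
extract_refresh_token() {
  sed -n 's/.*"refresh_token":"\([^"]*\)".*/\1/p'
}

# Usage, assuming GRANT_RESPONSE was set by the curl command:
# REFRESH_TOKEN=$(printf '%s\n' "$GRANT_RESPONSE" | extract_refresh_token)
```

An empty result is a useful signal that the password grant failed, so it is worth checking REFRESH_TOKEN before writing it into a properties file.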
Prepare a Kafka configuration file with authentication properties for the admin user alice.
cat > /tmp/alice.properties << EOF
security.protocol=SASL_SSL
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.refresh.token="$REFRESH_TOKEN" \
  oauth.client.id="kafka-cli" \
  oauth.ssl.truststore.location="/tmp/truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
EOF
The kafka-cli public client is used for the oauth.client.id in the sasl.jaas.config. Because it is a public client, it does not require a secret. The client authenticates with the refresh token that was obtained in the previous step. The refresh token is used to request an access token behind the scenes, which is then sent to the Kafka broker for authentication.
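The configuration files for team-a-client and team-b-client differ only in the client ID and client secret. As a convenience sketch, the two files could be generated by one shell function. The function name write_client_properties is hypothetical, and it assumes that SSO_HOST and STOREPASS are already set in the session, as in the earlier steps.

```shell
# Write a properties file for a client that authenticates with a client ID
# and client secret.
# Arguments: $1 = output file, $2 = client ID, $3 = client secret.
# Reads SSO_HOST and STOREPASS from the current shell session.
write_client_properties() {
  cat > "$1" << EOF
security.protocol=SASL_SSL
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="$2" \
  oauth.client.secret="$3" \
  oauth.ssl.truststore.location="/tmp/truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
EOF
}

# Usage:
# write_client_properties /tmp/team-a-client.properties team-a-client team-a-client-secret
# write_client_properties /tmp/team-b-client.properties team-b-client team-b-client-secret
```

The alice configuration is left out of this helper because it uses a refresh token with a public client and has no client secret.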
Producing messages with authorized access
Use the team-a-client configuration to check that you can produce messages to topics that start with a_ or x_.
Write to topic my-topic.
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic my-topic \
  --producer.config=/tmp/team-a-client.properties
First message
This request returns a Not authorized to access topics: [my-topic] error.
team-a-client has a Dev Team A role that gives it permission to perform any supported actions on topics that start with a_, but can only write to topics that start with x_. The topic named my-topic matches neither of those rules.
Write to topic a_messages.
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
  --producer.config /tmp/team-a-client.properties
First message
Second message
Messages are produced to Kafka successfully.
Press CTRL+C to exit the CLI application.
Check the Kafka container log for a debug log of Authorization GRANTED for the request.
oc logs my-cluster-kafka-0 -f -n $NS
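The broker log can be verbose, so it may help to filter it down to the lines that record a granted request. The following is a minimal sketch. The function name grants_only is hypothetical, and the only log text it relies on is the Authorization GRANTED string mentioned in the step above.

```shell
# Pass through only the log lines on stdin that contain the
# fixed string "Authorization GRANTED".
grants_only() {
  grep -F 'Authorization GRANTED'
}

# Usage (requires access to the running cluster):
# oc logs my-cluster-kafka-0 -n "$NS" | grants_only
```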
Consuming messages with authorized access
Use the team-a-client configuration to consume messages from topic a_messages.
Fetch messages from topic a_messages.
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
  --from-beginning --consumer.config /tmp/team-a-client.properties
The request returns an error because the Dev Team A role for team-a-client only has access to consumer groups that have names starting with a_.
Update the team-a-client properties to specify the custom consumer group it is permitted to use.
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
  --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_1
The consumer receives all the messages from the a_messages topic.
Administering Kafka with authorized access
The team-a-client is an account without any cluster-level access, but it can be used with some administrative operations.
List topics.
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
The a_messages topic is returned.
List consumer groups.
bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
The a_consumer_group_1 consumer group is returned.
Fetch details on the cluster configuration.
bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties \
  --entity-type brokers --describe --entity-default
The request returns an error because the operation requires cluster level permissions that team-a-client does not have.
Using clients with different permissions
Use the team-b-client configuration to produce messages to topics that start with b_.
Write to topic a_messages.
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
  --producer.config /tmp/team-b-client.properties
Message 1
This request returns a Not authorized to access topics: [a_messages] error.
Write to topic b_messages.
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic b_messages \
  --producer.config /tmp/team-b-client.properties
Message 1
Message 2
Message 3
Messages are produced to Kafka successfully.
Write to topic x_messages.
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --producer.config /tmp/team-b-client.properties
Message 1
A Not authorized to access topics: [x_messages] error is returned. The team-b-client can only read from topic x_messages.
Write to topic x_messages using team-a-client.
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --producer.config /tmp/team-a-client.properties
Message 1
This request returns a Not authorized to access topics: [x_messages] error. The team-a-client can write to the x_messages topic, but it does not have permission to create a topic that does not yet exist. Before team-a-client can write to the x_messages topic, an admin power user must create it with the correct configuration, such as the number of partitions and replicas.
Managing Kafka with an authorized admin user
Use admin user alice to manage Kafka. alice has full access to manage everything on any Kafka cluster.
Create the x_messages topic as alice.
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
  --topic x_messages --create --replication-factor 1 --partitions 1
The topic is created successfully.
List all topics as alice.
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties --list
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-b-client.properties --list
Admin user alice can list all the topics, whereas team-a-client and team-b-client can only list the topics they have access to.
The Dev Team A and Dev Team B roles both have Describe permission on topics that start with x_, but they cannot see the other team’s topics because they do not have Describe permissions on them.
Use the team-a-client to produce messages to the x_messages topic:
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --producer.config /tmp/team-a-client.properties
Message 1
Message 2
Message 3
As alice created the x_messages topic, messages are produced to Kafka successfully.
Use the team-b-client to produce messages to the x_messages topic.
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --producer.config /tmp/team-b-client.properties
Message 4
Message 5
This request returns a Not authorized to access topics: [x_messages] error.
Use the team-b-client to consume messages from the x_messages topic:
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --from-beginning --consumer.config /tmp/team-b-client.properties --group x_consumer_group_b
The consumer receives all the messages from the x_messages topic.
Use the team-a-client to consume messages from the x_messages topic.
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --from-beginning --consumer.config /tmp/team-a-client.properties --group x_consumer_group_a
This request returns a Not authorized to access topics: [x_messages] error.
Use the team-a-client to consume messages from a consumer group that begins with a_.
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_a
This request returns a Not authorized to access topics: [x_messages] error. Dev Team A has no Read access on topics that start with x_.
Use alice to consume messages from the x_messages topic.
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
  --from-beginning --consumer.config /tmp/alice.properties
Messages are consumed from Kafka successfully.
alice can read from or write to any topic.
Use alice to read the cluster configuration.
bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
  --entity-type brokers --describe --entity-default
The cluster configuration for this example is empty.
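The topic-level outcomes seen in these checks can be summarized as a small lookup. The sketch below is only a local model of the rules as described in this example, not a query against Red Hat Single Sign-On or the broker. The function name expected_topic_access is hypothetical, and full access for team-b-client on topics that start with b_ is an assumption that mirrors the team-a-client rule.

```shell
# Print "allowed" or "denied" for a client, an operation, and a topic name.
# Arguments: $1 = client, $2 = operation (Describe, Read, Write, Create), $3 = topic.
expected_topic_access() {
  # Full access: alice everywhere, each team on its own topic prefix.
  case "$1:$3" in
    alice:*)           echo allowed; return ;;
    team-a-client:a_*) echo allowed; return ;;
    team-b-client:b_*) echo allowed; return ;;
  esac
  # Shared x_ topics: team A may describe and write, team B may describe and read.
  case "$1:$2:$3" in
    team-a-client:Describe:x_*|team-a-client:Write:x_*) echo allowed ;;
    team-b-client:Describe:x_*|team-b-client:Read:x_*)  echo allowed ;;
    *) echo denied ;;
  esac
}

expected_topic_access team-a-client Write my-topic     # denied: matches no rule
expected_topic_access team-a-client Create x_messages  # denied: only alice creates x_ topics
expected_topic_access team-b-client Read x_messages    # allowed
```

Consumer group rules, such as the a_ prefix required for team-a-client, are not covered by this model.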