16.4. Red Hat build of Keycloak での権限のセットアップ
Red Hat build of Keycloak を OAuth 2.0 認可サーバーとして使用する場合、認可権限を使用してユーザーアカウントまたはサービスアカウントに Kafka 権限が付与されます。Kafka へのアクセス権限を付与するには、Red Hat build of Keycloak Authorization Services と Kafka の認可モデルをマッピングする OAuth クライアント仕様 を Red Hat build of Keycloak に作成します。
16.4.1. Kafka と Red Hat build of Keycloak の認可モデル
Kafka と Red Hat build of Keycloak は異なる認可モデルを使用します。
Kafka 認可モデル
Kafka の認可モデルでは、リソースタイプ と 操作 を使用してユーザーの ACL を記述します。Kafka クライアントがブローカーでアクションを実行すると、ブローカーは設定済みの KeycloakAuthorizer
を使用して、アクションおよびリソースタイプを基にしてクライアントのパーミッションをチェックします。
各リソースタイプには、使用可能な操作権限のセットがあります。たとえば、Topic
リソースタイプには、Create
権限や Write
権限などがあります。
リソースと権限の完全なリストは、Kafka ドキュメントの Kafka 認可モデル を参照してください。
Red Hat build of Keycloak 認可モデル
Red Hat build of Keycloak の認可サービスモデルには、権限の定義と付与に関する 4 つの概念があります。
- リソース
- スコープ
- ポリシー
- パーミッション
これらの概念の詳細は、Authorization Services のガイドを参照してください。
16.4.2. 認可モデルのマッピング
Kafka 認証モデルは、Kafka へのアクセスを制御する Red Hat build of Keycloak のロールとリソースを定義するための基盤として使用されます。
ユーザーアカウントまたはサービスアカウントに Kafka 権限を付与するには、まず、Kafka クラスター用の Red Hat build of Keycloak に OAuth クライアント仕様 を作成します。その後、クライアントで Red Hat build of Keycloak Authorization Services のルールを指定します。通常、Kafka クラスターを表す OAuth クライアントのクライアント ID は、kafka
です。Streams for Apache Kafka に付属する サンプル設定ファイル では、OAuth クライアント ID として kafka
を使用します。
複数の Kafka クラスターがある場合は、それらすべてに対して単一の OAuth クライアント (kafka
) を使用できます。これにより、認可ルールを定義および管理するための統一された場所が得られます。ただし、異なる OAuth クライアント ID (たとえば、my-cluster-Kafka
または cluster-dev-kafka
) を使用して、各クライアント設定内の各クラスターの認可ルールを定義することもできます。
kafka
クライアント定義を指定するには、Red Hat build of Keycloak 管理コンソールで Authorization Enabled オプションを有効にする必要があります。認可サービスを有効にする方法は、Authorization Services のガイドを参照してください。
すべてのパーミッションは、kafka
クライアントのスコープ内に存在します。異なる OAuth クライアント ID で設定された異なる Kafka クラスターがある場合、そのクラスターが同じ Red Hat build of Keycloak レルムに含まれていても、それぞれに個別の権限セットが必要です。
Kafka クライアントが OAUTHBEARER 認証を使用する場合、Red Hat build of Keycloak オーソライザー (KeycloakAuthorizer
) が、現在のセッションのアクセストークンを使用して、Red Hat build of Keycloak サーバーからグラントのリストを取得します。オーソライザーは、権限を付与するために、アクセストークン所有者のポリシーと権限に基づいて、Red Hat build of Keycloak Authorization Services から受信およびキャッシュされたグラントリストを評価します。
Kafka 権限の認可スコープのアップロード
通常、Red Hat build of Keycloak の初期設定で、各 Kafka リソースタイプに対して実行できるすべての操作のリストを作成するために、認可スコープをアップロードします。この手順は、権限を定義する前に 1 回だけ実行されます。認可スコープは、アップロードする代わりに手動で追加することもできます。
認可スコープには、リソースタイプに関係なく、次の Kafka 権限が含まれている必要があります。
-
Create
-
Write
-
Read
-
Delete
-
Describe
-
Alter
-
DescribeConfigs
-
AlterConfigs
-
ClusterAction
-
IdempotentWrite
権限が必要ないことが確実な場合 (IdempotentWrite
など)、その権限を認可スコープのリストから省略できます。ただし、その権限は、Kafka リソースをターゲットにする際に使用できなくなります。
All
権限はサポートされていません。
パーミッションチェックのリソースパターン
リソースパターンは、パーミッションチェックの実行時にターゲットリソースに対するパターンの照合に使用されます。一般的なパターンの形式は、<resource_type>:<pattern_name>
です。
リソースタイプは Kafka 承認モデルをミラーリングします。このパターンでは、次の 2 つの一致オプションが可能です。
-
完全一致 (パターンが
*
で終了しない場合) -
接頭辞一致 (パターンが
*
で終了する)
リソースのパターン例
Topic:my-topic Topic:orders-* Group:orders-* Cluster:*
Topic:my-topic
Topic:orders-*
Group:orders-*
Cluster:*
さらに、一般的なパターンの形式には、接頭辞 kafka-cluster:<cluster_name>
を付け、その後にコンマを付けることができます。<cluster_name>
は、Kafka カスタムリソース内の metadata.name
を示します。
クラスター接頭辞が付けられたリソースのパターン例
kafka-cluster:my-cluster,Topic:* kafka-cluster:*,Group:b_*
kafka-cluster:my-cluster,Topic:*
kafka-cluster:*,Group:b_*
kafka-cluster
の接頭辞がない場合は、kafka-cluster:*
とみなします。
リソースを定義するときに、リソースに関連する可能な認可スコープのリストを関連付けることができます。ターゲットリソースタイプに妥当なアクションを設定します。
任意のリソースに任意の認可スコープを追加できますが、リソースタイプでサポートされるスコープのみがアクセス制御の対象として考慮されます。
アクセスパーミッションを適用するポリシー
ポリシーは、1 つ以上のユーザーアカウントまたはサービスアカウントにパーミッションをターゲットにするために使用されます。以下がターゲットの対象になります。
- 特定のユーザーまたはサービスアカウント
- レルムロールまたはクライアントロール
- ユーザーグループ
ポリシーには一意の名前が割り当てられ、複数のリソースに対して複数の対象パーミッションを指定するために再使用できます。
アクセスを付与するためのパーミッション
詳細なパーミッションを使用して、ユーザーへのアクセスを付与するポリシー、リソース、および認可スコープをまとめます。
各パーミッションの名前によって、どのユーザーにどのパーミッションが付与されるかが明確に定義される必要があります。例えば、Dev Team B は x で始まるトピックから読むことができます
。
16.4.3. 一般的な Kafka 操作の権限
次の例は、Kafka で一般的な操作を実行するために必要なユーザー権限を示しています。
トピックの作成
トピックを作成するには、特定のトピック、または Cluster:kafka-cluster
に対して Create
パーミッションが必要です。
bin/kafka-topics.sh --create --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-topics.sh --create --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
トピックのリスト表示
指定のトピックでユーザーに Describe
パーミッションがある場合には、トピックがリスト表示されます。
bin/kafka-topics.sh --list \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-topics.sh --list \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
トピックの詳細の表示
トピックの詳細を表示するには、トピックに対して Describe
および DescribeConfigs
の権限が必要です。
bin/kafka-topics.sh --describe --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-topics.sh --describe --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
トピックへのメッセージの生成
トピックへのメッセージを作成するには、トピックに対する Describe
と Write
の権限が必要です。
トピックが作成されておらず、トピックの自動生成が有効になっている場合は、トピックを作成するパーミッションが必要になります。
bin/kafka-console-producer.sh --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties
bin/kafka-console-producer.sh --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties
トピックからのメッセージの消費
トピックからのメッセージを消費するためには、トピックに Describe
と Read
のパーミッションが必要です。通常、トピックからの消費は、コンシューマーグループにコンシューマーオフセットを格納することに依存しており、これにはコンシューマーグループに対する追加の Describe
および Read
権限が必要です。
マッチングには 2 つの resources
が必要です。以下に例を示します。
Topic:my-topic Group:my-group-*
Topic:my-topic
Group:my-group-*
bin/kafka-console-consumer.sh --topic my-topic --group my-group-1 --from-beginning \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --consumer.config /tmp/config.properties
bin/kafka-console-consumer.sh --topic my-topic --group my-group-1 --from-beginning \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --consumer.config /tmp/config.properties
べき等プロデューサーを使用したトピックへのメッセージの生成
Cluster:kafka-cluster
リソースには、トピックをプロデュースするためのアクセス許可だけでなく、IdempotentWrite
アクセス許可が追加で必要です。
マッチングには 2 つの resources
が必要です。以下に例を示します。
Topic:my-topic Cluster:kafka-cluster
Topic:my-topic
Cluster:kafka-cluster
bin/kafka-console-producer.sh --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties --producer-property enable.idempotence=true --request-required-acks -1
bin/kafka-console-producer.sh --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties --producer-property enable.idempotence=true --request-required-acks -1
コンシューマーグループのリスト
コンシューマーグループのリスト表示時に、ユーザーが Describe
権限を持っているグループのみが返されます。また、ユーザーが Cluster:kafka-cluster
に対して Describe
パーミッションを持っている場合は、すべてのコンシューマーグループが返されます。
bin/kafka-consumer-groups.sh --list \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-consumer-groups.sh --list \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
コンシューマーグループの詳細の表示
コンシューマーグループの詳細を表示するには、グループとグループに関連するトピックに対して Describe
権限が必要です。
bin/kafka-consumer-groups.sh --describe --group my-group-1 \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-consumer-groups.sh --describe --group my-group-1 \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
トピック設定の変更
トピックの設定を変更するには、トピックに Describe
と Alter
の権限が必要です。
bin/kafka-topics.sh --alter --topic my-topic --partitions 2 \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-topics.sh --alter --topic my-topic --partitions 2 \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Kafka ブローカー設定の表示
kafka-configs.sh
を使用してブローカーの設定を取得するためには、Cluster:kafka-cluster
に DescribeConfigs
パーミッションが必要です。
bin/kafka-configs.sh --entity-type brokers --entity-name 0 --describe --all \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-configs.sh --entity-type brokers --entity-name 0 --describe --all \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Kafka ブローカー設定の変更
Kafka ブローカーの設定を変更するには、Cluster:kafka-cluster
に DescribeConfigs
および AlterConfigs
パーミッションが必要です。
bin/kafka-configs --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-configs --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
トピックの削除
トピックを削除するには、トピックに Describe
と Delete
の権限が必要です。
bin/kafka-topics.sh --delete --topic my-topic \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
bin/kafka-topics.sh --delete --topic my-topic \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
リードパーティションの選択
トピックパーティションのリーダー選択を実行するには、Cluster:kafka-cluster
に Alter
パーミッションが必要です。
bin/kafka-leader-election.sh --topic my-topic --partition 0 --election-type PREFERRED / --bootstrap-server my-cluster-kafka-bootstrap:9092 --admin.config /tmp/config.properties
bin/kafka-leader-election.sh --topic my-topic --partition 0 --election-type PREFERRED /
--bootstrap-server my-cluster-kafka-bootstrap:9092 --admin.config /tmp/config.properties
パーティションの再割り当て
パーティション再割り当てファイルを生成するためには、関係するトピックに対して Describe
権限が必要です。
bin/kafka-reassign-partitions.sh --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "0,1" --generate \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties > /tmp/partition-reassignment.json
bin/kafka-reassign-partitions.sh --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "0,1" --generate \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties > /tmp/partition-reassignment.json
パーティション再割り当てを実行するには、Cluster:kafka-cluster
に対して Describe
と Alter
のパーミッションが必要です。また、関係するトピックには、Describe
のパーミッションが必要です。
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --execute \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --execute \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
パーティション再割り当てを確認するには、Cluster:kafka-cluster
および関連する各トピックに対して Describe
および AlterConfigs
のパーミッションが必要です。
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --verify \ --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --verify \
--bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
16.4.4. 例: Red Hat build of Keycloak Authorization Services のセットアップ
トークンベースの認証に Red Hat build of Keycloak で OAuth 2.0 を使用している場合は、Red Hat build of Keycloak を使用して、Kafka ブローカーへのクライアントアクセスを制限する認可ルールを設定することもできます。この例では、keycloak
認可で Red Hat build of Keycloak Authorization Services を使用する方法を説明します。Red Hat build of Keycloak をセットアップして、Kafka クライアントへのアクセス制限を適用します。Red Hat build of Keycloak Authorization Services では、認可スコープ、ポリシー、権限を使用して、リソースへのアクセス制御を定義および適用します。
Red Hat build of Keycloak Authorization Services REST エンドポイントは、認証されたユーザーのリソースに付与された権限のリストを提供します。許可 (権限) のリストは、Kafka クライアントによって認証されたセッションが確立された後の最初のアクションとして、Red Hat build of Keycloak サーバーから取得されます。付与の変更が検出されるように、バックグラウンドでリストが更新されます。付与は、各ユーザーセッションが迅速な承認決定を提供するために、Kafka ブローカーにてローカルでキャッシュおよび適用されます。
Streams for Apache Kafka は、設定ファイルのサンプル を提供します。これには、Red Hat build of Keycloak をセットアップするための次のサンプルファイルが含まれています。
kafka-ephemeral-oauth-single-keycloak-authz.yaml
-
Red Hat build of Keycloak を使用する OAuth 2.0 トークンベースの認可用に設定された
Kafka
カスタムリソースの例。カスタムリソースを使用して、keycloak
認可およびトークンベースのoauth
認証を使用する Kafka クラスターをデプロイできます。 kafka-authz-realm.json
- サンプルグループ、ユーザー、ロール、クライアントで設定された Red Hat build of Keycloak レルムの例。このレルムを Red Hat build of Keycloak インスタンスにインポートすると、Kafka にアクセスするための詳細な権限を設定できます。
Red Hat build of Keycloak で例を試す場合は、これらのファイルを使用して、このセクションで概説されているタスクを順番どおりに実行してください。
認証
トークンベースの oauth
認証を設定する場合、jwksEndpointUri
をローカル JWT 検証の URI として指定します。keycloak
認可を設定する場合、tokenEndpointUri
を Red Hat build of Keycloak トークンエンドポイントの URI として指定します。両方の URI のホスト名は同じである必要があります。
グループまたはロールポリシーを使用した対象パーミッション
Red Hat build of Keycloak では、サービスアカウントが有効になっている機密性の高いクライアントを、クライアント ID とシークレットを使用して、独自の名前のサーバーに対して認証できます。これは、通常、特定ユーザーのエージェント (Web サイトなど) としてではなく、独自の名前で動作するマイクロサービスに便利です。サービスアカウントには、通常のユーザーと同様にロールを割り当てることができます。ただし、グループを割り当てることはできません。そのため、サービスアカウントを使用してマイクロサービスへのパーミッションをターゲットにする場合は、グループポリシーを使用できないため、代わりにロールポリシーを使用する必要があります。逆に、ユーザー名およびパスワードを使用した認証が必要な通常のユーザーアカウントにのみ特定のパーミッションを制限する場合は、ロールポリシーではなく、グループポリシーを使用すると、副次的に実現することができます。これは、ClusterManager
で始まるパーミッションの例で使用されるものです。通常、クラスター管理の実行は CLI ツールを使用して対話的に行われます。結果的に生成されるアクセストークンを使用して Kafka ブローカーに対して認証を行う前に、ユーザーのログインを要求することは妥当です。この場合、アクセストークンはクライアントアプリケーションではなく、特定のユーザーを表します。
16.4.4.1. Red Hat build of Keycloak での権限のセットアップ
Red Hat build of Keycloak をセットアップし、管理コンソールに接続して、事前設定されたレルムを追加します。kafka-authz-realm.json
ファイルのサンプルを使用して、レルムをインポートします。管理コンソールで、レルムに定義されている認可ルールを確認できます。このルールは、サンプルの Red Hat build of Keycloak レルムを使用するように設定された Kafka クラスター上のリソースへのアクセスを許可します。
前提条件
- 実行中の OpenShift クラスター。
-
事前設定されたレルムを含む Streams for Apache Kafka
examples/security/keycloak-authorization/kafka-authz-realm.json
ファイル。
手順
- Red Hat build of Keycloak ドキュメントの スタートガイド の説明に従って、Red Hat build of Keycloak Operator を使用して Red Hat build of Keycloak サーバーをインストールします。
- Red Hat build of Keycloak インスタンスが実行されるまで待ちます。
管理コンソールにアクセスできるように外部ホスト名を取得します。
NS=sso oc get ingress keycloak -n $NS
NS=sso oc get ingress keycloak -n $NS
Copy to Clipboard Copied! この例では、Red Hat build of Keycloak サーバーが
sso
namespace で実行されていると想定しています。admin
ユーザーのパスワードを取得します。oc get -n $NS pod keycloak-0 -o yaml | less
oc get -n $NS pod keycloak-0 -o yaml | less
Copy to Clipboard Copied! パスワードはシークレットとして保存されるため、Red Hat build of Keycloak インスタンスの設定 YAML ファイルを取得して、シークレットの名前 (
secretKeyRef.name
) を特定します。シークレットの名前を使用して、クリアテキストのパスワードを取得します。
SECRET_NAME=credential-keycloak oc get -n $NS secret $SECRET_NAME -o yaml | grep PASSWORD | awk '{print $2}' | base64 -D
SECRET_NAME=credential-keycloak oc get -n $NS secret $SECRET_NAME -o yaml | grep PASSWORD | awk '{print $2}' | base64 -D
Copy to Clipboard Copied! この例では、シークレットの名前が
credential-keycloak
であることを前提としています。ユーザー名
admin
と取得したパスワードを使用して、管理コンソールにログインします。https://HOSTNAME
を使用して KubernetesIngress
にアクセスします。管理コンソールを使用して、サンプルレルムを Red Hat build of Keycloak にアップロードできるようになりました。
- Add Realm をクリックして、サンプルレルムをインポートします。
examples/security/keycloak-authorization/kafka-authz-realm.json
ファイルを追加してから Create をクリックします。これで、管理コンソールの現在のレルムとして
kafka-authz
が含まれるようになりました。デフォルトビューには、Master レルムが表示されます。
Red Hat build of Keycloak 管理コンソールで、Clients > kafka > Authorization > Settings に移動し、Decision Strategy が Affirmative に設定されていることを確認します。
肯定的な (Affirmative) ポリシーとは、クライアントが Kafka クラスターにアクセスするためには少なくとも 1 つのポリシーが満たされている必要があることを意味します。
Red Hat build of Keycloak 管理コンソールで、Groups、Users、Roles、Clients に移動して、レルム設定を表示します。
- グループ
-
Groups
はユーザーを整理し、権限を設定します。グループは LDAP アイデンティティープロバイダーにリンクして、リージョンや部門などによってユーザーを区分するために使用できます。カスタム LDAP インターフェイスを介してユーザーをグループに追加し、Kafka リソースの権限を管理できます。KeyCloak の LDAP プロバイダーの使用に関する詳細は、Keycloak サーバー管理 ガイドを参照してください。 - ユーザー
-
Users
は個々のユーザーを定義します。この例では、alice
(ClusterManager
グループ内) とbob
(ClusterManager-my-cluster
内) を作成します。ユーザーは LDAP アイデンティティープロバイダーに保存することもできます。 - ロール
-
Roles
は、ユーザーまたはクライアントに特定の権限を割り当てます。ロールはタグのように機能し、ユーザーに特定の権限を付与します。ロールは LDAP に保存できませんが、Red Hat build of Keycloak ロールをグループに追加すると、ロールと権限の両方を組み合わせることができます。 - クライアント
Clients
は Kafka のやり取りの設定を定義します。-
kafka
クライアントは、ブローカーの OAuth 2.0 トークン検証を処理し、認可ポリシー (認可の有効化が必要) を含んでいます。 -
kafka-cli
クライアントは、アクセストークンまたはリフレッシュトークンを取得するためにコマンドラインツールによって使用されます。 -
team-a-client
とteam-b-client
は、特定の Kafka トピックへの部分的なアクセス権を持つサービスを表します。
-
Red Hat build of Keycloak 管理コンソールで、Authorization > Permissions に移動して、レルムに定義されたリソースとポリシーを使用する付与された権限を確認します。
たとえば、
kafka
クライアントには次の権限があります。Dev Team A can write to topics that start with x_ on any cluster Dev Team B can read from topics that start with x_ on any cluster Dev Team B can update consumer group offsets that start with x_ on any cluster ClusterManager of my-cluster Group has full access to cluster config on my-cluster ClusterManager of my-cluster Group has full access to consumer groups on my-cluster ClusterManager of my-cluster Group has full access to topics on my-cluster
Dev Team A can write to topics that start with x_ on any cluster Dev Team B can read from topics that start with x_ on any cluster Dev Team B can update consumer group offsets that start with x_ on any cluster ClusterManager of my-cluster Group has full access to cluster config on my-cluster ClusterManager of my-cluster Group has full access to consumer groups on my-cluster ClusterManager of my-cluster Group has full access to topics on my-cluster
Copy to Clipboard Copied! - Dev Team A
-
Dev チーム A レルムロールは、任意のクラスターで
x_
で始まるトピックに書き込みできます。これは、Topic:x_*
というリソース、Describe
とWrite
のスコープ、そしてDev Team A
のポリシーを組み合わせたものです。Dev Team A
ポリシーは、Dev Team A
というレルムロールを持つすべてのユーザーにマッチします。 - Dev Team B
-
Dev チーム B レルムロールは、任意のクラスターで
x_
で始まるトピックから読み取ることができます。これは、Topic:x_*、
Group:x_*
のリソース、Describe
とRead
のスコープ、およびDev Team B
のポリシーを組み合わせたものです。Dev Team B
ポリシーは、Dev Team B
というレルムロールを持つすべてのユーザーにマッチします。一致するユーザーおよびクライアントはトピックから読み取りでき、名前がx_
で始まるトピックおよびコンシューマーグループの消費されたオフセットを更新できます。
16.4.4.2. Red Hat build of Keycloak 認可を使用する Kafka クラスターのデプロイ
Red Hat build of Keycloak サーバーに接続するように設定された Kafka クラスターをデプロイします。サンプルの kafka-ephemeral-oauth-single-keycloak-authz.yaml
ファイルを使用して、Kafka
カスタムリソースとして Kafka クラスターをデプロイメントします。この例では、keycloak
承認と oauth
認証を使用して単一ノードの Kafka クラスターをデプロイします。
前提条件
- Red Hat build of Keycloak 認可サーバーが OpenShift クラスターにデプロイされ、サンプルレルムでロードされている。
- Cluster Operator が OpenShift クラスターにデプロイされている。
-
Streams for Apache Kafka の
examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml
カスタムリソース。
手順
デプロイした Red Hat build of Keycloak インスタンスのホスト名を使用して、Kafka ブローカーが Red Hat build of Keycloak サーバーと通信するためのトラストストア証明書を準備します。
SSO_HOST=SSO-HOSTNAME SSO_HOST_PORT=$SSO_HOST:443 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
SSO_HOST=SSO-HOSTNAME SSO_HOST_PORT=$SSO_HOST:443 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
Copy to Clipboard Copied! Kubernetes
Ingress
はセキュアな (HTTPS) 接続を確立するために使用されるため、証明書が必要です。通常、単一の証明書ではなく、証明書チェーンがあります。指定する必要があるのは、
/tmp/sso.pem
ファイルの最後にリストされている最上位の発行者 CA だけです。手動で抽出するか、次のコマンドを使用して抽出できます。証明書チェーンの最上位の CA 証明書を抽出するコマンドの例
split -p "-----BEGIN CERTIFICATE-----" sso.pem sso- for f in $(ls sso-*); do mv $f $f.pem; done cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt
split -p "-----BEGIN CERTIFICATE-----" sso.pem sso- for f in $(ls sso-*); do mv $f $f.pem; done cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt
Copy to Clipboard Copied! 注記信頼できる CA 証明書は通常、
openssl
コマンドを使用するのではなく、信頼できるソースから取得します。シークレットとして OpenShift に証明書をデプロイします。
oc create secret generic oauth-server-cert --from-file=/tmp/sso-ca.crt -n $NS
oc create secret generic oauth-server-cert --from-file=/tmp/sso-ca.crt -n $NS
Copy to Clipboard Copied! ホスト名を環境変数として設定します。
SSO_HOST=SSO-HOSTNAME
SSO_HOST=SSO-HOSTNAME
Copy to Clipboard Copied! サンプル Kafka クラスターを作成およびデプロイします。
cat examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml | sed -E 's#\${SSO_HOST}'"#$SSO_HOST#" | oc create -n $NS -f -
cat examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml | sed -E 's#\${SSO_HOST}'"#$SSO_HOST#" | oc create -n $NS -f -
Copy to Clipboard Copied!
16.4.4.3. CLI Kafka クライアントセッションの TLS 接続の準備
対話型 CLI セッション用の新規 Pod を作成します。TLS 接続用の Red Hat build of Keycloak 証明書を使用してトラストストアを設定します。トラストストアは、Red Hat build of Keycloak と Kafka ブローカーに接続します。
前提条件
Red Hat build of Keycloak 認可サーバーが OpenShift クラスターにデプロイされ、サンプルレルムでロードされている。
Red Hat build of Keycloak 管理コンソールで、クライアントに割り当てられたロールが Clients > Service Account Roles に表示されることを確認します。
- Red Hat build of Keycloak に接続するように設定された Kafka クラスターが OpenShift クラスターにデプロイされている。
手順
Kafka イメージを使用して新しいインタラクティブな Pod コンテナーを実行し、実行中の Kafka ブローカーに接続します。
NS=sso oc run -ti --restart=Never --image=registry.redhat.io/amq-streams/kafka-38-rhel9:2.8.0 kafka-cli -n $NS -- /bin/sh
NS=sso oc run -ti --restart=Never --image=registry.redhat.io/amq-streams/kafka-38-rhel9:2.8.0 kafka-cli -n $NS -- /bin/sh
Copy to Clipboard Copied! 注記イメージのダウンロードの待機中に
oc
がタイムアウトする場合、その後の試行によって AlreadyExists エラーが発生することがあります。Pod コンテナーにアタッチします。
oc attach -ti kafka-cli -n $NS
oc attach -ti kafka-cli -n $NS
Copy to Clipboard Copied! Red Hat build of Keycloak のホスト名を使用して、TLS を使用したクライアント接続用の証明書を準備します。
SSO_HOST=SSO-HOSTNAME SSO_HOST_PORT=$SSO_HOST:443 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
SSO_HOST=SSO-HOSTNAME SSO_HOST_PORT=$SSO_HOST:443 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
Copy to Clipboard Copied! 通常、単一の証明書ではなく、証明書チェーンがあります。指定する必要があるのは、
/tmp/sso.pem
ファイルの最後にリストされている最上位の発行者 CA だけです。手動で抽出するか、次のコマンドを使用して抽出できます。証明書チェーンの最上位の CA 証明書を抽出するコマンドの例
split -p "-----BEGIN CERTIFICATE-----" sso.pem sso- for f in $(ls sso-*); do mv $f $f.pem; done cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt
split -p "-----BEGIN CERTIFICATE-----" sso.pem sso- for f in $(ls sso-*); do mv $f $f.pem; done cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt
Copy to Clipboard Copied! 注記信頼できる CA 証明書は通常、
openssl
コマンドを使用するのではなく、信頼できるソースから取得します。Kafka ブローカーへの TLS 接続のトラストストアを作成します。
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias sso -storepass $STOREPASS -import -file /tmp/sso-ca.crt -noprompt
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias sso -storepass $STOREPASS -import -file /tmp/sso-ca.crt -noprompt
Copy to Clipboard Copied! Kafka ブートストラップアドレスを Kafka ブローカーのホスト名および
tls
リスナーポート (9093) のホスト名として使用し、Kafka ブローカーの証明書を準備します。KAFKA_HOST_PORT=my-cluster-kafka-bootstrap:9093 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $KAFKA_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/my-cluster-kafka.pem
KAFKA_HOST_PORT=my-cluster-kafka-bootstrap:9093 STOREPASS=storepass echo "Q" | openssl s_client -showcerts -connect $KAFKA_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/my-cluster-kafka.pem
Copy to Clipboard Copied! 取得した
.pem
ファイルは通常、1 つの証明書ではなく、証明書チェーンです。指定する必要があるのは、/tmp/my-cluster-kafka.pem
ファイルの最後にリストされている最上位の発行者 CA のみです。手動で抽出するか、次のコマンドを使用して抽出できます。証明書チェーンの最上位の CA 証明書を抽出するコマンドの例
split -p "-----BEGIN CERTIFICATE-----" /tmp/my-cluster-kafka.pem kafka- for f in $(ls kafka-*); do mv $f $f.pem; done cp $(ls kafka-* | sort -r | head -n 1) my-cluster-kafka-ca.crt
split -p "-----BEGIN CERTIFICATE-----" /tmp/my-cluster-kafka.pem kafka- for f in $(ls kafka-*); do mv $f $f.pem; done cp $(ls kafka-* | sort -r | head -n 1) my-cluster-kafka-ca.crt
Copy to Clipboard Copied! 注記信頼できる CA 証明書は通常、
openssl
コマンドを使用するのではなく、信頼できるソースから取得します。この例では、Kafka クラスターがデプロイされたのと同じ namespace の Pod でクライアントが実行されていると想定しています。クライアントが OpenShift クラスターの外部から Kafka クラスターにアクセスしている場合は、最初にブートストラップアドレスを決定する必要があります。その場合、クラスター証明書を OpenShift シークレットから直接取得することもでき、openssl
は必要ありません。詳細は、14章Kafka クラスターへのクライアントアクセスの設定 を参照してください。Kafka ブローカーの証明書をトラストストアに追加します。
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias my-cluster-kafka -storepass $STOREPASS -import -file /tmp/my-cluster-kafka-ca.crt -noprompt
keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias my-cluster-kafka -storepass $STOREPASS -import -file /tmp/my-cluster-kafka-ca.crt -noprompt
Copy to Clipboard Copied! 承認されたアクセスを確認するために、セッションを開いたままにします。
16.4.4.4. CLI Kafka クライアントセッションを使用した Kafka への承認されたアクセスの確認
対話型 CLI セッションを使用して、Red Hat build of Keycloak を通じて適用された認可ルールを確認します。Kafka のサンプルプロデューサーおよびコンシューマークライアントを使用してチェックを適用し、異なるレベルのアクセスを持つユーザーおよびサービスアカウントでトピックを作成します。
team-a-client
クライアントおよび team-b-client
クライアントを使用して、認可ルールを確認します。alice
admin ユーザーを使用して、Kafka で追加の管理タスクを実行します。
この例で使用する Kafka イメージには、Kafka プロデューサーおよびコンシューマーのバイナリーが含まれています。
前提条件
- ZooKeeper および Kafka は OpenShift クラスターで実行され、メッセージを送受信できる。
対話型 CLI Kafka クライアントセッション が開始される。
クライアントおよび管理ユーザーの設定
team-a-client
クライアントの認証プロパティーで Kafka 設定ファイルを準備します。SSO_HOST=SSO-HOSTNAME cat > /tmp/team-a-client.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.client.id="team-a-client" \ oauth.client.secret="team-a-client-secret" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
SSO_HOST=SSO-HOSTNAME cat > /tmp/team-a-client.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.client.id="team-a-client" \ oauth.client.secret="team-a-client-secret" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
Copy to Clipboard Copied! SASL
OAUTHBEARER
メカニズムを使用します。このメカニズムにはクライアント ID とクライアントシークレットが必要です。つまり、クライアントはまず Red Hat build of Keycloak サーバーに接続してアクセストークンを取得します。その後、クライアントは Kafka ブローカーに接続し、アクセストークンを使用して認証します。team-b-client
クライアントの認証プロパティーで Kafka 設定ファイルを準備します。cat > /tmp/team-b-client.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.client.id="team-b-client" \ oauth.client.secret="team-b-client-secret" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
cat > /tmp/team-b-client.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.client.id="team-b-client" \ oauth.client.secret="team-b-client-secret" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
Copy to Clipboard Copied! curl
を使用して管理者ユーザーalice
を認証し、パスワード付与認証を実行して更新トークンを取得します。USERNAME=alice PASSWORD=alice-password GRANT_RESPONSE=$(curl -X POST "https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" -H 'Content-Type: application/x-www-form-urlencoded' -d "grant_type=password&username=$USERNAME&password=$PASSWORD&client_id=kafka-cli&scope=offline_access" -s -k) REFRESH_TOKEN=$(echo $GRANT_RESPONSE | awk -F "refresh_token\":\"" '{printf $2}' | awk -F "\"" '{printf $1}')
USERNAME=alice PASSWORD=alice-password GRANT_RESPONSE=$(curl -X POST "https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" -H 'Content-Type: application/x-www-form-urlencoded' -d "grant_type=password&username=$USERNAME&password=$PASSWORD&client_id=kafka-cli&scope=offline_access" -s -k) REFRESH_TOKEN=$(echo $GRANT_RESPONSE | awk -F "refresh_token\":\"" '{printf $2}' | awk -F "\"" '{printf $1}')
Copy to Clipboard Copied! 更新トークンは、有効期間がなく、期限切れにならないオフライントークンです。
admin ユーザー
alice
の認証プロパティーで Kafka 設定ファイルを準備します。cat > /tmp/alice.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.refresh.token="$REFRESH_TOKEN" \ oauth.client.id="kafka-cli" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
cat > /tmp/alice.properties << EOF security.protocol=SASL_SSL ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.refresh.token="$REFRESH_TOKEN" \ oauth.client.id="kafka-cli" \ oauth.ssl.truststore.location="/tmp/truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.token.endpoint.uri="https://$SSO_HOST/realms/kafka-authz/protocol/openid-connect/token" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler EOF
Copy to Clipboard Copied! kafka-cli
パブリッククライアントは、sasl.jaas .config
のoauth.client. id
に使用されます。これはパブリッククライアントであるため、シークレットは必要ありません。クライアントは直前の手順で認証された更新トークンで認証されます。更新トークンは背後でアクセストークンを要求します。これは、認証のために Kafka ブローカーに送信されます。
承認されたアクセスでのメッセージの生成
team-a-client
の設定を使用して、a_
や x_
で始まるトピックへのメッセージを作成できるかどうかを確認します。
トピック
my-topic
に書き込みます。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic my-topic \ --producer.config=/tmp/team-a-client.properties First message
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic my-topic \ --producer.config=/tmp/team-a-client.properties First message
Copy to Clipboard Copied! 以下のリクエストは、
Not authorized to access topics: [my-topic]
エラーを返します。team-a-client
はDev Team A
ロールを持っており、a_
で始まるトピックに対してサポートされているすべてのアクションを実行する権限を与えられていますが、x_
で始まるトピックへの書き込みのみ可能です。my-topic
という名前のトピックは、これらのルールのいずれにも一致しません。トピック
a_messages
に書き込む。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --producer.config /tmp/team-a-client.properties First message Second message
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --producer.config /tmp/team-a-client.properties First message Second message
Copy to Clipboard Copied! メッセージは Kafka に正常に生成されます。
- CTRL+C を押して CLI アプリケーションを終了します。
リクエストについて、Kafka コンテナーログで
Authorization GRANTED
のデバッグログを確認します。oc logs my-cluster-kafka-0 -f -n $NS
oc logs my-cluster-kafka-0 -f -n $NS
Copy to Clipboard Copied!
承認されたアクセスでのメッセージの消費
team-a-client
設定を使用して、トピック a_messages
からメッセージを消費します。
トピック
a_messages
からメッセージをフェッチします。bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties
Copy to Clipboard Copied! team-a-client
のDev Team A
ロールは、名前がa_
で始まるコンシューマーグループのみにアクセスできるため、リクエストはエラーを返します。team-a-client
プロパティーを更新し、使用が許可されているカスタムコンシューマーグループを指定します。bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_1
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_1
Copy to Clipboard Copied! コンシューマーは
a_messages
トピックからすべてのメッセージを受信します。
承認されたアクセスでの Kafka の管理
team-a-client
はクラスターレベルのアクセスのないアカウントですが、一部の管理操作と使用することができます。
トピックをリスト表示します。
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
Copy to Clipboard Copied! a_messages
トピックが返されます。コンシューマーグループをリスト表示します。
bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
Copy to Clipboard Copied! a_consumer_group_1
コンシューマーグループが返されます。クラスター設定の詳細を取得します。
bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties \ --entity-type brokers --describe --entity-default
bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties \ --entity-type brokers --describe --entity-default
Copy to Clipboard Copied! 操作には
team-a-client
にないクラスターレベルのパーミッションが必要なため、リクエストはエラーを返します。
異なるパーミッションを持つクライアントの使用
team-b-client
設定を使用して、b_
で始まるトピックにメッセージを生成します。
トピック
a_messages
に書き込む。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --producer.config /tmp/team-b-client.properties Message 1
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \ --producer.config /tmp/team-b-client.properties Message 1
Copy to Clipboard Copied! 以下のリクエストは、
Not authorized to access topics: [a_messages]
エラーを返します。トピック
b_messages
に書き込む。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic b_messages \ --producer.config /tmp/team-b-client.properties Message 1 Message 2 Message 3
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic b_messages \ --producer.config /tmp/team-b-client.properties Message 1 Message 2 Message 3
Copy to Clipboard Copied! メッセージは Kafka に正常に生成されます。
トピック
x_messages
に書き込む。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-b-client.properties Message 1
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-b-client.properties Message 1
Copy to Clipboard Copied! Not authorized to access topics: [x_messages]
エラーが返され、team-b-client
はトピックx_messages
からのみ読み取りできます。team-a-client
を使用してトピックx_messages
に書き込みます。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-a-client.properties Message 1
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-a-client.properties Message 1
Copy to Clipboard Copied! このリクエストは、
Not authorized to access topics: [x_messages]
エラーを返します。team-a-client
はx_messages
トピックに書き込みできますが、トピックが存在しない場合に作成するパーミッションがありません。team-a-client
がx_messages
トピックに書き込みできるようにするには、管理者 power user はパーティションやレプリカの数などの適切な設定で作成する必要があります。
承認された管理ユーザーでの Kafka の管理
管理者ユーザー alice
を使用して Kafka を管理します。alice
は、すべての Kafka クラスターのすべての管理にフルアクセスできます。
alice
としてx_messages
トピックを作成します。bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \ --topic x_messages --create --replication-factor 1 --partitions 1
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \ --topic x_messages --create --replication-factor 1 --partitions 1
Copy to Clipboard Copied! トピックが正常に作成されました。
alice
としてすべてのトピックをリスト表示します。bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties --list bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-b-client.properties --list
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties --list bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-b-client.properties --list
Copy to Clipboard Copied! 管理者ユーザーの
alice
はすべてのトピックをリスト表示できますが、team-a-client
とteam-b
-client は自分がアクセスできるトピックのみをリスト表示できます。Dev Team A
ロールとDev Team B
ロールは、どちらもx_
で始まるトピックに対するDescribe
権限を持っていますが、他のチームのトピックに対するDescribe
権限を持っていないため、他のチームのトピックを見ることができません。team-a-client
を使用して、x_messages
トピックにメッセージを生成します。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-a-client.properties Message 1 Message 2 Message 3
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-a-client.properties Message 1 Message 2 Message 3
Copy to Clipboard Copied! alice
がx_messages
トピックを作成すると、メッセージが正常に Kafka に生成されます。team-b-client
を使用して、x_messages
トピックにメッセージを生成します。bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-b-client.properties Message 4 Message 5
bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --producer.config /tmp/team-b-client.properties Message 4 Message 5
Copy to Clipboard Copied! このリクエストは、
Not authorized to access topics: [x_messages]
エラーを返します。team-b-client
を使用して、x_messages
トピックからメッセージを消費します。bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-b-client.properties --group x_consumer_group_b
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-b-client.properties --group x_consumer_group_b
Copy to Clipboard Copied! コンシューマーは、
x_messages
トピックからすべてのメッセージを受け取ります。team-a-client
を使用して、x_messages
トピックからメッセージを消費します。bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group x_consumer_group_a
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group x_consumer_group_a
Copy to Clipboard Copied! このリクエストは、
Not authorized to access topics: [x_messages]
エラーを返します。team-a-client
を使用して、a_
で始まるコンシューマーグループからのメッセージを消費します。bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_a
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_a
Copy to Clipboard Copied! このリクエストは、
Not authorized to access topics: [x_messages]
エラーを返します。Dev Team A
には、x_
で始まるトピックのRead
権限がありません。alice
を使用して、x_messages
トピックへのメッセージを生成します。bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/alice.properties
bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \ --from-beginning --consumer.config /tmp/alice.properties
Copy to Clipboard Copied! メッセージは Kafka に正常に生成されます。
alice
は、すべてのトピックに対して読み取りまたは書き込みを行うことができます。alice
を使用してクラスター設定を読み取ります。bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \ --entity-type brokers --describe --entity-default
bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \ --entity-type brokers --describe --entity-default
Copy to Clipboard Copied! この例のクラスター設定は空です。