14.5. OAuth 2.0 トークンベース承認の使用


トークンベースの認証に OAuth 2.0 と Red Hat Single Sign-On を使用している場合、Red Hat Single Sign-On を使用して認可ルールを設定し、Kafka ブローカーへのクライアントのアクセスを制限することもできます。認証はユーザーのアイデンティティーを確立します。認可は、そのユーザーのアクセスレベルを決定します。

AMQ Streams は、Red Hat Single Sign-On の 認証サービス による OAuth 2.0 トークンベースの認可をサポートします。これにより、セキュリティーポリシーとパーミッションの一元的な管理が可能になります。

Red Hat Single Sign-On で定義されたセキュリティーポリシーおよびパーミッションは、Kafka ブローカーのリソースへのアクセスを付与するために使用されます。ユーザーとクライアントは、Kafka ブローカーで特定のアクションを実行するためのアクセスを許可するポリシーに対して照合されます。

Kafka では、デフォルトですべてのユーザーにブローカーへのフルアクセスが許可されており、アクセスコントロールリスト (ACL) に基づいて認可を設定するための AclAuthorizer プラグインと StandardAuthorizer プラグインも提供されています。これらのプラグインによって管理される ACL ルールは、username に基づいてリソースへのアクセスを許可または拒否するために使用され、このようなルールは Kafka クラスター自体の中に保存されます。ただし、Red Hat Single Sign-On を使用した OAuth 2.0 トークンベースの認可では、より柔軟にアクセス制御を Kafka ブローカーに実装できます。さらに、Kafka ブローカーで OAuth 2.0 の認可および ACL が使用されるように設定することができます。

14.5.1. OAuth 2.0 の認可メカニズム

AMQ Streams の OAuth 2.0 での認可では、Red Hat Single Sign-On サーバーの Authorization Services REST エンドポイントを使用して、Red Hat Single Sign-On を使用するトークンベースの認証が拡張されます。これは、定義されたセキュリティーポリシーを特定のユーザーに適用し、そのユーザーの異なるリソースに付与されたパーミッションのリストを提供します。ポリシーはロールとグループを使用して、パーミッションをユーザーと照合します。OAuth 2.0 の認可では、Red Hat Single Sign-On の Authorization Services から受信した、ユーザーに付与された権限のリストを基にして、権限がローカルで強制されます。

14.5.1.1. Kafka ブローカーのカスタム authorizer

AMQ Streams では、Red Hat Single Sign-On の authorizer (KeycloakAuthorizer) が提供されます。Red Hat Single Sign-On によって提供される Authorization Services で Red Hat Single Sign-On REST エンドポイントを使用できるようにするには、Kafka ブローカーでカスタム authorizer を設定します。

authorizer は必要に応じて付与された権限のリストを認可サーバーから取得し、ローカルで Kafka ブローカーに認可を強制するため、クライアントの要求ごとに迅速な認可決定が行われます。

14.5.2. OAuth 2.0 認可サポートの設定

この手順では、Red Hat Single Sign-On の Authorization Services を使用して、OAuth 2.0 認可を使用するように Kafka ブローカーを設定する方法を説明します。

作業を開始する前に

特定のユーザーに必要なアクセス、または制限するアクセスについて検討してください。Red Hat Single Sign-On では、Red Hat Single Sign-On の グループロールクライアント、および ユーザー の組み合わせを使用して、アクセスを設定できます。

通常、グループは組織の部門または地理的な場所を基にしてユーザーを照合するために使用されます。また、ロールは職務を基にしてユーザーを照合するために使用されます。

Red Hat Single Sign-On を使用すると、ユーザーおよびグループを LDAP で保存できますが、クライアントおよびロールは LDAP で保存できません。ユーザーデータへのアクセスとストレージを考慮して、認可ポリシーの設定方法を選択する必要がある場合があります。

注記

スーパーユーザー は、Kafka ブローカーに実装された認可にかかわらず、常に制限なく Kafka ブローカーにアクセスできます。

前提条件

  • AMQ Streams は、トークンベースの認証 に Red Hat Single Sign-On と OAuth 2.0 を使用するように設定されている必要がある。認可を設定するときに、同じ Red Hat Single Sign-On サーバーエンドポイントを使用する必要があります。
  • OAuth 2.0 認証は、再認証を有効にするために maxSecondsWithoutReauthentication オプションで設定する必要があります。

手順

  1. Red Hat Single Sign-On の Admin Console にアクセスするか、Red Hat Single Sign-On の Admin CLI を使用して、OAuth 2.0 認証の設定時に作成した Kafka ブローカークライアントの Authorization Services を有効にします。
  2. 認可サービスを使用して、クライアントのリソース、認可スコープ、ポリシー、およびパーミッションを定義します。
  3. ロールとグループをユーザーとクライアントに割り当てて、パーミッションをユーザーとクライアントにバインドします。
  4. エディターで Kafka リソースの Kafka ブローカー設定 (Kafka.spec.kafka) を更新して、Kafka ブローカーで Red Hat Single Sign-On による認可が使用されるように設定します。

    oc edit kafka my-cluster
    Copy to Clipboard Toggle word wrap
  5. Kafka ブローカーの kafka 設定を指定して、keycloak による認可を使用し、認可サーバーと Red Hat Single Sign-On の Authorization Services にアクセスできるようにします。

    以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        authorization:
          type: keycloak 
    1
    
          tokenEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token> 
    2
    
          clientId: kafka 
    3
    
          delegateToKafkaAcls: false 
    4
    
          disableTlsHostnameVerification: false 
    5
    
          superUsers: 
    6
    
          - CN=fred
          - sam
          - CN=edward
          tlsTrustedCertificates: 
    7
    
          - secretName: oauth-server-cert
            certificate: ca.crt
          grantsRefreshPeriodSeconds: 60 
    8
    
          grantsRefreshPoolSize: 5 
    9
    
          grantsMaxIdleSeconds: 300 
    10
    
          grantsGcPeriodSeconds: 300 
    11
    
          grantsAlwaysLatest: false 
    12
    
          connectTimeoutSeconds: 60 
    13
    
          readTimeoutSeconds: 60 
    14
    
          httpRetries: 2 
    15
    
          enableMetrics: false 
    16
    
          includeAcceptHeader: false 
    17
    
        #...
    Copy to Clipboard Toggle word wrap
    1
    タイプ keycloak によって Red Hat Single Sign-On の認可が有効になります。
    2
    Red Hat Single Sign-On トークンエンドポイントの URI。実稼働環境の場合は、常に https:// urls を使用してください。トークンベースの oauth 認証を設定する場合、jwksEndpointUri をローカル JWT 検証の URI として指定します。tokenEndpointUri URI のホスト名は同じである必要があります。
    3
    認可サービスが有効になっている Red Hat Single Sign-On の OAuth 2.0 クライアント定義のクライアント ID。通常、kafka が ID として使用されます。
    4
    (オプション) Red Hat Single Sign-On Authorization Services ポリシーでアクセスが拒否された場合、Kafka AclAuthorizer および StandardAuthorizer に権限を委譲します。デフォルトは false です。
    5
    (任意設定): TLS ホスト名の検証を無効にします。デフォルトは false です。
    6
    (オプション) 指定されたスーパーユーザー。
    7
    (任意設定): 認可サーバーへの TLS 接続用の信用できる証明書。
    8
    (任意設定): 連続する付与 (Grants) 更新実行の間隔。これは、アクティブなセッションが Red Hat Single Sign-On でユーザーのパーミッション変更を検出する最大時間です。デフォルト値は 60 です。
    9
    (任意設定): アクティブなセッションの付与 (Grants) の更新 (並行して) に使用するスレッドの数。デフォルト値は 5 です。
    10
    (オプション) キャッシュ内のアイドル許可を削除できるようになるまでの時間 (秒単位)。デフォルト値は 300 です。
    11
    (オプション) キャッシュから古い許可を削除するジョブの連続実行間の時間 (秒単位)。デフォルト値は 300 です。
    12
    (オプション) 新しいセッションに対して最新の許可を取得するかどうかを制御します。有効にすると、許可が Red Hat Single Sign-On から取得され、ユーザーのためにキャッシュされます。デフォルト値は false です。
    13
    (オプション): Red Hat Single Sign-On トークンエンドポイントへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
    14
    (オプション): Red Hat Single Sign-On トークンエンドポイントへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
    15
    (オプション) 認可サーバーへの失敗した HTTP リクエストを (一時停止せずに) 再試行する最大回数。デフォルト値は 0 で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、connectTimeoutSeconds オプションと readTimeoutSeconds オプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。
    16
    (オプション) OAuth メトリックを有効または無効にします。デフォルト値は false です。
    17
    (オプション) 一部の認可サーバーでは、クライアントが Accept: application/json ヘッダーを送信する際に問題が発生します。includeAcceptHeader: false を設定する場合、ヘッダーは送信されません。デフォルトは true です。
  6. エディターを保存して終了し、ローリング更新の完了を待ちます。
  7. 更新をログで確認するか、Pod 状態の遷移を監視して確認します。

    oc logs -f ${POD_NAME} -c kafka
    oc get pod -w
    Copy to Clipboard Toggle word wrap

    ローリング更新によって、ブローカーが OAuth 2.0 認可を使用するように設定されます。

  8. クライアントまたは特定のロールを持つユーザーとして Kafka ブローカーにアクセスして、設定したパーミッションを検証し、必要なアクセス権限があり、付与されるべきでないアクセス権限がないことを確認します。

14.5.3. Red Hat Single Sign-On の Authorization Services でのポリシーおよびパーミッションの管理

本セクションでは、Red Hat Single Sign-On Authorization Services および Kafka によって使用される認可モデルについて説明し、各モデルの重要な概念を定義します。

Kafka にアクセスするためのパーミッションを付与するには、Red Hat Single Sign-On で OAuth クライアント仕様を作成して、Red Hat Single Sign-On Authorization Services オブジェクトを Kafka リソースにマップできます。Kafka パーミッションは、Red Hat Single Sign-On Authorization Services ルールを使用して、ユーザーアカウントまたはサービスアカウントに付与されます。

トピックの作成やリスト表示など、一般的な Kafka 操作に必要なさまざまなユーザーパーミッションの を紹介します。

14.5.3.1. Kafka および Red Hat Single Sign-On 認可モデルの概要

Kafka および Red Hat Single Sign-On Authorization Services は、異なる認可モデルを使用します。

Kafka 認可モデル

Kafka の認可モデルはリソース型を使用します。Kafka クライアントがブローカーでアクションを実行すると、ブローカーは設定済みの KeycloakAuthorizer を使用して、アクションおよびリソースタイプを基にしてクライアントのパーミッションをチェックします。

Kafka は 5 つのリソースタイプを使用してアクセスを制御します ( TopicGroupClusterTransactionalId、および DelegationToken )。各リソースタイプには、利用可能なパーミッションセットがあります。

トピック

  • 作成
  • Write
  • 読み取り
  • Delete
  • Describe
  • DescribeConfigs
  • Alter
  • AlterConfigs

グループ

  • 読み取り
  • Describe
  • Delete

クラスター

  • 作成
  • Describe
  • Alter
  • DescribeConfigs
  • AlterConfigs
  • IdempotentWrite
  • ClusterAction

TransactionalId

  • Describe
  • Write

DelegationToken

  • Describe
Red Hat Single Sign-On の Authorization Services モデル

Red Hat Single Sign-On の Authorization Services には、パーミッションを定義および付与するための 4 つの概念があります。これらは リソース認可スコープポリシー、および パーミッション です。

リソース
リソースは、リソースを許可されたアクションと一致するために使用されるリソース定義のセットです。リソースは、個別のトピックであったり、名前が同じ接頭辞で始まるすべてのトピックであったりします。リソース定義は、利用可能なスコープのセットに関連付けられます。これは、リソースで利用可能なすべてのアクションのセットを表します。多くの場合、これらのアクションのサブセットのみが実際に許可されます。
承認スコープ
承認スコープは、特定のリソース定義で利用可能なすべてのアクションのセットです。新規リソースを定義するとき、すべてのスコープのセットからスコープを追加します。
ポリシー

ポリシーは、アカウントのリストと照合するための基準を使用する承認ルールです。ポリシーは以下と一致できます。

  • クライアント ID またはロールに基づくサービスアカウント
  • ユーザー名、グループ、またはロールに基づくユーザーアカウント
パーミッション
パーミッションは、特定のリソース定義の承認スコープのサブセットをユーザーのセットに付与します。

14.5.3.2. Red Hat Single Sign-On Authorization Services の Kafka 承認モデルへのマッピング

Kafka 承認モデルは、Kafka へのアクセスを制御する Red Hat Single Sign-On ロールおよびリソースを定義するベースとして使用されます。

ユーザーアカウントまたはサービスアカウントに Kafka パーミッションを付与するには、まず Kafka ブローカーの Red Hat Single Sign-On に OAuth クライアント仕様を作成します。次に、クライアントに Red Hat Single Sign-On の Authorization Services ルールを指定します。通常、ブローカーを表す OAuth クライアントのクライアント ID は kafka です。AMQ Streams で提供されている設定ファイルの例では、OAuth のクライアント ID として kafka を使用しています。

注記

複数の Kafka クラスターがある場合は、それらすべてに単一の OAuth クライアント (kafka) を使用できます。これにより、承認ルールを定義および管理するための単一の統合されたスペースが提供されます。ただし、異なる OAuth クライアント ID(例 my-cluster-kafka または cluster-dev-kafka) を使用し、各クライアント設定内の各クラスターの承認ルールを定義することもできます。

Kafka クライアント 定義では、Red Hat Single Sign-On 管理コンソールで Authorization Enabled オプションが有効になっている必要があります。

すべてのパーミッションは、kafka クライアントのスコープ内に存在します。異なる OAuth クライアント ID で異なる Kafka クラスターを設定した場合、同じ Red Hat Single Sign-On レルムの一部であっても、それぞれに個別のパーミッションセットが必要です。

Kafka クライアントが OAUTHBEARER 認証を使用する場合、Red Hat Single Sign-On オーソライザー (KeycloakAuthorizer) は現在のセッションのアクセストークンを使用して、Red Hat Single Sign-On サーバーからグラントのリストを取得します。許可を取得するために、オーソライザーは Red Hat Single Sign-On の Authorization Services ポリシーおよびパーミッションを評価します。

Kafka パーミッションの承認スコープ

通常、Red Hat Single Sign-On 初期設定では、承認スコープをアップロードして、各 Kafka リソースタイプで実行できるすべての可能なアクションのリストを作成します。この手順は、パーミッションを定義する前に 1 度のみ実行されます。承認スコープをアップロードする代わりに、手動で追加できます。

承認スコープには、リソースタイプに関係なく、可能なすべての Kafka パーミッションが含まれる必要があります。

  • 作成
  • Write
  • 読み取り
  • Delete
  • Describe
  • Alter
  • DescribeConfig
  • AlterConfig
  • ClusterAction
  • IdempotentWrite
注記

パーミッションが必要ない場合 (例: IdempotentWrite)、承認スコープのリストから省略できます。ただし、そのパーミッションは Kafka リソースをターゲットにすることはできません。

パーミッションチェックのリソースパターン

リソースパターンは、パーミッションチェックの実行時にターゲットリソースに対するパターンの照合に使用されます。一般的なパターン形式は RESOURCE-TYPE:PATTERN-NAME です。

リソースタイプは Kafka 承認モデルをミラーリングします。このパターンでは、次の 2 つの一致オプションが可能です。

  • 完全一致 (パターンが * で終了しない場合)
  • 接頭辞一致 (パターンが * で終了する)

リソースのパターン例

Topic:my-topic
Topic:orders-*
Group:orders-*
Cluster:*
Copy to Clipboard Toggle word wrap

さらに、一般的なパターンフォーマットは、kafka-cluster:CLUSTER-NAME の前にコンマを付けることができ、CLUSTER-NAMEは Kafka カスタムリソースの metadata.name を参照します。

クラスター接頭辞が付けられたリソースのパターン例

kafka-cluster:my-cluster,Topic:*
kafka-cluster:*,Group:b_*
Copy to Clipboard Toggle word wrap

kafka-cluster の接頭辞がない場合は、kafka-cluster:* とみなします。

リソースを定義するときに、リソースに関連する可能な承認スコープのリストを関連付けることができます。ターゲットリソースタイプに妥当なアクションを設定します。

任意の承認スコープを任意のリソースに追加できますが、リソースタイプでサポートされるスコープのみがアクセス制御の対象として考慮されます。

アクセスパーミッションを適用するポリシー

ポリシーは、1 つ以上のユーザーアカウントまたはサービスアカウントにパーミッションをターゲットにするために使用されます。以下がターゲットの対象になります。

  • 特定のユーザーまたはサービスアカウント
  • レルムロールまたはクライアントロール
  • ユーザーグループ
  • クライアント IP アドレスに一致する JavaScript ルール

ポリシーには一意の名前が割り当てられ、複数のリソースに対して複数の対象パーミッションを指定するために再使用できます。

アクセスを付与するためのパーミッション

詳細なパーミッションを使用して、ユーザーへのアクセスを付与するポリシー、リソース、および承認スコープをまとめます。

各パーミッションの名前によって、どのユーザーにどのパーミッションが付与されるかが明確に定義される必要があります。例えば、Dev Team B は x で始まるトピックから読むことができます

14.5.3.3. Kafka 操作に必要なパーミッションの例

以下の例は、Kafka で一般的な操作を実行するために必要なユーザーパーミッションを示しています。

トピックを作成します

トピックを作成するには、特定のトピック、または Cluster:kafka-cluster に対して Create パーミッションが必要です。

bin/kafka-topics.sh --create --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

トピックのリスト表示

指定のトピックでユーザーに Describe パーミッションがある場合には、トピックがリスト表示されます。

bin/kafka-topics.sh --list \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

トピックの詳細の表示

トピックの詳細を表示するには、トピックに対して Describe および DescribeConfigs の権限が必要です。

bin/kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

トピックへのメッセージの生成

トピックへのメッセージを作成するには、トピックに対する DescribeWrite の権限が必要です。

トピックが作成されておらず、トピックの自動生成が有効になっている場合は、トピックを作成するパーミッションが必要になります。

bin/kafka-console-producer.sh  --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

トピックからのメッセージの消費

トピックからのメッセージを消費するためには、トピックに DescribeRead のパーミッションが必要です。通常、トピックからの消費は、コンシューマーグループにコンシューマーオフセットを格納することに依存しており、これにはコンシューマーグループに対する追加の Describe および Read 権限が必要です。

マッチングには 2 つの resources が必要です。以下に例を示します。

Topic:my-topic
Group:my-group-*
Copy to Clipboard Toggle word wrap
bin/kafka-console-consumer.sh --topic my-topic --group my-group-1 --from-beginning \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --consumer.config /tmp/config.properties
Copy to Clipboard Toggle word wrap

べき等プロデューサーを使用したトピックへのメッセージの生成

Cluster:kafka-cluster リソースには、トピックをプロデュースするためのアクセス許可だけでなく、IdempotentWrite アクセス許可が追加で必要です。

マッチングには 2 つの resources が必要です。以下に例を示します。

Topic:my-topic
Cluster:kafka-cluster
Copy to Clipboard Toggle word wrap
bin/kafka-console-producer.sh  --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties --producer-property enable.idempotence=true --request-required-acks -1
Copy to Clipboard Toggle word wrap

コンシューマーグループのリスト

コンシューマーグループのリスト表示時に、ユーザーが Describe 権限を持っているグループのみが返されます。また、ユーザーが Cluster:kafka-cluster に対して Describe パーミッションを持っている場合は、すべてのコンシューマーグループが返されます。

bin/kafka-consumer-groups.sh --list \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

コンシューマーグループの詳細の表示

コンシューマーグループの詳細を表示するには、グループとグループに関連するトピックに対して Describe 権限が必要です。

bin/kafka-consumer-groups.sh --describe --group my-group-1 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

トピック設定の変更

トピックの設定を変更するには、トピックに DescribeAlter の権限が必要です。

bin/kafka-topics.sh --alter --topic my-topic --partitions 2 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

Kafka ブローカー設定の表示

kafka-configs.sh を使用してブローカーの設定を取得するためには、Cluster:kafka-clusterDescribeConfigs パーミッションが必要です。

bin/kafka-configs.sh --entity-type brokers --entity-name 0 --describe --all \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

Kafka ブローカー設定の変更

Kafka ブローカーの設定を変更するには、Cluster:kafka-clusterDescribeConfigs および AlterConfigs パーミッションが必要です。

bin/kafka-configs --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

トピックを削除します

トピックを削除するには、トピックに DescribeDelete の権限が必要です。

bin/kafka-topics.sh --delete --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties
Copy to Clipboard Toggle word wrap

リードパーティションの選択

トピックパーティションのリーダー選択を実行するには、Cluster:kafka-clusterAlter パーミッションが必要です。

bin/kafka-leader-election.sh --topic my-topic --partition 0 --election-type PREFERRED  /
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --admin.config /tmp/config.properties
Copy to Clipboard Toggle word wrap

パーティションの再割り当て

パーティション再割り当てファイルを生成するためには、関係するトピックに対して Describe 権限が必要です。

bin/kafka-reassign-partitions.sh --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "0,1" --generate \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties > /tmp/partition-reassignment.json
Copy to Clipboard Toggle word wrap

パーティション再割り当てを実行するには、Cluster:kafka-cluster に対して DescribeAlter のパーミッションが必要です。また、関係するトピックには、Describe の パーミッションが必要です。

bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --execute \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
Copy to Clipboard Toggle word wrap

パーティション再割り当てを確認するには、Cluster:kafka-cluster および関連する各トピックに対して Describe および AlterConfigs のパーミッションが必要です。

bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --verify \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties
Copy to Clipboard Toggle word wrap

14.5.4. Red Hat Single Sign-On の Authorization Services の試行

この例では、Red Hat Single Sign-On Authorization Services を keycloak 認可で使用する方法を説明します。Red Hat Single Sign-On の Authorization Services を使用して、Kafka クライアントにアクセス制限を強制します。Red Hat Single Sign-On の Authorization Services では、承認スコープ、ポリシー、およびパーミッションを使用してアクセス制御をリソースに定義および適用します。

Red Hat Single Sign-On の Authorization Services REST エンドポイントは、認証されたユーザーのリソースに付与されたパーミッションのリストを提供します。許可 (パーミッション) のリストは、Kafka クライアントによって認証されたセッションが確立された後に最初のアクションとして Red Hat Single Sign-On サーバーから取得されます。付与の変更が検出されるように、バックグラウンドでリストが更新されます。付与は、各ユーザーセッションが迅速な承認決定を提供するために、Kafka ブローカーにてローカルでキャッシュおよび適用されます。

AMQ Streams には、設定ファイルのサンプル が用意されています。これには、Red Hat Single Sign-On を設定するための以下のサンプルファイルが含まれます。

kafka-ephemeral-oauth-single-keycloak-authz.yaml
Red Hat Single Sign-On を使用して OAuth 2.0 トークンベースの承認に設定された Kafka カスタムリソースの例。カスタムリソースを使用して、keycloak 認可およびトークンベースの oauth 認証を使用する Kafka クラスターをデプロイできます。
kafka-authz-realm.json
サンプルグループ、ユーザー、ロール、およびクライアントで設定された Red Hat Single Sign-On レルムの例。レルムを Red Hat Single Sign-On インスタンスにインポートし、Kafka にアクセスするための詳細なパーミッションを設定できます。

Red Hat Single Sign-On で例を試す場合は、これらのファイルを使用して、本セクションの順序で説明したタスクを実行します。

認証

トークンベースの oauth 認証を設定する場合、jwksEndpointUri をローカル JWT 検証の URI として指定します。keycloak 承認を設定するとき、a tokenEndpointUri を Red Hat Single Sign-On トークンエンドポイントの URI として指定します。両方の URI のホスト名は同じである必要があります。

グループまたはロールポリシーを使用した対象パーミッション

Red Hat Single Sign-On では、サービスアカウントが有効になっている機密性の高いクライアントを、クライアント ID とシークレットを使用して、独自の名前のサーバーに対して認証できます。これは、通常、特定ユーザーのエージェント (Web サイトなど) としてではなく、独自の名前で動作するマイクロサービスに便利です。サービスアカウントには、通常のユーザーと同様にロールを割り当てることができます。ただし、グループを割り当てることはできません。そのため、サービスアカウントを使用してマイクロサービスへのパーミッションをターゲットにする場合は、グループポリシーを使用できないため、代わりにロールポリシーを使用する必要があります。逆に、ユーザー名およびパスワードを使用した認証が必要な通常のユーザーアカウントにのみ特定のパーミッションを制限する場合は、ロールポリシーではなく、グループポリシーを使用すると、副次的に実現することができます。これは、ClusterManager で始まるパーミッションの例で使用されるものです。通常、クラスター管理の実行は CLI ツールを使用して対話的に行われます。結果的に生成されるアクセストークンを使用して Kafka ブローカーに対して認証を行う前に、ユーザーのログインを要求することは妥当です。この場合、アクセストークンはクライアントアプリケーションではなく、特定のユーザーを表します。

14.5.4.1. Red Hat Single Sign-On 管理コンソールへのアクセス

Red Hat Single Sign-On を設定してから、管理コンソールに接続し、事前設定されたレルムを追加します。kafka-authz-realm.json ファイルのサンプルを使用して、レルムをインポートします。管理コンソールのレルムに定義された承認ルールを確認できます。このルールは、Red Hat Single Sign-On レルムの例を使用するよう設定された Kafka クラスターのリソースへのアクセスを許可します。

前提条件

  • 実行中の OpenShift クラスター。
  • 事前設定されたレルムが含まれる AMQ Streams の examples/security/keycloak-authorization/kafka-authz-realm.json ファイル。

手順

  1. Red Hat Single Sign-On ドキュメントの Server Installation and Configuration の説明にしたがって、Red Hat Single Sign-On Operator を使用して Red Hat Single Sign-On サーバーをインストールします。
  2. Red Hat Single Sign-On インスタンスが実行されるまで待ちます。
  3. 管理コンソールにアクセスできるように外部ホスト名を取得します。

    NS=sso
    oc get ingress keycloak -n $NS
    Copy to Clipboard Toggle word wrap

    この例では、Red Hat Single Sign-On サーバーが sso namespace で実行されていることを前提としています。

  4. admin ユーザーのパスワードを取得します。

    oc get -n $NS pod keycloak-0 -o yaml | less
    Copy to Clipboard Toggle word wrap

    パスワードはシークレットとして保存されるため、Red Hat Single Sign-On インスタンスの設定 YAML ファイルを取得して、シークレット名 (secretKeyRef.name) を特定します。

  5. シークレットの名前を使用して、クリアテキストのパスワードを取得します。

    SECRET_NAME=credential-keycloak
    oc get -n $NS secret $SECRET_NAME -o yaml | grep PASSWORD | awk '{print $2}' | base64 -D
    Copy to Clipboard Toggle word wrap

    この例では、シークレットの名前が credential-keycloak であることを前提としています。

  6. ユーザー名 admin と取得したパスワードを使用して、管理コンソールにログインします。

    https://HOSTNAME を使用して Kubernetes Ingress にアクセスします。

    管理コンソールを使用して、サンプルレルムを Red Hat Single Sign-On にアップロードできるようになりました。

  7. Add Realm をクリックして、サンプルレルムをインポートします。
  8. examples/security/keycloak-authorization/kafka-authz-realm.json ファイルを追加してから Create をクリックします。

    これで、管理コンソールの現在のレルムとして kafka-authz が含まれるようになりました。

    デフォルトビューには、Master レルムが表示されます。

  9. Red Hat Single Sign-On 管理コンソールで Clients > kafka > Authorization > Settings の順に移動し、Decision StrategyAffirmative に設定されていることを確認します。

    肯定的な (Affirmative) ポリシーとは、クライアントが Kafka クラスターにアクセスするためには少なくとも 1 つのポリシーが満たされている必要があることを意味します。

  10. Red Hat Single Sign-On 管理コンソールで、GroupsUsersRoles、および Clients と移動して、レルム設定を表示します。

    グループ
    Groups は、ユーザーグループの作成やユーザー権限の設定に使用します。グループは、名前が割り当てられたユーザーのセットです。地域、組織、または部門単位に区分するために使用されます。グループは LDAP アイデンティティープロバイダーにリンクできます。Kafka リソースにパーミッションを付与するなど、カスタム LDAP サーバー管理ユーザーインターフェイスを使用して、ユーザーをグループのメンバーにすることができます。
    ユーザー
    Users は、ユーザーを作成するために使用されます。この例では、alicebob が定義されています。aliceClusterManager グループのメンバーであり、bobClusterManager-my-cluster グループのメンバーです。ユーザーは LDAP アイデンティティープロバイダーに保存できます。
    ロール
    Roles は、ユーザーやクライアントが特定の権限を持っていることを示すものです。ロールはグループに似た概念です。通常ロールは、組織ロールでユーザーを タグ付け するために使用され、必要なパーミッションを持ちます。ロールは LDAP アイデンティティープロバイダーに保存できません。LDAP が必須である場合は、代わりにグループを使用し、Red Hat Single Sign-On ロールをグループに追加して、ユーザーにグループを割り当てるときに対応するロールも取得するようにします。
    Clients

    Clients は特定の設定を持つことができます。この例では、kafkakafka-cliteam-a-clientteam-b-client の各クライアントが設定されています。

    • kafka クライアントは、Kafka ブローカーがアクセストークンの検証に必要な OAuth 2.0 通信を行うために使用されます。このクライアントには、Kafka ブローカーで承認を実行するために使用される承認サービスリソース定義、ポリシー、および承認スコープも含まれます。認可設定は kafka クライアントの Authorization タブで定義され、Settings タブでAuthorization Enabled をオンにすると表示されます。
    • kafka-cli クライアントは、アクセストークンまたは更新トークンを取得するためにユーザー名とパスワードを使用して認証するときに Kafka コマンドラインツールによって使用されるパブリッククライアントです。
    • team-a-client および team-b-client クライアントは、特定の Kafka トピックに部分的にアクセスできるサービスを表す機密クライアントです。
  11. Red Hat Single Sign-On 管理コンソールで、Authorization > Permissions の順に移動し、レルムに定義されたリソースおよびポリシーを使用する付与されたパーミッションを確認します。

    たとえば、kafka クライアントには以下のパーミッションがあります。

    Dev Team A can write to topics that start with x_ on any cluster
    Dev Team B can read from topics that start with x_ on any cluster
    Dev Team B can update consumer group offsets that start with x_ on any cluster
    ClusterManager of my-cluster Group has full access to cluster config on my-cluster
    ClusterManager of my-cluster Group has full access to consumer groups on my-cluster
    ClusterManager of my-cluster Group has full access to topics on my-cluster
    Copy to Clipboard Toggle word wrap
    Dev Team A
    Dev チーム A レルムロールは、任意のクラスターで x_ で始まるトピックに書き込みできます。これは、Topic:x_* というリソース、DescribeWrite のスコープ、そして Dev Team A のポリシーを組み合わせたものです。Dev Team A ポリシーは、Dev Team A というレルムロールを持つすべてのユーザーにマッチします。
    Dev Team B
    Dev チーム B レルムロールは、任意のクラスターで x_ で始まるトピックから読み取ることができます。これは、Topic:x_*、 Group:x_* のリソース、DescribeRead のスコープ、および Dev Team B のポリシーを組み合わせたものです。Dev Team B ポリシーは、Dev Team B というレルムロールを持つすべてのユーザーにマッチします。一致するユーザーおよびクライアントはトピックから読み取りでき、名前が x_ で始まるトピックおよびコンシューマーグループの消費されたオフセットを更新できます。

14.5.4.2. Red Hat Single Sign-On 承認をでの Kafka クラスターのデプロイメント

Red Hat Single Sign-On サーバーに接続するように設定された Kafka クラスターをデプロイします。サンプルの kafka-ephemeral-oauth-single-keycloak-authz.yaml ファイルを使用して、Kafka カスタムリソースとして Kafka クラスターをデプロイメントします。この例では、keycloak 承認と oauth 認証を使用して単一ノードの Kafka クラスターをデプロイします。

前提条件

  • Red Hat Single Sign-On 認可サーバーが OpenShift クラスターにデプロイされ、サンプルレルムでロードされている。
  • Cluster Operator が OpenShift クラスターにデプロイされている。
  • AMQ Streams の examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml カスタムリソース。

手順

  1. デプロイした Red Hat Single Sign-On インスタンスのホスト名を使用して、Kafka ブローカーのトラストストア証明書を準備し、Red Hat Single Sign-On サーバーと通信します。

    SSO_HOST=SSO-HOSTNAME
    SSO_HOST_PORT=$SSO_HOST:443
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
    Copy to Clipboard Toggle word wrap

    Kubernetes Ingress は セキュアな (HTTPS) 接続を確立するために使用されるため、証明書が必要です。

    通常、単一の証明書ではなく、証明書チェーンがあります。指定する必要があるのは、/tmp/sso.pem ファイルの最後にリストされている最上位の発行者 CA だけです。手動で抽出するか、次のコマンドを使用して抽出できます。

    証明書チェーンの最上位の CA 証明書を抽出するコマンドの例

    split -p "-----BEGIN CERTIFICATE-----" sso.pem sso-
    for f in $(ls sso-*); do mv $f $f.pem; done
    cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt
    Copy to Clipboard Toggle word wrap

    注記

    信頼できる CA 証明書は通常、openssl コマンドを使用するのではなく、信頼できるソースから取得します。

  2. シークレットとして OpenShift に証明書をデプロイします。

    oc create secret generic oauth-server-cert --from-file=/tmp/sso-ca.crt -n $NS
    Copy to Clipboard Toggle word wrap
  3. ホスト名を環境変数として設定します。

    SSO_HOST=SSO-HOSTNAME
    Copy to Clipboard Toggle word wrap
  4. サンプル Kafka クラスターを作成およびデプロイします。

    cat examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml | sed -E 's#\${SSO_HOST}'"#$SSO_HOST#" | oc create -n $NS -f -
    Copy to Clipboard Toggle word wrap

14.5.4.3. CLI Kafka クライアントセッションの TLS 接続の準備

対話型 CLI セッション用の新規 Pod を作成します。TLS 接続用の Red Hat Single Sign-On 証明書を使用してトラストストアを設定します。トラストストアは、Red Hat Single Sign-On および Kafka ブローカーに接続します。

前提条件

  • Red Hat Single Sign-On 認可サーバーが OpenShift クラスターにデプロイされ、サンプルレルムでロードされている。

    Red Hat Single Sign-On 管理コンソールで、クライアントに割り当てられたロールが Clients > Service Account Roles に表示されることを確認します。

  • Red Hat Single Sign-On に接続するように設定された Kafka クラスターが OpenShift クラスターにデプロイされている。

手順

  1. AMQ Streams の Kafka イメージを使用してインタラクティブな Pod コンテナーを新たに実行し、稼働中の Kafka ブローカーに接続します。

    NS=sso
    oc run -ti --restart=Never --image=registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0 kafka-cli -n $NS -- /bin/sh
    Copy to Clipboard Toggle word wrap
    注記

    イメージのダウンロードの待機中に oc がタイムアウトする場合、その後の試行によって an AlreadyExists エラーが発生することがあります。

  2. Pod コンテナーにアタッチします。

    oc attach -ti kafka-cli -n $NS
    Copy to Clipboard Toggle word wrap
  3. Red Hat Single Sign-On インスタンスのホスト名を使用して、TLS を使用してクライアントコネクションの証明書を準備します。

    SSO_HOST=SSO-HOSTNAME
    SSO_HOST_PORT=$SSO_HOST:443
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem
    Copy to Clipboard Toggle word wrap

    通常、単一の証明書ではなく、証明書チェーンがあります。指定する必要があるのは、/tmp/sso.pem ファイルの最後にリストされている最上位の発行者 CA だけです。手動で抽出するか、次のコマンドを使用して抽出できます。

    証明書チェーンの最上位の CA 証明書を抽出するコマンドの例

    split -p "-----BEGIN CERTIFICATE-----" sso.pem sso-
    for f in $(ls sso-*); do mv $f $f.pem; done
    cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt
    Copy to Clipboard Toggle word wrap

    注記

    信頼できる CA 証明書は通常、openssl コマンドを使用するのではなく、信頼できるソースから取得します。

  4. Kafka ブローカーへの TLS 接続のトラストストアを作成します。

    keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias sso -storepass $STOREPASS -import -file /tmp/sso-ca.crt -noprompt
    Copy to Clipboard Toggle word wrap
  5. Kafka ブートストラップアドレスを Kafka ブローカーのホスト名および tls リスナーポート (9093) のホスト名として使用し、Kafka ブローカーの証明書を準備します。

    KAFKA_HOST_PORT=my-cluster-kafka-bootstrap:9093
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $KAFKA_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/my-cluster-kafka.pem
    Copy to Clipboard Toggle word wrap

    取得した .pem ファイルは通常、1 つの証明書ではなく、証明書チェーンです。指定する必要があるのは、/tmp/my-cluster-kafka.pem ファイルの最後にリストされている最上位の発行者 CA のみです。手動で抽出するか、次のコマンドを使用して抽出できます。

    証明書チェーンの最上位の CA 証明書を抽出するコマンドの例

    split -p "-----BEGIN CERTIFICATE-----" /tmp/my-cluster-kafka.pem kafka-
    for f in $(ls kafka-*); do mv $f $f.pem; done
    cp $(ls kafka-* | sort -r | head -n 1) my-cluster-kafka-ca.crt
    Copy to Clipboard Toggle word wrap

    注記

    信頼できる CA 証明書は通常、openssl コマンドを使用するのではなく、信頼できるソースから取得します。この例では、Kafka クラスターがデプロイされたのと同じ名前空間の Pod でクライアントが実行されていると想定しています。クライアントが OpenShift クラスターの外部から Kafka クラスターにアクセスしている場合は、最初にブートストラップアドレスを決定する必要があります。その場合、クラスター証明書を OpenShift シークレットから直接取得することもでき、openssl は必要ありません。詳細は、13章Kafka クラスターへのクライアントアクセスの設定 を参照してください。

  6. Kafka ブローカーの証明書をトラストストアに追加します。

    keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias my-cluster-kafka -storepass $STOREPASS -import -file /tmp/my-cluster-kafka-ca.crt -noprompt
    Copy to Clipboard Toggle word wrap

    承認されたアクセスを確認するために、セッションを開いたままにします。

14.5.4.4. CLI Kafka クライアントセッションを使用した Kafka への承認されたアクセスの確認

対話型 CLI セッションを使用して、Red Hat Single Sign-On レルムを通じて適用される承認ルールを確認します。Kafka のサンプルプロデューサーおよびコンシューマークライアントを使用してチェックを適用し、異なるレベルのアクセスを持つユーザーおよびサービスアカウントでトピックを作成します。

team-a-client クライアントおよび team-b-client クライアントを使用して、承認ルールを確認します。alice admin ユーザーを使用して、Kafka で追加の管理タスクを実行します。

この例で使用される AMQ Streams Kafka イメージには、Kafka プロデューサーおよびコンシューマーバイナリーが含まれます。

前提条件

クライアントおよび管理ユーザーの設定

  1. team-a-client クライアントの認証プロパティーで Kafka 設定ファイルを準備します。

    SSO_HOST=SSO-HOSTNAME
    
    cat > /tmp/team-a-client.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.client.id="team-a-client" \
      oauth.client.secret="team-a-client-secret" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF
    Copy to Clipboard Toggle word wrap

    SASL OAUTHBEARER メカニズムが使用されます。このメカニズムにはクライアント ID とクライアントシークレットが必要です。これは、クライアントが最初に Red Hat Single Sign-On サーバーに接続してアクセストークンを取得することを意味します。その後、クライアントは Kafka ブローカーに接続し、アクセストークンを使用して認証します。

  2. team-b-client クライアントの認証プロパティーで Kafka 設定ファイルを準備します。

    cat > /tmp/team-b-client.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.client.id="team-b-client" \
      oauth.client.secret="team-b-client-secret" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF
    Copy to Clipboard Toggle word wrap
  3. curl を使用して管理者ユーザー alice を認証し、パスワード付与認証を実行して更新トークンを取得します。

    USERNAME=alice
    PASSWORD=alice-password
    
    GRANT_RESPONSE=$(curl -X POST "https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" -H 'Content-Type: application/x-www-form-urlencoded' -d "grant_type=password&username=$USERNAME&password=$PASSWORD&client_id=kafka-cli&scope=offline_access" -s -k)
    
    REFRESH_TOKEN=$(echo $GRANT_RESPONSE | awk -F "refresh_token\":\"" '{printf $2}' | awk -F "\"" '{printf $1}')
    Copy to Clipboard Toggle word wrap

    更新トークンは、有効期間がなく、期限切れにならないオフライントークンです。

  4. admin ユーザー alice の認証プロパティーで Kafka 設定ファイルを準備します。

    cat > /tmp/alice.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.refresh.token="$REFRESH_TOKEN" \
      oauth.client.id="kafka-cli" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF
    Copy to Clipboard Toggle word wrap

    kafka-cli パブリッククライアントは、sasl.jaas .configoauth.client. id に使用されます。これはパブリッククライアントであるため、シークレットは必要ありません。クライアントは直前の手順で認証された更新トークンで認証されます。更新トークンは背後でアクセストークンを要求します。これは、認証のために Kafka ブローカーに送信されます。

承認されたアクセスでのメッセージの生成

team-a-client の設定を使用して、a_x_ で始まるトピックへのメッセージを作成できるかどうかを確認します。

  1. トピック my-topic に書き込みます。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic my-topic \
      --producer.config=/tmp/team-a-client.properties
    First message
    Copy to Clipboard Toggle word wrap

    以下のリクエストは、Not authorized to access topics: [my-topic] エラーを返します。

    team-a-clientDev Team A ロールを持っており、a_ で始まるトピックに対してサポートされているすべてのアクションを実行する権限を与えられていますが、x_ で始まるトピックへの書き込みのみ可能です。my-topic という名前のトピックは、これらのルールのいずれにも一致しません。

  2. トピック a_messages に書き込む。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --producer.config /tmp/team-a-client.properties
    First message
    Second message
    Copy to Clipboard Toggle word wrap

    メッセージは Kafka に正常に生成されます。

  3. CTRL+C を押して CLI アプリケーションを終了します。
  4. リクエストについて、Kafka コンテナーログで Authorization GRANTED のデバッグログを確認します。

    oc logs my-cluster-kafka-0 -f -n $NS
    Copy to Clipboard Toggle word wrap

承認されたアクセスでのメッセージの消費

team-a-client 設定を使用して、トピック a_messages からメッセージを消費します。

  1. トピック a_messages からメッセージをフェッチします。

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties
    Copy to Clipboard Toggle word wrap

    team-a-clientDev Team A ロールは、名前が a_ で始まるコンシューマーグループのみにアクセスできるため、リクエストはエラーを返します。

  2. team-a-client プロパティーを更新し、使用が許可されているカスタムコンシューマーグループを指定します。

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_1
    Copy to Clipboard Toggle word wrap

    コンシューマーは a_messages トピックからすべてのメッセージを受信します。

承認されたアクセスでの Kafka の管理

team-a-client はクラスターレベルのアクセスのないアカウントですが、一部の管理操作と使用することができます。

  1. トピックをリスト表示します。

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
    Copy to Clipboard Toggle word wrap

    a_messages トピックが返されます。

  2. コンシューマーグループをリスト表示します。

    bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
    Copy to Clipboard Toggle word wrap

    a_consumer_group_1 コンシューマーグループが返されます。

    クラスター設定の詳細を取得します。

    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties \
      --entity-type brokers --describe --entity-default
    Copy to Clipboard Toggle word wrap

    操作には team-a-client にないクラスターレベルのパーミッションが必要なため、リクエストはエラーを返します。

異なるパーミッションを持つクライアントの使用

team-b-client 設定を使用して、b_ で始まるトピックにメッセージを生成します。

  1. トピック a_messages に書き込む。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1
    Copy to Clipboard Toggle word wrap

    以下のリクエストは、Not authorized to access topics: [a_messages] エラーを返します。

  2. トピック b_messages に書き込む。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic b_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1
    Message 2
    Message 3
    Copy to Clipboard Toggle word wrap

    メッセージは Kafka に正常に生成されます。

  3. トピック x_messages に書き込む。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1
    Copy to Clipboard Toggle word wrap

    Not authorized to access topics: [x_messages] エラーが返され、team-b-client はトピック x_messages からのみ読み取りできます。

  4. team-a-client を使用してトピック x_messages に書き込みます。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-a-client.properties
    Message 1
    Copy to Clipboard Toggle word wrap

    このリクエストは、Not authorized to access topics: [x_messages] エラーを返します。team-a-clientx_messages トピックに書き込みできますが、トピックが存在しない場合に作成するパーミッションがありません。team-a-clientx_messages トピックに書き込みできるようにするには、管理者 power user はパーティションやレプリカの数などの適切な設定で作成する必要があります。

承認された管理ユーザーでの Kafka の管理

管理者ユーザー alice を使用して Kafka を管理します。alice は、すべての Kafka クラスターのすべての管理にフルアクセスできます。

  1. alice として x_messages トピックを作成します。

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
      --topic x_messages --create --replication-factor 1 --partitions 1
    Copy to Clipboard Toggle word wrap

    トピックが正常に作成されました。

  2. alice としてすべてのトピックをリスト表示します。

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties --list
    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-b-client.properties --list
    Copy to Clipboard Toggle word wrap

    管理者ユーザーの alice はすべてのトピックをリスト表示できますが、team-a-clientteam-b-client は自分がアクセスできるトピックのみをリスト表示できます。

    Dev Team A ロールと Dev Team B ロールは、どちらも x_ で始まるトピックに対する Describe 権限を持っていますが、他のチームのトピックに対する Describe 権限を持っていないため、他のチームのトピックを見ることができません。

  3. team-a-client を使用して、x_messages トピックにメッセージを生成します。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-a-client.properties
    Message 1
    Message 2
    Message 3
    Copy to Clipboard Toggle word wrap

    alicex_messages トピックを作成すると、メッセージが正常に Kafka に生成されます。

  4. team-b-client を使用して、x_messages トピックにメッセージを生成します。

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-b-client.properties
    Message 4
    Message 5
    Copy to Clipboard Toggle word wrap

    このリクエストは、Not authorized to access topics: [x_messages] エラーを返します。

  5. team-b-client を使用して、x_messages トピックからメッセージを消費します。

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-b-client.properties --group x_consumer_group_b
    Copy to Clipboard Toggle word wrap

    コンシューマーは、x_messages トピックからすべてのメッセージを受け取ります。

  6. team-a-client を使用して、x_messages トピックからメッセージを消費します。

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group x_consumer_group_a
    Copy to Clipboard Toggle word wrap

    このリクエストは、Not authorized to access topics: [x_messages] エラーを返します。

  7. team-a-client を使用して、a_ で始まるコンシューマーグループからのメッセージを消費します。

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_a
    Copy to Clipboard Toggle word wrap

    このリクエストは、Not authorized to access topics: [x_messages] エラーを返します。

    Dev Team A には、x_ で始まるトピックの Read 権限がありません。

  8. alice を使用して、x_messages トピックへのメッセージを生成します。

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/alice.properties
    Copy to Clipboard Toggle word wrap

    メッセージは Kafka に正常に生成されます。

    alice は、すべてのトピックに対して読み取りまたは書き込みを行うことができます。

  9. alice を使用してクラスター設定を読み取ります。

    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
      --entity-type brokers --describe --entity-default
    Copy to Clipboard Toggle word wrap

    この例のクラスター設定は空です。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat