6.5. OAuth 2.0 トークンベース認証の使用


Streams for Apache Kafka は、OAUTHBEARER および PLAIN メカニズムを使用した OAuth 2.0 認証 の使用をサポートしています。

OAuth 2.0 は、アプリケーション間で標準的なトークンベースの認証および認可を有効にし、中央の認可サーバーを使用してリソースに制限されたアクセス権限を付与するトークンを発行します。

OAuth 2.0 認証を設定した後に OAuth 2.0 認可 を設定できます。

Kafka ブローカーおよびクライアントの両方が OAuth 2.0 を使用するように設定する必要があります。OAuth 2.0 認証は、simple または OPA ベースの Kafka 認可と併用することもできます。

OAuth 2.0 認証を使用すると、アプリケーションクライアントはアカウントのクレデンシャルを公開せずにアプリケーションサーバー (リソースサーバー と呼ばれる) のリソースにアクセスできます。

アプリケーションクライアントは、アクセストークンを認証の手段として渡します。アプリケーションサーバーはこれを使用して、付与するアクセス権限のレベルを決定することもできます。認可サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。

Streams for Apache Kafka のコンテキストでは、以下が行われます。

  • Kafka ブローカーは OAuth 2.0 リソースサーバーとして動作します。
  • Kafka クライアントは OAuth 2.0 アプリケーションクライアントとして動作します。

Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーおよびクライアントは、必要に応じて OAuth 2.0 認可サーバーと通信し、アクセストークンを取得または検証します。

Streams for Apache Kafka のデプロイメントでは、OAuth 2.0 統合により以下が提供されます。

  • Kafka ブローカーのサーバー側 OAuth 2.0 サポート
  • Kafka MirrorMaker、Kafka Connect、および Kafka Bridge のクライアント側 OAuth 2.0 サポート

RHEL 上の Streams for Apache Kafka には、2 つの OAuth 2.0 ライブラリーが含まれています。

kafka-oauth-client
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler という名前のカスタムログインコールバックハンドラークラスを提供します。OAUTHBEARER 認証メカニズムを処理するには、Apache Kafka が提供する OAuthBearerLoginModule でログインコールバックハンドラーを使用します。
kafka-oauth-common
kafka-oauth-client ライブラリーに必要な機能の一部を提供するヘルパーライブラリーです。

提供されるクライアントライブラリーは、keycloak-corejackson-databind、および slf4j-api などの追加のサードパーティーライブラリーにも依存します。

Maven プロジェクトを使用してクライアントをパッケージ化し、すべての依存関係ライブラリーが含まれるようにすることが推奨されます。依存関係ライブラリーは今後のバージョンで変更される可能性があります。

6.5.1. OAuth 2.0 認証メカニズム

Streams for Apache Kafka は、OAuth 2.0 認証用の OAUTHBEARER および PLAIN メカニズムをサポートしています。どちらのメカニズムも、Kafka クライアントが Kafka ブローカーで認証されたセッションを確立できるようにします。クライアント、認可サーバー、および Kafka ブローカー間の認証フローは、メカニズムごとに異なります。

可能な限り、OAUTHBEARER を使用するようにクライアントを設定することが推奨されます。OAUTHBEARER では、クライアントクレデンシャルは Kafka ブローカーと 共有されることがない ため、PLAIN よりも高レベルのセキュリティーが提供されます。OAUTHBEARER をサポートしない Kafka クライアントの場合のみ、PLAIN の使用を検討してください。

クライアントの接続に OAuth 2.0 認証を使用するように Kafka ブローカーリスナーを設定します。必要な場合は、同じ oauth リスナーで OAUTHBEARER および PLAIN メカニズムを使用できます。各メカニズムをサポートするプロパティーは、oauth リスナー設定で明示的に指定する必要があります。

OAUTHBEARER の概要

OAUTHBEARER を使用するには、Kafka ブローカーの OAuth 認証リスナー設定で sasl.enabled.mechanismsOAUTHBEARER に設定します。詳細な設定は、「OAuth 2.0 Kafka ブローカーの設定」 を参照してください。

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER

また、多くの Kafka クライアントツールでは、プロトコルレベルで OAUTHBEARER の基本サポートを提供するライブラリーを使用します。Streams for Apache Kafka では、アプリケーションの開発を支援するために、アップストリームの Kafka Client Java ライブラリー用の OAuth コールバックハンドラー が提供されます (ただし、他のライブラリーは対象外)。そのため、独自のコールバックハンドラーを作成する必要はありません。アプリケーションクライアントはコールバックハンドラーを使用してアクセストークンを提供できます。Go などの他言語で書かれたクライアントは、カスタムコードを使用して認可サーバーに接続し、アクセストークンを取得する必要があります。

OAUTHBEARER を使用する場合、クライアントはクレデンシャルを交換するために Kafka ブローカーでセッションを開始します。ここで、クレデンシャルはコールバックハンドラーによって提供されるベアラートークンの形式を取ります。コールバックを使用して、以下の 3 つの方法のいずれかでトークンの提供を設定できます。

  • クライアント ID および Secret (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン
  • 設定時に手動で取得された有効期限の長い更新トークン
注記

OAUTHBEARER 認証は、プロトコルレベルで OAUTHBEARER メカニズムをサポートする Kafka クライアントでのみ使用できます。

PLAIN の概要

PLAIN を使用するには、PLAINsasl.enabled.mechanisms の値に追加します。

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN

PLAIN は、すべての Kafka クライアントツールによって使用される簡単な認証メカニズムです。PLAIN を OAuth 2.0 認証で使用できるようにするために、Streams for Apache Kafka では、OAuth 2.0 over PLAIN サーバー側コールバックが提供されます。

OAUTHBEARER 認証が使用される場合と同様に、クライアントクレデンシャルは準拠した認可サーバーの背後で一元的に処理されます。OAuth 2.0 over PLAIN コールバックを併用する場合、以下のいずれかの方法を使用して Kafka クライアントは Kafka ブローカーで認証されます。

  • クライアント ID および Secret (OAuth 2.0 クライアントクレデンシャル メカニズムを使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン

どちらの方法でも、クライアントは Kafka ブローカーにクレデンシャルを渡すために、PLAIN username および password プロパティーを提供する必要があります。クライアントはこれらのプロパティーを使用してクライアント ID およびシークレット、または、ユーザー名およびアクセストークンを渡します。

クライアント ID およびシークレットは、アクセストークンの取得に使用されます。

アクセストークンは、password プロパティーの値として渡されます。$accessToken: 接頭辞の有無に関わらずアクセストークンを渡します。

  • リスナー設定でトークンエンドポイント (oauth.token.endpoint.uri) を設定する場合は、接頭辞が必要です。
  • リスナー設定でトークンエンドポイント (oauth.token.endpoint.uri) を設定しない場合は、接頭辞は必要ありません。Kafka ブローカーは、パスワードを raw アクセストークンとして解釈します。

アクセストークンとして password が設定されている場合、username は Kafka ブローカーがアクセストークンから取得するプリンシパル名と同じものを設定する必要があります。oauth.username.claimoauth.fallback.username.claimoauth.fallback.username.prefix、および oauth.userinfo.endpoint.uri プロパティーを使用すると、リスナーにユーザー名の抽出オプションを指定できます。ユーザー名の抽出プロセスも、認可サーバーによって異なります。特に、クライアント ID をアカウント名にマッピングする方法により異なります。

注記

OAuth over PLAIN は、(非推奨の) OAuth 2.0 パスワード付与メカニズムを使用したユーザー名とパスワードの受け渡し (パスワード付与) をサポートしていません。

6.5.1.1. プロパティーまたは変数を使用した OAuth 2.0 の設定

OAuth 2.0 設定は、Java Authentication and Authorization Service (JAAS) プロパティーまたは環境変数を使用して設定できます。

  • JAAS のプロパティーは、server.properties 設定ファイルで設定され、listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config プロパティーのキーと値のペアとして渡されます。
  • 環境変数を使用する場合は、server.properties ファイルで listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config プロパティーを指定する必要がありますが、他の JAAS プロパティーを省略できます。

    大文字 (アッパーケース) の環境変数の命名規則を使用できます。

Streams for Apache Kafka OAuth 2.0 ライブラリーは、次の文字列で始まるプロパティーを使用します。

6.5.2. OAuth 2.0 Kafka ブローカーの設定

OAuth 2.0 認証の Kafka ブローカー設定には、以下が関係します。

  • 認可サーバーでの OAuth 2.0 クライアントの作成
  • Kafka クラスターでの OAuth 2.0 認証の設定
注記

認可サーバーに関連する Kafka ブローカーおよび Kafka クライアントはどちらも OAuth 2.0 クライアントと見なされます。

6.5.2.1. 認可サーバーの OAuth 2.0 クライアント設定

セッションの開始中に受信されたトークンを検証するように Kafka ブローカーを設定するには、認可サーバーで OAuth 2.0 の クライアント 定義を作成し、以下のクライアントクレデンシャルが有効な状態で 機密情報 として設定することが推奨されます。

  • kafka-broker のクライアント ID (例)
  • 認証メカニズムとしてのクライアント ID およびシークレット
注記

認可サーバーのパブリックでないイントロスペクションエンドポイントを使用する場合のみ、クライアント ID およびシークレットを使用する必要があります。高速のローカル JWT トークンの検証と同様に、パブリック認可サーバーのエンドポイントを使用する場合は通常、クレデンシャルは必要ありません。

6.5.2.2. Kafka クラスターでの OAuth 2.0 認証設定

Kafka クラスターで OAuth 2.0 認証を使用するには、Kafka server.properties ファイルで Kafka クラスターの OAuth 認証リスナー設定を有効にします。最小限の設定が必要です。また、TLS が inter-broker 通信に使用される TLS リスナーを設定することもできます。

以下の方法のいずれかを使用して、認可サーバーによるトークン検証用にブローカーを設定できます。

  • 高速のローカルトークン検証: 署名付き JWT 形式のアクセストークンと組み合わせた JWKS エンドポイント
  • イントロスペクション エンドポイント

OAUTHBEARER または PLAIN 認証、またはその両方を設定できます。

以下の例は、グローバル リスナー設定を適用する最小の設定を示しています。これは、inter-broker 通信がアプリケーションクライアントと同じリスナーを通過することを意味します。

この例では、sasl.enabled.mechanisms ではなく、listener.name.LISTENER-NAME.sasl.enabled.mechanisms を指定する特定のリスナーの OAuth 2.0 設定も示しています。LISTENER-NAME は、リスナーの大文字と小文字を区別しない名前です。ここでは、リスナー CLIENT という名前が付けられ、プロパティー名は listener.name.client.sasl.enabled.mechanisms になります。

この例では OAUTHBEARER 認証を使用します。

例: JWKS エンドポイントを使用した OAuth 2.0 認証の最小リスナー設定

sasl.enabled.mechanisms=OAUTHBEARER 1
listeners=CLIENT://0.0.0.0:9092 2
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 3
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 4
sasl.mechanism.inter.broker.protocol=OAUTHBEARER 5
inter.broker.listener.name=CLIENT 6
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 7
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 8
  oauth.valid.issuer.uri="https://<oauth_server_address>" \ 9
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 10
  oauth.username.claim="preferred_username"  \ 11
  oauth.client.id="kafka-broker" \ 12
  oauth.client.secret="kafka-secret" \ 13
  oauth.token.endpoint.uri="https://<oauth_server_address>/token" ; 14
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 15
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 16

1
SASL でのクレデンシャル交換に OAUTHBEARER メカニズムを有効にします。
2
接続するクライアントアプリケーションのリスナーを設定します。システム hostname はアドバタイズされたホスト名として使用されます。このホスト名は、再接続するためにクライアントが解決する必要があります。この例では、リスナーの名前は CLIENT です。
3
リスナーのチャネルプロトコルを指定します。SASL_SSL は TLS 用です。暗号化されていない接続 (TLS なし) には SASL_PLAINTEXT が使用されますが、TCP 接続層での盗聴のリスクがあります。
4
CLIENT リスナーの OAUTHBEARER メカニズムを指定します。クライアント名 (CLIENT) は通常、listeners プロパティーでは大文字、listener.name プロパティー (listener.name.client) では小文字、そして listener.name.client.* プロパティーの一部である場合は小文字で指定されます。
5
inter-broker 通信の OAUTHBEARER メカニズムを指定します。
6
ブローカー間通信のリスナーを指定します。仕様は、有効な設定のために必要です。
7
クライアントリスナーで OAuth 2.0 認証を設定します。
8
クライアントおよび inter-broker 通信の認証設定を行います。oauth.client.idoauth.client.secret、および auth.token.endpoint.uri プロパティーは inter-broker 設定に関連するものです。
9
有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
10
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
11
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
12
すべてのブローカーで同じ Kafka ブローカーのクライアント ID。これは、kafka-broker として認可サーバーに登録されたクライアントです
13
Kafka ブローカーのシークレット (すべてのブローカーで同じ)。
14
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境の場合は、常に https:// urls を使用してください。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
15
inter-broker 通信の OAuth2.0 認証を有効にします (inter-broker 通信の OAuth2.0 認証にのみ必要)。
16
(オプション): トークンの期限が切れるとセッションが強制的に期限切れとなり、また、Kafka の再認証メカニズム が有効になります。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。

以下の例は、TLS が inter-broker の通信に使用される TLS リスナーの最小設定を示しています。

例: OAuth 2.0 認証の TLS リスナー設定

listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 1
listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 2
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=<keystore_password> 3
listener.name.replication.ssl.truststore.password=<truststore_password>
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 4
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 5
listener.name.replication.ssl.keystore.location=<path_to_keystore> 6
listener.name.replication.ssl.truststore.location=<path_to_truststore> 7
listener.name.replication.ssl.client.auth=required 8
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 9
  oauth.valid.issuer.uri="https://<oauth_server_address>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \
  oauth.username.claim="preferred_username" ;

1
inter-broker 通信とクライアントアプリケーションには、個別の設定が必要です。
2
REPLICATION リスナーが TLS を使用し、CLIENT リスナーが暗号化されていないチャネルで SASL を使用するように設定します。実稼働環境では、クライアントは暗号化されたチャンネル (SASL_SSL) を使用できます。
3
ssl. プロパティーは TLS 設定を定義します。
4
乱数ジェネレーターの実装。設定されていない場合は、Java プラットフォーム SDK デフォルトが使用されます。
5
ホスト名の検証。空の文字列に設定すると、ホスト名の検証はオフになります。設定されていない場合、デフォルト値は HTTPS で、サーバー証明書のホスト名の検証を強制します。
6
リスナーのキーストアへのパス。
7
リスナーのトラストストアへのパス。
8
(inter-broker 接続に使用される) TLS 接続の確立時に REPLICATION リスナーのクライアントがクライアント証明書で認証する必要があることを指定します。
9
OAuth 2.0 の CLIENT リスナーを設定します。認可サーバーとの接続はセキュアな HTTPS 接続を使用する必要があります。

以下の例は、SASL でのクレデンシャル交換に PLAIN 認証メカニズムを使用した OAuth 2.0 認証の最小設定を示しています。高速なローカルトークン検証が使用されます。

例: PLAIN 認証の最小リスナー設定

listeners=CLIENT://0.0.0.0:9092 1
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 2
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN 3
sasl.mechanism.inter.broker.protocol=OAUTHBEARER 4
inter.broker.listener.name=CLIENT 5
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 6
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 7
  oauth.valid.issuer.uri="http://<auth_server>/auth/realms/<realm>" \ 8
  oauth.jwks.endpoint.uri="https://<auth_server>/auth/realms/<realm>/protocol/openid-connect/certs" \ 9
  oauth.username.claim="preferred_username"  \ 10
  oauth.client.id="kafka-broker" \ 11
  oauth.client.secret="kafka-secret" \ 12
  oauth.token.endpoint.uri="https://<oauth_server_address>/token" ; 13
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 14
listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 15
listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 16
  oauth.valid.issuer.uri="https://<oauth_server_address>" \ 17
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 18
  oauth.username.claim="preferred_username"  \ 19
  oauth.token.endpoint.uri="http://<auth_server>/auth/realms/<realm>/protocol/openid-connect/token" ; 20
connections.max.reauth.ms=3600000 21

1
接続するクライアントアプリケーション用のリスナー (この例では CLIENT) を設定します。システム hostname はアドバタイズされたホスト名として使用されます。このホスト名は、再接続するためにクライアントが解決する必要があります。これは唯一の設定済みリスナーであるため、inter-broker 通信にも使用されます。
2
暗号化されていないチャネルで SASL を使用するように例の CLIENT リスナーを設定します。実稼働環境では、TCP 接続層での盗聴を避けるために、クライアントは暗号化チャンネル (SASL_SSL) を使用する必要があります。
3
SASL でのクレデンシャル交換の PLAIN 認証メカニズムおよび OAUTHBEARER を有効にします。inter-broker 通信に必要なため、OAUTHBEARER も指定されます。Kafka クライアントは、接続に使用するメカニズムを選択できます。
4
inter-broker 通信の OAUTHBEARER 認証メカニズムを指定します。
5
inter-broker 通信のリスナー (この例では CLIENT) を指定します。有効な設定のために必要です。
6
OAUTHBEARER メカニズムのサーバーコールバックハンドラーを設定します。
7
OAUTHBEARER メカニズムを使用して、クライアントおよび inter-broker 通信の認証設定を設定します。oauth.client.idoauth.client.secret、および oauth.token.endpoint.uri プロパティーは inter-broker 設定に関連するものです。
8
有効な発行者 URI。この発行者からのアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
9
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
10
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
11
すべてのブローカーで同じ Kafka ブローカーのクライアント ID。これは、kafka-broker として認可サーバーに登録されたクライアントです
12
Kafka ブローカーの秘密 (すべてのブローカーで同じ)。
13
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境の場合は、常に https:// urls を使用してください。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
14
inter-broker 通信に OAuth 2.0 認証を有効にします。
15
PLAIN 認証のサーバーコールバックハンドラーを設定します。
16
PLAIN 認証を使用して、クライアント通信の認証設定を設定します。

oauth.token.endpoint.uri は、OAuth 2.0 クライアントクレデンシャルメカニズム を使用して OAuth 2.0 over PLAIN を有効にする任意のプロパティーです。

17
有効な発行者 URI。この発行者からのアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
18
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
19
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
20
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。PLAIN メカニズムの追加設定。これが指定されている場合、クライアントは $accessToken: 接頭辞を使用してアクセストークンを password として渡すことで、PLAIN 経由で認証できます。

実稼働環境の場合は、常に https:// urls を使用してください。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token

21
(オプション): トークンの期限が切れるとセッションが強制的に期限切れとなり、また、Kafka の再認証メカニズム が有効になります。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。

6.5.2.3. 高速なローカル JWT トークン検証の設定

高速なローカル JWT トークンの検証では、JWT トークンの署名がローカルでチェックされます。

ローカルチェックでは、トークンに対して以下が確認されます。

  • アクセストークンに Bearer の (typ) 要求値が含まれ、トークンがタイプに準拠することを確認します。
  • 有効 (期限切れでない) かどうかを確認します。
  • トークンに validIssuerURI と一致する発行元があることを確認します。

認可サーバーによって発行されなかったすべてのトークンが拒否されるよう、リスナーの設定時に 有効な発行者 URI を指定します。

高速のローカル JWT トークン検証の実行中に、認可サーバーの通信は必要はありません。OAuth 2.0 認可サーバーによって公開される JWK エンドポイント URI を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。

注記

認可サーバーとの通信はすべて HTTPS を使用して実行する必要があります。

TLS リスナーでは、証明書 トラストストア を設定し、トラストストアファイルをポイントできます。

高速なローカル JWT トークン検証のプロパティーの例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://<oauth_server_address>" \ 1
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 2
  oauth.jwks.refresh.seconds="300" \ 3
  oauth.jwks.refresh.min.pause.seconds="1" \ 4
  oauth.jwks.expiry.seconds="360" \ 5
  oauth.username.claim="preferred_username" \ 6
  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 7
  oauth.ssl.truststore.password="<truststore_password>" \ 8
  oauth.ssl.truststore.type="PKCS12" ; 9

1
有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
2
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
3
エンドポイントの更新間隔 (デフォルトは 300)。
4
JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新は指数バックオフのルールに従い、oauth.jwks.refresh.seconds に到達するまで、一時停止を増やして失敗した更新を再試行します。デフォルト値は 1 です。
5
JWK 証明書が期限切れになる前に有効とみなされる期間。デフォルトは 360 秒です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。
6
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
7
TLS 設定で使用されるトラストストアの場所。
8
トラストストアにアクセスするためのパスワード。
9
PKCS #12 形式のトラストストアタイプ。

6.5.2.4. OAuth 2.0 イントロスペクションエンドポイントの設定

OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。

OAuth 2.0 イントロスペクションベースの検証を設定するには、高速ローカル JWT トークン検証用に指定された JWK エンドポイント URI ではなく、イントロスペクションエンドポイント URI を指定します。通常、イントロスペクションエンドポイントは保護されているため、認可サーバーに応じて client ID および client secret を指定する必要があります。

イントロスペクションエンドポイントのプロパティー例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://<oauth_server_address>/introspection" \ 1
  oauth.client.id="kafka-broker" \ 2
  oauth.client.secret="kafka-broker-secret" \ 3
  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 4
  oauth.ssl.truststore.password="<truststore_password>" \ 5
  oauth.ssl.truststore.type="PKCS12" \ 6
  oauth.username.claim="preferred_username" ; 7

1
OAuth 2.0 イントロスペクションエンドポイント URI。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token/introspect
2
Kafka ブローカーのクライアント ID。
3
Kafka ブローカーのシークレット。
4
TLS 設定で使用されるトラストストアの場所。
5
トラストストアにアクセスするためのパスワード。
6
PKCS #12 形式のトラストストアタイプ。
7
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。

6.5.3. Kafka ブローカーの再認証の設定

Kafka クライアントと Kafka ブローカー間の OAuth 2.0 セッションに Kafka session re-authentication を使用するように、OAuth リスナーを設定できます。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存の接続を破棄せずに再使用して、新しいセッションを即座に開始します。

セッションの再認証はデフォルトで無効になっています。これは、server.properties ファイルで有効にできます。SASL メカニズムとして OAUTHBEARER または PLAIN を有効にした TLS リスナーに connections.max.reauth.ms プロパティーを設定します。

リスナーごとにセッションの再認証を指定できます。以下に例を示します。

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000

セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。

セッションの再認証は、高速ローカル JWT または イントロスペクションエンドポイント のトークン検証と共に使用できます。

クライアントの再認証

ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。

トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。

セッションの再認証は、リフレッシュトークンが使用されている場合にも適用されます。セッションが期限切れになると、クライアントはリフレッシュトークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存の接続に再認証されます。

OAUTHBEARER および PLAIN のセッションの有効期限

セッションの再認証が設定されている場合、OAUTHBEARER と PLAIN 認証ではセッションの有効期限は異なります。

クライアント ID とシークレット による方法を使用する OAUTHBEARER および PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された connections.max.reauth.ms で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。

有効期間の長いアクセストークン 方法を使用する PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された connections.max.reauth.ms で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証は試行されますが、PLAIN にはトークンを更新するメカニズムがありません。

connections.max.reauth.ms設定されていない 場合は、再認証しなくても、OAUTHBEARER および PLAIN クライアントはブローカーへの接続を無期限に維持します。認証されたセッションは、アクセストークンの期限が切れても終了しません。ただし、keycloak 認可を使用したり、カスタム authorizer をインストールして、認可を設定する場合に考慮できます。

6.5.4. OAuth 2.0 Kafka クライアントの設定

Kafka クライアントは以下のいずれかで設定されます。

  • 認可サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID およびシークレット)。
  • 認可サーバーから提供されたツールを使用して取得された、有効期限の長い有効なアクセストークンまたは更新トークン。

アクセストークンは、Kafka ブローカーに送信される唯一の情報です。アクセストークンを取得するために認可サーバーでの認証に使用されるクレデンシャルは、ブローカーに送信されません。

クライアントによるアクセストークンの取得後、認可サーバーと通信する必要はありません。

クライアント ID とシークレットを使用した認証が最も簡単です。有効期間の長いアクセストークンまたは更新トークンを使用すると、認可サーバーツールに追加の依存関係があるため、より複雑になります。

注記

有効期間が長いアクセストークンを使用している場合は、認可サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。

Kafka クライアントが直接アクセストークンで設定されていない場合、クライアントは認可サーバーと通信して Kafka セッションの開始中にアクセストークンのクレデンシャルを交換します。Kafka クライアントは以下のいずれかを交換します。

  • クライアント ID およびシークレット
  • クライアント ID、更新トークン、および (任意の) シークレット
  • ユーザー名とパスワード、およびクライアント ID と (オプションで) シークレット

6.5.5. OAuth 2.0 クライアント認証フロー

OAuth 2.0 認証フローは、基礎となる Kafka クライアントおよび Kafka ブローカー設定によって異なります。フローは、使用する認可サーバーによってもサポートされる必要があります。

Kafka ブローカーリスナー設定は、クライアントがアクセストークンを使用して認証する方法を決定します。クライアントはクライアント ID およびシークレットを渡してアクセストークンをリクエストできます。

リスナーが PLAIN 認証を使用するように設定されている場合、クライアントはクライアント ID およびシークレット、または、ユーザー名およびアクセストークンで認証できます。これらの値は PLAIN メカニズムの username および password プロパティーとして渡されます。

リスナー設定は、以下のトークン検証オプションをサポートします。

  • 認可サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証を使用できます。認可サーバーは、トークンで署名を検証するために使用される公開証明書のある JWKS エンドポイントを提供します。
  • 認可サーバーが提供するトークンイントロスペクションエンドポイントへの呼び出しを使用することができます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを認可サーバーに渡します。Kafka ブローカーは応答を確認して、トークンが有効かどうかを確認します。
注記

認可サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。

Kafka クライアントクレデンシャルは、以下のタイプの認証に対して設定することもできます。

  • 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス
  • 新しいアクセストークンを発行するには、認可サーバーに連絡します (クライアント ID とシークレット、または更新トークン、またはユーザー名とパスワードを使用)。

6.5.5.1. SASL OAUTHBEARER メカニズムを使用したクライアント認証フローの例

SASL OAUTHBEARER メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID とシークレットを使用し、ブローカーが検証を認可サーバーに委任する場合

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka クライアントは、クライアント ID とシークレット、および必要に応じてリフレッシュトークンを使用して、認可サーバーからアクセストークンを要求します。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
  2. 認可サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡すことで Kafka ブローカーで認証されます。
  4. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用し、認可サーバーでトークンイントロスペクションエンドポイントを呼び出すことで、アクセストークンを検証します。
  5. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントがクライアント ID およびシークレットを使用し、ブローカーが高速のローカルトークン検証を実行する場合

Client using client ID and secret with broker performing fast local token validation

  1. Kafka クライアントは、クライアント ID とシークレット、および必要に応じてリフレッシュトークンを使用して、トークンエンドポイントから認可サーバーに対して認証を行います。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
  2. 認可サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡すことで Kafka ブローカーで認証されます。
  4. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが検証を認可サーバーに委任する場合

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、認可サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
  3. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが高速のローカル検証を実行する場合

Client using long-lived access token with broker performing fast local validation

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
警告

トークンが取り消された場合に認可サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、認可サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。

6.5.5.2. SASL PLAIN メカニズムを使用したクライアント認証フローの例

OAuth PLAIN メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID およびシークレットを使用し、ブローカーがクライアントのアクセストークンを取得する場合

Client using a client ID and secret with the broker obtaining the access token for the client

  1. Kafka クライアントは、clientId をユーザー名として、secret をパスワードとして渡します。
  2. Kafka ブローカーは、トークンエンドポイントを使用して clientId および secret を認可サーバーに渡します。
  3. 認可サーバーは、新しいアクセストークンまたはエラー (クライアントクレデンシャルが有効でない場合) を返します。
  4. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンのイントロスペクションが使用される場合、要求は認可サーバーに対して行われません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

クライアントが、クライアント ID およびシークレットなしで有効期限の長いアクセストークンを使用する場合

Client using a long-lived access token without a client ID and secret

  1. Kafka クライアントはユーザー名とパスワードを渡します。パスワードは、クライアントを実行する前に手動で取得および設定されたアクセストークンの値を提供します。
  2. Kafka ブローカーリスナーが認証のトークンエンドポイントで設定されているかどうかに応じて、$accessToken: 文字列の接頭辞の有無にかかわらず、パスワードは渡されます。

    1. トークンエンドポイントが設定されている場合、パスワードの前に $accessToken: を付け、password パラメーターにクライアントシークレットではなくアクセストークンが含まれていることをブローカーに知らせる必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。
    2. トークンエンドポイントが Kafka ブローカーリスナーで設定されていない場合 (no-client-credentials mode を強制)、パスワードは接頭辞なしでアクセストークンを提供する必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。このモードでは、クライアントはクライアント ID およびシークレットを使用せず、password パラメーターは常に raw アクセストークンとして解釈されます。
  3. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンイントロスペクションが使用されている場合には、認可サーバーへの要求はありません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

6.5.6. OAuth 2.0 認証の設定

OAuth 2.0 は、Kafka クライアントと Streams for Apache Kafka コンポーネントとの対話に使用されます。

Streams for Apache Kafka に OAuth 2.0 を使用するには、以下を行う必要があります。

6.5.6.1. OAuth 2.0 認可サーバーとしての Red Hat Single Sign-On の設定

この手順では、Red Hat Single Sign-On を認可サーバーとしてデプロイし、Streams for Apache Kafka との統合用に設定する方法を説明します。

認可サーバーは、一元的な認証および認可の他、ユーザー、クライアント、およびパーミッションの一元管理を実現します。Red Hat Single Sign-On にはレルムの概念があります。レルム はユーザー、クライアント、パーミッション、およびその他の設定の個別のセットを表します。デフォルトの マスターレルム を使用できますが、新しいレルムを作成することもできます。各レルムは独自の OAuth 2.0 エンドポイントを公開します。そのため、アプリケーションクライアントとアプリケーションサーバーはすべて同じレルムを使用する必要があります。

Streams for Apache Kafka で OAuth 2.0 を使用するには、Red Hat Single Sign-On のデプロイメントを使用して認証レルムを作成および管理します。

注記

Red Hat Single Sign-On がすでにデプロイされている場合は、デプロイメントの手順を省略して、現在のデプロイメントを使用できます。

作業を開始する前に

Red Hat Single Sign-On を使用するための知識が必要です。

インストールおよび管理の手順は、以下を参照してください。

前提条件

  • Streams for Apache Kafka と Kafka が実行中である。

Red Hat Single Sign-On デプロイメントの場合:

手順

  1. Red Hat Single Sign-On をインストールします。

    ZIP ファイルから、または RPM を使用してインストールできます。

  2. Red Hat Single Sign-On 管理コンソールにログインして、Streams for Apache Kafka の OAuth 2.0 ポリシーを作成します。

    ログインの詳細は、Red Hat Single Sign-On のデプロイ時に提供されます。

  3. レルムを作成し、有効にします。

    既存のマスターレルムを使用できます。

  4. 必要に応じて、レルムのセッションおよびトークンのタイムアウトを調整します。
  5. kafka-broker というクライアントを作成します。
  6. Settings タブで以下を設定します。

    • Access TypeConfidential に設定します。
    • Standard Flow EnabledOFF に設定し、このクライアントからの Web ログインを無効にします。
    • Service Accounts EnabledON に設定し、このクライアントが独自の名前で認証できるようにします。
  7. 続行する前に Save クリックします。
  8. Credentials タブから、Streams for Apache Kafka のクラスター設定で使用するシークレットをメモします。
  9. Kafka ブローカーに接続するすべてのアプリケーションクライアントに対して、このクライアント作成手順を繰り返し行います。

    新しいクライアントごとに定義を作成します。

    設定では、名前をクライアント ID として使用します。

次のステップ

認可サーバーのデプロイおよび設定後に、Kafka ブローカーが OAuth 2.0 を使用するように設定 します。

6.5.6.2. Kafka ブローカーの OAuth 2.0 サポートの設定

この手順では、ブローカーリスナーが認可サーバーを使用して OAuth 2.0 認証を使用するように、Kafka ブローカーを設定する方法を説明します。

TLS リスナーを設定して、暗号化されたインターフェイスで OAuth 2.0 を使用することが推奨されます。プレーンリスナーは推奨されません。

選択した認可サーバーをサポートするプロパティーと、実装している認可のタイプを使用して、Kafka ブローカーを設定します。

作業を開始する前の注意事項

Kafka ブローカーリスナーの設定および認証に関する詳細は、以下を参照してください。

リスナー設定で使用されるプロパティーの説明は、以下を参照してください。

前提条件

手順

  1. server.properties ファイルで Kafka ブローカーリスナー設定を設定します。

    たとえば、OAUTHBEARER メカニズムを使用する場合:

    sasl.enabled.mechanisms=OAUTHBEARER
    listeners=CLIENT://0.0.0.0:9092
    listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
    listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
    inter.broker.listener.name=CLIENT
    listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
    listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
  2. listener.name.client.oauthbearer.sasl.jaas.config の一部としてブローカーの接続設定を行います。

    この例では、接続設定オプションを示しています。

    例 1: JWKS エンドポイント設定を使用したローカルトークンの検証

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.valid.issuer.uri="https://<oauth_server_address>/auth/realms/<realm_name>" \
      oauth.jwks.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/certs" \
      oauth.jwks.refresh.seconds="300" \
      oauth.jwks.refresh.min.pause.seconds="1" \
      oauth.jwks.expiry.seconds="360" \
      oauth.username.claim="preferred_username" \
      oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
      oauth.ssl.truststore.password="<truststore_password>" \
      oauth.ssl.truststore.type="PKCS12" ;
    listener.name.client.oauthbearer.connections.max.reauth.ms=3600000

    例 2: OAuth 2.0 イントロスペクションエンドポイントを使用した認可サーバーへのトークン検証の委任

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.introspection.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/introspection" \
      # ...

  3. 必要な場合は、認可サーバーへのアクセスを設定します。

    この手順は通常、サービスメッシュ などの技術がコンテナーの外部でセキュアなチャネルを設定するために使用される場合を除き、実稼働環境で必要になります。

    1. セキュアな認可サーバーに接続するためのカスタムトラストストアを指定します。認可サーバーへのアクセスには SSL が常に必要になります。

      プロパティーを設定して、トラストストアを設定します。

      以下に例を示します。

      listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        # ...
        oauth.client.id="kafka-broker" \
        oauth.client.secret="kafka-broker-secret" \
        oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
        oauth.ssl.truststore.password="<truststore_password>" \
        oauth.ssl.truststore.type="PKCS12" ;
    2. 証明書のホスト名がアクセス URL ホスト名と一致しない場合は、証明書のホスト名の検証をオフにできます。

      oauth.ssl.endpoint.identification.algorithm=""

      このチェックは、認可サーバーへのクライアント接続が認証されるようにします。実稼働以外の環境で検証をオフにすることもできます。

  4. 選択した認証フローに従って追加のプロパティーを設定します。

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.token.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/token" \ 1
      oauth.custom.claim.check="@.custom == 'custom-value'" \ 2
      oauth.scope="<scope>" \ 3
      oauth.check.audience="true" \ 4
      oauth.audience="<audience>" \ 5
      oauth.valid.issuer.uri="https://https://<oauth_server_address>/auth/<realm_name>" \ 6
      oauth.client.id="kafka-broker" \ 7
      oauth.client.secret="kafka-broker-secret" \ 8
      oauth.connect.timeout.seconds=60 \ 9
      oauth.read.timeout.seconds=60 \ 10
      oauth.http.retries=2 \ 11
      oauth.http.retry.pause.millis=300 \ 12
      oauth.groups.claim="$.groups" \ 13
      oauth.groups.claim.delimiter="," \ 14
      oauth.include.accept.header="false" ; 15
    1
    認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境の場合は、常に https:// urls を使用してください。KeycloakAuthorizer を使用する場合、またはブローカー間通信に OAuth 2.0 が有効なリスナーを使用する場合に必要です。
    2
    (オプション) カスタムクレームチェック。検証時に追加のカスタムルールを JWT アクセストークンに適用する JsonPath フィルタークエリー。アクセストークンに必要なデータが含まれていないと拒否されます。イントロスペクション エンドポイントメソッドを使用する場合は、カスタムチェックがイントロスペクションエンドポイントの応答 JSON に適用されます。
    3
    (オプション) scope パラメーターがトークンエンドポイントに渡されます。スコープ は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
    4
    (オプション) オーディエンスチェック。認可サーバーが aud (オーディエンス) クレームを提供していて、オーディエンスチェックを実施する場合は、ouath.check.audiencetrue に設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。これにより、Kafka ブローカーは aud 要求に clientId を持たないトークンを拒否します。デフォルトは false です。
    5
    (オプション) トークンエンドポイントに渡される audience パラメーター。オーディエンス は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
    6
    有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。(常に必要です)
    7
    すべてのブローカーで同一の、Kafka ブローカーの設定されたクライアント ID。これは、kafka-broker として認可サーバーに登録されたクライアントです。イントロスペクションエンドポイントがトークンの検証に使用される場合、または KeycloakAuthorizer が使用される場合に必要です。
    8
    すべてのブローカーで同じ Kafka ブローカーに設定されたシークレット。ブローカーが認証サーバーに対して認証する必要がある場合は、クライアントシークレット、アクセストークン、またはリフレッシュトークンのいずれかを指定する必要があります。
    9
    (オプション) 認可サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
    10
    (オプション): 認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
    11
    認可サーバーへの失敗した HTTP リクエストを再試行する最大回数。デフォルト値は 0 で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、oauth.connect.timeout.seconds オプションと oauth.read.timeout.seconds オプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。
    12
    認可サーバーへの失敗した HTTP リクエストの再試行を行うまでの待機時間。デフォルトでは、この時間はゼロに設定されており、一時停止は適用されません。これは、リクエストの失敗の原因となる問題の多くは、リクエストごとのネットワークの不具合やプロキシーの問題であり、すぐに解決できるためです。ただし、認可サーバーに負荷がかかっている場合、または高トラフィックが発生している場合は、このオプションを 100 ミリ秒以上の値に設定して、サーバーの負荷を軽減し、再試行が成功する可能性を高めることができます。
    13
    JWT トークンまたはイントロスペクションエンドポイントの応答からグループ情報を抽出するために使用される JsonPath クエリー。デフォルトでは設定されません。これは、カスタムオーソライザーがユーザーグループに基づいて認可の決定を行うために使用できます。
    14
    1 つのコンマ区切りの文字列として返されるときにグループ情報を解析するために使用される区切り文字。デフォルト値は ',' (コンマ) です。
    15
    (オプション) oauth.include.accept.headerfalse に設定して、リクエストから Accept ヘッダーを削除します。ヘッダーを含めることで認可サーバーとの通信時に問題が発生する場合は、この設定を使用できます。
  5. OAuth 2.0 認証の適用方法、および使用されている認証サーバーのタイプに応じて、設定を追加します。

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.check.issuer=false \ 1
      oauth.fallback.username.claim="<client_id>" \ 2
      oauth.fallback.username.prefix="<client_account>" \ 3
      oauth.valid.token.type="bearer" \ 4
      oauth.userinfo.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/userinfo" ; 5
    1
    認可サーバーが iss クレームを提供しない場合は、発行者チェックを行うことができません。このような場合、oauth.check.issuerfalse に設定し、oauth.valid.issuer.uri を指定しないようにします。デフォルトは true です。
    2
    認可サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID が提供されることがあります。リフレッシュトークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に ユーザー名 が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。必要に応じて、"['client.info'].['client.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からフォールバックユーザー名を取得できます。
    3
    oauth.fallback.username.claim が適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producer というクライアントが存在し、producer という通常ユーザーも存在する場合について考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。
    4
    (oauth.introspection.endpoint.uri を使用する場合のみ該当): 使用している認証サーバーによっては、イントロスペクションエンドポイントによって トークンタイプ 属性が返されるかどうかはわからず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。
    5
    (oauth.introspection.endpoint.uri を使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、認可サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfo エンドポイントの URI をフォールバックとして設定します。oauth.fallback.username.claimoauth.fallback.username.claim、および oauth.fallback.username.prefix 設定が userinfo エンドポイントの応答に適用されます。

6.5.6.3. OAuth 2.0 を使用するための Kafka Java クライアントの設定

Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサー API とコンシューマー API を設定します。コールバックプラグインをクライアントの pom.xml ファイルに追加してから、OAuth 2.0 用にクライアントを設定します。

クライアント設定で次を指定します。

  • SASL (Simple Authentication and Security Layer) セキュリティープロトコル:

    • TLS 暗号化接続を介した認証用の SASL_SSL
    • 暗号化されていない接続を介した認証用の SASL_PLAINTEXT

      プロダクションには SASL_SSL を使用し、ローカル開発には SASL_PLAINTEXT のみを使用してください。SASL_SSL を使用する場合は、追加の ssl.truststore 設定が必要です。OAuth 2.0 認可サーバーへのセキュアな接続 (https://) には、トラストストア設定が必要です。OAuth 2.0 認可サーバーを確認するには、認可サーバーの CA 証明書をクライアント設定のトラストストアに追加します。トラストストアは、PEM または PKCS #12 形式で設定できます。

  • Kafka SASL メカニズム:

    • ベアラートークンを使用したクレデンシャル交換用の OAUTHBEARER
    • クライアントクレデンシャル (clientId + secret) またはアクセストークンを渡す PLAIN
  • SASL メカニズムを実装する JAAS (Java Authentication and Authorization Service) モジュール:

    • org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule は OAuthbearer メカニズムを実装します。
    • org.apache.kafka.common.security.plain.PlainLoginModule は plain メカニズムを実装します。

    OAuthbearer メカニズムを使用できるようにするには、io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler のカスタムクラスをコールバックハンドラーとして追加する必要もあります。JaasClientOauthLoginCallbackHandler は、クライアントのログイン中にアクセストークンの認可サーバーへの OAuth コールバックを処理します。これにより、トークンの自動更新が可能になり、ユーザーの介入なしで継続的な認証が保証されます。さらに、OAuth 2.0 パスワード付与方式を使用してクライアントのログイン認証情報を処理します。

  • 以下の認証方法をサポートする SASL 認証プロパティー:

    • OAuth 2.0 クライアントクレデンシャル
    • OAuth 2.0 パスワードグラント (非推奨)
    • アクセストークン
    • リフレッシュトークン

    SASL 認証プロパティーを JAAS 設定として追加します (sasl.jaas.config および sasl.login.callback.handler.class)。認証プロパティーを設定する方法は、OAuth 2.0 認可サーバーへのアクセスに使用している認証方法によって異なります。この手順では、プロパティーはプロパティーファイルで指定されてから、クライアント設定にロードされます。

注記

認証プロパティーを環境変数または Java システムプロパティーとして指定することもできます。Java システムプロパティーの場合は、setProperty を使用して設定し、-D オプションを使用してコマンドラインで渡すことができます。

前提条件

  • Streams for Apache Kafka と Kafka が実行中である。
  • OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
  • Kafka ブローカーが OAuth 2.0 に対して設定されている。

手順

  1. OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの pom.xml ファイルに追加します。

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.15.0.redhat-00007</version>
    </dependency>
  2. プロパティーファイルで以下の設定を指定して、クライアントプロパティーを設定します。

    • セキュリティープロトコル
    • SASL メカニズム
    • 使用されているメソッドに応じた JAAS モジュールと認証プロパティー

      たとえば、以下を client.properties ファイルに追加できます。

      クライアントクレデンシャルメカニズムのプロパティー

      security.protocol=SASL_SSL 1
      sasl.mechanism=OAUTHBEARER 2
      ssl.truststore.location=/tmp/truststore.p12 3
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \ 4
        oauth.client.id="<client_id>" \ 5
        oauth.client.secret="<client_secret>" \ 6
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7
        oauth.ssl.truststore.password="$STOREPASS" \ 8
        oauth.ssl.truststore.type="PKCS12" \ 9
        oauth.scope="<scope>" \ 10
        oauth.audience="<audience>" ; 11
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      TLS 暗号化接続用の SASL_SSL セキュリティープロトコル。ローカル開発のみでは、暗号化されていない接続で SASL_PLAINTEXT を使用します。
      2
      OAUTHBEARER または PLAIN として指定された SASL メカニズム。
      3
      Kafka クラスターへのセキュアなアクセスのためのトラストストア設定。
      4
      認可サーバーのトークンエンドポイントの URI です。
      5
      クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
      6
      認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
      7
      この場所には、認可サーバーの公開鍵証明書 (truststore.p12) が含まれています。
      8
      トラストストアにアクセスするためのパスワード。
      9
      トラストストアのタイプ。
      10
      (オプション): トークンエンドポイントからトークンを要求するための scope。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。
      11
      (オプション) トークンエンドポイントからトークンを要求するための audience。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。

      パスワード付与メカニズムのプロパティー

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.client.id="<client_id>" \ 1
        oauth.client.secret="<client_secret>" \ 2
        oauth.password.grant.username="<username>" \ 3
        oauth.password.grant.password="<password>" \ 4
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" \
        oauth.scope="<scope>" \
        oauth.audience="<audience>" ;
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
      2
      (オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
      3
      パスワードグラント認証のユーザー名。OAuth パスワードグラント設定 (ユーザー名とパスワード) では、OAuth 2.0 パスワードグラント方式が使用されます。パスワードグラントを使用するには、権限が制限されたクライアントのユーザーアカウントを認可サーバー上に作成します。アカウントは、サービスアカウントのように機能する必要があります。認証にユーザーアカウントが必要な環境で使用しますが、先にリフレッシュトークンの使用を検討してください。
      4
      パスワード付与認証のパスワード。
      注記

      SASL PLAIN は、OAuth 2.0 パスワード付与メソッドを使用したユーザー名とパスワードの受け渡し (パスワード付与) をサポートしていません。

      アクセストークンのプロパティー

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.access.token="<access_token>" \ 1
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" ;
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      Kafka クライアントの有効期間が長いアクセストークン。

      トークンのプロパティーを更新する

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.client.id="<client_id>" \ 1
        oauth.client.secret="<client_secret>" \ 2
        oauth.refresh.token="<refresh_token>" \ 3
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" ;
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
      2
      (オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
      3
      Kafka クライアントの有効期間が長い更新トークン。
  3. OAUTH 2.0 認証のクライアントプロパティーを Java クライアントコードに入力します。

    クライアントプロパティーの入力を示す例

    Properties props = new Properties();
    try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) {
      props.load(reader);
    }

  4. Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.