7.2. OAuth 2.0 トークンベース認証の使用


Streams for Apache Kafka は、トークンベースの認証に OAuth 2.0 を使用することをサポートしています。OAuth 2.0 認可サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーとクライアントは、必要に応じて認可サーバーと通信し、アクセストークンを取得または検証します。

Streams for Apache Kafka のデプロイメントでは、OAuth 2.0 統合によって次のサポートが提供されます。

  • Kafka ブローカーのサーバー側 OAuth 2.0 認証
  • Kafka MirrorMaker、Kafka Connect、Kafka Bridge のクライアント側 OAuth 2.0 認証

RHEL 上の Streams for Apache Kafka には、2 つの OAuth 2.0 ライブラリーが含まれています。

kafka-oauth-client
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler という名前のカスタムログインコールバックハンドラークラスを提供します。OAUTHBEARER 認証メカニズムを処理するには、Apache Kafka が提供する OAuthBearerLoginModule でログインコールバックハンドラーを使用します。
kafka-oauth-common
kafka-oauth-client ライブラリーに必要な機能の一部を提供するヘルパーライブラリーです。

提供されるクライアントライブラリーは、keycloak-corejackson-databind、および slf4j-api などの追加のサードパーティーライブラリーにも依存します。

Maven プロジェクトを使用してクライアントをパッケージ化し、すべての依存関係ライブラリーが含まれるようにすることが推奨されます。依存関係ライブラリーは今後のバージョンで変更される可能性があります。

7.2.1. リスナーでの OAuth 2.0 認証の設定

OAuth 2.0 認証を使用して Kafka ブローカーを保護するには、Kafka の server.properties ファイルで OAUth 2.0 認証とクライアント認証メカニズムを使用するように Kafka リスナーを設定し、認証で使用する認証メカニズムとトークン検証のタイプに応じてさらに設定を追加します。

最小限の設定が必要です。TLS リスナーを設定することもできます。TLS リスナーでは、ブローカー間の通信に TLS が使用されます。OAuth 2.0 認証と TLS 暗号化を併用することを推奨します。暗号化を行わないと、ネットワークの盗聴やトークンの盗難による不正アクセスに対して接続が脆弱になります。

認証のタイプを OAuth 2.0 として定義した場合は、検証のタイプ (高速なローカル JWT 検証またはイントロスペクションエンドポイントを使用したトークン検証) に基づいて設定を追加します。

SASL 認証メカニズムの有効化

クライアントがクレデンシャルを交換し、Kafka との認証済みセッションを確立できるように、次の SASL メカニズムの 1 つまたは両方を使用します。

OAUTHBEARER

OAUTHBEARER 認証メカニズムを使用すると、クレデンシャルの交換に、OAuth コールバックハンドラーによって提供されるベアラートークンが使用されます。次の方法を使用するトークンのプロビジョニングを設定できます。

  • クライアント ID とシークレット (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
  • クライアント ID とクライアントアサーション
  • 有効期間の長いアクセストークン
  • 手動で取得した有効期間の長いリフレッシュトークン

OAUTHBEARER は、PLAIN よりも高いレベルのセキュリティーを提供するため推奨されますが、プロトコルレベルで OAUTHBEARER メカニズムをサポートする Kafka クライアントでのみ使用できます。クライアントクレデンシャルが Kafka と共有されることはありません。

PLAIN

PLAIN は、すべての Kafka クライアントツールで使用されるシンプルな認証メカニズムです。OAUTHBEARER をサポートしていない Kafka クライアントでのみ PLAIN を使用することを検討してください。PLAIN 認証メカニズムを使用すると、次の方法を使用するようにクレデンシャルの交換を設定できます。

  • クライアント ID とシークレット (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
  • 有効期間の長いアクセストークン
    使用する方法に関係なく、クライアントは username および password プロパティーを Kafka に提供する必要があります。

クレデンシャルは、OAUTHBEARER 認証を使用する方法と同様に、準拠した認可サーバーの背後で一元的に処理されます。ユーザー名の抽出プロセスは、認可サーバーの設定によって異なります。

OAUTHBEARER メカニズムのリスナー設定の例

sasl.enabled.mechanisms=OAUTHBEARER 
1

listeners=CLIENT://0.0.0.0:9092 
2

listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 
3

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 
4

sasl.mechanism.inter.broker.protocol=OAUTHBEARER 
5

inter.broker.listener.name=CLIENT 
6

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 
7

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
  # ...
Copy to Clipboard

1
SASL でのクレデンシャル交換に OAUTHBEARER メカニズムを有効にします。
2
接続するクライアントアプリケーションのリスナーを設定します。システム hostname はアドバタイズされたホスト名として使用されます。このホスト名は、再接続するためにクライアントが解決する必要があります。この例では、リスナーの名前は CLIENT です。
3
リスナーのチャネルプロトコルを指定します。SASL_SSL は TLS 用です。暗号化されていない接続 (TLS なし) には SASL_PLAINTEXT が使用されますが、TCP 接続層での盗聴のリスクがあります。
4
CLIENT リスナーの OAUTHBEARER メカニズムを指定します。クライアント名 (CLIENT) は通常、listeners プロパティーでは大文字、listener.name プロパティー (listener.name.client) では小文字、そして listener.name.client.* プロパティーの一部である場合は小文字で指定されます。
5
ブローカー間通信の OAUTHBEARER メカニズムを指定します。
6
ブローカー間通信のリスナーを指定します。仕様は、有効な設定のために必要です。
7
クライアントリスナーで OAuth 2.0 認証を設定します。

プロパティーまたは変数を使用した OAuth 2.0 の設定

OAuth 2.0 設定は、Authentication and Authorization Service (JAAS) プロパティーまたは環境変数を使用して設定します。

  • JAAS のプロパティーは server.properties 設定ファイルで設定され、listener.name.<listener_name>.oauthbearer.sasl.jaas.config プロパティーのキーと値のペアとして渡されます。
  • 環境変数を使用する場合は、server.properties ファイルに listener.name.<listener_name>.oauthbearer.sasl.jaas.config プロパティーを指定する必要がありますが、他の JAAS プロパティーは省略できます。

    大文字 (アッパーケース) の環境変数の命名規則を使用できます。

Streams for Apache Kafka OAuth 2.0 ライブラリーは、次の文字列で始まるプロパティーを使用します。

高速なローカル JWT トークン検証の設定

高速なローカル JWT トークン検証では、JWT トークン署名をローカルでチェックし、トークンが次の基準を満たしていることを確認します。

  • トークンがアクセストークンであることを示す Bearer が、typ (タイプ) または token_type ヘッダークレームの値として含まれている。
  • 現在有効であり、期限切れでない。
  • validIssuerURI に一致する発行者を持つ。

リスナーの設定時に validIssuerURI 属性を指定することで、認可サーバーから発行されていないトークンは拒否されます。

高速のローカル JWT トークン検証の実行中に、認可サーバーの通信は必要はありません。OAuth 2.0 の認可サーバーによって公開されるエンドポイントの jwksEndpointUri 属性を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。

認可サーバーとの通信はすべて TLS による暗号化を使用して実行する必要があります。証明書のトラストストアを設定し、トラストストアのファイルを指定することができます。

JWT トークンからユーザー名を適切に取得するため、userNameClaim の設定を検討してください。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。

Kafka ACL 認可を使用する場合は、認証時にユーザー名でユーザーを特定します。JWT トークンの sub 要求は、通常は一意な ID でユーザー名ではありません。

高速なローカル JWT トークン検証の設定例

# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 
1

  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \ 
2

  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \ 
3

  oauth.jwks.refresh.seconds="300" \ 
4

  oauth.jwks.refresh.min.pause.seconds="1" \ 
5

  oauth.jwks.expiry.seconds="360" \ 
6

  oauth.username.claim="preferred_username" \ 
7

  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 
8

  oauth.ssl.truststore.password="<truststore_password>" \ 
9

  oauth.ssl.truststore.type="PKCS12" ; 
10

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 
11
Copy to Clipboard

1
OAuth 2.0 の CLIENT リスナーを設定します。認可サーバーとの接続はセキュアな HTTPS 接続を使用する必要があります。
2
有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。(常に必要です)
3
JWKS エンドポイント URL。
4
エンドポイントの更新間隔 (デフォルトは 300)。
5
JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新は指数バックオフのルールに従い、oauth.jwks.refresh.seconds に到達するまで、一時停止を増やして失敗した更新を再試行します。デフォルト値は 1 です。
6
JWK 証明書が期限切れになる前に有効とみなされる期間。デフォルトは 360 秒です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。
7
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
8
TLS 設定で使用されるトラストストアの場所。
9
トラストストアにアクセスするためのパスワード。
10
PKCS #12 形式のトラストストアタイプ。
11
(オプション): トークンの期限が切れるとセッションが強制的に期限切れとなり、また、Kafka の再認証メカニズム が有効になります。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。

イントロスペクションエンドポイントを使用したトークン検証の設定

OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。

OAuth 2.0 イントロスペクションベースの検証を設定するには、高速ローカル JWT トークン検証用に指定された JWK エンドポイント URI ではなく、イントロスペクションエンドポイント URI を指定します。通常、イントロスペクションエンドポイントは保護されているため、認可サーバーに応じて client ID および client secret を指定する必要があります。

イントロスペクションエンドポイントを使用したトークン検証設定の例

# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://<oauth_server_address>/<introspection_endpoint>" \ 
1

  oauth.client.id="kafka-broker" \ 
2

  oauth.client.secret="kafka-broker-secret" \ 
3

  oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 
4

  oauth.ssl.truststore.password="<truststore_password>" \ 
5

  oauth.ssl.truststore.type="PKCS12" \ 
6

  oauth.username.claim="preferred_username" ; 
7
Copy to Clipboard

1
トークンイントロスペクションエンドポイントの URI。
2
Kafka ブローカーのクライアント ID。
3
Kafka ブローカーのシークレット。
4
TLS 設定で使用されるトラストストアの場所。
5
トラストストアにアクセスするためのパスワード。
6
PKCS #12 形式のトラストストアタイプ。
7
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。

認可サーバーの保護されたエンドポイントへのブローカーの認証

通常、認可サーバーの証明書エンドポイント (oauth.jwks.endpoint.uri) はパブリックにアクセス可能ですが、イントロスペクションエンドポイント (oauth.introspection.endpoint.uri) は保護されています。ただし、これは認可サーバーの設定によって異なる場合があります。

Kafka ブローカーは、HTTP 認証スキームを使用して、次のどちらかの方法で、認可サーバーの保護されたエンドポイントに対して認証できます。

  • HTTP Basic 認証 でクライアント ID とシークレットを使用する。
  • HTTP ベアラー認証 でベアラートークンを使用する。

HTTP Basic 認証を設定するには、次のプロパティーを設定します。

  • oauth.client.id
  • oauth.client.secret

HTTP ベアラー認証の場合は、次のいずれかのプロパティーを設定します。

  • oauth.server.bearer.token.location: ベアラートークンを含むディスク上のファイルパスを指定します。
  • oauth.server.bearer.token: ベアラートークンをクリアテキストで指定します。

追加の設定オプションの指定

認証要件と使用している認可サーバーに応じて追加の設定を指定します。これらのプロパティーの一部は、特定の認証メカニズムにのみ適用されるか、他のプロパティーと組み合わせて使用される場合にのみ適用されます。

たとえば、PLAIN メカニズムで OAUth を使用する場合、アクセストークンが、$accessToken: 接頭辞の有無にかかわらず、password プロパティー値として渡されます。

  • リスナー設定でトークンエンドポイント (oauth.token.endpoint.uri) を設定する場合、接頭辞が必要です。
  • リスナー設定でトークンエンドポイントを設定しない場合、接頭辞は必要ありません。Kafka ブローカーは、パスワードを raw アクセストークンとして解釈します。

アクセストークンとして password が設定されている場合、username は Kafka ブローカーがアクセストークンから取得するプリンシパル名と同じものを設定する必要があります。oauth.username.claimoauth.username.prefixoauth.fallback.username.claimoauth.fallback.username.prefix、および oauth.userinfo.endpoint.uri プロパティーを使用すると、リスナーでユーザー名抽出オプションを指定できます。ユーザー名の抽出プロセスも、認可サーバーによって異なります。特に、クライアント ID をアカウント名にマッピングする方法により異なります。

注記

PLAIN メカニズムはパスワードグラント認証をサポートしていません。認証には、クライアントクレデンシャル (クライアント ID + シークレット) またはアクセストークンのいずれかを使用してください。

追加の設定の例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  # ...
  oauth.token.endpoint.uri="https://<auth_server_address>/<path_to_token_endpoint>" \ 
1

  oauth.custom.claim.check="@.custom == 'custom-value'" \ 
2

  oauth.scope="<scope>" \ 
3

  oauth.check.audience="true" \ 
4

  oauth.audience="<audience>" \ 
5

  oauth.client.id="kafka-broker" \ 
6

  oauth.client.secret="kafka-broker-secret" \ 
7

  oauth.connect.timeout.seconds=60 \ 
8

  oauth.read.timeout.seconds=60 \ 
9

  oauth.http.retries=2 \ 
10

  oauth.http.retry.pause.millis=300 \ 
11

  oauth.groups.claim="$.groups" \ 
12

  oauth.groups.claim.delimiter="," \ 
13

  oauth.include.accept.header="false" ; 
14

  oauth.check.issuer=false \ 
15

  oauth.username.prefix="user-account-" \ 
16

  oauth.fallback.username.claim="client_id" \ 
17

  oauth.fallback.username.prefix="service-account-" \ 
18

  oauth.valid.token.type="bearer" \ 
19

  oauth.userinfo.endpoint.uri="https://<auth_server_address>/<path_to_userinfo_endpoint>" ; 
20
Copy to Clipboard

1
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境では、常に https:// URL を使用してください。KeycloakAuthorizer を使用する場合、またはブローカー間通信に OAuth 2.0 が有効なリスナーを使用する場合に必要です。
2
(オプション) カスタムクレームチェック。検証時に追加のカスタムルールを JWT アクセストークンに適用する JsonPath フィルタークエリー。アクセストークンに必要なデータが含まれていないと拒否されます。イントロスペクション エンドポイントメソッドを使用する場合は、カスタムチェックがイントロスペクションエンドポイントの応答 JSON に適用されます。
3
(オプション) scope パラメーターがトークンエンドポイントに渡されます。スコープ は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
4
(オプション) オーディエンスチェック。認可サーバーが aud (オーディエンス) クレームを提供していて、オーディエンスチェックを実施する場合は、ouath.check.audiencetrue に設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。これにより、Kafka ブローカーは aud 要求に clientId を持たないトークンを拒否します。デフォルトは false です。
5
(オプション) トークンエンドポイントに渡される audience パラメーター。オーディエンス は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
6
すべてのブローカーで同一の、Kafka ブローカーの設定されたクライアント ID。これは、kafka-broker として認可サーバーに登録されたクライアントです。イントロスペクションエンドポイントがトークンの検証に使用される場合、または KeycloakAuthorizer が使用される場合に必要です。
7
すべてのブローカーで同じ Kafka ブローカーに設定されたシークレット。ブローカーが認証サーバーに対して認証する必要がある場合は、クライアントシークレット、アクセストークン、またはリフレッシュトークンのいずれかを指定する必要があります。
8
(オプション) 認可サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
9
(オプション): 認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
10
認可サーバーへの失敗した HTTP リクエストを再試行する最大回数。デフォルト値は 0 で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、oauth.connect.timeout.seconds オプションと oauth.read.timeout.seconds オプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。
11
認可サーバーへの失敗した HTTP リクエストの再試行を行うまでの待機時間。デフォルトでは、この時間はゼロに設定されており、一時停止は適用されません。これは、リクエストの失敗の原因となる問題の多くは、リクエストごとのネットワークの不具合やプロキシーの問題であり、すぐに解決できるためです。ただし、認可サーバーに負荷がかかっている場合、または高トラフィックが発生している場合は、このオプションを 100 ミリ秒以上の値に設定して、サーバーの負荷を軽減し、再試行が成功する可能性を高めることができます。
12
JWT トークンまたはイントロスペクションエンドポイントの応答からグループ情報を抽出するために使用される JsonPath クエリー。デフォルトでは設定されません。これは、カスタムオーソライザーがユーザーグループに基づいて認可の決定を行うために使用できます。
13
1 つのコンマ区切りの文字列として返されるときにグループ情報を解析するために使用される区切り文字。デフォルト値は ',' (コンマ) です。
14
(オプション) oauth.include.accept.headerfalse に設定して、リクエストから Accept ヘッダーを削除します。ヘッダーを含めることで認可サーバーとの通信時に問題が発生する場合は、この設定を使用できます。
15
認可サーバーが iss クレームを提供しない場合は、発行者チェックを行うことができません。このような場合、oauth.check.issuerfalse に設定し、oauth.valid.issuer.uri を指定しないようにします。デフォルトは true です。
16
ユーザー ID を構成するときに使用する接頭辞。これは、oauth.username.claim が設定されている場合にのみ有効になります。
17
認可サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID 属性が提供されることがあります。リフレッシュトークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に ユーザー名 が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。必要に応じて、"['client.info'].['client.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からフォールバックユーザー名を取得できます。
18
oauth.fallback.username.claim が適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producer というクライアントが存在し、producer という通常ユーザーも存在する場合について考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。
19
(oauth.introspection.endpoint.uri を使用する場合のみ該当): 使用している認証サーバーによっては、イントロスペクションエンドポイントによって トークンタイプ 属性が返されるかどうかはわからず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。
20
(oauth.introspection.endpoint.uri を使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、認可サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfo エンドポイントの URI をフォールバックとして設定します。oauth.username.claimoauth.username.prefixoauth.fallback.username.claim、および oauth.fallback.username.prefix 設定は、userinfo エンドポイントの応答にも適用されます。

ブローカー間通信のリスナーの設定

次の例では、ブローカー間通信がアプリケーションクライアントと同じリスナーを通過する最小設定で、高速トークン検証に OAUTHBEARER メカニズムを使用します。

oauth.client.idoauth.client.secret、および auth.token.endpoint.uri プロパティーは、ブローカー間の通信に関連しています。

OAUTHBEARER メカニズムを使用したブローカー間設定の例

sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 
1

  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username"  \
  oauth.client.id="kafka-broker" \ 
2

  oauth.client.secret="kafka-secret" \ 
3

  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; 
4

listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 
5

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
Copy to Clipboard

1
クライアントおよびブローカー間通信の認証設定を指定します。
2
すべてのブローカーで同じ Kafka ブローカーのクライアント ID。これは、kafka-broker として認可サーバーに登録されたクライアントです
3
Kafka ブローカーのシークレット (すべてのブローカーで同じ)。
4
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境では、常に https:// URL を使用してください。
5
ブローカー間通信の OAuth 2.0 認証を有効にします (この認証にのみ必要です)。

次の例は、ブローカー間通信に使用される TLS リスナーの最小設定を示しています。

TLS を使用したブローカー間設定の例

sasl.enabled.mechanisms=OAUTHBEARER
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 
1

listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 
2

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=<keystore_password> 
3

listener.name.replication.ssl.truststore.password=<truststore_password>
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 
4

listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 
5

listener.name.replication.ssl.keystore.location=<path_to_keystore> 
6

listener.name.replication.ssl.truststore.location=<path_to_truststore> 
7

listener.name.replication.ssl.client.auth=required 
8

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username" ;
Copy to Clipboard

1
ブローカー間通信とクライアントアプリケーションには、個別の設定が必要です。
2
REPLICATION リスナーが TLS を使用し、CLIENT リスナーが暗号化されていないチャネルで SASL を使用するように設定します。実稼働環境では、クライアントは暗号化されたチャンネル (SASL_SSL) を使用できます。
3
ssl. プロパティーは TLS 設定を定義します。
4
乱数ジェネレーターの実装。設定されていない場合は、Java プラットフォーム SDK デフォルトが使用されます。
5
ホスト名の検証。空の文字列に設定すると、ホスト名の検証はオフになります。設定されていない場合、デフォルト値は HTTPS で、サーバー証明書のホスト名の検証を強制します。
6
リスナーのキーストアへのパス。
7
リスナーのトラストストアへのパス。
8
(ブローカー間接続に使用される) TLS 接続の確立時に REPLICATION リスナーのクライアントがクライアント証明書で認証する必要があることを指定します。

次の例では、ブローカー間通信がアプリケーションクライアントと同じリスナーを通過する最小設定で、高速トークン検証に PLAIN メカニズムを使用します。

PLAIN メカニズムを使用したブローカー間設定の例

listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https:<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<auth_server>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username"  \
  oauth.client.id="kafka-broker" \
  oauth.client.secret="kafka-secret" \
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 
1

listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 
2

listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 
3

  oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
  oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
  oauth.username.claim="preferred_username"  \
  oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; 
4

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
Copy to Clipboard

1
ブローカー間通信の OAuth 2.0 認証を有効にします。
2
PLAIN 認証のサーバーコールバックハンドラーを設定します。
3
PLAIN 認証を使用して、クライアント通信の認証設定を設定します。oauth.token.endpoint.uri は、OAuth 2.0 クライアントクレデンシャルメカニズム を使用して OAuth 2.0 over PLAIN を有効にする任意のプロパティーです。
4
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。これが指定されている場合、クライアントは $accessToken: 接頭辞を使用してアクセストークンを password として渡すことで、PLAIN 経由で認証できます。

7.2.2. クライアントアプリケーションでの OAuth 2.0 の設定

クライアントアプリケーションで OAuth 2.0 を設定するには、以下を指定する必要があります。

  • SASL (Simple Authentication and Security Layer) セキュリティープロトコル
  • SASL メカニズム
  • JAAS (Java Authentication and Authorization Service) モジュール
  • 認可サーバーにアクセスするための認証プロパティー

SASL プロトコルの設定

クライアント設定で SASL プロトコルを指定します。

  • TLS 暗号化接続を介した認証用の SASL_SSL
  • 暗号化されていない接続を介した認証用の SASL_PLAINTEXT

プロダクションには SASL_SSL を使用し、ローカル開発には SASL_PLAINTEXT のみを使用してください。

SASL_SSL を使用する場合は、追加の ssl.truststore 設定が必要です。OAuth 2.0 認可サーバーへのセキュアな接続 (https://) には、トラストストア設定が必要です。OAuth 2.0 認可サーバーを確認するには、認可サーバーの CA 証明書をクライアント設定のトラストストアに追加します。トラストストアは、PEM または PKCS #12 形式で設定できます。

SASL 認証メカニズムの設定

クライアント設定で SASL メカニズムを指定します。

  • ベアラートークンを使用したクレデンシャル交換用の OAUTHBEARER
  • クライアントクレデンシャル (clientId + secret) またはアクセストークンを渡す PLAIN

JAAS モジュールの設定

SASL 認証メカニズムを実装する JAAS モジュールを sasl.jaas.config プロパティーの値として指定します。

  • org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModuleOAUTHBEARER メカニズムを実装します。
  • org.apache.kafka.common.security.plain.PlainLoginModulePLAIN メカニズムを実装します。
注記

OAUTHBEARER メカニズムの場合、Streams for Apache Kafka は、クレデンシャルの交換を可能にするために、Kafka クライアント Java ライブラリーを使用するクライアントにコールバックハンドラーを提供します。他の言語のクライアントの場合、アクセストークンを取得するためにカスタムコードが必要になる場合があります。PLAIN メカニズムの場合、Streams for Apache Kafka は、クレデンシャルの交換を可能にするために、サーバー側のコールバックを提供します。

OAUTHBEARER メカニズムを使用できるようにするには、コールバックハンドラーとしてカスタムの io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler クラスも追加する必要があります。JaasClientOauthLoginCallbackHandler は、クライアントのログイン中にアクセストークンの認可サーバーへの OAuth コールバックを処理します。これにより、トークンの自動更新が可能になり、ユーザーの介入なしで継続的な認証が保証されます。さらに、OAuth 2.0 パスワードグラント方式を使用してクライアントのログインクレデンシャルを処理します。

認証プロパティーの設定

OAuth 2.0 認証にクレデンシャルまたはアクセストークンを使用するようにクライアントを設定します。

クライアントクレデンシャルの使用
クライアントクレデンシャルを使用するには、認可サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID とシークレットまたはクライアント ID とクライアントアサーション) を使用してクライアントを設定する必要があります。これは最も単純なメカニズムです。
アクセストークンの使用
アクセストークンを使用すると、クライアントが、認可サーバーから取得した有効な有効期間の長いアクセストークンまたはリフレッシュトークンで設定されます。アクセストークンを使用すると、認可サーバーツールへ依存が増すため、複雑さが増します。有効期間が長いアクセストークンを使用している場合は、認可サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。

Kafka に送信される情報はアクセストークンだけです。トークンの取得に使用されるクレデンシャルは、Kafka に送信されることはありません。クライアントによるアクセストークンの取得後、認可サーバーと通信する必要はありません。

SASL 認証プロパティーは、次の認証方法をサポートします。

  • OAuth 2.0 クライアントクレデンシャル
  • アクセストークンまたはサービスアカウントトークン
  • リフレッシュトークン
  • OAuth 2.0 パスワードグラント (非推奨)

認証プロパティーを JAAS 設定 (sasl.jaas.config および sasl.login.callback.handler.class) として追加します。

クライアントアプリケーションがアクセストークンを使用して直接設定されていない場合、クライアントは Kafka セッションの開始時に、次のいずれかのクレデンシャルセットをアクセストークンと交換します。

  • クライアント ID およびシークレット
  • クライアント ID とクライアントアサーション
  • クライアント ID、リフレッシュトークン、および (必要に応じて) シークレット
  • ユーザー名とパスワード、およびクライアント ID と (必要に応じて) シークレット
注記

認証プロパティーを環境変数または Java システムプロパティーとして指定することもできます。Java システムプロパティーの場合は、setProperty を使用して設定し、-D オプションを使用してコマンドラインで渡すことができます。

クライアントシークレットを使用したクライアントクレデンシャルの設定例

security.protocol=SASL_SSL 
1

sasl.mechanism=OAUTHBEARER 
2

ssl.truststore.location=/tmp/truststore.p12 
3

ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \ 
4

  oauth.client.id="<client_id>" \ 
5

  oauth.client.secret="<client_secret>" \ 
6

  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 
7

  oauth.ssl.truststore.password="$STOREPASS" \ 
8

  oauth.ssl.truststore.type="PKCS12" \ 
9

  oauth.scope="<scope>" \ 
10

  oauth.audience="<audience>" ; 
11

sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Copy to Clipboard

1
TLS 暗号化接続用の SASL_SSL セキュリティープロトコル。ローカル開発のみでは、暗号化されていない接続で SASL_PLAINTEXT を使用します。
2
OAUTHBEARER または PLAIN として指定された SASL メカニズム。
3
Kafka クラスターへのセキュアなアクセスのためのトラストストア設定。
4
認可サーバーのトークンエンドポイントの URI です。
5
クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
6
認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
7
この場所には、認可サーバーの公開鍵証明書 (truststore.p12) が含まれています。
8
トラストストアにアクセスするためのパスワード。
9
トラストストアのタイプ。
10
(オプション): トークンエンドポイントからトークンを要求するための scope。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。
11
(オプション) トークンエンドポイントからトークンを要求するための audience。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。

クライアントアサーションを使用したクライアントクレデンシャルの設定例

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \
  oauth.client.assertion.location="<path_to_client_assertion_token_file>" \ 
1

  oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \ 
2

  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.scope="<scope>" \
  oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Copy to Clipboard

1
クライアントの認証に使用するクライアントアサーションファイルへのパス。このファイルは、クライアントシークレットの代替となる秘密鍵ファイルです。または、oauth.client.assertion オプションを使用して、クライアントアサーション値をクリアテキストで指定します。
2
(オプション) 場合によっては、クライアントアサーションタイプを指定する必要があります。指定されていない場合、デフォルト値は urn:ietf:params:oauth:client-assertion-type:jwt-bearer です。

パスワードグラントの設定例

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \ 
1

  oauth.client.secret="<client_secret>" \ 
2

  oauth.password.grant.username="<username>" \ 
3

  oauth.password.grant.password="<password>" \ 
4

  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" \
  oauth.scope="<scope>" \
  oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Copy to Clipboard

1
クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
2
(オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
3
パスワードグラント認証のユーザー名。OAuth パスワードグラント設定 (ユーザー名とパスワード) では、OAuth 2.0 パスワードグラント方式が使用されます。パスワードグラントを使用するには、権限が制限されたクライアントのユーザーアカウントを認可サーバー上に作成します。アカウントは、サービスアカウントのように機能する必要があります。認証にユーザーアカウントが必要な環境で使用しますが、先にリフレッシュトークンの使用を検討してください。
4
パスワードグラント認証のパスワード。
注記

SASL PLAIN は、OAuth 2.0 パスワードグラント方式を使用したユーザー名とパスワードの受け渡し (パスワードグラント) をサポートしていません。

アクセストークンの設定例

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.access.token="<access_token>" ; 
1

sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Copy to Clipboard

1
Kafka クライアントの有効期間が長いアクセストークン。または、oauth.access.token.location を使用して、アクセストークンを含むファイルを指定することもできます。

OpenShift サービスアカウントトークンの設定例

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token";  
1

sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Copy to Clipboard

1
ファイルシステム上のサービスアカウントトークンの場所 (クライアントが OpenShift Pod としてデプロイされていると想定)

リフレッシュトークンの設定例

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.token.endpoint.uri="<token_endpoint_url>" \
  oauth.client.id="<client_id>" \ 
1

  oauth.client.secret="<client_secret>" \ 
2

  oauth.refresh.token="<refresh_token>" \ 
3

  oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
  oauth.ssl.truststore.password="$STOREPASS" \
  oauth.ssl.truststore.type="PKCS12" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Copy to Clipboard

1
クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
2
(オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
3
Kafka クライアントの有効期間が長いリフレッシュトークン。

7.2.3. OAuth 2.0 クライアント認証フロー

OAuth 2.0 認証フローは、基礎となる Kafka クライアントおよび Kafka ブローカー設定によって異なります。フローは、使用する認可サーバーによってもサポートされる必要があります。

Kafka ブローカーリスナー設定は、クライアントがアクセストークンを使用して認証する方法を決定します。クライアントはクライアント ID およびシークレットを渡してアクセストークンをリクエストできます。

リスナーが PLAIN 認証を使用するように設定されている場合、クライアントはクライアント ID とシークレット、またはユーザー名とアクセストークンを使用して認証できます。これらの値は、PLAIN メカニズムの username および password プロパティーとして渡されます。

リスナー設定は、以下のトークン検証オプションをサポートします。

  • 認可サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証を使用できます。認可サーバーは、トークンで署名を検証するために使用される公開証明書のある JWKS エンドポイントを提供します。
  • 認可サーバーが提供するトークンイントロスペクションエンドポイントへの呼び出しを使用することができます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを認可サーバーに渡します。Kafka ブローカーは応答を確認して、トークンが有効かどうかを確認します。
注記

認可サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。

Kafka クライアントクレデンシャルは、以下のタイプの認証に対して設定することもできます。

  • 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス
  • 新しいアクセストークンを発行するために認可サーバーに問い合わせる (クライアント ID およびクレデンシャル、またはリフレッシュトークン、またはユーザー名とパスワードを使用)

7.2.3.1. SASL OAUTHBEARER メカニズムを使用したクライアント認証フローの例

SASL OAUTHBEARER メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID およびクレデンシャルを使用し、ブローカーが認可サーバーに検証を委譲する場合

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka クライアントは、クライアント ID およびクレデンシャル、および必要に応じてリフレッシュトークンを使用して、認可サーバーからアクセストークンを要求します。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
  2. 認可サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  4. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用し、認可サーバーでトークンイントロスペクションエンドポイントを呼び出すことで、アクセストークンを検証します。
  5. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントがクライアント ID およびクレデンシャルを使用し、ブローカーが高速なローカルトークン検証を実行する場合

Client using client ID and credentials with broker performing fast local token validation

  1. Kafka クライアントは、クライアント ID およびクレデンシャル、および必要に応じてリフレッシュトークンを使用して、トークンエンドポイントから認可サーバーに対して認証を行います。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
  2. 認可サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  4. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが検証を認可サーバーに委任する場合

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、認可サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
  3. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが高速のローカル検証を実行する場合

Client using long-lived access token with broker performing fast local validation

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
警告

トークンが取り消された場合に認可サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、認可サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。

7.2.3.2. SASL PLAIN メカニズムを使用したクライアント認証フローの例

OAuth PLAIN メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID およびシークレットを使用し、ブローカーがクライアントのアクセストークンを取得する場合

Client using a client ID and secret with the broker obtaining the access token for the client

  1. Kafka クライアントは、clientId をユーザー名として、secret をパスワードとして渡します。
  2. Kafka ブローカーは、トークンエンドポイントを使用して clientId および secret を認可サーバーに渡します。
  3. 認可サーバーは、新しいアクセストークンまたはエラー (クライアントクレデンシャルが有効でない場合) を返します。
  4. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンのイントロスペクションが使用される場合、要求は認可サーバーに対して行われません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

クライアントが、クライアント ID およびシークレットなしで有効期限の長いアクセストークンを使用する場合

Client using a long-lived access token without a client ID and secret

  1. Kafka クライアントはユーザー名とパスワードを渡します。パスワードは、クライアントを実行する前に手動で取得および設定されたアクセストークンの値を提供します。
  2. Kafka ブローカーリスナーが認証のトークンエンドポイントで設定されているかどうかに応じて、$accessToken: 文字列の接頭辞の有無にかかわらず、パスワードは渡されます。

    1. トークンエンドポイントが設定されている場合、パスワードの前に $accessToken: を付け、password パラメーターにクライアントシークレットではなくアクセストークンが含まれていることをブローカーに知らせる必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。
    2. トークンエンドポイントが Kafka ブローカーリスナーで設定されていない場合 (no-client-credentials mode を強制)、パスワードは接頭辞なしでアクセストークンを提供する必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。このモードでは、クライアントはクライアント ID およびシークレットを使用せず、password パラメーターは常に raw アクセストークンとして解釈されます。
  3. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンイントロスペクションが使用されている場合には、認可サーバーへの要求はありません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

7.2.4. セッションの再認証

Kafka クライアントと Kafka ブローカー間の OAuth 2.0 セッションに Kafka session re-authentication を使用するように、OAuth リスナーを設定できます。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存の接続を破棄せずに再使用して、新しいセッションを即座に開始します。

セッションの再認証はデフォルトで無効になっています。これを有効にするには、server.properties ファイルの connections.max.reauth.ms プロパティーに時間の値を設定します。設定例については、「リスナーでの OAuth 2.0 認証の設定」 を参照してください。

セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。

セッションの再認証は、高速ローカル JWT または イントロスペクションエンドポイント のトークン検証と共に使用できます。

クライアントの再認証

ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。

トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。

セッションの再認証は、リフレッシュトークンが使用されている場合にも適用されます。セッションが期限切れになると、クライアントはリフレッシュトークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存の接続に再認証されます。

OAUTHBEARER および PLAIN のセッションの有効期限

セッションの再認証が設定されている場合、OAUTHBEARER と PLAIN 認証ではセッションの有効期限は異なります。

クライアント ID とシークレット による方法を使用する OAUTHBEARER および PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された connections.max.reauth.ms で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。

有効期間の長いアクセストークン 方法を使用する PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された connections.max.reauth.ms で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証は試行されますが、PLAIN にはトークンを更新するメカニズムがありません。

connections.max.reauth.ms設定されていない 場合は、再認証しなくても、OAUTHBEARER および PLAIN クライアントはブローカーへの接続を無期限に維持します。認証されたセッションは、アクセストークンの期限が切れても終了しません。

ただし、keycloak 認可を使用したり、カスタム authorizer をインストールして、認可を設定する場合に考慮できます。

7.2.5. 例: OAuth 2.0 認証の有効化

この例では、OAUth 2.0 認証を使用して Kafka クラスターへのクライアントアクセスを設定する方法を示します。この手順では、Kafka リスナーと Kafka Java クライアントで OAuth 2.0 認証をセットアップするために必要な設定について説明します。

7.2.5.1. Kafka ブローカーの OAuth 2.0 サポートの設定

この手順では、ブローカーリスナーが認可サーバーを使用して OAuth 2.0 認証を使用するように、Kafka ブローカーを設定する方法を説明します。

TLS リスナーを設定して、暗号化されたインターフェイスで OAuth 2.0 を使用することが推奨されます。プレーンリスナーは推奨されません。

選択した認可サーバーをサポートするプロパティーと、実装している認可のタイプを使用して、Kafka ブローカーを設定します。

前提条件

手順

  1. server.properties ファイルで Kafka ブローカーリスナー設定を設定します。

    たとえば、OAUTHBEARER メカニズムを使用する場合:

    sasl.enabled.mechanisms=OAUTHBEARER
    listeners=CLIENT://0.0.0.0:9092
    listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
    listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
    inter.broker.listener.name=CLIENT
    listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
    listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    Copy to Clipboard
  2. listener.name.client.oauthbearer.sasl.jaas.config の一部としてブローカーの接続設定を行います。

  3. 必要な場合は、認可サーバーへのアクセスを設定します。

    この手順は通常、サービスメッシュ などの技術がコンテナーの外部でセキュアなチャネルを設定するために使用される場合を除き、実稼働環境で必要になります。

    1. セキュアな認可サーバーに接続するためのカスタムトラストストアを指定します。認可サーバーへのアクセスには SSL が常に必要になります。

      プロパティーを設定して、トラストストアを設定します。

      以下に例を示します。

      listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        # ...
        oauth.client.id="kafka-broker" \
        oauth.client.secret="kafka-broker-secret" \
        oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
        oauth.ssl.truststore.password="<truststore_password>" \
        oauth.ssl.truststore.type="PKCS12" ;
      Copy to Clipboard
    2. 証明書のホスト名がアクセス URL ホスト名と一致しない場合は、証明書のホスト名の検証をオフにできます。

      oauth.ssl.endpoint.identification.algorithm=""
      Copy to Clipboard

      このチェックは、認可サーバーへのクライアント接続が認証されるようにします。実稼働以外の環境で検証をオフにすることもできます。

7.2.5.2. Kafka Java クライアントで OAuth 2.0 を設定する

Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサー API とコンシューマー API を設定します。コールバックプラグインをクライアントの pom.xml ファイルに追加してから、OAuth 2.0 用にクライアントを設定します。

認証プロパティーを設定する方法は、OAuth 2.0 認可サーバーへのアクセスに使用している認証方法によって異なります。この手順では、プロパティーはプロパティーファイルで指定されてから、クライアント設定にロードされます。

前提条件

  • Streams for Apache Kafka と Kafka が実行中である。
  • OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
  • Kafka ブローカーが OAuth 2.0 に対して設定されている。

手順

  1. OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの pom.xml ファイルに追加します。

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.15.0.redhat-00010</version>
    </dependency>
    Copy to Clipboard
  2. OAuth 2.0 認証方法に応じてクライアントを設定します。

    たとえば、client.properties ファイルで認証方法のプロパティーを指定します。

  3. OAUTH 2.0 認証のクライアントプロパティーを Java クライアントコードに入力します。

    クライアントプロパティーの入力を示す例

    Properties props = new Properties();
    try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) {
      props.load(reader);
    }
    Copy to Clipboard

  4. Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat