7.2. OAuth 2.0 トークンベース認証の使用
Streams for Apache Kafka は、トークンベースの認証に OAuth 2.0 を使用することをサポートしています。OAuth 2.0 認可サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーとクライアントは、必要に応じて認可サーバーと通信し、アクセストークンを取得または検証します。
Streams for Apache Kafka のデプロイメントでは、OAuth 2.0 統合によって次のサポートが提供されます。
- Kafka ブローカーのサーバー側 OAuth 2.0 認証
- Kafka MirrorMaker、Kafka Connect、Kafka Bridge のクライアント側 OAuth 2.0 認証
RHEL 上の Streams for Apache Kafka には、2 つの OAuth 2.0 ライブラリーが含まれています。
kafka-oauth-client
-
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
という名前のカスタムログインコールバックハンドラークラスを提供します。OAUTHBEARER
認証メカニズムを処理するには、Apache Kafka が提供するOAuthBearerLoginModule
でログインコールバックハンドラーを使用します。 kafka-oauth-common
-
kafka-oauth-client
ライブラリーに必要な機能の一部を提供するヘルパーライブラリーです。
提供されるクライアントライブラリーは、keycloak-core
、jackson-databind
、および slf4j-api
などの追加のサードパーティーライブラリーにも依存します。
Maven プロジェクトを使用してクライアントをパッケージ化し、すべての依存関係ライブラリーが含まれるようにすることが推奨されます。依存関係ライブラリーは今後のバージョンで変更される可能性があります。
7.2.1. リスナーでの OAuth 2.0 認証の設定
OAuth 2.0 認証を使用して Kafka ブローカーを保護するには、Kafka の server.properties
ファイルで OAUth 2.0 認証とクライアント認証メカニズムを使用するように Kafka リスナーを設定し、認証で使用する認証メカニズムとトークン検証のタイプに応じてさらに設定を追加します。
最小限の設定が必要です。TLS リスナーを設定することもできます。TLS リスナーでは、ブローカー間の通信に TLS が使用されます。OAuth 2.0 認証と TLS 暗号化を併用することを推奨します。暗号化を行わないと、ネットワークの盗聴やトークンの盗難による不正アクセスに対して接続が脆弱になります。
認証のタイプを OAuth 2.0 として定義した場合は、検証のタイプ (高速なローカル JWT 検証またはイントロスペクションエンドポイントを使用したトークン検証) に基づいて設定を追加します。
SASL 認証メカニズムの有効化
クライアントがクレデンシャルを交換し、Kafka との認証済みセッションを確立できるように、次の SASL メカニズムの 1 つまたは両方を使用します。
OAUTHBEARER
OAUTHBEARER
認証メカニズムを使用すると、クレデンシャルの交換に、OAuth コールバックハンドラーによって提供されるベアラートークンが使用されます。次の方法を使用するトークンのプロビジョニングを設定できます。- クライアント ID とシークレット (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
- クライアント ID とクライアントアサーション
- 有効期間の長いアクセストークン
- 手動で取得した有効期間の長いリフレッシュトークン
OAUTHBEARER
は、PLAIN
よりも高いレベルのセキュリティーを提供するため推奨されますが、プロトコルレベルでOAUTHBEARER
メカニズムをサポートする Kafka クライアントでのみ使用できます。クライアントクレデンシャルが Kafka と共有されることはありません。PLAIN
PLAIN
は、すべての Kafka クライアントツールで使用されるシンプルな認証メカニズムです。OAUTHBEARER
をサポートしていない Kafka クライアントでのみPLAIN
を使用することを検討してください。PLAIN
認証メカニズムを使用すると、次の方法を使用するようにクレデンシャルの交換を設定できます。- クライアント ID とシークレット (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
-
有効期間の長いアクセストークン
使用する方法に関係なく、クライアントはusername
およびpassword
プロパティーを Kafka に提供する必要があります。
クレデンシャルは、
OAUTHBEARER
認証を使用する方法と同様に、準拠した認可サーバーの背後で一元的に処理されます。ユーザー名の抽出プロセスは、認可サーバーの設定によって異なります。
OAUTHBEARER
メカニズムのリスナー設定の例
sasl.enabled.mechanisms=OAUTHBEARER listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule # ...
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
# ...
- 1
- SASL でのクレデンシャル交換に
OAUTHBEARER
メカニズムを有効にします。 - 2
- 接続するクライアントアプリケーションのリスナーを設定します。システム
hostname
はアドバタイズされたホスト名として使用されます。このホスト名は、再接続するためにクライアントが解決する必要があります。この例では、リスナーの名前はCLIENT
です。 - 3
- リスナーのチャネルプロトコルを指定します。
SASL_SSL
は TLS 用です。暗号化されていない接続 (TLS なし) にはSASL_PLAINTEXT
が使用されますが、TCP 接続層での盗聴のリスクがあります。 - 4
- CLIENT リスナーの
OAUTHBEARER
メカニズムを指定します。クライアント名 (CLIENT
) は通常、listeners
プロパティーでは大文字、listener.name
プロパティー (listener.name.client
) では小文字、そしてlistener.name.client.*
プロパティーの一部である場合は小文字で指定されます。 - 5
- ブローカー間通信の
OAUTHBEARER
メカニズムを指定します。 - 6
- ブローカー間通信のリスナーを指定します。仕様は、有効な設定のために必要です。
- 7
- クライアントリスナーで OAuth 2.0 認証を設定します。
プロパティーまたは変数を使用した OAuth 2.0 の設定
OAuth 2.0 設定は、Authentication and Authorization Service (JAAS) プロパティーまたは環境変数を使用して設定します。
-
JAAS のプロパティーは
server.properties
設定ファイルで設定され、listener.name.<listener_name>.oauthbearer.sasl.jaas.config
プロパティーのキーと値のペアとして渡されます。 環境変数を使用する場合は、
server.properties
ファイルにlistener.name.<listener_name>.oauthbearer.sasl.jaas.config
プロパティーを指定する必要がありますが、他の JAAS プロパティーは省略できます。大文字 (アッパーケース) の環境変数の命名規則を使用できます。
Streams for Apache Kafka OAuth 2.0 ライブラリーは、次の文字列で始まるプロパティーを使用します。
-
oauth.
: 認証の設定 -
strimzi.
: OAuth 2.0 認可の設定
高速なローカル JWT トークン検証の設定
高速なローカル JWT トークン検証では、JWT トークン署名をローカルでチェックし、トークンが次の基準を満たしていることを確認します。
-
トークンがアクセストークンであることを示す
Bearer
が、typ
(タイプ) またはtoken_type
ヘッダークレームの値として含まれている。 - 現在有効であり、期限切れでない。
-
validIssuerURI
に一致する発行者を持つ。
リスナーの設定時に validIssuerURI
属性を指定することで、認可サーバーから発行されていないトークンは拒否されます。
高速のローカル JWT トークン検証の実行中に、認可サーバーの通信は必要はありません。OAuth 2.0 の認可サーバーによって公開されるエンドポイントの jwksEndpointUri
属性を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。
認可サーバーとの通信はすべて TLS による暗号化を使用して実行する必要があります。証明書のトラストストアを設定し、トラストストアのファイルを指定することができます。
JWT トークンからユーザー名を適切に取得するため、userNameClaim
の設定を検討してください。必要に応じて、"['user.info'].['user.id']"
のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
Kafka ACL 認可を使用する場合は、認証時にユーザー名でユーザーを特定します。JWT トークンの sub
要求は、通常は一意な ID でユーザー名ではありません。
高速なローカル JWT トークン検証の設定例
...
# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.jwks.refresh.seconds="300" \
oauth.jwks.refresh.min.pause.seconds="1" \
oauth.jwks.expiry.seconds="360" \
oauth.username.claim="preferred_username" \
oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
oauth.ssl.truststore.password="<truststore_password>" \
oauth.ssl.truststore.type="PKCS12" ;
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
- 1
- OAuth 2.0 の CLIENT リスナーを設定します。認可サーバーとの接続はセキュアな HTTPS 接続を使用する必要があります。
- 2
- 有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。(常に必要です)
- 3
- JWKS エンドポイント URL。
- 4
- エンドポイントの更新間隔 (デフォルトは 300)。
- 5
- JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新は指数バックオフのルールに従い、
oauth.jwks.refresh.seconds
に到達するまで、一時停止を増やして失敗した更新を再試行します。デフォルト値は 1 です。 - 6
- JWK 証明書が期限切れになる前に有効とみなされる期間。デフォルトは
360
秒です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。 - 7
- トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、
"['user.info'].['user.id']"
のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。 - 8
- TLS 設定で使用されるトラストストアの場所。
- 9
- トラストストアにアクセスするためのパスワード。
- 10
- PKCS #12 形式のトラストストアタイプ。
- 11
- (オプション): トークンの期限が切れるとセッションが強制的に期限切れとなり、また、Kafka の再認証メカニズム が有効になります。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
イントロスペクションエンドポイントを使用したトークン検証の設定
OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。
OAuth 2.0 イントロスペクションベースの検証を設定するには、高速ローカル JWT トークン検証用に指定された JWK エンドポイント URI ではなく、イントロスペクションエンドポイント URI を指定します。通常、イントロスペクションエンドポイントは保護されているため、認可サーバーに応じて client ID および client secret を指定する必要があります。
イントロスペクションエンドポイントを使用したトークン検証設定の例
...
# ...
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.introspection.endpoint.uri="https://<oauth_server_address>/<introspection_endpoint>" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-broker-secret" \
oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \
oauth.ssl.truststore.password="<truststore_password>" \
oauth.ssl.truststore.type="PKCS12" \
oauth.username.claim="preferred_username" ;
- 1
- トークンイントロスペクションエンドポイントの URI。
- 2
- Kafka ブローカーのクライアント ID。
- 3
- Kafka ブローカーのシークレット。
- 4
- TLS 設定で使用されるトラストストアの場所。
- 5
- トラストストアにアクセスするためのパスワード。
- 6
- PKCS #12 形式のトラストストアタイプ。
- 7
- トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと認可サーバーによって異なります。必要に応じて、
"['user.info'].['user.id']"
のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
認可サーバーの保護されたエンドポイントへのブローカーの認証
通常、認可サーバーの証明書エンドポイント (oauth.jwks.endpoint.uri
) はパブリックにアクセス可能ですが、イントロスペクションエンドポイント (oauth.introspection.endpoint.uri
) は保護されています。ただし、これは認可サーバーの設定によって異なる場合があります。
Kafka ブローカーは、HTTP 認証スキームを使用して、次のどちらかの方法で、認可サーバーの保護されたエンドポイントに対して認証できます。
- HTTP Basic 認証 でクライアント ID とシークレットを使用する。
- HTTP ベアラー認証 でベアラートークンを使用する。
HTTP Basic 認証を設定するには、次のプロパティーを設定します。
-
oauth.client.id
-
oauth.client.secret
HTTP ベアラー認証の場合は、次のいずれかのプロパティーを設定します。
-
oauth.server.bearer.token.location
: ベアラートークンを含むディスク上のファイルパスを指定します。 -
oauth.server.bearer.token
: ベアラートークンをクリアテキストで指定します。
追加の設定オプションの指定
認証要件と使用している認可サーバーに応じて追加の設定を指定します。これらのプロパティーの一部は、特定の認証メカニズムにのみ適用されるか、他のプロパティーと組み合わせて使用される場合にのみ適用されます。
たとえば、PLAIN
メカニズムで OAUth を使用する場合、アクセストークンが、$accessToken:
接頭辞の有無にかかわらず、password
プロパティー値として渡されます。
-
リスナー設定でトークンエンドポイント (
oauth.token.endpoint.uri
) を設定する場合、接頭辞が必要です。 - リスナー設定でトークンエンドポイントを設定しない場合、接頭辞は必要ありません。Kafka ブローカーは、パスワードを raw アクセストークンとして解釈します。
アクセストークンとして password
が設定されている場合、username
は Kafka ブローカーがアクセストークンから取得するプリンシパル名と同じものを設定する必要があります。oauth.username.claim
、oauth.username.prefix
、oauth.fallback.username.claim
、oauth.fallback.username.prefix
、および oauth.userinfo.endpoint.uri
プロパティーを使用すると、リスナーでユーザー名抽出オプションを指定できます。ユーザー名の抽出プロセスも、認可サーバーによって異なります。特に、クライアント ID をアカウント名にマッピングする方法により異なります。
PLAIN
メカニズムはパスワードグラント認証をサポートしていません。認証には、クライアントクレデンシャル (クライアント ID + シークレット) またはアクセストークンのいずれかを使用してください。
追加の設定の例
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.token.endpoint.uri="https://<auth_server_address>/<path_to_token_endpoint>" \ oauth.custom.claim.check="@.custom == 'custom-value'" \ oauth.scope="<scope>" \ oauth.check.audience="true" \ oauth.audience="<audience>" \ oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-broker-secret" \ oauth.connect.timeout.seconds=60 \ oauth.read.timeout.seconds=60 \ oauth.http.retries=2 \ oauth.http.retry.pause.millis=300 \ oauth.groups.claim="$.groups" \ oauth.groups.claim.delimiter="," \ oauth.include.accept.header="false" ; oauth.check.issuer=false \ oauth.username.prefix="user-account-" \ oauth.fallback.username.claim="client_id" \ oauth.fallback.username.prefix="service-account-" \ oauth.valid.token.type="bearer" \ oauth.userinfo.endpoint.uri="https://<auth_server_address>/<path_to_userinfo_endpoint>" ;
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
# ...
oauth.token.endpoint.uri="https://<auth_server_address>/<path_to_token_endpoint>" \
oauth.custom.claim.check="@.custom == 'custom-value'" \
oauth.scope="<scope>" \
oauth.check.audience="true" \
oauth.audience="<audience>" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-broker-secret" \
oauth.connect.timeout.seconds=60 \
oauth.read.timeout.seconds=60 \
oauth.http.retries=2 \
oauth.http.retry.pause.millis=300 \
oauth.groups.claim="$.groups" \
oauth.groups.claim.delimiter="," \
oauth.include.accept.header="false" ;
oauth.check.issuer=false \
oauth.username.prefix="user-account-" \
oauth.fallback.username.claim="client_id" \
oauth.fallback.username.prefix="service-account-" \
oauth.valid.token.type="bearer" \
oauth.userinfo.endpoint.uri="https://<auth_server_address>/<path_to_userinfo_endpoint>" ;
- 1
- 認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境では、常に
https://
URL を使用してください。KeycloakAuthorizer
を使用する場合、またはブローカー間通信に OAuth 2.0 が有効なリスナーを使用する場合に必要です。 - 2
- (オプション) カスタムクレームチェック。検証時に追加のカスタムルールを JWT アクセストークンに適用する JsonPath フィルタークエリー。アクセストークンに必要なデータが含まれていないと拒否されます。イントロスペクション エンドポイントメソッドを使用する場合は、カスタムチェックがイントロスペクションエンドポイントの応答 JSON に適用されます。
- 3
- (オプション)
scope
パラメーターがトークンエンドポイントに渡されます。スコープ は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。また、clientId
とsecret
を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。 - 4
- (オプション) オーディエンスチェック。認可サーバーが
aud
(オーディエンス) クレームを提供していて、オーディエンスチェックを実施する場合は、ouath.check.audience
をtrue
に設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。これにより、Kafka ブローカーはaud
要求にclientId
を持たないトークンを拒否します。デフォルトはfalse
です。 - 5
- (オプション) トークンエンドポイントに渡される
audience
パラメーター。オーディエンス は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。また、clientId
とsecret
を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。 - 6
- すべてのブローカーで同一の、Kafka ブローカーの設定されたクライアント ID。これは、
kafka-broker
として認可サーバーに登録されたクライアントです。イントロスペクションエンドポイントがトークンの検証に使用される場合、またはKeycloakAuthorizer
が使用される場合に必要です。 - 7
- すべてのブローカーで同じ Kafka ブローカーに設定されたシークレット。ブローカーが認証サーバーに対して認証する必要がある場合は、クライアントシークレット、アクセストークン、またはリフレッシュトークンのいずれかを指定する必要があります。
- 8
- (オプション) 認可サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
- 9
- (オプション): 認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
- 10
- 認可サーバーへの失敗した HTTP リクエストを再試行する最大回数。デフォルト値は 0 で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、
oauth.connect.timeout.seconds
オプションとoauth.read.timeout.seconds
オプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。 - 11
- 認可サーバーへの失敗した HTTP リクエストの再試行を行うまでの待機時間。デフォルトでは、この時間はゼロに設定されており、一時停止は適用されません。これは、リクエストの失敗の原因となる問題の多くは、リクエストごとのネットワークの不具合やプロキシーの問題であり、すぐに解決できるためです。ただし、認可サーバーに負荷がかかっている場合、または高トラフィックが発生している場合は、このオプションを 100 ミリ秒以上の値に設定して、サーバーの負荷を軽減し、再試行が成功する可能性を高めることができます。
- 12
- JWT トークンまたはイントロスペクションエンドポイントの応答からグループ情報を抽出するために使用される JsonPath クエリー。デフォルトでは設定されません。これは、カスタムオーソライザーがユーザーグループに基づいて認可の決定を行うために使用できます。
- 13
- 1 つのコンマ区切りの文字列として返されるときにグループ情報を解析するために使用される区切り文字。デフォルト値は ',' (コンマ) です。
- 14
- (オプション)
oauth.include.accept.header
をfalse
に設定して、リクエストからAccept
ヘッダーを削除します。ヘッダーを含めることで認可サーバーとの通信時に問題が発生する場合は、この設定を使用できます。 - 15
- 認可サーバーが
iss
クレームを提供しない場合は、発行者チェックを行うことができません。このような場合、oauth.check.issuer
をfalse
に設定し、oauth.valid.issuer.uri
を指定しないようにします。デフォルトはtrue
です。 - 16
- ユーザー ID を構成するときに使用する接頭辞。これは、
oauth.username.claim
が設定されている場合にのみ有効になります。 - 17
- 認可サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID 属性が提供されることがあります。リフレッシュトークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に ユーザー名 が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。必要に応じて、
"['client.info'].['client.id']"
のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からフォールバックユーザー名を取得できます。 - 18
oauth.fallback.username.claim
が適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producer
というクライアントが存在し、producer
という通常ユーザーも存在する場合について考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。- 19
- (
oauth.introspection.endpoint.uri
を使用する場合のみ該当): 使用している認証サーバーによっては、イントロスペクションエンドポイントによって トークンタイプ 属性が返されるかどうかはわからず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。 - 20
- (
oauth.introspection.endpoint.uri
を使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、認可サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfo
エンドポイントの URI をフォールバックとして設定します。oauth.username.claim
、oauth.username.prefix
、oauth.fallback.username.claim
、およびoauth.fallback.username.prefix
設定は、userinfo
エンドポイントの応答にも適用されます。
ブローカー間通信のリスナーの設定
次の例では、ブローカー間通信がアプリケーションクライアントと同じリスナーを通過する最小設定で、高速トークン検証に OAUTHBEARER
メカニズムを使用します。
oauth.client.id
、oauth.client.secret
、および auth.token.endpoint.uri
プロパティーは、ブローカー間の通信に関連しています。
OAUTHBEARER
メカニズムを使用したブローカー間設定の例
sasl.enabled.mechanisms=OAUTHBEARER listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \ oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \ oauth.username.claim="preferred_username" \ oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-secret" \ oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
sasl.enabled.mechanisms=OAUTHBEARER
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-secret" \
oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
次の例は、ブローカー間通信に使用される TLS リスナーの最小設定を示しています。
TLS を使用したブローカー間設定の例
sasl.enabled.mechanisms=OAUTHBEARER listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER inter.broker.listener.name=REPLICATION listener.name.replication.ssl.keystore.password=<keystore_password> listener.name.replication.ssl.truststore.password=<truststore_password> listener.name.replication.ssl.keystore.type=JKS listener.name.replication.ssl.truststore.type=JKS listener.name.replication.ssl.secure.random.implementation=SHA1PRNG listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS listener.name.replication.ssl.keystore.location=<path_to_keystore> listener.name.replication.ssl.truststore.location=<path_to_truststore> listener.name.replication.ssl.client.auth=required listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \ oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \ oauth.username.claim="preferred_username" ;
sasl.enabled.mechanisms=OAUTHBEARER
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092
listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=<keystore_password>
listener.name.replication.ssl.truststore.password=<truststore_password>
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS
listener.name.replication.ssl.keystore.location=<path_to_keystore>
listener.name.replication.ssl.truststore.location=<path_to_truststore>
listener.name.replication.ssl.client.auth=required
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" ;
- 1
- ブローカー間通信とクライアントアプリケーションには、個別の設定が必要です。
- 2
- REPLICATION リスナーが TLS を使用し、CLIENT リスナーが暗号化されていないチャネルで SASL を使用するように設定します。実稼働環境では、クライアントは暗号化されたチャンネル (
SASL_SSL
) を使用できます。 - 3
ssl.
プロパティーは TLS 設定を定義します。- 4
- 乱数ジェネレーターの実装。設定されていない場合は、Java プラットフォーム SDK デフォルトが使用されます。
- 5
- ホスト名の検証。空の文字列に設定すると、ホスト名の検証はオフになります。設定されていない場合、デフォルト値は
HTTPS
で、サーバー証明書のホスト名の検証を強制します。 - 6
- リスナーのキーストアへのパス。
- 7
- リスナーのトラストストアへのパス。
- 8
- (ブローカー間接続に使用される) TLS 接続の確立時に REPLICATION リスナーのクライアントがクライアント証明書で認証する必要があることを指定します。
次の例では、ブローカー間通信がアプリケーションクライアントと同じリスナーを通過する最小設定で、高速トークン検証に PLAIN
メカニズムを使用します。
PLAIN
メカニズムを使用したブローカー間設定の例
listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https:<auth_server_address>/<issuer-context>" \ oauth.jwks.endpoint.uri="https://<auth_server>/<path_to_jwks_endpoint>" \ oauth.username.claim="preferred_username" \ oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-secret" \ oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \ oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \ oauth.username.claim="preferred_username" \ oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ; listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
listeners=CLIENT://0.0.0.0:9092
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
inter.broker.listener.name=CLIENT
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.valid.issuer.uri="https:<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<auth_server>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" \
oauth.client.id="kafka-broker" \
oauth.client.secret="kafka-secret" \
oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler
listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
oauth.valid.issuer.uri="https://<auth_server_address>/<issuer-context>" \
oauth.jwks.endpoint.uri="https://<oauth_server_address>/<path_to_jwks_endpoint>" \
oauth.username.claim="preferred_username" \
oauth.token.endpoint.uri="https://<oauth_server_address>/<token_endpoint>" ;
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
- 1
- ブローカー間通信の OAuth 2.0 認証を有効にします。
- 2
PLAIN
認証のサーバーコールバックハンドラーを設定します。- 3
PLAIN
認証を使用して、クライアント通信の認証設定を設定します。oauth.token.endpoint.uri
は、OAuth 2.0 クライアントクレデンシャルメカニズム を使用して OAuth 2.0 over PLAIN を有効にする任意のプロパティーです。- 4
- 認可サーバーへの OAuth 2.0 トークンエンドポイント URL。これが指定されている場合、クライアントは
$accessToken:
接頭辞を使用してアクセストークンをpassword
として渡すことで、PLAIN 経由で認証できます。
7.2.2. クライアントアプリケーションでの OAuth 2.0 の設定
クライアントアプリケーションで OAuth 2.0 を設定するには、以下を指定する必要があります。
- SASL (Simple Authentication and Security Layer) セキュリティープロトコル
- SASL メカニズム
- JAAS (Java Authentication and Authorization Service) モジュール
- 認可サーバーにアクセスするための認証プロパティー
SASL プロトコルの設定
クライアント設定で SASL プロトコルを指定します。
-
TLS 暗号化接続を介した認証用の
SASL_SSL
-
暗号化されていない接続を介した認証用の
SASL_PLAINTEXT
プロダクションには SASL_SSL
を使用し、ローカル開発には SASL_PLAINTEXT
のみを使用してください。
SASL_SSL
を使用する場合は、追加の ssl.truststore
設定が必要です。OAuth 2.0 認可サーバーへのセキュアな接続 (https://
) には、トラストストア設定が必要です。OAuth 2.0 認可サーバーを確認するには、認可サーバーの CA 証明書をクライアント設定のトラストストアに追加します。トラストストアは、PEM または PKCS #12 形式で設定できます。
SASL 認証メカニズムの設定
クライアント設定で SASL メカニズムを指定します。
-
ベアラートークンを使用したクレデンシャル交換用の
OAUTHBEARER
-
クライアントクレデンシャル (clientId + secret) またはアクセストークンを渡す
PLAIN
JAAS モジュールの設定
SASL 認証メカニズムを実装する JAAS モジュールを sasl.jaas.config
プロパティーの値として指定します。
-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
はOAUTHBEARER
メカニズムを実装します。 -
org.apache.kafka.common.security.plain.PlainLoginModule
はPLAIN
メカニズムを実装します。
OAUTHBEARER
メカニズムの場合、Streams for Apache Kafka は、クレデンシャルの交換を可能にするために、Kafka クライアント Java ライブラリーを使用するクライアントにコールバックハンドラーを提供します。他の言語のクライアントの場合、アクセストークンを取得するためにカスタムコードが必要になる場合があります。PLAIN
メカニズムの場合、Streams for Apache Kafka は、クレデンシャルの交換を可能にするために、サーバー側のコールバックを提供します。
OAUTHBEARER
メカニズムを使用できるようにするには、コールバックハンドラーとしてカスタムの io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
クラスも追加する必要があります。JaasClientOauthLoginCallbackHandler
は、クライアントのログイン中にアクセストークンの認可サーバーへの OAuth コールバックを処理します。これにより、トークンの自動更新が可能になり、ユーザーの介入なしで継続的な認証が保証されます。さらに、OAuth 2.0 パスワードグラント方式を使用してクライアントのログインクレデンシャルを処理します。
認証プロパティーの設定
OAuth 2.0 認証にクレデンシャルまたはアクセストークンを使用するようにクライアントを設定します。
- クライアントクレデンシャルの使用
- クライアントクレデンシャルを使用するには、認可サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID とシークレットまたはクライアント ID とクライアントアサーション) を使用してクライアントを設定する必要があります。これは最も単純なメカニズムです。
- アクセストークンの使用
- アクセストークンを使用すると、クライアントが、認可サーバーから取得した有効な有効期間の長いアクセストークンまたはリフレッシュトークンで設定されます。アクセストークンを使用すると、認可サーバーツールへ依存が増すため、複雑さが増します。有効期間が長いアクセストークンを使用している場合は、認可サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。
Kafka に送信される情報はアクセストークンだけです。トークンの取得に使用されるクレデンシャルは、Kafka に送信されることはありません。クライアントによるアクセストークンの取得後、認可サーバーと通信する必要はありません。
SASL 認証プロパティーは、次の認証方法をサポートします。
- OAuth 2.0 クライアントクレデンシャル
- アクセストークンまたはサービスアカウントトークン
- リフレッシュトークン
- OAuth 2.0 パスワードグラント (非推奨)
認証プロパティーを JAAS 設定 (sasl.jaas.config
および sasl.login.callback.handler.class
) として追加します。
クライアントアプリケーションがアクセストークンを使用して直接設定されていない場合、クライアントは Kafka セッションの開始時に、次のいずれかのクレデンシャルセットをアクセストークンと交換します。
- クライアント ID およびシークレット
- クライアント ID とクライアントアサーション
- クライアント ID、リフレッシュトークン、および (必要に応じて) シークレット
- ユーザー名とパスワード、およびクライアント ID と (必要に応じて) シークレット
認証プロパティーを環境変数または Java システムプロパティーとして指定することもできます。Java システムプロパティーの場合は、setProperty
を使用して設定し、-D
オプションを使用してコマンドラインで渡すことができます。
クライアントシークレットを使用したクライアントクレデンシャルの設定例
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ oauth.client.secret="<client_secret>" \ oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.scope="<scope>" \ oauth.audience="<audience>" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- TLS 暗号化接続用の
SASL_SSL
セキュリティープロトコル。ローカル開発のみでは、暗号化されていない接続でSASL_PLAINTEXT
を使用します。 - 2
OAUTHBEARER
またはPLAIN
として指定された SASL メカニズム。- 3
- Kafka クラスターへのセキュアなアクセスのためのトラストストア設定。
- 4
- 認可サーバーのトークンエンドポイントの URI です。
- 5
- クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
- 6
- 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
- 7
- この場所には、認可サーバーの公開鍵証明書 (
truststore.p12
) が含まれています。 - 8
- トラストストアにアクセスするためのパスワード。
- 9
- トラストストアのタイプ。
- 10
- (オプション): トークンエンドポイントからトークンを要求するための
scope
。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。 - 11
- (オプション) トークンエンドポイントからトークンを要求するための
audience
。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。
クライアントアサーションを使用したクライアントクレデンシャルの設定例
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ oauth.client.assertion.location="<path_to_client_assertion_token_file>" \ oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \ oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.scope="<scope>" \ oauth.audience="<audience>" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.assertion.location="<path_to_client_assertion_token_file>" \
oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
パスワードグラントの設定例
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ oauth.client.secret="<client_secret>" \ oauth.password.grant.username="<username>" \ oauth.password.grant.password="<password>" \ oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.scope="<scope>" \ oauth.audience="<audience>" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.password.grant.username="<username>" \
oauth.password.grant.password="<password>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
- 2
- (オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
- 3
- パスワードグラント認証のユーザー名。OAuth パスワードグラント設定 (ユーザー名とパスワード) では、OAuth 2.0 パスワードグラント方式が使用されます。パスワードグラントを使用するには、権限が制限されたクライアントのユーザーアカウントを認可サーバー上に作成します。アカウントは、サービスアカウントのように機能する必要があります。認証にユーザーアカウントが必要な環境で使用しますが、先にリフレッシュトークンの使用を検討してください。
- 4
- パスワードグラント認証のパスワード。注記
SASL
PLAIN
は、OAuth 2.0 パスワードグラント方式を使用したユーザー名とパスワードの受け渡し (パスワードグラント) をサポートしていません。
アクセストークンの設定例
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.access.token="<access_token>" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token="<access_token>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Kafka クライアントの有効期間が長いアクセストークン。または、
oauth.access.token.location
を使用して、アクセストークンを含むファイルを指定することもできます。
OpenShift サービスアカウントトークンの設定例
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token"; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token";
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- ファイルシステム上のサービスアカウントトークンの場所 (クライアントが OpenShift Pod としてデプロイされていると想定)
リフレッシュトークンの設定例
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ oauth.client.secret="<client_secret>" \ oauth.refresh.token="<refresh_token>" \ oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.refresh.token="<refresh_token>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
7.2.3. OAuth 2.0 クライアント認証フロー
OAuth 2.0 認証フローは、基礎となる Kafka クライアントおよび Kafka ブローカー設定によって異なります。フローは、使用する認可サーバーによってもサポートされる必要があります。
Kafka ブローカーリスナー設定は、クライアントがアクセストークンを使用して認証する方法を決定します。クライアントはクライアント ID およびシークレットを渡してアクセストークンをリクエストできます。
リスナーが PLAIN
認証を使用するように設定されている場合、クライアントはクライアント ID とシークレット、またはユーザー名とアクセストークンを使用して認証できます。これらの値は、PLAIN
メカニズムの username
および password
プロパティーとして渡されます。
リスナー設定は、以下のトークン検証オプションをサポートします。
- 認可サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証を使用できます。認可サーバーは、トークンで署名を検証するために使用される公開証明書のある JWKS エンドポイントを提供します。
- 認可サーバーが提供するトークンイントロスペクションエンドポイントへの呼び出しを使用することができます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを認可サーバーに渡します。Kafka ブローカーは応答を確認して、トークンが有効かどうかを確認します。
認可サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。
Kafka クライアントクレデンシャルは、以下のタイプの認証に対して設定することもできます。
- 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス
- 新しいアクセストークンを発行するために認可サーバーに問い合わせる (クライアント ID およびクレデンシャル、またはリフレッシュトークン、またはユーザー名とパスワードを使用)
7.2.3.1. SASL OAUTHBEARER
メカニズムを使用したクライアント認証フローの例
SASL OAUTHBEARER
メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。
クライアントがクライアント ID およびクレデンシャルを使用し、ブローカーが認可サーバーに検証を委譲する場合
- Kafka クライアントは、クライアント ID およびクレデンシャル、および必要に応じてリフレッシュトークンを使用して、認可サーバーからアクセストークンを要求します。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
- 認可サーバーは新しいアクセストークンを生成します。
-
Kafka クライアントは、SASL
OAUTHBEARER
メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、独自のクライアント ID およびシークレットを使用し、認可サーバーでトークンイントロスペクションエンドポイントを呼び出すことで、アクセストークンを検証します。
- トークンが有効な場合、Kafka クライアントセッションが確立されます。
クライアントがクライアント ID およびクレデンシャルを使用し、ブローカーが高速なローカルトークン検証を実行する場合
- Kafka クライアントは、クライアント ID およびクレデンシャル、および必要に応じてリフレッシュトークンを使用して、トークンエンドポイントから認可サーバーに対して認証を行います。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
- 認可サーバーは新しいアクセストークンを生成します。
-
Kafka クライアントは、SASL
OAUTHBEARER
メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
クライアントが有効期限の長いアクセストークンを使用し、ブローカーが検証を認可サーバーに委任する場合
-
Kafka クライアントは、SASL
OAUTHBEARER
メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、認可サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
- トークンが有効な場合、Kafka クライアントセッションが確立されます。
クライアントが有効期限の長いアクセストークンを使用し、ブローカーが高速のローカル検証を実行する場合
-
Kafka クライアントは、SASL
OAUTHBEARER
メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
トークンが取り消された場合に認可サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、認可サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。
7.2.3.2. SASL PLAIN
メカニズムを使用したクライアント認証フローの例
OAuth PLAIN
メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。
クライアントがクライアント ID およびシークレットを使用し、ブローカーがクライアントのアクセストークンを取得する場合
-
Kafka クライアントは、
clientId
をユーザー名として、secret
をパスワードとして渡します。 -
Kafka ブローカーは、トークンエンドポイントを使用して
clientId
およびsecret
を認可サーバーに渡します。 - 認可サーバーは、新しいアクセストークンまたはエラー (クライアントクレデンシャルが有効でない場合) を返します。
Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。
- トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
- ローカルトークンのイントロスペクションが使用される場合、要求は認可サーバーに対して行われません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。
クライアントが、クライアント ID およびシークレットなしで有効期限の長いアクセストークンを使用する場合
- Kafka クライアントはユーザー名とパスワードを渡します。パスワードは、クライアントを実行する前に手動で取得および設定されたアクセストークンの値を提供します。
Kafka ブローカーリスナーが認証のトークンエンドポイントで設定されているかどうかに応じて、
$accessToken:
文字列の接頭辞の有無にかかわらず、パスワードは渡されます。-
トークンエンドポイントが設定されている場合、パスワードの前に
$accessToken:
を付け、password パラメーターにクライアントシークレットではなくアクセストークンが含まれていることをブローカーに知らせる必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。 -
トークンエンドポイントが Kafka ブローカーリスナーで設定されていない場合 (
no-client-credentials mode
を強制)、パスワードは接頭辞なしでアクセストークンを提供する必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。このモードでは、クライアントはクライアント ID およびシークレットを使用せず、password
パラメーターは常に raw アクセストークンとして解釈されます。
-
トークンエンドポイントが設定されている場合、パスワードの前に
Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。
- トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
- ローカルトークンイントロスペクションが使用されている場合には、認可サーバーへの要求はありません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。
7.2.4. セッションの再認証
Kafka クライアントと Kafka ブローカー間の OAuth 2.0 セッションに Kafka session re-authentication を使用するように、OAuth リスナーを設定できます。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存の接続を破棄せずに再使用して、新しいセッションを即座に開始します。
セッションの再認証はデフォルトで無効になっています。これを有効にするには、server.properties
ファイルの connections.max.reauth.ms
プロパティーに時間の値を設定します。設定例については、「リスナーでの OAuth 2.0 認証の設定」 を参照してください。
セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。
セッションの再認証は、高速ローカル JWT または イントロスペクションエンドポイント のトークン検証と共に使用できます。
クライアントの再認証
ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。
トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。
セッションの再認証は、リフレッシュトークンが使用されている場合にも適用されます。セッションが期限切れになると、クライアントはリフレッシュトークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存の接続に再認証されます。
OAUTHBEARER および PLAIN のセッションの有効期限
セッションの再認証が設定されている場合、OAUTHBEARER と PLAIN 認証ではセッションの有効期限は異なります。
クライアント ID とシークレット による方法を使用する OAUTHBEARER および PLAIN の場合:
-
ブローカーの認証されたセッションは、設定された
connections.max.reauth.ms
で期限切れになります。 - アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。
有効期間の長いアクセストークン 方法を使用する PLAIN の場合:
-
ブローカーの認証されたセッションは、設定された
connections.max.reauth.ms
で期限切れになります。 - アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証は試行されますが、PLAIN にはトークンを更新するメカニズムがありません。
connections.max.reauth.ms
が 設定されていない 場合は、再認証しなくても、OAUTHBEARER および PLAIN クライアントはブローカーへの接続を無期限に維持します。認証されたセッションは、アクセストークンの期限が切れても終了しません。
ただし、keycloak
認可を使用したり、カスタム authorizer をインストールして、認可を設定する場合に考慮できます。
7.2.5. 例: OAuth 2.0 認証の有効化
この例では、OAUth 2.0 認証を使用して Kafka クラスターへのクライアントアクセスを設定する方法を示します。この手順では、Kafka リスナーと Kafka Java クライアントで OAuth 2.0 認証をセットアップするために必要な設定について説明します。
7.2.5.1. Kafka ブローカーの OAuth 2.0 サポートの設定
この手順では、ブローカーリスナーが認可サーバーを使用して OAuth 2.0 認証を使用するように、Kafka ブローカーを設定する方法を説明します。
TLS リスナーを設定して、暗号化されたインターフェイスで OAuth 2.0 を使用することが推奨されます。プレーンリスナーは推奨されません。
選択した認可サーバーをサポートするプロパティーと、実装している認可のタイプを使用して、Kafka ブローカーを設定します。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
- OAuth 2.0 の認可サーバーがデプロイされている。
手順
server.properties
ファイルで Kafka ブローカーリスナー設定を設定します。たとえば、OAUTHBEARER メカニズムを使用する場合:
sasl.enabled.mechanisms=OAUTHBEARER listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ; listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
sasl.enabled.mechanisms=OAUTHBEARER listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ; listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Copy to Clipboard Copied! listener.name.client.oauthbearer.sasl.jaas.config
の一部としてブローカーの接続設定を行います。必要な場合は、認可サーバーへのアクセスを設定します。
この手順は通常、サービスメッシュ などの技術がコンテナーの外部でセキュアなチャネルを設定するために使用される場合を除き、実稼働環境で必要になります。
セキュアな認可サーバーに接続するためのカスタムトラストストアを指定します。認可サーバーへのアクセスには SSL が常に必要になります。
プロパティーを設定して、トラストストアを設定します。
以下に例を示します。
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-broker-secret" \ oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ oauth.ssl.truststore.password="<truststore_password>" \ oauth.ssl.truststore.type="PKCS12" ;
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-broker-secret" \ oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ oauth.ssl.truststore.password="<truststore_password>" \ oauth.ssl.truststore.type="PKCS12" ;
Copy to Clipboard Copied! 証明書のホスト名がアクセス URL ホスト名と一致しない場合は、証明書のホスト名の検証をオフにできます。
oauth.ssl.endpoint.identification.algorithm=""
oauth.ssl.endpoint.identification.algorithm=""
Copy to Clipboard Copied! このチェックは、認可サーバーへのクライアント接続が認証されるようにします。実稼働以外の環境で検証をオフにすることもできます。
7.2.5.2. Kafka Java クライアントで OAuth 2.0 を設定する
Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサー API とコンシューマー API を設定します。コールバックプラグインをクライアントの pom.xml
ファイルに追加してから、OAuth 2.0 用にクライアントを設定します。
認証プロパティーを設定する方法は、OAuth 2.0 認可サーバーへのアクセスに使用している認証方法によって異なります。この手順では、プロパティーはプロパティーファイルで指定されてから、クライアント設定にロードされます。
前提条件
- Streams for Apache Kafka と Kafka が実行中である。
- OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
- Kafka ブローカーが OAuth 2.0 に対して設定されている。
手順
OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの
pom.xml
ファイルに追加します。<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.15.0.redhat-00010</version> </dependency>
<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.15.0.redhat-00010</version> </dependency>
Copy to Clipboard Copied! OAuth 2.0 認証方法に応じてクライアントを設定します。
たとえば、
client.properties
ファイルで認証方法のプロパティーを指定します。OAUTH 2.0 認証のクライアントプロパティーを Java クライアントコードに入力します。
クライアントプロパティーの入力を示す例
Properties props = new Properties(); try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) { props.load(reader); }
Properties props = new Properties(); try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) { props.load(reader); }
Copy to Clipboard Copied! - Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。