17.2. OAuth 2.0 トークンベース認証の使用
Streams for Apache Kafka は、トークンベースの認証に OAuth 2.0 を使用することをサポートしています。OAuth 2.0 認可サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーとクライアントは、必要に応じて認可サーバーと通信し、アクセストークンを取得または検証します。
Streams for Apache Kafka のデプロイメントでは、OAuth 2.0 統合によって次のサポートが提供されます。
- Kafka ブローカーのサーバー側 OAuth 2.0 認証
- Kafka MirrorMaker、Kafka Connect、Kafka Bridge のクライアント側 OAuth 2.0 認証
17.2.1. リスナーでの OAuth 2.0 認証の設定 リンクのコピーリンクがクリップボードにコピーされました!
OAuth 2.0 認証を使用して Kafka ブローカーを保護するには、OAUth 2.0 認証とクライアント認証メカニズムを使用するように Kafka リソースのリスナーを設定し、認証で使用する認証メカニズムとトークン検証のタイプに応じてさらに設定を追加します。
OAuth 認証を使用するようにリスナーを設定する
Kafka リソースで、oauth 認証タイプのリスナーを指定します。内部リスナーと外部リスナーを設定できます。OAuth 2.0 認証と TLS 暗号化 (tls: true) を併用することを推奨します。暗号化を行わないと、ネットワークの盗聴やトークンの盗難による不正アクセスに対して接続が脆弱になります。
OAuth 2.0 認証を使用したリスナー設定の例
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
# ...
listeners:
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: oauth
- name: external3
port: 9094
type: loadbalancer
tls: true
authentication:
type: oauth
#...
SASL 認証メカニズムの有効化
クライアントがクレデンシャルを交換し、Kafka との認証済みセッションを確立できるように、次の SASL メカニズムの 1 つまたは両方を使用します。
OAUTHBEAREROAUTHBEARER認証メカニズムを使用すると、クレデンシャルの交換に、OAuth コールバックハンドラーによって提供されるベアラートークンが使用されます。次の方法を使用するトークンのプロビジョニングを設定できます。- クライアント ID とシークレット (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
- クライアント ID とクライアントアサーション
- 有効期間の長いアクセストークンまたはサービスアカウントトークン
- 手動で取得した有効期間の長いリフレッシュトークン
OAUTHBEARERは、PLAINよりも高いレベルのセキュリティーを提供するため推奨されますが、プロトコルレベルでOAUTHBEARERメカニズムをサポートする Kafka クライアントでのみ使用できます。クライアントクレデンシャルが Kafka と共有されることはありません。PLAINPLAINは、すべての Kafka クライアントツールで使用されるシンプルな認証メカニズムです。OAUTHBEARERをサポートしていない Kafka クライアントでのみPLAINを使用することを検討してください。PLAIN認証メカニズムを使用すると、次の方法を使用するようにクレデンシャルの交換を設定できます。- クライアント ID とシークレット (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
-
有効期間の長いアクセストークン
使用する方法に関係なく、クライアントはusernameおよびpasswordプロパティーを Kafka に提供する必要があります。
クレデンシャルは、
OAUTHBEARER認証を使用する方法と同様に、準拠した認可サーバーの背後で一元的に処理されます。ユーザー名の抽出プロセスは、認可サーバーの設定によって異なります。
OAUTHBEARER は、Kafka ブローカーの OAuth リスナー設定で自動的に有効になります。PLAIN メカニズムを使用するには、enablePlain プロパティーを true に設定する必要があります。
次の例では、PLAIN メカニズムを有効にします。また、enableOauthBearer プロパティーを使用してリスナーの OAUTHBEARER メカニズムを無効します。
PLAIN メカニズムのリスナー設定の例
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
# ...
listeners:
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: oauth
- name: external3
port: 9094
type: loadbalancer
tls: true
authentication:
type: oauth
enablePlain: true
enableOauthBearer: false
#...
認証のタイプを OAuth 2.0 として定義した場合は、検証のタイプ (高速なローカル JWT 検証またはイントロスペクションエンドポイントを使用したトークン検証) に基づいて設定を追加します。
高速なローカル JWT トークン検証の設定
高速なローカル JWT トークン検証では、JWT トークン署名をローカルでチェックし、トークンが次の基準を満たしていることを確認します。
-
トークンがアクセストークンであることを示す
Bearerが、typ(タイプ) またはtoken_typeヘッダークレームの値として含まれている。 - 現在有効であり、期限切れでない。
-
validIssuerURIに一致する発行者を持つ。
リスナーの設定時に validIssuerURI 属性を指定することで、認可サーバーから発行されていないトークンは拒否されます。
高速のローカル JWT トークン検証の実行中に、認可サーバーの通信は必要はありません。OAuth 2.0 の認可サーバーによって公開されるエンドポイントの jwksEndpointUri 属性を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。
認可サーバーとの通信はすべて TLS による暗号化を使用して実行する必要があります。Streams for Apache Kafka のプロジェクト namespace で証明書トラストストアを OpenShift Secret として設定し、tlsTrustedCertificates プロパティーを使用して、トラストストアファイルを含む OpenShift シークレットを参照することができます。
JWT トークンからユーザー名を適切に取得するため、userNameClaim の設定を検討してください。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
Kafka ACL 認可を使用する場合は、認証時にユーザー名でユーザーを特定します。JWT トークンの sub 要求は、通常は一意な ID でユーザー名ではありません。
高速なローカル JWT トークン検証の設定例
#...
- name: external3
port: 9094
type: loadbalancer
tls: true
authentication:
type: oauth
validIssuerUri: https://<auth_server_address>/<issuer-context>
jwksEndpointUri: https://<auth_server_address>/<path_to_jwks_endpoint>
userNameClaim: preferred_username
maxSecondsWithoutReauthentication: 3600
tlsTrustedCertificates:
- secretName: oauth-server-cert
pattern: "*.crt"
disableTlsHostnameVerification: true
jwksExpirySeconds: 360
jwksRefreshSeconds: 300
jwksMinRefreshPauseSeconds: 1
- 1
oauthに設定されたリスナータイプ。- 2
- 認証に使用されるトークン発行者の URI。
- 3
- ローカルの JWT 検証に使用される JWKS 証明書エンドポイントの URI。
- 4
- ユーザーを識別するために使用される実際のユーザー名を含むトークンクレーム (またはキー)。その値は、認可サーバーによって異なります。必要に応じて、
"['user.info'].['user.id']"のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。 - 5
- (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
- 6
- (オプション) 認可サーバーへの TLS 接続用に指定されたシークレット内に X.509 形式で保存された証明書。
- 7
- (任意設定): TLS ホスト名の検証を無効にします。デフォルトは
falseです。 - 8
- JWKS 証明書が期限切れになる前に有効であるとみなされる期間。デフォルトは
360秒です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。 - 9
- JWKS 証明書を更新する間隔。この間隔は、有効期間よりも 60 秒以上短くする必要があります。デフォルトは
300秒です。 - 10
- JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新は指数バックオフ (指数バックオフ) のルールに従い、
jwksRefreshSecondsに到達するまで、一時停止を増やして失敗した更新を再試行します。デフォルト値は 1 です。
OpenShift サービスアカウントを使用した高速なローカル JWT トークン検証の設定
OpenShift サービスアカウントのリスナーを設定するには、Kubernetes API サーバーを認可サーバーとして使用する必要があります。
Kubernetes API サーバーを認可サーバーとして使用した高速なローカル JWT トークン検証の設定例
#...
- name: external3
port: 9094
type: loadbalancer
tls: true
authentication:
type: oauth
validIssuerUri: https://kubernetes.default.svc.cluster.local
jwksEndpointUri: https://kubernetes.default.svc.cluster.local/openid/v1/jwks
serverBearerTokenLocation: /var/run/secrets/kubernetes.io/serviceaccount/token
checkAccessTokenType: false
includeAcceptHeader: false
tlsTrustedCertificates:
- secretName: oauth-server-cert
pattern: "*.crt"
maxSecondsWithoutReauthentication: 3600
customClaimCheck: "@.['kubernetes.io'] && @.['kubernetes.io'].['namespace'] in ['myproject']"
- 1
- 認証に使用されるトークン発行者の URI。
.cluster.local拡張子を含む FQDN を使用する必要があります。これは OpenShift クラスターの設定によって異なる場合があります。 - 2
- ローカルの JWT 検証に使用される JWKS 証明書エンドポイントの URI。
.cluster.local拡張子を含む FQDN を使用する必要があります。これは OpenShift クラスターの設定によって異なる場合があります。 - 3
jwksEndpointUriにアクセスするために、Kafka ブローカーが Kubernetes API サーバーに認証するために使用するアクセストークンの場所です。- 4
- アクセストークンタイプのチェックをスキップします。サービスアカウントトークンには、このチェックに対するクレームが存在しないためです。
- 5
- JWKS エンドポイントへの HTTP リクエストの
Acceptヘッダー送信をスキップします。これは、Kubernetes API サーバーによってサポートされていないためです。 - 6
- 認可サーバーに接続するための信頼できる証明書。これは、Kubernetes API サーバーの公開証明書を含む手動で作成されたシークレットを参照する必要があります。このシークレットは、実行中の Pod の
/var/run/secrets/kubernetes.io/serviceaccount/ca.crt配下にマウントされます。シークレットは次のコマンドを使用して作成できます。oc get cm kube-root-ca.crt -o jsonpath="{['data']['ca\.crt']}" > /tmp/ca.crt oc create secret generic oauth-server-cert --from-file=ca.crt=/tmp/ca.crt - 7
- (オプション) JWT トークンが受け入れられるために満たす必要がある追加の制約。JsonPath フィルタークエリーとして表現されます。この例では、認証が許可されるには、サービスアカウントが
myprojectnamespace に属している必要があります。
上記の設定では、サービスアカウント JWT トークンの sub クレームをユーザー ID として使用します。たとえば、myproject namespace にデプロイされた Pod のデフォルトサービスアカウントのユーザー名は、system:serviceaccount:myproject:default です。
ACL を設定する場合、ServiceAccount ユーザーを参照するための一般的な形式は、User:system:serviceaccount:<Namespace>:<ServiceAccount-name> です。
イントロスペクションエンドポイントを使用したトークン検証の設定
OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。
OAuth 2.0 のイントロスペクションベースの検証を設定するには、高速なローカル JWT トークン検証用に指定された jwksEndpointUri 属性ではなく、introspectionEndpointUri 属性を指定します。通常、イントロスペクションエンドポイントは保護されているため、認可サーバーに応じて clientId および clientSecret を指定する必要があります。
イントロスペクションエンドポイントを使用したトークン検証設定の例
- name: external3
port: 9094
type: loadbalancer
tls: true
authentication:
type: oauth
validIssuerUri: https://<auth_server_address>/<issuer-context>
introspectionEndpointUri: https://<auth_server_address>/<path_to_introspection_endpoint>
clientId: kafka-broker
clientSecret:
secretName: my-cluster-oauth
key: clientSecret
userNameClaim: preferred_username
maxSecondsWithoutReauthentication: 3600
tlsTrustedCertificates:
- secretName: oauth-server-cert
pattern: "*.crt"
- 1
- トークンイントロスペクションエンドポイントの URI。
- 2
- クライアントを識別するためのクライアント ID。
- 3
- 認証にはクライアントシークレットとクライアント ID が使用されます。
- 4
- ユーザーを識別するために使用される実際のユーザー名を含むトークンクレーム (またはキー)。その値は、認可サーバーによって異なります。必要に応じて、
"['user.info'].['user.id']"のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。 - 5
- (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
認可サーバーの保護されたエンドポイントへのブローカーの認証
通常、認可サーバーの証明書エンドポイント (jwksEndpointUri) はパブリックにアクセス可能ですが、イントロスペクションエンドポイント (introspectionEndpointUri) は保護されています。ただし、これは認可サーバーの設定によって異なる場合があります。
Kafka ブローカーは、HTTP 認証スキームを使用して、次のどちらかの方法で、認可サーバーの保護されたエンドポイントに対して認証できます。
- HTTP Basic 認証 でクライアント ID とシークレットを使用する。
- HTTP ベアラー認証 でベアラートークンを使用する。
HTTP Basic 認証を設定するには、次のプロパティーを設定します。
-
clientId -
clientSecret
HTTP ベアラー認証の場合は、次のプロパティーを設定します。
-
serverBearerTokenLocation: ベアラートークンを含むディスク上のファイルパスを指定します。
追加の設定オプションの指定
認証要件と使用している認可サーバーに応じて追加の設定を指定します。これらのプロパティーの一部は、特定の認証メカニズムにのみ適用されるか、他のプロパティーと組み合わせて使用される場合にのみ適用されます。
たとえば、PLAIN メカニズムで OAUth を使用する場合、アクセストークンが、$accessToken: 接頭辞の有無にかかわらず、password プロパティー値として渡されます。
-
リスナー設定でトークンエンドポイント (
tokenEndpointUri) を設定する場合は、接頭辞が必要です。 - リスナー設定でトークンエンドポイントを設定しない場合、接頭辞は必要ありません。Kafka ブローカーは、パスワードを raw アクセストークンとして解釈します。
アクセストークンとして password が設定されている場合、username は Kafka ブローカーがアクセストークンから取得するプリンシパル名と同じものを設定する必要があります。userNameClaim、usernamePrefix、fallbackUserNameClaim、fallbackUsernamePrefix、および userInfoEndpointUri プロパティーを使用して、リスナーでユーザー名抽出オプションを指定できます。ユーザー名の抽出プロセスも、認可サーバーによって異なります。特に、クライアント ID をアカウント名にマッピングする方法により異なります。
PLAIN メカニズムはパスワードグラント認証をサポートしていません。認証には、クライアントクレデンシャル (クライアント ID + シークレット) またはアクセストークンのいずれかを使用してください。
任意設定の例
# ...
authentication:
type: oauth
# ...
checkIssuer: false
checkAudience: true
usernamePrefix: user-
fallbackUserNameClaim: client_id
fallbackUserNamePrefix: client-account-
serverBearerTokenLocation: path/to/access/token
validTokenType: bearer
userInfoEndpointUri: https://<auth_server_address>/<path_to_userinfo_endpoint>
enableOauthBearer: false
enablePlain: true
tokenEndpointUri: https://<auth_server_address>/<path_to_token_endpoint>
customClaimCheck: "@.custom == 'custom-value'"
clientAudience: audience
clientScope: scope
connectTimeoutSeconds: 60
readTimeoutSeconds: 60
httpRetries: 2
httpRetryPauseMs: 300
groupsClaim: "$.groups"
groupsClaimDelimiter: ","
includeAcceptHeader: false
- 1
- 認可サーバーが
issクレームを提供しない場合は、発行者チェックを行うことができません。このような場合、checkIssuerをfalseに設定し、validIssuerUriを指定しないようにします。デフォルトはtrueです。 - 2
- オーソリゼーションサーバーが
aud(オーディエンス) クレームを提供していて、オーディエンスチェックを実施したい場合は、checkAudienceをtrueに設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。これにより、Kafka ブローカーはaud要求にclientIdを持たないトークンを拒否します。デフォルトはfalseです。 - 3
- ユーザー ID を構成するときに使用する接頭辞。これは、
userNameClaimが設定されている場合にのみ有効になります。 - 4
- 認可サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID が提供されることがあります。更新トークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に username が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。必要に応じて、
"['client.info'].['client.id']"のような JsonPath 式を使用してフォールバックユーザー名を取得し、トークン内のネストされた JSON 属性からユーザー名を取得できます。 - 5
fallbackUserNameClaimが適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producerというクライアントが存在し、producerという通常ユーザーも存在する場合を考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。- 6
- 保護されたエンドポイントにアクセスするために、Kafka ブローカーが Kubernetes API サーバーに認証するために使用するアクセストークンの場所です。認可サーバーが
OAUTHBEARER認証をサポートしている必要があります。これは、PLAIN認証を使用するclientIdとclientSecretを指定する代わりに使用できます。 - 7
- (
introspectionEndpointUriを使用する場合のみ該当): 使用している認可サーバーによっては、イントロスペクションエンドポイントによって トークンタイプ 属性が返されるかどうかは分からず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。 - 8
- (
introspectionEndpointUriを使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、認可サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfoエンドポイントの URI をフォールバックとして設定します。userNameClaim、fallbackUserNameClaim、およびfallbackUserNamePrefixの設定がuserinfoエンドポイントの応答に適用されます。 - 9
- これを
falseに設定してリスナーでOAUTHBEARERメカニズムを無効にします。PLAINまたはOAUTHBEARERのいずれかを有効にする必要があります。デフォルトはtrueです。 - 10
- リスナーで
PLAIN認証を有効にするには、trueに設定します。これは、すべてのプラットフォームのクライアントでサポートされています。 - 11
PLAINメカニズムの追加設定。これが指定されている場合、クライアントは$accessToken:接頭辞を使用してアクセストークンをpasswordとして渡すことで、PLAIN経由で認証できます。実稼働環境の場合は、常にhttps://urls を使用してください。- 12
- これを JsonPath フィルタークエリーに設定すると、検証中に追加のカスタムルールを JWT アクセストークンに適用できます。アクセストークンに必要なデータが含まれていないと拒否されます。
introspectionEndpointUriを使用する場合、カスタムチェックはイントロスペクションエンドポイントの応答 JSON に適用されます。 - 13
- トークンエンドポイントに渡される
audienceパラメーター。オーディエンス は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。これは、clientIdとsecretを使用する OAuth 2.0 overPLAINクライアント認証のクライアント名にも使用されます。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。 - 14
scopeパラメーターがトークンエンドポイントに渡されます。スコープ は、ブローカー間認証用にアクセストークンを取得する場合に使用されます。これは、clientIdとsecretを使用する OAuth 2.0 overPLAINクライアント認証のクライアント名にも使用されます。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。- 15
- 認可サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
- 16
- 認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
- 17
- 認可サーバーへの失敗した HTTP リクエストを再試行する最大回数。デフォルト値は
0で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、connectTimeoutSecondsオプションとreadTimeoutSecondsオプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。 - 18
- 認可サーバーへの失敗した HTTP リクエストの再試行を行うまでの待機時間。デフォルトでは、この時間はゼロに設定されており、一時停止は適用されません。これは、リクエストの失敗の原因となる問題の多くは、リクエストごとのネットワークの不具合やプロキシーの問題であり、すぐに解決できるためです。ただし、認可サーバーに負荷がかかっている場合、または高トラフィックが発生している場合は、このオプションを 100 ミリ秒以上の値に設定して、サーバーの負荷を軽減し、再試行が成功する可能性を高めることができます。
- 19
- JWT トークンまたはイントロスペクションエンドポイント応答からグループ情報を抽出するために使用される JsonPath クエリー。このオプションはデフォルトでは設定されていません。このオプションを設定すると、カスタム承認者はユーザーグループに基づいて承認の決定を行うことができます。
- 20
- グループ情報が単一の区切り文字列として返される場合に、グループ情報を解析するために使用される区切り文字。デフォルト値は ',' (コンマ) です。
- 21
- 一部の認可サーバーでは、クライアントが
Accept: application/jsonヘッダーを送信する際に問題が発生します。includeAcceptHeader: falseを設定する場合、ヘッダーは送信されません。デフォルトはtrueです。
17.2.2. クライアントアプリケーションでの OAuth 2.0 の設定 リンクのコピーリンクがクリップボードにコピーされました!
クライアントアプリケーションで OAuth 2.0 を設定するには、以下を指定する必要があります。
- SASL (Simple Authentication and Security Layer) セキュリティープロトコル
- SASL メカニズム
- JAAS (Java Authentication and Authorization Service) モジュール
- 認可サーバーにアクセスするための認証プロパティー
SASL プロトコルの設定
クライアント設定で SASL プロトコルを指定します。
-
TLS 暗号化接続を介した認証用の
SASL_SSL -
暗号化されていない接続を介した認証用の
SASL_PLAINTEXT
プロダクションには SASL_SSL を使用し、ローカル開発には SASL_PLAINTEXT のみを使用してください。
SASL_SSL を使用する場合は、追加の ssl.truststore 設定が必要です。OAuth 2.0 認可サーバーへのセキュアな接続 (https://) には、トラストストア設定が必要です。OAuth 2.0 認可サーバーを確認するには、認可サーバーの CA 証明書をクライアント設定のトラストストアに追加します。トラストストアは、PEM または PKCS #12 形式で設定できます。
SASL 認証メカニズムの設定
クライアント設定で SASL メカニズムを指定します。
-
ベアラートークンを使用したクレデンシャル交換用の
OAUTHBEARER -
クライアントクレデンシャル (clientId + secret) またはアクセストークンを渡す
PLAIN
JAAS モジュールの設定
SASL 認証メカニズムを実装する JAAS モジュールを sasl.jaas.config プロパティーの値として指定します。
-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModuleはOAUTHBEARERメカニズムを実装します。 -
org.apache.kafka.common.security.plain.PlainLoginModuleはPLAINメカニズムを実装します。
OAUTHBEARER メカニズムの場合、Streams for Apache Kafka は、クレデンシャルの交換を可能にするために、Kafka クライアント Java ライブラリーを使用するクライアントにコールバックハンドラーを提供します。他の言語のクライアントの場合、アクセストークンを取得するためにカスタムコードが必要になる場合があります。PLAIN メカニズムの場合、Streams for Apache Kafka は、クレデンシャルの交換を可能にするために、サーバー側のコールバックを提供します。
OAUTHBEARER メカニズムを使用できるようにするには、コールバックハンドラーとしてカスタムの io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler クラスも追加する必要があります。JaasClientOauthLoginCallbackHandler は、クライアントのログイン中にアクセストークンの認可サーバーへの OAuth コールバックを処理します。これにより、トークンの自動更新が可能になり、ユーザーの介入なしで継続的な認証が保証されます。さらに、OAuth 2.0 パスワードグラント方式を使用してクライアントのログインクレデンシャルを処理します。
認証プロパティーの設定
OAuth 2.0 認証にクレデンシャルまたはアクセストークンを使用するようにクライアントを設定します。
- クライアントクレデンシャルの使用
- クライアントクレデンシャルを使用するには、認可サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID とシークレットまたはクライアント ID とクライアントアサーション) を使用してクライアントを設定する必要があります。これは最も単純なメカニズムです。
- アクセストークンの使用
- アクセストークンを使用すると、クライアントが、認可サーバーから取得した有効な有効期間の長いアクセストークンまたはリフレッシュトークンで設定されます。アクセストークンを使用すると、認可サーバーツールへ依存が増すため、複雑さが増します。有効期間が長いアクセストークンを使用している場合は、認可サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。
Kafka に送信される情報はアクセストークンだけです。トークンの取得に使用されるクレデンシャルは、Kafka に送信されることはありません。クライアントによるアクセストークンの取得後、認可サーバーと通信する必要はありません。
SASL 認証プロパティーは、次の認証方法をサポートします。
- OAuth 2.0 クライアントクレデンシャル
- アクセストークンまたはサービスアカウントトークン
- リフレッシュトークン
- OAuth 2.0 パスワードグラント (非推奨)
認証プロパティーを JAAS 設定 (sasl.jaas.config および sasl.login.callback.handler.class) として追加します。
クライアントアプリケーションがアクセストークンを使用して直接設定されていない場合、クライアントは Kafka セッションの開始時に、次のいずれかのクレデンシャルセットをアクセストークンと交換します。
- クライアント ID およびシークレット
- クライアント ID とクライアントアサーション
- クライアント ID、リフレッシュトークン、および (必要に応じて) シークレット
- ユーザー名とパスワード、およびクライアント ID と (必要に応じて) シークレット
認証プロパティーを環境変数または Java システムプロパティーとして指定することもできます。Java システムプロパティーの場合は、setProperty を使用して設定し、-D オプションを使用してコマンドラインで渡すことができます。
クライアントシークレットを使用したクライアントクレデンシャルの設定例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- TLS 暗号化接続用の
SASL_SSLセキュリティープロトコル。ローカル開発のみでは、暗号化されていない接続でSASL_PLAINTEXTを使用します。 - 2
OAUTHBEARERまたはPLAINとして指定された SASL メカニズム。- 3
- Kafka クラスターへのセキュアなアクセスのためのトラストストア設定。
- 4
- 認可サーバーのトークンエンドポイントの URI です。
- 5
- クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
- 6
- 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
- 7
- この場所には、認可サーバーの公開鍵証明書 (
truststore.p12) が含まれています。 - 8
- トラストストアにアクセスするためのパスワード。
- 9
- トラストストアのタイプ。
- 10
- (オプション): トークンエンドポイントからトークンを要求するための
scope。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。 - 11
- (オプション) トークンエンドポイントからトークンを要求するための
audience。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。
クライアントアサーションを使用したクライアントクレデンシャルの設定例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.assertion.location="<path_to_client_assertion_token_file>" \
oauth.client.assertion.type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
パスワードグラントの設定例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.password.grant.username="<username>" \
oauth.password.grant.password="<password>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" \
oauth.scope="<scope>" \
oauth.audience="<audience>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
- 2
- (オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
- 3
- パスワードグラント認証のユーザー名。OAuth パスワードグラント設定 (ユーザー名とパスワード) では、OAuth 2.0 パスワードグラント方式が使用されます。パスワードグラントを使用するには、権限が制限されたクライアントのユーザーアカウントを認可サーバー上に作成します。アカウントは、サービスアカウントのように機能する必要があります。認証にユーザーアカウントが必要な環境で使用しますが、先にリフレッシュトークンの使用を検討してください。
- 4
- パスワードグラント認証のパスワード。注記
SASL
PLAINは、OAuth 2.0 パスワードグラント方式を使用したユーザー名とパスワードの受け渡し (パスワードグラント) をサポートしていません。
アクセストークンの設定例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token="<access_token>" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Kafka クライアントの有効期間が長いアクセストークン。または、
oauth.access.token.locationを使用して、アクセストークンを含むファイルを指定することもできます。
OpenShift サービスアカウントトークンの設定例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.access.token.location="/var/run/secrets/kubernetes.io/serviceaccount/token";
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- ファイルシステム上のサービスアカウントトークンの場所 (クライアントが OpenShift Pod としてデプロイされていると想定)
リフレッシュトークンの設定例
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
ssl.truststore.location=/tmp/truststore.p12
ssl.truststore.password=$STOREPASS
ssl.truststore.type=PKCS12
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
oauth.token.endpoint.uri="<token_endpoint_url>" \
oauth.client.id="<client_id>" \
oauth.client.secret="<client_secret>" \
oauth.refresh.token="<refresh_token>" \
oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
oauth.ssl.truststore.password="$STOREPASS" \
oauth.ssl.truststore.type="PKCS12" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
カスタム OAUTHBEARER 実装用の SASL 拡張
Kafka ブローカーがカスタム OAUTHBEARER 実装を使用する場合は、追加の SASL 拡張オプションを渡す必要が生じる場合があります。これらの拡張機能には、認可サーバーによりクライアントコンテキストとして必要な属性または情報を含めることができます。オプションはキーと値のペアとして渡され、新しいセッションが起動すると Kafka ブローカーに送信されます。
キー接頭辞として oauth.sasl.extension. を使用して SASL 拡張値を渡します。
SASL 拡張値を渡すための設定例
oauth.sasl.extension.key1="value1"
oauth.sasl.extension.key2="value2"
17.2.3. OAuth 2.0 クライアント認証フロー リンクのコピーリンクがクリップボードにコピーされました!
OAuth 2.0 認証フローは、基礎となる Kafka クライアントおよび Kafka ブローカー設定によって異なります。フローは、使用する認可サーバーによってもサポートされる必要があります。
Kafka ブローカーリスナー設定は、クライアントがアクセストークンを使用して認証する方法を決定します。クライアントはクライアント ID およびシークレットを渡してアクセストークンをリクエストできます。
リスナーが PLAIN 認証を使用するように設定されている場合、クライアントはクライアント ID とシークレット、またはユーザー名とアクセストークンを使用して認証できます。これらの値は、PLAIN メカニズムの username および password プロパティーとして渡されます。
リスナー設定は、以下のトークン検証オプションをサポートします。
- 認可サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証を使用できます。認可サーバーは、トークンで署名を検証するために使用される公開証明書のある JWKS エンドポイントを提供します。
- 認可サーバーが提供するトークンイントロスペクションエンドポイントへの呼び出しを使用することができます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを認可サーバーに渡します。Kafka ブローカーは応答を確認して、トークンが有効かどうかを確認します。
認可サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。
Kafka クライアントクレデンシャルは、以下のタイプの認証に対して設定することもできます。
- 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス
- 新しいアクセストークンを発行するために認可サーバーに問い合わせる (クライアント ID およびクレデンシャル、またはリフレッシュトークン、またはユーザー名とパスワードを使用)
17.2.3.1. SASL OAUTHBEARER メカニズムを使用したクライアント認証フローの例 リンクのコピーリンクがクリップボードにコピーされました!
SASL OAUTHBEARER メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。
クライアントがクライアント ID およびクレデンシャルを使用し、ブローカーが認可サーバーに検証を委譲する場合
- Kafka クライアントは、クライアント ID およびクレデンシャル、および必要に応じてリフレッシュトークンを使用して、認可サーバーからアクセストークンを要求します。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
- 認可サーバーは新しいアクセストークンを生成します。
-
Kafka クライアントは、SASL
OAUTHBEARERメカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、独自のクライアント ID およびシークレットを使用し、認可サーバーでトークンイントロスペクションエンドポイントを呼び出すことで、アクセストークンを検証します。
- トークンが有効な場合、Kafka クライアントセッションが確立されます。
クライアントがクライアント ID およびクレデンシャルを使用し、ブローカーが高速なローカルトークン検証を実行する場合
- Kafka クライアントは、クライアント ID およびクレデンシャル、および必要に応じてリフレッシュトークンを使用して、トークンエンドポイントから認可サーバーに対して認証を行います。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
- 認可サーバーは新しいアクセストークンを生成します。
-
Kafka クライアントは、SASL
OAUTHBEARERメカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
クライアントが有効期限の長いアクセストークンを使用し、ブローカーが検証を認可サーバーに委譲する場合
-
Kafka クライアントは、SASL
OAUTHBEARERメカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、認可サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
- トークンが有効な場合、Kafka クライアントセッションが確立されます。
クライアントが有効期限の長いアクセストークンを使用し、ブローカーが高速のローカル検証を実行する場合
-
Kafka クライアントは、SASL
OAUTHBEARERメカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。 - Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
トークンが取り消された場合に認可サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、認可サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。
17.2.3.2. SASL PLAIN メカニズムを使用したクライアント認証フローの例 リンクのコピーリンクがクリップボードにコピーされました!
OAuth PLAIN メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。
クライアントがクライアント ID およびシークレットを使用し、ブローカーがクライアントのアクセストークンを取得する場合
-
Kafka クライアントは、
clientIdをユーザー名として、secretをパスワードとして渡します。 -
Kafka ブローカーは、トークンエンドポイントを使用して
clientIdおよびsecretを認可サーバーに渡します。 - 認可サーバーは、新しいアクセストークンまたはエラー (クライアントクレデンシャルが有効でない場合) を返します。
Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。
- トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
- ローカルトークンのイントロスペクションが使用される場合、要求は認可サーバーに対して行われません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。
クライアントが、クライアント ID およびシークレットなしで有効期限の長いアクセストークンを使用する場合
- Kafka クライアントはユーザー名とパスワードを渡します。パスワードは、クライアントを実行する前に手動で取得および設定されたアクセストークンの値を提供します。
Kafka ブローカーリスナーが認証のトークンエンドポイントで設定されているかどうかに応じて、
$accessToken:文字列の接頭辞の有無にかかわらず、パスワードは渡されます。-
トークンエンドポイントが設定されている場合、パスワードの前に
$accessToken:を付け、password パラメーターにクライアントシークレットではなくアクセストークンが含まれていることをブローカーに知らせる必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。 -
トークンエンドポイントが Kafka ブローカーリスナーで設定されていない場合 (
no-client-credentials modeを強制)、パスワードは接頭辞なしでアクセストークンを提供する必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。このモードでは、クライアントはクライアント ID およびシークレットを使用せず、passwordパラメーターは常に raw アクセストークンとして解釈されます。
-
トークンエンドポイントが設定されている場合、パスワードの前に
Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。
- トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
- ローカルトークンイントロスペクションが使用されている場合には、認可サーバーへの要求はありません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。
17.2.4. セッションの再認証 リンクのコピーリンクがクリップボードにコピーされました!
Kafka クライアントと Kafka 間の OAuth 2.0 セッションに Kafka セッションの再認証 を使用するように OAuth リスナーを設定します。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存の接続を破棄せずに再使用して、新しいセッションを即座に開始します。
セッションの再認証はデフォルトで無効になっています。これを有効にするには、oauth リスナー設定で maxSecondsWithoutReauthentication の時間値を設定します。このプロパティーは、OAUTHBEARER 認証でも PLAIN 認証でも、セッションの再認証を設定するために使用されます。設定例は、「リスナーでの OAuth 2.0 認証の設定」 を参照してください。
セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。
セッションの再認証は、高速ローカル JWT または イントロスペクションエンドポイント のトークン検証と共に使用できます。
クライアントの再認証
ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。
トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。
セッションの再認証は、リフレッシュトークンが使用されている場合にも適用されます。セッションが期限切れになると、クライアントは更新トークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存のセッションに再認証されます。
セッションの有効期限
セッションの再認証が設定されている場合、OAUTHBEARER 認証と PLAIN 認証で、セッションの有効期限の動作が異なります。
クライアント ID とシークレットによる方法を使用する OAUTHBEARER および PLAIN の場合:
-
ブローカーの認証されたセッションは、設定された
maxSecondsWithoutReauthenticationで期限切れになります。 - アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。
有効期間の長いアクセストークンによる方法を使用する PLAIN の場合:
-
ブローカーの認証されたセッションは、設定された
maxSecondsWithoutReauthenticationで期限切れになります。 -
アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証が試行されますが、
PLAINにはトークンを更新するメカニズムがありません。
maxSecondsWithoutReauthentication が 設定されていない 場合、OAUTHBEARER および PLAIN クライアントは、再認証しなくてもブローカーへの接続を無限に保持できます。認証されたセッションは、アクセストークンの期限が切れても終了しません。ただし、keycloak 認可を使用したり、カスタム authorizer をインストールして、認可を設定する場合に考慮できます。
17.2.5. 例: OAuth 2.0 認証の有効化 リンクのコピーリンクがクリップボードにコピーされました!
この例では、OAUth 2.0 認証を使用して Kafka クラスターへのクライアントアクセスを設定する方法を示します。この手順では、Kafka リスナー、Kafka Java クライアント、および Kafka コンポーネントで OAuth 2.0 認証をセットアップするために必要な設定を説明します。
17.2.5.1. リスナーでの OAuth 2.0 認証の設定 リンクのコピーリンクがクリップボードにコピーされました!
認可サーバーを使用して OAuth 2.0 認証を使用できるように Kafka リスナーを設定します。
tls: true を指定したリスナーにより暗号化されたインターフェイス上で OAuth 2.0 を使用することを推奨します。プレーンリスナーは推奨されません。
認可サーバーが信頼できる CA によって署名された証明書を使用し、OAuth 2.0 サーバーのホスト名と一致する場合、TLS 接続はデフォルト設定を使用して動作します。それ以外の場合は、適切な証明書でトラストストアを設定するか、証明書のホスト名の検証を無効にする必要があります。
Kafka ブローカーリスナーの OAuth 2.0 認証の設定に関する詳細は、KafkaListenerAuthenticationOAuth スキーマリファレンス を参照してください。
前提条件
- Streams for Apache Kafka と Kafka が実行中である。
- OAuth 2.0 の認可サーバーがデプロイされている。
手順
Kafkaリソースで、oauth認証タイプのリスナーを指定します。OAuth 2.0 認証を使用したリスナー設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: # ... listeners: - name: tls port: 9093 type: internal tls: true authentication: type: oauth - name: external3 port: 9094 type: loadbalancer tls: true authentication: type: oauth #...認可サーバーおよび検証タイプに応じて OAuth リスナーを設定します。
-
Kafka設定に変更を適用します。 更新をログで確認するか、Pod 状態の遷移を監視して確認します。
oc logs -f ${POD_NAME} -c ${CONTAINER_NAME} oc get pod -wローリング更新によって、ブローカーが OAuth 2.0 認証を使用するように設定されます。
17.2.5.2. Kafka Java クライアントで OAuth 2.0 を設定する リンクのコピーリンクがクリップボードにコピーされました!
Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサー API とコンシューマー API を設定します。コールバックプラグインをクライアントの pom.xml ファイルに追加してから、OAuth 2.0 用にクライアントを設定します。
認証プロパティーを設定する方法は、OAuth 2.0 認可サーバーへのアクセスに使用している認証方法によって異なります。この手順では、プロパティーはプロパティーファイルで指定されてから、クライアント設定にロードされます。
前提条件
- Streams for Apache Kafka と Kafka が実行中である。
- OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
- Kafka ブローカーが OAuth 2.0 に対して設定されている。
手順
OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの
pom.xmlファイルに追加します。<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.15.1.redhat-00005</version> </dependency>OAuth 2.0 認証方法に応じてクライアントを設定します。
たとえば、
client.propertiesファイルで認証方法のプロパティーを指定します。OAUTH 2.0 認証のクライアントプロパティーを Java クライアントコードに入力します。
クライアントプロパティーの入力を示す例
Properties props = new Properties(); try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) { props.load(reader); }- Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。
17.2.5.3. Kafka コンポーネントでの OAuth 2.0 の設定 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、認可サーバーを使用して OAuth 2.0 認証を使用するように Kafka コンポーネントを設定する方法を説明します。
次のコンポーネントに対して OAuth 2.0 認証を設定できます。
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
この手順では、Kafka コンポーネントと認可サーバーは同じサーバーで稼働しています。
作業を開始する前の注意事項
Kafka コンポーネントの OAuth 2.0 認証の設定に関する詳細は、KafkaClientAuthenticationOAuth スキーマリファレンス を参照してください。スキーマリファレンスには、設定オプションの例が記載されています。
前提条件
- Streams for Apache Kafka と Kafka が実行中である。
- OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
- Kafka ブローカーが OAuth 2.0 に対して設定されている。
手順
クライアントシークレットを作成し、これを環境変数としてコンポーネントにマウントします。
以下は、Kafka Bridge の
Secretを作成する例になります。apiVersion: kafka.strimzi.io/v1beta2 kind: Secret metadata: name: my-bridge-oauth type: Opaque data: clientSecret: MGQ1OTRmMzYtZTllZS00MDY2LWI5OGEtMTM5MzM2NjdlZjQw1 - 1
clientSecretキーは base64 形式である必要があります。
Kafka コンポーネントのリソースを作成または編集し、OAuth 2.0 認証が認証プロパティーに設定されるようにします。
OAuth 2.0 認証の場合、次のオプションを使用できます。
- クライアント ID およびシークレット
- クライアント ID とクライアントアサーション
- クライアント ID およびリフレッシュトークン
- アクセストークン
- ユーザー名およびパスワード
- TLS
以下は、クライアント ID、シークレット、および TLS を使用して OAuth 2.0 が Kafka Bridge クライアントに割り当てられる例になります。
クライアントシークレットを使用した OAuth 2.0 認証設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: # ... authentication: type: oauth1 tokenEndpointUri: https://<auth_server_address>/<path_to_token_endpoint>2 clientId: kafka-bridge clientSecret: secretName: my-bridge-oauth key: clientSecret tlsTrustedCertificates:3 - secretName: oauth-server-cert pattern: "*.crt"この例では、クライアント ID とクライアントアサーションファイルの場所を使用して OAuth 2.0 が Kafka Bridge クライアントに割り当てられます。また、TLS を使用して認可サーバーに接続します。
クライアントアサーションを使用した OAuth 2.0 認証設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: # ... authentication: type: oauth tokenEndpointUri: https://<auth_server_address>/<path_to_token_endpoint> clientId: kafka-bridge clientAssertionLocation: /var/run/secrets/sso/assertion1 tlsTrustedCertificates: - secretName: oauth-server-cert pattern: "*.crt"- 1
- クライアントの認証に使用されるクライアントアサーションファイルへのファイルシステムパス。このファイルは、通常、外部 Operator サービスによってデプロイされた Pod に追加されます。または、
clientAssertionを使用して、クライアントアサーション値を含むシークレットを参照します。
ここでは、サービスアカウントトークンを使用して、OAuth 2.0 が Kafka Bridge クライアントに割り当てられます。
サービスアカウントトークンを使用した OAuth 2.0 認証設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: # ... authentication: type: oauth accessTokenLocation: /var/run/secrets/kubernetes.io/serviceaccount/token1 - 1
- サービスアカウントトークンファイルの場所へのパス。
OAuth 2.0 認証の適用方法や、認可サーバーのタイプによって、使用できる追加の設定オプションがあります。
その他の設定オプション
# ... spec: # ... authentication: # ... disableTlsHostnameVerification: true1 accessTokenIsJwt: false2 scope: any3 audience: kafka4 connectTimeoutSeconds: 605 readTimeoutSeconds: 606 httpRetries: 27 httpRetryPauseMs: 3008 includeAcceptHeader: false9 - 1
- (任意設定): TLS ホスト名の検証を無効にします。デフォルトは
falseです。 - 2
- 不透明なトークンを使用している場合、アクセストークンが JWT トークンとして処理されないように
accessTokenIsJwt: falseを適用することができます。 - 3
- (オプション): トークンエンドポイントからトークンを要求するための
scope。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。この場合ではanyになります。 - 4
- (オプション) トークンエンドポイントからトークンを要求するための
audience。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。今回の場合はkafkaです。 - 5
- (オプション) 認可サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
- 6
- (オプション): 認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
- 7
- (オプション) 認可サーバーへの失敗した HTTP リクエストを再試行する最大回数。デフォルト値は
0で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、connectTimeoutSecondsオプションとreadTimeoutSecondsオプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。 - 8
- (オプション) 失敗した認可サーバーへの HTTP リクエストの再試行を行うまでの待機時間。デフォルトでは、この時間はゼロに設定されており、一時停止は適用されません。これは、リクエストの失敗の原因となる問題の多くは、リクエストごとのネットワークの不具合やプロキシーの問題であり、すぐに解決できるためです。ただし、認可サーバーに負荷がかかっている場合、または高トラフィックが発生している場合は、このオプションを 100 ミリ秒以上の値に設定して、サーバーの負荷を軽減し、再試行が成功する可能性を高めることができます。
- 9
- (オプション) 一部の認可サーバーでは、クライアントが
Accept: application/jsonヘッダーを送信する際に問題が発生します。includeAcceptHeader: falseを設定する場合、ヘッダーは送信されません。デフォルトはtrueです。
- コンポーネントのリソース設定に変更を適用します。
更新をログで確認するか、Pod 状態の遷移を監視して確認します。
oc logs -f ${POD_NAME} -c ${CONTAINER_NAME} oc get pod -wローリング更新では、OAuth 2.0 認証を使用して Kafka ブローカーと対話するコンポーネントが設定されます。