15.4. OAuth 2.0 トークンベース認証の使用


Streams for Apache Kafka は、OAUTHBEARER および PLAIN メカニズムを使用した OAuth 2.0 認証 の使用をサポートしています。

OAuth 2.0 は、アプリケーション間で標準的なトークンベースの認証および認可を有効にし、中央の認可サーバーを使用してリソースに制限されたアクセス権限を付与するトークンを発行します。

OAuth 2.0 認証を設定した後に OAuth 2.0 認可 を設定できます。

Kafka ブローカーおよびクライアントの両方が OAuth 2.0 を使用するように設定する必要があります。OAuth 2.0 認証は、simple または OPA ベースの Kafka authorization と併用することもできます。

OAuth 2.0 のトークンベースの認証を使用すると、アプリケーションクライアントはアカウントのクレデンシャルを公開せずにアプリケーションサーバー (リソースサーバー と呼ばれる) のリソースにアクセスできます。

アプリケーションクライアントは、アクセストークンを認証の手段として渡します。アプリケーションサーバーはこれを使用して、付与するアクセス権限のレベルを決定することもできます。認可サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。

Streams for Apache Kafka のコンテキストでは、以下が行われます。

  • Kafka ブローカーは OAuth 2.0 リソースサーバーとして動作します。
  • Kafka クライアントは OAuth 2.0 アプリケーションクライアントとして動作します。

Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーおよびクライアントは、必要に応じて OAuth 2.0 認可サーバーと通信し、アクセストークンを取得または検証します。

Streams for Apache Kafka のデプロイメントでは、OAuth 2.0 統合により以下が提供されます。

  • Kafka ブローカーのサーバー側 OAuth 2.0 サポート
  • Kafka MirrorMaker、Kafka Connect、および Kafka Bridge のクライアント側 OAuth 2.0 サポート。

15.4.1. OAuth 2.0 認証メカニズム

Streams for Apache Kafka は、OAuth 2.0 認証用の OAUTHBEARER および PLAIN メカニズムをサポートしています。どちらのメカニズムも、Kafka クライアントが Kafka ブローカーで認証されたセッションを確立できるようにします。クライアント、認可サーバー、および Kafka ブローカー間の認証フローは、メカニズムごとに異なります。

可能な限り、OAUTHBEARER を使用するようにクライアントを設定することが推奨されます。OAUTHBEARER では、クライアントクレデンシャルは Kafka ブローカーと 共有されることがない ため、PLAIN よりも高レベルのセキュリティーが提供されます。OAUTHBEARER をサポートしない Kafka クライアントの場合のみ、PLAIN の使用を検討してください。

クライアントの接続に OAuth 2.0 認証を使用するように Kafka ブローカーリスナーを設定します。必要な場合は、同じ oauth リスナーで OAUTHBEARER および PLAIN メカニズムを使用できます。各メカニズムをサポートするプロパティーは、oauth リスナー設定で明示的に指定する必要があります。

OAUTHBEARER の概要

OAUTHBEARER は、Kafka ブローカーの oauth リスナー設定で自動的に有効になります。enableOauthBearer プロパティーを true に設定できますが、これは必須ではありません。

  # ...
  authentication:
    type: oauth
    # ...
    enableOauthBearer: true

また、多くの Kafka クライアントツールでは、プロトコルレベルで OAUTHBEARER の基本サポートを提供するライブラリーを使用します。Streams for Apache Kafka では、アプリケーションの開発を支援するために、アップストリームの Kafka Client Java ライブラリー用の OAuth コールバックハンドラー が提供されます (ただし、他のライブラリーは対象外)。そのため、独自のコールバックハンドラーを作成する必要はありません。アプリケーションクライアントはコールバックハンドラーを使用してアクセストークンを提供できます。Go などの他言語で書かれたクライアントは、カスタムコードを使用して認可サーバーに接続し、アクセストークンを取得する必要があります。

OAUTHBEARER を使用する場合、クライアントはクレデンシャルを交換するために Kafka ブローカーでセッションを開始します。ここで、クレデンシャルはコールバックハンドラーによって提供されるベアラートークンの形式を取ります。コールバックを使用して、以下の 3 つの方法のいずれかでトークンの提供を設定できます。

  • クライアント ID および Secret (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン
  • 設定時に手動で取得された有効期限の長い更新トークン
注記

OAUTHBEARER 認証は、プロトコルレベルで OAUTHBEARER メカニズムをサポートする Kafka クライアントでのみ使用できます。

PLAIN の概要

PLAIN を使用するには、Kafka ブローカーの oauth リスナー設定で有効にする必要があります。

以下の例では、デフォルトで有効になっている OAUTHBEARER に加え、PLAIN も有効になっています。PLAIN のみを使用する場合は、enableOauthBearerfalse に設定して OAUTHBEARER を無効にすることができます。

  # ...
  authentication:
    type: oauth
    # ...
    enablePlain: true
    tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token

PLAIN は、すべての Kafka クライアントツールによって使用される簡単な認証メカニズムです。PLAIN を OAuth 2.0 認証で使用できるようにするために、Streams for Apache Kafka では、OAuth 2.0 over PLAIN サーバー側コールバックが提供されます。

PLAIN の Streams for Apache Kafka 実装では、クライアントクレデンシャルは ZooKeeper に保存されません。代わりに、OAUTHBEARER 認証が使用される場合と同様に、クライアントのクレデンシャルは準拠した認可サーバーの背後で一元的に処理されます。

OAuth 2.0 over PLAIN コールバックを併用する場合、以下のいずれかの方法を使用して Kafka クライアントは Kafka ブローカーで認証されます。

  • クライアント ID およびシークレット (OAuth 2.0 クライアントクレデンシャルメカニズムを使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン

どちらの方法でも、クライアントは Kafka ブローカーにクレデンシャルを渡すために、PLAIN username および password プロパティーを提供する必要があります。クライアントはこれらのプロパティーを使用してクライアント ID およびシークレット、または、ユーザー名およびアクセストークンを渡します。

クライアント ID およびシークレットは、アクセストークンの取得に使用されます。

アクセストークンは、password プロパティーの値として渡されます。$accessToken: 接頭辞の有無に関わらずアクセストークンを渡します。

  • リスナー設定でトークンエンドポイント (tokenEndpointUri) を設定する場合は、接頭辞が必要です。
  • リスナー設定でトークンエンドポイント (tokenEndpointUri) を設定しない場合は、接頭辞は必要ありません。Kafka ブローカーは、パスワードを raw アクセストークンとして解釈します。

アクセストークンとして password が設定されている場合、username は Kafka ブローカーがアクセストークンから取得するプリンシパル名と同じものを設定する必要があります。userNameClaimfallbackUserNameClaimfallbackUsernamePrefix、および userInfoEndpointUri プロパティーを使用すると、リスナーにユーザー名抽出オプションを指定できます。ユーザー名の抽出プロセスも、認可サーバーによって異なります。特に、クライアント ID をアカウント名にマッピングする方法により異なります。

注記

OAuth over PLAIN は、password grant メカニズムをサポートしていません。上記のように、SASL PLAIN メカニズムを介して、client credentials (clientId + シークレット) またはアクセストークンをプロキシーすることしかできません。

15.4.2. OAuth 2.0 Kafka ブローカーの設定

OAuth 2.0 の Kafka ブローカー設定には、以下が関係します。

  • 認可サーバーでの OAuth 2.0 クライアントの作成
  • Kafka カスタムリソースでの OAuth 2.0 認証の設定
注記

認可サーバーに関連する Kafka ブローカーおよび Kafka クライアントはどちらも OAuth 2.0 クライアントと見なされます。

15.4.2.1. 認可サーバーの OAuth 2.0 クライアント設定

セッションの開始中に受信されたトークンを検証するように Kafka ブローカーを設定するには、認可サーバーで OAuth 2.0 の クライアント 定義を作成し、以下のクライアントクレデンシャルが有効な状態で 機密情報 として設定することが推奨されます。

  • kafka のクライアント ID (例)
  • 認証メカニズムとしてのクライアント ID およびシークレット
注記

認可サーバーのパブリックでないイントロスペクションエンドポイントを使用する場合のみ、クライアント ID およびシークレットを使用する必要があります。高速のローカル JWT トークンの検証と同様に、パブリック認可サーバーのエンドポイントを使用する場合は通常、クレデンシャルは必要ありません。

15.4.2.2. Kafka クラスターでの OAuth 2.0 認証設定

Kafka クラスターで OAuth 2.0 認証を使用するには、たとえば、認証方法が oauth の Kafka クラスターカスタムリソースの tls リスナー設定を指定します。

OAuth 2.0 の認証方法タイプの割り当て

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
      #...

リスナーで OAuth 2.0 認証を設定できます。OAuth 2.0 認証と TLS 暗号化 (tls: true) を併用することを推奨します。暗号化を行わないと、ネットワークの盗聴やトークンの盗難による不正アクセスに対して接続が脆弱になります。

external リスナーを type: oauth で設定し、セキュアなトランスポート層がクライアントと通信するようにします。

OAuth 2.0 の外部リスナーとの使用

# ...
listeners:
  - name: external3
    port: 9094
    type: loadbalancer
    tls: true
    authentication:
      type: oauth
    #...

tls プロパティーはデフォルトで false に設定されているため、有効にする必要があります。

認証のタイプを OAuth 2.0 として定義した場合、検証のタイプに基づいて、 高速のローカル JWT 検証 または イントロスペクションエンドポイントを使用したトークンの検証 のいずれかとして、設定を追加します。

説明や例を用いてリスナー向けに OAuth 2.0 を設定する手順は、Kafka ブローカーの OAuth 2.0 サポートの設定 を参照してください。

15.4.2.3. 高速なローカル JWT トークン検証の設定

高速なローカル JWT トークンの検証では、JWT トークンの署名がローカルでチェックされます。

ローカルチェックでは、トークンに対して以下が確認されます。

  • アクセストークンに Bearer の (typ) 要求値が含まれ、トークンがタイプに準拠することを確認します。
  • 有効 (期限切れでない) かどうかを確認します。
  • トークンに validIssuerURI と一致する発行元があることを確認します。

リスナーの設定時に validIssuerURI 属性を指定することで、認可サーバーから発行されていないトークンは拒否されます。

高速のローカル JWT トークン検証の実行中に、認可サーバーの通信は必要はありません。OAuth 2.0 の認可サーバーによって公開されるエンドポイントの jwksEndpointUri 属性を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。

注記

認可サーバーとの通信はすべて TLS による暗号化を使用して実行する必要があります。

Streams for Apache Kafka のプロジェクト namespace で証明書トラストストアを OpenShift シークレットとして設定し、tlsTrustedCertificates 属性を使用して、トラストストアファイルを含む OpenShift シークレットを参照することができます。

JWT トークンからユーザー名を適切に取得するため、userNameClaim の設定を検討してください。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。

Kafka ACL 認可を使用する場合は、認証中にユーザー名でユーザーを特定する必要があります。JWT トークンの sub 要求は、通常は一意な ID でユーザー名ではありません。

高速なローカル JWT トークン検証の設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    #...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
          validIssuerUri: <https://<auth_server_address>/auth/realms/tls>
          jwksEndpointUri: <https://<auth_server_address>/auth/realms/tls/protocol/openid-connect/certs>
          userNameClaim: preferred_username
          maxSecondsWithoutReauthentication: 3600
          tlsTrustedCertificates:
          - secretName: oauth-server-cert
            certificate: ca.crt
    #...

15.4.2.4. OAuth 2.0 イントロスペクションエンドポイントの設定

OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。

OAuth 2.0 のイントロスペクションベースの検証を設定するには、高速のローカル JWT トークン検証に指定された jwksEndpointUri 属性ではなく、introspectionEndpointUri 属性を指定します。通常、イントロスペクションエンドポイントは保護されているため、認可サーバーに応じて clientId および clientSecret を指定する必要があります。

イントロスペクションエンドポイントの設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: oauth
          clientId: kafka-broker
          clientSecret:
            secretName: my-cluster-oauth
            key: clientSecret
          validIssuerUri: <https://<auth_server_-_address>/auth/realms/tls>
          introspectionEndpointUri: <https://<auth_server_address>/auth/realms/tls/protocol/openid-connect/token/introspect>
          userNameClaim: preferred_username
          maxSecondsWithoutReauthentication: 3600
          tlsTrustedCertificates:
          - secretName: oauth-server-cert
            certificate: ca.crt

15.4.3. Kafka ブローカーの再認証の設定

Kafka クライアントと Kafka ブローカー間の OAuth 2.0 セッションに Kafka session re-authentication を使用するように、oauth リスナーを設定できます。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存の接続を破棄せずに再使用して、新しいセッションを即座に開始します。

セッションの再認証はデフォルトで無効になっています。これを有効にするには、oauth リスナー設定で maxSecondsWithoutReauthentication の時間値を設定します。OAUTHBEARER および PLAIN 認証では、同じプロパティーを使用してセッションの再認証が設定されます。設定例については、「Kafka ブローカーの OAuth 2.0 サポートの設定」 を参照してください。

セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。

セッションの再認証は、高速ローカル JWT または イントロスペクションエンドポイント のトークン検証と共に使用できます。

クライアントの再認証

ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。

トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。

セッションの再認証は、リフレッシュトークンが使用されている場合にも適用されます。セッションが期限切れになると、クライアントは更新トークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存のセッションに再認証されます。

OAUTHBEARER および PLAIN のセッションの有効期限

セッションの再認証が設定されている場合、OAUTHBEARER と PLAIN 認証ではセッションの有効期限は異なります。

クライアント ID とシークレットによる方法を使用する OAUTHBEARER および PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された maxSecondsWithoutReauthentication で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。

有効期間の長いアクセストークンによる方法を使用する PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された maxSecondsWithoutReauthentication で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証は試行されますが、PLAIN にはトークンを更新するメカニズムがありません。

maxSecondsWithoutReauthentication設定されていない 場合、OAUTHBEARER および PLAIN クライアントは、再認証しなくてもブローカーへの接続を無限に保持できます。認証されたセッションは、アクセストークンの期限が切れても終了しません。ただし、keycloak 認可を使用したり、カスタム authorizer をインストールして、認可を設定する場合に考慮できます。

15.4.4. OAuth 2.0 Kafka クライアントの設定

Kafka クライアントは以下のいずれかで設定されます。

  • 認可サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID およびシークレット)。
  • 認可サーバーから提供されたツールを使用して取得された、有効期限の長い有効なアクセストークンまたは更新トークン。

アクセストークンは、Kafka ブローカーに送信される唯一の情報です。アクセストークンを取得するために認可サーバーでの認証に使用されるクレデンシャルは、ブローカーに送信されません。

クライアントによるアクセストークンの取得後、認可サーバーと通信する必要はありません。

クライアント ID とシークレットを使用した認証が最も簡単です。有効期間の長いアクセストークンまたは更新トークンを使用すると、認可サーバーツールに追加の依存関係があるため、より複雑になります。

注記

有効期間が長いアクセストークンを使用している場合は、認可サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。

Kafka クライアントが直接アクセストークンで設定されていない場合、クライアントは認可サーバーと通信して Kafka セッションの開始中にアクセストークンのクレデンシャルを交換します。Kafka クライアントは以下のいずれかを交換します。

  • クライアント ID およびシークレット
  • クライアント ID、更新トークン、および (任意の) シークレット
  • ユーザー名とパスワード、およびクライアント ID と (オプションで) シークレット

15.4.5. OAuth 2.0 クライアント認証フロー

OAuth 2.0 認証フローは、基礎となる Kafka クライアントおよび Kafka ブローカー設定によって異なります。フローは、使用する認可サーバーによってもサポートされる必要があります。

Kafka ブローカーリスナー設定は、クライアントがアクセストークンを使用して認証する方法を決定します。クライアントはクライアント ID およびシークレットを渡してアクセストークンをリクエストできます。

リスナーが PLAIN 認証を使用するように設定されている場合、クライアントはクライアント ID およびシークレット、または、ユーザー名およびアクセストークンで認証できます。これらの値は PLAIN メカニズムの username および password プロパティーとして渡されます。

リスナー設定は、以下のトークン検証オプションをサポートします。

  • 認可サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証を使用できます。認可サーバーは、トークンで署名を検証するために使用される公開証明書のある JWKS エンドポイントを提供します。
  • 認可サーバーが提供するトークンイントロスペクションエンドポイントへの呼び出しを使用することができます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを認可サーバーに渡します。Kafka ブローカーは応答を確認して、トークンが有効かどうかを確認します。
注記

認可サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。

Kafka クライアントクレデンシャルは、以下のタイプの認証に対して設定することもできます。

  • 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス
  • 新しいアクセストークンを発行するには、認可サーバーに連絡します (クライアント ID とシークレット、または更新トークン、またはユーザー名とパスワードを使用)。

15.4.5.1. SASL OAUTHBEARER メカニズムを使用したクライアント認証フローの例

SASL OAUTHBEARER メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID とシークレットを使用し、ブローカーが検証を認可サーバーに委任する場合

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka クライアントは、クライアント ID とシークレット、および必要に応じてリフレッシュトークンを使用して、認可サーバーからアクセストークンを要求します。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
  2. 認可サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡すことで Kafka ブローカーで認証されます。
  4. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用し、認可サーバーでトークンイントロスペクションエンドポイントを呼び出すことで、アクセストークンを検証します。
  5. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントがクライアント ID およびシークレットを使用し、ブローカーが高速のローカルトークン検証を実行する場合

Client using client ID and secret with broker performing fast local token validation

  1. Kafka クライアントは、クライアント ID とシークレット、および必要に応じてリフレッシュトークンを使用して、トークンエンドポイントから認可サーバーに対して認証を行います。または、クライアントはユーザー名とパスワードを使用して認証することもできます。
  2. 認可サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡すことで Kafka ブローカーで認証されます。
  4. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが検証を認可サーバーに委任する場合

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、認可サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
  3. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが高速のローカル検証を実行する場合

Client using long-lived access token with broker performing fast local validation

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
警告

トークンが取り消された場合に認可サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、認可サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。

15.4.5.2. SASL PLAIN メカニズムを使用したクライアント認証フローの例

OAuth PLAIN メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID およびシークレットを使用し、ブローカーがクライアントのアクセストークンを取得する場合

Client using a client ID and secret with the broker obtaining the access token for the client

  1. Kafka クライアントは、clientId をユーザー名として、secret をパスワードとして渡します。
  2. Kafka ブローカーは、トークンエンドポイントを使用して clientId および secret を認可サーバーに渡します。
  3. 認可サーバーは、新しいアクセストークンまたはエラー (クライアントクレデンシャルが有効でない場合) を返します。
  4. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンのイントロスペクションが使用される場合、要求は認可サーバーに対して行われません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

クライアントが、クライアント ID およびシークレットなしで有効期限の長いアクセストークンを使用する場合

Client using a long-lived access token without a client ID and secret

  1. Kafka クライアントはユーザー名とパスワードを渡します。パスワードは、クライアントを実行する前に手動で取得および設定されたアクセストークンの値を提供します。
  2. Kafka ブローカーリスナーが認証のトークンエンドポイントで設定されているかどうかに応じて、$accessToken: 文字列の接頭辞の有無にかかわらず、パスワードは渡されます。

    1. トークンエンドポイントが設定されている場合、パスワードの前に $accessToken: を付け、password パラメーターにクライアントシークレットではなくアクセストークンが含まれていることをブローカーに知らせる必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。
    2. トークンエンドポイントが Kafka ブローカーリスナーで設定されていない場合 (no-client-credentials mode を強制)、パスワードは接頭辞なしでアクセストークンを提供する必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。このモードでは、クライアントはクライアント ID およびシークレットを使用せず、password パラメーターは常に raw アクセストークンとして解釈されます。
  3. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンイントロスペクションが使用されている場合には、認可サーバーへの要求はありません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

15.4.6. OAuth 2.0 認証の設定

OAuth 2.0 は、Kafka クライアントと Streams for Apache Kafka コンポーネントとの対話に使用されます。

Streams for Apache Kafka に OAuth 2.0 を使用するには、以下を行う必要があります。

15.4.6.1. OAuth 2.0 認可サーバーの設定

この手順では、Streams for Apache Kafka との統合用に認可サーバーを設定するために必要な一般的な手順について説明します。

これらの手順は製品固有のものではありません。

手順は、選択した認可サーバーによって異なります。OAuth 2.0 アクセスの設定方法については、認可サーバーの製品ドキュメントを参照してください。

注記

認可サーバーがすでにデプロイされている場合は、デプロイ手順をスキップして、現在のデプロイを使用できます。

手順

  1. 認可サーバーをクラスターにデプロイします。
  2. 認可サーバーの CLI または管理コンソールにアクセスして、Streams for Apache Kafka 用に OAuth 2.0 を設定します。

    Streams for Apache Kafka で動作するように認可サーバーを準備します。

  3. kafka-broker クライアントを設定します。
  4. アプリケーションの Kafka クライアントコンポーネントごとにクライアントを設定します。

次のステップ

認可サーバーのデプロイおよび設定後に、Kafka ブローカーが OAuth 2.0 を使用するように設定 します。

15.4.6.2. Kafka ブローカーの OAuth 2.0 サポートの設定

この手順では、ブローカーリスナーが認可サーバーを使用して OAuth 2.0 認証を使用するように、Kafka ブローカーを設定する方法について説明します。

tls: true を使用したリスナーを介して、暗号化されたインターフェイスで OAuth 2.0 を使用することを推奨します。プレーンリスナーは推奨されません。

認可サーバーが信頼できる CA によって署名された証明書を使用し、OAuth 2.0 サーバーのホスト名と一致する場合、TLS 接続はデフォルト設定を使用して動作します。それ以外の場合は、適切な証明書でトラストストアを設定するか、証明書のホスト名の検証を無効にする必要があります。

Kafka ブローカーの設定する場合、新たに接続された Kafka クライアントの OAuth 2.0 認証中にアクセストークンを検証するために使用されるメカニズムには、以下の 2 つのオプションがあります。

作業を開始する前の注意事項

Kafka ブローカーリスナーの OAuth 2.0 認証の設定に関する詳細は、以下を参照してください。

前提条件

  • Streams for Apache Kafka と Kafka が実行中である。
  • OAuth 2.0 の認可サーバーがデプロイされている。

手順

  1. エディターで、Kafka リソースの Kafka ブローカー設定 (Kafka.spec.kafka) を更新します。

    oc edit kafka my-cluster
  2. Kafka ブローカーの listeners 設定を行います。

    各タイプのリスナーは独立しているため、同じ設定にする必要はありません。

    以下は、外部リスナーに設定された設定オプションの例になります。

    例 1: 高速なローカル JWT トークン検証の設定

    #...
    - name: external3
      port: 9094
      type: loadbalancer
      tls: true
      authentication:
        type: oauth 1
        validIssuerUri: https://<auth_server_address>/auth/realms/external 2
        jwksEndpointUri: https://<auth_server_address>/auth/realms/external/protocol/openid-connect/certs 3
        userNameClaim: preferred_username 4
        maxSecondsWithoutReauthentication: 3600 5
        tlsTrustedCertificates: 6
        - secretName: oauth-server-cert
          certificate: ca.crt
        disableTlsHostnameVerification: true 7
        jwksExpirySeconds: 360 8
        jwksRefreshSeconds: 300 9
        jwksMinRefreshPauseSeconds: 1 10

    1
    oauth に設定されたリスナータイプ。
    2
    認証に使用されるトークン発行者の URI。
    3
    ローカルの JWT 検証に使用される JWKS 証明書エンドポイントの URI。
    4
    ユーザーを識別するために使用される実際のユーザー名を含むトークンクレーム (またはキー)。その値は、認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
    5
    (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
    6
    (任意設定): 認可サーバーへの TLS 接続用の信用できる証明書。
    7
    (任意設定): TLS ホスト名の検証を無効にします。デフォルトは false です。
    8
    JWKS 証明書が期限切れになる前に有効であるとみなされる期間。デフォルトは 360 秒です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。
    9
    JWKS 証明書を更新する間隔。この間隔は、有効期間よりも 60 秒以上短くする必要があります。デフォルトは 300 秒です。
    10
    JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新は指数バックオフ (指数バックオフ) のルールに従い、jwksRefreshSeconds に到達するまで、一時停止を増やして失敗した更新を再試行します。デフォルト値は 1 です。

    例 2: イントロスペクションエンドポイントを使用したトークンの検証の設定

    - name: external3
      port: 9094
      type: loadbalancer
      tls: true
      authentication:
        type: oauth
        validIssuerUri: https://<auth_server_address>/auth/realms/external
        introspectionEndpointUri: https://<auth_server_address>/auth/realms/external/protocol/openid-connect/token/introspect 1
        clientId: kafka-broker 2
        clientSecret: 3
          secretName: my-cluster-oauth
          key: clientSecret
        userNameClaim: preferred_username 4
        maxSecondsWithoutReauthentication: 3600 5

    1
    トークンイントロスペクションエンドポイントの URI。
    2
    クライアントを識別するためのクライアント ID。
    3
    認証にはクライアントシークレットとクライアント ID が使用されます。
    4
    ユーザーを識別するために使用される実際のユーザー名を含むトークンクレーム (またはキー)。その値は、認可サーバーによって異なります。必要に応じて、"['user.info'].['user.id']" のような JsonPath 式を使用して、トークン内のネストされた JSON 属性からユーザー名を取得できます。
    5
    (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。

    OAuth 2.0 認証の適用方法や、認可サーバーのタイプによっては、追加 (任意) の設定を使用できます。

      # ...
      authentication:
        type: oauth
        # ...
        checkIssuer: false 1
        checkAudience: true 2
        fallbackUserNameClaim: client_id 3
        fallbackUserNamePrefix: client-account- 4
        validTokenType: bearer 5
        userInfoEndpointUri: https://<auth_server_address>/auth/realms/external/protocol/openid-connect/userinfo 6
        enableOauthBearer: false 7
        enablePlain: true 8
        tokenEndpointUri: https://<auth_server_address>/auth/realms/external/protocol/openid-connect/token 9
        customClaimCheck: "@.custom == 'custom-value'" 10
        clientAudience: audience 11
        clientScope: scope 12
        connectTimeoutSeconds: 60 13
        readTimeoutSeconds: 60 14
        httpRetries: 2 15
        httpRetryPauseMs: 300 16
        groupsClaim: "$.groups" 17
        groupsClaimDelimiter: "," 18
        includeAcceptHeader: false 19
    1
    認可サーバーが iss クレームを提供しない場合は、発行者チェックを行うことができません。このような場合、checkIssuerfalse に設定し、validIssuerUri を指定しないようにします。デフォルトは true です。
    2
    オーソリゼーションサーバーが aud(オーディエンス) クレームを提供していて、オーディエンスチェックを実施したい場合は、checkAudiencetrue に設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。これにより、Kafka ブローカーは aud 要求に clientId を持たないトークンを拒否します。デフォルトは false です。
    3
    認可サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID が提供されることがあります。更新トークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に username が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。必要に応じて、"['client.info'].['client.id']" のような JsonPath 式を使用してフォールバックユーザー名を取得し、トークン内のネストされた JSON 属性からユーザー名を取得できます。
    4
    fallbackUserNameClaim が適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producer というクライアントが存在し、producer という通常ユーザーも存在する場合について考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。
    5
    (introspectionEndpointUri を使用する場合のみ該当): 使用している認可サーバーによっては、イントロスペクションエンドポイントによってトークンタイプ属性が返されるかどうかは分からず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。
    6
    (introspectionEndpointUri を使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、認可サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfo エンドポイントの URI をフォールバックとして設定します。userNameClaimfallbackUserNameClaim、および fallbackUserNamePrefix の設定が userinfo エンドポイントの応答に適用されます。
    7
    これを false に設定してリスナーで OAUTHBEARER メカニズムを無効にします。PLAIN または OAUTHBEARER のいずれかを有効にする必要があります。デフォルトは true です。
    8
    リスナーで PLAIN 認証を有効にするには、true に設定します。これは、すべてのプラットフォームのすべてのクライアントでサポートされています。
    9
    PLAIN メカニズムの追加設定。これが指定されている場合、クライアントは $accessToken: 接頭辞を使用してアクセストークンを password として渡すことで、PLAIN 経由で認証できます。実稼働環境の場合は、常に https:// urls を使用してください。
    10
    これを JsonPath フィルタークエリーに設定すると、検証中に追加のカスタムルールを JWT アクセストークンに適用できます。アクセストークンに必要なデータが含まれていないと拒否されます。introspectionEndpointUri を使用する場合、カスタムチェックはイントロスペクションエンドポイントの応答 JSON に適用されます。
    11
    トークンエンドポイントに渡される audience パラメーター。オーディエンス は、inter-broker 認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
    12
    scope パラメーターがトークンエンドポイントに渡されます。スコープ は、inter-broker 認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
    13
    認可サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
    14
    認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
    15
    認可サーバーへの失敗した HTTP リクエストを再試行する最大回数。デフォルト値は 0 で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、connectTimeoutSeconds オプションと readTimeoutSeconds オプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。
    16
    認可サーバーへの失敗した HTTP リクエストの再試行を行うまでの待機時間。デフォルトでは、この時間はゼロに設定されており、一時停止は適用されません。これは、リクエストの失敗の原因となる問題の多くは、リクエストごとのネットワークの不具合やプロキシーの問題であり、すぐに解決できるためです。ただし、認可サーバーに負荷がかかっている場合、または高トラフィックが発生している場合は、このオプションを 100 ミリ秒以上の値に設定して、サーバーの負荷を軽減し、再試行が成功する可能性を高めることができます。
    17
    JWT トークンまたはイントロスペクションエンドポイント応答からグループ情報を抽出するために使用される JsonPath クエリー。このオプションはデフォルトでは設定されていません。このオプションを設定すると、カスタム承認者はユーザーグループに基づいて承認の決定を行うことができます。
    18
    グループ情報が単一の区切り文字列として返される場合に、グループ情報を解析するために使用される区切り文字。デフォルト値は ',' (コンマ) です。
    19
    一部の認可サーバーでは、クライアントが Accept: application/json ヘッダーを送信する際に問題が発生します。includeAcceptHeader: false を設定する場合、ヘッダーは送信されません。デフォルトは true です。
  3. エディターを保存して終了し、ローリング更新の完了を待ちます。
  4. 更新をログで確認するか、Pod 状態の遷移を監視して確認します。

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    ローリング更新によって、ブローカーが OAuth 2.0 認証を使用するように設定されます。

15.4.6.3. OAuth 2.0 を使用するための Kafka Java クライアントの設定

Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサー API とコンシューマー API を設定します。コールバックプラグインをクライアントの pom.xml ファイルに追加してから、OAuth 2.0 用にクライアントを設定します。

クライアント設定で次を指定します。

  • SASL (Simple Authentication and Security Layer) セキュリティープロトコル:

    • TLS 暗号化接続を介した認証用の SASL_SSL
    • 暗号化されていない接続を介した認証用の SASL_PLAINTEXT

      プロダクションには SASL_SSL を使用し、ローカル開発には SASL_PLAINTEXT のみを使用してください。SASL_SSL を使用する場合は、追加の ssl.truststore 設定が必要です。OAuth 2.0 認可サーバーへのセキュアな接続 (https://) には、トラストストア設定が必要です。OAuth 2.0 認可サーバーを確認するには、認可サーバーの CA 証明書をクライアント設定のトラストストアに追加します。トラストストアは、PEM または PKCS #12 形式で設定できます。

  • Kafka SASL メカニズム:

    • ベアラートークンを使用したクレデンシャル交換用の OAUTHBEARER
    • クライアントクレデンシャル (clientId + secret) またはアクセストークンを渡す PLAIN
  • SASL メカニズムを実装する JAAS (Java Authentication and Authorization Service) モジュール:

    • org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule は OAUTHBEARER メカニズムを実装します。
    • org.apache.kafka.common.security.plain.PlainLoginModule は plain メカニズムを実装します。

    OAuthbearer メカニズムを使用できるようにするには、io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler のカスタムクラスをコールバックハンドラーとして追加する必要もあります。JaasClientOauthLoginCallbackHandler は、クライアントのログイン中にアクセストークンの認可サーバーへの OAuth コールバックを処理します。これにより、トークンの自動更新が可能になり、ユーザーの介入なしで継続的な認証が保証されます。さらに、OAuth 2.0 パスワード付与方式を使用してクライアントのログイン認証情報を処理します。

  • 以下の認証方法をサポートする SASL 認証プロパティー:

    • OAuth 2.0 クライアントクレデンシャル
    • OAuth 2.0 パスワードグラント (非推奨)
    • アクセストークン
    • リフレッシュトークン

    SASL 認証プロパティーを JAAS 設定として追加します (sasl.jaas.config および sasl.login.callback.handler.class)。認証プロパティーを設定する方法は、OAuth 2.0 認可サーバーへのアクセスに使用している認証方法によって異なります。この手順では、プロパティーはプロパティーファイルで指定されてから、クライアント設定にロードされます。

注記

認証プロパティーを環境変数または Java システムプロパティーとして指定することもできます。Java システムプロパティーの場合は、setProperty を使用して設定し、-D オプションを使用してコマンドラインで渡すことができます。

前提条件

  • Streams for Apache Kafka と Kafka が実行中である。
  • OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
  • Kafka ブローカーが OAuth 2.0 に対して設定されている。

手順

  1. OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの pom.xml ファイルに追加します。

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.15.0.redhat-00007</version>
    </dependency>
  2. プロパティーファイルで以下の設定を指定して、クライアントプロパティーを設定します。

    • セキュリティープロトコル
    • SASL メカニズム
    • 使用されているメソッドに応じた JAAS モジュールと認証プロパティー

      たとえば、以下を client.properties ファイルに追加できます。

      クライアントクレデンシャルメカニズムのプロパティー

      security.protocol=SASL_SSL 1
      sasl.mechanism=OAUTHBEARER 2
      ssl.truststore.location=/tmp/truststore.p12 3
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \ 4
        oauth.client.id="<client_id>" \ 5
        oauth.client.secret="<client_secret>" \ 6
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7
        oauth.ssl.truststore.password="$STOREPASS" \ 8
        oauth.ssl.truststore.type="PKCS12" \ 9
        oauth.scope="<scope>" \ 10
        oauth.audience="<audience>" ; 11
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      TLS 暗号化接続用の SASL_SSL セキュリティープロトコル。ローカル開発のみでは、暗号化されていない接続で SASL_PLAINTEXT を使用します。
      2
      OAUTHBEARER または PLAIN として指定された SASL メカニズム。
      3
      Kafka クラスターへのセキュアなアクセスのためのトラストストア設定。
      4
      認可サーバーのトークンエンドポイントの URI です。
      5
      クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
      6
      認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
      7
      この場所には、認可サーバーの公開鍵証明書 (truststore.p12) が含まれています。
      8
      トラストストアにアクセスするためのパスワード。
      9
      トラストストアのタイプ。
      10
      (オプション): トークンエンドポイントからトークンを要求するための scope。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。
      11
      (オプション) トークンエンドポイントからトークンを要求するための audience。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。

      パスワード付与メカニズムのプロパティー

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.client.id="<client_id>" \ 1
        oauth.client.secret="<client_secret>" \ 2
        oauth.password.grant.username="<username>" \ 3
        oauth.password.grant.password="<password>" \ 4
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" \
        oauth.scope="<scope>" \
        oauth.audience="<audience>" ;
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
      2
      (オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
      3
      パスワードグラント認証のユーザー名。OAuth パスワードグラント設定 (ユーザー名とパスワード) では、OAuth 2.0 パスワードグラント方式が使用されます。パスワードグラントを使用するには、権限が制限されたクライアントのユーザーアカウントを認可サーバー上に作成します。アカウントは、サービスアカウントのように機能する必要があります。認証にユーザーアカウントが必要な環境で使用しますが、先にリフレッシュトークンの使用を検討してください。
      4
      パスワード付与認証のパスワード。
      注記

      SASL PLAIN は、OAuth 2.0 パスワード付与メソッドを使用したユーザー名とパスワードの受け渡し (パスワード付与) をサポートしていません。

      アクセストークンのプロパティー

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.access.token="<access_token>" \ 1
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" ;
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      Kafka クライアントの有効期間が長いアクセストークン。

      トークンのプロパティーを更新する

      security.protocol=SASL_SSL
      sasl.mechanism=OAUTHBEARER
      ssl.truststore.location=/tmp/truststore.p12
      ssl.truststore.password=$STOREPASS
      ssl.truststore.type=PKCS12
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="<token_endpoint_url>" \
        oauth.client.id="<client_id>" \ 1
        oauth.client.secret="<client_secret>" \ 2
        oauth.refresh.token="<refresh_token>" \ 3
        oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \
        oauth.ssl.truststore.password="$STOREPASS" \
        oauth.ssl.truststore.type="PKCS12" ;
      sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      1
      クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
      2
      (オプション) 認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
      3
      Kafka クライアントの有効期間が長い更新トークン。
  3. OAUTH 2.0 認証のクライアントプロパティーを Java クライアントコードに入力します。

    クライアントプロパティーの入力を示す例

    Properties props = new Properties();
    try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) {
      props.load(reader);
    }

  4. Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。

15.4.6.4. Kafka コンポーネントの OAuth 2.0 の設定

この手順では、認可サーバーを使用して OAuth 2.0 認証を使用するように Kafka コンポーネントを設定する方法を説明します。

以下の認証を設定できます。

  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

この手順では、Kafka コンポーネントと認可サーバーは同じサーバーで稼働しています。

作業を開始する前の注意事項

Kafka コンポーネントの OAuth 2.0 認証の設定に関する詳細は、KafkaClientAuthenticationOAuth スキーマリファレンス を参照してください。スキーマリファレンスには、設定オプションの例が記載されています。

前提条件

  • Streams for Apache Kafka と Kafka が実行中である。
  • OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
  • Kafka ブローカーが OAuth 2.0 に対して設定されている。

手順

  1. クライアントシークレットを作成し、これを環境変数としてコンポーネントにマウントします。

    以下は、Kafka Bridge の Secret を作成する例になります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Secret
    metadata:
     name: my-bridge-oauth
    type: Opaque
    data:
     clientSecret: MGQ1OTRmMzYtZTllZS00MDY2LWI5OGEtMTM5MzM2NjdlZjQw 1
    1
    clientSecret キーは base64 形式である必要があります。
  2. Kafka コンポーネントのリソースを作成または編集し、OAuth 2.0 認証が認証プロパティーに設定されるようにします。

    OAuth 2.0 認証の場合、次のオプションを使用できます。

    • クライアント ID およびシークレット
    • クライアント ID およびリフレッシュトークン
    • アクセストークン
    • ユーザー名およびパスワード
    • TLS

    以下は、クライアント ID、シークレット、および TLS を使用して OAuth 2.0 が Kafka Bridge クライアントに割り当てられる例になります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      # ...
      authentication:
        type: oauth 1
        tokenEndpointUri: https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token 2
        clientId: kafka-bridge
        clientSecret:
          secretName: my-bridge-oauth
          key: clientSecret
        tlsTrustedCertificates: 3
        - secretName: oauth-server-cert
          certificate: tls.crt
    1
    oauth に設定された認証タイプ。
    2
    認証用のトークンエンドポイントの URI。
    3
    認可サーバーへの TLS 接続用の信用できる証明書。

    OAuth 2.0 認証の適用方法や、認可サーバーのタイプによって、使用できる追加の設定オプションがあります。

    # ...
    spec:
      # ...
      authentication:
        # ...
        disableTlsHostnameVerification: true 1
        checkAccessTokenType: false 2
        accessTokenIsJwt: false 3
        scope: any 4
        audience: kafka 5
        connectTimeoutSeconds: 60 6
        readTimeoutSeconds: 60 7
        httpRetries: 2 8
        httpRetryPauseMs: 300 9
        includeAcceptHeader: false 10
    1
    (任意設定): TLS ホスト名の検証を無効にします。デフォルトは false です。
    2
    認可サーバーによって、JWT トークン内部で typ (タイプ) 要求が返されない場合は、checkAccessTokenType: false を適用するとトークンタイプがチェックされず次に進むことができます。デフォルトは true です。
    3
    不透明なトークンを使用している場合、アクセストークンが JWT トークンとして処理されないように accessTokenIsJwt: false を適用することができます。
    4
    (オプション): トークンエンドポイントからトークンを要求するための scope。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。この場合では any になります。
    5
    (オプション) トークンエンドポイントからトークンを要求するための audience。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。今回の場合は kafka です。
    6
    (オプション) 認可サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
    7
    (オプション): 認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
    8
    (オプション) 認可サーバーへの失敗した HTTP リクエストを再試行する最大回数。デフォルト値は 0 で、再試行は実行されないことを意味します。このオプションを効果的に使用するには、connectTimeoutSeconds オプションと readTimeoutSeconds オプションのタイムアウト時間を短縮することを検討してください。ただし、再試行により現在のワーカースレッドが他のリクエストで利用できなくなる可能性があり、リクエストが多すぎると停止する場合、Kafka ブローカーが応答しなくなる可能性があることに注意してください。
    9
    (オプション) 失敗した認可サーバーへの HTTP リクエストの再試行を行うまでの待機時間。デフォルトでは、この時間はゼロに設定されており、一時停止は適用されません。これは、リクエストの失敗の原因となる問題の多くは、リクエストごとのネットワークの不具合やプロキシーの問題であり、すぐに解決できるためです。ただし、認可サーバーに負荷がかかっている場合、または高トラフィックが発生している場合は、このオプションを 100 ミリ秒以上の値に設定して、サーバーの負荷を軽減し、再試行が成功する可能性を高めることができます。
    10
    (オプション) 一部の認可サーバーでは、クライアントが Accept: application/json ヘッダーを送信する際に問題が発生します。includeAcceptHeader: false を設定する場合、ヘッダーは送信されません。デフォルトは true です。
  3. Kafka リソースのデプロイメントに変更を適用します。

    oc apply -f your-file
  4. 更新をログで確認するか、Pod 状態の遷移を監視して確認します。

    oc logs -f ${POD_NAME} -c ${CONTAINER_NAME}
    oc get pod -w

    ローリング更新では、OAuth 2.0 認証を使用して Kafka ブローカーと対話するコンポーネントが設定されます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.