第4章 Kafka へのセキュアなアクセスの管理
各クライアントの Kafka ブローカーへのアクセスを管理することで、Kafka クラスターを保護できます。
Kafka ブローカーとクライアント間のセキュアな接続には、以下が含まれます。
- データ交換の暗号化
- アイデンティティー証明に使用する認証
- ユーザーが実行するアクションを許可または拒否する承認
本章では、以下を取り上げ、Kafka ブローカーとクライアント間でセキュアな接続を設定する方法を説明します。
- Kafka クラスターおよびクライアントのセキュリティーオプション
- Kafka ブローカーをセキュアにする方法
- OAuth 2.0 トークンベースの認証および承認に承認サーバーを使用する方法
4.1. Kafka のセキュリティーオプション
Kafka
リソースを使用して、Kafka の認証および承認に使用されるメカニズムを設定します。
4.1.1. リスナー認証
OpenShift クラスター内のクライアントの場合は、plain
(暗号化なし) または tls
内部 リスナーを作成できます。
OpenShift クラスター外のクライアントの場合は、外部 リスナーを作成して接続メカニズムを指定します。接続メカニズムは nodeport
、loadbalancer
、ingress
、または route
(OpenShift の場合) のいずれかです。
外部クライアントを接続するための設定オプションの詳細は、「外部リスナーの設定」を参照してください。
サポートされる認証オプションは次のとおりです。
- 相互 TLS 認証 (TLS による暗号化が有効なリスナーのみ)
- SCRAM-SHA-512 認証
- OAuth 2.0 のトークンベースの認証
選択する認証オプションは、Kafka ブローカーへのクライアントアクセスを認証する方法によって異なります。
図4.1 Kafka リスナーの認証オプション
リスナーの authentication
プロパティーは、そのリスナーに固有の認証メカニズムを指定するために使用されます。
authentication
プロパティーが指定されていない場合、リスナーはそのリスナー経由で接続するクライアントを認証しません。認証がないと、リスナーではすべての接続が許可されます。
認証は、User Operator を使用して KafkaUsers
を管理する場合に設定する必要があります。
以下の例で指定されるものは次のとおりです。
-
SCRAM-SHA-512 認証に設定された
plain
リスナー -
相互 TLS 認証を使用する
tls
リスナー -
相互 TLS 認証を使用する
external
リスナー
各リスナーは、Kafka クラスター内で一意の名前およびポートで設定されます。
ブローカー間通信 (9091) およびメトリクス (9404) 用に確保されたポートを使用するようにリスナーを設定することはできません。
リスナー認証設定の例
# ... listeners: - name: plain port: 9092 type: internal tls: true authentication: type: scram-sha-512 - name: tls port: 9093 type: internal tls: true authentication: type: tls - name: external port: 9094 type: loadbalancer tls: true authentication: type: tls # ...
4.1.1.1. 相互 TLS 認証
相互 TLS 認証は、Kafka ブローカーと ZooKeeper Pod 間の通信で常に使用されます。
AMQ Streams では、Kafka が TLS (Transport Layer Security) を使用して、相互認証の有無を問わず、Kafka ブローカーとクライアントとの間で暗号化された通信が行われるよう設定できます。相互 (双方向) 認証の場合、サーバーとクライアントの両方が証明書を提示します。相互認証を設定すると、ブローカーはクライアントを認証し (クライアント認証)、クライアントはブローカーを認証します (サーバー認証)。
TLS 認証は一般的には一方向で、一方が他方のアイデンティティーを認証します。たとえば、Web ブラウザーと Web サーバーの間で HTTPS が使用される場合、ブラウザーは Web サーバーのアイデンティティーの証明を取得します。
4.1.1.2. SCRAM-SHA-512 認証
SCRAM (Salted Challenge Response Authentication Mechanism) は、パスワードを使用して相互認証を確立できる認証プロトコルです。AMQ Streams では、Kafka が SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 を使用するよう設定し、暗号化されていないクライアントの接続と暗号化されたクライアントの接続の両方で認証を提供できます。
TLS クライアント接続で SCRAM-SHA-512 認証が使用される場合、TLS プロトコルは暗号化を提供しますが、認証には使用されません。
SCRAM の以下のプロパティーは、暗号化されていない接続でも SCRAM-SHA-512 を安全に使用できるようにします。
- 通信チャネル上では、パスワードはクリアテキストで送信されません。代わりに、クライアントとサーバーはお互いにチャレンジを生成し、認証するユーザーのパスワードを認識していることを証明します。
- サーバーとクライアントは、認証を交換するたびに新しいチャレンジを生成します。よって、この交換はリレー攻撃に対する回復性を備えています。
KafkaUser.spec.authentication.type
を scram-sha-512
に設定すると、User Operator によって、大文字と小文字の ASCII 文字と数字で構成された無作為の 12 文字のパスワードが生成されます。
4.1.1.3. ネットワークポリシー
AMQ Streams では、Kafka ブローカーで有効になっているリスナーごとに NetworkPolicy
リソースが自動的に作成されます。デフォルトでは、すべてのアプリケーションと namespace にアクセスする権限が NetworkPolicy
によってリスナーに付与されます。
ネットワークレベルでのリスナーへのアクセスを指定のアプリケーションまたは namespace のみに制限するには、networkPolicyPeers
プロパティーを使用します。
リスナーの認証設定の一部としてネットワークポリシーを使用します。リスナーごとに、異なる networkPolicyPeers
設定を指定できます。
詳細は、「リスナーネットワークポリシー」のセクションおよび 「NetworkPolicyPeer API reference」を参照してください。
AMQ Streams でネットワークポリシーを使用するには、ingress NetworkPolicies
が OpenShift の設定でサポートされる必要があります。
4.1.1.4. 追加のリスナー設定オプション
GenericKafkaListenerConfiguration スキーマのプロパティーを使用して、設定をリスナーに追加できます。
4.1.2. Kafka の承認
Kafka.spec.kafka
リソースの authorization
プロパティーを使用して Kafka ブローカーの承認を設定できます。authorization
プロパティーがないと、承認が有効にならず、クライアントの制限はありません。承認を有効にすると、承認は有効なすべてのリスナーに適用されます。承認方法は type
フィールドで定義されます。
サポートされる承認オプションは次のとおりです。
- 簡易承認
- OAuth 2.0 での承認 (OAuth 2.0 トークンベースの認証を使用している場合)
- Open Policy Agent (OPA) での承認
図4.2 Kafka クラスター承認オプション
4.1.2.1. スーパーユーザー
スーパーユーザーは、アクセスの制限に関係なく Kafka クラスターのすべてのリソースにアクセスでき、すべての承認メカニズムでサポートされます。
Kafka クラスターのスーパーユーザーを指定するには、superUsers
プロパティーにユーザープリンシパルのリストを追加します。ユーザーが TLS クライアント認証を使用する場合、ユーザー名は CN=
で始まる証明書のサブジェクトの共通名になります。
スーパーユーザーを使用した設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... authorization: type: simple superUsers: - CN=client_1 - user_2 - CN=client_3 # ...
4.2. Kafka クライアントのセキュリティーオプション
KafkaUser
リソースを使用して、Kafka クライアントの認証メカニズム、承認メカニズム、およびアクセス権限を設定します。セキュリティーの設定では、クライアントはユーザーとして表されます。
Kafka ブローカーへのユーザーアクセスを認証および承認できます。認証によってアクセスが許可され、承認によって許容されるアクションへのアクセスが制限されます。
Kafka ブローカーへのアクセスが制限されない スーパーユーザー を作成することもできます。
認証および承認メカニズムは、Kafka ブローカーへのアクセスに使用されるリスナーの仕様 と一致する必要があります。
4.2.1. ユーザー処理用の Kafka クラスターの特定
KafkaUser
リソースには、このリソースが属する Kafka クラスターに適した名前 (Kafka
リソースの名前から派生) を定義するラベルが含まれています。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster
このラベルは、KafkaUser
リソースを特定し、新しいユーザーを作成するために、User Operator によって使用されます。また、以降のユーザーの処理でも使用されます。
ラベルが Kafka クラスターと一致しない場合、User Operator は KafkaUser
を識別できず、ユーザーは作成されません。
KafkaUser
リソースのステータスが空のままであれば、ラベルを確認してください。
4.2.2. ユーザー認証
ユーザー認証は、KafkaUser.spec
の authentication
プロパティーを使用して設定されます。ユーザーに有効な認証メカニズムは、type
フィールドを使用して指定されます。
サポートされる認証メカニズム
- TLS クライアント認証
- SCRAM-SHA-512 認証
認証メカニズムを指定しないと、User Operator によってユーザーまたはそのクレデンシャルが作成されません。
4.2.2.1. TLS クライアント認証
TLS クライアント認証を使用するには、type
フィールドを tls
に設定します。
TLS クライアント認証が有効になっている KafkaUser
の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: tls # ...
ユーザーが User Operator によって作成されると、KafkaUser
リソースと同じ名前で新しいシークレットが作成されます。Secret には、TLS クライアント認証の秘密鍵と公開鍵が含まれます。公開鍵は、クライアント認証局 (CA) によって署名されたユーザー証明書に含まれます。
すべての鍵は X.509 形式です。
Secret には、PEM 形式および PKCS #12 形式の秘密鍵と証明書が含まれます。
Kafka と Secret との通信をセキュアにする方法については、11章TLS 証明書の管理 を参照してください。
ユーザークレデンシャルのある Secret
の例
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: ca.crt: # Public key of the client CA user.crt: # User certificate that contains the public key of the user user.key: # Private key of the user user.p12: # PKCS #12 archive file for storing certificates and keys user.password: # Password for protecting the PKCS #12 archive file
4.2.2.2. SCRAM-SHA-512 認証
SCRAM-SHA-512 認証メカニズムを使用するには、type
フィールドを scram-sha-512
に設定します。
SCRAM-SHA-512 認証が有効になっている KafkaUser
の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: scram-sha-512 # ...
ユーザーが User Operator によって作成されると、KafkaUser
リソースと同じ名前で新しいシークレットが作成されます。シークレットの password
キーには、生成されたパスワードが含まれ、base64 でエンコードされます。パスワードを使用するにはデコードする必要があります。
ユーザークレデンシャルのある Secret
の例
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: password: Z2VuZXJhdGVkcGFzc3dvcmQ= 1 sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK 2
生成されたパスワードをデコードします。
echo "Z2VuZXJhdGVkcGFzc3dvcmQ=" | base64 --decode
4.2.3. ユーザーの承認
ユーザーの承認は、KafkaUser.spec
の authorization
プロパティーを使用して設定されます。ユーザーに有効な承認タイプは、type
フィールドを使用して指定します。
簡易承認を使用するには、KafkaUser.spec.authorization
で type
プロパティーを simple
に設定します。簡易承認では、デフォルトの Kafka 承認プラグインである AclAuthorizer
が使用されます。
代わりに、OPA 承認 を使用することもできます。OAuth 2.0 トークンベースの認証をすでに使用している場合は、OAuth 2.0 承認 を使用することもできます。
承認が指定されていない場合は、User Operator によるユーザーのアクセス権限のプロビジョニングは行われません。KafkaUser
がリソースにアクセスできるかどうかは、使用されているオーソライザーによって異なります。たとえば、AclAuthorizer
の場合は、allow.everyone.if.no.acl.found
設定によって決定されます。
4.2.3.1. ACL ルール
AclAuthorizer
は、ACL ルールを使用して Kafka ブローカーへのアクセスを管理します。
ACL ルールによって、acls
プロパティーで指定したユーザーにアクセス権限が付与されます。
AclRule
オブジェクトの詳細は、「AclRule
スキーマ参照」を参照してください。
4.2.3.2. Kafka ブローカーへのスーパーユーザーアクセス
ユーザーを Kafka ブローカー設定のスーパーユーザーのリストに追加すると、KafkaUser
の ACL で定義された承認制約に関係なく、そのユーザーにはクラスターへのアクセスが無制限に許可されます。
ブローカーへのスーパーユーザーアクセスの設定に関する詳細は「Kafka の承認」を参照してください。
4.2.3.3. ユーザークォータ
KafkaUser
リソースの spec
を設定してクォータを強制し、バイトしきい値または CPU 使用の時間制限に基づいてユーザーが Kafka ブローカーへのアクセスを超過しないようにすることができます。
ユーザークォータをともなう KafkaUser
の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: # ... quotas: producerByteRate: 1048576 1 consumerByteRate: 2097152 2 requestPercentage: 55 3
これらのプロパティーの詳細は、「KafkaUserQuotas
スキーマ参照」を参照してください。
4.3. Kafka ブローカーへのアクセスのセキュア化
Kafka ブローカーへのセキュアなアクセスを確立するには、以下を設定し、適用します。
Kafka
リソースが以下を行うようにします。- 指定された認証タイプでリスナーを作成します。
- Kafka クラスター全体の承認を設定します。
-
リスナーを使用して Kafka ブローカーにセキュアにアクセスするための
KafkaUser
リソース。
Kafka
リソースで以下を設定します。
- リスナー認証
- Kafka リスナーへのアクセスを制限するネットワークポリシー
- Kafka の承認
- ブローカーへのアクセスが制限されないスーパーユーザー
認証は、リスナーごとに独立して設定されます。承認は、常に Kafka クラスター全体に対して設定されます。
Cluster Operator はリスナーを作成し、クラスターおよびクライアント認証局 (CA) 証明書を設定して Kafka クラスター内で認証を有効にします。
独自の証明書をインストールして、Cluster Operator によって生成された証明書を置き換えることができます。外部認証局によって管理される Kafka リスナー証明書を使用するようにリスナーを設定することもできます。PKCS #12 形式 (.p12) および PEM 形式 (.crt) の証明書を利用できます。
KafkaUser
を使用して、Kafka にアクセスするために特定のクライアントが使用する認証および承認メカニズムを有効にします。
KafkaUser
リソースで以下を設定します。
- 有効なリスナー認証と一致する認証
- 有効な Kafka 承認と一致する承認
- クライアントによるリソースの使用を制御するクォータ
User Operator はクライアントに対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。
その他のリソース
スキーマの詳細は、以下を参照してください。
-
Kafka
の場合は「Kafka
スキーマ参照」 -
KafkaUser
の場合は「KafkaUser
スキーマ参照」
4.3.1. Kafka ブローカーのセキュア化
この手順では、AMQ Streams の実行時に Kafka ブローカーをセキュアにするためのステップを説明します。
Kafka ブローカーに実装されたセキュリティーは、アクセスを必要とするクライアントに実装されたセキュリティーとの互換性を維持する必要があります。
-
Kafka.spec.kafka.listeners[*].authentication
はKafkaUser.spec.authentication
と一致します。 -
Kafka.spec.kafka.authorization
はKafkaUser.spec.authorization
と一致します。
この手順では、TLS 認証を使用した簡易承認とリスナーの設定を説明します。リスナーの設定に関する詳細は、「GenericKafkaListener
スキーマ参照」を参照してください。
代わりに、リスナー認証 には SCRAM-SHA または OAuth 2.0、Kafka 承認 には OAuth 2.0 または OPA を使用することができます。
手順
Kafka
リソースを設定します。-
承認には
authorization
プロパティーを設定します。 listeners
プロパティーを設定し、認証のあるリスナーを作成します。以下に例を示します。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: # ... authorization: 1 type: simple superUsers: 2 - CN=client_1 - user_2 - CN=client_3 listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls 3 # ... zookeeper: # ...
- 1
- 2
- Kafka へのアクセスを制限されないユーザープリンシパルのリスト。CN は、TLS による認証が使用される場合のクライアント証明書の共通名です。
- 3
- リスナーの認証メカニズムは各リスナーに対して設定でき、相互 TLS、SCRAM-SHA-512、またはトークンベース OAuth 2.0 として指定 できます。
外部リスナーを設定している場合、設定は選択した接続のメカニズムによって異なります。
-
承認には
Kafka
リソースを作成または更新します。oc apply -f KAFKA-CONFIG-FILE
Kafka クラスターは、TLS 認証を使用する Kafka ブローカーリスナーと共に設定されます。
Kafka ブローカー Pod ごとにサービスが作成されます。
サービスが作成され、Kafka クラスターに接続するための ブートストラップアドレス として機能します。
kafka ブローカーの ID を検証するためのクラスター CA 証明書も、
Kafka
リソースと同じ名前で作成されます。
4.3.2. Kafka へのユーザーアクセスのセキュア化
KafkaUser
リソースのプロパティーを使用して Kafka ユーザーを設定します。
oc apply
を使用すると、ユーザーを作成または編集できます。oc delete
を使用すると、既存のユーザーを削除できます。
以下に例を示します。
-
oc apply -f USER-CONFIG-FILE
-
oc delete KafkaUser USER-NAME
KafkaUser
認証および承認メカニズムを設定する場合、必ず同等の Kafka
設定と一致するようにしてください。
-
KafkaUser.spec.authentication
はKafka.spec.kafka.listeners[*].authentication
と一致 -
KafkaUser.spec.authorization
はKafka.spec.kafka.authorization
と一致
この手順では、TLS 認証でユーザーを作成する方法を説明します。SCRAM-SHA 認証でユーザーを作成することも可能です。
必要な認証は、Kafka ブローカーリスナーに設定された認証のタイプ によって異なります。
Kafka ユーザーと Kafka ブローカー間の認証は、それぞれの認証設定によって異なります。たとえば、TLS が Kafka 設定で有効になっていない場合は、TLS でユーザーを認証できません。
前提条件
- TLS による認証および暗号化を使用して Kafka ブローカーリスナーで設定された 稼働中の Kafka クラスターが必要です。
- 稼働中の User Operator (通常は Entity Operator でデプロイされる) が必要です。
KafkaUser
の認証タイプは、Kafka
ブローカーで設定された認証と一致する必要があります。
手順
KafkaUser
リソースを設定します。以下に例を示します。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: 1 type: tls authorization: type: simple 2 acls: - resource: type: topic name: my-topic patternType: literal operation: Read - resource: type: topic name: my-topic patternType: literal operation: Describe - resource: type: group name: my-group patternType: literal operation: Read
KafkaUser
リソースを作成または更新します。oc apply -f USER-CONFIG-FILE
KafkaUser
リソースと同じ名前の Secret と共に、ユーザーが作成されます。Secret には、TLS クライアント認証の秘密鍵と公開鍵が含まれます。
Kafka ブローカーへの接続をセキュアにするために Kafka クライアントをプロパティーで設定する詳細は『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「OpenShift 外クライアントのアクセスの設定」を参照してください。
4.3.3. ネットワークポリシーを使用した Kafka リスナーへのアクセス制限
networkPolicyPeers
プロパティーを使用すると、リスナーへのアクセスを指定のアプリケーションのみに制限できます。
前提条件
- Ingress NetworkPolicies をサポートする OpenShift クラスター。
- Cluster Operator が稼働している必要があります。
手順
-
Kafka
リソースを開きます。 networkPolicyPeers
プロパティーで、Kafka クラスターへのアクセスが許可されるアプリケーション Pod または namespace を定義します。以下は、ラベル
app
がkafka-client
に設定されているアプリケーションからの接続のみを許可するようtls
リスナーを設定する例になります。apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: # ... listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls networkPolicyPeers: - podSelector: matchLabels: app: kafka-client # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用します。oc apply -f your-file
その他のリソース
-
スキーマの詳細は、「NetworkPolicyPeer API reference」および「
GenericKafkaListener
スキーマ参照」を参照してください。
4.4. OAuth 2.0 トークンベース認証の使用
AMQ Streams は、OAUTHBEARER および PLAIN メカニズムを使用して OAuth 2.0 認証の使用をサポートします。
OAuth 2.0 は、アプリケーション間で標準的なトークンベースの認証および承認を有効にし、中央の承認サーバーを使用してリソースに制限されたアクセス権限を付与するトークンを発行します。
Kafka ブローカーおよびクライアントの両方が OAuth 2.0 を使用するように設定する必要があります。OAuth 2.0 認証を設定した後に OAuth 2.0 承認 を設定できます。
OAuth 2.0 認証は、Kafka 承認 と併用できます。
OAuth 2.0 認証を使用すると、アプリケーションクライアントはアカウントのクレデンシャルを公開せずにアプリケーションサーバー (リソースサーバー と呼ばれる) のリソースにアクセスできます。
アプリケーションクライアントは、アクセストークンを認証の手段として渡します。アプリケーションサーバーはこれを使用して、付与するアクセス権限のレベルを決定することもできます。承認サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。
AMQ Streams のコンテキストでは以下が行われます。
- Kafka ブローカーは OAuth 2.0 リソースサーバーとして動作します。
- Kafka クライアントは OAuth 2.0 アプリケーションクライアントとして動作します。
Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーおよびクライアントは、必要に応じて OAuth 2.0 承認サーバーと通信し、アクセストークンを取得または検証します。
AMQ Streams のデプロイメントでは、OAuth 2.0 インテグレーションは以下を提供します。
- Kafka ブローカーのサーバー側 OAuth 2.0 サポート。
- Kafka MirrorMaker、Kafka Connect、および Kafka Bridge のクライアント側 OAuth 2.0 サポート。
その他のリソース
4.4.1. OAuth 2.0 認証メカニズム
AMQ Streams は、OAuth 2.0 認証で OAUTHBEARER および PLAIN メカニズムをサポートします。どちらのメカニズムも、Kafka クライアントが Kafka ブローカーで認証されたセッションを確立できるようにします。クライアント、承認サーバー、および Kafka ブローカー間の認証フローは、メカニズムごとに異なります。
可能な限り、OAUTHBEARER を使用するようにクライアントを設定することが推奨されます。OAUTHBEARER では、クライアントクレデンシャルは Kafka ブローカーと共有されることがないため、PLAIN よりも高レベルのセキュリティーが提供されます。OAUTHBEARER をサポートしない Kafka クライアントの場合のみ、PLAIN の使用を検討してください。
必要であれば、同じ oauth
リスナーで OAUTHBEARER と PLAIN を両方有効にすることができます。
OAUTHBEARER の概要
Kafka は OAUTHBEARER 認証メカニズムをサポートしますが、明示的に設定する必要があります。また、多くの Kafka クライアントツールでは、プロトコルレベルで OAUTHBEARER の基本サポートを提供するライブラリーを使用します。
AMQ Streams では、アプリケーションの開発を容易にするため、アップストリームの Kafka Client Java ライブラリー用の OAuth コールバックハンドラーが提供されます (ただし、他のライブラリーには提供されません)。そのため、このようなクライアントには独自のコールバックハンドラーを作成する必要はありません。アプリケーションクライアントはコールバックハンドラーを使用してアクセストークンを提供できます。Go などの他言語で書かれたクライアントは、カスタムコードを使用して承認サーバーに接続し、アクセストークンを取得する必要があります。
OAUTHBEARER を使用する場合、クライアントはクレデンシャルを交換するために Kafka ブローカーでセッションを開始します。ここで、クレデンシャルはコールバックハンドラーによって提供されるベアラートークンの形式を取ります。コールバックを使用して、以下の 3 つの方法のいずれかでトークンの提供を設定できます。
- クライアント ID および Secret (OAuth 2.0 クライアントクレデンシャルメカニズムを使用)
- 設定時に手動で取得された有効期限の長いアクセストークン
- 設定時に手動で取得された有効期限の長い更新トークン
OAUTHBEARER は、Kafka ブローカーの oauth
リスナー設定で自動的に有効になります。enableOauthBearer
プロパティーを true
に設定できますが、これは必須ではありません。
# ... authentication: type: oauth # ... enableOauthBearer: true
OAUTHBEARER 認証は、プロトコルレベルで OAUTHBEARER メカニズムをサポートする Kafka クライアントでのみ使用できます。
PLAIN の概要
PLAIN は、すべての Kafka クライアントツール (kafkacat などの開発者ツールを含む) によって使用される簡易認証メカニズムです。PLAIN を OAuth 2.0 認証とともに使用できるようにするため、AMQ Streams にはサーバー側のコールバックが含まれ、この OAuth 2.0 over PLAIN を呼び出します。
PLAIN の AMQ Streams 実装では、クライアントのクレデンシャルは ZooKeeper に保存されません。代わりに、OAUTHBEARER 認証が使用される場合と同様に、クライアントのクレデンシャルは準拠した承認サーバーの背後で一元的に処理されます。
OAuth 2.0 over PLAIN コールバックを併用する場合、以下のいずれかの方法を使用して Kafka クライアントは Kafka ブローカーで認証されます。
- クライアント ID およびシークレット (OAuth 2.0 クライアントクレデンシャルメカニズムを使用)
- 設定時に手動で取得された有効期限の長いアクセストークン
PLAIN 認証を使用し、username
および password
を提供するように、クライアントを有効にする必要があります。パスワードの最初に $accessToken:
が付けられ、その後にアクセストークンの値が続く場合は、Kafka ブローカーはパスワードをアクセストークンとして解釈します。それ以外の場合は、Kafka ブローカーは username
をクライアント ID として解釈し、password
をクライアントシークレットとして解釈します。
password
がアクセストークンとして設定されている場合、username
は Kafka ブローカーによってアクセストークンから取得されるプリンシパル名と同じになるように設定される必要があります。このプロセスは、userNameClaim
、fallbackUserNameClaim
、fallbackUsernamePrefix
、または userInfoEndpointUri
を使用してユーザー名の抽出を設定する方法によって異なります。また、承認サーバーによっても異なり、特にクライアント ID をアカウント名にマッピングする方法によります。
PLAIN を使用するには、Kafka ブローカーの oauth
リスナー設定で有効にする必要があります。
以下の例では、デフォルトで有効になっている OAUTHBEARER に加え、PLAIN も有効になっています。PLAIN のみを使用する場合は、enableOauthBearer
を false
に設定して OAUTHBEARER を無効にすることができます。
# ...
authentication:
type: oauth
# ...
enablePlain: true
tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token
その他のリソース
4.4.2. OAuth 2.0 Kafka ブローカーの設定
OAuth 2.0 の Kafka ブローカー設定には、以下が関係します。
- 承認サーバーでの OAuth 2.0 クライアントの作成
- Kafka カスタムリソースでの OAuth 2.0 認証の設定
承認サーバーに関連する Kafka ブローカーおよび Kafka クライアントはどちらも OAuth 2.0 クライアントと見なされます。
4.4.2.1. 承認サーバーの OAuth 2.0 クライアント設定
セッションの開始中に受信されたトークンを検証するように Kafka ブローカーを設定するには、承認サーバーで OAuth 2.0 の クライアント 定義を作成し、以下のクライアントクレデンシャルが有効な状態で 機密情報 として設定することが推奨されます。
-
kafka
のクライアント ID (例) - 認証メカニズムとしてのクライアント ID およびシークレット
承認サーバーのパブリックでないイントロスペクションエンドポイントを使用する場合のみ、クライアント ID およびシークレットを使用する必要があります。高速のローカル JWT トークンの検証と同様に、パブリック承認サーバーのエンドポイントを使用する場合は、通常クレデンシャルは必要ありません。
4.4.2.2. Kafka クラスターでの OAuth 2.0 認証設定
Kafka クラスターで OAuth 2.0 認証を使用するには、たとえば、認証方法が oauth
の Kafka クラスターカスタムリソースの TLS リスナー設定を指定します。
OAuth 2.0 の認証方法タイプの割り当て
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
# ...
listeners:
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: oauth
#...
plain
、tls
、および external
リスナーを設定できますが、OAuth 2.0 では TLS による暗号化が無効になっている plain
リスナーまたは external
リスナーを使用しないことが推奨されます。これは、ネットワークでのデータ漏えいの脆弱性や、トークンの盗難による不正アクセスへの脆弱性が発生するためです。
external
リスナーを type: oauth
で設定し、セキュアなトランスポート層がクライアントと通信するようにします。
OAuth 2.0 の外部リスナーとの使用
# ...
listeners:
- name: external
port: 9094
type: loadbalancer
tls: true
authentication:
type: oauth
#...
tls
プロパティーはデフォルトで false に設定されているため、有効にする必要があります。
認証のタイプを OAuth 2.0 として定義した場合、検証のタイプに基づいて、 高速のローカル JWT 検証 または イントロスペクションエンドポイントを使用したトークンの検証 のいずれかとして、設定を追加します。
説明や例を用いてリスナー向けに OAuth 2.0 を設定する手順は、「Kafka ブローカーの OAuth 2.0 サポートの設定」を参照してください。
4.4.2.3. 高速なローカル JWT トークン検証の設定
高速なローカル JWT トークンの検証では、JWTトークンの署名がローカルでチェックされます。
ローカルチェックでは、トークンに対して以下が確認されます。
-
アクセストークンに
Bearer
の (typ) 要求値が含まれ、トークンがタイプに準拠することを確認します。 - 有効であるか (期限切れでない) を確認します。
-
トークンに
validIssuerURI
と一致する発行元があることを確認します。
承認サーバーによって発行されなかったすべてのトークンが拒否されるよう、リスナーの設定時に validIssuerURI
属性を指定します。
高速のローカル JWT トークン検証の実行中に、承認サーバーの通信は必要はありません。OAuth 2.0 の承認サーバーによって公開されるエンドポイントの jwksEndpointUri
属性を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。
承認サーバーとの通信はすべて TLS による暗号化を使用して実行する必要があります。
証明書トラストストアを AMQ Streams プロジェクト namespace の OpenShift シークレットとして設定し、tlsTrustedCertificates
属性を使用してトラストストアファイルが含まれる OpenShift シークレットを示すことができます。
JWT トークンからユーザー名を適切に取得するため、userNameClaim
の設定を検討してください。Kafka ACL 承認を使用する場合は、認証中にユーザー名でユーザーを特定する必要があります。JWT トークンの sub
要求は、通常は一意な ID でユーザー名ではありません。
高速なローカル JWT トークン検証の設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: #... listeners: - name: tls port: 9093 type: internal tls: true authentication: type: oauth validIssuerUri: <https://<auth-server-address>/auth/realms/tls> jwksEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/certs> userNameClaim: preferred_username maxSecondsWithoutReauthentication: 3600 tlsTrustedCertificates: - secretName: oauth-server-cert certificate: ca.crt #...
4.4.2.4. OAuth 2.0 イントロスペクションエンドポイントの設定
OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。
OAuth 2.0 のイントロスペクションベースの検証を設定するには、高速のローカル JWT トークン検証に指定された jwksEndpointUri
属性ではなく、introspectionEndpointUri
属性を指定します。通常、イントロスペクションエンドポイントは保護されているため、承認サーバーに応じて clientId
および clientSecret
を指定する必要があります。
イントロスペクションエンドポイントの設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: kafka: listeners: - name: tls port: 9093 type: internal tls: true authentication: type: oauth clientId: kafka-broker clientSecret: secretName: my-cluster-oauth key: clientSecret validIssuerUri: <https://<auth-server-address>/auth/realms/tls> introspectionEndpointUri: <https://<auth-server-address>/auth/realms/tls/protocol/openid-connect/token/introspect> userNameClaim: preferred_username maxSecondsWithoutReauthentication: 3600 tlsTrustedCertificates: - secretName: oauth-server-cert certificate: ca.crt
4.4.3. Kafka ブローカーの再認証の設定
Kafka クライアントと Kafka ブローカー間の OAuth 2.0 セッションに Kafka セッション再認証 を使用するように、oauth
リスナーを設定できます。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存のコネクションを破棄せずに再使用して、新しいセッションを即座に開始します。
セッションの再認証はデフォルトで無効になっています。これを有効にするには、oauth
リスナー設定で maxSecondsWithoutReauthentication
に時間の値を設定します。OAUTHBEARER および PLAIN 認証では、同じプロパティーを使用してセッションの再認証が設定されます。設定例については、「Kafka ブローカーの OAuth 2.0 サポートの設定」 を参照してください。
セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。
セッションの再認証は、高速ローカル JWT またはイントロスペクションエンドポイントのトークン検証と使用できます。
クライアントの再認証
ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。
トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。
更新トークンが使用される場合、セッションの再認証は更新トークンにも適用されます。セッションが期限切れになると、クライアントは更新トークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存のセッションに再認証されます。
OAUTHBEARER および PLAIN のセッションの有効期限
セッションの再認証が設定されている場合、OAUTHBEARER と PLAIN 認証ではセッションの有効期限は異なります。
クライアント ID とシークレットによる方法を使用する OAUTHBEARER および PLAIN の場合:
-
ブローカーの認証されたセッションは、設定された
maxSecondsWithoutReauthentication
で期限切れになります。 - アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。
有効期間の長いアクセストークンによる方法を使用する PLAIN の場合:
-
ブローカーの認証されたセッションは、設定された
maxSecondsWithoutReauthentication
で期限切れになります。 - アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証は試行されますが、PLAIN にはトークンを更新するメカニズムがありません。
maxSecondsWithoutReauthentication
が設定されていない場合は、再認証しなくても、OAUTHBEARER および PLAIN クライアントはブローカーへの接続を無期限に維持します。認証されたセッションは、アクセストークンの期限が切れても終了しません。ただし、これは keycloak
承認を使用したり、カスタムオーソライザーをインストールしたりして、承認を設定する場合に考慮されます。
4.4.4. OAuth 2.0 Kafka クライアントの設定
Kafka クライアントは以下のいずれかで設定されます。
- 承認サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID およびシークレット)。
- 承認サーバーから提供されたツールを使用して取得された、有効期限の長い有効なアクセストークンまたは更新トークン。
アクセストークンは、Kafka ブローカーに送信される唯一の情報です。アクセストークンを取得するために承認サーバーでの認証に使用されるクレデンシャルは、ブローカーに送信されません。
クライアントによるアクセストークンの取得後、承認サーバーと通信する必要はありません。
クライアント ID とシークレットを使用した認証が最も簡単です。有効期間の長いアクセストークンまたは更新トークンを使用すると、承認サーバーツールに追加の依存関係があるため、より複雑になります。
有効期間が長いアクセストークンを使用している場合は、承認サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。
Kafka クライアントが直接アクセストークンで設定されていない場合、クライアントは承認サーバーと通信して Kafka セッションの開始中にアクセストークンのクレデンシャルを交換します。Kafka クライアントは以下のいずれかを交換します。
- クライアント ID およびシークレット
- クライアント ID、更新トークン、および (任意の) シークレット
4.4.5. OAuth 2.0 のクライアント認証フロー
ここでは、Kafka セッションの開始時における Kafka クライアント、Kafka ブローカー、および承認ブローカー間の通信フローを説明および可視化します。フローは、クライアントとサーバーの設定によって異なります。
Kafka クライアントがアクセストークンをクレデンシャルとして Kafka ブローカーに送信する場合、トークンを検証する必要があります。
使用する承認サーバーや利用可能な設定オプションによっては、以下の使用が適している場合があります。
- 承認サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証。
- 承認サーバーによって提供される OAuth 2.0 のイントロスペクションエンドポイント。
高速のローカルトークン検証を使用するには、トークンでの署名検証に使用される公開証明書のある JWKS エンドポイントを提供する承認サーバーが必要になります。
この他に、承認サーバーで OAuth 2.0 のイントロスペクションエンドポイントを使用することもできます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを承認サーバーに渡し、応答を確認してトークンが有効であるかどうかを確認します。
Kafka クライアントのクレデンシャルは以下に対して設定することもできます。
- 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス。
- 新しいアクセストークンの発行についての承認サーバーとの通信。
承認サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。
4.4.5.1. クライアント認証フローの例
Kafka クライアントおよびブローカーが以下に設定されている場合の、Kafka セッション認証中のコミュニケーションフローを確認できます。
クライアントではクライアント ID とシークレットが使用され、ブローカーによって検証が承認サーバーに委譲される場合
- Kafka クライアントは承認サーバーからアクセストークンを要求します。これにはクライアント ID とシークレットを使用し、任意で更新トークンも使用します。
- 承認サーバーによって新しいアクセストークンが生成されます。
- Kafka クライアントは SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡し、Kafka ブローカーの認証を行います。
- Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、承認サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
- トークンが有効な場合は、Kafka クライアントセッションが確立されます。
クライアントではクライアント ID およびシークレットが使用され、ブローカーによって高速のローカルトークン検証が実行される場合
- Kafka クライアントは、トークンエンドポイントから承認サーバーの認証を行います。これにはクライアント ID とシークレットが使用され、任意で更新トークンも使用されます。
- 承認サーバーによって新しいアクセストークンが生成されます。
- Kafka クライアントは SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡し、Kafka ブローカーの認証を行います。
- Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
クライアントでは有効期限の長いアクセストークンが使用され、ブローカーによって検証が承認サーバーに委譲される場合
- Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して有効期限の長いアクセストークンを渡し、Kafka ブローカーの認証を行います。
- Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、承認サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
- トークンが有効な場合は、Kafka クライアントセッションが確立されます。
クライアントでは有効期限の長いアクセストークンが使用され、ブローカーによって高速のローカル検証が実行される場合
- Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して有効期限の長いアクセストークンを渡し、Kafka ブローカーの認証を行います。
- Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
トークンが取り消された場合に承認サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、承認サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。
4.4.6. OAuth 2.0 認証の設定
OAuth 2.0 は、Kafka クライアントと AMQ Streams コンポーネントとの対話に使用されます。
AMQ Streams に OAuth 2.0 を使用するには、以下を行う必要があります。
4.4.6.1. OAuth 2.0 承認サーバーとしての Red Hat Single Sign-On の設定
この手順では、Red Hat Single Sign-On を承認サーバーとしてデプロイし、AMQ Streams と統合するための設定方法を説明します。
承認サーバーは、一元的な認証および承認の他、ユーザー、クライアント、およびパーミッションの一元管理を実現します。Red Hat Single Sign-On にはレルムの概念があります。レルム はユーザー、クライアント、パーミッション、およびその他の設定の個別のセットを表します。デフォルトの マスターレルム を使用できますが、新しいレルムを作成することもできます。各レルムは独自の OAuth 2.0 エンドポイントを公開します。そのため、アプリケーションクライアントとアプリケーションサーバーはすべて同じレルムを使用する必要があります。
AMQ Streams で OAuth 2.0 を使用するには、Red Hat Single Sign-On のデプロイメントを使用して認証レルムを作成および管理します。
Red Hat Single Sign-On がすでにデプロイされている場合は、デプロイメントの手順を省略して、現在のデプロイメントを使用できます。
作業を開始する前の注意事項
Red Hat Single Sign-On を使用するための知識が必要です。
デプロイメントおよび管理の手順は、以下を参照してください。
前提条件
- AMQ Streams および Kafka が稼働している必要があります。
Red Hat Single Sign-On デプロイメントに関する条件:
- 「Red Hat Single Sign-On でサポートされる構成」を確認しておく必要があります。
- インストールには、system:admin などの cluster-admin ロールを持つユーザーが必要です。
手順
Red Hat Single Sign-On を OpenShift クラスターにデプロイします。
OpenShift Web コンソールでデプロイメントの進捗を確認します。
Red Hat Single Sign-On の Admin Console にログインし、AMQ Streams の OAuth 2.0 ポリシーを作成します。
ログインの詳細は、Red Hat Single Sign-On のデプロイ時に提供されます。
レルムを作成し、有効にします。
既存のマスターレルムを使用できます。
- 必要に応じて、レルムのセッションおよびトークンのタイムアウトを調整します。
-
kafka-broker
というクライアントを作成します。 -
Access Type を
Confidential
に設定します。 -
Standard Flow Enabled を
OFF
に設定し、このクライアントからの Web ログインを無効にします。 -
Service Accounts Enabled を
ON
に設定し、このクライアントが独自の名前で認証できるようにします。
-
Access Type を
- 続行する前に Save クリックします。
- タブにある、AMQ Streams の Kafka クラスター設定で使用するシークレットを書き留めておきます。
Kafka ブローカーに接続するすべてのアプリケーションクライアントに対して、このクライアント作成手順を繰り返し行います。
新しいクライアントごとに定義を作成します。
設定では、名前をクライアント ID として使用します。
次のステップ
承認サーバーのデプロイおよび設定後に、Kafka ブローカーが OAuth 2.0 を使用するように設定 します。
4.4.6.2. Kafka ブローカーの OAuth 2.0 サポートの設定
この手順では、ブローカーリスナーが承認サーバーを使用して OAuth 2.0 認証を使用するように、Kafka ブローカーを設定する方法について説明します。
TLS リスナーを設定して、暗号化されたインターフェースで OAuth 2.0 を使用することが推奨されます。プレーンリスナーは推奨されません。
承認サーバーが信頼できる CA によって署名された証明書を使用し、OAuth 2.0 サーバーのホスト名と一致する場合、TLS 接続はデフォルト設定を使用して動作します。それ以外の場合は、プローバー証明書でトラストストアを設定するか、証明書のホスト名の検証を無効にする必要があります。
Kafka ブローカーの設定する場合、新たに接続された Kafka クライアントの OAuth 2.0 認証中にアクセストークンを検証するために使用されるメカニズムには、以下の 2 つのオプションがあります。
作業を開始する前の注意事項
Kafka ブローカーリスナーの OAuth 2.0 認証の設定に関する詳細は、以下を参照してください。
前提条件
- AMQ Streams および Kafka が稼働している必要があります。
- OAuth 2.0 の承認サーバーがデプロイされている必要があります。
手順
エディターで、
Kafka
リソースの Kafka ブローカー設定 (Kafka.spec.kafka
) を更新します。oc edit kafka my-cluster
Kafka ブローカーの
listeners
設定を行います。各タイプのリスナーは独立しているため、同じ設定にする必要はありません。
以下は、外部リスナーに設定された設定オプションの例になります。
例 1: 高速なローカル JWT トークン検証の設定
#... - name: external port: 9094 type: loadbalancer tls: true authentication: type: oauth 1 validIssuerUri: <https://<auth-server-address>/auth/realms/external> 2 jwksEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/certs> 3 userNameClaim: preferred_username 4 maxSecondsWithoutReauthentication: 3600 5 tlsTrustedCertificates: 6 - secretName: oauth-server-cert certificate: ca.crt disableTlsHostnameVerification: true 7 jwksExpirySeconds: 360 8 jwksRefreshSeconds: 300 9 jwksMinRefreshPauseSeconds: 1 10 enableECDSA: "true" 11
- 1
oauth
に設定されたリスナータイプ。- 2
- 認証に使用されるトークン発行者の URI。
- 3
- ローカルの JWT 検証に使用される JWKS 証明書エンドポイントの URI。
- 4
- トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。
userNameClaim
の値は、使用される認証フローと承認サーバーによって異なります。 - 5
- (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間未満の場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
- 6
- (任意設定): 承認サーバーへの TLS 接続用の信用できる証明書。
- 7
- (任意設定): TLS ホスト名の検証を無効にします。デフォルトは
false
です。 - 8
- JWKS 証明書が期限切れになる前に有効であるとみなされる期間。デフォルトは
360
秒 です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。 - 9
- JWKS 証明書を更新する間隔。この間隔は、有効期間よりも 60 秒以上短くする必要があります。デフォルトは
300
秒 です。 - 10
- JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新はエクスポネンシャルバックオフ (exponential backoff) のルールに従い、
jwksRefreshSeconds
に到達するまで、一時停止を増やして失敗した更新の再試行を行います。デフォルト値は 1 です。 - 11
- (任意設定): ECDSA を使用して承認サーバーで JWT トークンを署名する場合は、これを有効にする必要があります。BouncyCastle 暗号ライブラリーを使用して追加の暗号プロバイダーがインストールされます。デフォルトは
false
です。
例 2: イントロスペクションエンドポイントを使用したトークンの検証の設定
- name: external port: 9094 type: loadbalancer tls: true authentication: type: oauth validIssuerUri: <https://<auth-server-address>/auth/realms/external> introspectionEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token/introspect> 1 clientId: kafka-broker 2 clientSecret: 3 secretName: my-cluster-oauth key: clientSecret userNameClaim: preferred_username 4 maxSecondsWithoutReauthentication: 3600 5
- 1
- トークンイントロスペクションエンドポイントの URI。
- 2
- クライアントを識別するためのクライアント ID。
- 3
- 認証にはクライアントシークレットとクライアント ID が使用されます。
- 4
- トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。
userNameClaim
の値は、使用される承認サーバーによって異なります。 - 5
- (任意設定): セッションの有効期限がアクセストークンと同じ期間になるよう強制する Kafka の再認証メカニズムを有効にします。指定された値がアクセストークンの有効期限が切れるまでの残り時間未満の場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
OAuth 2.0 認証の適用方法や、承認サーバーのタイプによっては、追加 (任意) の設定を使用できます。
# ... authentication: type: oauth # ... checkIssuer: false 1 checkAudience: true 2 fallbackUserNameClaim: client_id 3 fallbackUserNamePrefix: client-account- 4 validTokenType: bearer 5 userInfoEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/userinfo 6 enableOauthBearer: false 7 enablePlain: true 8 tokenEndpointUri: https://OAUTH-SERVER-ADDRESS/auth/realms/external/protocol/openid-connect/token 9 customClaimCheck: "@.custom == 'custom-value'" 10
- 1
- 承認サーバーが
iss
クレームを提供しない場合は、発行者チェックを行うことができません。このような場合、checkIssuer
をfalse
に設定し、validIssuerUri
を指定しないようにします。デフォルトはtrue
です。 - 2
- 承認サーバーによって
aud
(オーディエンス) クレームが提供され、オーディエンスチェックを強制する場合は、checkAudience
をtrue
に設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。そのため、Kafka ブローカーによって、aud
クレームにclientId
のないトークンは拒否されます。デフォルトはfalse
です。 - 3
- 承認サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID が提供されることがあります。更新トークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に ユーザー名 が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。
- 4
fallbackUserNameClaim
が適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producer
というクライアントが存在し、producer
という通常ユーザーも存在する場合について考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。- 5
- (
introspectionEndpointUri
を使用する場合のみ該当): 使用している認証サーバーによっては、イントロスペクションエンドポイントによってトークンタイプ属性が返されるかどうかは分からず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。 - 6
- (
introspectionEndpointUri
を使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、承認サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfo
エンドポイントの URI をフォールバックとして設定します。userNameClaim
、fallbackUserNameClaim
、およびfallbackUserNamePrefix
設定がuserinfo
エンドポイントの応答に適用されます。 - 7
- これを
false`to disable the OAUTHBEARER mechanism on the listener. At least one of PLAIN or OAUTHBEARER has to be enabled. Default is `true
に設定します。 - 8
- リスナーで PLAIN メカニズムを有効にするには、これを
true
に設定します。これは、すべてのプラットフォームのすべてのクライアントでサポートされます。Kafka クライアントで PLAIN メカニズムを有効にし、username
およびpassword
を設定する必要があります。このメカニズムは、OAuth アクセストークンを使用するか、OAuth クライアント ID およびシークレット (クライアントクレデンシャル) を使用して、認証するために使用できます。クライアントによって、文字列$accessToken:
で始まるpassword
が設定された場合、パスワードはサーバー上のアクセストークンとして解釈され、username
はアカウントのユーザー名として解釈されます。それ以外の場合は、ユーザーはクライアント ID として解釈され、パスワードはクライアントシークレットとして解釈されます。デフォルトはfalse
です。 - 9
- これは、前のポイントで説明されているように、
enablePlain
が true に設定されている場合にクライアントクレデンシャル認証をサポートするように設定する必要があります。 - 10
- これを JsonPath フィルタークエリーに設定すると、検証中に追加のカスタムルールを JWT アクセストークンに適用できます。アクセストークンに必要なデータが含まれていないと拒否されます。
introspectionEndpointUri
を使用する場合、カスタムチェックはイントロスペクションエンドポイントの応答 JSON に適用されます。
- エディターを保存して終了し、ローリングアップデートの完了を待ちます。
ログで更新を確認するか、Pod の状態遷移を監視して確認します。
oc logs -f ${POD_NAME} -c ${CONTAINER_NAME} oc get pod -w
ローリングアップデートによって、ブローカーが OAuth 2.0 認証を使用するように設定されます。
次のステップ
4.4.6.3. OAuth 2.0 を使用するよう Kafka Java クライアントを設定
この手順では、Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサーおよびコンシューマー API を設定する方法を説明します。
クライアントコールバックプラグインを pom.xml ファイルに追加し、システムプロパティーを設定します。
前提条件
- AMQ Streams および Kafka が稼働している必要があります。
- OAuth 2.0 承認サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている必要があります。
- Kafka ブローカーが OAuth 2.0 に対して設定されている必要があります。
手順
OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの
pom.xml
ファイルに追加します。<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.7.1.redhat-00003</version> </dependency>
コールバックのシステムプロパティーを設定します。
以下に例を示します。
System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, “https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token”); 1 System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "<client-name>"); 2 System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "<client-secret>"); 3
Kafka クライアント設定の TLS で暗号化された接続で SASL OAUTHBEARER メカニズムを有効にします。
以下に例を示します。
props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"); props.put("security.protocol", "SASL_SSL"); 1 props.put("sasl.mechanism", "OAUTHBEARER"); props.put("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
- 1
- この例では、TLS 接続で
SASL_SSL
を使用します。暗号化されていない接続ではSASL_PLAINTEXT
を使用します。
- Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。
次のステップ
4.4.6.4. Kafka コンポーネントの OAuth 2.0 の設定
この手順では、承認サーバーを使用して OAuth 2.0 認証を使用するように Kafka コンポーネントを設定する方法を説明します。
以下の認証を設定できます。
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
この手順では、Kafka コンポーネントと承認サーバーは同じサーバーで稼働しています。
作業を開始する前の注意事項
Kafka コンポーネントの OAuth 2.0 認証の設定に関する詳細は、以下を参照してください。
前提条件
- AMQ Streams および Kafka が稼働している必要があります。
- OAuth 2.0 承認サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている必要があります。
- Kafka ブローカーが OAuth 2.0 に対して設定されている必要があります。
手順
クライアントシークレットを作成し、これを環境変数としてコンポーネントにマウントします。
以下は、Kafka Bridge の
Secret
を作成する例になります。apiVersion: kafka.strimzi.io/v1beta2 kind: Secret metadata: name: my-bridge-oauth type: Opaque data: clientSecret: MGQ1OTRmMzYtZTllZS00MDY2LWI5OGEtMTM5MzM2NjdlZjQw 1
- 1
clientSecret
キーは base64 形式である必要があります。
Kafka コンポーネントのリソースを作成または編集し、OAuth 2.0 認証が認証プロパティーに設定されるようにします。
OAuth 2.0 認証では、以下を使用できます。
- クライアント ID およびシークレット
- クライアント ID および更新トークン
- アクセストークン
- TLS
KafkaClientAuthenticationOAuth スキーマ参照は、それぞれの例を提供します。
以下は、クライアント ID、シークレット、および TLS を使用して OAuth 2.0 が Kafka Bridge クライアントに割り当てられる例になります。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: # ... authentication: type: oauth 1 tokenEndpointUri: https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token 2 clientId: kafka-bridge clientSecret: secretName: my-bridge-oauth key: clientSecret tlsTrustedCertificates: 3 - secretName: oauth-server-cert certificate: tls.crt
OAuth 2.0 認証の適用方法や、承認サーバーのタイプによって、使用できる追加の設定オプションがあります。
# ... spec: # ... authentication: # ... disableTlsHostnameVerification: true 1 checkAccessTokenType: false 2 accessTokenIsJwt: false 3 scope: any 4
- 1
- (任意設定): TLS ホスト名の検証を無効にします。デフォルトは
false
です。 - 2
- 承認サーバーによって、JWT トークン内部で
typ
(タイプ) 要求が返されない場合は、checkAccessTokenType: false
を適用するとトークンタイプがチェックされず次に進むことができます。デフォルトはtrue
です。 - 3
- 不透明なトークンを使用している場合、アクセストークンが JWT トークンとして処理されないように
accessTokenIsJwt: false
を適用することができます。 - 4
- (任意設定): トークンエンドポイントからトークンを要求するための
scope
。認証サーバーでは、クライアントによるスコープの指定が必要になることがあります。この場合ではany
になります。
Kafka リソースのデプロイメントに変更を適用します。
oc apply -f your-file
ログで更新を確認するか、Pod の状態遷移を監視して確認します。
oc logs -f ${POD_NAME} -c ${CONTAINER_NAME} oc get pod -w
ローリングアップデートでは、OAuth 2.0 認証を使用して Kafka ブローカーと対話するコンポーネントが設定されます。
4.5. OAuth 2.0 トークンベース承認の使用
AMQ Streams は、Red Hat Single Sign-On の 承認サービス による OAuth 2.0 トークンベースの承認をサポートします。これにより、セキュリティーポリシーとパーミッションの一元的な管理が可能になります。
Red Hat Single Sign-On で定義されたセキュリティーポリシーおよびパーミッションは、Kafka ブローカーのリソースへのアクセスを付与するために使用されます。ユーザーとクライアントは、Kafka ブローカーで特定のアクションを実行するためのアクセスを許可するポリシーに対して照合されます。
Kafka では、デフォルトですべてのユーザーがブローカーに完全アクセスできます。また、アクセス制御リスト (ACL) を基にして承認を設定するために AclAuthorizer
プラグインが提供されます。
ZooKeeper には、 ユーザー名 を基にしてリソースへのアクセスを付与または拒否する ACL ルールが保存されます。ただし、Red Hat Single Sign-On を使用した OAuth 2.0 トークンベースの承認では、より柔軟にアクセス制御を Kafka ブローカーに実装できます。さらに、Kafka ブローカーで OAuth 2.0 の承認および ACL が使用されるように設定することができます。
4.5.1. OAuth 2.0 の承認メカニズム
AMQ Streams の OAuth 2.0 での承認では、Red Hat Single Sign-On サーバーの Authorization Services REST エンドポイントを使用して、Red Hat Single Sign-On を使用するトークンベースの認証が拡張されます。これは、定義されたセキュリティーポリシーを特定のユーザーに適用し、そのユーザーの異なるリソースに付与されたパーミッションの一覧を提供します。ポリシーはロールとグループを使用して、パーミッションをユーザーと照合します。OAuth 2.0 の承認では、Red Hat Single Sign-On の Authorization Services から受信した、ユーザーに付与された権限のリストを基にして、権限がローカルで強制されます。
4.5.1.1. Kafka ブローカーのカスタムオーソライザー
AMQ Streams では、Red Hat Single Sign-On の オーソライザー (KeycloakRBACAuthorizer
) が提供されます。Red Hat Single Sign-On によって提供される Authorization Services で Red Hat Single Sign-On REST エンドポイントを使用できるようにするには、Kafka ブローカーでカスタムオーソライザーを設定します。
オーソライザーは必要に応じて付与された権限のリストを承認サーバーから取得し、ローカルで Kafka ブローカーに承認を強制するため、クライアントの要求ごとに迅速な承認決定が行われます。
4.5.2. OAuth 2.0 承認サポートの設定
この手順では、Red Hat Single Sign-On の Authorization Services を使用して、OAuth 2.0 承認を使用するように Kafka ブローカーを設定する方法を説明します。
作業を始める前に
特定のユーザーに必要なアクセス、または制限するアクセスについて検討してください。Red Hat Single Sign-On では、Red Hat Single Sign-On の グループ、ロール、クライアント、および ユーザー の組み合わせを使用して、アクセスを設定できます。
通常、グループは組織の部門または地理的な場所を基にしてユーザーを照合するために使用されます。また、ロールは職務を基にしてユーザーを照合するために使用されます。
Red Hat Single Sign-On を使用すると、ユーザーおよびグループを LDAP で保存できますが、クライアントおよびロールは LDAP で保存できません。ユーザーデータへのアクセスとストレージを考慮して、承認ポリシーの設定方法を選択する必要がある場合があります。
スーパーユーザー は、Kafka ブローカーに実装された承認にかかわらず、常に制限なく Kafka ブローカーにアクセスできます。
前提条件
- AMQ Streams は、トークンベースの認証 に Red Hat Single Sign-On と OAuth 2.0 を使用するように設定されている必要があります。承認を設定するときに、同じ Red Hat Single Sign-On サーバーエンドポイントを使用する必要があります。
-
OAuth 2.0 認証は、再認証を有効にするために
maxSecondsWithoutReauthentication
オプションで設定する必要があります。
手順
- Red Hat Single Sign-On の Admin Console にアクセスするか、Red Hat Single Sign-On の Admin CLI を使用して、OAuth 2.0 認証の設定時に作成した Kafka ブローカークライアントの Authorization Services を有効にします。
- 承認サービスを使用して、クライアントのリソース、承認スコープ、ポリシー、およびパーミッションを定義します。
- ロールとグループをユーザーとクライアントに割り当てて、パーミッションをユーザーとクライアントにバインドします。
エディターで
Kafka
リソースの Kafka ブローカー設定 (Kafka.spec.kafka
) を更新して、Kafka ブローカーで Red Hat Single Sign-On による承認が使用されるように設定します。oc edit kafka my-cluster
Kafka ブローカーの
kafka
設定を指定して、keycloak
による承認を使用し、承認サーバーと Red Hat Single Sign-On の Authorization Services にアクセスできるようにします。以下に例を示します。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... authorization: type: keycloak 1 tokenEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token> 2 clientId: kafka 3 delegateToKafkaAcls: false 4 disableTlsHostnameVerification: false 5 superUsers: 6 - CN=fred - sam - CN=edward tlsTrustedCertificates: 7 - secretName: oauth-server-cert certificate: ca.crt grantsRefreshPeriodSeconds: 60 8 grantsRefreshPoolSize: 5 9 #...
- 1
- タイプ
keycloak
によって Red Hat Single Sign-On の承認が有効になります。 - 2
- Red Hat Single Sign-On トークンエンドポイントの URI。本番環境では常に HTTP を使用してください。
- 3
- 承認サービスが有効になっている Red Hat Single Sign-On の OAuth 2.0 クライアント定義のクライアント ID。通常、
kafka
が ID として使用されます。 - 4
- (任意設定): Red Hat Single Sign-On の Authorization Services のポリシーによってアクセスが拒否される場合は、Kafka
AclAuthorizer
に承認を委譲します。デフォルトはfalse
です。 - 5
- (任意設定): TLS ホスト名の検証を無効にします。デフォルトは
false
です。 - 6
- (任意設定): 指定の スーパーユーザー。
- 7
- (任意設定): 承認サーバーへの TLS 接続用の信用できる証明書。
- 8
- (任意設定): 連続する付与 (Grants) 更新実行の間隔。これは、アクティブなセッションが Red Hat Single Sign-On でユーザーのパーミッション変更を検出する最大時間です。デフォルト値は 60 です。
- 9
- (任意設定): アクティブなセッションの付与 (Grants) の更新 (並行して) に使用するスレッドの数。デフォルト値は 5 です。
- エディターを保存して終了し、ローリングアップデートの完了を待ちます。
ログで更新を確認するか、Pod の状態遷移を監視して確認します。
oc logs -f ${POD_NAME} -c kafka oc get pod -w
ローリングアップデートによって、ブローカーが OAuth 2.0 承認を使用するように設定されます。
- クライアントまたは特定のロールを持つユーザーとして Kafka ブローカーにアクセスして、設定したパーミッションを検証し、必要なアクセス権限があり、付与されるべきでないアクセス権限がないことを確認します。