5.2. セキュアなアクセスのためのクライアントのセットアップ
セキュアな接続をサポートするために Kafka ブローカー上にリスナーを設定したら、次のステップは、これらのリスナーを使用して Kafka クラスターと通信するようにクライアントアプリケーションを設定することです。これには、リスナーに設定されたセキュリティーメカニズムに基づいて、クラスターで認証するための適切なセキュリティー設定を各クライアントに提供することが含まれます。
5.2.1. セキュリティープロトコルの設定
クライアントアプリケーションで使用されるセキュリティープロトコルを Kafka ブローカーリスナーで設定されたプロトコルと一致するように設定します。たとえば、TLS 認証には SSL
(Secure Sockets Layer) を使用し、TLS 暗号化を使用した SASL (Simple Authentication and Security Layer over SSL) 認証には SASL_SSL
を使用します。Kafka クラスターへのアクセスに必要な認証メカニズムをサポートするトラストストアとキーストアをクライアント設定に追加します。
- Truststore
- トラストストアには、Kafka ブローカーの信頼性を検証するために使用される、信頼された認証局 (CA) のパブリック証明書が含まれています。クライアントがセキュアな Kafka ブローカーに接続するとき、ブローカーの ID を確認する必要がある場合があります。
- キーストア
- キーストアには、クライアントの秘密鍵とその公開証明書が含まれています。クライアントがブローカーに対して自身を認証したい場合、独自の証明書を提示します。
TLS 認証を使用している場合、Kafka クライアント設定には、Kafka クラスターに接続するためのトラストストアとキーストアが必要です。SASL SCRAM-SHA-512 を使用している場合、認証はデジタル証明書ではなくユーザー名とパスワードの認証情報の交換によって実行されるため、キーストアは必要ありません。SCRAM-SHA-512 はより軽量なメカニズムですが、証明書ベースの認証を使用するほどセキュアではありません。
独自の証明書インフラストラクチャーがあり、サードパーティー CA からの証明書を使用している場合は、クライアントのデフォルトのトラストストアにパブリック CA 証明書がすでに含まれている可能性が高いため、それらをクライアントのトラストストアに追加する必要はありません。サーバーの証明書がデフォルトのトラストストアにすでに含まれているパブリック CA 証明書のいずれかによって署名されている場合、クライアントは自動的にその証明書を信頼します。
config.properties
ファイルを作成して、クライアントアプリケーションで使用される認証情報を指定できます。
次の例では、security.protocol
が SSL
に設定され、クライアントとブローカーの間で TLS 認証と暗号化が有効になります。
ssl.truststore.location
プロパティーと ssl.truststore.password
プロパティーは、トラストストアの場所とパスワードを指定します。ssl.keystore.location
プロパティーと ssl.keystore.password
プロパティーは、キーストアの場所とパスワードを指定します。
PKCS #12 (公開鍵暗号化標準 #12) ファイル形式が使用されます。Base64 でエンコードされた PEM (Privacy Enhanced Mail) 形式を使用することもできます。
TLS 認証のクライアント設定プロパティーの例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SSL ssl.truststore.location = /path/to/ca.p12 ssl.truststore.password = truststore-password ssl.keystore.location = /path/to/user.p12 ssl.keystore.password = keystore-password client.id = my-client
次の例では、security.protocol
が SASL_SSL
に設定され、クライアントとブローカーの間で TLS 暗号化を使用した SASL 認証が有効になります。暗号化ではなく認証のみが必要な場合は、SASL
プロトコルを使用できます。認証用に指定された SASL メカニズムは SCRAM-SHA-512
です。さまざまな認証メカニズムを使用できます。sasl.jaas.config
プロパティーは認証情報を指定します。
SCRAM-SHA-512 認証のクライアント設定プロパティーの例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = SCRAM-SHA-512 sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \ username = "user" \ password = "secret"; ssl.truststore.location = path/to/truststore.p12 ssl.truststore.password = truststore_password ssl.truststore.type = PKCS12 client.id = my-client
PEM 形式をサポートしていないアプリケーションの場合は、OpenSSL などのツールを使用して PEM ファイルを PKCS #12 形式に変換できます。
5.2.2. 許可される TLS バージョンと暗号スイートの設定
SSL 設定と暗号スイートを組み込んで、クライアントアプリケーションと Kafka クラスターの間の TLS ベースの通信をさらに保護できます。Kafka ブローカーの設定でサポートされている TLS バージョンと暗号スイートを指定します。クライアントが使用する TLS バージョンと暗号スイートを制限する場合は、クライアントに設定を追加することもできます。クライアントの設定では、ブローカーで有効になっているプロトコルと暗号スイートのみを使用する必要があります。
次の例では、Kafka ブローカーとクライアントアプリケーションの間の通信に security.protocol
を使用して SSL が有効になっています。暗号スイートはコンマ区切りのリストとして指定します。ssl.cipher.suites property
はクライアントが使用を許可されている暗号スイートのコンマ区切りのリストです。
Kafka ブローカーの SSL 設定プロパティーの例
security.protocol: "SSL" ssl.enabled.protocols: "TLSv1.3", "TLSv1.2" ssl.protocol: "TLSv1.3" ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"
ssl.enabled.protocols
プロパティーは、クラスターとそのクライアントの間のセキュアな通信に使用できる TLS バージョンを指定します。この場合は、TLSv1.3
と TLSv1.2
の両方が有効になります。ssl.protocol
プロパティーは、全接続のデフォルトの TLS バージョンを設定します。有効なプロトコルから選択する必要があります。デフォルトでは、クライアントは TLSv1.3
を使用して通信します。クライアントが TLSv1.2 のみをサポートしている場合でも、サポートされているバージョンを使用してブローカーに接続し、通信できます。同様に、設定がクライアント上にあり、ブローカーが TLSv1.2 のみをサポートする場合、クライアントはサポートされているバージョンを使用します。
Apache Kafka でサポートされる暗号スイートは、使用している Kafka のバージョンと基盤となる環境によって異なります。最高レベルのセキュリティーを提供する、サポートされている最新の暗号スイートを確認してください。
5.2.3. アクセス制御リスト (ACL) の使用
クライアントアプリケーションで ACLS を明示的に設定する必要はありません。ACL は、Kafka ブローカーによってサーバー側で適用されます。クライアントがデータを生成または消費するリクエストをサーバーに送信すると、サーバーは ACL をチェックして、クライアント (ユーザー) がリクエストされた操作を実行する権限を持っているかどうかを判断します。クライアントが承認されていると、リクエストは処理されます。それ以外の場合、リクエストは拒否され、エラーが返されます。ただし、Kafka クラスターとのセキュアな接続を可能にするために、クライアントは引き続き認証され、適切なセキュリティープロトコルを使用する必要があります。
Kafka ブローカーでアクセス制御リスト (ACL) を使用している場合は、制御するトピックおよび操作へのクライアントアクセスを制限するように ACL が適切に設定されていることを確認してください。Open Policy Agent (OPA) ポリシーを使用してアクセスを管理している場合、承認ルールがポリシー内で設定されているため、Kafka ブローカーに対して ACL を指定する必要はありません。OAuth 2.0 にはある程度の柔軟性が備わっています。OAuth 2.0 プロバイダーを使用して ACL を管理できます。または、OAuth 2.0 と Kafka の simple
承認を使用して ACL を管理します。
ACL はほとんどのタイプのリクエストに適用され、操作の生成と消費に限定されません。たとえば、ACLS は、トピックの説明などの読み取り操作、新しいトピックの作成などの書き込み操作に適用できます。
5.2.4. トークンベースのアクセスに OAuth 2.0 を使用する
Streams for Apache Kafka での承認に OAuth 2.0 オープン標準を使用して、OAuth 2.0 プロバイダーを通じて承認制御を適用します。OAuth 2.0 は、アプリケーションが他のシステムに保存されているユーザーデータにアクセスするためのセキュアな方法を提供します。承認サーバーは、Kafka クラスターへのアクセスを許可するアクセストークンをクライアントアプリケーションに発行できます。
次の手順では、トークン検証に OAuth 2.0 を設定して使用するための一般的なアプローチを説明します。
- クライアント ID やシークレットなど、ブローカーとクライアントの認証情報を使用して認可サーバーを設定します。
- 認可サーバーから OAuth 2.0 認証情報を取得します。
- OAuth 2.0 認証情報を使用して Kafka ブローカー上のリスナーを設定し、認可サーバーとやりとりします。
- Oauth 2.0 の依存関係をクライアントライブラリーに追加します。
- OAuth 2.0 認証情報を使用して Kafka クライアントを設定し、認可サーバーとやりとりします。
- 実行時にアクセストークンを取得し、OAuth 2.0 プロバイダーでクライアントを認証します。
Kafka ブローカー上で OAuth 2.0 用に設定されたリスナーがある場合は、OAuth 2.0 を使用するようにクライアントアプリケーションをセットアップできます。Kafka クラスターにアクセスするための標準の Kafka クライアント設定に加えて、OAuth 2.0 認証用の特定の設定を含める必要があります。また、使用している承認サーバーが Kafka クラスターおよびクライアントアプリケーションからアクセス可能であることを確認する必要があります。
SASL (Simple Authentication and Security Layer) セキュリティープロトコルとメカニズムを指定します。実稼働環境では、次の設定が推奨されます。
-
TLS 暗号化接続用の
SASL_SSL
プロトコル。 -
ベアラートークンを使用した認証情報交換のための
OAUTHBEARER
メカニズム
JAAS (Java Authentication and Authorization Service) モジュールは SASL メカニズムを実装します。メカニズムの設定は、使用している認証方法によって異なります。たとえば、認証情報交換を使用して、OAuth 2.0 アクセストークンエンドポイント、アクセストークン、クライアント ID、およびクライアントシークレットを追加します。クライアントは承認サーバーのトークンエンドポイント (URL) に接続して、トークンがまだ有効かどうかを確認します。認証されたアクセスのための認可サーバーの公開鍵証明書を含むトラストストアも必要です。
OAauth 2.0 のクライアント設定プロパティーの例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = OAUTHBEARER # ... sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri = "https://localhost:9443/oauth2/token" \ oauth.access.token = <access_token> \ oauth.client.id = "<client_id>" \ oauth.client.secret = "<client_secret>" \ oauth.ssl.truststore.location = "/<truststore_location>/oauth-truststore.p12" \ oauth.ssl.truststore.password = "<truststore_password>" \ oauth.ssl.truststore.type = "PKCS12" \
関連情報
OAuth 2.0 を使用するようにブローカーを設定する方法の詳細は、次のガイドを参照してください。
5.2.5. Open Policy Agent (OPA) アクセスポリシーの使用
Streams for Apache Kafka で Open Policy Agent (OPA)ポリシーエージェントを使用して、アクセスポリシーに対して Kafka クラスターに接続するためのリクエストを評価します。Open Policy Agent (OPA) は、認可ポリシーを管理するポリシーエンジンです。ポリシーはアクセス制御を一元化し、クライアントアプリケーションを変更することなく動的に更新できます。たとえば、特定のユーザー (クライアント) のみに特定のトピックへのメッセージの作成と消費を許可するポリシーを作成できます。
Streams for Apache Kafka は、オーソライザーとして Kafka 承認に Open Policy Agent プラグインを使用します。
次の手順では、OPA を設定して使用するための一般的なアプローチを説明します。
- OPA サーバーのインスタンスをセットアップします。
- Kafka クラスターへのアクセスを制御する承認ルールを提供するポリシーを定義します。
- Kafka ブローカーが OPA 承認を受け入れ、OPA サーバーとやりとりするための設定を作成します。
- Kafka クラスターへの承認されたアクセスのための認証情報を提供するように Kafka クライアントを設定します。
Kafka ブローカー上で OPA 用にリスナーが設定されている場合は、OPA を使用するようにクライアントアプリケーションをセットアップできます。リスナー設定では、OPA サーバーに接続し、クライアントアプリケーションを承認するための URL を指定します。Kafka クラスターにアクセスするための標準の Kafka クライアント設定に加えて、Kafka ブローカーで認証するための認証情報を追加する必要があります。ブローカーは、OPA サーバーにリクエストを送信して認可ポリシーを評価することにより、クライアントがリクエストされた操作を実行するために必要な承認を持っているかどうかを確認します。ポリシーエンジンが認可ポリシーを強制するため、通信を保護するためにトラストストアやキーストアは必要ありません。
OPA 承認のクライアント設定プロパティーの例
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = SCRAM-SHA-512 sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \ username = "user" \ password = "secret"; # ...
Red Hat は OPA サーバーをサポートしません。
関連情報
OPA を使用するようにブローカーを設定する方法の詳細は、次のガイドを参照してください。
5.2.6. メッセージのストリーミング時にトランザクションを使用する
ブローカーおよびプロデューサークライアントアプリケーションでトランザクションプロパティーを設定することにより、メッセージが単一のトランザクションで処理されるようにすることができます。トランザクションにより、メッセージのストリーミングに信頼性と一貫性が追加されます。
トランザクションはブローカー上で常に有効になります。次のプロパティーを使用してデフォルト設定を変更できます。
トランザクションの Kafka ブローカー設定プロパティーの例
transaction.state.log.replication.factor = 3 transaction.state.log.min.isr = 2 transaction.abort.timed.out.transaction.cleanup.interval.ms = 3600000
これは運用環境の一般的な設定であり、内部 __transaction_state
トピック用に 3 つのレプリカを作成します。\__transaction_state
トピックには、進行中のトランザクションに関する情報が保存されます。トランザクションログには、少なくとも 2 つの同期レプリカが必要です。クリーンアップ間隔は、タイムアウトしたトランザクションをチェックし、対応するトランザクションログをクリーンアップするまでの時間です。
クライアント設定にトランザクションプロパティーを追加するには、プロデューサーとコンシューマーに対して次のプロパティーを設定します。
トランザクションのプロデューサークライアント設定プロパティーの例
transactional.id = unique-transactional-id enable.idempotence = true max.in.flight.requests.per.connection = 5 acks = all retries=2147483647 transaction.timeout.ms = 30000 delivery.timeout = 25000
トランザクション ID により、Kafka ブローカーはトランザクションを追跡できるようになります。これはプロデューサーの一意の識別子であり、特定のパーティションのセットで使用する必要があります。複数のパーティションセットに対してトランザクションを実行する必要がある場合は、セットごとに異なるトランザクション ID を使用する必要があります。冪等性は、プロデューサーインスタンスが重複したメッセージを作成するのを避けるために有効になっています。冪等性では、メッセージはプロデューサー ID とシーケンス番号を使用して追跡されます。ブローカーはメッセージを受信すると、プロデューサー ID とシーケンス番号をチェックします。同じプロデューサー ID とシーケンス番号を持つメッセージがすでに受信されている場合、ブローカーは重複したメッセージを破棄します。
トランザクションが送信された順序で処理されるように、実行中のリクエストの最大数は 5 に設定されています。パーティションには、メッセージの順序を損なうことなく、最大 5 つの実行中のリクエストを含めることができます。
acks
を all
に設定すると、プロデューサーは、トランザクションが完了したと見なす前に、書き込み先のトピックパーティションのすべての同期レプリカからの確認応答を待機します。これにより、メッセージが Kafka クラスターに永続的に書き込まれ (コミットされ)、ブローカーに障害が発生した場合でもメッセージが失われることがなくなります。
トランザクションタイムアウトは、クライアントがタイムアウトになるまでにトランザクションを完了する必要がある最大時間を指定します。配信タイムアウトは、タイムアウトになるまでにプロデューサーがブローカーによるメッセージ配信の確認応答を待機する最大時間を指定します。メッセージがトランザクション期間内に確実に配信されるようにするには、配信タイムアウトをトランザクションタイムアウトよりも小さく設定します。失敗したメッセージ要求の再送信 試行回数
を指定する場合は、ネットワーク遅延とメッセージスループットを考慮し、一時的な障害も視野に入れます。
トランザクションのコンシューマークライアント設定プロパティーの例
group.id = my-group-id isolation.level = read_committed enable.auto.commit = false
read_committed
分離レベルは、コンシューマーが正常に完了したトランザクションのメッセージのみを読み取ることを指定します。コンシューマーは、進行中のトランザクションまたは失敗したトランザクションの一部であるメッセージを処理しません。これにより、コンシューマーは完全に完了したトランザクションの一部であるメッセージのみを読み取ることが保証されます。
トランザクションを使用してメッセージをストリーミングする場合は、enable.auto.commit
を false
に設定することが重要です。true
に設定すると、コンシューマーはトランザクションを考慮せずにオフセットを定期的にコミットします。これは、トランザクションが完全に完了する前にコンシューマーがメッセージをコミットする可能性があることを意味します。enable.auto.commit
を false
に設定すると、コンシューマーは、完全に書き込まれ、トランザクションの一部としてトピックにコミットされたメッセージのみを読み取り、コミットします。