5.4. Kafka の設定


Kafka はプロパティーファイルを使用して静的設定を保存します。推奨される設定ファイルの場所は /opt/kafka/config/server.properties です。設定ファイルは kafka ユーザーが読み取りできる必要があります。

AMQ Streams には、製品のさまざまな基本的な機能と高度な機能を強調する設定ファイルのサンプルが含まれています。AMQ Streams インストールディレクトリーの config/server.properties を参照してください。

本章では、最も重要な設定オプションについて説明します。

5.4.1. ZooKeeper

Kafka ブローカーは、設定の一部を保存し、クラスターを調整するために (たとえば、どのノードがどのパーティションのリーダーであるかを決定するために) ZooKeeper を必要とします。ZooKeeper クラスターの接続の詳細は、設定ファイルに保存されます。zookeeper.connect フィールドには、zookeeper クラスターのメンバーのホスト名およびポートのコンマ区切りリストが含まれます。

以下に例を示します。

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181

Kafka はこれらのアドレスを使用して ZooKeeper クラスターに接続します。この設定により、すべての Kafka znodes が ZooKeeper データベースのルートに直接作成されます。そのため、このような ZooKeeper クラスターは単一の Kafka クラスターにのみ使用できます。単一の ZooKeeper クラスターを使用するように複数の Kafka クラスターを設定するには、Kafka 設定ファイルの ZooKeeper 接続文字列の最後にベース (接頭辞) パスを指定します。

zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1

5.4.2. リスナー

リスナーは、Kafka ブローカーへの接続に使用されます。各 Kafka ブローカーは、複数のリスナーを使用するように設定できます。リスナーごとに異なる設定が必要なため、別のポートまたはネットワークインターフェイスでリッスンできます。

リスナーを設定するには、設定ファイル (/opt/kafka/config/server.properties) の listeners プロパティーを編集します。listeners プロパティーにコンマ区切りのリストとしてリスナーを追加します。各プロパティーを以下のように設定します。

<listenerName>://<hostname>:<port>

<hostname> が空の場合、Kafka は java.net.InetAddress.getCanonicalHostName() クラスをホスト名として使用します。

複数のリスナーの設定例

listeners=internal-1://:9092,internal-2://:9093,replication://:9094

Kafka クライアントが Kafka クラスターに接続する場合は、最初にクラスターノードの 1 つである ブートストラップサーバー に接続します。ブートストラップサーバーはクライアントにクラスター内のすべてのブローカーのリストを提供し、クライアントは各ブローカーに個別に接続します。ブローカーのリストは、設定された listeners に基づいています。

アドバタイズされたリスナー

任意で、advertised.listeners プロパティーを使用して、listeners プロパティーに指定されたものとは異なるリスナーアドレスのセットをクライアントに提供できます。これは、プロキシーなどの追加のネットワークインフラストラクチャーがクライアントとブローカー間にある場合や、IP アドレスの代わりに外部 DNS 名が使用されている場合に便利です。

advertised.listeners プロパティーは listeners プロパティーと同じ方法でフォーマットされます。

アドバタイズされたリスナーの設定例

listeners=internal-1://:9092,internal-2://:9093
advertised.listeners=internal-1://my-broker-1.my-domain.com:1234,internal-2://my-broker-1.my-domain.com:1235

注記

アドバタイズされたリスナーの名前は、listeners プロパティーに記載されているものと一致する必要があります。

inter-broker リスナー

inter-broker リスナー は、Kafka Inter-broker の通信に使用されます。inter-broker 通信は以下に必要です。

  • 異なるブローカー間のワークロードの調整
  • 異なるブローカーに保存されているパーティション間でのメッセージのレプリケーション
  • パーティションリーダーシップの変更など、コントローラーからの管理タスクの処理

inter-broker リスナーは、任意のポートに割り当てることができます。複数のリスナーが設定されている場合、inter.broker.listener.name プロパティーで inter-broker リスナーの名前を定義できます。

ここでは、inter-broker リスナーの名前は REPLICATION です。

listeners=REPLICATION://0.0.0.0:9091
inter.broker.listener.name=REPLICATION

コントロールプレーンリスナー

デフォルトでは、コントローラーと他のブローカー間の通信は inter-broker リスナー を使用します。コントローラーは、パーティションリーダーシップの変更など、管理タスクを調整します。

コントローラー接続用に専用の コントロールプレーンリスナー を有効にすることができます。コントロールプレーンリスナーは、任意のポートに割り当てることができます。

コントロールプレーンリスナーを有効にするには、リスナー名で control.plane.listener.name プロパティーを設定します。

listeners=CONTROLLER://0.0.0.0:9090,REPLICATION://0.0.0.0:9091
...
control.plane.listener.name=CONTROLLER

コントロールプレーンリスナーを有効にすると、コントローラーの通信がブローカー間のデータレプリケーションによって遅延しないため、クラスターのパフォーマンスが向上する可能性があります。データレプリケーションは、inter-broker のリスナーを介して続行されます。

control.plane.listener が設定されていない場合、コントローラー接続には inter-broker のリスナー が使用されます。

5.4.3. ログのコミット

Apache Kafka は、プロデューサーから受信するすべてのレコードをコミットログに保存します。コミットログには、Kafka が配信する必要がある実際のデータ (レコードの形式) が含まれます。これらは、ブローカーの動作を記録するアプリケーションログファイルではありません。

ログディレクトリー

log.dirs プロパティーファイルを使用してログディレクトリーを設定し、1 つまたは複数のログディレクトリーにコミットログを保存できます。これは、インストール時に作成された /var/lib/kafka ディレクトリーに設定する必要があります。

log.dirs=/var/lib/kafka

パフォーマンス上の理由から、log.dir を複数のディレクトリーに設定し、それぞれを別の物理デバイスに配置して、ディスク I/O のパフォーマンスを向上できます。以下に例を示します。

log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3

5.4.4. ブローカー ID

ブローカー ID は、クラスター内の各ブローカーの一意の ID です。ブローカー ID として 0 以上の整数を割り当てることができます。ブローカー ID は、再起動またはクラッシュ後にブローカーを識別するために使用されます。そのため、ID が安定し、時間の経過とともに変更されないようにすることが重要です。ブローカー ID はブローカーのプロパティーファイルで設定されます。

broker.id=1

5.4.5. ZooKeeper の認証

デフォルトでは、ZooKeeper と Kafka 間の接続は認証されません。ただし、Kafka および ZooKeeper は、SASL (Simple Authentication and Security Layer) を使用して認証をセットアップするために使用できる Java Authentication and Authorization Service (JAAS) をサポートします。ZooKeeper は、ローカルに保存されたクレデンシャルと DIGEST-MD5 SASL メカニズムを使用した認証をサポートします。

5.4.5.1. JAAS 設定

ZooKeeper 接続の SASL 認証は JAAS 設定ファイルで設定する必要があります。デフォルトでは、Kafka は ZooKeeper への接続用に Client という名前の JAAS コンテキストを使用します。Client コンテキストは /opt/kafka/config/jass.conf ファイルで設定する必要があります。以下の例のように、コンテキストでは PLAIN SASL 認証を有効にする必要があります。

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="123456";
};

5.4.5.2. ZooKeeper 認証の有効化

この手順では、ZooKeeper に接続する際に SASL DIGEST-MD5 メカニズムを使用した認証を有効にする方法を説明します。

前提条件

  • ZooKeeper でクライアント/サーバー間の認証が 有効である

SASL DIGEST-MD5 認証の有効化

  1. すべての Kafka ブローカーノードで、/opt/kafka/config/jaas.conf JAAS 設定ファイルを作成または編集し、以下のコンテキストを追加します。

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="<Username>"
        password="<Password>";
    };

    ユーザー名とパスワードは ZooKeeper で設定されているものと同じである必要があります。

    Client コンテキストの例を以下に示します。

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="123456";
    };
  2. すべての Kafka ブローカーノードを 1 つずつ再起動します。JAAS 設定を Kafka ブローカーに渡すには、KAFKA_OPTS 環境変数を使用します。

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

    マルチノードクラスターでブローカーを再起動する方法は、「Kafka ブローカーの正常なローリング再起動の実行」 を参照してください。

関連情報

5.4.6. 認可

Kafka ブローカーの認可は、authorizer プラグインを使用して実装されます。

本セクションでは、Kafka で提供される AclAuthorizer プラグインを使用する方法を説明します。

または、独自の認可プラグインを使用できます。たとえば、OAuth 2.0 トークンベースの認証 を使用している場合、OAuth 2.0 認可 を使用できます。

5.4.6.1. シンプルな ACL authorizer

AclAuthorizer を含む authorizer プラグインは authorizer.class.name プロパティーを使用して有効にします。

authorizer.class.name=kafka.security.auth.AclAuthorizer

選択した authorizer には完全修飾名が必要です。AclAuthorizer の場合、完全修飾名は kafka.security.auth.AclAuthorizer です。

5.4.6.1.1. ACL ルール

AclAuthorizer は ACL ルールを使用して Kafka ブローカーへのアクセスを管理します。

ACL ルールは以下の形式で定義されます。

プリンシパル P は、ホスト H から Kafka リソース R で操作 O を許可または拒否されます。

たとえば、以下のようにルールを設定できます。

John は、ホスト 127.0.0.1 からトピック コメント表示 できます。

ホストは、John が接続しているマシンの IP アドレスです。

ほとんどの場合、ユーザーはプロデューサーまたはコンシューマーアプリケーションです。

Consumer01 は、ホスト 127.0.0.1 からコンシューマーグループ アカウント書き込み できます。

ACL ルールが存在しない場合

特定のリソースに ACL ルールが存在しない場合は、すべてのアクションが拒否されます。この動作は、Kafka 設定ファイル /opt/kafka/config/server.propertiesallow.everyone.if.no.acl.found プロパティーを true に設定すると変更できます。

5.4.6.1.2. プリンシパル

プリンシパル はユーザーのアイデンティティーを表します。ID の形式は、クライアントが Kafka に接続するために使用される認証メカニズムによって異なります。

  • User:ANONYMOUS: 認証なしで接続する場合
  • User:<username>: PLAIN や SCRAM などの単純な認証メカニズムを使用して接続する場合

    例: User:admin または User:user1

  • User:<DistinguishedName>: TLS クライアント認証を使用して接続する場合

    例: User:CN=user1,O=MyCompany,L=Prague,C=CZ

  • User:<Kerberos username>: Kerberos を使用して接続する場合

DistinguishedName はクライアント証明書からの識別名です。

Kerberos ユーザー名 は、Kerberos プリンシパルの主要部分で、Kerberos を使用して接続する場合のデフォルトで使用されます。sasl.kerberos.principal.to.local.rules プロパティーを使用して、Kerberos プリンシパルから Kafka プリンシパルを構築する方法を設定できます。

5.4.6.1.3. ユーザーの認証

認可を使用するには、認証を有効にし、クライアントにより使用される必要があります。そうでないと、すべての接続のプリンシパルは User:ANONYMOUS になります。

認証方法の詳細は、暗号化と認証 を参照してください。

5.4.6.1.4. スーパーユーザー

スーパーユーザーは、ACL ルールに関係なくすべてのアクションを実行できます。

スーパーユーザーは、super.users プロパティーを使用して Kafka 設定ファイルで定義されます。

以下に例を示します。

super.users=User:admin,User:operator
5.4.6.1.5. レプリカブローカーの認証

認可を有効にすると、これはすべてのリスナーおよびすべての接続に適用されます。これには、ブローカー間のデータのレプリケーションに使用される inter-broker の接続が含まれます。そのため、認可を有効にする場合は、inter-broker 接続に認証を使用し、ブローカーが使用するユーザーに十分な権限を付与してください。たとえば、ブローカー間の認証で kafka-broker ユーザーが使用される場合、スーパーユーザー設定にはユーザー名 super.users=User:kafka-broker が含まれている必要があります。

5.4.6.1.6. サポートされるリソース

Kafka ACL は、以下のタイプのリソースに適用できます。

  • Topics
  • コンシューマーグループ
  • クラスター
  • TransactionId
  • DelegationToken
5.4.6.1.7. サポートされる操作

AclAuthorizer はリソースでの操作を承認します。

以下の表で X の付いたフィールドは、各リソースでサポートされる操作を表します。

表5.1 リソースでサポートされる操作
 Topicsコンシューマーグループクラスター

Read

X

X

 

Write

X

  

Create

  

X

Delete

X

  

Alter

X

  

Describe

X

X

X

ClusterAction

  

X

すべて

X

X

X

5.4.6.1.8. ACL 管理オプション

ACL ルールは、Kafka ディストリビューションパッケージの一部として提供される bin/kafka-acls.sh ユーティリティーを使用して管理されます。

kafka-acls.sh パラメーターオプションを使用して、ACL ルールを追加、リスト表示、および削除したり、その他の機能を実行したりします。

パラメーターには、--add など、二重ハイフンの標記が必要です。

オプションタイプ説明デフォルト

add

Action

ACL ルールを追加します。

 

remove

Action

ACL ルールを削除します。

 

list

Action

ACL ルールをリスト表示します。

 

authorizer

Action

authorizer の完全修飾クラス名。

kafka.security.auth.AclAuthorizer

authorizer-properties

Configuration

初期化のために authorizer に渡されるキー/値のペア。

AclAuthorizer では、サンプル値は zookeeper.connect=zoo1.my-domain.com:2181 です。

 

bootstrap-server

リソース

Kafka クラスターに接続するためのホスト/ポートのペア。

このオプションまたは authorizer オプションを使用します (両方ではなく)。

command-config

リソース

管理クライアントに渡す設定プロパティーファイル。これは bootstrap-server パラメーターと共に使用されます。

 

cluster

リソース

クラスターを ACL リソースとして指定します。

 

topic

リソース

トピック名を ACL リソースとして指定します。

ワイルドカードとして使用されるアスタリスク (*) は、すべてのトピック に解釈されます。

1 つのコマンドに複数の --topic オプションを指定できます。

 

group

リソース

コンシューマーグループ名を ACL リソースとして指定します。

1 つのコマンドに複数の --group オプションを指定できます。

 

transactional-id

リソース

トランザクション ID を ACL リソースとして指定します。

トランザクション配信は、プロデューサーによって複数のパーティションに送信されたすべてのメッセージが正常に配信されるか、いずれも配信されない必要があることを意味します。

ワイルドカードとして使用されるアスタリスク (*) は、すべての ID に解釈されます。

 

delegation-token

リソース

委任トークンを ACL リソースとして指定します。

ワイルドカードとして使用されるアスタリスク (*) は、すべてのトークン に解釈されます。

 

resource-pattern-type

Configuration

add パラメーターのリソースパターンのタイプ、または list または remove パラメーターのリソースパターンのフィルター値を指定します。

literal または prefixed をリソース名のリソースパターンタイプとして使用します。

any または match を、リソースパターンのフィルター値または特定のパターンタイプフィルターとして使用します。

literal

allow-principal

プリンシパル

allow ACL ルールに追加されるプリンシパル。

1 つのコマンドに複数の --allow-principal オプションを指定できます。

 

deny-principal

プリンシパル

拒否 ACL ルールに追加されるプリンシパル。

1 つのコマンドに複数の --deny-principal オプションを指定できます。

 

principal

プリンシパル

プリンシパルの ACL のリストを返すために list パラメーターと共に使用されるプリンシパル名。

1 つのコマンドに複数の --principal オプションを指定できます。

 

allow-host

ホスト

--allow-principal に記載されているプリンシパルへのアクセスを許可する IP アドレス。

ホスト名または CIDR 範囲はサポートされていません。

--allow-principal が指定されている場合、デフォルトは * ですべてのホストを意味します。

deny-host

ホスト

--deny-principal に記載されているプリンシパルへのアクセスを拒否する IP アドレス。

ホスト名または CIDR 範囲はサポートされていません。

--deny-principal が指定されている場合、デフォルトは * ですべてのホストを意味します。

操作 (operation)

操作

操作を許可または拒否します。

1 つのコマンドに複数の --operation オプションを指定できます。

すべて

producer

ショートカット

メッセージプロデューサーが必要とするすべての操作を許可または拒否するためのショートカット (トピックでは WRITE と DESCRIBE、クラスターでは CREATE)。

 

consumer

ショートカット

メッセージコンシューマーが必要とするすべての操作を許可または拒否するためのショートカット (トピックについては READ と DESCRIBE、コンシューマーグループについては READ)。

 

idempotent

ショートカット

--producer パラメーターとの併用時に冪等性を有効にするショートカット。これにより、メッセージがパーティションに 1 度だけ配信されるようになります。

プロデューサーが特定のトランザクション ID に基づいてメッセージを送信することを許可されている場合、Idepmotence は自動的に有効になります。

 

force

ショートカット

すべてのクエリーを受け入れ、プロンプトは表示されないショートカット。

 

5.4.6.2. 認可の有効化

この手順では、Kafka ブローカーでの認可用に AclAuthorizer プラグインを有効にする方法を説明します。

前提条件

手順

  1. AclAuthorizer を使用するように、Kafka 設定ファイル /opt/kafka/config/server.properties を編集します。

    authorizer.class.name=kafka.security.auth.AclAuthorizer
  2. Kafka ブローカーを (再) 起動します。

5.4.6.3. ACL ルールの追加

AclAuthorizer は、ユーザーが実行できる/できない操作を記述するルールのセットを定義するアクセス制御リスト (ACL) を使用します。

この手順では、Kafka ブローカーで AclAuthorizer プラグインを使用する場合に、ACL ルールを追加する方法を説明します。

ルールは kafka-acls.sh ユーティリティーを使用して追加され、ZooKeeper に保存されます。

前提条件

手順

  1. --add オプションを指定して kafka-acls.sh を実行します。

    例:

    • MyConsumerGroup コンシューマーグループを使用して、user1 および user2myTopic からの読み取りを許可します。

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
      
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
      
      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
    • user1 が IP アドレスホスト 127.0.0.1 から myTopic を読むためのアクセスを拒否します。

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
    • MyConsumerGroupmyTopic のコンシューマーとして user1 を追加します。

      bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1

5.4.6.4. ACL ルールの一覧表示

この手順では、Kafka ブローカーで AclAuthorizer プラグインを使用する場合に、既存の ACL ルールをリスト表示する方法を説明します。

ルールは、kafka-acls.sh ユーティリティーを使用してリストされます。

前提条件

手順

  • --list オプションを指定して kafka-acls.sh を実行します。

    以下に例を示します。

    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --list --topic myTopic
    
    Current ACLs for resource `Topic:myTopic`:
    
    User:user1 has Allow permission for operations: Read from hosts: *
    User:user2 has Allow permission for operations: Read from hosts: *
    User:user2 has Deny permission for operations: Read from hosts: 127.0.0.1
    User:user1 has Allow permission for operations: Describe from hosts: *
    User:user2 has Allow permission for operations: Describe from hosts: *
    User:user2 has Deny permission for operations: Describe from hosts: 127.0.0.1

5.4.6.5. ACL ルールの削除

この手順では、Kafka ブローカーで AclAuthorizer プラグインを使用する場合に、ACL ルールを削除する方法を説明します。

ルールは kafka-acls.sh ユーティリティーを使用して削除されます。

前提条件

手順

  • --remove オプションを指定して kafka-acls.sh を実行します。

    例:

  • MyConsumerGroup コンシューマーグループを使用して、user1 および user2myTopic からの読み取りを許可する ACL を削除します。

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2
    
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
  • MyConsumerGroupmyTopic のコンシューマーとして user1 を追加する ACL を削除します。

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
  • user1 が IP アドレスホスト 127.0.0.1 から myTopic を読むためのアクセスを拒否する ACL を削除します。

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zoo1.my-domain.com:2181 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1

5.4.7. ZooKeeper の認可

Kafka と ZooKeeper の間で認証が有効になっている場合、ZooKeeper アクセス制御リスト (ACL) ルールを使用して、ZooKeeper に格納されている Kafka のメタデータへのアクセスを自動的に制御できます。

5.4.7.1. ACL 設定

ZooKeeper ACL ルールの適用は、config/server.properties Kafka 設定ファイルの zookeeper.set.acl プロパティーによって制御されます。

プロパティーはデフォルトで無効になっていて、true に設定することにより有効になります。

zookeeper.set.acl=true

ACL ルールが有効になっている場合、ZooKeeper で znode が作成されると、作成した Kafka ユーザーのみがこれを変更または削除することができます。その他のすべてのユーザーには読み取り専用アクセスがあります。

Kafka は、新しく作成された ZooKeeper znodes に対してのみ ACL ルールを設定します。ACL がクラスターの最初の起動後にのみ有効である場合、zookeeper-security-migration.sh ツールは既存のすべての znodes に ACL を設定できます。

ZooKeeper のデータの機密性

ZooKeeper に保存されるデータには以下が含まれます。

  • トピック名およびその設定
  • SASL SCRAM 認証が使用される場合のソルト化およびハッシュ化されたユーザークレデンシャル

しかし、ZooKeeper は Kafka を使用して送受信されたレコードを保存しません。ZooKeeper に保存されるデータは機密ではないと想定されます。

データが機密として考慮される場合 (たとえば、トピック名にカスタマー ID が含まれるなど)、保護に使用できる唯一のオプションは、ネットワークレベルで ZooKeeper を分離し、Kafka ブローカーにのみアクセスを許可することです。

5.4.7.2. 新しい Kafka クラスターでの ZooKeeper ACL の有効化

この手順では、新しい Kafka クラスターの Kafka 設定で ZooKeeper ACL を有効にする方法を説明します。この手順は、Kafka クラスターの最初の起動前にのみ使用してください。すでに実行中のクラスターで ZooKeeper ACL を有効にする場合は、「既存の Kafka クラスターでの ZooKeeper ACL の有効化」 を参照してください。

前提条件

手順

  1. すべてのクラスターノードの /opt/kafka/config/server.properties Kafka 設定ファイルを編集し、zookeeper.set.acl フィールドを true に設定します。

    zookeeper.set.acl=true
  2. Kafka ブローカーを起動します。

5.4.7.3. 既存の Kafka クラスターでの ZooKeeper ACL の有効化

この手順では、稼働している Kafka クラスターの Kafka 設定で ZooKeeper ACL を有効にする方法を説明します。zookeeper-security-migration.sh ツールを使用して、既存のすべての znodes に ZooKeeper の ACL を設定します。zookeeper-security-migration.sh は AMQ Streams の一部として利用でき、bin ディレクトリーにあります。

前提条件

ZooKeeper ACL の有効化

  1. すべてのクラスターノードの /opt/kafka/config/server.properties Kafka 設定ファイルを編集し、zookeeper.set.acl フィールドを true に設定します。

    zookeeper.set.acl=true
  2. すべての Kafka ブローカーを 1 つずつ再起動します。

    マルチノードクラスターでブローカーを再起動する方法は、「Kafka ブローカーの正常なローリング再起動の実行」 を参照してください。

  3. zookeeper-security-migration.sh ツールを使用して、既存のすべての znodes ノードに ACL を設定します。

    su - kafka
    cd /opt/kafka
    KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=<ZooKeeperURL>
    exit

    以下に例を示します。

    su - kafka
    cd /opt/kafka
    KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181
    exit

5.4.8. 暗号化と認証

AMQ Streams は、リスナー設定の一部として設定される暗号化および認証をサポートします。

5.4.8.1. リスナーの設定

Kafka ブローカーの暗号化および認証は、リスナーごとに設定されます。Kafka リスナーの設定に関する詳細は、「リスナー」 を参照してください。

Kafka ブローカーの各リスナーは、独自のセキュリティープロトコルで設定されます。設定プロパティー listener.security.protocol.map は、どのリスナーがどのセキュリティープロトコルを使用するかを定義します。各リスナー名がセキュリティープロトコルにマッピングされます。サポートされるセキュリティープロトコルは次のとおりです。

PLAINTEXT
暗号化または認証を使用しないリスナー。
SSL
TLS 暗号化を使用し、オプションで TLS クライアント証明書を使用した認証を使用するリスナー。
SASL_PLAINTEXT
暗号化なし、SASL ベースの認証を使用するリスナー。
SASL_SSL
TLS ベースの暗号化および SASL ベースの認証を使用するリスナー。

以下の listeners 設定の場合、

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094

listener.security.protocol.map は以下のようになります。

listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL

これにより、リスナー INT1 は暗号化されていない接続および SASL 認証を使用し、リスナー INT2 は暗号化された接続および SASL 認証を使用し、REPLICATION インターフェイスは TLS による暗号化 (TLS クライアント認証が使用される可能性があり) を使用するように設定されます。同じセキュリティープロトコルを複数回使用できます。以下は、有効な設定の例です。

listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL

このような設定は、すべてのインターフェイスに TLS 暗号化および TLS 認証を使用します。以下の章では、TLS および SASL の設定方法について詳しく説明します。

5.4.8.2. TLS 暗号化

Kafka は、Kafka クライアントとの通信を暗号化するために TLS をサポートします。

TLS による暗号化およびサーバー認証を使用するには、秘密鍵と公開鍵が含まれるキーストアを提供する必要があります。これは通常、Java Keystore (JKS) 形式のファイルを使用して行われます。このファイルのパスは、ssl.keystore.location プロパティーに設定されます。ssl.keystore.password プロパティーを使用して、キーストアを保護するパスワードを設定する必要があります。以下に例を示します。

ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456

秘密鍵を保護するために、追加のパスワードが使用されることがあります。このようなパスワードは、ssl.key.password プロパティーを使用して設定できます。

Kafka は、認証局によって署名された鍵と自己署名の鍵を使用できます。認証局が署名する鍵を使用することが、常に推奨される方法です。クライアントが接続している Kafka ブローカーのアイデンティティーを検証できるようにするには、証明書に Common Name (CN) または Subject Alternative Names (SAN) としてアドバタイズされたホスト名が常に含まれる必要があります。

異なるリスナーに異なる SSL 設定を使用できます。ssl. で始まるすべてのオプションの前に listener.name.<NameOfTheListener>. を付けることができます。この場合、リスナーの名前は常に小文字である必要があります。これにより、その特定のリスナーのデフォルトの SSL 設定が上書きされます。以下の例は、異なるリスナーに異なる SSL 設定を使用する方法を示しています。

listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL

# Default configuration - will be used for listeners INT1 and INT2
ssl.keystore.location=/path/to/keystore/server-1.jks
ssl.keystore.password=123456

# Different configuration for listener REPLICATION
listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks
listener.name.replication.ssl.keystore.password=123456

その他の TLS 設定オプション

上記のメインの TLS 設定オプションの他に、Kafka は TLS 設定を調整するための多くのオプションをサポートします。たとえば、TLS/ SSL プロトコルまたは暗号スイートを有効または無効にするには、次のコマンドを実行します。

ssl.cipher.suites
有効な暗号スイートのリスト。各暗号スイートは、TLS 接続に使用される認証、暗号化、MAC、および鍵交換アルゴリズムの組み合わせです。デフォルトでは、利用可能なすべての暗号スイートが有効になっています。
ssl.enabled.protocols
有効な TLS / SSL プロトコルのリスト。デフォルトは TLSv1.2,TLSv1.1,TLSv1 です。

5.4.8.3. TLS 暗号化の有効化

この手順では、Kafka ブローカーで暗号化を有効にする方法を説明します。

前提条件

手順

  1. クラスター内のすべての Kafka ブローカーの TLS 証明書を生成します。証明書には、Common Name または Subject Alternative Name にアドバタイズされたアドレスおよびブートストラップアドレスが必要です。
  2. 以下のように、すべてのクラスターノードの /opt/kafka/config/server.properties Kafka 設定ファイルを編集します。

    • listener.security.protocol.map フィールドを変更して、TLS 暗号化を使用するリスナーに SSL プロトコルを指定します。
    • ssl.keystore.location オプションを、ブローカー証明書を持つ JKS キーストアへのパスに設定します。
    • ssl.keystore.password オプションを、キーストアの保護に使用したパスワードに設定します。

      以下に例を示します。

      listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094
      listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT
      ssl.keystore.location=/path/to/keystore/server-1.jks
      ssl.keystore.password=123456
  3. Kafka ブローカーを (再) 起動します。

5.4.8.4. 認証

認証には、以下を使用できます。

5.4.8.4.1. TLS クライアント認証

TLS クライアント認証は、TLS 暗号化を使用している接続でのみ使用できます。公開鍵のあるトラストストアをブローカーに提供し、TLS クライアント認証を使用することができます。これらのキーは、ブローカーに接続するクライアントを認証するために使用できます。トラストストアは Java Keystore (JKS) 形式で提供され、認証局の公開鍵が含まれている必要があります。トラストストアに含まれる認証局のいずれかによって署名された公開鍵および秘密鍵を持つクライアントはすべて認証されます。トラストストアの場所は、フィールド ssl.truststore.location を使用して設定されます。トラストストアがパスワードで保護される場合、ssl.truststore.password プロパティーでパスワードを設定する必要があります。以下に例を示します。

ssl.truststore.location=/path/to/keystore/server-1.jks
ssl.truststore.password=123456

トラストストアが設定されたら、ssl.client.auth プロパティーを使用して TLS クライアント認証を有効にする必要があります。このプロパティーは、3 つの異なる値のいずれかに設定できます。

none
TLS クライアント認証はオフになっています。(デフォルト値)
requested
TLS クライアント認証は任意です。クライアントは TLS クライアント証明書を使用した認証を要求されますが、このような認証をしないことを選択することができます。
required
クライアントは TLS クライアント証明書を使用して認証する必要があります。

クライアントが TLS クライアント認証を使用して認証する場合、認証されたプリンシパル名は認証済みクライアント証明書からの識別名になります。たとえば、CN=someuser という識別名の証明書を持つユーザーは、プリンシパル CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown で認証されます。TLS クライアント認証が使用されておらず、SASL が無効な場合、プリンシパル名は ANONYMOUS になります。

5.4.8.4.2. SASL 認証

SASL 認証は、Java Authentication and Authorization Service (JAAS) を使用して設定されます。JAAS は、Kafka と ZooKeeper 間の接続の認証にも使用されます。JAAS は独自の設定ファイルを使用します。このファイルに推奨される場所は /opt/kafka/config/jaas.conf です。ファイルは kafka ユーザーが読み取りできる必要があります。Kafka を実行中の場合、このファイルの場所は Java システムプロパティー java.security.auth.login.config を使用して指定されます。このプロパティーは、ブローカーノードの起動時に Kafka に渡す必要があります。

KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config"; bin/kafka-server-start.sh

SASL 認証は、暗号化されていないプレーンの接続と TLS 接続の両方を介してサポートされます。SASL はリスナーごとに個別に有効にできます。これを有効にするには、listener.security.protocol.map のセキュリティープロトコルを SASL_PLAINTEXT または SASL_SSL のいずれかにする必要があります。

Kafka の SASL 認証は、いくつかの異なるメカニズムをサポートします。

PLAIN
ユーザー名とパスワードに基づいて認証を実装します。ユーザー名とパスワードは Kafka 設定にローカルに保存されます。
SCRAM-SHA-256 および SCRAM-SHA-512
Salted Challenge Response Authentication Mechanism (SCRAM) を使用して認証を実装します。SCRAM クレデンシャルは、ZooKeeper に一元的に保存されます。SCRAM は、ZooKeeper クラスターノードがプライベートネットワークで分離された状態で実行されている場合に使用できます。
GSSAPI
Kerberos サーバーに対して認証を実装します。
警告

PLAIN メカニズムは、ネットワークを通じてユーザー名とパスワードを暗号化されていない形式で送信します。そのため、TLS による暗号化と組み合わせる場合にのみ使用してください。

SASL メカニズムは JAAS 設定ファイルを使用して設定されます。Kafka は KafkaServer という名前の JAAS コンテキストを使用します。JAAS で設定された後、Kafka 設定で SASL メカニズムを有効にする必要があります。これは、sasl.enabled.mechanisms プロパティーを使用して実行されます。このプロパティーには、有効なメカニズムのコンマ区切りリストが含まれます。

sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512

inter-broker 通信に使用されるリスナーが SASL を使用している場合、sasl.mechanism.inter.broker.protocol プロパティーを使用して、使用する SASL メカニズムを指定する必要があります。以下に例を示します。

sasl.mechanism.inter.broker.protocol=PLAIN

inter-broker 通信に使用されるユーザー名およびパスワードは、フィールド username および password を使用して KafkaServer JAAS コンテキストで指定する必要があります。

SASL プレーン

PLAIN メカニズムを使用する場合、接続が許可されるユーザー名およびパスワードは JAAS コンテキストに直接指定されます。以下の例は、SASL PLAIN 認証に設定されたコンテキストを示しています。この例では、3 つの異なるユーザーを設定します。

  • admin
  • user1
  • user2
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};

ユーザーデータベースを持つ JAAS 設定ファイルは、すべての Kafka ブローカーで同期して維持する必要があります。

SASL PLAIN が inter-broker の認証にも使用される場合、username および password プロパティーを JAAS コンテキストに含める必要があります。

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";
};

SASL SCRAM

Kafka の SCRAM 認証は、SCRAM-SHA-256 および SCRAM-SHA-512 の 2 つのメカニズムで設定されます。これらのメカニズムは、使用されるハッシュアルゴリズム (SHA-256 とより強力な SHA-512) のみが異なります。SCRAM 認証を有効にするには、JAAS 設定ファイルに以下の設定を含める必要があります。

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required;
};

Kafka 設定ファイルで SASL 認証を有効にすると、両方の SCRAM メカニズムがリスト表示されます。ただし、それらの 1 つのみを inter-broker 通信に選択できます。以下に例を示します。

sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

SCRAM メカニズムのユーザークレデンシャルは ZooKeeper に保存されます。kafka-configs.sh ツールを使用してそれらを管理できます。たとえば、以下のコマンドを実行して、パスワード 123456 で user1 を追加します。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1

ユーザークレデンシャルを削除するには、以下を使用します。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1

SASL GSSAPI

Kerberos を使用した認証に使用される SASL メカニズムは GSSAPI と呼ばれます。Kerberos SASL 認証を設定するには、以下の設定を JAAS 設定ファイルに追加する必要があります。

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};

Kerberos プリンシパルのドメイン名は常に大文字にする必要があります。

JAAS 設定の他に、Kerberos サービス名を Kafka 設定の sasl.kerberos.service.name プロパティーで指定する必要があります。

sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka

複数の SASL メカニズム

Kafka は複数の SASL メカニズムを同時に使用できます。異なる JAAS 設定はすべて同じコンテキストに追加できます。

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456"
    user_user1="123456"
    user_user2="123456";

    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.scram.ScramLoginModule required;
};

複数のメカニズムを有効にすると、クライアントは使用するメカニズムを選択できます。

5.4.8.5. TLS クライアント認証の有効化

この手順では、Kafka ブローカーで TLS クライアント認証を有効にする方法を説明します。

前提条件

手順

  1. ユーザー証明書に署名するために使用される認証局の公開鍵が含まれる JKS トラストストアを準備します。
  2. 以下のように、すべてのクラスターノードの /opt/kafka/config/server.properties Kafka 設定ファイルを編集します。

    • ssl.truststore.location オプションを、ユーザー証明書の認証局が含まれる JKS トラストストアへのパスに設定します。
    • ssl.truststore.password オプションを、トラストストアの保護に使用したパスワードに設定します。
    • ssl.client.auth オプションを required に設定します。

      以下に例を示します。

      ssl.truststore.location=/path/to/truststore.jks
      ssl.truststore.password=123456
      ssl.client.auth=required
  3. Kafka ブローカーを (再) 起動します。

5.4.8.6. SASL PLAIN 認証の有効化

この手順では、Kafka ブローカーで SASL PLAIN 認証を有効にする方法を説明します。

前提条件

手順

  1. /opt/kafka/config/jaas.conf JAAS 設定ファイルを編集するか、作成します。このファイルには、すべてのユーザーとそのパスワードが含まれている必要があります。このファイルがすべての Kafka ブローカーで同じであることを確認します。

    以下に例を示します。

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        user_admin="123456"
        user_user1="123456"
        user_user2="123456";
    };
  2. 以下のように、すべてのクラスターノードの /opt/kafka/config/server.properties Kafka 設定ファイルを編集します。

    • listener.security.protocol.map フィールドを変更して、SASL PLAIN 認証を使用するリスナーの SASL_PLAINTEXT または SASL_SSL プロトコルを指定します。
    • sasl.enabled.mechanisms オプションを PLAIN に設定します。

      以下に例を示します。

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=PLAIN
  3. KAFKA_OPTS 環境変数を使用して Kafka ブローカーを (再) 起動し、JAAS 設定を Kafka ブローカーに渡します。

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

5.4.8.7. SASL SCRAM 認証の有効化

この手順では、Kafka ブローカーで SASL SCRAM 認証を有効にする方法を説明します。

前提条件

手順

  1. /opt/kafka/config/jaas.conf JAAS 設定ファイルを編集するか、作成します。KafkaServer コンテキストの ScramLoginModule を有効にします。このファイルがすべての Kafka ブローカーで同じであることを確認します。

    以下に例を示します。

    KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required;
    };
  2. 以下のように、すべてのクラスターノードの /opt/kafka/config/server.properties Kafka 設定ファイルを編集します。

    • listener.security.protocol.map フィールドを変更して、SASL SCRAM 認証を使用するリスナーの SASL_PLAINTEXT または SASL_SSL プロトコルを指定します。
    • sasl.enabled.mechanisms オプションを SCRAM-SHA-256 または SCRAM-SHA-512 に設定します。

      以下に例を示します。

      listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094
      listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT
      sasl.enabled.mechanisms=SCRAM-SHA-512
  3. KAFKA_OPTS 環境変数を使用して Kafka ブローカーを (再) 起動し、JAAS 設定を Kafka ブローカーに渡します。

    su - kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

5.4.8.8. SASL SCRAM ユーザーの追加

この手順では、SASL SCRAM を使用した認証に新しいユーザーを追加する方法を説明します。

前提条件

手順

  • kafka-configs.sh ツールを使用して、新しい SASL SCRAM ユーザーを追加します。

    bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>

    以下に例を示します。

    bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1

5.4.8.9. SASL SCRAM ユーザーの削除

この手順では、SASL SCRAM 認証を使用する場合にユーザーを削除する方法を説明します。

前提条件

手順

  • kafka-configs.sh ツールを使用して SASL SCRAM ユーザーを削除します。

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>

    以下に例を示します。

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1

5.4.9. OAuth 2.0 トークンベース認証の使用

AMQ Streams は、OAUTHBEARER および PLAIN メカニズムを使用して、OAuth 2.0 認証 の使用をサポートします。

OAuth 2.0 は、アプリケーション間で標準的なトークンベースの認証および認可を有効にし、中央の認可サーバーを使用してリソースに制限されたアクセス権限を付与するトークンを発行します。

Kafka ブローカーおよびクライアントの両方が OAuth 2.0 を使用するように設定する必要があります。OAuth 2.0 認証を設定した後に OAuth 2.0 認可 を設定できます。

注記

OAuth 2.0 認証は、使用する承認サーバーに関係なく ACL ベースの Kafka 承認 と併用できます。

OAuth 2.0 認証を使用すると、アプリケーションクライアントはアカウントのクレデンシャルを公開せずにアプリケーションサーバー (リソースサーバー と呼ばれる) のリソースにアクセスできます。

アプリケーションクライアントは、アクセストークンを認証の手段として渡します。アプリケーションサーバーはこれを使用して、付与するアクセス権限のレベルを決定することもできます。認可サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。

AMQ Streams のコンテキストでは以下が行われます。

  • Kafka ブローカーは OAuth 2.0 リソースサーバーとして動作します。
  • Kafka クライアントは OAuth 2.0 アプリケーションクライアントとして動作します。

Kafka クライアントは Kafka ブローカーに対して認証を行います。ブローカーおよびクライアントは、必要に応じて OAuth 2.0 認可サーバーと通信し、アクセストークンを取得または検証します。

AMQ Streams のデプロイメントでは、OAuth 2.0 インテグレーションは以下を提供します。

  • Kafka ブローカーのサーバー側 OAuth 2.0 サポート
  • Kafka MirrorMaker、Kafka Connect、および Kafka Bridge のクライアント側 OAuth 2.0 サポート

RHEL での AMQ Streams には OAuth 2.0 ライブラリーが 2 つ含まれています。

kafka-oauth-client
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler という名前のカスタムログインコールバックハンドラークラスを提供します。OAUTHBEARER 認証メカニズムを処理するには、Apache Kafka が提供する OAuthBearerLoginModule でログインコールバックハンドラーを使用します。
kafka-oauth-common
kafka-oauth-client ライブラリーに必要な機能の一部を提供するヘルパーライブラリーです。

提供されるクライアントライブラリーは、keycloak-corejackson-databind、および slf4j-api などの追加のサードパーティーライブラリーにも依存します。

Maven プロジェクトを使用してクライアントをパッケージ化し、すべての依存関係ライブラリーが含まれるようにすることが推奨されます。依存関係ライブラリーは今後のバージョンで変更される可能性があります。

5.4.9.1. OAuth 2.0 認証メカニズム

AMQ Streams は、OAuth 2.0 認証で OAUTHBEARER および PLAIN メカニズムをサポートします。どちらのメカニズムも、Kafka クライアントが Kafka ブローカーで認証されたセッションを確立できるようにします。クライアント、認可サーバー、および Kafka ブローカー間の認証フローは、メカニズムごとに異なります。

可能な限り、OAUTHBEARER を使用するようにクライアントを設定することが推奨されます。OAUTHBEARER では、クライアントクレデンシャルは Kafka ブローカーと 共有されることがない ため、PLAIN よりも高レベルのセキュリティーが提供されます。OAUTHBEARER をサポートしない Kafka クライアントの場合のみ、PLAIN の使用を検討してください。

クライアントの接続に OAuth 2.0 認証を使用するように Kafka ブローカーリスナーを設定します。必要な場合は、同じ oauth リスナーで OAUTHBEARER および PLAIN メカニズムを使用できます。各メカニズムをサポートするプロパティーは、oauth リスナー設定で明示的に指定する必要があります。

OAUTHBEARER の概要

OAUTHBEARER を使用するには、Kafka ブローカーの OAuth 認証リスナー設定で sasl.enabled.mechanismsOAUTHBEARER に設定します。詳細な設定は、「OAuth 2.0 Kafka ブローカーの設定」 を参照してください。

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER

また、多くの Kafka クライアントツールでは、プロトコルレベルで OAUTHBEARER の基本サポートを提供するライブラリーを使用します。AMQ Streams では、アプリケーションの開発をサポートするために、アップストリームの Kafka Client Java ライブラリーに OAuth コールバックハンドラー が提供されます (ただし、他のライブラリーは対象外)。そのため、独自のコールバックハンドラーを作成する必要はありません。アプリケーションクライアントはコールバックハンドラーを使用してアクセストークンを提供できます。Go などの他言語で書かれたクライアントは、カスタムコードを使用して認可サーバーに接続し、アクセストークンを取得する必要があります。

OAUTHBEARER を使用する場合、クライアントはクレデンシャルを交換するために Kafka ブローカーでセッションを開始します。ここで、クレデンシャルはコールバックハンドラーによって提供されるベアラートークンの形式を取ります。コールバックを使用して、以下の 3 つの方法のいずれかでトークンの提供を設定できます。

  • クライアント ID および Secret (OAuth 2.0 クライアントクレデンシャルメカニズム を使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン
  • 設定時に手動で取得された有効期限の長い更新トークン
注記

OAUTHBEARER 認証は、プロトコルレベルで OAUTHBEARER メカニズムをサポートする Kafka クライアントでのみ使用できます。

PLAIN の概要

PLAIN を使用するには、PLAINsasl.enabled.mechanisms の値に追加します。

listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN

PLAIN は、すべての Kafka クライアントツールによって使用される簡単な認証メカニズムです。PLAIN を OAuth 2.0 認証で使用できるようにするために、AMQ Streams では OAuth 2.0 over PLAIN サーバー側のコールバックが提供されます。

PLAIN の AMQ Streams 実装では、クライアントのクレデンシャルは ZooKeeper に保存されません。代わりに、OAUTHBEARER 認証が使用される場合と同様に、クライアントのクレデンシャルは準拠した認可サーバーの背後で一元的に処理されます。

OAuth 2.0 over PLAIN コールバックを併用する場合、以下のいずれかの方法を使用して Kafka クライアントは Kafka ブローカーで認証されます。

  • クライアント ID および Secret (OAuth 2.0 クライアントクレデンシャル メカニズムを使用)
  • 設定時に手動で取得された有効期限の長いアクセストークン

どちらの方法でも、クライアントは Kafka ブローカーにクレデンシャルを渡すために、PLAIN username および password プロパティーを提供する必要があります。クライアントはこれらのプロパティーを使用してクライアント ID およびシークレット、または、ユーザー名およびアクセストークンを渡します。

クライアント ID およびシークレットは、アクセストークンの取得に使用されます。

アクセストークンは、password プロパティーの値として渡されます。$accessToken: 接頭辞の有無に関わらずアクセストークンを渡します。

  • リスナー設定でトークンエンドポイント (oauth.token.endpoint.uri) を設定する場合は、接頭辞が必要です。
  • リスナー設定でトークンエンドポイント (oauth.token.endpoint.uri) を設定しない場合は、接頭辞は必要ありません。Kafka ブローカーは、パスワードを raw アクセストークンとして解釈します。

アクセストークンとして password が設定されている場合、username は Kafka ブローカーがアクセストークンから取得するプリンシパル名と同じものを設定する必要があります。oauth.username.claimoauth.fallback.username.claimoauth.fallback.username.prefix、および oauth.userinfo.endpoint.uri プロパティーを使用すると、リスナーにユーザー名の抽出オプションを指定できます。ユーザー名の抽出プロセスも、承認サーバーによって異なります。特に、クライアント ID をアカウント名にマッピングする方法により異なります。

5.4.9.1.1. プロパティーまたは変数を使用した OAuth 2.0 の設定

OAuth 2.0 設定は、Java Authentication and Authorization Service (JAAS) プロパティーまたは環境変数を使用して設定できます。

  • JAAS のプロパティーは、server.properties 設定ファイルで設定され、listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config プロパティーのキーと値のペアとして渡されます。
  • 環境変数を使用する場合は、server.properties ファイルで listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config プロパティーを指定する必要がありますが、他の JAAS プロパティーを省略できます。

    大文字 (アッパーケース) の環境変数の命名規則を使用できます。

AMQ Streams OAuth 2.0 ライブラリーは、以下で始まるプロパティーを使用します。

5.4.9.2. OAuth 2.0 Kafka ブローカーの設定

OAuth 2.0 認証の Kafka ブローカー設定には、以下が関係します。

  • 認可サーバーでの OAuth 2.0 クライアントの作成
  • Kafka クラスターでの OAuth 2.0 認証の設定
注記

認可サーバーに関連する Kafka ブローカーおよび Kafka クライアントはどちらも OAuth 2.0 クライアントと見なされます。

5.4.9.2.1. 認可サーバーの OAuth 2.0 クライアント設定

セッションの開始中に受信されたトークンを検証するように Kafka ブローカーを設定するには、認可サーバーで OAuth 2.0 の クライアント 定義を作成し、以下のクライアントクレデンシャルが有効な状態で 機密情報 として設定することが推奨されます。

  • kafka-broker のクライアント ID (例)
  • 認証メカニズムとしてのクライアント ID およびシークレット
注記

認可サーバーのパブリックでないイントロスペクションエンドポイントを使用する場合のみ、クライアント ID およびシークレットを使用する必要があります。高速のローカル JWT トークンの検証と同様に、パブリック認可サーバーのエンドポイントを使用する場合は通常、クレデンシャルは必要ありません。

5.4.9.2.2. Kafka クラスターでの OAuth 2.0 認証設定

Kafka クラスターで OAuth 2.0 認証を使用するには、Kafka server.properties ファイルで Kafka クラスターの OAuth 認証リスナー設定を有効にします。最小限の設定が必要です。また、TLS が inter-broker 通信に使用される TLS リスナーを設定することもできます。

以下の方法のいずれかを使用して、認可サーバーによるトークン検証用にブローカーを設定できます。

  • 高速のローカルトークン検証: 署名付き JWT 形式のアクセストークンと組み合わせた JWKS エンドポイント
  • イントロスペクション エンドポイント

OAUTHBEARER または PLAIN 認証、またはその両方を設定できます。

以下の例は、グローバル リスナー設定を適用する最小の設定を示しています。これは、inter-broker 通信がアプリケーションクライアントと同じリスナーを通過することを意味します。

この例では、sasl.enabled.mechanisms ではなく、listener.name.LISTENER-NAME.sasl.enabled.mechanisms を指定する特定のリスナーの OAuth 2.0 設定も示しています。LISTENER-NAME は、リスナーの大文字と小文字を区別しない名前です。ここでは、リスナー CLIENT という名前が付けられ、プロパティー名は listener.name.client.sasl.enabled.mechanisms になります。

この例では OAUTHBEARER 認証を使用します。

例: JWKS エンドポイントを使用した OAuth 2.0 認証の最小リスナー設定

sasl.enabled.mechanisms=OAUTHBEARER 1
listeners=CLIENT://0.0.0.0:9092 2
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 3
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 4
sasl.mechanism.inter.broker.protocol=OAUTHBEARER 5
inter.broker.listener.name=CLIENT 6
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 7
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 8
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 9
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 10
  oauth.username.claim="preferred_username"  \ 11
  oauth.client.id="kafka-broker" \ 12
  oauth.client.secret="kafka-secret" \ 13
  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ; 14
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 15
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 16

1
SASL でのクレデンシャル交換に OAUTHBEARER メカニズムを有効にします。
2
接続するクライアントアプリケーションのリスナーを設定します。システム hostname はアドバタイズされたホスト名として使用されます。このホスト名は、再接続するためにクライアントが解決する必要があります。この例では、リスナーの名前は CLIENT です。
3
リスナーのチャネルプロトコルを指定します。SASL_SSL は TLS 用です。暗号化されていない接続 (TLS なし) には SASL_PLAINTEXT が使用されますが、TCP 接続層での盗聴のリスクがあります。
4
CLIENT リスナーの OAUTHBEARER メカニズムを指定します。クライアント名 (CLIENT) は通常、listeners プロパティーでは大文字、listener.name プロパティー (listener.name.client) では小文字、そして listener.name.client.* プロパティーの一部である場合は小文字で指定されます。
5
inter-broker 通信の OAUTHBEARER メカニズムを指定します。
6
inter-broker 通信のリスナーを指定します。仕様は、有効な設定のために必要です。
7
クライアントリスナーで OAuth 2.0 認証を設定します。
8
クライアントおよび inter-broker 通信の認証設定を行います。oauth.client.idoauth.client.secret、および auth.token.endpoint.uri プロパティーは inter-broker 設定に関連するものです。
9
有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
10
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
11
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと承認サーバーによって異なります。
12
すべてのブローカーで同じ Kafka ブローカーのクライアント ID。これは、kafka-broker として認可サーバーに登録されたクライアントです
13
Kafka ブローカーのシークレット (すべてのブローカーで同じ)。
14
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境の場合は、常に https:// urls を使用してください。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
15
inter-broker 通信の OAuth2.0 認証を有効にします (inter-broker 通信の OAuth2.0 認証にのみ必要)。
16
(オプション): トークンの期限が切れるとセッションが強制的に期限切れとなり、また、Kafka の再認証メカニズム が有効になります。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。

以下の例は、TLS が inter-broker の通信に使用される TLS リスナーの最小設定を示しています。

例: OAuth 2.0 認証の TLS リスナー設定

listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 1
listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 2
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
inter.broker.listener.name=REPLICATION
listener.name.replication.ssl.keystore.password=KEYSTORE-PASSWORD 3
listener.name.replication.ssl.truststore.password=TRUSTSTORE-PASSWORD
listener.name.replication.ssl.keystore.type=JKS
listener.name.replication.ssl.truststore.type=JKS
listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 4
listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 5
listener.name.replication.ssl.keystore.location=PATH-TO-KEYSTORE 6
listener.name.replication.ssl.truststore.location=PATH-TO-TRUSTSTORE 7
listener.name.replication.ssl.client.auth=required 8
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \
  oauth.username.claim="preferred_username" ; 9

1
inter-broker 通信とクライアントアプリケーションには、個別の設定が必要です。
2
REPLICATION リスナーが TLS を使用し、CLIENT リスナーが暗号化されていないチャネルで SASL を使用するように設定します。実稼働環境では、クライアントは暗号化されたチャンネル (SASL_SSL) を使用できます。
3
ssl. プロパティーは TLS 設定を定義します。
4
乱数ジェネレーターの実装。設定されていない場合は、Java プラットフォーム SDK デフォルトが使用されます。
5
ホスト名の検証。空の文字列に設定すると、ホスト名の検証はオフになります。設定されていない場合、デフォルト値は HTTPS で、サーバー証明書のホスト名の検証を強制します。
6
リスナーのキーストアへのパス。
7
リスナーのトラストストアへのパス。
8
(inter-broker 接続に使用される) TLS 接続の確立時に REPLICATION リスナーのクライアントがクライアント証明書で認証する必要があることを指定します。
9
OAuth 2.0 の CLIENT リスナーを設定します。認可サーバーとの接続はセキュアな HTTPS 接続を使用する必要があります。

以下の例は、SASL でのクレデンシャル交換に PLAIN 認証メカニズムを使用した OAuth 2.0 認証の最小設定を示しています。高速なローカルトークン検証が使用されます。

例: PLAIN 認証の最小リスナー設定

listeners=CLIENT://0.0.0.0:9092 1
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 2
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN 3
sasl.mechanism.inter.broker.protocol=OAUTHBEARER 4
inter.broker.listener.name=CLIENT 5
listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 6
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 7
  oauth.valid.issuer.uri="http://AUTH_SERVER/auth/realms/REALM" \ 8
  oauth.jwks.endpoint.uri="https://AUTH_SERVER/auth/realms/REALM/protocol/openid-connect/certs" \ 9
  oauth.username.claim="preferred_username"  \ 10
  oauth.client.id="kafka-broker" \ 11
  oauth.client.secret="kafka-secret" \ 12
  oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ; 13
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 14
listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 15
listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 16
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 17
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 18
  oauth.username.claim="preferred_username"  \ 19
  oauth.token.endpoint.uri="http://AUTH_SERVER/auth/realms/REALM/protocol/openid-connect/token" ; 20
connections.max.reauth.ms=3600000 21

1
接続するクライアントアプリケーション用のリスナー (この例では CLIENT) を設定します。システム hostname はアドバタイズされたホスト名として使用されます。このホスト名は、再接続するためにクライアントが解決する必要があります。これは唯一の設定済みリスナーであるため、inter-broker 通信にも使用されます。
2
暗号化されていないチャネルで SASL を使用するように例の CLIENT リスナーを設定します。実稼働環境では、TCP 接続層での盗聴を避けるために、クライアントは暗号化チャンネル (SASL_SSL) を使用する必要があります。
3
SASL でのクレデンシャル交換の PLAIN 認証メカニズムおよび OAUTHBEARER を有効にします。inter-broker 通信に必要なため、OAUTHBEARER も指定されます。Kafka クライアントは、接続に使用するメカニズムを選択できます。
4
inter-broker 通信の OAUTHBEARER 認証メカニズムを指定します。
5
inter-broker 通信のリスナー (この例では CLIENT) を指定します。有効な設定のために必要です。
6
OAUTHBEARER メカニズムのサーバーコールバックハンドラーを設定します。
7
OAUTHBEARER メカニズムを使用して、クライアントおよび inter-broker 通信の認証設定を設定します。oauth.client.idoauth.client.secret、および oauth.token.endpoint.uri プロパティーは inter-broker 設定に関連するものです。
8
有効な発行者 URI。この発行者からのアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
9
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
10
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーを識別する プリンシパル です。値は、使用される認証フローと承認サーバーによって異なります。
11
すべてのブローカーで同じ Kafka ブローカーのクライアント ID。これは、kafka-broker として認可サーバーに登録されたクライアントです
12
Kafka ブローカーの秘密 (すべてのブローカーで同じ)。
13
認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境の場合は、常に https:// urls を使用してください。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token
14
inter-broker 通信に OAuth 2.0 認証を有効にします。
15
PLAIN 認証のサーバーコールバックハンドラーを設定します。
16
PLAIN 認証を使用して、クライアント通信の認証設定を設定します。

oauth.token.endpoint.uri は、OAuth 2.0 クライアントクレデンシャルメカニズム を使用して OAuth 2.0 over PLAIN を有効にする任意のプロパティーです。

17
有効な発行者 URI。この発行者からのアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
18
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
19
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーを識別する プリンシパル です。値は、使用される認証フローと承認サーバーによって異なります。
20
承認サーバーへの OAuth 2.0 トークンエンドポイント URL。PLAIN メカニズムの追加設定。これが指定されている場合、クライアントは $accessToken: 接頭辞を使用してアクセストークンを password として渡すことで、PLAIN 経由で認証できます。

実稼働環境の場合は、常に https:// urls を使用してください。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token

21
(オプション): トークンの期限が切れるとセッションが強制的に期限切れとなり、また、Kafka の再認証メカニズム が有効になります。指定された値がアクセストークンの有効期限が切れるまでの残り時間よりも短い場合、クライアントは実際にトークンの有効期限が切れる前に再認証する必要があります。デフォルトでは、アクセストークンの期限が切れてもセッションは期限切れにならず、クライアントは再認証を試行しません。
5.4.9.2.3. 高速なローカル JWT トークン検証の設定

高速なローカル JWT トークンの検証では、JWT トークンの署名がローカルでチェックされます。

ローカルチェックでは、トークンに対して以下が確認されます。

  • アクセストークンに Bearer の (typ) 要求値が含まれ、トークンがタイプに準拠することを確認します。
  • 有効 (期限切れでない) かどうかを確認します。
  • トークンに validIssuerURI と一致する発行元があることを確認します。

認可サーバーによって発行されなかったすべてのトークンが拒否されるよう、リスナーの設定時に 有効な発行者 URI を指定します。

高速のローカル JWT トークン検証の実行中に、認可サーバーの通信は必要はありません。OAuth 2.0 認可サーバーによって公開される JWK エンドポイント URI を指定して、高速のローカル JWT トークン検証をアクティベートします。エンドポイントには、署名済み JWT トークンの検証に使用される公開鍵が含まれます。これらは、Kafka クライアントによってクレデンシャルとして送信されます。

注記

認可サーバーとの通信はすべて HTTPS を使用して実行する必要があります。

TLS リスナーでは、証明書 トラストストア を設定し、トラストストアファイルをポイントできます。

高速なローカル JWT トークン検証のプロパティーの例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 1
  oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 2
  oauth.jwks.refresh.seconds="300" \ 3
  oauth.jwks.refresh.min.pause.seconds="1" \ 4
  oauth.jwks.expiry.seconds="360" \ 5
  oauth.username.claim="preferred_username" \ 6
  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 7
  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 8
  oauth.ssl.truststore.type="PKCS12" ; 9

1
有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
2
JWKS エンドポイント URL。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
3
エンドポイントの更新間隔 (デフォルトは 300)。
4
JWKS 公開鍵の更新が連続して試行される間隔の最小一時停止時間 (秒単位)。不明な署名キーが検出されると、JWKS キーの更新は、最後に更新を試みてから少なくとも指定された期間は一時停止し、通常の定期スケジュール以外でスケジュールされます。キーの更新は指数バックオフのルールに従い、oauth.jwks.refresh.seconds に到達するまで、一時停止を増やして失敗した更新を再試行します。デフォルト値は 1 です。
5
JWK 証明書が期限切れになる前に有効とみなされる期間。デフォルトは 360 秒です。デフォルトよりも長い時間を指定する場合は、無効になった証明書へのアクセスが許可されるリスクを考慮してください。
6
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。値は、使用される認証フローと承認サーバーによって異なります。
7
TLS 設定で使用されるトラストストアの場所。
8
トラストストアにアクセスするためのパスワード。
9
PKCS #12 形式のトラストストアタイプ。
5.4.9.2.4. OAuth 2.0 イントロスペクションエンドポイントの設定

OAuth 2.0 のイントロスペクションエンドポイントを使用したトークンの検証では、受信したアクセストークンは不透明として対処されます。Kafka ブローカーは、アクセストークンをイントロスペクションエンドポイントに送信します。このエンドポイントは、検証に必要なトークン情報を応答として返します。ここで重要なのは、特定のアクセストークンが有効である場合は最新情報を返すことで、トークンの有効期限に関する情報も返します。

OAuth 2.0 イントロスペクションベースの検証を設定するには、高速ローカル JWT トークン検証用に指定された JWK エンドポイント URI ではなく、イントロスペクションエンドポイント URI を指定します。通常、イントロスペクションエンドポイントは保護されているため、認可サーバーに応じて client ID および client secret を指定する必要があります。

イントロスペクションエンドポイントのプロパティー例

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/introspection" \ 1
  oauth.client.id="kafka-broker" \ 2
  oauth.client.secret="kafka-broker-secret" \ 3
  oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 4
  oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 5
  oauth.ssl.truststore.type="PKCS12" \ 6
  oauth.username.claim="preferred_username" ; 7

1
OAuth 2.0 イントロスペクションエンドポイント URI。例: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token/introspect
2
Kafka ブローカーのクライアント ID。
3
Kafka ブローカーのシークレット。
4
TLS 設定で使用されるトラストストアの場所。
5
トラストストアにアクセスするためのパスワード。
6
PKCS #12 形式のトラストストアタイプ。
7
トークンの実際のユーザー名が含まれるトークン要求 (またはキー)。ユーザー名は、ユーザーの識別に使用される principal です。oauth.username.claim の値は、使用される承認サーバーによって異なります。

5.4.9.3. Kafka ブローカーの再認証の設定

Kafka クライアントと Kafka ブローカー間の OAuth 2.0 セッションに Kafka session re-authentication を使用するように、OAuth リスナーを設定できます。このメカニズムは、定義された期間後に、クライアントとブローカー間の認証されたセッションを期限切れにします。セッションの有効期限が切れると、クライアントは既存の接続を破棄せずに再使用して、新しいセッションを即座に開始します。

セッションの再認証はデフォルトで無効になっています。これは、server.properties ファイルで有効にできます。SASL メカニズムとして OAUTHBEARER または PLAIN を有効にした TLS リスナーに connections.max.reauth.ms プロパティーを設定します。

リスナーごとにセッションの再認証を指定できます。以下に例を示します。

listener.name.client.oauthbearer.connections.max.reauth.ms=3600000

セッションの再認証は、クライアントによって使用される Kafka クライアントライブラリーによってサポートされる必要があります。

セッションの再認証は、高速ローカル JWT または イントロスペクションエンドポイント のトークン検証と共に使用できます。

クライアントの再認証

ブローカーの認証されたセッションが期限切れになると、クライアントは接続を切断せずに新しい有効なアクセストークンをブローカーに送信し、既存のセッションを再認証する必要があります。

トークンの検証に成功すると、既存の接続を使用して新しいクライアントセッションが開始されます。クライアントが再認証に失敗した場合、さらにメッセージを送受信しようとすると、ブローカーは接続を閉じます。ブローカーで再認証メカニズムが有効になっていると、Kafka クライアントライブラリー 2.2 以降を使用する Java クライアントが自動的に再認証されます。

更新トークンが使用される場合、セッションの再認証は更新トークンにも適用されます。セッションが期限切れになると、クライアントは更新トークンを使用してアクセストークンを更新します。その後、クライアントは新しいアクセストークンを使用して既存の接続に再認証されます。

OAUTHBEARER および PLAIN のセッションの有効期限

セッションの再認証が設定されている場合、OAUTHBEARER と PLAIN 認証ではセッションの有効期限は異なります。

クライアント ID とシークレット による方法を使用する OAUTHBEARER および PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された connections.max.reauth.ms で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、セッションは設定期間前に期限切れになります。

有効期間の長いアクセストークン 方法を使用する PLAIN の場合:

  • ブローカーの認証されたセッションは、設定された connections.max.reauth.ms で期限切れになります。
  • アクセストークンが設定期間前に期限切れになると、再認証に失敗します。セッションの再認証は試行されますが、PLAIN にはトークンを更新するメカニズムがありません。

connections.max.reauth.ms設定されていない 場合は、再認証しなくても、OAUTHBEARER および PLAIN クライアントはブローカーへの接続を無期限に維持します。認証されたセッションは、アクセストークンの期限が切れても終了しません。ただし、keycloak 認可を使用したり、カスタム authorizer をインストールして、認可を設定する場合に考慮できます。

5.4.9.4. OAuth 2.0 Kafka クライアントの設定

Kafka クライアントは以下のいずれかで設定されます。

  • 認可サーバーから有効なアクセストークンを取得するために必要なクレデンシャル (クライアント ID およびシークレット)。
  • 認可サーバーから提供されたツールを使用して取得された、有効期限の長い有効なアクセストークンまたは更新トークン。

アクセストークンは、Kafka ブローカーに送信される唯一の情報です。アクセストークンを取得するために認可サーバーでの認証に使用されるクレデンシャルは、ブローカーに送信されません。

クライアントによるアクセストークンの取得後、認可サーバーと通信する必要はありません。

クライアント ID とシークレットを使用した認証が最も簡単です。有効期間の長いアクセストークンまたは更新トークンを使用すると、認可サーバーツールに追加の依存関係があるため、より複雑になります。

注記

有効期間が長いアクセストークンを使用している場合は、認可サーバーでクライアントを設定し、トークンの最大有効期間を長くする必要があります。

Kafka クライアントが直接アクセストークンで設定されていない場合、クライアントは認可サーバーと通信して Kafka セッションの開始中にアクセストークンのクレデンシャルを交換します。Kafka クライアントは以下のいずれかを交換します。

  • クライアント ID およびシークレット
  • クライアント ID、更新トークン、および (任意の) シークレット

5.4.9.5. OAuth 2.0 クライアント認証フロー

OAuth 2.0 認証フローは、基礎となる Kafka クライアントおよび Kafka ブローカー設定によって異なります。フローは、使用する認可サーバーによってもサポートされる必要があります。

Kafka ブローカーリスナー設定は、クライアントがアクセストークンを使用して認証する方法を決定します。クライアントはクライアント ID およびシークレットを渡してアクセストークンをリクエストできます。

リスナーが PLAIN 認証を使用するように設定されている場合、クライアントはクライアント ID およびシークレット、または、ユーザー名およびアクセストークンで認証できます。これらの値は PLAIN メカニズムの username および password プロパティーとして渡されます。

リスナー設定は、以下のトークン検証オプションをサポートします。

  • 認可サーバーと通信しない、JWT の署名確認およびローカルトークンのイントロスペクションをベースとした高速なローカルトークン検証を使用できます。認可サーバーは、トークンで署名を検証するために使用される公開証明書のある JWKS エンドポイントを提供します。
  • 認可サーバーが提供するトークンイントロスペクションエンドポイントへの呼び出しを使用することができます。新しい Kafka ブローカー接続が確立されるたびに、ブローカーはクライアントから受け取ったアクセストークンを認可サーバーに渡します。Kafka ブローカーは応答を確認して、トークンが有効かどうかを確認します。
注記

認可サーバーは不透明なアクセストークンの使用のみを許可する可能性があり、この場合はローカルトークンの検証は不可能です。

Kafka クライアントクレデンシャルは、以下のタイプの認証に対して設定することもできます。

  • 以前に生成された有効期間の長いアクセストークンを使用した直接ローカルアクセス
  • 新しいアクセストークンを発行するための承認サーバーとの通信 (クライアント ID およびシークレットまたは更新トークンを使用)
5.4.9.5.1. SASL OAUTHBEARER メカニズムを使用したクライアント認証フローの例

SASL OAUTHBEARER メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID とシークレットを使用し、ブローカーが検証を認可サーバーに委任する場合

Client using client ID and secret with broker delegating validation to authorization server

  1. Kafka クライアントは、クライアント ID およびシークレットを使用して承認サーバーからアクセストークンを要求し、必要に応じて更新トークンを要求します。
  2. 承認サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡すことで Kafka ブローカーで認証されます。
  4. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用し、認可サーバーでトークンイントロスペクションエンドポイントを呼び出すことで、アクセストークンを検証します。
  5. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントがクライアント ID およびシークレットを使用し、ブローカーが高速のローカルトークン検証を実行する場合

Client using client ID and secret with broker performing fast local token validation

  1. Kafka クライアントは、クライアント ID およびシークレットを使用し、オプションで更新トークンを使用して、トークンエンドポイントから承認サーバーで認証します。
  2. 承認サーバーは新しいアクセストークンを生成します。
  3. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用してアクセストークンを渡すことで Kafka ブローカーで認証されます。
  4. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが検証を認可サーバーに委任する場合

Client using long-lived access token with broker delegating validation to authorization server

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、独自のクライアント ID およびシークレットを使用して、認可サーバーでトークンイントロスペクションエンドポイントを呼び出し、アクセストークンを検証します。
  3. トークンが有効な場合、Kafka クライアントセッションが確立されます。

クライアントが有効期限の長いアクセストークンを使用し、ブローカーが高速のローカル検証を実行する場合

Client using long-lived access token with broker performing fast local validation

  1. Kafka クライアントは、SASL OAUTHBEARER メカニズムを使用して、有効期限の長いアクセストークンを渡すために Kafka ブローカーで認証します。
  2. Kafka ブローカーは、JWT トークン署名チェックおよびローカルトークンイントロスペクションを使用して、ローカルでアクセストークンを検証します。
警告

トークンが取り消された場合に認可サーバーとのチェックが行われないため、高速のローカル JWT トークン署名の検証は有効期限の短いトークンにのみ適しています。トークンの有効期限はトークンに書き込まれますが、失効はいつでも発生する可能性があるため、認可サーバーと通信せずに対応することはできません。発行されたトークンはすべて期限切れになるまで有効とみなされます。

5.4.9.5.2. SASL PLAIN メカニズムを使用したクライアント認証フローの例

OAuth PLAIN メカニズムを使用して、Kafka 認証に以下の通信フローを使用できます。

クライアントがクライアント ID およびシークレットを使用し、ブローカーがクライアントのアクセストークンを取得する場合

Client using a client ID and secret with the broker obtaining the access token for the client

  1. Kafka クライアントは、clientId をユーザー名として、secret をパスワードとして渡します。
  2. Kafka ブローカーは、トークンエンドポイントを使用して clientId および secret を認可サーバーに渡します。
  3. 認可サーバーは、新しいアクセストークンまたはエラー (クライアントクレデンシャルが有効でない場合) を返します。
  4. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンのイントロスペクションが使用される場合、要求は認可サーバーに対して行われません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

クライアントが、クライアント ID およびシークレットなしで有効期限の長いアクセストークンを使用する場合

Client using a long-lived access token without a client ID and secret

  1. Kafka クライアントはユーザー名とパスワードを渡します。パスワードは、クライアントを実行する前に手動で取得および設定されたアクセストークンの値を提供します。
  2. Kafka ブローカーリスナーが認証のトークンエンドポイントで設定されているかどうかに応じて、$accessToken: 文字列の接頭辞の有無にかかわらず、パスワードは渡されます。

    1. トークンエンドポイントが設定されている場合、パスワードの前に $accessToken: を付け、password パラメーターにクライアントシークレットではなくアクセストークンが含まれていることをブローカーに知らせる必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。
    2. トークンエンドポイントが Kafka ブローカーリスナーで設定されていない場合 (no-client-credentials mode を強制)、パスワードは接頭辞なしでアクセストークンを提供する必要があります。Kafka ブローカーは、ユーザー名をアカウントのユーザー名として解釈します。このモードでは、クライアントはクライアント ID およびシークレットを使用せず、password パラメーターは常に raw アクセストークンとして解釈されます。
  3. Kafka ブローカーは、以下のいずれかの方法でトークンを検証します。

    1. トークンイントロスペクションエンドポイントが指定されている場合、Kafka ブローカーは認可サーバーでエンドポイントを呼び出すことで、アクセストークンを検証します。トークンの検証に成功した場合には、セッションが確立されます。
    2. ローカルトークンイントロスペクションが使用されている場合には、認可サーバーへの要求はありません。Kafka ブローカーは、JWT トークン署名チェックを使用して、アクセストークンをローカルで検証します。

5.4.9.6. OAuth 2.0 認証の設定

OAuth 2.0 は、Kafka クライアントと AMQ Streams コンポーネントとの対話に使用されます。

AMQ Streams に OAuth 2.0 を使用するには、以下を行う必要があります。

5.4.9.6.1. OAuth 2.0 認可サーバーとしての Red Hat Single Sign-On の設定

この手順では、Red Hat Single Sign-On を認可サーバーとしてデプロイし、AMQ Streams と統合するための設定方法を説明します。

認可サーバーは、一元的な認証および認可の他、ユーザー、クライアント、およびパーミッションの一元管理を実現します。Red Hat Single Sign-On にはレルムの概念があります。レルム はユーザー、クライアント、パーミッション、およびその他の設定の個別のセットを表します。デフォルトの マスターレルム を使用できますが、新しいレルムを作成することもできます。各レルムは独自の OAuth 2.0 エンドポイントを公開します。そのため、アプリケーションクライアントとアプリケーションサーバーはすべて同じレルムを使用する必要があります。

AMQ Streams で OAuth 2.0 を使用するには、Red Hat Single Sign-On のデプロイメントを使用して認証レルムを作成および管理します。

注記

Red Hat Single Sign-On がすでにデプロイされている場合は、デプロイメントの手順を省略して、現在のデプロイメントを使用できます。

作業を開始する前に

Red Hat Single Sign-On を使用するための知識が必要です。

インストールおよび管理の手順は、以下を参照してください。

前提条件

  • AMQ Streams および Kafka が稼働している。

Red Hat Single Sign-On デプロイメントの場合:

手順

  1. Red Hat Single Sign-On をインストールします。

    ZIP ファイルから、または RPM を使用してインストールできます。

  2. Red Hat Single Sign-On の Admin Console にログインし、AMQ Streams の OAuth 2.0 ポリシーを作成します。

    ログインの詳細は、Red Hat Single Sign-On のデプロイ時に提供されます。

  3. レルムを作成し、有効にします。

    既存のマスターレルムを使用できます。

  4. 必要に応じて、レルムのセッションおよびトークンのタイムアウトを調整します。
  5. kafka-broker というクライアントを作成します。
  6. Settings タブで以下を設定します。

    • Access TypeConfidential に設定します。
    • Standard Flow EnabledOFF に設定し、このクライアントからの Web ログインを無効にします。
    • Service Accounts EnabledON に設定し、このクライアントが独自の名前で認証できるようにします。
  7. 続行する前に Save クリックします。
  8. Credentials タブにある、AMQ Streams の Kafka クラスター設定で使用するシークレットを書き留めておきます。
  9. Kafka ブローカーに接続するすべてのアプリケーションクライアントに対して、このクライアント作成手順を繰り返し行います。

    新しいクライアントごとに定義を作成します。

    設定では、名前をクライアント ID として使用します。

次のステップ

認可サーバーのデプロイおよび設定後に、Kafka ブローカーが OAuth 2.0 を使用するように設定 します。

5.4.9.6.2. Kafka ブローカーの OAuth 2.0 サポートの設定

この手順では、ブローカーリスナーが認可サーバーを使用して OAuth 2.0 認証を使用するように、Kafka ブローカーを設定する方法について説明します。

TLS リスナーを設定して、暗号化されたインターフェイスで OAuth 2.0 を使用することが推奨されます。プレーンリスナーは推奨されません。

選択した認可サーバーをサポートするプロパティーと、実装している認可のタイプを使用して、Kafka ブローカーを設定します。

作業を開始する前の注意事項

Kafka ブローカーリスナーの設定および認証に関する詳細は、以下を参照してください。

リスナー設定で使用されるプロパティーの説明は、以下を参照してください。

前提条件

  • AMQ Streams および Kafka が稼働している。
  • OAuth 2.0 の認可サーバーがデプロイされている。

手順

  1. server.properties ファイルで Kafka ブローカーリスナー設定を設定します。

    たとえば、OAUTHBEARER メカニズムを使用する場合:

    sasl.enabled.mechanisms=OAUTHBEARER
    listeners=CLIENT://0.0.0.0:9092
    listener.security.protocol.map=CLIENT:SASL_PLAINTEXT
    listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
    sasl.mechanism.inter.broker.protocol=OAUTHBEARER
    inter.broker.listener.name=CLIENT
    listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
    listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
  2. listener.name.client.oauthbearer.sasl.jaas.config の一部としてブローカーの接続設定を行います。

    この例では、接続設定オプションを示しています。

    例 1: JWKS エンドポイント設定を使用したローカルトークンの検証

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME" \
      oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs" \
      oauth.jwks.refresh.seconds="300" \
      oauth.jwks.refresh.min.pause.seconds="1" \
      oauth.jwks.expiry.seconds="360" \
      oauth.username.claim="preferred_username" \
      oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \
      oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \
      oauth.ssl.truststore.type="PKCS12" ;
    listener.name.client.oauthbearer.connections.max.reauth.ms=3600000

    例 2: OAuth 2.0 イントロスペクションエンドポイントを使用した認可サーバーへのトークン検証の委任

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/introspection" \
      # ...

  3. 必要な場合は、認可サーバーへのアクセスを設定します。

    この手順は通常、サービスメッシュ などの技術がコンテナーの外部でセキュアなチャネルを設定するために使用される場合を除き、実稼働環境で必要になります。

    1. セキュアな認可サーバーに接続するためのカスタムトラストストアを指定します。認可サーバーへのアクセスには SSL が常に必要になります。

      プロパティーを設定して、トラストストアを設定します。

      以下に例を示します。

      listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        # ...
        oauth.client.id="kafka-broker" \
        oauth.client.secret="kafka-broker-secret" \
        oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \
        oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \
        oauth.ssl.truststore.type="PKCS12" ;
    2. 証明書のホスト名がアクセス URL ホスト名と一致しない場合は、証明書のホスト名の検証をオフにできます。

      oauth.ssl.endpoint.identification.algorithm=""

      このチェックは、認可サーバーへのクライアント接続が認証されるようにします。実稼働以外の環境で検証をオフにすることもできます。

  4. 選択した認証フローに応じて、追加のプロパティーを設定します。

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" \ 1
      oauth.custom.claim.check="@.custom == 'custom-value'" \ 2
      oauth.scope="SCOPE" \ 3
      oauth.check.audience="true" \ 4
      oauth.audience="AUDIENCE" \ 5
      oauth.valid.issuer.uri="https://https://AUTH-SERVER-ADDRESS/auth/REALM-NAME" \ 6
      oauth.client.id="kafka-broker" \ 7
      oauth.client.secret="kafka-broker-secret" \ 8
      oauth.refresh.token="REFRESH-TOKEN-FOR-KAFKA-BROKERS" \ 9
      oauth.access.token="ACCESS-TOKEN-FOR-KAFKA-BROKERS" ; 10
      oauth.connect.timeout.seconds=60 11
      oauth.read.timeout.seconds=60 12
      oauth.groups.claim="$.groups" 13
      oauth.groups.claim.delimiter="," 14
    1
    認可サーバーへの OAuth 2.0 トークンエンドポイント URL。実稼働環境の場合は、常に https:// urls を使用してください。KeycloakRBACAuthorizer を使用する場合、または inter-broker 通信に OAuth 2.0 が有効なリスナーが使用されている場合に必要です。
    2
    (オプション) カスタムクレームチェック。検証時に追加のカスタムルールを JWT アクセストークンに適用する JsonPath フィルタークエリー。アクセストークンに必要なデータが含まれていないと拒否されます。イントロスペクション エンドポイントメソッドを使用する場合は、カスタムチェックがイントロスペクションエンドポイントの応答 JSON に適用されます。
    3
    (オプション) scope パラメーターがトークンエンドポイントに渡されます。スコープ は、inter-broker 認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
    4
    (オプション) オーディエンスチェック。認可サーバーが aud (オーディエンス) クレームを提供していて、オーディエンスチェックを実施する場合は、ouath.check.audiencetrue に設定します。オーディエンスチェックによって、トークンの目的の受信者が特定されます。これにより、Kafka ブローカーは aud 要求に clientId を持たないトークンを拒否します。デフォルトは false です。
    5
    (オプション) トークンエンドポイントに渡される audience パラメーター。オーディエンス は、inter-broker 認証用にアクセストークンを取得する場合に使用されます。また、clientIdsecret を使用した PLAIN クライアント認証の上にある OAuth 2.0 のクライアント名にも使われています。これは、認可サーバーに応じて、トークンの取得機能とトークンの内容のみに影響します。リスナーによるトークン検証ルールには影響しません。
    6
    有効な発行者 URI。この発行者が発行するアクセストークンのみが受け入れられます。(常に必要です)
    7
    すべてのブローカーで同一の、Kafka ブローカーの設定されたクライアント ID。これは、kafka-broker として承認サーバーに登録されたクライアントです。イントロスペクションエンドポイントがトークンの検証に使用される場合、または KeycloakRBACAuthorizer が使用される場合に必要です。
    8
    すべてのブローカーで同じ Kafka ブローカーに設定されたシークレット。ブローカーが認証サーバーに対して認証する必要がある場合は、クライアントシークレット、アクセストークン、または更新トークンのいずれかを指定する必要があります。
    9
    (オプション) Kafka ブローカー用の長期間有効な更新トークン。
    10
    (オプション) Kafka ブローカー用の長期間有効なアクセストークン。
    11
    (オプション) 承認サーバーへの接続時のタイムアウト (秒単位)。デフォルト値は 60 です。
    12
    (オプション): 認可サーバーへの接続時の読み取りタイムアウト (秒単位)。デフォルト値は 60 です。
    13
    JWT トークンまたはイントロスペクションエンドポイントの応答からグループ情報を抽出するために使用される JsonPath クエリー。デフォルトでは設定されません。これは、カスタム承認者がユーザーグループに基づいて認可を決定するために使用できます。
    14
    1 つのコンマ区切りの文字列として返されるときにグループ情報を解析するために使用される区切り文字。デフォルト値は ,(コンマ) です。
  5. OAuth 2.0 認証の適用方法、および使用されている認証サーバーのタイプに応じて、設定を追加します。

    listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      # ...
      oauth.check.issuer=false \ 1
      oauth.fallback.username.claim="CLIENT-ID" \ 2
      oauth.fallback.username.prefix="CLIENT-ACCOUNT" \ 3
      oauth.valid.token.type="bearer" \ 4
      oauth.userinfo.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/userinfo" ; 5
    1
    認可サーバーが iss クレームを提供しない場合は、発行者チェックを行うことができません。このような場合、oauth.check.issuerfalse に設定し、oauth.valid.issuer.uri を指定しないようにします。デフォルトは true です。
    2
    認可サーバーは、通常ユーザーとクライアントの両方を識別する単一の属性を提供しない場合があります。クライアントが独自の名前で認証される場合、サーバーによって クライアント ID が提供されることがあります。更新トークンまたはアクセストークンを取得するために、ユーザー名およびパスワードを使用してユーザーが認証される場合、サーバーによってクライアント ID の他に ユーザー名 が提供されることがあります。プライマリーユーザー ID 属性が使用できない場合は、このフォールバックオプションで、使用するユーザー名クレーム (属性) を指定します。
    3
    oauth.fallback.username.claim が適用される場合、ユーザー名クレームの値とフォールバックユーザー名クレームの値が競合しないようにする必要もあることがあります。producer というクライアントが存在し、producer という通常ユーザーも存在する場合について考えてみましょう。この 2 つを区別するには、このプロパティーを使用してクライアントのユーザー ID に接頭辞を追加します。
    4
    (oauth.introspection.endpoint.uri を使用する場合のみ該当): 使用している認証サーバーによっては、イントロスペクションエンドポイントによって トークンタイプ 属性が返されるかどうかはわからず、異なる値が含まれることがあります。イントロスペクションエンドポイントからの応答に含まれなければならない有効なトークンタイプ値を指定できます。
    5
    (oauth.introspection.endpoint.uri を使用する場合のみ該当): イントロスペクションエンドポイントの応答に識別可能な情報が含まれないように、認可サーバーが設定または実装されることがあります。ユーザー ID を取得するには、userinfo エンドポイントの URI をフォールバックとして設定します。oauth.fallback.username.claimoauth.fallback.username.claim、および oauth.fallback.username.prefix 設定が userinfo エンドポイントの応答に適用されます。
5.4.9.6.3. OAuth 2.0 を使用するための Kafka Java クライアントの設定

この手順では、Kafka ブローカーとの対話に OAuth 2.0 を使用するように Kafka プロデューサーおよびコンシューマー API を設定する方法を説明します。

クライアントコールバックプラグインを pom.xml ファイルに追加し、システムプロパティーを設定します。

前提条件

  • AMQ Streams および Kafka が稼働している。
  • OAuth 2.0 認可サーバーがデプロイされ、Kafka ブローカーへの OAuth のアクセスが設定されている。
  • Kafka ブローカーが OAuth 2.0 に対して設定されている。

手順

  1. OAuth 2.0 サポートのあるクライアントライブラリーを Kafka クライアントの pom.xml ファイルに追加します。

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.10.0.redhat-00014</version>
    </dependency>
  2. コールバックのシステムプロパティーを設定します。

    以下に例を示します。

    System.setProperty(ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, “https://<auth-server-address>/auth/realms/master/protocol/openid-connect/token”); 1
    System.setProperty(ClientConfig.OAUTH_CLIENT_ID, "<client_name>"); 2
    System.setProperty(ClientConfig.OAUTH_CLIENT_SECRET, "<client_secret>"); 3
    System.setProperty(ClientConfig.OAUTH_SCOPE, "<scope_value>") 4
    System.setProperty(ClientConfig.OAUTH_AUDIENCE, "<audience_value") 5
    1
    承認サーバーのトークンエンドポイントの URI です。
    2
    クライアント ID。認可サーバーで クライアント を作成するときに使用される名前です。
    3
    認可サーバーで クライアント を作成するときに作成されるクライアントシークレット。
    4
    (オプション): トークンエンドポイントからトークンを要求するための scope。認可サーバーでは、クライアントによるスコープの指定が必要になることがあります。
    5
    (オプション) トークンエンドポイントからトークンを要求するための audience。認可サーバーでは、クライアントによるオーディエンスの指定が必要になることがあります。
  3. Kafka クライアント設定の TLS で暗号化された接続で OAUTHBEARER または PLAIN メカニズムを有効にします。

    以下に例を示します。

    Kafka クライアントの OAUTHBEARER の有効化

    props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "OAUTHBEARER");
    props.put("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");

    Kafka クライアントの PLAIN の有効化

    props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$CLIENT_ID_OR_ACCOUNT_NAME\" password=\"$SECRET_OR_ACCESS_TOKEN\" ;");
    props.put("security.protocol", "SASL_SSL"); 1
    props.put("sasl.mechanism", "PLAIN");

    1
    この例では、TLS 接続で SASL_SSL を使用します。ローカル開発のみでは、暗号化されていない接続で SASL_PLAINTEXT を使用します。
  4. Kafka クライアントが Kafka ブローカーにアクセスできることを確認します。

5.4.10. OAuth 2.0 トークンベース承認の使用

トークンベースの認証に OAuth 2.0 と Red Hat Single Sign-On を使用している場合、Red Hat Single Sign-On を使用して認可ルールを設定し、Kafka ブローカーへのクライアントのアクセスを制限することもできます。認証はユーザーのアイデンティティーを確立します。認可は、そのユーザーのアクセスレベルを決定します。

AMQ Streams は、Red Hat Single Sign-On の 認可サービス による OAuth 2.0 トークンベースの認可をサポートします。これにより、セキュリティーポリシーとパーミッションの一元的な管理が可能になります。

Red Hat Single Sign-On で定義されたセキュリティーポリシーおよびパーミッションは、Kafka ブローカーのリソースへのアクセスを付与するために使用されます。ユーザーとクライアントは、Kafka ブローカーで特定のアクションを実行するためのアクセスを許可するポリシーに対して照合されます。

Kafka では、デフォルトですべてのユーザーがブローカーに完全アクセスできます。また、アクセス制御リスト (ACL) を基にして承認を設定するために AclAuthorizer プラグインが提供されます。

ZooKeeper には、ユーザー名 を基にしてリソースへのアクセスを付与または拒否する ACL ルールが保存されます。ただし、Red Hat Single Sign-On を使用した OAuth 2.0 トークンベースの承認では、より柔軟にアクセス制御を Kafka ブローカーに実装できます。さらに、Kafka ブローカーで OAuth 2.0 の認可および ACL が使用されるように設定することができます。

5.4.10.1. OAuth 2.0 の認可メカニズム

AMQ Streams の OAuth 2.0 での認可では、Red Hat Single Sign-On サーバーの Authorization Services REST エンドポイントを使用して、Red Hat Single Sign-On を使用するトークンベースの認証が拡張されます。これは、定義されたセキュリティーポリシーを特定のユーザーに適用し、そのユーザーの異なるリソースに付与されたパーミッションのリストを提供します。ポリシーはロールとグループを使用して、パーミッションをユーザーと照合します。OAuth 2.0 の認可では、Red Hat Single Sign-On の Authorization Services から受信した、ユーザーに付与された権限のリストを基にして、権限がローカルで強制されます。

5.4.10.1.1. Kafka ブローカーのカスタム authorizer

AMQ Streams では、Red Hat Single Sign-On の authorizer (KeycloakRBACAuthorizer) が提供されます。Red Hat Single Sign-On によって提供される Authorization Services で Red Hat Single Sign-On REST エンドポイントを使用できるようにするには、Kafka ブローカーでカスタム authorizer を設定します。

authorizer は必要に応じて付与された権限のリストを認可サーバーから取得し、ローカルで Kafka ブローカーに認可を強制するため、クライアントの要求ごとに迅速な認可決定が行われます。

5.4.10.2. OAuth 2.0 認可サポートの設定

この手順では、Red Hat Single Sign-On の Authorization Services を使用して、OAuth 2.0 認可を使用するように Kafka ブローカーを設定する方法を説明します。

作業を開始する前に

特定のユーザーに必要なアクセス、または制限するアクセスについて検討してください。Red Hat Single Sign-On では、Red Hat Single Sign-On の グループロールクライアント、および ユーザー の組み合わせを使用して、アクセスを設定できます。

通常、グループは組織の部門または地理的な場所を基にしてユーザーを照合するために使用されます。また、ロールは職務を基にしてユーザーを照合するために使用されます。

Red Hat Single Sign-On を使用すると、ユーザーおよびグループを LDAP で保存できますが、クライアントおよびロールは LDAP で保存できません。ユーザーデータへのアクセスとストレージを考慮して、認可ポリシーの設定方法を選択する必要がある場合があります。

注記

スーパーユーザー は、Kafka ブローカーに実装された承認にかかわらず、常に制限なく Kafka ブローカーにアクセスできます。

前提条件

  • AMQ Streams は、トークンベースの認証 に Red Hat Single Sign-On と OAuth 2.0 を使用するように設定されている必要がある。承認を設定するときに、同じ Red Hat Single Sign-On サーバーエンドポイントを使用する必要があります。
  • Red Hat Single Sign-On のドキュメント の説明にあるように、Red Hat Single Sign-On の Authorization Services のポリシーおよびパーミッションを管理する方法を理解している必要がある。

手順

  1. Red Hat Single Sign-On の Admin Console にアクセスするか、Red Hat Single Sign-On の Admin CLI を使用して、OAuth 2.0 認証の設定時に作成した Kafka ブローカークライアントの Authorization Services を有効にします。
  2. 認可サービスを使用して、クライアントのリソース、認可スコープ、ポリシー、およびパーミッションを定義します。
  3. ロールとグループをユーザーとクライアントに割り当てて、パーミッションをユーザーとクライアントにバインドします。
  4. Red Hat Single Sign-On 承認を使用するように Kafka ブローカーを設定します。

    以下を Kafka server.properties 設定ファイルに追加し、Kafka に authorizer をインストールします。

    authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer
    principal.builder.class=io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder
  5. Kafka ブローカーの設定を追加して、認可サーバーおよび Authorization Services にアクセスします。

    ここでは、server.properties への追加プロパティーとして追加される設定例を示しますが、大文字で始める、または大文字の命名規則を使用して、環境変数として定義することもできます。

    strimzi.authorization.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" 1
    strimzi.authorization.client.id="kafka" 2
    1
    Red Hat Single Sign-On への OAuth 2.0 トークンエンドポイントの URL。実稼働環境の場合は、常に https:// urls を使用してください。
    2
    承認サービスが有効になっている Red Hat Single Sign-On の OAuth 2.0 クライアント定義のクライアント ID。通常、kafka が ID として使用されます。
  6. (オプション) 特定の Kafka クラスターの設定を追加します。

    以下に例を示します。

    strimzi.authorization.kafka.cluster.name="kafka-cluster" 1
    1
    特定の Kafka クラスターの名前。名前はパーミッションをターゲットにするために使用され、同じ Red Hat シングルサインオンレルム内で複数のクラスターを管理できるようにします。デフォルト値は kafka-cluster です。
  7. (オプション) シンプルな承認に委任します。

    以下に例を示します。

    strimzi.authorization.delegate.to.kafka.acl="false" 1
    1
    Red Hat Single Sign-On Authorization Services ポリシーでアクセスが拒否された場合、Kafka AclAuthorizer に権限を委任します。デフォルトは false です。
  8. (オプション) TLS 接続の設定を認可サーバーに追加します。

    以下に例を示します。

    strimzi.authorization.ssl.truststore.location=<path-to-truststore> 1
    strimzi.authorization.ssl.truststore.password=<my-truststore-password> 2
    strimzi.authorization.ssl.truststore.type=JKS 3
    strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 4
    strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 5
    1
    証明書が含まれるトラストストアへのパス。
    2
    トラストストアのパスワード。
    3
    トラストストアのタイプ。設定されていない場合は、デフォルトの Java キーストアタイプが使用されます。
    4
    乱数ジェネレーターの実装。設定されていない場合は、Java プラットフォーム SDK デフォルトが使用されます。
    5
    ホスト名の検証。空の文字列に設定すると、ホスト名の検証はオフになります。設定されていない場合、デフォルト値は HTTPS で、サーバー証明書のホスト名の検証を強制します。
  9. (オプション) 認可サーバーからの付与の更新を設定します。付与更新ジョブは、アクティブなトークンを列挙し、それぞれに最新の付与を要求することで機能します。

    以下に例を示します。

    strimzi.authorization.grants.refresh.period.seconds="120" 1
    strimzi.authorization.grants.refresh.pool.size="10" 2
    1
    認可サーバーからの付与のリストが更新される頻度を指定します (デフォルトでは 1 分に 1 回)。デバッグの目的で付与の更新をオフにするには、"0" に設定します。
    2
    付与更新ジョブで使用されるスレッドプールのサイズ (並列度) を指定します。デフォルト値は "5" です。
  10. クライアントまたは特定のロールを持つユーザーとして Kafka ブローカーにアクセスして、設定したパーミッションを検証し、必要なアクセス権限があり、付与されるべきでないアクセス権限がないことを確認します。

5.4.11. OPA ポリシーベースの承認の使用

Open Policy Agent (OPA) は、オープンソースのポリシーエンジンです。OPA と AMQ Streams を統合して、Kafka ブローカーでのクライアント操作を許可するポリシーベースの承認メカニズムとして機能します。

クライアントからリクエストが実行されると、OPA は Kafka アクセスに定義されたポリシーに対してリクエストを評価し、リクエストを許可または拒否します。

注記

Red Hat は OPA サーバーをサポートしません。

5.4.11.1. OPA ポリシーの定義

OPA と AMQ Streams を統合する前に、粒度の細かいアクセス制御を提供するポリシーの定義方法を検討してください。

Kafka クラスター、コンシューマーグループ、およびトピックのアクセス制御を定義できます。たとえば、プロデューサークライアントから特定のブローカートピックへの書き込みアクセスを許可する認可ポリシーを定義できます。

このポリシーでは、以下の項目を指定することができます。

  • プロデューサークライアントに関連付けられた ユーザープリンシパル および ホストアドレス
  • クライアントに許可される 操作
  • ポリシーが適用される リソースタイプ (topic) および リソース名

許可と拒否の決定がポリシーに書き込まれ、提供された要求とクライアント識別データに基づいて応答が提供されます。

この例では、プロデューサークライアントはトピックへの書き込みが許可されるポリシーを満たす必要があります。

5.4.11.2. OPA への接続

Kafka が OPA ポリシーエンジンにアクセスしてアクセス制御ポリシーをクエリーできるようにするには、Kafka server.properties ファイルでカスタム OPA authorizer プラグイン (kafka-authorizer-opa-VERSION.jar) を設定します。

クライアントがリクエストを行うと、OPA ポリシーエンジンは、指定された URL アドレスと REST エンドポイントを使用してプラグインによってクエリーされます。これは、定義されたポリシーの名前でなければなりません。

プラグインは、ポリシーに対してチェックされる JSON 形式で、クライアント要求の詳細 (ユーザープリンシパル、操作、およびリソース) を提供します。詳細には、クライアントの一意のアイデンティティーが含まれます。たとえば、TLS 認証が使用される場合にクライアント証明書からの識別名を取ります。

OPA はデータを使用して、リクエストを許可または拒否するためにプラグインに true または false のいずれかの応答を提供します。

5.4.11.3. OPA 承認サポートの設定

この手順では、OPA 承認を使用するように Kafka ブローカーを設定する方法を説明します。

作業を開始する前に

特定のユーザーに必要なアクセス、または制限するアクセスについて検討してください。ユーザー リソースと Kafka リソース の組み合わせを使用して、OPA ポリシーを定義できます。

OPA を設定して、LDAP データソースからユーザー情報を読み込むことができます。

注記

スーパーユーザー は、Kafka ブローカーに実装された承認にかかわらず、常に制限なく Kafka ブローカーにアクセスできます。

前提条件

手順

  1. Kafka ブローカーで操作を実行するため、クライアントリクエストの承認に必要な OPA ポリシーを記述します。

    OPA ポリシーの定義 を参照してください。

    これで、Kafka ブローカーが OPA を使用するように設定します。

  2. Kafka の OPA authorizer プラグイン をインストールします。

    OPA への接続 を参照してください。

    プラグインファイルが Kafka クラスパスに含まれていることを確認してください。

  3. 以下を Kafka server.properties 設定ファイルに追加し、OPA プラグインを有効にします。

    authorizer.class.name: com.bisnode.kafka.authorization.OpaAuthorizer
  4. Kafka ブローカーの server.properties に設定をさらに追加して、OPA ポリシーエンジンおよびポリシーにアクセスします。

    以下に例を示します。

    opa.authorizer.url=https://OPA-ADDRESS/allow 1
    opa.authorizer.allow.on.error=false 2
    opa.authorizer.cache.initial.capacity=50000 3
    opa.authorizer.cache.maximum.size=50000 4
    opa.authorizer.cache.expire.after.seconds=600000 5
    super.users=User:alice;User:bob 6
    1
    (必須) authorizer プラグインがクエリーするポリシーの OAuth 2.0 トークンエンドポイント URL。この例では、ポリシーは allow という名前です。
    2
    authorizer プラグインが OPA ポリシーエンジンとの接続に失敗した場合に、クライアントがデフォルトで許可または拒否されるかどうかを指定するフラグ。
    3
    ローカルキャッシュの初期容量 (バイト単位)。すべてのリクエストについてプラグインに OPA ポリシーエンジンをクエリーする必要がないように、キャッシュが使用されます。
    4
    ローカルキャッシュの最大容量 (バイト単位)。
    5
    OPA ポリシーエンジンからのリロードによってローカルキャッシュが更新される時間 (ミリ秒単位)。
    6
    スーパーユーザーとして扱われるユーザープリンシパルのリスト。これにより、Open Policy Agent ポリシーをクエリーしなくても常に許可されます。

    認証および認可オプションの詳細は、Open Policy Agent の Web サイト を参照してください。

  5. 正しい承認を持つクライアントと持たないクライアントを使用して、Kafka ブローカーにアクセスして、設定したパーミッションを検証します。

5.4.12. ロギング

Kafka ブローカーは Log4j をロギングインフラストラクチャーとして使用します。デフォルトでは、ロギング設定は /opt/kafka/config/ ディレクトリーまたはクラスパスのいずれかに配置される log4j.properties 設定ファイルから読み取られます。設定ファイルの場所と名前は、Java プロパティー log4j.configuration を使用して変更できます。これは、KAFKA_LOG4J_OPTS 環境変数を使用して Kafka に渡すことができます。

su - kafka
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/my/path/to/log4j.config"; /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

Log4j の設定に関する詳細は、Log4j のマニュアル を参照してください。

5.4.12.1. Kafka ブローカーロガーのロギングレベルの動的な変更

Kafka ブローカーロギングは、複数の 各ブローカーのブローカーロガー によって提供されます。ブローカーを再起動することなく、ブローカーロガーのロギングレベルを動的に変更できます。ログで返される詳細度レベルを上げると (たとえば、INFO から DEBUG に変更)、Kafka クラスターでパフォーマンスの問題を調査するのに役立ちます。

ブローカーロガーは、デフォルトのロギングレベルに動的にリセットすることもできます。

手順

  1. kafka ユーザーに切り替えます。

    su - kafka
  2. kafka-configs.sh ツールを使用して、ブローカーのブローカーロガーのリストを表示します。

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --describe --entity-type broker-loggers --entity-name BROKER-ID

    たとえば、ブローカー 0 の場合:

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type broker-loggers --entity-name 0

    これにより、TRACEDEBUGINFOWARNERROR、または FATAL の各ロガーのログレベルが返されます。以下に例を示します。

    #...
    kafka.controller.ControllerChannelManager=INFO sensitive=false synonyms={}
    kafka.log.TimeIndex=INFO sensitive=false synonyms={}
  3. 1 つ以上のブローカーロガーのログレベルを変更します。--alter および --add-config オプションを使用して、各ロガーとそのレベルを二重引用符のコンマ区切りリストとして指定します。

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --add-config "LOGGER-ONE=NEW-LEVEL,LOGGER-TWO=NEW-LEVEL" --entity-type broker-loggers --entity-name BROKER-ID

    たとえば、ブローカー 0 の場合:

    /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config "kafka.controller.ControllerChannelManager=WARN,kafka.log.TimeIndex=WARN" --entity-type broker-loggers --entity-name 0

    成功すると、以下が返されます。

    Completed updating config for broker: 0.
ブローカーロガーのリセット

kafka-configs.sh ツールを使用して、1 つ以上のブローカーロガーをデフォルトのログレベルにリセットできます。--alter および --delete-config オプションを使用して、各ブローカーロガーを二重引用符のコンマ区切りリストとして指定します。

/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config "LOGGER-ONE,LOGGER-TWO" --entity-type broker-loggers --entity-name BROKER-ID

関連情報

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.