6.3. 認証
Kafka クラスターへのクライアント接続を認証するには、次のオプションを使用できます。
- TLS クライアント認証
- 暗号化接続で X.509 証明書を使用する TLS (Transport Layer Security)
- Kafka SASL
- サポートされる認証メカニズムを使用した Kafka SASL (Simple Authentication and Security Layer)
- OAuth 2.0
- OAuth 2.0 のトークンベースの認証
SASL 認証は、暗号化されていないプレーン接続と TLS 接続の両方のさまざまなメカニズムをサポートします。
-
PLAIN
- ユーザー名とパスワードに基づく認証。 -
SCRAM-SHA-256
およびSCRAM-SHA-512
- Salted Challenge Response Authentication Mechanism (SCRAM) を使用した認証。 -
GSSAPI
- Kerberos サーバーに対する認証。
PLAIN
メカニズムは、ネットワークを通じてユーザー名とパスワードを暗号化されていない形式で送信します。TLS による暗号化と組み合わせる場合にのみ使用してください。
6.3.1. TLS クライアント認証の有効化
Kafka ブローカーで TLS クライアント認証を有効にして、すでに TLS 暗号化を使用している Kafka ノードへの接続のセキュリティーを強化します。
ssl.client.auth
プロパティーを使用して、次のいずれかの値で TLS 認証を設定します。
-
none
- TLS クライアント認証はオフです (デフォルト)。 -
requested
- TLS クライアント認証は任意です。 -
required
- クライアントは TLS クライアント証明書を使用して認証する必要があります。
クライアントが TLS クライアント認証を使用して認証する場合、認証されたプリンシパル名はクライアント証明書内の識別名から派生したものになります。たとえば、CN=someuser
という識別名の証明書を持つユーザーは、プリンシパル CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
で認証されます。このプリンシパル名は、認証されたユーザーまたはエンティティーの一意の識別子を提供します。TLS クライアント認証が使用されておらず、SASL が無効な場合、プリンシパル名はデフォルトで ANONYMOUS
になります。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
- TLS 暗号化が 有効になっている。
手順
- ユーザー証明書の署名に使用される CA (認証局) の公開鍵を含む JKS (Java Keystore) トラストストアを準備します。
すべてのクラスターノードの Kafka 設定プロパティーファイルを次のように編集します。
-
ssl.truststore.location
プロパティーを使用して JKS トラストストアへのパスを指定します。 -
トラストストアがパスワードで保護される場合は、
ssl.truststore.password
プロパティーを使用してパスワードを設定します。 ssl.client.auth
プロパティーをrequired
に設定します。TLS クライアント認証の設定
ssl.truststore.location=/path/to/truststore.jks ssl.truststore.password=123456 ssl.client.auth=required
-
- Kafka ブローカーを (再) 起動します。
6.3.2. SASL PLAIN クライアント認証の有効化
Kafka で SASL PLAIN 認証を有効にして、Kafka ノードへの接続のセキュリティーを強化します。
SASL 認証は、KafkaServer
JAAS コンテキストを使用して、Java Authentication and Authorization Service (JAAS) を通じて有効にします。JAAS 設定は、専用のファイルで定義することも、Kafka 設定で直接定義することもできます。
推奨される専用ファイルの場所は /opt/kafka/config/jaas.conf
です。kafka
ユーザーがファイルを読み取れることを確認してください。すべての Kafka ノードで JAAS 設定ファイルの同期を維持します。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
手順
/opt/kafka/config/jaas.conf
JAAS 設定ファイルを編集または作成して、PlainLoginModule
を有効にし、許可されるユーザー名とパスワードを指定します。このファイルがすべての Kafka ブローカーで同じであることを確認します。
JAAS 設定
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
すべてのクラスターノードの Kafka 設定プロパティーファイルを次のように編集します。
-
listener.security.protocol.map
プロパティーを使用して、特定のリスナーで SASL PLAIN 認証を有効にします。SASL_PLAINTEXT
またはSASL_SSL
を指定します。 sasl.enabled.mechanisms
プロパティーをPLAIN
に設定します。SASL plain 設定
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=PLAIN
-
KAFKA_OPTS
環境変数を使用して Kafka ブローカーを (再) 起動し、JAAS 設定を Kafka ブローカーに渡します。su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.3.3. SASL SCRAM クライアント認証の有効化
Kafka で SASL SCRAM 認証を有効にして、Kafka ノードへの接続のセキュリティーを強化します。
SASL 認証は、KafkaServer
JAAS コンテキストを使用して、Java Authentication and Authorization Service (JAAS) を通じて有効にします。JAAS 設定は、専用のファイルで定義することも、Kafka 設定で直接定義することもできます。
推奨される専用ファイルの場所は /opt/kafka/config/jaas.conf
です。kafka
ユーザーがファイルを読み取れることを確認してください。すべての Kafka ノードで JAAS 設定ファイルの同期を維持します。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
手順
/opt/kafka/config/jaas.conf
JAAS 設定ファイルを編集または作成して、ScramLoginModule
を有効にします。このファイルがすべての Kafka ブローカーで同じであることを確認します。
JAAS 設定
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
すべてのクラスターノードの Kafka 設定プロパティーファイルを次のように編集します。
-
listener.security.protocol.map
プロパティーを使用して、特定のリスナーで SASL SCRAM 認証を有効にします。SASL_PLAINTEXT
またはSASL_SSL
を指定します。 sasl.enabled.mechanisms
オプションをSCRAM-SHA-256
またはSCRAM-SHA-512
に設定します。以下に例を示します。
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=SCRAM-SHA-512
-
KAFKA_OPTS
環境変数を使用して Kafka ブローカーを (再) 起動し、JAAS 設定を Kafka ブローカーに渡します。su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.3.4. 複数の SASL メカニズムの有効化
SASL 認証を使用する場合、複数のメカニズムを有効にすることができます。Kafka は複数の SASL メカニズムを同時に使用できます。複数のメカニズムが有効な場合、特定のクライアントが使用するメカニズムを選択できます。
複数のメカニズムを使用するには、各メカニズムに必要な設定をセットアップします。sasl.mechanism.inter.broker.protocol
プロパティーを使用して、異なる KafkaServer
JAAS 設定を同じコンテキストに追加し、Kafka 設定内の複数のメカニズムをコンマ区切りのリストとして有効にできます。
複数の SASL メカニズムの JAAS 設定
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; org.apache.kafka.common.security.scram.ScramLoginModule required; };
SASL メカニズムの有効化
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
6.3.5. ブローカー間認証のための SASL の有効化
Kafka ノード間の SASL SCRAM 認証を有効にして、ブローカー間接続のセキュリティーを強化します。Kafka クラスターへのクライアント接続に SASL 認証を使用するだけでなく、ブローカー間認証にも SASL を使用できます。クライアント接続用の SASL とは異なり、ブローカー間通信用に選択できるメカニズムは 1 つだけです。
前提条件
- ZooKeeper が 各ホストにインストールされており、設定ファイルが使用可能である。
SCRAM メカニズムを使用している場合は、Kafka クラスターに SCRAM 認証情報を登録します。
Kafka クラスターのすべてのノードについて、inter-broker SASL SCRAM ユーザーを ZooKeeper に追加します。これにより、Kafka クラスターの実行前に、認証用の認証情報がブートストラップ用に更新されるようになります。
ブローカー間 SASL SCRAM ユーザーの登録
bin/kafka-configs.sh \ --zookeeper localhost:2181 \ --alter \ --add-config 'SCRAM-SHA-512=[password=changeit]' \ --entity-type users \ --entity-name kafka
手順
sasl.mechanism.inter.broker.protocol
プロパティーを使用して、Kafka 設定でブローカー間 SASL メカニズムを指定します。ブローカー間 SASL メカニズム
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
(オプション) SCRAM メカニズムを使用している場合は、SCRAM ユーザーを追加 して、Kafka クラスターに SCRAM 認証情報を登録します。
これにより、Kafka クラスターの実行前に、認証用の認証情報がブートストラップ用に更新されるようになります。
username
フィールドとpassword
フィールドを使用して、KafkaServer
JAAS コンテキストでのブローカー間通信用のユーザー名とパスワードを指定します。ブローカー間 JAAS コンテキスト
KafkaServer { org.apache.kafka.common.security.plain.ScramLoginModule required username="admin" password="123456" # ... };
6.3.6. SASL SCRAM ユーザーの追加
この手順では、Kafka で SASL SCRAM を使用して認証用に新しいユーザーを登録するステップの概要を説明します。SASL SCRAM 認証は、クライアント接続のセキュリティーを強化します。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
- SASL SCRAM 認証が 有効になっている。
手順
kafka-configs.sh
ツールを使用して、新しい SASL SCRAM ユーザーを追加します。/opt/kafka/kafka-configs.sh \ --bootstrap-server <broker_host>:<port> \ --alter \ --add-config 'SCRAM-SHA-512=[password=<password>]' \ --entity-type users --entity-name <username>
以下に例を示します。
/opt/kafka/kafka-configs.sh \ --bootstrap-server localhost:9092 \ --alter \ --add-config 'SCRAM-SHA-512=[password=123456]' \ --entity-type users \ --entity-name user1
6.3.7. SASL SCRAM ユーザーの削除
この手順では、Kafka で SASL SCRAM を使用して認証用に登録したユーザーを削除するステップの概要を説明します。
前提条件
- Streams for Apache Kafka が 各ホストにインストールされており、設定ファイルが使用可能である。
- SASL SCRAM 認証が 有効になっている。
手順
kafka-configs.sh
ツールを使用して SASL SCRAM ユーザーを削除します。/opt/kafka/bin/kafka-configs.sh \ --bootstrap-server <broker_host>:<port> \ --alter \ --delete-config 'SCRAM-SHA-512' \ --entity-type users \ --entity-name <username>
以下に例を示します。
/opt/kafka/bin/kafka-configs.sh \ --bootstrap-server localhost:9092 \ --alter \ --delete-config 'SCRAM-SHA-512' \ --entity-type users \ --entity-name user1
6.3.8. Kerberos (GSSAPI) 認証の有効化
Streams for Apache Kafka は、Kafka クラスターへのセキュアなシングルサインオンアクセスのために、Kerberos (GSSAPI) 認証プロトコルの使用をサポートしています。GSSAPI は、Kerberos 機能の API ラッパーで、基盤の実装の変更からアプリケーションを保護します。
Kerberos は、対称暗号化と信頼できるサードパーティーの Kerberos Key Distribution Centre (KDC) を使用して、クライアントとサーバーが相互に認証できるようにするネットワーク認証システムです。
この手順では、Kafka クライアントが Kerberos (GSSAPI) 認証を使用して Kafka および ZooKeeper にアクセスできるように Streams for Apache Kafka を設定する方法を説明します。
この手順では、Kerberos krb5 リソースサーバーが Red Hat Enterprise Linux ホストに設定されていることを前提としています。
この手順では、例を用いて以下の設定方法を説明します。
- サービスプリンシパル
- Kafka ブローカー (Kerberos ログインを使用するため)
- ZooKeeper (Kerberos ログインを使用するため)
- プロデューサーおよびコンシューマークライアント (Kerberos 認証を使用して Kafka にアクセスするため)
この手順では、単一のホストでの単一の ZooKeeper および Kafka インストールの Kerberos 設定方法を、プロデューサーおよびコンシューマークライアントの追加設定方法を説明します。
前提条件
Kafka および ZooKeeper が Kerberos クレデンシャルを認証および承認するように設定できるようにするには、以下が必要です。
- Kerberos サーバーへのアクセス
- 各 Kafka ブローカーホストの Kerberos クライアント
Kerberos サーバー、およびブローカーホストのクライアントを設定する手順の詳細は、AMQ Streams on RHEL - Example Kerberos set up configuration を参照してください。
認証用のサービスプリンシパルの追加
Kerberos サーバーから、ZooKeeper、Kafka ブローカー、Kafka プロデューサーおよびコンシューマークライアントのサービスプリンシパル (ユーザー) を作成します。
サービスプリンシパルの形式は SERVICE-NAME/FULLY-QUALIFIED-HOST-NAME@DOMAIN-REALM にする必要があります。
Kerberos KDC を使用してサービスプリンシパルと、プリンシパルキーを保存するキータブを作成します。
Kerberos プリンシパルのドメイン名が大文字であることを確認してください。
以下に例を示します。
-
zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM
-
kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM
-
producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM
consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM
ZooKeeper サービスプリンシパルは、Kafka
config/server.properties
ファイルのzookeeper.connect
設定と同じホスト名にする必要があります。zookeeper.connect=node1.example.redhat.com:2181
ホスト名が異なる場合は localhost が使用されるため、認証に失敗します。
-
ホストにディレクトリーを作成し、キータブファイルを追加します。
以下に例を示します。
/opt/kafka/krb5/zookeeper-node1.keytab /opt/kafka/krb5/kafka-node1.keytab /opt/kafka/krb5/kafka-producer1.keytab /opt/kafka/krb5/kafka-consumer1.keytab
kafka
ユーザーがディレクトリーにアクセスできることを確認します。chown kafka:kafka -R /opt/kafka/krb5
ZooKeeper を設定して Kerberos ログインを使用
認証に Kerberos Key Distribution Center (KDC) を使用するように zookeeper
に作成したユーザープリンシパルとキータブを使用して ZooKeeper を設定します。
opt/kafka/config/jaas.conf
ファイルを作成または変更して、ZooKeeper クライアントおよびサーバー操作をサポートします。Client { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true 1 storeKey=true 2 useTicketCache=false 3 keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" 4 principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; 5 }; Server { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true useTicketCache=false keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; }; QuorumServer { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; }; QuorumLearner { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true keyTab="/opt/kafka/krb5/zookeeper-node1.keytab" principal="zookeeper/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; };
- 1
true
に設定し、キータブからプリンシパルキーを取得します。- 2
true
に設定し、プリンシパルキーを保存します。- 3
true
に設定し、チケットキャッシュから Ticket Granting Ticket (TGT) を取得します。- 4
keyTab
プロパティーは、Kerberos KDC からコピーされた keytab ファイルの場所を示します。その場所とファイルは、kafka
ユーザーが読み取り可能になものにする必要があります。- 5
principal
プロパティーは、KDC ホストで作成された完全修飾プリンシパル名と一致するように設定され、その形式はSERVICE-NAME/FULLY-QUALIFIED-HOST-NAME@DOMAIN-NAME
に従います。
opt/kafka/config/zookeeper.properties
を編集して、更新された JAAS 設定を使用します。# ... requireClientAuthScheme=sasl jaasLoginRenew=3600000 1 kerberos.removeHostFromPrincipal=false 2 kerberos.removeRealmFromPrincipal=false 3 quorum.auth.enableSasl=true 4 quorum.auth.learnerRequireSasl=true 5 quorum.auth.serverRequireSasl=true quorum.auth.learner.loginContext=QuorumLearner 6 quorum.auth.server.loginContext=QuorumServer quorum.auth.kerberos.servicePrincipal=zookeeper/_HOST 7 quorum.cnxn.threads.size=20
- 1
- ログイン更新の頻度をミリ秒単位で制御します。これは、チケットの更新間隔に合わせて調整できます。デフォルトは 1 時間です。
- 2
- ホスト名がログインプリンシパル名の一部として使用されるかどうかを指定します。クラスターのすべてのノードで単一の keytab を使用する場合、これは
true
に設定されます。ただし、トラブルシューティングのために、各ブローカーホストに個別のキータブと完全修飾プリンシパルを生成することが推奨されます。 - 3
- Kerberos ネゴシエーションのプリンシパル名からレルム名を削除するかどうかを制御します。この設定は、
false
にすることを推奨します。 - 4
- ZooKeeper サーバーおよびクライアントの SASL 認証メカニズムを有効にします。
- 5
RequireSasl
プロパティーは、マスター選出などのクォーラムイベントに SASL 認証を必要とするかどうかを制御します。- 6
loginContext
プロパティーは、指定されたコンポーネントの認証設定に使用される JAAS 設定のログインコンテキストの名前を識別します。loginContext 名は、opt/kafka/config/jaas.conf
ファイルの関連セクションの名前に対応します。- 7
- 識別に使用されるプリンシパル名を形成するために使用される命名規則を制御します。プレースホルダー
_HOST
は、実行時にserver.1
プロパティーによって定義されたホスト名に自動的に解決されます。
JVM パラメーターで ZooKeeper を起動し、Kerberos ログイン設定を指定します。
su - kafka export EXTRA_ARGS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
デフォルトのサービス名 (
zookeeper
) を使用していない場合は、-Dzookeeper.sasl.client.username=NAME
パラメーターを使用して名前を追加します。注記/etc/krb5.conf
を場所として使用している場合は、ZooKeeper、Kafka、Kafka プロデューサーおよびコンシューマーの起動時に-Djava.security.krb5.conf=/etc/krb5.conf
を指定する必要はありません。
Kafka ブローカーサーバー設定して Kerberos ログインを使用
認証に Kerberos Key Distribution Center (KDC) を使用するように kafka
に作成したユーザープリンシパルとキータブを使用して Kafka を設定します。
opt/kafka/config/jaas.conf
ファイルを以下の要素で修正します。KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/kafka/krb5/kafka-node1.keytab" principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true useTicketCache=false keyTab="/opt/kafka/krb5/kafka-node1.keytab" principal="kafka/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; };
Kafka クラスターの各ブローカーを設定するには、
config/server.properties
ファイルのリスナー設定を変更して、リスナーが SASL/GSSAPI ログインを使用するようにします。リスナーのセキュリティープロトコルのマップに SASL プロトコルを追加し、不要なプロトコルを削除します。
以下に例を示します。
# ... broker.id=0 # ... listeners=SECURE://:9092,REPLICATION://:9094 1 inter.broker.listener.name=REPLICATION # ... listener.security.protocol.map=SECURE:SASL_PLAINTEXT,REPLICATION:SASL_PLAINTEXT 2 # .. sasl.enabled.mechanisms=GSSAPI 3 sasl.mechanism.inter.broker.protocol=GSSAPI 4 sasl.kerberos.service.name=kafka 5 ...
- 1
- クライアントとの汎用通信用のセキュアなリスナー (通信用 TLS をサポート)、およびブローカー間通信用のレプリケーションリスナーの 2 つが設定されています。
- 2
- TLS に対応しているリスナーのプロトコル名は SASL_PLAINTEXT になります。コネクターが TLS に対応していない場合、プロトコル名は SASL_PLAINTEXT になります。SSL が必要ない場合は、
ssl.*
プロパティーを削除できます。 - 3
- Kerberos 認証における SASL メカニズムは
GSSAPI
になります。 - 4
- ブローカー間通信の Kerberos 認証。
- 5
- 認証要求に使用するサービスの名前は、同じ Kerberos 設定を使用している他のサービスと区別するために指定されます。
Kafka ブローカーを起動し、JVM パラメーターを使用して Kerberos ログイン設定を指定します。
su - kafka export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
ブローカーおよび ZooKeeper クラスターが設定されていて、Kerberos ベース以外の認証システムで動作している場合は、ZooKeeper およびブローカークラスターを起動し、ログで設定エラーを確認できます。
ブローカーおよび Zookeeper インスタンスを起動すると、Kerberos 認証用にクラスターが設定されました。
Kafka プロデューサーおよびコンシューマークライアントの設定して Kerberos 認証を使用
認証に Kerberos Key Distribution Center (KDC) を使用するように、producer1
および consumer1
に作成したユーザープリンシパルとキータブを使用して Kafka プロデューサーおよびコンシューマークライアントを設定します。
プロデューサーまたはコンシューマー設定ファイルに Kerberos 設定を追加します。
以下に例を示します。
/opt/kafka/config/producer.properties
# ... sasl.mechanism=GSSAPI 1 security.protocol=SASL_PLAINTEXT 2 sasl.kerberos.service.name=kafka 3 sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \ 4 useKeyTab=true \ useTicketCache=false \ storeKey=true \ keyTab="/opt/kafka/krb5/producer1.keytab" \ principal="producer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; # ...
/opt/kafka/config/consumer.properties
# ... sasl.mechanism=GSSAPI security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \ useKeyTab=true \ useTicketCache=false \ storeKey=true \ keyTab="/opt/kafka/krb5/consumer1.keytab" \ principal="consumer1/node1.example.redhat.com@EXAMPLE.REDHAT.COM"; # ...
クライアントを実行して、Kafka ブローカーからメッセージを送受信できることを確認します。
プロデューサークライアント:
export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-producer.sh --producer.config /opt/kafka/config/producer.properties --topic topic1 --bootstrap-server node1.example.redhat.com:9094
コンシューマークライアント:
export KAFKA_HEAP_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"; /opt/kafka/bin/kafka-console-consumer.sh --consumer.config /opt/kafka/config/consumer.properties --topic topic1 --bootstrap-server node1.example.redhat.com:9094
関連情報
- Kerberos の man ページ: krb5.conf(5)、kinit(1)、klist(1)、および kdestroy(1)
- AMQ Streams on RHEL - Example Kerberos set up configuration
- AMQ Streams on RHEL - Example Kafka client with Kerberos authentication