第6章 Kafka クラスターへのクライアントアクセスの設定
AMQ Streams のデプロイ 後、本章では以下の操作を行う方法について説明します。
- サンプルプロデューサーおよびコンシューマークライアントをデプロイし、これを使用してデプロイメントを検証する
Kafka クラスターへの外部クライアントアクセスを設定する
OpenShift 外部のクライアントに Kafka クラスターへのアクセスを設定する手順はより複雑です。『AMQ Streams on OpenShift の使用』で説明する Kafka コンポーネントの設定手順 に精通している必要があります。
6.1. サンプルクライアントのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、ユーザーが作成した Kafka クラスターを使用してメッセージを送受信するプロデューサーおよびコンシューマークライアントの例をデプロイする方法を説明します。
前提条件
- クライアントが Kafka クラスターを使用できる必要があります。
手順
Kafka プロデューサーをデプロイします。
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topicCopy to Clipboard Copied! Toggle word wrap Toggle overflow - プロデューサーが稼働しているコンソールにメッセージを入力します。
- Enter を押してメッセージを送信します。
Kafka コンシューマーをデプロイします。
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginningCopy to Clipboard Copied! Toggle word wrap Toggle overflow - コンシューマーコンソールに受信メッセージが表示されることを確認します。
6.2. OpenShift 外クライアントのアクセスの設定 リンクのコピーリンクがクリップボードにコピーされました!
以下の手順では、OpenShift 外部からの Kafka クラスターへのクライアントアクセスを設定する方法を説明します。
Kafka クラスターのアドレスを使用して、異なる OpenShift namespace または完全に OpenShift 外のクライアントに外部アクセスを提供できます。
アクセスを提供するために、外部 Kafka リスナーを設定します。
以下のタイプの外部リスナーがサポートされます。
-
OpenShift
Routeおよびデフォルトの HAProxy ルーターを使用するroute -
ロードバランサーサービスを使用する
loadbalancer -
OpenShift ノードのポートを使用する
nodeport -
OpenShift Ingress と NGINX Ingress Controller for Kubernetes を使用する
ingress
要件ならびにお使いの環境およびインフラストラクチャーに応じて、選択するタイプは異なります。たとえば、ロードバランサーは、ベアメタル等の特定のインフラストラクチャーには適さない場合があります。ベアメタルでは、ノードポートがより適したオプションを提供します。
以下の手順では、
- TLS 暗号化および認証、ならびに Kafka 簡易承認 を有効にして、Kafka クラスターに外部リスナーが設定されます。
-
簡易承認 用に TLS 認証およびアクセス制御リスト (ACL) を定義して、クライアントに
KafkaUserが作成されます。
TLS または SCRAM-SHA-512 認証を使用するようにリスナーを設定できます。これらはいずれも TLS 暗号化と共に使用できます。承認サーバーを使用している場合は、トークンベースの OAuth 2.0 認証 および OAuth 2.0 承認 を使用できます。Open Policy Agent (OPA) 承認も、Kafka 承認 オプションとしてサポートされます。
KafkaUser の認証および承認メカニズムを設定する場合、必ず同等の Kafka 設定と一致するようにしてください。
-
KafkaUser.spec.authenticationはKafka.spec.kafka.listeners[*].authenticationと一致 -
KafkaUser.spec.authorizationはKafka.spec.kafka.authorizationと一致
KafkaUser に使用する認証をサポートするリスナーが少なくとも 1 つ必要です。
Kafka ユーザーと Kafka ブローカー間の認証は、それぞれの認証設定によって異なります。たとえば、TLS が Kafka 設定で有効になっていない場合は、TLS でユーザーを認証できません。
AMQ Streams Operator により設定プロセスが自動されます。
- Cluster Operator はリスナーを作成し、クラスターおよびクライアント認証局 (CA) 証明書を設定して Kafka クラスター内で認証を有効にします。
- User Operator はクライアントに対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。
この手順では、Cluster Operator によって生成された証明書が使用されますが、独自の証明書をインストール してそれらを置き換えることができます。外部認証局によって管理される Kafka リスナー証明書を使用するようにリスナーを設定することもできます。
PKCS #12 形式 (.p12) および PEM 形式 (.crt) の証明書を利用できます。
前提条件
- クライアントが Kafka クラスターを使用できる必要があります。
- Cluster Operator および User Operator がクラスターで実行されている必要があります。
- OpenShift クラスター外のクライアントが Kafka クラスターに接続できる必要があります。
手順
externalKafka リスナーと共に Kafka クラスターを設定します。- リスナーを通じて Kafka ブローカーにアクセスするのに必要な認証を定義します。
Kafka ブローカーで承認を有効にします。
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 外部リスナーを有効にする設定オプションは、汎用 Kafka リスナースキーマ参照 に記載されています。
- 2
- リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
- 3
- Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
- 4
route、loadbalancer、nodeport、またはingressとして指定された外部リスナータイプ。内部リスナーはinternalとして指定されます。- 5
- リスナーで TLS 暗号化を有効にします。デフォルトは
falseです。routeリスナーには TLS による暗号化は必要ありません。 - 6
- 認証は
tlsとして指定されます。 - 7
- (任意設定:
nodeportリスナーのみ) ノードアドレスとして AMQ Streams によって使用される最初のアドレスタイプの希望を指定します。 - 8
- (任意設定) AMQ Streams はクライアントに公開するアドレスを自動的に決定します。アドレスは OpenShift によって自動的に割り当てられます。AMQ Streams を実行しているインフラストラクチャーが正しい ブートストラップおよびブローカーサービスのアドレス を提供しない場合、そのアドレスを上書きできます。検証はオーバーライドに対しては実行されません。オーバーライド設定はリスナーのタイプによって異なります。たとえば、
routeの場合はホストを、loadbalancerの場合は DNS 名または IP アドレスを、またnodeportの場合はノードポートを、それぞれ上書きすることができます。 - 9
simpleと指定された承認 (AclAuthorizerKafka プラグインを使用する)。- 10
- (任意設定) スーパーユーザーは、ACL で定義されたアクセス制限に関係なく、すべてのブローカーにアクセスできます。
警告OpenShift Route アドレスは、Kafka クラスターの名前、リスナーの名前、および作成される namespace の名前で構成されます。たとえば、
my-cluster-kafka-listener1-bootstrap-myproject(CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE) となります。routeリスナータイプを使用している場合は、アドレス全体の長さが最大 63 文字という制限を超えないように注意してください。
Kafkaリソースを作成または更新します。oc apply -f KAFKA-CONFIG-FILE
oc apply -f KAFKA-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka クラスターは、TLS 認証を使用する Kafka ブローカーリスナーと共に設定されます。
Kafka ブローカー Pod ごとにサービスが作成されます。
サービスが作成され、Kafka クラスターに接続するための ブートストラップアドレス として機能します。
サービスは、
nodeportリスナーを使用した Kafka クラスターへの外部接続用 外部ブートストラップアドレス としても作成されます。kafka ブローカーの ID を検証するためのクラスター CA 証明書も、
Kafkaリソースと同じ名前で作成されます。Kafkaリソースのステータスから、ブートストラップアドレスおよびポートを探します。oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka クライアントのブートストラップアドレスを使用して、Kafka クラスターに接続します。
生成された
KAFKA-CLUSTER-NAME-cluster-ca-certSecret から、公開クラスター CA 証明書およびパスワードを抽出します。oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12Copy to Clipboard Copied! Toggle word wrap Toggle overflow oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordoc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.passwordCopy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka クライアントの証明書およびパスワードを使用して、TLS 暗号化により Kafka クラスターに接続します。
注記デフォルトでは、クラスター CA 証明書は自動的に更新されます。専用の Kafka リスナー証明書を使用している場合は、証明書を手動で更新する 必要があります。
Kafka クラスターにアクセスする必要があるクライアントに対応するユーザーを作成または変更します。
KafkaUserリソースを作成または変更します。oc apply -f USER-CONFIG-FILE
oc apply -f USER-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaUserリソースと同じ名前の Secret と共に、ユーザーが作成されます。Secret には、TLS クライアント認証の秘密鍵と公開鍵が含まれます。以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka クラスターへのセキュアな接続を確立するのに必要なプロパティーを使用して Kafka クラスターに接続するように、クライアントを設定します。
パブリッククラスター証明書の認証詳細を追加します。
security.protocol: SSL ssl.truststore.location: PATH-TO/ssl/keys/truststore ssl.truststore.password: CLUSTER-CA-CERT-PASSWORD ssl.truststore.type=PKCS12
security.protocol: SSL1 ssl.truststore.location: PATH-TO/ssl/keys/truststore2 ssl.truststore.password: CLUSTER-CA-CERT-PASSWORD3 ssl.truststore.type=PKCS124 Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記TLS 経由で SCRAM-SHA 認証を使用する場合は、
security.protocol: SASL_SSLを使用します。Kafka クラスターに接続するためのブートストラップアドレスおよびポートを追加します。
bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
bootstrap.servers: BOOTSTRAP-ADDRESS:PORTCopy to Clipboard Copied! Toggle word wrap Toggle overflow パブリックユーザー証明書の認証情報を追加します。
ssl.keystore.location: PATH-TO/ssl/keys/user1.keystore ssl.keystore.password: USER-CERT-PASSWORD
ssl.keystore.location: PATH-TO/ssl/keys/user1.keystore1 ssl.keystore.password: USER-CERT-PASSWORD2 Copy to Clipboard Copied! Toggle word wrap Toggle overflow パブリックユーザー証明書は、作成時にクライアント CA により署名されます。