14.4. リスナーを使用した Kafka クラスターへのクライアントアクセス設定
Kafka クラスターのアドレスを使用して、同じ OpenShift クラスター内のクライアントへのアクセスを提供できます。または、別の OpenShift namespace または完全に OpenShift の外部にあるクライアントへの外部アクセスを提供できます。この手順では、OpenShift の外部または別の OpenShift クラスターから、Kafka クラスターへのクライアントアクセスを設定する方法を示します。
Kafka リスナーは、Kafka クラスターへのアクセスを提供します。クライアントアクセスは、次の設定を使用して保護されます。
-
mTLS 暗号化および認証、ならびに Kafka
simple
認可を有効にして、Kafka クラスターに外部リスナーが設定されます。 -
simple
認可用に mTLS 認証および アクセス制御リスト (ACL) を定義して、クライアントにKafkaUser
が作成されます。
相互 tls
、scram-sha-512
、または oauth
認証を使用するようにリスナーを設定できます。mTLS は常に暗号化を使用しますが、SCRAM-SHA-512 および OAuth 2.0 認証を使用する場合は暗号化も推奨されます。
Kafka ブローカーに simple
、oauth
、opa
、または custom
認可を設定できます。認可を有効にすると、認可は有効なすべてのリスナーに適用されます。
KafkaUser
認証および認可メカニズムを設定する場合は、必ず同等の Kafka 設定と一致させてください。
-
KafkaUser.spec.authentication
はKafka.spec.kafka.listeners[*].authentication
と一致します。 -
KafkaUser.spec.authorization
はKafka.spec.kafka.authorization
と一致します。
KafkaUser
に使用する認証をサポートするリスナーが少なくとも 1 つ必要です。
Kafka ユーザーと Kafka ブローカー間の認証は、それぞれの認証設定によって異なります。たとえば、mTLS が Kafka 設定で有効になっていない場合は、mTLS でユーザーを認証できません。
Streams for Apache Kafka の Operator が、設定プロセスを自動化し、認証に必要な証明書を作成します。
- Cluster Operator はリスナーを作成し、クラスターとクライアント認証局 (CA) 証明書を設定して、Kafka クラスターでの認証を有効にします。
- User Operator はクライアントに対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。
証明書をクライアント設定に追加します。
この手順では、Cluster Operator によって生成された CA 証明書が使用されますが、独自の証明書をインストール して証明書を置き換えることもできます。外部 CA (認証局) によって管理される Kafka リスナー証明書を使用 するようにリスナーを設定することもできます。
証明書は、PEM (.crt) および PKCS #12 (.p12) 形式で利用できます。この手順では、PEM 証明書を使用します。X.509 形式の証明書を使用するクライアントで PEM 証明書を使用します。
同じ OpenShift クラスターおよび namespace の内部クライアントの場合、Pod 仕様でクラスター CA 証明書をマウントできます。詳細については、クラスター CA を信頼するように内部クライアントを設定する を参照してください。
前提条件
- OpenShift クラスターの外部で実行されているクライアントによる接続に、Kafka クラスターを使用できる。
- Cluster Operator および User Operator がクラスターで実行されている。
手順
Kafka リスナーを使用して Kafka クラスターを設定します。
- リスナーを通じて Kafka ブローカーにアクセスするために必要な認証を定義します。
Kafka ブローカーで認可を有効にします。
リスナーの設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... listeners: 1 - name: external1 2 port: 9094 3 type: <listener_type> 4 tls: true 5 authentication: type: tls 6 configuration: 7 #... authorization: 8 type: simple superUsers: - super-user-name 9 # ...
- 1
- 外部リスナーを有効にする設定オプションは、汎用 Kafka リスナースキーマリファレンス に記載されています。
- 2
- リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
- 3
- Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
- 4
- 外部リスナーのタイプは、
route
(OpenShift のみ)、loadbalancer
、nodeport
またはingress
(Kubernetes のみ) として指定されます。内部リスナーはinternal
またはcluster-ip
として指定されます。 - 5
- 必須。リスナーでの TLS 暗号化。
route
およびingress
タイプのリスナーの場合は、true
に設定する必要があります。mTLS 認証の場合は、authentication
プロパティーも使用します。 - 6
- リスナーでのクライアント認証メカニズム。mTLS を使用したサーバーおよびクライアント認証の場合、
tls: true
およびauthentication.type: tls
を指定します。 - 7
- (オプション) リスナータイプの要件に応じて、追加の リスナー設定 を指定できます。
- 8
AclAuthorizer
およびStandardAuthorizer
Kafka プラグインを使用するsimple
として指定された認可。- 9
- (任意設定) スーパーユーザーは、ACL で定義されたアクセス制限に関係なく、すべてのブローカーにアクセスできます。
警告OpenShift ルートアドレスは、Kafka クラスター名、リスナー名、プロジェクト名、およびルーターのドメインで構成されます。たとえば、
my-cluster-kafka-external1-bootstrap-my-project.domain.com
(<cluster_name>-kafka-<listener_name>-bootstrap-<namespace>.<domain>) のようになります。各 DNS ラベル (ピリオド.の間) は 63 文字を超えてはならず、アドレスの合計の長さは 255 文字を超えてはなりません。
Kafka
リソースを作成または更新します。oc apply -f <kafka_configuration_file>
Kafka クラスターは、mTLS 認証を使用する Kafka ブローカーリスナーと共に設定されます。
Kafka ブローカー Pod ごとにサービスが作成されます。
サービスが作成され、Kafka クラスターに接続するための ブートストラップアドレス として機能します。
サービスは、
nodeport
リスナーを使用した Kafka クラスターへの外部接続用 外部ブートストラップアドレス としても作成されます。kafka ブローカーのアイデンティティーを検証するクラスター CA 証明書もシークレット
<cluster_name>-cluster-ca-cert
に作成されます。注記外部リスナーの使用時に Kafka クラスターをスケーリングする場合、すべての Kafka ブローカーのローリング更新がトリガーされる可能性があります。これは設定によって異なります。
Kafka
リソースのステータスから、Kafka クラスターにアクセスする際に使用するブートストラップアドレスを取得します。oc get kafka <kafka_cluster_name> -o=jsonpath='{.status.listeners[?(@.name=="<listener_name>")].bootstrapServers}{"\n"}'
以下に例を示します。
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
Kafka クライアントのブートストラップアドレスを使用して、Kafka クラスターに接続します。
Kafka クラスターにアクセスする必要があるクライアントに対応するユーザーを作成または変更します。
-
Kafka
リスナーと同じ認証タイプを指定します。 simple
認可の認可 ACL を指定します。ユーザー設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster 1 spec: authentication: type: tls 2 authorization: type: simple acls: 3 - resource: type: topic name: my-topic patternType: literal operations: - Describe - Read - resource: type: group name: my-group patternType: literal operations: - Read
-
KafkaUser
リソースを作成または変更します。oc apply -f USER-CONFIG-FILE
KafkaUser
リソースと同じ名前のシークレットと共に、ユーザーが作成されます。シークレットには、mTLS 認証用の公開鍵と秘密鍵が含まれています。シークレットの例
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: ca.crt: <public_key> # Public key of the clients CA user.crt: <user_certificate> # Public key of the user user.key: <user_private_key> # Private key of the user user.p12: <store> # PKCS #12 store for user certificates and keys user.password: <password_for_store> # Protects the PKCS #12 store
Kafka クラスターの
<cluster_name>-cluster-ca-cert
シークレットからクラスター CA 証明書を抽出します。oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
<user_name>
シークレットからユーザー CA 証明書を抽出します。oc get secret <user_name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
<user_name>
シークレットからユーザーの秘密鍵を抽出します。oc get secret <user_name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key
Kafka クラスターに接続するためのブートストラップアドレスのホスト名とポートを使用してクライアントを設定します。
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hostname>:<port>");
Kafka クラスターの ID を検証するために、トラストストア認証情報を使用してクライアントを設定します。
パブリッククラスター CA 証明書を指定します。
トラストストア設定の例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "<ca.crt_file_content>");
SSL は、mTLS 認証用に指定されたセキュリティープロトコルです。TLS を介した SCRAM-SHA-512 認証には
SASL_SSL
を指定します。PEM はトラストストアのファイル形式です。Kafka クラスターに接続する際にユーザーを検証するために、キーストア認証情報を使用してクライアントを設定します。
公開証明書と秘密鍵を指定します。
キーストア設定の例
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM"); props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "<user.crt_file_content>"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "<user.key_file_content>");
キーストア証明書と秘密鍵を設定に直接追加します。1 行形式で追加します。
BEGIN CERTIFICATE
とEND CERTIFICATE
区切り文字の間は、改行文字 (\n
) で始まります。元の証明書の各行も\n
で終了します。キーストア設定の例
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGIN CERTIFICATE----- \n<user_certificate_content_line_1>\n<user_certificate_content_line_n>\n-----END CERTIFICATE---"); props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "----BEGIN PRIVATE KEY-----\n<user_key_content_line_1>\n<user_key_content_line_n>\n-----END PRIVATE KEY-----");
関連情報
- 「リスナー認証」
- 「Kafka の承認」
認可サーバーを使用している場合は、トークンベースの認証と認可を使用できます。