9.2. Kafka の設定
Kafka
カスタムリソースの spec
プロパティーを更新して、Kafka デプロイメントを設定します。
Kafka の設定に加えて、ZooKeeper と Streams for Apache Kafka Operator の設定を追加できます。ロギングやヘルスチェックなどの一般的な設定プロパティーは、コンポーネントごとに独立して設定されます。
特に重要な設定オプションには次のものがあります。
- リソース要求 (CPU/メモリー)
- 最大および最小メモリー割り当ての JVM オプション
- クライアントを Kafka ブローカーに接続するためのリスナー (およびクライアントの認証)
- 認証
- ストレージ
- ラックアウェアネス
- メトリック
- Cruise Control によるクラスターのリバランス
- KRaft ベースの Kafka クラスターのメタデータバージョン
- ZooKeeper ベースの Kafka クラスターのブローカー間プロトコルバージョン
config
の .spec.kafka.metadataVersion
プロパティーまたは inter.broker.protocol.version
プロパティーは、指定された Kafka バージョン (spec.kafka.version
) でサポートされるバージョンにする必要があります。このプロパティーは、Kafka クラスターで使用される Kafka メタデータまたはブローカー間プロトコルのバージョンを表します。これらのプロパティーのいずれかが設定で設定されていない場合、Cluster Operator はバージョンを、使用されている Kafka バージョンのデフォルトに更新します。
サポートされている最も古いメタデータのバージョンは 3.3 です。Kafka バージョンより古いメタデータバージョンを使用すると、一部の機能が無効になる可能性があります。
Kafka クラスターの設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。
TLS 証明書の管理
Kafka をデプロイする場合、Cluster Operator は自動で TLS 証明書の設定および更新を行い、クラスター内での暗号化および認証を有効にします。必要な場合は、更新期間の開始前にクラスターおよびクライアント CA 証明書を手動で更新できます。クラスターおよびクライアント CA 証明書によって使用される鍵を置き換えることもできます。詳細は、CA 証明書の手動更新 および 秘密鍵の置換 を参照してください。
Kafka
カスタムリソース設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: replicas: 3 1 version: 3.7.0 2 logging: 3 type: inline loggers: kafka.root.logger.level: INFO resources: 4 requests: memory: 64Gi cpu: "8" limits: memory: 64Gi cpu: "12" readinessProbe: 5 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 jvmOptions: 6 -Xms: 8192m -Xmx: 8192m image: my-org/my-image:latest 7 listeners: 8 - name: plain 9 port: 9092 10 type: internal 11 tls: false 12 configuration: useServiceDnsDomain: true 13 - name: tls port: 9093 type: internal tls: true authentication: 14 type: tls - name: external1 15 port: 9094 type: route tls: true configuration: brokerCertChainAndKey: 16 secretName: my-secret certificate: my-certificate.crt key: my-key.key authorization: 17 type: simple config: 18 auto.create.topics.enable: "false" offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 default.replication.factor: 3 min.insync.replicas: 2 inter.broker.protocol.version: "3.7" storage: 19 type: persistent-claim 20 size: 10000Gi rack: 21 topologyKey: topology.kubernetes.io/zone metricsConfig: 22 type: jmxPrometheusExporter valueFrom: configMapKeyRef: 23 name: my-config-map key: my-key # ... zookeeper: 24 replicas: 3 25 logging: 26 type: inline loggers: zookeeper.root.logger: INFO resources: requests: memory: 8Gi cpu: "2" limits: memory: 8Gi cpu: "2" jvmOptions: -Xms: 4096m -Xmx: 4096m storage: type: persistent-claim size: 1000Gi metricsConfig: # ... entityOperator: 27 tlsSidecar: 28 resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: 29 type: inline loggers: rootLogger.level: INFO resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" userOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: 30 type: inline loggers: rootLogger.level: INFO resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" kafkaExporter: 31 # ... cruiseControl: 32 # ...
- 1
- レプリカノードの数。
- 2
- Kafka バージョン。アップグレード手順に従うと、サポート対象のバージョンに変更できます。
- 3
- ConfigMap を介して直接 (
inline
) または間接 (external
) に追加された Kafka ロガーとログレベル。カスタム Log4j 設定は、ConfigMap のlog4j.properties
キーの下に配置する必要があります。Kafkakafka.root.logger.level
ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。 - 4
- 現在
cpu
およびmemory
である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。 - 5
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 6
- Kafka を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 7
- 高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
- 8
- リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、OpenShift クラスター内部または外部からの接続の 内部 または 外部 リスナーとして設定されます。
- 9
- リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
- 10
- Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
- 11
- リスナーのタイプは、
internal
またはcluster-ip
(ブローカーごとのClusterIP
サービスを使用して Kafka を公開するため) として指定されるか、外部リスナーの場合は、route
(OpenShift のみ)、loadbalancer
、nodeport
またはingress
(Kubernetes のみ) として指定されます。 - 12
- 各リスナーの TLS 暗号化を有効または無効にします。
route
およびIngress
タイプのリスナーの場合、TLS 暗号化を常にtrue
に設定して有効にする必要があります。 - 13
- クラスターサービス接尾辞 (通常は
cluster.local
) を含む完全修飾 DNS 名が割り当てられているかどうかを定義します。 - 14
- mTLS、SCRAM-SHA-512、またはトークンベースの OAuth 2.0 として指定されるリスナー認証メカニズム。
- 15
- 外部リスナー設定は、
route
、loadbalancer
、またはnodeport
からなど、Kafka クラスターが外部の OpenShift に公開される方法を指定します。 - 16
- 外部 CA (認証局) によって管理される Kafka リスナー証明書のオプションの設定。
brokerCertChainAndKey
は、サーバー証明書および秘密鍵が含まれるSecret
を指定します。TLS による暗号化が有効な任意のリスナーで Kafka リスナー証明書を設定できます。 - 17
- 認可は Kafka ブローカーで簡易、OAUTH2.0、または OPA 認可を有効化します。簡易認可では、
AclAuthorizer
およびStandardAuthorizer
Kafka プラグインが使用されます。 - 18
- ブローカー設定。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
- 19
- 永続ボリュームのストレージサイズは拡張可能で、さらに JBOD ストレージへのボリューム追加が可能です。
- 20
- 永続ストレージには、動的ボリュームプロビジョニングのためのストレージ
id
やclass
など、追加の設定オプションがあります。 - 21
- 異なるラック、データセンター、またはアベイラビリティーゾーンにレプリカを分散させるためのラックアウェアネス設定。
topologyKey
は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準のtopology.kubernetes.io/zone
ラベルを使用するゾーンを指定します。 - 22
- Prometheus メトリックが有効になりました。この例では、メトリクスは Prometheus JMX Exporter (デフォルトのメトリクスエクスポーター) に対して設定されます。
- 23
- Prometheus JMX Exporter 経由で Prometheus 形式のメトリクスを Grafana ダッシュボードにエクスポートするルール。Prometheus JMX Exporter の設定が含まれる ConfigMap を参照することで有効になります。
metricsConfig.valueFrom.configMapKeyRef.key
配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリックを有効にできます。 - 24
- Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定。
- 25
- ZooKeeper ノードの数。通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを失うと、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。Streams for Apache Kafka では、ZooKeeper クラスターの安定性および高可用性が重要です。
- 26
- ZooKeeper ロガーとログレベル。
- 27
- Topic Operator および User Operator の設定を指定する Entity Operator 設定。
- 28
- Entity Operator の TLS サイドカー設定。Entity Operator は、ZooKeeper とのセキュアな通信に TLS サイドカーを使用します。
- 29
- 指定された Topic Operator ロガーおよびログレベル。この例では、
inline
ロギングを使用します。 - 30
- 指定された User Operator ロガーおよびログレベル。
- 31
- Kafka Exporter の設定。Kafka Exporter は、Kafka ブローカーからメトリックデータ、特にコンシューマーラグデータを抽出するためのオプションのコンポーネントです。Kafka Exporter が適切に機能できるようにするには、コンシューマーグループを使用する必要があります。
- 32
- Kafka クラスターの再バランスに使用される Cruise Control のオプションの設定。
9.2.1. Kafka Static Quota プラグインを使用したブローカーへの制限の設定
Kafka Static Quota プラグインを使用して、Kafka クラスターのブローカーにスループットおよびストレージの制限を設定します。Kafka
リソースを設定して、プラグインを有効にし、制限を設定します。バイトレートのしきい値およびストレージクォータを設定して、ブローカーと対話するクライアントに制限を設けることができます。
プロデューサーおよびコンシューマー帯域幅にバイトレートのしきい値を設定できます。制限の合計は、ブローカーにアクセスするすべてのクライアントに分散されます。たとえば、バイトレートのしきい値として 40 MBps ををプロデューサーに設定できます。2 つのプロデューサーが実行されている場合、それぞれのスループットは 20MBps に制限されます。
ストレージクォータは、Kafka ディスクストレージの制限をソフト制限とハード制限間で調整します。この制限は、利用可能なすべてのディスク容量に適用されます。プロデューサーは、ソフト制限とハード制限の間で徐々に遅くなります。制限により、ディスクの使用量が急激に増加しないようにし、容量を超えないようにします。ディスクがいっぱいになると、修正が難しい問題が発生する可能性があります。ハード制限は、ストレージの上限です。
JBOD ストレージの場合、制限はすべてのディスクに適用されます。ブローカーが 2 つの 1 TB ディスクを使用し、クォータが 1.1 TB の場合は、1 つのディスクにいっぱいになり、別のディスクがほぼ空になることがあります。
前提条件
- Kafka クラスターを管理する Cluster Operator が稼働している。
手順
Kafka
リソースのconfig
にプラグインのプロパティーを追加します。プラグインプロパティーは、この設定例のとおりです。
Kafka Static Quota プラグインの設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... config: client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback 1 client.quota.callback.static.produce: 1000000 2 client.quota.callback.static.fetch: 1000000 3 client.quota.callback.static.storage.soft: 400000000000 4 client.quota.callback.static.storage.hard: 500000000000 5 client.quota.callback.static.storage.check-interval: 5 6
リソースを更新します。
oc apply -f <kafka_configuration_file>
9.2.2. ZooKeeper のデフォルト設定値
Streams for Apache Kafka を使用して ZooKeeper をデプロイする場合、Streams for Apache Kafka によって設定されるデフォルト設定の一部が、標準の ZooKeeper のデフォルト設定と異なります。これは、Streams for Apache Kafka が、OpenShift 環境内で ZooKeeper を実行するために最適化された値を使用して、多数の ZooKeeper プロパティーを設定するためです。
Streams for Apache Kafka の主要な ZooKeeper プロパティーのデフォルト設定は次のとおりです。
プロパティー | デフォルト値 | 説明 |
---|---|---|
| 2000 | 1 ティックの長さ (ミリ秒単位)。これによってセッションタイムアウトの長さが決まります。 |
| 5 | ZooKeeper クラスター内で、フォロワーがリーダーに遅れることを許可される最大ティック数。 |
| 2 | ZooKeeper クラスター内でフォロワーがリーダーと同期していなくても許容される最大ティック数。 |
| 1 |
|
| false | ZooKeeper 管理サーバーを無効にするフラグ。管理サーバーは、Streams for Apache Kafka では使用されません。 |
Kafka
カスタムリソースの Zookeeper.config
としてこれらのデフォルト値を変更すると、ZooKeeper クラスターの動作とパフォーマンスに影響を与える可能性があります。
9.2.3. アノテーションを使用した Kafka ノードの削除
この手順では、OpenShift アノテーションを使用して既存の Kafka ノードを削除する方法を説明します。Kafka ノードの削除するには、Kafka ブローカーが稼働している Pod
と、関連する PersistentVolumeClaim
の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod
と関連する PersistentVolumeClaim
が自動的に再作成されます。
PersistentVolumeClaim
を削除すると、永久的なデータ損失が発生する可能性があり、クラスターの可用性は保証できません。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
- 稼働中の Cluster Operator
手順
削除する
Pod
の名前を見つけます。Kafka ブローカー Pod の名前は
<cluster_name>-kafka-<index_number>
です。ここで、<index_number>
は 0 から始まり、レプリカの合計数から 1 を引いた値で終わります。例:my-cluster-kafka-0
oc annotate
を使用して、OpenShift でPod
リソースにアノテーションを付けます。oc annotate pod <cluster_name>-kafka-<index_number> strimzi.io/delete-pod-and-pvc="true"
- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。
9.2.4. アノテーションを使用した ZooKeeper ノードの削除
この手順では、OpenShift アノテーションを使用して既存の ZooKeeper ノードを削除する方法を説明します。ZooKeeper ノードを削除するには、ZooKeeper が稼働している Pod
と、関連する PersistentVolumeClaim
の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod
と関連する PersistentVolumeClaim
が自動的に再作成されます。
PersistentVolumeClaim
を削除すると、永久的なデータ損失が発生する可能性があり、クラスターの可用性は保証できません。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
- 稼働中の Cluster Operator
手順
削除する
Pod
の名前を見つけます。ZooKeeper Pod の名前は
<cluster_name>-zookeeper-<index_number>
です。ここで、<index_number>
は 0 から始まり、レプリカの合計数から 1 を引いた値で終わります。例:my-cluster-zookeeper-0
oc annotate
を使用して、OpenShift でPod
リソースにアノテーションを付けます。oc annotate pod <cluster_name>-zookeeper-<index_number> strimzi.io/delete-pod-and-pvc="true"
- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。