9.2. Kafka の設定
Kafka カスタムリソースの spec プロパティーを更新して、Kafka デプロイメントを設定します。
Kafka の設定に加えて、ZooKeeper と Streams for Apache Kafka Operator の設定を追加できます。ロギングやヘルスチェックなどの一般的な設定プロパティーは、コンポーネントごとに独立して設定されます。
特に重要な設定オプションには次のものがあります。
- リソース要求 (CPU/メモリー)
- 最大および最小メモリー割り当ての JVM オプション
- クライアントを Kafka ブローカーに接続するためのリスナー (およびクライアントの認証)
- 認証
- ストレージ
- ラックアウェアネス
- メトリック
- Cruise Control によるクラスターのリバランス
- KRaft ベースの Kafka クラスターのメタデータバージョン
- ZooKeeper ベースの Kafka クラスターのブローカー間プロトコルバージョン
config の .spec.kafka.metadataVersion プロパティーまたは inter.broker.protocol.version プロパティーは、指定された Kafka バージョン (spec.kafka.version) でサポートされるバージョンにする必要があります。このプロパティーは、Kafka クラスターで使用される Kafka メタデータまたはブローカー間プロトコルのバージョンを表します。これらのプロパティーのいずれかが設定で設定されていない場合、Cluster Operator はバージョンを、使用されている Kafka バージョンのデフォルトに更新します。
サポートされている最も古いメタデータのバージョンは 3.3 です。Kafka バージョンより古いメタデータバージョンを使用すると、一部の機能が無効になる可能性があります。
Kafka クラスターの設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。
TLS 証明書の管理
Kafka をデプロイする場合、Cluster Operator は自動で TLS 証明書の設定および更新を行い、クラスター内での暗号化および認証を有効にします。必要な場合は、更新期間の開始前にクラスターおよびクライアント CA 証明書を手動で更新できます。クラスターおよびクライアント CA 証明書によって使用される鍵を置き換えることもできます。詳細は、CA 証明書の手動更新 および 秘密鍵の置換 を参照してください。
Kafka カスタムリソース設定の例
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
version: 3.7.0
logging:
type: inline
loggers:
kafka.root.logger.level: INFO
resources:
requests:
memory: 64Gi
cpu: "8"
limits:
memory: 64Gi
cpu: "12"
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
jvmOptions:
-Xms: 8192m
-Xmx: 8192m
image: my-org/my-image:latest
listeners:
- name: plain
port: 9092
type: internal
tls: false
configuration:
useServiceDnsDomain: true
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external1
port: 9094
type: route
tls: true
configuration:
brokerCertChainAndKey:
secretName: my-secret
certificate: my-certificate.crt
key: my-key.key
authorization:
type: simple
config:
auto.create.topics.enable: "false"
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.7"
storage:
type: persistent-claim
size: 10000Gi
rack:
topologyKey: topology.kubernetes.io/zone
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: my-config-map
key: my-key
# ...
zookeeper:
replicas: 3
logging:
type: inline
loggers:
zookeeper.root.logger: INFO
resources:
requests:
memory: 8Gi
cpu: "2"
limits:
memory: 8Gi
cpu: "2"
jvmOptions:
-Xms: 4096m
-Xmx: 4096m
storage:
type: persistent-claim
size: 1000Gi
metricsConfig:
# ...
entityOperator:
tlsSidecar:
resources:
requests:
cpu: 200m
memory: 64Mi
limits:
cpu: 500m
memory: 128Mi
topicOperator:
watchedNamespace: my-topic-namespace
reconciliationIntervalSeconds: 60
logging:
type: inline
loggers:
rootLogger.level: INFO
resources:
requests:
memory: 512Mi
cpu: "1"
limits:
memory: 512Mi
cpu: "1"
userOperator:
watchedNamespace: my-topic-namespace
reconciliationIntervalSeconds: 60
logging:
type: inline
loggers:
rootLogger.level: INFO
resources:
requests:
memory: 512Mi
cpu: "1"
limits:
memory: 512Mi
cpu: "1"
kafkaExporter:
# ...
cruiseControl:
# ...
- 1
- レプリカノードの数。
- 2
- Kafka バージョン。アップグレード手順に従うと、サポート対象のバージョンに変更できます。
- 3
- ConfigMap を介して直接 (
inline) または間接 (external) に追加された Kafka ロガーとログレベル。カスタム Log4j 設定は、ConfigMap のlog4j.propertiesキーの下に配置する必要があります。Kafkakafka.root.logger.levelロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。 - 4
- 現在
cpuおよびmemoryである、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。 - 5
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 6
- Kafka を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 7
- 高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
- 8
- リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、OpenShift クラスター内部または外部からの接続の 内部 または 外部 リスナーとして設定されます。
- 9
- リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
- 10
- Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
- 11
- リスナーのタイプは、
internalまたはcluster-ip(ブローカーごとのClusterIPサービスを使用して Kafka を公開するため) として指定されるか、外部リスナーの場合は、route(OpenShift のみ)、loadbalancer、nodeportまたはingress(Kubernetes のみ) として指定されます。 - 12
- 各リスナーの TLS 暗号化を有効または無効にします。
routeおよびIngressタイプのリスナーの場合、TLS 暗号化を常にtrueに設定して有効にする必要があります。 - 13
- クラスターサービス接尾辞 (通常は
cluster.local) を含む完全修飾 DNS 名が割り当てられているかどうかを定義します。 - 14
- mTLS、SCRAM-SHA-512、またはトークンベースの OAuth 2.0 として指定されるリスナー認証メカニズム。
- 15
- 外部リスナー設定は、
route、loadbalancer、またはnodeportからなど、Kafka クラスターが外部の OpenShift に公開される方法を指定します。 - 16
- 外部 CA (認証局) によって管理される Kafka リスナー証明書のオプションの設定。
brokerCertChainAndKeyは、サーバー証明書および秘密鍵が含まれるSecretを指定します。TLS による暗号化が有効な任意のリスナーで Kafka リスナー証明書を設定できます。 - 17
- 認可は Kafka ブローカーで簡易、OAUTH2.0、または OPA 認可を有効化します。簡易認可では、
AclAuthorizerおよびStandardAuthorizerKafka プラグインが使用されます。 - 18
- ブローカー設定。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
- 19
- 永続ボリュームのストレージサイズは拡張可能で、さらに JBOD ストレージへのボリューム追加が可能です。
- 20
- 永続ストレージには、動的ボリュームプロビジョニングのためのストレージ
idやclassなど、追加の設定オプションがあります。 - 21
- 異なるラック、データセンター、またはアベイラビリティーゾーンにレプリカを分散させるためのラックアウェアネス設定。
topologyKeyは、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準のtopology.kubernetes.io/zoneラベルを使用するゾーンを指定します。 - 22
- Prometheus メトリックが有効になりました。この例では、メトリクスは Prometheus JMX Exporter (デフォルトのメトリクスエクスポーター) に対して設定されます。
- 23
- Prometheus JMX Exporter 経由で Prometheus 形式のメトリクスを Grafana ダッシュボードにエクスポートするルール。Prometheus JMX Exporter の設定が含まれる ConfigMap を参照することで有効になります。
metricsConfig.valueFrom.configMapKeyRef.key配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリックを有効にできます。 - 24
- Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定。
- 25
- ZooKeeper ノードの数。通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを失うと、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。Streams for Apache Kafka では、ZooKeeper クラスターの安定性および高可用性が重要です。
- 26
- ZooKeeper ロガーとログレベル。
- 27
- Topic Operator および User Operator の設定を指定する Entity Operator 設定。
- 28
- Entity Operator の TLS サイドカー設定。Entity Operator は、ZooKeeper とのセキュアな通信に TLS サイドカーを使用します。
- 29
- 指定された Topic Operator ロガーおよびログレベル。この例では、
inlineロギングを使用します。 - 30
- 指定された User Operator ロガーおよびログレベル。
- 31
- Kafka Exporter の設定。Kafka Exporter は、Kafka ブローカーからメトリックデータ、特にコンシューマーラグデータを抽出するためのオプションのコンポーネントです。Kafka Exporter が適切に機能できるようにするには、コンシューマーグループを使用する必要があります。
- 32
- Kafka クラスターの再バランスに使用される Cruise Control のオプションの設定。
9.2.1. Kafka Static Quota プラグインを使用したブローカーへの制限の設定 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Static Quota プラグインを使用して、Kafka クラスターのブローカーにスループットおよびストレージの制限を設定します。Kafka リソースを設定して、プラグインを有効にし、制限を設定します。バイトレートのしきい値およびストレージクォータを設定して、ブローカーと対話するクライアントに制限を設けることができます。
プロデューサーおよびコンシューマー帯域幅にバイトレートのしきい値を設定できます。制限の合計は、ブローカーにアクセスするすべてのクライアントに分散されます。たとえば、バイトレートのしきい値として 40 MBps ををプロデューサーに設定できます。2 つのプロデューサーが実行されている場合、それぞれのスループットは 20MBps に制限されます。
ストレージクォータは、Kafka ディスクストレージの制限をソフト制限とハード制限間で調整します。この制限は、利用可能なすべてのディスク容量に適用されます。プロデューサーは、ソフト制限とハード制限の間で徐々に遅くなります。制限により、ディスクの使用量が急激に増加しないようにし、容量を超えないようにします。ディスクがいっぱいになると、修正が難しい問題が発生する可能性があります。ハード制限は、ストレージの上限です。
JBOD ストレージの場合、制限はすべてのディスクに適用されます。ブローカーが 2 つの 1 TB ディスクを使用し、クォータが 1.1 TB の場合は、1 つのディスクにいっぱいになり、別のディスクがほぼ空になることがあります。
前提条件
- Kafka クラスターを管理する Cluster Operator が稼働している。
手順
Kafkaリソースのconfigにプラグインのプロパティーを追加します。プラグインプロパティーは、この設定例のとおりです。
Kafka Static Quota プラグインの設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... config: client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback1 client.quota.callback.static.produce: 10000002 client.quota.callback.static.fetch: 10000003 client.quota.callback.static.storage.soft: 4000000000004 client.quota.callback.static.storage.hard: 5000000000005 client.quota.callback.static.storage.check-interval: 56 リソースを更新します。
oc apply -f <kafka_configuration_file>
9.2.2. ZooKeeper のデフォルト設定値 リンクのコピーリンクがクリップボードにコピーされました!
Streams for Apache Kafka を使用して ZooKeeper をデプロイする場合、Streams for Apache Kafka によって設定されるデフォルト設定の一部が、標準の ZooKeeper のデフォルト設定と異なります。これは、Streams for Apache Kafka が、OpenShift 環境内で ZooKeeper を実行するために最適化された値を使用して、多数の ZooKeeper プロパティーを設定するためです。
Streams for Apache Kafka の主要な ZooKeeper プロパティーのデフォルト設定は次のとおりです。
| プロパティー | デフォルト値 | 説明 |
|---|---|---|
|
| 2000 | 1 ティックの長さ (ミリ秒単位)。これによってセッションタイムアウトの長さが決まります。 |
|
| 5 | ZooKeeper クラスター内で、フォロワーがリーダーに遅れることを許可される最大ティック数。 |
|
| 2 | ZooKeeper クラスター内でフォロワーがリーダーと同期していなくても許容される最大ティック数。 |
|
| 1 |
|
|
| false | ZooKeeper 管理サーバーを無効にするフラグ。管理サーバーは、Streams for Apache Kafka では使用されません。 |
Kafka カスタムリソースの Zookeeper.config としてこれらのデフォルト値を変更すると、ZooKeeper クラスターの動作とパフォーマンスに影響を与える可能性があります。
9.2.3. アノテーションを使用した Kafka ノードの削除 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OpenShift アノテーションを使用して既存の Kafka ノードを削除する方法を説明します。Kafka ノードの削除するには、Kafka ブローカーが稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim が自動的に再作成されます。
PersistentVolumeClaim を削除すると、永久的なデータ損失が発生する可能性があり、クラスターの可用性は保証できません。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
- 稼働中の Cluster Operator
手順
削除する
Podの名前を見つけます。Kafka ブローカー Pod の名前は
<cluster_name>-kafka-<index_number>です。ここで、<index_number>は 0 から始まり、レプリカの合計数から 1 を引いた値で終わります。例:my-cluster-kafka-0oc annotateを使用して、OpenShift でPodリソースにアノテーションを付けます。oc annotate pod <cluster_name>-kafka-<index_number> strimzi.io/delete-pod-and-pvc="true"- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。
9.2.4. アノテーションを使用した ZooKeeper ノードの削除 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、OpenShift アノテーションを使用して既存の ZooKeeper ノードを削除する方法を説明します。ZooKeeper ノードを削除するには、ZooKeeper が稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim が自動的に再作成されます。
PersistentVolumeClaim を削除すると、永久的なデータ損失が発生する可能性があり、クラスターの可用性は保証できません。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
- 稼働中の Cluster Operator
手順
削除する
Podの名前を見つけます。ZooKeeper Pod の名前は
<cluster_name>-zookeeper-<index_number>です。ここで、<index_number>は 0 から始まり、レプリカの合計数から 1 を引いた値で終わります。例:my-cluster-zookeeper-0oc annotateを使用して、OpenShift でPodリソースにアノテーションを付けます。oc annotate pod <cluster_name>-zookeeper-<index_number> strimzi.io/delete-pod-and-pvc="true"- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。