9.2. Kafka の設定


Kafka カスタムリソースの spec プロパティーを更新して、Kafka デプロイメントを設定します。

Kafka の設定に加えて、ZooKeeper と Streams for Apache Kafka Operator の設定を追加できます。ロギングやヘルスチェックなどの一般的な設定プロパティーは、コンポーネントごとに独立して設定されます。

特に重要な設定オプションには次のものがあります。

  • リソース要求 (CPU/メモリー)
  • 最大および最小メモリー割り当ての JVM オプション
  • クライアントを Kafka ブローカーに接続するためのリスナー (およびクライアントの認証)
  • 認証
  • ストレージ
  • ラックアウェアネス
  • メトリック
  • Cruise Control によるクラスターのリバランス
  • KRaft ベースの Kafka クラスターのメタデータバージョン
  • ZooKeeper ベースの Kafka クラスターのブローカー間プロトコルバージョン

config.spec.kafka.metadataVersion プロパティーまたは inter.broker.protocol.version プロパティーは、指定された Kafka バージョン (spec.kafka.version) でサポートされるバージョンにする必要があります。このプロパティーは、Kafka クラスターで使用される Kafka メタデータまたはブローカー間プロトコルのバージョンを表します。これらのプロパティーのいずれかが設定で設定されていない場合、Cluster Operator はバージョンを、使用されている Kafka バージョンのデフォルトに更新します。

注記

サポートされている最も古いメタデータのバージョンは 3.3 です。Kafka バージョンより古いメタデータバージョンを使用すると、一部の機能が無効になる可能性があります。

Kafka クラスターの設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。

TLS 証明書の管理

Kafka をデプロイする場合、Cluster Operator は自動で TLS 証明書の設定および更新を行い、クラスター内での暗号化および認証を有効にします。必要な場合は、更新期間の開始前にクラスターおよびクライアント CA 証明書を手動で更新できます。クラスターおよびクライアント CA 証明書によって使用される鍵を置き換えることもできます。詳細は、CA 証明書の手動更新 および 秘密鍵の置換 を参照してください。

Kafka カスタムリソース設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3 1
    version: 3.7.0 2
    logging: 3
      type: inline
      loggers:
        kafka.root.logger.level: INFO
    resources: 4
      requests:
        memory: 64Gi
        cpu: "8"
      limits:
        memory: 64Gi
        cpu: "12"
    readinessProbe: 5
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    jvmOptions: 6
      -Xms: 8192m
      -Xmx: 8192m
    image: my-org/my-image:latest 7
    listeners: 8
      - name: plain 9
        port: 9092 10
        type: internal 11
        tls: false 12
        configuration:
          useServiceDnsDomain: true 13
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication: 14
          type: tls
      - name: external1 15
        port: 9094
        type: route
        tls: true
        configuration:
          brokerCertChainAndKey: 16
            secretName: my-secret
            certificate: my-certificate.crt
            key: my-key.key
    authorization: 17
      type: simple
    config: 18
      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.7"
    storage: 19
      type: persistent-claim 20
      size: 10000Gi
    rack: 21
      topologyKey: topology.kubernetes.io/zone
    metricsConfig: 22
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef: 23
          name: my-config-map
          key: my-key
    # ...
  zookeeper: 24
    replicas: 3 25
    logging: 26
      type: inline
      loggers:
        zookeeper.root.logger: INFO
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "2"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    storage:
      type: persistent-claim
      size: 1000Gi
    metricsConfig:
      # ...
  entityOperator: 27
    tlsSidecar: 28
      resources:
        requests:
          cpu: 200m
          memory: 64Mi
        limits:
          cpu: 500m
          memory: 128Mi
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging: 29
        type: inline
        loggers:
          rootLogger.level: INFO
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
    userOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging: 30
        type: inline
        loggers:
          rootLogger.level: INFO
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
  kafkaExporter: 31
    # ...
  cruiseControl: 32
    # ...

1
レプリカノードの数。
2
Kafka バージョン。アップグレード手順に従うと、サポート対象のバージョンに変更できます。
3
ConfigMap を介して直接 (inline) または間接 (external) に追加された Kafka ロガーとログレベル。カスタム Log4j 設定は、ConfigMap の log4j.properties キーの下に配置する必要があります。Kafka kafka.root.logger.level ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。
4
現在 cpu および memory である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。
5
コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
6
Kafka を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
7
高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
8
リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、OpenShift クラスター内部または外部からの接続の 内部 または 外部 リスナーとして設定されます。
9
リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
10
Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
11
リスナーのタイプは、internal または cluster-ip (ブローカーごとの ClusterIP サービスを使用して Kafka を公開するため) として指定されるか、外部リスナーの場合は、route (OpenShift のみ)、loadbalancernodeport または ingress (Kubernetes のみ) として指定されます。
12
各リスナーの TLS 暗号化を有効または無効にします。route および Ingress タイプのリスナーの場合、TLS 暗号化を常に true に設定して有効にする必要があります。
13
クラスターサービス接尾辞 (通常は cluster.local) を含む完全修飾 DNS 名が割り当てられているかどうかを定義します。
14
mTLS、SCRAM-SHA-512、またはトークンベースの OAuth 2.0 として指定されるリスナー認証メカニズム。
15
外部リスナー設定は、routeloadbalancer、または nodeport からなど、Kafka クラスターが外部の OpenShift に公開される方法を指定します。
16
外部 CA (認証局) によって管理される Kafka リスナー証明書のオプションの設定。brokerCertChainAndKey は、サーバー証明書および秘密鍵が含まれる Secret を指定します。TLS による暗号化が有効な任意のリスナーで Kafka リスナー証明書を設定できます。
17
認可は Kafka ブローカーで簡易、OAUTH2.0、または OPA 認可を有効化します。簡易認可では、AclAuthorizer および StandardAuthorizer Kafka プラグインが使用されます。
18
ブローカー設定。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
19
永続ボリュームのストレージサイズは拡張可能で、さらに JBOD ストレージへのボリューム追加が可能です。
20
永続ストレージには、動的ボリュームプロビジョニングのためのストレージ idclass など、追加の設定オプションがあります。
21
異なるラック、データセンター、またはアベイラビリティーゾーンにレプリカを分散させるためのラックアウェアネス設定。topologyKey は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準の topology.kubernetes.io/zone ラベルを使用するゾーンを指定します。
22
Prometheus メトリックが有効になりました。この例では、メトリクスは Prometheus JMX Exporter (デフォルトのメトリクスエクスポーター) に対して設定されます。
23
Prometheus JMX Exporter 経由で Prometheus 形式のメトリクスを Grafana ダッシュボードにエクスポートするルール。Prometheus JMX Exporter の設定が含まれる ConfigMap を参照することで有効になります。metricsConfig.valueFrom.configMapKeyRef.key 配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリックを有効にできます。
24
Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定。
25
ZooKeeper ノードの数。通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを失うと、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。Streams for Apache Kafka では、ZooKeeper クラスターの安定性および高可用性が重要です。
26
ZooKeeper ロガーとログレベル。
27
Topic Operator および User Operator の設定を指定する Entity Operator 設定。
28
Entity Operator の TLS サイドカー設定。Entity Operator は、ZooKeeper とのセキュアな通信に TLS サイドカーを使用します。
29
指定された Topic Operator ロガーおよびログレベル。この例では、inline ロギングを使用します。
30
指定された User Operator ロガーおよびログレベル。
31
Kafka Exporter の設定。Kafka Exporter は、Kafka ブローカーからメトリックデータ、特にコンシューマーラグデータを抽出するためのオプションのコンポーネントです。Kafka Exporter が適切に機能できるようにするには、コンシューマーグループを使用する必要があります。
32
Kafka クラスターの再バランスに使用される Cruise Control のオプションの設定。

9.2.1. Kafka Static Quota プラグインを使用したブローカーへの制限の設定

Kafka Static Quota プラグインを使用して、Kafka クラスターのブローカーにスループットおよびストレージの制限を設定します。Kafka リソースを設定して、プラグインを有効にし、制限を設定します。バイトレートのしきい値およびストレージクォータを設定して、ブローカーと対話するクライアントに制限を設けることができます。

プロデューサーおよびコンシューマー帯域幅にバイトレートのしきい値を設定できます。制限の合計は、ブローカーにアクセスするすべてのクライアントに分散されます。たとえば、バイトレートのしきい値として 40 MBps ををプロデューサーに設定できます。2 つのプロデューサーが実行されている場合、それぞれのスループットは 20MBps に制限されます。

ストレージクォータは、Kafka ディスクストレージの制限をソフト制限とハード制限間で調整します。この制限は、利用可能なすべてのディスク容量に適用されます。プロデューサーは、ソフト制限とハード制限の間で徐々に遅くなります。制限により、ディスクの使用量が急激に増加しないようにし、容量を超えないようにします。ディスクがいっぱいになると、修正が難しい問題が発生する可能性があります。ハード制限は、ストレージの上限です。

注記

JBOD ストレージの場合、制限はすべてのディスクに適用されます。ブローカーが 2 つの 1 TB ディスクを使用し、クォータが 1.1 TB の場合は、1 つのディスクにいっぱいになり、別のディスクがほぼ空になることがあります。

前提条件

  • Kafka クラスターを管理する Cluster Operator が稼働している。

手順

  1. Kafka リソースの config にプラグインのプロパティーを追加します。

    プラグインプロパティーは、この設定例のとおりです。

    Kafka Static Quota プラグインの設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        config:
          client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback 1
          client.quota.callback.static.produce: 1000000 2
          client.quota.callback.static.fetch: 1000000 3
          client.quota.callback.static.storage.soft: 400000000000 4
          client.quota.callback.static.storage.hard: 500000000000 5
          client.quota.callback.static.storage.check-interval: 5 6

    1
    Kafka Static Quota プラグインを読み込みます。
    2
    プロデューサーのバイトレートしきい値を設定します。この例では 1 MBps です。
    3
    コンシューマーのバイトレートしきい値を設定します。この例では 1 MBps です。
    4
    ストレージのソフト制限の下限を設定します。この例では 400 GB です。
    5
    ストレージのハード制限の上限を設定します。この例では 500 GB です。
    6
    ストレージのチェックの間隔 (秒単位) を設定します。この例では 5 秒です。これを 0 に設定するとチェックを無効にできます。
  2. リソースを更新します。

    oc apply -f <kafka_configuration_file>

9.2.2. ZooKeeper のデフォルト設定値

Streams for Apache Kafka を使用して ZooKeeper をデプロイする場合、Streams for Apache Kafka によって設定されるデフォルト設定の一部が、標準の ZooKeeper のデフォルト設定と異なります。これは、Streams for Apache Kafka が、OpenShift 環境内で ZooKeeper を実行するために最適化された値を使用して、多数の ZooKeeper プロパティーを設定するためです。

Streams for Apache Kafka の主要な ZooKeeper プロパティーのデフォルト設定は次のとおりです。

表9.1 Streams for Apache Kafka のデフォルトの ZooKeeper プロパティー
プロパティーデフォルト値説明

tickTime

2000

1 ティックの長さ (ミリ秒単位)。これによってセッションタイムアウトの長さが決まります。

initLimit

5

ZooKeeper クラスター内で、フォロワーがリーダーに遅れることを許可される最大ティック数。

syncLimit

2

ZooKeeper クラスター内でフォロワーがリーダーと同期していなくても許容される最大ティック数。

autopurge.purgeInterval

1

autopurge 機能を有効にし、サーバー側の ZooKeeper トランザクションログをパージする間隔を時間単位で設定します。

admin.enableServer

false

ZooKeeper 管理サーバーを無効にするフラグ。管理サーバーは、Streams for Apache Kafka では使用されません。

重要

Kafka カスタムリソースの Zookeeper.config としてこれらのデフォルト値を変更すると、ZooKeeper クラスターの動作とパフォーマンスに影響を与える可能性があります。

9.2.3. アノテーションを使用した Kafka ノードの削除

この手順では、OpenShift アノテーションを使用して既存の Kafka ノードを削除する方法を説明します。Kafka ノードの削除するには、Kafka ブローカーが稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim が自動的に再作成されます。

警告

PersistentVolumeClaim を削除すると、永久的なデータ損失が発生する可能性があり、クラスターの可用性は保証できません。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。

前提条件

  • 稼働中の Cluster Operator

手順

  1. 削除する Pod の名前を見つけます。

    Kafka ブローカー Pod の名前は <cluster_name>-kafka-<index_number> です。ここで、<index_number> は 0 から始まり、レプリカの合計数から 1 を引いた値で終わります。例: my-cluster-kafka-0

  2. oc annotate を使用して、OpenShift で Pod リソースにアノテーションを付けます。

    oc annotate pod <cluster_name>-kafka-<index_number> strimzi.io/delete-pod-and-pvc="true"
  3. 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。

9.2.4. アノテーションを使用した ZooKeeper ノードの削除

この手順では、OpenShift アノテーションを使用して既存の ZooKeeper ノードを削除する方法を説明します。ZooKeeper ノードを削除するには、ZooKeeper が稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim が自動的に再作成されます。

警告

PersistentVolumeClaim を削除すると、永久的なデータ損失が発生する可能性があり、クラスターの可用性は保証できません。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。

前提条件

  • 稼働中の Cluster Operator

手順

  1. 削除する Pod の名前を見つけます。

    ZooKeeper Pod の名前は <cluster_name>-zookeeper-<index_number> です。ここで、<index_number> は 0 から始まり、レプリカの合計数から 1 を引いた値で終わります。例: my-cluster-zookeeper-0

  2. oc annotate を使用して、OpenShift で Pod リソースにアノテーションを付けます。

    oc annotate pod <cluster_name>-zookeeper-<index_number> strimzi.io/delete-pod-and-pvc="true"
  3. 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.