8.2. Kafka の設定


Kafka カスタムリソースの spec プロパティーを更新して、Kafka デプロイメントを設定します。

Kafka の設定に加え、ZooKeeper および AMQ Streams Operator の設定を追加することもできます。ロギングやヘルスチェックなどの一般的な設定プロパティーは、コンポーネントごとに独立して設定されます。

特に重要な設定オプションには次のものがあります。

  • リソース要求 (CPU/メモリー)
  • 最大および最小メモリー割り当ての JVM オプション
  • クライアントを Kafka ブローカーに接続するためのリスナー (およびクライアントの認証)
  • 認証
  • ストレージ
  • ラックアウェアネス
  • メトリック
  • Cruise Control によるクラスターのリバランス

Kafka クラスター設定オプションの詳細は、AMQ Streams カスタムリソースの API リファレンス を参照してください。

Kafka バージョン

Kafka configinter.broker.protocol.version プロパティーは、指定された Kafka バージョン (spec.kafka.version) によってサポートされるバージョンである必要があります。このプロパティーは、Kafka クラスターで使用される Kafka プロトコルのバージョンを表します。

Kafka 3.0.0 以降、inter.broker.protocol.version3.0 以上に設定されていると、log.message.format.version オプションは無視されるため、設定する必要はありません。

Kafka バージョンのアップグレード時には、inter.broker.protocol.version のアップグレードが必要です。詳細は、Upgrading Kafka を参照してください。

TLS 証明書の管理

Kafka をデプロイする場合、Cluster Operator は自動で TLS 証明書の設定および更新を行い、クラスター内での暗号化および認証を有効にします。必要な場合は、更新期間の開始前にクラスターおよびクライアント CA 証明書を手動で更新できます。クラスターおよびクライアント CA 証明書によって使用される鍵を置き換えることもできます。詳細は、CA 証明書の手動更新 および 秘密鍵の置換 を参照してください。

Kafka カスタムリソース設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3 
1

    version: 3.6.0 
2

    logging: 
3

      type: inline
      loggers:
        kafka.root.logger.level: INFO
    resources: 
4

      requests:
        memory: 64Gi
        cpu: "8"
      limits:
        memory: 64Gi
        cpu: "12"
    readinessProbe: 
5

      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    jvmOptions: 
6

      -Xms: 8192m
      -Xmx: 8192m
    image: my-org/my-image:latest 
7

    listeners: 
8

      - name: plain 
9

        port: 9092 
10

        type: internal 
11

        tls: false 
12

        configuration:
          useServiceDnsDomain: true 
13

      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication: 
14

          type: tls
      - name: external1 
15

        port: 9094
        type: route
        tls: true
        configuration:
          brokerCertChainAndKey: 
16

            secretName: my-secret
            certificate: my-certificate.crt
            key: my-key.key
    authorization: 
17

      type: simple
    config: 
18

      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.6"
    storage: 
19

      type: persistent-claim 
20

      size: 10000Gi
    rack: 
21

      topologyKey: topology.kubernetes.io/zone
    metricsConfig: 
22

      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef: 
23

          name: my-config-map
          key: my-key
    # ...
  zookeeper: 
24

    replicas: 3 
25

    logging: 
26

      type: inline
      loggers:
        zookeeper.root.logger: INFO
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "2"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    storage:
      type: persistent-claim
      size: 1000Gi
    metricsConfig:
      # ...
  entityOperator: 
27

    tlsSidecar: 
28

      resources:
        requests:
          cpu: 200m
          memory: 64Mi
        limits:
          cpu: 500m
          memory: 128Mi
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging: 
29

        type: inline
        loggers:
          rootLogger.level: INFO
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
    userOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging: 
30

        type: inline
        loggers:
          rootLogger.level: INFO
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
  kafkaExporter: 
31

    # ...
  cruiseControl: 
32

    # ...
Copy to Clipboard Toggle word wrap

1
レプリカノードの数。
2
Kafka バージョン。アップグレード手順に従うと、サポート対象のバージョンに変更できます。
3
ConfigMap を介して直接 (inline) または間接 (external) に追加された Kafka ロガーとログレベル。カスタム Log4j 設定は、ConfigMap の log4j.properties キーの下に配置する必要があります。Kafka kafka.root.logger.level ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。
4
現在 cpu および memory である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。
5
コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
6
Kafka を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
7
高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
8
リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、OpenShift クラスター内部または外部からの接続の 内部 または 外部 リスナーとして設定されます。
9
リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
10
Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
11
リスナーのタイプは、internal または cluster-ip (ブローカーごとの ClusterIP サービスを使用して Kafka を公開するため) として指定されるか、外部リスナーの場合は、route (OpenShift のみ)、loadbalancernodeport または ingress (Kubernetes のみ) として指定されます。
12
各リスナーの TLS 暗号化を有効にします。デフォルトは false です。TLS 暗号化は、route および ingress タイプのリスナーに対して true に設定して有効にする必要があります。
13
クラスターサービス接尾辞 (通常は cluster.local) を含む完全修飾 DNS 名が割り当てられているかどうかを定義します。
14
mTLS、SCRAM-SHA-512、またはトークンベースの OAuth 2.0 として指定されるリスナー認証メカニズム。
15
外部リスナー設定は、routeloadbalancer、または nodeport からなど、Kafka クラスターが外部の OpenShift に公開される方法を指定します。
16
外部 CA (認証局) によって管理される Kafka リスナー証明書のオプションの設定。brokerCertChainAndKey は、サーバー証明書および秘密鍵が含まれる Secret を指定します。TLS による暗号化が有効な任意のリスナーで Kafka リスナー証明書を設定できます。
17
認可は Kafka ブローカーで簡易、OAUTH2.0、または OPA 認可を有効化します。簡易認可では、AclAuthorizer および StandardAuthorizer Kafka プラグインが使用されます。
18
ブローカー設定。標準の Apache Kafka 設定が提供されることがありますが、AMQ Streams によって直接管理されないプロパティーに限定されます。
19
永続ボリュームのストレージサイズは拡張可能で、さらに JBOD ストレージへのボリューム追加が可能です。
20
永続ストレージには、動的ボリュームプロビジョニングのためのストレージ idclass など、追加の設定オプションがあります。
21
異なるラック、データセンター、またはアベイラビリティーゾーンにレプリカを分散させるためのラックアウェアネス設定。topologyKey は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準の topology.kubernetes.io/zone ラベルを使用するゾーンを指定します。
22
Prometheus メトリックが有効になりました。この例では、メトリクスは Prometheus JMX Exporter (デフォルトのメトリクスエクスポーター) に対して設定されます。
23
Prometheus JMX Exporter 経由で Prometheus 形式のメトリクスを Grafana ダッシュボードにエクスポートするルール。Prometheus JMX Exporter の設定が含まれる ConfigMap を参照することで有効になります。metricsConfig.valueFrom.configMapKeyRef.key 配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリックを有効にできます。
24
Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定。
25
ZooKeeper ノードの数。通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを失うと、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。AMQ Streams では、ZooKeeper クラスターの安定性および高可用性が重要になります。
26
ZooKeeper ロガーとログレベル。
27
Topic Operator および User Operator の設定を指定する Entity Operator 設定。
28
Entity Operator の TLS サイドカー設定。Entity Operator は、ZooKeeper とのセキュアな通信に TLS サイドカーを使用します。
29
指定された Topic Operator ロガーおよびログレベル。この例では、inline ロギングを使用します。
30
指定された User Operator ロガーおよびログレベル。
31
Kafka Exporter の設定。Kafka Exporter は、Kafka ブローカーからメトリックデータ、特にコンシューマーラグデータを抽出するためのオプションのコンポーネントです。Kafka Exporter が適切に機能できるようにするには、コンシューマーグループを使用する必要があります。
32
Kafka クラスターの再バランスに使用される Cruise Control のオプションの設定。

8.2.1. Kafka Static Quota プラグインを使用したブローカーへの制限の設定

Kafka Static Quota プラグインを使用して、Kafka クラスターのブローカーにスループットおよびストレージの制限を設定します。Kafka リソースを設定して、プラグインを有効にし、制限を設定します。バイトレートのしきい値およびストレージクォータを設定して、ブローカーと対話するクライアントに制限を設けることができます。

プロデューサーおよびコンシューマー帯域幅にバイトレートのしきい値を設定できます。制限の合計は、ブローカーにアクセスするすべてのクライアントに分散されます。たとえば、バイトレートのしきい値として 40 MBps ををプロデューサーに設定できます。2 つのプロデューサーが実行されている場合、それぞれのスループットは 20MBps に制限されます。

ストレージクォータは、Kafka ディスクストレージの制限をソフト制限とハード制限間で調整します。この制限は、利用可能なすべてのディスク容量に適用されます。プロデューサーは、ソフト制限とハード制限の間で徐々に遅くなります。制限により、ディスクの使用量が急激に増加しないようにし、容量を超えないようにします。ディスクがいっぱいになると、修正が難しい問題が発生する可能性があります。ハード制限は、ストレージの上限です。

注記

JBOD ストレージの場合、制限はすべてのディスクに適用されます。ブローカーが 2 つの 1 TB ディスクを使用し、クォータが 1.1 TB の場合は、1 つのディスクにいっぱいになり、別のディスクがほぼ空になることがあります。

前提条件

  • Kafka クラスターを管理する Cluster Operator が稼働している。

手順

  1. Kafka リソースの config にプラグインのプロパティーを追加します。

    プラグインプロパティーは、この設定例のとおりです。

    Kafka Static Quota プラグインの設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        config:
          client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback 
    1
    
          client.quota.callback.static.produce: 1000000 
    2
    
          client.quota.callback.static.fetch: 1000000 
    3
    
          client.quota.callback.static.storage.soft: 400000000000 
    4
    
          client.quota.callback.static.storage.hard: 500000000000 
    5
    
          client.quota.callback.static.storage.check-interval: 5 
    6
    Copy to Clipboard Toggle word wrap

    1
    Kafka Static Quota プラグインを読み込みます。
    2
    プロデューサーのバイトレートしきい値を設定します。この例では 1 MBps です。
    3
    コンシューマーのバイトレートしきい値を設定します。この例では 1 MBps です。
    4
    ストレージのソフト制限の下限を設定します。この例では 400 GB です。
    5
    ストレージのハード制限の上限を設定します。この例では 500 GB です。
    6
    ストレージのチェックの間隔 (秒単位) を設定します。この例では 5 秒です。これを 0 に設定するとチェックを無効にできます。
  2. リソースを更新します。

    oc apply -f <kafka_configuration_file>
    Copy to Clipboard Toggle word wrap

8.2.2. ZooKeeper のデフォルト設定値

AMQ Streams を使用して ZooKeeper をデプロイする場合、AMQ Streams によって設定されるデフォルト設定の一部は、標準の ZooKeeper デフォルトとは異なります。これは、AMQ Streams が、OpenShift 環境内で ZooKeeper を実行するために最適化された値を使用して、多数の ZooKeeper プロパティーを設定するためです。

AMQ Streams の主要な ZooKeeper プロパティーのデフォルト設定は次のとおりです。

Expand
表8.1 AMQ Streams のデフォルトの ZooKeeper プロパティー
プロパティーデフォルト値説明

tickTime

2000

1 ティックの長さ (ミリ秒単位)。これによってセッションタイムアウトの長さが決まります。

initLimit

5

ZooKeeper クラスター内で、フォロワーがリーダーに遅れることを許可される最大ティック数。

syncLimit

2

ZooKeeper クラスター内でフォロワーがリーダーと同期していなくても許容される最大ティック数。

autopurge.purgeInterval

1

autopurge 機能を有効にし、サーバー側の ZooKeeper トランザクションログをパージする間隔を時間単位で設定します。

admin.enableServer

false

ZooKeeper 管理サーバーを無効にするフラグ。管理サーバーは AMQ Streams では使用されません。

重要

Kafka カスタムリソースの Zookeeper.config としてこれらのデフォルト値を変更すると、ZooKeeper クラスターの動作とパフォーマンスに影響を与える可能性があります。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat