第3章 デプロイメント設定


本章では、サポートされるデプロイメントの異なる側面を設定する方法について説明します。

  • Kafka クラスター
  • Kafka Connect クラスター
  • Source2Image がサポートされる Kafka Connect クラスター
  • Kafka Mirror Maker
  • Kafka Bridge
  • OAuth 2.0 のトークンベースの認証
  • OAuth 2.0 のトークンベースの承認

3.1. Kafka クラスターの設定

Kafka リソースの完全なスキーマは Kafka スキーマ参照」 に記載されています。指定の Kafka リソースに適用されたすべてのラベルは、Kafka クラスターを設定する OpenShift リソースにも適用されます。そのため、必要に応じてリソースにラベルが適用されるため便利です。

3.1.1. Kafka YAML の設定例

Kafka デプロイメントで利用可能な設定オプションを理解するには、ここに提供されるサンプル YAML ファイルを参照してください。

例では、可能な設定オプションの一部のみを取り上げますが、特に重要なオプションは次のとおりです。

  • リソース要求 (CPU/メモリー)
  • 最大および最小メモリー割り当ての JVM オプション
  • リスナー (および認証)
  • 認証
  • ストレージ
  • ラックアウェアネス
  • メトリクス
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3 1
    version: 1.5 2
    resources: 3
      requests:
        memory: 64Gi
        cpu: "8"
      limits: 4
        memory: 64Gi
        cpu: "12"
    jvmOptions: 5
      -Xms: 8192m
      -Xmx: 8192m
    listeners: 6
      tls:
        authentication:7
          type: tls
      external: 8
        type: route
        authentication:
          type: tls
        configuration:
          brokerCertChainAndKey: 9
            secretName: my-secret
            certificate: my-certificate.crt
            key: my-key.key
    authorization: 10
      type: simple
    config: 11
      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 12
      ssl.enabled.protocols: "TLSv1.2"
      ssl.protocol: "TLSv1.2"
    storage: 13
      type: persistent-claim 14
      size: 10000Gi 15
    rack: 16
      topologyKey: failure-domain.beta.kubernetes.io/zone
    metrics: 17
      lowercaseOutputName: true
      rules: 18
      # Special cases and very specific rules
      - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
        name: kafka_server_$1_$2
        type: GAUGE
        labels:
          clientId: "$3"
          topic: "$4"
          partition: "$5"
        # ...
  zookeeper: 19
    replicas: 3
    resources:
      requests:
        memory: 8Gi
        cpu: "2"
      limits:
        memory: 8Gi
        cpu: "2"
    jvmOptions:
      -Xms: 4096m
      -Xmx: 4096m
    storage:
      type: persistent-claim
      size: 1000Gi
    metrics:
      # ...
  entityOperator: 20
    topicOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
    userOperator:
      resources:
        requests:
          memory: 512Mi
          cpu: "1"
        limits:
          memory: 512Mi
          cpu: "1"
  kafkaExporter: 21
    # ...
  cruiseControl: 22
    # ...
1
レプリカは、ブローカーノードの数を指定 します。
2
3
4
リソースの制限は、コンテナーによって消費可能な最大リソースを指定します。
5
6
リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、plain (暗号化なし)、tls、または external として設定 されます。
7
リスナーの認証メカニズムは各リスナーに対して設定でき、相互 TLS または SCRAM-SHA として指定 できます。
8
9
外部の認証局によって管理される Kafka リスナー証明書 の任意設定。brokerCertChainAndKey プロパティーは、サーバー証明書および秘密鍵を保持する Secret を指定します。Kafka リスナー証明書も TLS リスナーに対して設定できます。
10
11
12
13
14
15
永続ストレージには、動的ボリュームプロビジョニングのためのストレージ idclass などの 追加の設定オプション があります。
16
ラックアウェアネスは、異なるラック全体でレプリカを分散 ために設定されます。topology キーはクラスターノードのラベルと一致する必要があります。
17
18
JMX Exporter でメトリクスを Grafana ダッシュボードにエクスポートする Kafka ルール。AMQ Streams によって提供されるルールのセットは Kafka リソース設定にコピーされることがあります。
19
Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定
20
21
22
Kafka クラスターのリバランス を行うために使用される Cruise Control。

3.1.2. データストレージに関する留意事項

効率的なデータストレージインフラストラクチャーは、AMQ Streams のパフォーマンスを最適化するために不可欠です。

ブロックストレージが必要です。NFS などのファイルストレージは、Kafka では機能しません。

ブロックストレージには、以下などを選択できます。

注記

AMQ Streams には OpenShift の raw ブロックボリュームは必要ありません。

3.1.2.1. ファイルシステム

XFS ファイルシステムを使用するようにストレージシステムを設定することが推奨されます。AMQ Streams は ext4 ファイルシステムとも互換性がありますが、最適化するには追加の設定が必要になることがあります。

3.1.2.2. Apache Kafka および ZooKeeper ストレージ

Apache Kafka と ZooKeeper には別々のディスクを使用します。

3 種類のデータストレージがサポートされます。

  • 一時データストレージ (開発用のみで推奨されます)
  • 永続データストレージ
  • JBOD (Just a Bunch of Disks、Kafka のみに適しています)

詳細は Kafka および ZooKeeper ストレージ を参照してください。

ソリッドステートドライブ (SSD) は必須ではありませんが、複数のトピックに対してデータが非同期的に送受信される大規模なクラスターで Kafka のパフォーマンスを向上させることができます。SSD は、高速で低レイテンシーのデータアクセスが必要な ZooKeeper で特に有効です。

注記

Kafka と ZooKeeper の両方にデータレプリケーションが組み込まれているため、複製されたストレージのプロビジョニングは必要ありません。

3.1.3. Kafka および ZooKeeper のストレージタイプ

Kafka および ZooKeeper はステートフルなアプリケーションであるため、データをディスクに格納する必要があります。AMQ Streams では、3 つのタイプのストレージがサポートされます。

  • 一時ストレージ
  • 永続ストレージ
  • JBOD ストレージ
注記

JBOD ストレージは Kafka でのみサポートされ、ZooKeeper ではサポートされません。

Kafka リソースを設定する場合、Kafka ブローカーおよび対応する ZooKeeper ノードで使用されるストレージのタイプを指定できます。以下のリソースの storage プロパティーを使用して、ストレージタイプを設定します。

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper

ストレージタイプは type フィールドで設定されます。

警告

Kafka クラスターをデプロイした後に、ストレージタイプを変更することはできません。

関連情報

3.1.3.1. 一時ストレージ

一時ストレージは `emptyDir` ボリュームを使用してデータを保存します。一時ストレージを使用するには、type フィールドを ephemeral に設定する必要があります。

重要

emptyDir ボリュームは永続的ではなく、保存されたデータは Pod の再起動時に失われます。新規 Pod の起動後に、クラスターの他のノードからすべてのデータを復元する必要があります。一時ストレージは、単一ノードの ZooKeeper クラスターやレプリケーション係数が 1 の Kafka トピックでの使用には適していません。これはデータが損失する原因となるからです。

一時ストレージの例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: ephemeral
    # ...
  zookeeper:
    # ...
    storage:
      type: ephemeral
    # ...

3.1.3.1.1. ログディレクトリー

一時ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。

/var/lib/kafka/data/kafka-log_idx_
idx は、Kafka ブローカー Pod インデックスです。たとえば、/var/lib/kafka/data/kafka-log0 のようになります。

3.1.3.2. 永続ストレージ

永続ストレージは Persistent Volume Claim (永続ボリューム要求、PVC) を使用して、データを保存するための永続ボリュームをプロビジョニングします。永続ボリューム要求を使用すると、ボリュームのプロビジョニングを行う ストレージクラス に応じて、さまざまなタイプのボリュームをプロビジョニングできます。永続ボリューム要求と使用できるデータタイプには、多くのタイプの SAN ストレージや ローカル永続ボリューム などがあります。

永続ストレージを使用するには、typepersistent-claim に設定する必要があります。永続ストレージでは、追加の設定オプションがサポートされます。

id (任意)
ストレージ ID 番号。このオプションは、JBOD ストレージ宣言で定義されるストレージボリュームには必須です。デフォルトは 0 です。
size (必須)
永続ボリューム要求のサイズを定義します (例: 1000Gi)。
class (任意)
動的ボリュームプロビジョニングに使用する OpenShift の ストレージクラス
selector (任意)
使用する特定の永続ボリュームを選択できます。このようなボリュームを選択するラベルを表す key:value ペアが含まれます。
deleteClaim (任意)
クラスターのアンデプロイ時に永続ボリューム要求を削除する必要があるかどうかを指定するブール値。デフォルトは false です。
警告

既存の AMQ Streams クラスターで永続ボリュームのサイズを増やすことは、永続ボリュームのサイズ変更をサポートする OpenShift バージョンでのみサポートされます。サイズを変更する永続ボリュームには、ボリューム拡張をサポートするストレージクラスを使用する必要があります。ボリューム拡張をサポートしないその他のバージョンの OpenShift およびストレージクラスでは、クラスターをデプロイする前に必要なストレージサイズを決定する必要があります。既存の永続ボリュームのサイズを縮小することはできません。

size が 1000Gi の永続ストレージ設定の例 (抜粋)

# ...
storage:
  type: persistent-claim
  size: 1000Gi
# ...

以下の例は、ストレージクラスの使用例を示しています。

特定のストレージクラスを指定する永続ストレージ設定の例 (抜粋)

# ...
storage:
  type: persistent-claim
  size: 1Gi
  class: my-storage-class
# ...

最後に、selector を使用して特定のラベルが付いた永続ボリュームを選択し、SSD などの必要な機能を提供できます。

セレクターを指定する永続ストレージ設定の例 (抜粋)

# ...
storage:
  type: persistent-claim
  size: 1Gi
  selector:
    hdd-type: ssd
  deleteClaim: true
# ...

3.1.3.2.1. ストレージクラスのオーバーライド

デフォルトのストレージクラスを使用する代わりに、1 つ以上の Kafka ブローカーに異なるストレージクラスを指定できます。これは、ストレージクラスが、異なるアベイラビリティーゾーンやデータセンターに制限されている場合などに便利です。この場合、overrides フィールドを使用できます。

以下の例では、デフォルトのストレージクラスの名前は my-storage-class になります。

ストレージクラスのオーバーライドを使用した AMQ Streams クラスターの例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  labels:
    app: my-cluster
  name: my-cluster
  namespace: myproject
spec:
  # ...
  kafka:
    replicas: 3
    storage:
      deleteClaim: true
      size: 100Gi
      type: persistent-claim
      class: my-storage-class
      overrides:
        - broker: 0
          class: my-storage-class-zone-1a
        - broker: 1
          class: my-storage-class-zone-1b
        - broker: 2
          class: my-storage-class-zone-1c
  # ...

overrides プロパティーが設定され、ブローカーボリュームによって以下のストレージクラスが使用されます。

  • broker 0 の永続ボリュームでは my-storage-class-zone-1a が使用されます。
  • broker 1 の永続ボリュームでは my-storage-class-zone-1b が使用されます。
  • broker 2 の永続ボリュームでは my-storage-class-zone-1c が使用されます。

現在、overrides プロパティーは、ストレージクラスの設定をオーバーライドするためのみに使用されます。他のストレージ設定フィールドのオーバーライドは現在サポートされていません。ストレージ設定の他のフィールドは現在サポートされていません。

3.1.3.2.2. 永続ボリューム要求の命名

永続ストレージが使用されると、以下の名前で永続ボリューム要求が作成されます。

data-cluster-name-kafka-idx
Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。
data-cluster-name-zookeeper-idx
ZooKeeper ノード Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。
3.1.3.2.3. ログディレクトリー

永続ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。

/var/lib/kafka/data/kafka-log_idx_
idx は、Kafka ブローカー Pod インデックスです。たとえば、/var/lib/kafka/data/kafka-log0 のようになります。

3.1.3.3. 永続ボリュームのサイズ変更

既存の AMQ Streams クラスターによって使用される永続ボリュームのサイズを増やすことで、ストレージ容量を増やすことができます。永続ボリュームのサイズ変更は、JBOD ストレージ設定で 1 つまたは複数の永続ボリュームが使用されるクラスターでサポートされます。

注記

永続ボリュームのサイズを拡張することはできますが、縮小することはできません。永続ボリュームのサイズ縮小は、現在 OpenShift ではサポートされていません。

前提条件

  • ボリュームのサイズ変更をサポートする OpenShift クラスター。
  • Cluster Operator が稼働中です。
  • ボリューム拡張をサポートするストレージクラスを使用して作成された永続ボリュームを使用する Kafka クラスター。

手順

  1. Kafka リソースで、Kafka クラスター、ZooKeeper クラスター、またはその両方に割り当てられた永続ボリュームのサイズを増やします。

    • Kafka クラスターに割り当てられたボリュームサイズを増やすには、spec.kafka.storage プロパティーを編集します。
    • ZooKeeper クラスターに割り当てたボリュームサイズを増やすには、spec.zookeeper.storage プロパティーを編集します。

      たとえば、ボリュームサイズを 1000Gi から 2000Gi に増やすには、以下のように編集します。

      apiVersion: kafka.strimzi.io/v1beta1
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        kafka:
          # ...
          storage:
            type: persistent-claim
            size: 2000Gi
            class: my-storage-class
          # ...
        zookeeper:
          # ...
  2. リソースを作成または更新します。

    次のように oc apply を使用します。

    oc apply -f your-file

    OpenShift では、Cluster Operator からの要求に応じて、選択された永続ボリュームの容量が増やされます。サイズ変更が完了すると、サイズ変更された永続ボリュームを使用するすべての Pod が Cluster Operator によって再起動されます。これは自動的に行われます。

関連情報

OpenShift での永続ボリュームのサイズ変更に関する詳細は、Resizing Persistent Volumes using Kubernetes を参照してください。

3.1.3.4. JBOD ストレージの概要

AMQ Streams で、複数のディスクやボリュームのデータストレージ設定である JBOD を使用するように設定できます。JBOD は、Kafka ブローカーのデータストレージを増やす方法の 1 つです。また、パフォーマンスを向上することもできます。

JBOD 設定は 1 つまたは複数のボリュームによって記述され、各ボリュームは 一時 または 永続 ボリュームのいずれかになります。JBOD ボリューム宣言のルールおよび制約は、一時および永続ストレージのルールおよび制約と同じです。たとえば、永続ストレージのボリュームをプロビジョニング後に変更することはできません。

3.1.3.4.1. JBOD の設定

AMQ Streams で JBOD を使用するには、ストレージ typejbod に設定する必要があります。volumes プロパティーを使用すると、JBOD ストレージアレイまたは設定を設定するディスクを記述できます。以下は、JBOD 設定例の抜粋になります。

# ...
storage:
  type: jbod
  volumes:
  - id: 0
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
  - id: 1
    type: persistent-claim
    size: 100Gi
    deleteClaim: false
# ...

id は、JBOD ボリュームの作成後に変更することはできません。

ユーザーは JBOD 設定に対してボリュームを追加または削除できます。

3.1.3.4.2. JBOD および 永続ボリューム要求

永続ストレージを使用して JBOD ボリュームを宣言する場合、永続ボリューム要求の命名スキームは以下のようになります。

data-id-cluster-name-kafka-idx
id は、Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの ID に置き換えます。
3.1.3.4.3. ログディレクトリー

JBOD ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。

/var/lib/kafka/data-id/kafka-log_idx_
id は、Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの ID に置き換えます。たとえば、/var/lib/kafka/data-0/kafka-log0 のようになります。

3.1.3.5. JBOD ストレージへのボリュームの追加

この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターにボリュームを追加する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。

注記

以前使用され、削除された id の下に新規ボリュームを追加する場合、以前使用された PersistentVolumeClaims が必ず削除されているよう確認する必要があります。

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator
  • JBOD ストレージのある Kafka クラスター。

手順

  1. Kafka リソースの spec.kafka.storage.volumes プロパティーを編集します。新しいボリュームを volumes アレイに追加します。たとえば、id が 2 の新しいボリュームを追加します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 1
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
          - id: 2
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file
  3. 新しいトピックを作成するか、既存のパーティションを新しいディスクに再度割り当てます。

関連情報

トピックの再割り当てについて、詳しくは 「パーティションの再割り当て」 を参照してください。

3.1.3.6. JBOD ストレージからのボリュームの削除

この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターからボリュームを削除する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。JBOD ストレージには、常に 1 つのボリュームが含まれている必要があります。

重要

データの損失を避けるには、ボリュームを削除する前にすべてのパーティションを移動する必要があります。

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator
  • 複数のボリュームがある JBOD ストレージのある Kafka クラスター

手順

  1. 削除するディスクからすべてのパーティションを再度割り当てます。削除するディスクに割り当てられたままになっているパーティションのデータは削除される可能性があります。
  2. Kafka リソースの spec.kafka.storage.volumes プロパティーを編集します。volumes アレイから 1 つまたは複数のボリュームを削除します。たとえば、ID が 12 のボリュームを削除します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
        # ...
      zookeeper:
        # ...
  3. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

関連情報

トピックの再割り当てについて、詳しくは 「パーティションの再割り当て」 を参照してください。

3.1.4. Kafka ブローカーレプリカ

Kafka クラスターは多くのブローカーを使って実行できます。Kafka.spec.kafka.replicas の Kafka クラスターに使用されるブローカーの数を設定できます。クラスターに最適なブローカー数は、特定のユースケースに基づいて決定する必要があります。

3.1.4.1. ブローカーノード数の設定

この手順では、新規クラスターの Kafka ブローカーノードの数を設定する方法を説明します。これは、パーティションのない新しいクラスターのみに適用できます。クラスターにトピックがすでに定義されている場合は、「クラスターのスケーリング」 を参照してください。

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator が必要です。
  • トピックが定義されていない Kafka クラスター。

手順

  1. Kafka リソースの replicas プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        replicas: 3
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

関連情報

クラスターにトピックがすでに定義されている場合は、「クラスターのスケーリング」 を参照してください。

3.1.5. Kafka ブローカーの設定

AMQ Streams では、Kafka クラスターの Kafka ブローカーの設定をカスタマイズできます。Apache Kafka ドキュメント の Broker Configs セクションに記載されているほとんどのオプションを指定および設定できます。以下に関係する設定オプションは設定できません。

  • セキュリティー (暗号化、認証、および承認)
  • リスナーの設定
  • Broker ID の設定
  • ログデータディレクトリーの設定
  • ブローカー間の通信
  • ZooKeeper の接続

これらのオプションは AMQ Streams によって自動的に設定されます。

3.1.5.1. Kafka ブローカーの設定

Kafka.spec.kafkaconfig プロパティーには Kafka ブローカー設定オプションがキーとして含まれ、それらの値は以下の JSON タイプの 1 つになります。

  • 文字列
  • 数値
  • ブール値

AMQ Streams によって直接管理されるオプション以外は、Apache Kafka ドキュメント の Broker Configs セクションにあるすべてのオプションを指定および設定できます。以下の文字列の 1 つと同じキーまたは以下の文字列の 1 つで始まるキーを持つ設定オプションはすべて変更できません。

  • listeners
  • advertised.
  • broker.
  • listener.
  • host.name
  • port
  • inter.broker.listener.name
  • sasl.
  • ssl.
  • security.
  • password.
  • principal.builder.class
  • log.dir
  • zookeeper.connect
  • zookeeper.set.acl
  • authorizer.
  • super.user

制限されたオプションが config プロパティーに指定された場合、そのオプションは無視され、Cluster Operator のログファイルに警告メッセージが出力されます。サポートされるその他すべてのオプションは Kafka に渡されます。

許可される 3 つの ssl 設定オプションを使用して、TLS バージョンの固有の暗号スイートで外部リスナーを実行します。暗号スイートでは、セキュアな接続とデータ転送のためのアルゴリズムが組み合わされます。

Kafka ブローカーの設定例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    config:
      num.partitions: 1
      num.recovery.threads.per.data.dir: 1
      default.replication.factor: 3
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 1
      log.retention.hours: 168
      log.segment.bytes: 1073741824
      log.retention.check.interval.ms: 300000
      num.network.threads: 3
      num.io.threads: 8
      socket.send.buffer.bytes: 102400
      socket.receive.buffer.bytes: 102400
      socket.request.max.bytes: 104857600
      group.initial.rebalance.delay.ms: 0
      ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 1
      ssl.enabled.protocols: "TLSv1.2" 2
      ssl.protocol: "TLSv1.2" 3
    # ...

1
TLS の暗号スイートは、ECDHE 鍵交換メカニズム、RSA 認証アルゴリズム、AES 一括暗号化アルゴリズム、および SHA384 MAC アルゴリズムの組み合わせを使用します。
2
SSI プロトコル TLSv1.2 は有効になります。
3
TLSv1.2 プロトコルを指定し、SSL コンテキスト生成します。許可される値は TLSv1.1 および TLSv1.2 です。

3.1.5.2. Kafka ブローカーの設定

既存の Kafka ブローカーを設定するか、指定した設定で新しい Kafka ブローカーを作成します。

前提条件

  • OpenShift クラスターが利用できる必要があります。
  • Cluster Operator が稼働している必要があります。

手順

  1. クラスターデプロイメントを指定する Kafka リソースが含まれる YAML 設定ファイルを開きます。
  2. Kafka リソースの spec.kafka.config プロパティーで、Kafka 設定を 1 つまたは複数入力します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        config:
          default.replication.factor: 3
          offsets.topic.replication.factor: 3
          transaction.state.log.replication.factor: 3
          transaction.state.log.min.isr: 1
        # ...
      zookeeper:
        # ...
  3. 新しい設定を適用してリソースを作成または更新します。

    次のように oc apply を使用します。

    oc apply -f kafka.yaml

    kafka.yaml は、設定するリソースの YAML 設定ファイルに置き換えます (例: kafka-persistent.yaml)。

3.1.6. Kafka ブローカーリスナー

Kafka ブローカーで有効なリスナーを設定できます。以下のタイプのリスナーがサポートされます。

  • ポート 9092 のプレーンリスナー (TLS による暗号化なし)
  • ポート 9093 の TLS リスナー (TLS による暗号化を使用)
  • OpenShift の外部からアクセスするためのポート 9094 の外部リスナー

OAuth 2.0

OAuth 2.0 トークンベースの認証を使用している場合、承認サーバーに接続するようにリスナーを設定できます。詳細は、OAuth 2.0 トークンベース認証の使用 を参照してください。

リスナー証明書

TLS 暗号化が有効になっている TLS リスナーまたは外部リスナーの、Kafka リスナー証明書 と呼ばれる独自のサーバー証明書を提供できます。詳細は、「Kafka リスナー証明書」 を参照してください。

3.1.6.1. Kafka リスナー

Kafka.spec.kafka リソースの listeners プロパティーを使用して Kafka ブローカーリスナーを設定できます。listeners プロパティーには 3 つのサブプロパティーが含まれます。

  • plain
  • tls
  • external

各リスナーは、listeners オブジェクトに指定のプロパティーがある場合にのみ定義されます。

すべてのリスナーが有効な listeners プロパティーの例

# ...
listeners:
  plain: {}
  tls: {}
  external:
    type: loadbalancer
# ...

プレーンリスナーのみが有効な listeners プロパティーの例

# ...
listeners:
  plain: {}
# ...

3.1.6.2. Kafka リスナーの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. Kafka.spec.kafka リソースの listeners プロパティーを編集します。

    認証のないプレーン (暗号化されていない) リスナーの設定例:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          plain: {}
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

関連情報

3.1.6.3. リスナー認証

リスナーの authentication プロパティーは、そのリスナーに固有の認証メカニズムを指定するために使用されます。

  • 相互 TLS 認証 (TLS による暗号化のリスナーのみ)
  • SCRAM-SHA 認証

authentication プロパティーが指定されていない場合、リスナーはそのリスナー経由で接続するクライアントを認証しません。

認証は、User Operator を使用して KafkaUsers を管理する場合に設定する必要があります。

3.1.6.3.1. リスナーの認証設定

以下の例で指定されるものは次のとおりです。

  • SCRAM-SHA 認証に設定された plain リスナー
  • 相互 TLS 認証を使用する tls リスナー
  • 相互 TLS 認証を使用する external リスナー

リスナー認証設定の例

# ...
listeners:
  plain:
    authentication:
      type: scram-sha-512
  tls:
    authentication:
      type: tls
  external:
    type: loadbalancer
    tls: true
    authentication:
      type: tls
# ...

3.1.6.3.2. 相互 TLS 認証

相互 TLS 認証は、Kafka ブローカーと ZooKeeper Pod 間の通信で常に使用されます。

相互認証または双方向認証は、サーバーとクライアントの両方が証明書を提示するときに使用されます。AMQ Streams では、Kafka が TLS (Transport Layer Security) を使用して、相互認証の有無を問わず、Kafka ブローカーとクライアントとの間で暗号化された通信が行われるよう設定できます。相互認証を設定する場合、ブローカーによってクライアントが認証され、クライアントによってブローカーが認証されます。

注記

TLS 認証は一般的には一方向で、一方が他方のアイデンティティーを認証します。たとえば、Web ブラウザーと Web サーバーの間で HTTPS が使用される場合、サーバーはブラウザーのアイデンティティーの証明を取得します。

3.1.6.3.2.1. クライアントに相互 TLS 認証を使用する場合

以下の場合、Kafka クライアントの認証に相互 TLS 認証が推奨されます。

  • 相互 TLS 認証を使用した認証がクライアントでサポートされる場合。
  • パスワードの代わりに TLS 証明書を使用する必要がある場合。
  • 期限切れの証明書を使用しないように、クライアントアプリケーションを定期的に再設定および再起動できる場合。
3.1.6.3.3. SCRAM-SHA 認証

SCRAM (Salted Challenge Response Authentication Mechanism) は、パスワードを使用して相互認証を確立できる認証プロトコルです。AMQ Streams では、Kafka が SASL (Simple Authentication and Security Layer) SCRAM-SHA-512 を使用するよう設定し、暗号化されていないクライアントの接続と TLS で暗号化されたクライアントの接続の両方で認証を提供できます。TLS 認証は、Kafka ブローカーと ZooKeeper ノードの間で常に内部で使用されます。TLS クライアント接続で TLS プロトコルを使用すると、接続が暗号化されますが、認証には使用されません。

SCRAM の以下のプロパティーは、暗号化されていない接続でも SCRAM-SHA を安全に使用できるようにします。

  • 通信チャネル上では、パスワードはクリアテキストで送信されません。代わりに、クライアントとサーバーはお互いにチャレンジを生成し、認証するユーザーのパスワードを認識していることを証明します。
  • サーバーとクライアントは、認証を交換するたびに新しいチャレンジを生成します。よって、交換はリレー攻撃に対して回復性を備えています。
3.1.6.3.3.1. サポートされる SCRAM クレデンシャル

AMQ Streams では SCRAM-SHA-512 のみがサポートされます。KafkaUser.spec.authentication.typescram-sha-512 に設定すると、User Operator によって、大文字と小文字の ASCII 文字と数字で設定された無作為の 12 文字のパスワードが生成されます。

3.1.6.3.3.2. クライアントに SCRAM-SHA 認証を使用する場合

以下の場合、Kafka クライアントの認証に SCRAM-SHA が推奨されます。

  • SCRAM-SHA-512 を使用した認証がクライアントでサポートされる場合。
  • TLS 証明書の代わりにパスワードを使用する必要がある場合。
  • 暗号化されていない通信に認証が必要な場合。

3.1.6.4. 外部リスナー

外部リスナーを使用して AMQ Streams の Kafka クラスターを OpenShift 環境外のクライアントに公開します。

その他のリソース

3.1.6.4.1. 外部リスナーでアドバタイズされたアドレスのカスタマイズ

デフォルトでは、AMQ Streams は Kafka クラスターがそのクライアントにアドバタイズするホスト名とポートを自動的に決定しようとします。AMQ Streams が稼働しているインフラストラクチャーでは Kafka にアクセスできる正しいホスト名やポートを提供しない可能性があるため、デフォルトの動作はすべての状況に適しているわけではありません。外部リスナーの overrides プロパティーで、アドバタイズされたホスト名およびポートをカスタマイズできます。その後、TLS ホスト名の検証で使用できるようにするため、AMQ Streams では Kafka ブローカーでアドバタイズされたアドレスが自動的に設定され、ブローカー証明書に追加されます。アドバタイズされたホストおよびポートのオーバーライドは、すべてのタイプの外部リスナーで利用できます。

アドバタイズされたアドレスのオーバーライドが設定された外部リスナーの例

# ...
listeners:
  external:
    type: route
    authentication:
      type: tls
    overrides:
      brokers:
      - broker: 0
        advertisedHost: example.hostname.0
        advertisedPort: 12340
      - broker: 1
        advertisedHost: example.hostname.1
        advertisedPort: 12341
      - broker: 2
        advertisedHost: example.hostname.2
        advertisedPort: 12342
# ...

さらに、ブートストラップサービスの名前を指定することもできます。この名前はブローカー証明書に追加され、TLS ホスト名の検証に使用できます。すべてのタイプの外部リスナーで、ブートストラップアドレスを追加できます。

追加のブートストラップアドレスが設定された外部リスナーの例

# ...
listeners:
  external:
    type: route
    authentication:
      type: tls
    overrides:
      bootstrap:
        address: example.hostname
# ...

3.1.6.4.2. ルート外部リスナー

タイプ route の外部リスナーは、OpenShift の Routes および HAProxy ルーターを使用して Kafka を公開します。

注記

route は OpenShift でのみサポートされます。

3.1.6.4.2.1. OpenShift Routes を使用した Kafka の公開

OpenShift Routes および HAProxy ルーターを使用して Kafka を公開する場合、各 Kafka ブローカー Pod に専用の Route が作成されます。追加の Route が作成され、Kafka ブートストラップアドレスとして提供されます。これらの Routes を使用すると、Kafka クライアントを 443 番ポートで Kafka に接続することができます。

TLS による暗号化は常に Routes と使用されます。

デフォルトでは、ルートホストは OpenShift によって自動的に割り当てられます。ただし、 overrides プロパティーに要求されたホストを指定すると、割り当てられたルートをオーバーライドすることができます。AMQ Streams では、要求されたホストが利用可能であるか検証されません。そのため、ホストが利用可能であることをユーザーが確認する必要があります。

OpenShift ルートホストのオーバーライドが設定されたタイプ routes の外部リスナーの例

# ...
listeners:
  external:
    type: route
    authentication:
      type: tls
    overrides:
      bootstrap:
        host: bootstrap.myrouter.com
      brokers:
      - broker: 0
        host: broker-0.myrouter.com
      - broker: 1
        host: broker-1.myrouter.com
      - broker: 2
        host: broker-2.myrouter.com
# ...

Routes を使用した Kafka へのアクセスに関する詳細は 「OpenShift ルートを使用した Kafka へのアクセス」 を参照してください。

3.1.6.4.2.2. OpenShift ルートを使用した Kafka へのアクセス

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. 外部リスナーが有効で、タイプ route に設定されている Kafka クラスターをデプロイします。

    Routes を使用するよう設定された外部リスナーがある設定の例

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          external:
            type: route
            # ...
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply -f your-file
  3. ブートストラップ Route のアドレスを見つけます。

    oc get routes CLUSTER-NAME-kafka-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'

    このアドレスと Kafka クライアントの 443 番ポートをブートストラップアドレスとして使用します。

  4. ブローカーの認証局の公開証明書を取得します。

    oc get secret CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt

    Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。

その他のリソース

3.1.6.4.3. ロードバランサー外部リスナー

タイプが loadbalancer の外部リスナーは、Loadbalancer タイプの Services を使用して、Kafka を公開します。

3.1.6.4.3.1. ロードバランサーを使用した Kafka の公開

Loadbalancer タイプの Services を使用して Kafka を公開すると、Kafka ブローカー Pod ごとに新しいロードバランサーサービスが作成されます。追加のロードバランサーが作成され、Kafka の ブートストラップ アドレスとして提供されます。ロードバランサーは 9094 番ポートで接続をリッスンします。

デフォルトでは、TLS による暗号化は有効になっています。これを無効にするには、tls フィールドを false に設定します。

タイプが loadbalancer の外部リスナーの例

# ...
listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
# ...

ロードバランサーを使用した Kafka へのアクセスに関する詳細は 「ロードバランサーを使用した Kafka へのアクセス」 を参照してください。

3.1.6.4.3.2. 外部ロードバランサーリスナーの DNS 名のカスタマイズ

loadbalancer リスナーでは、dnsAnnotations プロパティーを使用して追加のアノテーションをロードバランサーサービスに追加できます。これらのアノテーションを使用すると、自動的に DNS 名をロードバランサーサービスに割り当てる ExternalDNS などの DNS ツールをインストルメント化できます。

dnsAnnotations を使用するタイプ loadbalancer の外部リスナーの例

# ...
listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-bootstrap.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
      brokers:
      - broker: 0
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-broker-0.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
      - broker: 1
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-broker-1.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
      - broker: 2
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-broker-2.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
# ...

3.1.6.4.3.3. ロードバランサー IP アドレスのカスタマイズ

loadbalancer リスナーで、ロードバランサーの作成時に loadBalancerIP プロパティーを使用すると、特定の IP アドレスをリクエストできます。特定の IP アドレスでロードバランサーを使用する必要がある場合は、このプロパティーを使用します。クラウドプロバイダーがこの機能に対応していない場合、loadBalancerIP フィールドは無視されます。

特定のロードバランサー IP アドレスリクエストのある loadbalancer タイプの外部リスナーの例

# ...
listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        loadBalancerIP: 172.29.3.10
      brokers:
      - broker: 0
        loadBalancerIP: 172.29.3.1
      - broker: 1
        loadBalancerIP: 172.29.3.2
      - broker: 2
        loadBalancerIP: 172.29.3.3
# ...

3.1.6.4.3.4. ロードバランサーを使用した Kafka へのアクセス

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. 外部リスナーが有効で、タイプが loadbalancer に設定されている Kafka クラスターをデプロイします。

    ロードバランサーを使用するよう設定された外部リスナーがある設定の例

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          external:
            type: loadbalancer
            authentication:
              type: tls
            # ...
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file
  3. ブートストラップロードバランサーのホスト名を見つけます。

    oc get を使用してこれを行うことができます。

    oc get service cluster-name-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}{"\n"}'

    ホスト名が見つからない場合 (コマンドによって返されなかった場合)、ロードバランサーの IP アドレスを使用します。

    oc get を使用してこれを行うことができます。

    oc get service cluster-name-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'

    ホスト名または IP アドレスと Kafka クライアントの 9094 番ポートをブートストラップアドレスとして使用します。

  4. TLS による暗号化が無効になっている場合を除き、ブローカーの認証局の公開証明書を取得します。

    oc get を使用してこれを行うことができます。

    oc get secret cluster-name-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt

    Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。

その他のリソース

3.1.6.4.4. ノードポートの外部リスナー

タイプが nodeport の外部リスナーは、NodePort タイプの Services を使用して、Kafka を公開します。

3.1.6.4.4.1. ノードポートを使用した Kafka の公開

NodePort タイプの Services を使用して Kafka を公開する場合、Kafka クライアントは OpenShift のノードに直接接続されます。各クライアントの OpenShift ノード上のポートへのアクセスを有効にする必要があります (ファイアウォール、セキュリティーグループなど)。各 Kafka ブローカー Pod が別々のポートでアクセス可能になります。

追加の NodePort タイプのサービスが作成され、Kafka ブートストラップアドレスとして提供されます。

Kafka ブローカー Pod にアドバタイズされたアドレスを設定する場合、AMQ Stremas では該当の Pod が稼働しているノードのアドレスが使用されます。多くの場合で、ノードには複数のアドレスがあります。以下の優先順位で、最初に見つかったタイプのアドレスが使用されます。

  1. ExternalDNS
  2. ExternalIP
  3. Hostname
  4. InternalDNS
  5. InternalIP

リスナー設定の preferredAddressType プロパティーを使用して、ノードアドレスとしてチェックされた最初のアドレスタイプを指定できます。たとえば、デプロイメントに DNS サポートがない場合や、内部 DNS または IP アドレスを介してブローカーを内部でのみ公開する場合、このプロパティーは便利です。該当タイプのアドレスが見つかった場合はそのアドレスが使用されます。アドレスタイプが見つからなかった場合、AMQ Streams は標準の優先順位でタイプの検索を続行します。

優先アドレスタイプで設定された外部リスナーの例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  kafka:
    # ...
    listeners:
      external:
        type: nodeport
        tls: true
        authentication:
          type: tls
        configuration:
          preferredAddressType: InternalDNS
    # ...
  zookeeper:
    # ...

デフォルトでは、TLS による暗号化は有効になっています。これを無効にするには、tls フィールドを false に設定します。

注記

ノードポートを使用して Kafka クラスターを公開する場合、現在 TLS ホスト名の検証はサポートされません。

デフォルトでは、ブートストラップおよびブローカーサービスに使用されるポート番号は OpenShift によって自動的に割り当てられます。ただし、overrides プロパティーに要求されたポート番号を指定すると、割り当てられたノードポートをオーバーライドすることができます。AMQ Streams では、要求されたポートで検証を実行しません。そのため、ポートが使用可能であることをユーザーが確認する必要があります。

ノードポートのオーバーライドが設定された外部リスナーの例

# ...
listeners:
  external:
    type: nodeport
    tls: true
    authentication:
      type: tls
    overrides:
      bootstrap:
        nodePort: 32100
      brokers:
      - broker: 0
        nodePort: 32000
      - broker: 1
        nodePort: 32001
      - broker: 2
        nodePort: 32002
# ...

ノードポートを使用した Kafka へのアクセスに関する詳細は 「ノードポートを使用した Kafka へのアクセス」 を参照してください。

3.1.6.4.4.2. 外部ノードポートリスナーの DNS 名のカスタマイズ

nodeport リスナーでは、dnsAnnotations プロパティーを使用して追加のアノテーションをノードポートサービスに追加できます。これらのアノテーションを使用すると、自動的に DNS 名をクラスターノードに割り当てる ExternalDNS などの DNS ツールをインストルメント化できます。

dnsAnnotations を使用するタイプ nodeport の外部リスナーの例

# ...
listeners:
  external:
    type: nodeport
    tls: true
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-bootstrap.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
      brokers:
      - broker: 0
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-broker-0.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
      - broker: 1
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-broker-1.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
      - broker: 2
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: kafka-broker-2.mydomain.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
# ...

3.1.6.4.4.3. ノードポートを使用した Kafka へのアクセス

この手順では、ノードポートを使用して外部クライアントから AMQ Streams Kafka クラスターにアクセスする方法について説明します。

ブローカーに接続するには、Kafka bootstrap アドレスのホスト名 (アドバタイズされたアドレス) とポート番号、および認証に使用される証明書が必要です。

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. 外部リスナーが有効で、タイプが nodeport に設定されている Kafka クラスターをデプロイします。

    以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          external:
            type: nodeport
            tls: true
            authentication:
              type: tls
            configuration:
              brokerCertChainAndKey: 1
                secretName: my-secret
                certificate: my-certificate.crt
                key: my-key.key
              preferredAddressType: InternalDNS 2
        # ...
      zookeeper:
        # ...
    1
    外部の認証局によって管理される Kafka リスナー証明書 の任意設定。brokerCertChainAndKey プロパティーは、サーバー証明書および秘密鍵を保持する Secret を指定します。Kafka リスナー証明書も TLS リスナーに対して設定できます。
    2
  2. リソースを作成または更新します。

    oc apply -f your-file
  3. ブートストラップサービスのポート番号を見つけます。

    oc get service cluster-name-kafka-external-bootstrap -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'

    ポートは Kafka ブートストラップアドレスで使用されます。

  4. OpenShift ノードのアドレスを見つけます。

    oc get node node-name -o=jsonpath='{range .status.addresses[*]}{.type}{"\t"}{.address}{"\n"}'

    異なるアドレスが返される場合は、以下の順序を基にしてアドレスタイプを選択します。

    1. ExternalDNS
    2. ExternalIP
    3. Hostname
    4. InternalDNS
    5. InternalIP

    アドレスと前述のステップで見つけたポートを、Kafka ブートストラップアドレスで使用します。

  5. TLS による暗号化が無効になっている場合を除き、ブローカーの認証局の公開証明書を取得します。

    oc get secret cluster-name-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt

    Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。

その他のリソース

3.1.6.4.5. OpenShift Ingress 外部リスナー

タイプが ingress の外部リスナーは、Kubernetes IngressNGINX Ingress Controller for Kubernetes を使用して Kafka を公開します。

3.1.6.4.5.1. Kubernetes Ingress を使用した Kafka の公開

Kubernetes IngressNGINX Ingress Controller for Kubernetes を使用して Kafka が公開されると、Kafka ブローカー Pod ごとに専用の Ingress リソースが作成されます。追加の Ingress リソースが作成され、Kafka ブートストラップアドレスとして提供されます。これらの Ingress リソースを使用すると、Kafka クライアントを 443 番ポートで Kafka に接続することができます。

注記

Ingress を使用する外部リスナーは、現在 NGINX Ingress Controller for Kubernetes でのみテストされています。

Kafka は TCP 上でバイナリープロトコルを使用しますが、NGINX Ingress Controller for Kubernetes は HTTP プロトコルで動作するように設計されています。Ingress から Kafka コネクションを渡せるようにするため、AMQ Streams では NGINX Ingress Controller for Kubernetes の TLS パススルー機能が使用されます。TLS パススルーが NGINX Ingress Controller for Kubernetes デプロイメントで有効になっていることを確認してください。TLS パススルーの有効化に関する詳細は、TLS パススルーのドキュメント を参照してください。Ingress を使用して Kafka を公開する場合、TLS パススルー機能を使用するため、TLS による暗号化を無効にできません。

Ingress コントローラーはホスト名を自動的に割り当てません。spec.kafka.listeners.external.configuration セクションに、ブートストラップおよびブローカーごとのサービスによって使用されるホスト名を指定する必要があります。また、確実にホスト名が Ingress エンドポイントに解決することを確認する必要があります。AMQ Streams では、要求されたホストが利用可能で、適切に Ingress エンドポイントにルーティングされることを検証しません。

タイプが ingress の外部リスナーの例

# ...
listeners:
  external:
    type: ingress
    authentication:
      type: tls
    configuration:
      bootstrap:
        host: bootstrap.myingress.com
      brokers:
      - broker: 0
        host: broker-0.myingress.com
      - broker: 1
        host: broker-1.myingress.com
      - broker: 2
        host: broker-2.myingress.com
# ...

Ingress を使用した Kafka へのアクセスに関する詳細は 「ingress を使用した Kafka へのアクセス」 を参照してください。

3.1.6.4.5.2. Ingress クラスの設定

デフォルトで、Ingress クラスは nginx に設定されます。class プロパティーを使用して Ingress クラスを変更できます。

Ingress クラスの nginx-internal を使用した ingress タイプの外部リスナーの例

# ...
listeners:
  external:
    type: ingress
    class: nginx-internal
    # ...
# ...

3.1.6.4.5.3. 外部 ingress リスナーの DNS 名のカスタマイズ

ingress リスナーでは、dnsAnnotations プロパティーを使用して追加のアノテーションを ingress リソースに追加できます。これらのアノテーションを使用すると、自動的に DNS 名を ingress リソースに割り当てる ExternalDNS などの DNS ツールをインストルメント化できます。

dnsAnnotations を使用するタイプ ingress の外部リスナーの例

# ...
listeners:
  external:
    type: ingress
    authentication:
      type: tls
    configuration:
      bootstrap:
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: bootstrap.myingress.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
        host: bootstrap.myingress.com
      brokers:
      - broker: 0
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: broker-0.myingress.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
        host: broker-0.myingress.com
      - broker: 1
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: broker-1.myingress.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
        host: broker-1.myingress.com
      - broker: 2
        dnsAnnotations:
          external-dns.alpha.kubernetes.io/hostname: broker-2.myingress.com.
          external-dns.alpha.kubernetes.io/ttl: "60"
        host: broker-2.myingress.com
# ...

3.1.6.4.5.4. ingress を使用した Kafka へのアクセス

以下の手順では、Ingress を使用して OpenShift の外部から AMQ Streams Kafka クラスターにアクセスする方法を説明します。

前提条件

手順

  1. 外部リスナーが有効で、タイプ ingress に設定されている Kafka クラスターをデプロイします。

    Ingress を使用するよう設定された外部リスナーがある設定の例

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          external:
            type: ingress
            authentication:
              type: tls
            configuration:
              bootstrap:
                host: bootstrap.myingress.com
              brokers:
              - broker: 0
                host: broker-0.myingress.com
              - broker: 1
                host: broker-1.myingress.com
              - broker: 2
                host: broker-2.myingress.com
        # ...
      zookeeper:
        # ...
  2. configuration セクションのホストが適切に Ingress エンドポイントに解決することを確認してください。
  3. リソースを作成または更新します。

    oc apply -f your-file
  4. ブローカーの認証局の公開証明書を取得します。

    oc get secret cluster-name-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
  5. Kafka クライアントで取得した証明書を使用して TLS 接続を設定します。認証が有効になっている場合は、SASL または TLS 認証を設定する必要もあります。443 番ポートで、クライアントを設定で指定したホストに接続します。

その他のリソース

3.1.6.5. ネットワークポリシー

AMQ Streams では、Kafka ブローカーで有効になっているリスナーごとに NetworkPolicy リソースが自動的に作成されます。デフォルトでは、すべてのアプリケーションと namespace にアクセスする権限が NetworkPolicy によってリスナーに付与されます。

ネットワークレベルでのリスナーへのアクセスを指定のアプリケーションまたは namespace のみに制限するには、networkPolicyPeers フィールドを使用します。

認証および承認と合わせてネットワークポリシーを使用します。

リスナーごとに、異なる networkPolicyPeers 設定を指定できます。

3.1.6.5.1. リスナーのネットワークポリシー設定

以下に、plain および tls リスナーの networkPolicyPeers 設定の例を示します。

# ...
listeners:
  plain:
    authentication:
      type: scram-sha-512
    networkPolicyPeers:
      - podSelector:
          matchLabels:
            app: kafka-sasl-consumer
      - podSelector:
          matchLabels:
            app: kafka-sasl-producer
  tls:
    authentication:
      type: tls
    networkPolicyPeers:
      - namespaceSelector:
          matchLabels:
            project: myproject
      - namespaceSelector:
          matchLabels:
            project: myproject2
# ...

この例では以下が設定されています。

  • ラベル app: kafka-sasl-consumer および app: kafka-sasl-producer と一致するアプリケーション Pod のみが plain リスナーに接続できます。アプリケーション Pod は Kafka ブローカーと同じ namespace で実行されている必要があります。
  • ラベル project: myproject および project: myproject2 と一致する namespace で稼働しているアプリケーション Pod のみ、tls リスナーに接続できます。

networkPolicyPeers フィールドの構文は、NetworkPolicy リソースの from フィールドと同じです。スキーマの詳細は、NetworkPolicyPeer API reference および KafkaListeners スキーマ参照 を参照してください。

注記

AMQ Streams でネットワークポリシーを使用するには、ingress NetworkPolicies が OpenShift の設定でサポートされる必要があります。

3.1.6.5.2. networkPolicyPeers を使用した Kafka リスナーへのアクセス制限

networkPolicyPeers フィールドを使用すると、リスナーへのアクセスを指定のアプリケーションのみに制限できます。

前提条件

  • Ingress NetworkPolicies をサポートする OpenShift クラスター。
  • Cluster Operator が稼働中です。

手順

  1. Kafka リソースを開きます。
  2. networkPolicyPeers フィールドで、Kafka クラスターへのアクセスが許可されるアプリケーション Pod または namespace を定義します。

    以下は、ラベル appkafka-client に設定されているアプリケーションからの接続のみを許可するよう tls リスナーを設定する例になります。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          tls:
            networkPolicyPeers:
              - podSelector:
                  matchLabels:
                    app: kafka-client
        # ...
      zookeeper:
        # ...
  3. リソースを作成または更新します。

    次のように oc apply を使用します。

    oc apply -f your-file

関連情報

3.1.7. 認証および承認

AMQ Streams では認証および承認がサポートされます。認証は、リスナー ごとに独立して設定できます。承認は、常に Kafka クラスター全体に対して設定されます。

3.1.7.1. 認証

認証は、authentication プロパティーの リスナー設定 の一部として設定されます。認証メカニズムは type フィールドで定義されます。

authentication プロパティーがない場合、指定のリスナーで認証が有効になりません。認証がないと、リスナーではすべての接続が許可されます。

サポートされる認証メカニズム

3.1.7.1.1. TLS クライアント認証

TLS クライアント認証は、typetls に指定して有効にします。TLS クライアント認証は tls リスナーでのみサポートされます。

タイプ tlsauthentication の例

# ...
authentication:
  type: tls
# ...

3.1.7.2. Kafka ブローカーでの認証の設定

前提条件

  • OpenShift クラスターが利用できる必要があります。
  • Cluster Operator が稼働している必要があります。

手順

  1. クラスターデプロイメントを指定する Kafka リソースが含まれる YAML 設定ファイルを開きます。
  2. Kafka リソースの spec.kafka.listeners プロパティーで、認証を有効にするリスナーに authentication フィールドを追加します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        listeners:
          tls:
            authentication:
              type: tls
        # ...
      zookeeper:
        # ...
  3. 新しい設定を適用してリソースを作成または更新します。

    次のように oc apply を使用します。

    oc apply -f kafka.yaml

    kafka.yaml は、設定するリソースの YAML 設定ファイルに置き換えます (例: kafka-persistent.yaml)。

その他のリソース

  • サポートされる認証メカニズムの詳細は、認証 を参照してください。
  • Kafka のスキーマに関する詳細は、Kafka のスキーマ参照 を参照してください。

3.1.7.3. 承認

Kafka.spec.kafka リソースの authorization プロパティーを使用すると Kafka ブローカーの承認を設定できます。authrization プロパティーがないと、承認が有効になりません。承認を有効にすると、承認は有効なすべての リスナー に適用されます。承認方法は type フィールドで定義されます。

以下を設定できます。

3.1.7.3.1. 簡易承認

AMQ Streams の簡易承認では、SimpleAclAuthorizer が使用されます。これは、Apache Kafka で提供されるデフォルトのアクセス制御リスト (ACL) 承認プラグインです。ACL を使用すると、ユーザーがアクセスできるリソースを細かく定義できます。簡易承認を有効にするには、type フィールドを simple に設定します。

簡易承認の例

# ...
authorization:
  type: simple
# ...

ユーザーのアクセスルールは、アクセス制御リスト (ACL) を使用して定義 されます。必要に応じて、superUsers フィールドにスーパーユーザーのリストを指定できます。

3.1.7.3.2. スーパーユーザー

スーパーユーザーは、ACL で定義されたアクセス制限に関係なく、Kafka クラスターのすべてのリソースにアクセスできます。Kafka クラスターのスーパーユーザーを指定するには、superUsers フィールドにユーザープリンシパルのリストを入力します。ユーザーが TLS クライアント認証を使用する場合、ユーザー名は CN= で始まる証明書のサブジェクトの共通名になります。

スーパーユーザーの指定例

# ...
authorization:
  type: simple
  superUsers:
    - CN=fred
    - sam
    - CN=edward
# ...

注記

Kafka.spec.kafkaconfig プロパティーにある super.user 設定オプションは無視されます。この代わりに、authorization プロパティーでスーパーユーザーを指定します。詳細は Kafka ブローカーの設定 を参照してください。

3.1.7.4. Kafka ブローカーでの承認の設定

承認を設定し、特定の Kafka ブローカーのスーパーユーザーを指定します。

前提条件

  • OpenShift クラスター。
  • Cluster Operator が稼働している必要があります。

手順

  1. Kafka.spec.kafka リソースの authorization プロパティーを追加または編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        authorization:
          type: simple
          superUsers:
            - CN=fred
            - sam
            - CN=edward
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

関連情報

  • サポートされる承認方法の詳細は、承認 を参照してください。
  • Kafka のスキーマに関する詳細は、Kafka のスキーマ参照 を参照してください。
  • ユーザー認証の設定に関する詳細は、Kafka User リソース を参照してください。

3.1.8. ZooKeeper レプリカ

通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。

効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを失うと、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。AMQ Streams では、 ZooKeeper クラスターの安定性および高可用性が重要になります。

3 ノードクラスター
3 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 2 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 1 つのみであれば対応できます。
5 ノードクラスター
5 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 3 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 2 つの場合まで対応できます。
7 ノードクラスター
7 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 4 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 3 つの場合まで対応できます。
注記

開発の目的で、単一ノードの ZooKeeper を実行することも可能です。

クラスターのノードの数が多いほどクォーラムを維持するコストも高くなるため、ノードの数が多いほどパフォーマンスが向上するとは限りません。可用性の要件に応じて、使用するノードの数を決定します。

3.1.8.1. ZooKeeper ノードの数

ZooKeeper ノードの数は、Kafka.spec.zookeeperreplicas プロパティーを使用して設定できます。

レプリカの設定を示す例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
    replicas: 3
    # ...

3.1.8.2. ZooKeeper レプリカの数の変更

前提条件

  • OpenShift クラスターが利用できる必要があります。
  • Cluster Operator が稼働している必要があります。

手順

  1. クラスターデプロイメントを指定する Kafka リソースが含まれる YAML 設定ファイルを開きます。
  2. Kafka リソースの spec.zookeeper.replicas プロパティーで、複製された ZooKeeper サーバーの数を入力します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
        replicas: 3
        # ...
  3. 新しい設定を適用してリソースを作成または更新します。

    次のように oc apply を使用します。

    oc apply -f kafka.yaml

    kafka.yaml は、設定するリソースの YAML 設定ファイルに置き換えます (例: kafka-persistent.yaml)。

3.1.9. ZooKeeper の設定

AMQ Streams では、Apache ZooKeeper ノードの設定をカスタマイズできます。ZooKeeper のドキュメント に記載されているほとんどのオプションを指定および設定できます。

以下に関連するオプションは設定できません。

  • セキュリティー (暗号化、認証、および承認)
  • リスナーの設定
  • データディレクトリーの設定
  • ZooKeeper クラスターの設定

これらのオプションは AMQ Streams によって自動的に設定されます。

3.1.9.1. ZooKeeper の設定

ZooKeeper ノードは、Kafka.spec.zookeeperconfig プロパティーを使用して設定されます。このプロパティーには、ZooKeeper 設定オプションがキーとして含まれます。値は、以下の JSON タイプの 1 つを使用して記述できます。

  • 文字列
  • 数値
  • ブール値

ユーザーは、AMQ Streams で直接管理されるオプションを除き、ZooKeeper ドキュメント に記載されているオプションを指定および設定できます。以下の文字列の 1 つと同じキーまたは以下の文字列の 1 つで始まるキーを持つ設定オプションはすべて禁止されています。

  • server.
  • dataDir
  • dataLogDir
  • clientPort
  • authProvider
  • quorum.auth
  • requireClientAuthScheme

禁止されているオプションの 1 つが config プロパティーにある場合、そのオプションは無視され、警告メッセージが Cluster Operator ログファイルに出力されます。その他のオプションは、すべて ZooKeeper に渡されます。

重要

Cluster Operator では、提供された config オブジェクトのキーまたは値は検証されません。無効な設定を指定すると、ZooKeeper クラスターが起動しなかったり、不安定になる可能性があります。このような場合、Kafka.spec.zookeeper.config オブジェクトの設定を修正し、Cluster Operator によって新しい設定がすべての ZooKeeper ノードにロールアウトされるようにします。

選択したオプションのデフォルト値は次のとおりです。

  • timeTick、デフォルト値 2000
  • initLimit、デフォルト値 5
  • syncLimit、デフォルト値 2
  • autopurge.purgeInterval、デフォルト値 1

これらのオプションは、Kafka.spec.zookeeper.config プロパティーにない場合に自動的に設定されます。

許可される 3 つの ssl 設定オプションを使用して、TLS バージョンの固有の暗号スイートで外部リスナーを実行します。暗号スイートでは、セキュアな接続とデータ転送のためのアルゴリズムが組み合わされます。

ZooKeeper の設定例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  kafka:
    # ...
  zookeeper:
    # ...
    config:
      autopurge.snapRetainCount: 3
      autopurge.purgeInterval: 1
      ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 1
      ssl.enabled.protocols: "TLSv1.2" 2
      ssl.protocol: "TLSv1.2" 3
    # ...

1
TLS の暗号スイートは、ECDHE 鍵交換メカニズム、RSA 認証アルゴリズム、AES 一括暗号化アルゴリズム、および SHA384 MAC アルゴリズムの組み合わせを使用します。
2
SSI プロトコル TLSv1.2 は有効になります。
3
TLSv1.2 プロトコルを指定し、SSL コンテキスト生成します。許可される値は TLSv1.1 および TLSv1.2 です。

3.1.9.2. ZooKeeper の設定

前提条件

  • OpenShift クラスターが利用できる必要があります。
  • Cluster Operator が稼働している必要があります。

手順

  1. クラスターデプロイメントを指定する Kafka リソースが含まれる YAML 設定ファイルを開きます。
  2. Kafka リソースの spec.zookeeper.config プロパティーで、1 つまたは複数の ZooKeeper 設定を指定します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
        config:
          autopurge.snapRetainCount: 3
          autopurge.purgeInterval: 1
        # ...
  3. 新しい設定を適用してリソースを作成または更新します。

    次のように oc apply を使用します。

    oc apply -f kafka.yaml

    kafka.yaml は、設定するリソースの YAML 設定ファイルに置き換えます (例: kafka-persistent.yaml)。

3.1.10. ZooKeeper の接続

ZooKeeper サービスは暗号化および認証でセキュア化され、AMQ Streams の一部でない外部アプリケーションでの使用は想定されていません。

しかし、kafka-topics ツールなどの ZooKeeper への接続を必要とする Kafka CLI ツールを使用する場合は、Kafka コンテナー内でターミナルを使用し、localhost:2181 を ZooKeeper アドレスとして使用して、TLS トンネルのローカル側を ZooKeeper に接続できます。

3.1.10.1. ターミナルからの ZooKeeper への接続

Kafka コンテナー内でターミナルを開き、ZooKeeper の接続を必要とする Kafka CLI ツールを使用します。

前提条件

  • OpenShift クラスターが利用できる必要があります。
  • Kafka クラスターが稼働している必要があります。
  • Cluster Operator が稼働している必要があります。

手順

  1. OpenShift コンソールを使用してターミナルを開くか、CLI から exec コマンドを実行します。

    以下に例を示します。

    oc exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --list --zookeeper localhost:2181

    必ず localhost:2181 を使用してください。

    ZooKeeper に対して Kafka コマンドを実行できるようになりました。

3.1.11. Entitiy Operator

Entity Operator は、実行中の Kafka クラスターで Kafka 関連のエンティティーを管理します。

Entity Operator は以下と設定されます。

Cluster Operator は Kafka リソース設定を介して、Kafka クラスターのデプロイ時に、上記の Operator の 1 つまたは両方を含む Entity Operator をデプロイできます。

注記

デプロイされると、デプロイメント設定に応じて、Entity Operator に operator が含まれます。

これらの operator は、Kafka クラスターのトピックおよびユーザーを管理するために自動的に設定されます。

3.1.11.1. Entity Operator の設定プロパティー

Kafka.specentityOperator プロパティーを使用して Entity Operator を設定します。

entityOperator プロパティーでは複数のサブプロパティーがサポートされます。

  • tlsSidecar
  • topicOperator
  • userOperator
  • template

tlsSidecar プロパティーには、ZooKeeper との通信に使用される TLS サイドカーコンテナーの設定が含まれます。TLS サイドカーコンテナーの設定に関する詳細は、「TLS サイドカー」 を参照してください。

template プロパティーには、ラベル、アノテーション、アフィニティー、および容認 (Toleration) などの Entity Operator Pod の設定が含まれます。テンプレートの設定に関する詳細は、「テンプレートプロパティー」 を参照してください。

topicOperator プロパティーには、Topic Operator の設定が含まれます。このオプションがないと、Entity Operator は Topic Operator なしでデプロイされます。

userOperator プロパティーには、User Operator の設定が含まれます。このオプションがないと、Entity Operator は User Operator なしでデプロイされます。

Entity Operator を設定するためのプロパティーに関する詳細は EntityUserOperatorSpec スキーマ参照 を参照してください。

両方の Operator を有効にする基本設定の例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    topicOperator: {}
    userOperator: {}

topicOperator および userOperator に空のオブジェクト ({}) が使用された場合、すべてのプロパティーでデフォルト値が使用されます。

topicOperator および userOperator プロパティーの両方がない場合、Entity Operator はデプロイされません。

3.1.11.2. Topic Operator 設定プロパティー

Topic Operator デプロイメントは、topicOperator オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。

watchedNamespace
Topic Operator によって KafkaTopics が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。
reconciliationIntervalSeconds
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルト は 90 です。
zookeeperSessionTimeoutSeconds
ZooKeeper セッションのタイムアウト (秒単位)。デフォルト は 20 です。
topicMetadataMaxAttempts
Kafka からトピックメタデータの取得を試行する回数。各試行の間隔は、指数バックオフとして定義されます。パーティションまたはレプリカの数によって、トピックの作成に時間がかかる可能性がある場合は、この値を増やすことを検討してください。デフォルト は 6 です。
image
image プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」 を参照してください。
resources
resources プロパティーを使用すると、Topic Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、「CPU およびメモリーリソース」 を参照してください。
logging
logging プロパティーは、Topic Operator のロギングを設定します。詳細は、「Operator ロガー」 を参照してください。

Topic Operator 設定の例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
    # ...

3.1.11.3. User Operator 設定プロパティー

User Operator デプロイメントは、userOperator オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。

watchedNamespace
User Operator によって KafkaUsers が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。
reconciliationIntervalSeconds
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルトは 120 です。
zookeeperSessionTimeoutSeconds
ZooKeeper セッションのタイムアウト (秒単位)。デフォルト は 6 です。
image
image プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」 を参照してください。
resources
resources プロパティーを使用すると、User Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、「CPU およびメモリーリソース」 を参照してください。
logging
logging プロパティーは、User Operator のロギングを設定します。詳細は、「Operator ロガー」 を参照してください。

User Operator 設定の例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    userOperator:
      watchedNamespace: my-user-namespace
      reconciliationIntervalSeconds: 60
    # ...

3.1.11.4. Operator ロガー

Topic Operator および User Operator には設定可能なロガーがあります。

  • rootLogger.level

これらの Operator では Apache log4j2 ロガー実装が使用されます。

Kafka リソースの logging プロパティーを使用して、ロガーおよびロガーレベルを設定します。

ログレベルを設定するには、ロガーとレベルを直接指定 (インライン) するか、またはカスタム (外部) ConfigMap を使用します。ConfigMap を使用する場合、logging.name プロパティーを外部ロギング設定が含まれる ConfigMap の名前に設定します。ConfigMap 内では、ロギング設定は log4j2.properties を使用して記述されます。

ここで、inline および external ロギングの例を示します。

inline ロギング

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
    # ...
    userOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
# ...

外部ロギング

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
  zookeeper:
    # ...
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: my-topic-namespace
      reconciliationIntervalSeconds: 60
      logging:
        type: external
        name: customConfigMap
# ...

その他のリソース

  • ガベッジコレクター (GC) ロギングを有効 (または無効) にすることもできます。GC ロギングの詳細は、「JVM 設定」 を参照してください。
  • ログレベルの詳細は、Apache logging services を参照してください。

3.1.11.5. Entity Operator の設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. Kafka リソースの entityOperator プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      entityOperator:
        topicOperator:
          watchedNamespace: my-topic-namespace
          reconciliationIntervalSeconds: 60
        userOperator:
          watchedNamespace: my-user-namespace
          reconciliationIntervalSeconds: 60
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.12. CPU およびメモリーリソース

AMQ Streams では、デプロイされたコンテナーごとに特定のリソースを要求し、これらのリソースの最大消費を定義できます。

AMQ Streams では、以下の 2 つのタイプのリソースがサポートされます。

  • CPU
  • メモリー

AMQ Streams では、CPU およびメモリーリソースの指定に OpenShift 構文が使用されます。

3.1.12.1. リソースの制限および要求

リソースの制限と要求は、以下のリソースで resources プロパティーを使用して設定されます。

  • Kafka.spec.kafka
  • Kafka.spec.kafka.tlsSidecar
  • Kafka.spec.zookeeper
  • Kafka.spec.entityOperator.topicOperator
  • Kafka.spec.entityOperator.userOperator
  • Kafka.spec.entityOperator.tlsSidecar
  • Kafka.spec.KafkaExporter
  • KafkaConnect.spec
  • KafkaConnectS2I.spec
  • KafkaBridge.spec

その他のリソース

3.1.12.1.1. リソース要求

要求によって、指定のコンテナーに対して予約するリソースが指定されます。リソースを予約すると、リソースが常に利用できるようになります。

重要

リソース要求が OpenShift クラスターで利用可能な空きリソースを超える場合、Pod はスケジュールされません。

リソース要求は requests プロパティーで指定されます。AMQ Streams では、現在以下のリソース要求がサポートされます。

  • cpu
  • memory

1 つまたは複数のサポートされるリソースに対してリクエストを設定できます。

すべてのリソースを対象とするリソース要求の設定例

# ...
resources:
  requests:
    cpu: 12
    memory: 64Gi
# ...

3.1.12.1.2. リソース制限

制限によって、指定のコンテナーが消費可能な最大リソースが指定されます。制限は予約されず、常に利用できるとは限りません。コンテナーは、リソースが利用できる場合のみ、制限以下のリソースを使用できます。リソース制限は、常にリソース要求よりも高くする必要があります。

リソース制限は limits プロパティーで指定されます。AMQ Streams では、現在以下のリソース制限がサポートされます。

  • cpu
  • memory

1 つまたは複数のサポートされる制限に対してリソースを設定できます。

リソース制限の設定例

# ...
resources:
  limits:
    cpu: 12
    memory: 64Gi
# ...

3.1.12.1.3. サポートされる CPU 形式

CPU の要求および制限は以下の形式でサポートされます。

  • 整数値 (5) または少数 (2.5) の CPU コアの数。
  • 数値または ミリ CPU / ミリコア (100m)。1000 ミリコア は CPU コア 1 つと同じです。

CPU ユニットの例

# ...
resources:
  requests:
    cpu: 500m
  limits:
    cpu: 2.5
# ...

注記

1 つの CPU コアのコンピューティング能力は、OpenShift がデプロイされたプラットフォームによって異なることがあります。

その他のリソース

  • CPU 仕様の詳細は、Meaning of CPU を参照してください。
3.1.12.1.4. サポートされるメモリー形式

メモリー要求および制限は、メガバイト、ギガバイト、メビバイト、およびギビバイトで指定されます。

  • メモリーをメガバイトで指定するには、M 接尾辞を使用します。たとえば、1000M のように指定します。
  • メモリーをギガバイトで指定するには、G 接尾辞を使用します。たとえば、1G のように指定します。
  • メモリーをメビバイトで指定するには、Mi 接尾辞を使用します。たとえば、1000Mi のように指定します。
  • メモリーをギビバイトで指定するには、Gi 接尾辞を使用します。たとえば、1Gi のように指定します。

異なるメモリー単位の使用例

# ...
resources:
  requests:
    memory: 512Mi
  limits:
    memory: 2Gi
# ...

その他のリソース

  • メモリーの指定およびサポートされるその他の単位に関する詳細は、Meaning of memory を参照してください。

3.1.12.2. リソース要求および制限の設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. クラスターデプロイメントを指定するリソースの resources プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        resources:
          requests:
            cpu: "8"
            memory: 64Gi
          limits:
            cpu: "12"
            memory: 128Gi
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

関連情報

  • スキーマの詳細は、{K8sResourceRequirementsAPI} を参照してください。

3.1.13. Kafka ロガー

Kafka には独自の設定可能なロガーがあります。

  • kafka.root.logger.level
  • log4j.logger.org.I0Itec.zkclient.ZkClient
  • log4j.logger.org.apache.zookeeper
  • log4j.logger.kafka
  • log4j.logger.org.apache.kafka
  • log4j.logger.kafka.request.logger
  • log4j.logger.kafka.network.Processor
  • log4j.logger.kafka.server.KafkaApis
  • log4j.logger.kafka.network.RequestChannel$
  • log4j.logger.kafka.controller
  • log4j.logger.kafka.log.LogCleaner
  • log4j.logger.state.change.logger
  • log4j.logger.kafka.authorizer.logger

ZooKeeper にも設定可能なロガーもあります。

  • zookeeper.root.logger

Kafka と ZooKeeper では Apache log4j ロガー実装が使用されます。

logging プロパティーを使用してロガーおよびロガーレベルを設定します。

ログレベルを設定するには、ロガーとレベルを直接指定 (インライン) するか、またはカスタム (外部) ConfigMap を使用します。ConfigMap を使用する場合、logging.name プロパティーを外部ロギング設定が含まれる ConfigMap の名前に設定します。ConfigMap 内では、ロギング設定は log4j.properties を使用して記述されます。

ここで、inline および external ロギングの例を示します。

inline ロギング

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  # ...
  logging:
    type: inline
    loggers:
      kafka.root.logger.level: "INFO"
  # ...
  zookeeper:
    # ...
    logging:
      type: inline
      loggers:
        zookeeper.root.logger: "INFO"
  # ...
  entityOperator:
    # ...
    topicOperator:
      # ...
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
    # ...
    userOperator:
      # ...
      logging:
        type: inline
        loggers:
          rootLogger.level: INFO
    # ...

外部ロギング

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  # ...
  logging:
    type: external
    name: customConfigMap
  # ...

log4j2.properties を使用して ConfigMap 内にロギング設定を記述するため、Operator によって Apache log4j2 ロガー実装が使用されます。詳細は、「Operator ロガー」 を参照してください。

関連情報

  • ガベッジコレクター (GC) ロギングを有効 (または無効) にすることもできます。ガべージコレクションの詳細は、「JVM 設定」 を参照してください。
  • ログレベルの詳細は、Apache logging services を参照してください。

3.1.14. Kafka のラックアウェアネス (Rack awareness)

AMQ Streams のラックアウェアネス (Rack awareness) 機能は、Kafka ブローカー Pod および Kafka トピックレプリカを異なるラック全体に分散できるようにします。ラック認識を有効にすることで、Kafka ブローカーや Kafka ブローカーがホストしているトピックの可用性を向上できるようにします。

注記

ラック (Rack) は、可用性ゾーン、データセンター、またはデータセンターの実際のラックを表す可能性があります。

3.1.14.1. Kafka ブローカーでのラック認識 (Rack awareness) の設定

Kafka のラック認識 (Rack awareness) は、Kafka.spec.kafkarack プロパティーで設定できます。rack オブジェクトには、topologyKey という名前の必須フィールドが 1 つあります。このキーは、OpenShift クラスターノードに割り当てられたラベルの 1 つと一致する必要があります。このラベルは、Kafka ブローカー Pod をノードにスケジュールする際に OpenShift によって使用されます。OpenShift クラスターがクラウドプロバイダープラットフォームで稼働している場合、そのラベルはノードが稼働している可用性ゾーンを表す必要があります。通常、ノードには、topologyKey の値として簡単に使用できる failure-domain.beta.kubernetes.io/zone のラベルが付けられます。これにより、ブローカー Pod がゾーン全体に分散され、Kafka ブローカー内にブローカーの broker.rack 設定パラメーターも設定されます。

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. ノードがデプロイされたゾーンやラックを表すノードラベルについては、OpenShift 管理者に相談します。
  2. ラベルをトポロジーキーとして使用し、Kafka リソースの rack プロパティーを編集します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        rack:
          topologyKey: failure-domain.beta.kubernetes.io/zone
        # ...
  3. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

関連情報

3.1.15. ヘルスチェック

ヘルスチェックは、アプリケーションの健全性を検証する定期的なテストです。ヘルスチェックプローブが失敗すると、OpenShift によってアプリケーションが正常でないと見なされ、その修正が試行されます。

OpenShift では、以下の 2 つのタイプのおよび ヘルスチェックプローブがサポートされます。

  • Liveness プローブ
  • Readiness プローブ

プローブの詳細は、Configure Liveness and Readiness Probes を参照してください。AMQ Streams コンポーネントでは、両タイプのプローブが使用されます。

ユーザーは、Liveness および Readiness プローブに選択されたオプションを設定できます。

3.1.15.1. Healthcheck の設定

Liveness および Readiness プローブは、以下のリソースの livenessProbe および readinessProbe プロパティーを使用して設定できます。

  • Kafka.spec.kafka
  • Kafka.spec.kafka.tlsSidecar
  • Kafka.spec.zookeeper
  • Kafka.spec.entityOperator.tlsSidecar
  • Kafka.spec.entityOperator.topicOperator
  • Kafka.spec.entityOperator.userOperator
  • Kafka.spec.KafkaExporter
  • KafkaConnect.spec
  • KafkaConnectS2I.spec
  • KafkaMirrorMaker.spec
  • KafkaBridge.spec

livenessProbe および readinessProbe の両方で以下のオプションがサポートされます。

  • initialDelaySeconds
  • timeoutSeconds
  • periodSeconds
  • successThreshold
  • failureThreshold

livenessProbe および readinessProbe のオプションに関する詳細は、Probe スキーマ参照」 を参照してください。

Liveness および Readiness プローブの設定例

# ...
readinessProbe:
  initialDelaySeconds: 15
  timeoutSeconds: 5
livenessProbe:
  initialDelaySeconds: 15
  timeoutSeconds: 5
# ...

3.1.15.2. Healthcheck の設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. KafkaKafkaConnect、または KafkaConnectS2IlivenessProbe または readinessProbe プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        readinessProbe:
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe:
          initialDelaySeconds: 15
          timeoutSeconds: 5
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.16. Prometheus メトリクス

AMQ Streams では、Apache Kafka および ZooKeeper によってサポートされる JMX メトリクスを Prometheus メトリクスに変換するために、Prometheus JMX エクスポーター を使用した Prometheus メトリクスがサポートされます。有効になったメトリクスは、9404 番ポートで公開されます。

Prometheus および Grafana の設定およびデプロイに関する詳細は Kafka へのメトリクスの導入 を参照してください。

3.1.16.1. メトリクスの設定

Prometheus メトリクスは、以下のリソースに metrics プロパティーを設定して有効化されます。

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper
  • KafkaConnect.spec
  • KafkaConnectS2I.spec

metrics プロパティーがリソースに定義されていない場合、Prometheus メトリクスは無効になります。追加設定なしで Prometheus メトリクスのエクスポートを有効にするには、空のオブジェクト ({}) を設定します。

追加設定なしでメトリクスを有効にする例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    metrics: {}
    # ...
  zookeeper:
    # ...

metrics プロパティーには、Prometheus JMX エスクポーター の追加設定が含まれることがあります。

追加の Prometheus JMX Exporter 設定を使用したメトリクスを有効化する例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    metrics:
      lowercaseOutputName: true
      rules:
        - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count"
          name: "kafka_server_$1_$2_total"
        - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count"
          name: "kafka_server_$1_$2_total"
          labels:
            topic: "$3"
    # ...
  zookeeper:
    # ...

3.1.16.2. Prometheus メトリクスの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. KafkaKafkaConnect、または KafkaConnectS2I リソースの metrics プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
        metrics:
          lowercaseOutputName: true
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.17. JMX オプション

AMQ Streams では、JMX ポートを 9999 番で開放することで、Kafka ブローカーから JMX メトリクスを取得することがサポートされます。各 Kafka ブローカーに関するさまざまなメトリクスを取得できます。たとえば、BytesPerSecond の値やブローカーのネットワークの要求レートなどの、使用データを取得できます。AMQ Streams では、パスワードとユーザー名で保護された JMX ポートの開放や、保護されていない JMX ポートの開放がサポートされます。

3.1.17.1. JMX オプションの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator。

以下のリソースで jmxOptions プロパティーを使用すると JMX オプションを設定できます。

  • Kafka.spec.kafka

Kafka ブローカーで開放された JMX ポートの、ユーザー名とパスワードの保護を設定できます。

JMX ポートのセキュリティー保護

JMX ポートをセキュアにすると、非承認の Pod によるポートへのアクセスを防ぐことができます。現在、JMX ポートをセキュアにする唯一の方法がユーザー名とパスワードを使用することです。JMX ポートのセキュリティーを有効にするには、authentication フィールドの type パラメーターを password に設定します。

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    jmxOptions:
      authentication:
        type: "password"
    # ...
  zookeeper:
    # ...

これにより、ヘッドレスサービスを使用し、対応するブローカーを指定して Pod をクラスター内部にデプロイし、JMX メトリクスを取得することができます。ブローカー 0 から JMX メトリクスを取得するには、指定するヘッドレスサービスの前にブローカー 0 を追加します。

"<cluster-name>-kafka-0-<cluster-name>-<headless-service-name>"

JMX ポートがセキュアである場合、Pod のデプロイメントで JMX シークレットからユーザー名とパスワードを参照すると、そのユーザー名とパスワードを取得できます。

開放された JMX ポートの使用

JMX ポートのセキュリティーを無効にする場合は、authentication フィールドに何も入力しません。

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    jmxOptions: {}
    # ...
  zookeeper:
    # ...

これにより、ヘッドレスサービスで JMX ポートを開放し、上記と似た方法で Pod をクラスター内にデプロイすることができます。唯一の違いは、すべての Pod が JMX ポートから読み取りできることです。

3.1.18. JVM オプション

AMQ Streams の以下のコンポーネントは、仮想マシン (VM) 内で実行されます。

  • Apache Kafka
  • Apache ZooKeeper
  • Apache Kafka Connect
  • Apache Kafka MirrorMaker
  • AMQ Streams Kafka Bridge

JVM 設定オプションによって、さまざまなプラットフォームおよびアーキテクチャーのパフォーマンスが最適化されます。AMQ Streams では、これらのオプションの一部を設定できます。

3.1.18.1. JVM 設定

JVM オプションは、以下のリソースの jvmOptions プロパティーを使用して設定できます。

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper
  • KafkaConnect.spec
  • KafkaConnectS2I.spec
  • KafkaMirrorMaker.spec
  • KafkaBridge.spec

使用可能な JVM オプションの選択されたサブセットのみを設定できます。以下のオプションがサポートされます。

-Xms および -Xmx

-Xms は、JVM の開始時の最小初期割り当てヒープサイズを設定します。-Xmx は、最大ヒープサイズを設定します。

注記

-Xmx-Xms などの JVM 設定で使用できる単位は、対応するイメージの JDK java バイナリーによって許可される単位です。そのため、1g または 1G は 1,073,741,824 バイトを意味し、Gi は接尾辞として有効な単位ではありません。これは、1G は 1,000,000,000 バイト、1Gi は 1,073,741,824 バイトを意味する OpenShift の慣例に準拠している、メモリー要求および制限 に使用される単位とは対照的です。

-Xms および -Xmx に使用されるデフォルト値は、コンテナーに メモリー要求 の制限が設定されているかどうかによって異なります。

  • メモリーの制限がある場合は、JVM の最小および最大メモリーは制限に対応する値に設定されます。
  • メモリーの制限がない場合、JVM の最小メモリーは 128M に設定され、JVM の最大メモリーは定義されません。これにより、JVM のメモリーを必要に応じて拡張できます。これは、テストおよび開発での単一ノード環境に適しています。
重要

-Xmx を明示的に設定するには、以下の点に注意する必要があります。

  • JVM のメモリー使用量の合計は、-Xmx によって設定された最大ヒープの約 4 倍になります。
  • 適切な OpenShift メモリー制限を設定せずに -Xmx が設定された場合、OpenShift ノードで、実行されている他の Pod からメモリー不足が発生するとコンテナーが強制終了される可能性があります。
  • 適切な OpenShift メモリー要求を設定せずに -Xmx が設定された場合、コンテナーはメモリー不足のノードにスケジュールされる可能性があります。この場合、コンテナーは起動せずにクラッシュします (-Xms-Xmx に設定されている場合は即座にクラッシュし、そうでない場合はその後にクラッシュします)。

-Xmx を明示的に設定する場合は、以下を行うことが推奨されます。

  • メモリー要求とメモリー制限を同じ値に設定します。
  • -Xmx の 4.5 倍以上のメモリー要求を使用します。
  • -Xms を - Xmx と同じ値に設定することを検討してください。
重要

大量のディスク I/O を実行するコンテナー (Kafka ブローカーコンテナーなど) は、オペレーティングシステムのページキャッシュとして使用できるメモリーを確保しておく必要があります。このようなコンテナーでは、要求されるメモリーは JVM によって使用されるメモリーよりもはるかに多くなります。

-Xmx および -Xms の設定例 (抜粋)

# ...
jvmOptions:
  "-Xmx": "2g"
  "-Xms": "2g"
# ...

上記の例では、JVM のヒープに 2 GiB (2,147,483,648 バイト) が使用されます。メモリー使用量の合計は約 8GiB になります。

最初のヒープサイズ (-Xms) および最大ヒープサイズ (-Xmx) に同じ値を設定すると、JVM が必要以上のヒープを割り当てて起動後にメモリーを割り当てないようにすることができます。Kafka および ZooKeeper Pod では、このような割り当てによって不要なレイテンシーが発生する可能性があります。Kafka Connect では、割り当ての過剰を防ぐことが最も重要になります。これは、コンシューマーの数が増えるごとに割り当て過剰の影響がより深刻になる分散モードで特に重要です。

-server

-server はサーバー JVM を有効にします。このオプションは true または false に設定できます。

-server の設定例 (抜粋)

# ...
jvmOptions:
  "-server": true
# ...

注記

いずれのオプション (-server および -XX) も指定されないと、Apache Kafka の KAFKA_JVM_PERFORMANCE_OPTS のデフォルト設定が使用されます。

-XX

-XX オブジェクトは、JVM の高度なランタイムオプションの設定に使用できます。-server および -XX オプションは、Apache Kafka の KAFKA_JVM_PERFORMANCE_OPTS オプションの設定に使用されます。

-XX オブジェクトの使用例

jvmOptions:
  "-XX":
    "UseG1GC": true
    "MaxGCPauseMillis": 20
    "InitiatingHeapOccupancyPercent": 35
    "ExplicitGCInvokesConcurrent": true
    "UseParNewGC": false

上記の設定例の場合、JVM オプションは以下のようになります。

-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:-UseParNewGC
注記

いずれのオプション (-server および -XX) も指定されないと、Apache Kafka の KAFKA_JVM_PERFORMANCE_OPTS のデフォルト設定が使用されます。

3.1.18.1.1. ガベッジコレクターのロギング

JvmOptions セクションでは、ガベージコレクター (GC) のロギングを有効または無効にすることもできます。GC ロギングはデフォルトで無効になっています。これを有効にするには、以下のように gcLoggingEnabled プロパティーを設定します。

GC ロギングを有効にする例

# ...
jvmOptions:
  gcLoggingEnabled: true
# ...

3.1.18.2. JVM オプションの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. KafkaKafkaConnectKafkaConnectS2IKafkaMirrorMaker、または KafkaBridge リソースの jvmOptions プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        jvmOptions:
          "-Xmx": "8g"
          "-Xms": "8g"
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.19. コンテナーイメージ

AMQ Streams では、コンポーネントに使用されるコンテナーイメージを設定できます。コンテナーイメージのオーバーライドは、別のコンテナーレジストリーを使用する必要がある特別な状況でのみ推奨されます。たとえば、AMQ Streams によって使用されるコンテナーリポジトリーにネットワークがアクセスできない場合などがこれに該当します。そのような場合は、AMQ Streams イメージをコピーするか、ソースからビルドする必要があります。設定したイメージが AMQ Streams イメージと互換性のない場合は、適切に機能しない可能性があります。

3.1.19.1. コンテナーイメージの設定

以下のリソースの image プロパティーを使用すると、各コンポーネントに使用するコンテナーイメージを指定できます。

  • Kafka.spec.kafka
  • Kafka.spec.kafka.tlsSidecar
  • Kafka.spec.zookeeper
  • Kafka.spec.entityOperator.topicOperator
  • Kafka.spec.entityOperator.userOperator
  • Kafka.spec.entityOperator.tlsSidecar
  • KafkaConnect.spec
  • KafkaConnectS2I.spec
  • KafkaBridge.spec
3.1.19.1.1. Kafka、Kafka Connect、および Kafka MirrorMaker の image プロパティーの設定

Kafka、Kafka Connect (S2I サポートのある Kafka Connect を含む)、および Kafka MirrorMaker では、複数のバージョンの Kafka がサポートされます。各コンポーネントには独自のイメージが必要です。異なる Kafka バージョンのデフォルトイメージは、以下の環境変数で設定されます。

  • STRIMZI_KAFKA_IMAGES
  • STRIMZI_KAFKA_CONNECT_IMAGES
  • STRIMZI_KAFKA_CONNECT_S2I_IMAGES
  • STRIMZI_KAFKA_MIRROR_MAKER_IMAGES

これらの環境変数には、Kafka バージョンと対応するイメージ間のマッピングが含まれます。マッピングは、image および version プロパティーとともに使用されます。

  • imageversion のどちらもカスタムリソースに指定されていない場合、version は Cluster Operator のデフォルトの Kafka バージョンに設定され、環境変数のこのバージョンに対応するイメージが指定されます。
  • image が指定されていても version が指定されていない場合、指定されたイメージが使用され、Cluster Operator のデフォルトの Kafka バージョンが version であると想定されます。
  • version が指定されていても image が指定されていない場合、環境変数の指定されたバージョンに対応するイメージが使用されます。
  • versionimage の両方を指定すると、指定されたイメージが使用されます。このイメージには、指定のバージョンの Kafka イメージが含まれると想定されます。

異なるコンポーネントの image および version は、以下のプロパティーで設定できます。

  • Kafka の場合は spec.kafka.image および spec.kafka.version
  • Kafka Connect、Kafka Connect S2I、および Kafka MirrorMaker の場合は spec.image および spec.version
警告

version のみを提供し、image プロパティーを未指定のままにしておくことが推奨されます。これにより、カスタムリソースの設定時に間違いが発生する可能性が低減されます。異なるバージョンの Kafka に使用されるイメージを変更する必要がある場合は、Cluster Operator の環境変数を設定することが推奨されます。

3.1.19.1.2. 他のリソースでの image プロパティーの設定

他のカスタムリソースの image プロパティーでは、デプロイメント中に指定の値が使用されます。image プロパティーがない場合、Cluster Operator 設定に指定された image が使用されます。image 名が Cluster Operator 設定に定義されていない場合、デフォルト値が使用されます。

  • Kafka ブローカー TLS サイドカーの場合:

    1. Cluster Operator 設定から STRIMZI_DEFAULT_TLS_SIDECAR_KAFKA_IMAGE 環境変数に指定されたコンテナーイメージ。
    2. registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0 コンテナーイメージ。
  • Topic Operator の場合:

    1. Cluster Operator 設定から STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE 環境変数に指定されたコンテナーイメージ。
    2. registry.redhat.io/amq7/amq-streams-rhel7-operator:1.5.0 コンテナーイメージ。
  • User Operator の場合:

    1. Cluster Operator 設定から STRIMZI_DEFAULT_USER_OPERATOR_IMAGE 環境変数に指定されたコンテナーイメージ。
    2. registry.redhat.io/amq7/amq-streams-rhel7-operator:1.5.0 コンテナーイメージ。
  • Entity Operator TLS サイドカーの場合:

    1. Cluster Operator 設定から STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE 環境変数に指定されたコンテナーイメージ。
    2. registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0 コンテナーイメージ。
  • Kafka Exporter の場合:

    1. Cluster Operator 設定から STRIMZI_DEFAULT_KAFKA_EXPORTER_IMAGE 環境変数に指定されたコンテナーイメージ。
    2. registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0 コンテナーイメージ。
  • Kafka Bridge の場合:

    1. Cluster Operator 設定から STRIMZI_DEFAULT_KAFKA_BRIDGE_IMAGE 環境変数に指定されたコンテナーイメージ。
    2. registry.redhat.io/amq7/amq-streams-bridge-rhel7:1.5.0 コンテナーイメージ。
  • Kafka ブローカーイニシャライザーの場合:

    1. Cluster Operator 設定から STRIMZI_DEFAULT_KAFKA_INIT_IMAGE 環境変数に指定されたコンテナーイメージ。
    2. registry.redhat.io/amq7/amq-streams-rhel7-operator:1.5.0 コンテナーイメージ。
警告

コンテナーイメージのオーバーライドは、別のコンテナーレジストリーを使用する必要がある特別な状況でのみ推奨されます。たとえば、AMQ Streams によって使用されるコンテナーリポジトリーにネットワークがアクセスできない場合などがこれに該当します。そのような場合は、AMQ Streams イメージをコピーするか、ソースからビルドする必要があります。設定したイメージが AMQ Streams イメージと互換性のない場合は、適切に機能しない可能性があります。

コンテナーイメージ設定の例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    image: my-org/my-image:latest
    # ...
  zookeeper:
    # ...

3.1.19.2. コンテナーイメージの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. KafkaKafkaConnect、または KafkaConnectS2I リソースの image プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        image: my-org/my-image:latest
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.20. TLS サイドカー

サイドカーは、Pod で実行されるコンテナーですが、サポートの目的で提供されます。AMQ Streams では、TLS サイドカーは TLS を使用して、各種のコンポーネントと ZooKeeper との間のすべての通信を暗号化および復号化します。ZooKeeper にはネイティブの TLS サポートがありません。

TLS サイドカーは以下で使用されます。

  • Kafka ブローカー
  • ZooKeeper ノード
  • Entitiy Operator

3.1.20.1. TLS サイドカー設定

TLS サイドカーは、以下で tlsSidecar プロパティーを使用して設定できます。

  • Kafka.spec.kafka
  • Kafka.spec.zookeeper
  • Kafka.spec.entityOperator

TLS サイドカーは、以下の追加オプションをサポートします。

  • image
  • resources
  • logLevel
  • readinessProbe
  • livenessProbe

resources プロパティーを使用すると、TLS サイドカーに割り当てられる メモリーおよび CPU リソース を指定できます。

image プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」 を参照してください。

logLevel プロパティーは、ログレベルを指定するために使用されます。以下のログレベルがサポートされます。

  • emerg
  • alert
  • crit
  • err
  • warning
  • notice
  • info
  • debug

デフォルト値は notice です。

Healthcheck のために readinessProbe および livenessProbe プロパティーを設定するための詳細は 「Healthcheck の設定」 を参照してください。

TLS サイドカーの設定例

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    tlsSidecar:
      image: my-org/my-image:latest
      resources:
        requests:
          cpu: 200m
          memory: 64Mi
        limits:
          cpu: 500m
          memory: 128Mi
      logLevel: debug
      readinessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
    # ...
  zookeeper:
    # ...

3.1.20.2. TLS サイドカーの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. Kafka リソースの tlsSidecar プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        tlsSidecar:
          resources:
            requests:
              cpu: 200m
              memory: 64Mi
            limits:
              cpu: 500m
              memory: 128Mi
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.21. Pod スケジューリングの設定

重要

2 つのアプリケーションが同じ OpenShift ノードにスケジュールされた場合、両方のアプリケーションがディスク I/O のように同じリソースを使用し、パフォーマンスに影響する可能性があります。これにより、パフォーマンスが低下する可能性があります。ノードを他の重要なワークロードと共有しないように Kafka Pod をスケジュールする場合、適切なノードを使用したり、Kafka 専用のノードのセットを使用すると、このような問題を適切に回避できます。

3.1.21.1. 他のアプリケーションに基づく Pod のスケジューリング

3.1.21.1.1. 重要なアプリケーションがノードを共有しないようにする

Pod の非アフィニティーを使用すると、重要なアプリケーションが同じディスクにスケジュールされないようにすることができます。Kafka クラスターの実行時に、Pod の非アフィニティーを使用して、Kafka ブローカーがデータベースなどの他のワークロードとノードを共有しないようにすることが推奨されます。

3.1.21.1.2. アフィニティー

以下のリソースで affinity をプロパティーを使用すると、アフィニティーを設定できます。

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。

  • Pod のアフィニティーおよび非アフィニティー
  • ノードのアフィニティー

affinity プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。

3.1.21.1.3. Kafka コンポーネントでの Pod の非アフィニティーの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. クラスターデプロイメントを指定するリソースの affinity プロパティーを編集します。ラベルを使用して、同じノードでスケジュールすべきでない Pod を指定します。topologyKeykubernetes.io/hostname に設定し、選択した Pod が同じホスト名のノードでスケジュールされてはならないことを指定する必要があります。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        template:
          pod:
            affinity:
              podAntiAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  - labelSelector:
                      matchExpressions:
                        - key: application
                          operator: In
                          values:
                            - postgresql
                            - mongodb
                    topologyKey: "kubernetes.io/hostname"
        # ...
      zookeeper:
        # ...
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.21.2. 特定のノードへの Pod のスケジューリング

3.1.21.2.1. ノードのスケジューリング

OpenShift クラスターは、通常多くの異なるタイプのワーカーノードで設定されます。ワークロードが非常に大きい環境の CPU に対して最適化されたものもあれば、メモリー、ストレージ (高速のローカル SSD)、または ネットワークに対して最適化されたものもあります。異なるノードを使用すると、コストとパフォーマンスの両面で最適化しやすくなります。最適なパフォーマンスを実現するには、AMQ Streams コンポーネントのスケジューリングで適切なノードを使用できるようにすることが重要です。

OpenShift はノードのアフィニティーを使用してワークロードを特定のノードにスケジュールします。ノードのアフィニティーにより、Pod がスケジュールされるノードにスケジューリングの制約を作成できます。制約はラベルセレクターとして指定されます。beta.kubernetes.io/instance-type などの組み込みノードラベルまたはカスタムラベルのいずれかを使用してラベルを指定すると、適切なノードを選択できます。

3.1.21.2.2. アフィニティー

以下のリソースで affinity をプロパティーを使用すると、アフィニティーを設定できます。

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。

  • Pod のアフィニティーおよび非アフィニティー
  • ノードのアフィニティー

affinity プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。

3.1.21.2.3. Kafka コンポーネントでのノードのアフィニティーの設定

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. AMQ Streams コンポーネントをスケジュールする必要のあるノードにラベルを付けます。

    oc label を使用してこれを行うことができます。

    oc label node your-node node-type=fast-network

    または、既存のラベルによっては再利用が可能です。

  2. クラスターデプロイメントを指定するリソースの affinity プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        template:
          pod:
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                    - matchExpressions:
                      - key: node-type
                        operator: In
                        values:
                        - fast-network
        # ...
      zookeeper:
        # ...
  3. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.21.3. 専用ノードの使用

3.1.21.3.1. 専用ノード

クラスター管理者は、選択した OpenShift ノードをテイントとしてマーク付けできます。テイントのあるノードは、通常のスケジューリングから除外され、通常の Pod はそれらのノードでの実行はスケジュールされません。ノードに設定されたテイントを許容できるサービスのみをスケジュールできます。このようなノードで実行されるその他のサービスは、ログコレクターやソフトウェア定義のネットワークなどのシステムサービスのみです。

テイントは専用ノードの作成に使用できます。専用のノードで Kafka とそのコンポーネントを実行する利点は多くあります。障害の原因になったり、Kafka に必要なリソースを消費するその他のアプリケーションが同じノードで実行されません。これにより、パフォーマンスと安定性が向上します。

専用ノードで Kafka Pod をスケジュールするには、ノードのアフィニティー許容 (toleration) を設定します。

3.1.21.3.2. アフィニティー

以下のリソースで affinity をプロパティーを使用すると、アフィニティーを設定できます。

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。

  • Pod のアフィニティーおよび非アフィニティー
  • ノードのアフィニティー

affinity プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。

3.1.21.3.3. 許容 (Toleration)

以下のリソースで tolerations プロパティーを使用すると許容 (Toleration) を設定できます。

  • Kafka.spec.kafka.template.pod
  • Kafka.spec.zookeeper.template.pod
  • Kafka.spec.entityOperator.template.pod
  • KafkaConnect.spec.template.pod
  • KafkaConnectS2I.spec.template.pod
  • KafkaBridge.spec.template.pod

tolerations プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes の Taints and Tolerations を参照してください。

3.1.21.3.4. 専用ノードの設定と Pod のスケジューリング

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. 専用ノードとして使用するノードを選択します。
  2. これらのノードにスケジュールされているワークロードがないことを確認します。
  3. 選択したノードにテイントを設定します。

    oc adm taint を使用してこれを行うことができます。

    oc adm taint node your-node dedicated=Kafka:NoSchedule
  4. さらに、選択したノードにラベルも追加します。

    oc label を使用してこれを行うことができます。

    oc label node your-node dedicated=Kafka
  5. クラスターデプロイメントを指定するリソースの affinity および tolerations プロパティーを編集します。以下に例を示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      kafka:
        # ...
        template:
          pod:
            tolerations:
              - key: "dedicated"
                operator: "Equal"
                value: "Kafka"
                effect: "NoSchedule"
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                  - matchExpressions:
                    - key: dedicated
                      operator: In
                      values:
                      - Kafka
        # ...
      zookeeper:
        # ...
  6. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

3.1.22. Kafka Exporter

Kafka リソースを設定すると、クラスターに Kafka Exporter を自動的にデプロイできます。

Kafka Exporter は、主にオフセット、コンシューマーグループ、コンシューマーラグ、およびトピックに関連するデータである分析用のデータを Prometheus メトリクスとして抽出します。

Kafka Exporter の詳細と、パフォーマンスのためにコンシューマーラグを監視する重要性の理由については、Kafka Exporter を参照してください。

3.1.22.1. Kafka Exporter の設定

この手順では、KafkaExporter プロパティーから Kafka リソースの Kafka Exporter を設定する方法を説明します。

Kafka リソースの設定に関する詳細は Kafka YAML の設定例 を参照してください。

この手順では、Kafka Exporter 設定に関連するプロパティーを取り上げます。

これらのプロパティーは、Kafka クラスターのデプロイメントまたは再デプロイメントの一部として設定できます。

前提条件

  • OpenShift クラスター
  • 稼働中の Cluster Operator

手順

  1. Kafka リソースの KafkaExporter プロパティーを編集します。

    設定可能なプロパティーは以下の例のとおりです。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      # ...
      kafkaExporter:
        image: my-org/my-image:latest 1
        groupRegex: ".*" 2
        topicRegex: ".*" 3
        resources: 4
          requests:
            cpu: 200m
            memory: 64Mi
          limits:
            cpu: 500m
            memory: 128Mi
        logging: debug 5
        enableSaramaLogging: true 6
        template: 7
          pod:
            metadata:
              labels:
                label1: value1
            imagePullSecrets:
              - name: my-docker-credentials
            securityContext:
              runAsUser: 1000001
              fsGroup: 0
            terminationGracePeriodSeconds: 120
        readinessProbe: 8
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe: 9
          initialDelaySeconds: 15
          timeoutSeconds: 5
    # ...
    1
    高度な任意手順: 特別な場合のみ推奨される コンテナーイメージの設定。
    2
    メトリクスに含まれるコンシューマーグループを指定する正規表現。
    3
    メトリクスに含まれるトピックを指定する正規表現。
    4
    5
    指定の重大度 (debug、info、warn、error、fatal) 以上でメッセージをログに記録するためのログ設定。
    6
    Sarama ロギングを有効にするブール値 (Kafka Exporter によって使用される Go クライアントライブラリー)。
    7
    8
    9
  2. リソースを作成または更新します。

    oc apply -f kafka.yaml

次のステップ

Kafka Exporter の設定およびデプロイ後に、Grafana を有効にして Kafka Exporter ダッシュボードを表示 できます。

3.1.23. Kafka クラスターのローリング更新の実行

この手順では、OpenShift アノテーションを使用して、既存の Kafka クラスターのローリング更新を手動でトリガーする方法を説明します。

前提条件

  • 稼働中の Kafka クラスター。
  • 稼働中の Cluster Operator。

手順

  1. 手動で更新する Kafka Pod を制御する StatefulSet の名前を見つけます。

    たとえば、Kafka クラスターの名前が my-cluster の場合、対応する StatefulSet の名前は my-cluster-kafka になります。

  2. OpenShift で StatefulSet リソースにアノテーションを付けます。たとえば、oc annotate を使用すると以下のようになります。

    oc annotate statefulset cluster-name-kafka strimzi.io/manual-rolling-update=true
  3. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた StatefulSet 内のすべての Pod でローリング更新がトリガーされます。すべての Pod のローリング更新が完了すると、アノテーションは StatefulSet から削除されます。

関連情報

3.1.24. ZooKeeper クラスターのローリング更新の実行

この手順では、OpenShift アノテーションを使用して、既存の ZooKeeper クラスターのローリング更新を手動でトリガーする方法を説明します。

前提条件

  • 稼働中の ZooKeeper クラスター。
  • 稼働中の Cluster Operator。

手順

  1. 手動で更新する ZooKeeper Pod を制御する StatefulSet の名前を見つけます。

    たとえば、Kafka クラスターの名前が my-cluster の場合、ZooKeeper の対応する StatefulSet の名前は my-cluster-zookeeper になります。

  2. OpenShift で StatefulSet リソースにアノテーションを付けます。たとえば、oc annotate を使用すると以下のようになります。

    oc annotate statefulset cluster-name-zookeeper strimzi.io/manual-rolling-update=true
  3. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた StatefulSet 内のすべての Pod でローリング更新がトリガーされます。すべての Pod のローリング更新が完了すると、アノテーションは StatefulSet から削除されます。

関連情報

3.1.25. クラスターのスケーリング

3.1.25.1. Kafka クラスターのスケーリング

3.1.25.1.1. ブローカーのクラスターへの追加

トピックのスループットを向上させる主な方法は、そのトピックのパーティション数を増やすことです。これにより、追加のパーティションによってクラスター内の異なるブローカー間でトピックの負荷が共有されます。ただし、各ブローカーが特定のリソース (通常は I/O) によって制約される場合、パーティションを増やしてもスループットは向上しません。代わりに、ブローカーをクラスターに追加する必要があります。

追加のブローカーをクラスターに追加する場合、Kafka ではパーティションは自動的に割り当てられません。既存のブローカーから新規のブローカーに移動するパーティションを決定する必要があります。

すべてのブローカー間でパーティションが再分散されたら、各ブローカーのリソース使用率が低下するはずです。

3.1.25.1.2. クラスターからのブローカーの削除

AMQ Streams では StatefulSets を使用してブローカー Pod を管理されるため、あらゆる Pod を削除できるわけではありません。クラスターから削除できるのは、番号が最も大きい 1 つまたは複数の Pod のみです。たとえば、12 個のブローカーがあるクラスターでは、Pod の名前は cluster-name-kafka-0 から cluster-name-kafka-11 になります。1 つのブローカー分をスケールダウンする場合、cluster-name-kafka-11 が削除されます。

クラスターからブローカーを削除する前に、そのブローカーにパーティションが割り当てられていないことを確認します。また、使用が停止されたブローカーの各パーティションを引き継ぐ、残りのブローカーを決める必要もあります。ブローカーに割り当てられたパーティションがなければ、クラスターを安全にスケールダウンできます。

3.1.25.2. パーティションの再割り当て

現在、Topic Operator はレプリカを別のブローカーに再割当てすることをサポートしないため、ブローカー Pod に直接接続してレプリカをブローカーに再割り当てする必要があります。

ブローカー Pod 内では、kafka-reassign-partitions.sh ユーティリティーを使用してパーティションを別のブローカーに再割り当てできます。

これには、以下の 3 つのモードがあります。

--generate
トピックとブローカーのセットを取り、再割り当て JSON ファイル を生成します。これにより、トピックのパーティションがブローカーに割り当てられます。これはトピック全体で動作するため、一部のトピックのパーティションを再割り当てする必要がある場合は使用できません。
--execute
再割り当て JSON ファイル を取得し、クラスターのパーティションおよびブローカーに適用します。その結果、パーティションを取得したブローカーは、パーティションリーダーのフォロワーになります。新規ブローカーが ISR (同期レプリカ) に参加できたら、古いブローカーはフォロワーではなくなり、そのレプリカが削除されます。
--verify
--verify は、-- execute ステップと同じ 再割り当て JSON ファイル を使用して、ファイル内のすべてのパーティションが目的のブローカーに移動されたかどうかを確認します。再割り当てが完了すると、--verify は有効な スロットル も削除します。スロットルを削除しないと、再割り当てが完了した後もクラスターは影響を受け続けます。

クラスターでは、1 度に 1 つの再割り当てのみを実行でき、実行中の再割り当てをキャンセルすることはできません。再割り当てをキャンセルする必要がある場合は、割り当てが完了するのを待ってから別の再割り当てを実行し、最初の再割り当ての結果を元に戻します。kafka-reassign-partitions.sh によって、元に戻すための再割り当て JSON が出力の一部として生成されます。大規模な再割り当ては、進行中の再割り当てを停止する必要がある場合に備えて、複数の小さな再割り当てに分割するようにしてください。

3.1.25.2.1. 再割り当て JSON ファイル

再割り当て JSON ファイル には特定の構造があります。

{
  "version": 1,
  "partitions": [
    <PartitionObjects>
  ]
}

ここで <PartitionObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ]
}
注記

Kafka は "log_dirs" プロパティーもサポートしますが、AMQ Streams では使用しないでください。

以下は、トピック topic-a およびパーティション 4 をブローカー 24 および 7 に割り当て、トピック topic-b およびパーティション 2 をブローカー 15、および 7 に割り当てる、再割り当て JSON ファイルの例になります。

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7]
    },
    {
      "topic": "topic-b",
      "partition": 2,
      "replicas": [1,5,7]
    }
  ]
}

JSON に含まれていないパーティションは変更されません。

3.1.25.2.2. JBOD ボリューム間でのパーティションの再割り当て

Kafka クラスターで JBOD ストレージを使用する場合は、特定のボリュームとログディレクトリー (各ボリュームに単一のログディレクトリーがある) との間でパーティションを再割り当てを選択することができます。パーティションを特定のボリュームに再割り当てするには、再割り当て JSON ファイルで log_dirs オプションを <PartitionObjects> に追加します。

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ],
  "log_dirs": [ <AssignedLogDirs> ]
}

log_dirs オブジェクトに含まれるログディレクトリーの数は、replicas オブジェクトで指定されるレプリカ数と同じである必要があります。値は、ログディレクトリーへの絶対パスか、any キーワードである必要があります。

以下に例を示します。

{
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7].
      "log_dirs": [ "/var/lib/kafka/data-0/kafka-log2", "/var/lib/kafka/data-0/kafka-log4", "/var/lib/kafka/data-0/kafka-log7" ]
}

3.1.25.3. 再割り当て JSON ファイルの生成

この手順では、kafka-reassign-partitions.sh ツールを使用して、指定のトピックセットすべてのパーティションを再割り当てする再割り当て JSON ファイルを生成する方法を説明します。

前提条件

  • 稼働中の Cluster Operator。
  • Kafka リソース。
  • パーティションを再割り当てするトピックセット。

手順

  1. 移動するトピックを一覧表示する topics.json という名前の JSON ファイルを準備します。これには、以下の構造が必要です。

    {
      "version": 1,
      "topics": [
        <TopicObjects>
      ]
    }

    ここで <TopicObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。

    {
      "topic": <TopicName>
    }

    たとえば、topic-atopic-b のすべてのパーティションを再割り当てするには、以下のような topics.json ファイルを準備する必要があります。

    {
      "version": 1,
      "topics": [
        { "topic": "topic-a"},
        { "topic": "topic-b"}
      ]
    }
  2. topics.json ファイルをブローカー Pod の 1 つにコピーします。

    cat topics.json | oc exec -c kafka <BrokerPod> -i -- \
      /bin/bash -c \
      'cat > /tmp/topics.json'
  3. kafka-reassign-partitions.sh コマンドを使用して、再割り当て JSON を生成します。

    oc exec <BrokerPod> -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list <BrokerList> \
      --generate

    たとえば、 topic-a および topic-b のすべてのパーティションをブローカー 4 および 7 に移動する場合は、以下を実行し m す。

    oc exec <BrokerPod> -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --topics-to-move-json-file /tmp/topics.json \
      --broker-list 4,7 \
      --generate

3.1.25.4. 手動による再割り当て JSON ファイルの作成

特定のパーティションを移動したい場合は、再割り当て JSON ファイルを手動で作成できます。

3.1.25.5. 再割り当てスロットル

パーティションの再割り当てには、ブローカーの間で大量のデータを転送する必要があるため、処理が遅くなる可能性があります。クライアントへの悪影響を防ぐため、再割り当て処理をスロットルで調整することができます。これにより、再割り当ての完了に時間がかかる可能性があります。

  • スロットルが低すぎると、新たに割り当てられたブローカーは公開されるレコードに遅れずに対応することはできず、再割り当ては永久に完了しません。
  • スロットルが高すぎると、クライアントに影響します。

たとえば、プロデューサーの場合は、承認待ちが通常のレイテンシーよりも大きくなる可能性があります。コンシューマーの場合は、ポーリング間のレイテンシーが大きいことが原因でスループットが低下する可能性があります。

3.1.25.6. Kafka クラスターのスケールアップ

この手順では、Kafka クラスターでブローカーの数を増やす方法を説明します。

前提条件

  • 既存の Kafka クラスター。
  • 拡大されたクラスターでパーティションをブローカーに再割り当てする方法が記述される reassignment.json というファイル名の 再割り当て JSON ファイル

手順

  1. kafka.spec.kafka.replicas 設定オプションを増やして、新しいブローカーを必要なだけ追加します。
  2. 新しいブローカー Pod が起動したことを確認します。
  3. 後でコマンドを実行するブローカー Pod に reassignment.json ファイルをコピーします。

    cat reassignment.json | \
      oc exec broker-pod -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'

    以下は例になります。

    cat reassignment.json | \
      oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'
  4. 同じブローカー Pod から kafka-reassign-partitions.sh コマンドラインツールを使用して、パーティションの再割り当てを実行します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --execute

    レプリケーションをスロットルで調整する場合、--throttle とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute

    このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。

  5. 再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下は例になります。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
  6. ブローカー Pod のいずれかから kafka-reassign-partitions.sh コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute オプションの代わりに --verify オプションを使用します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify

    以下に例を示します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
  7. --verify コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な --verify によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。

3.1.25.7. Kafka クラスターのスケールダウン

関連情報

この手順では、Kafka クラスターでブローカーの数を減らす方法を説明します。

前提条件

  • 既存の Kafka クラスター。
  • 最も番号の大きい Pod(s) のブローカーが削除された後にクラスターのブローカーにパーティションを再割り当てする方法が記述されている、reassignment.json という名前の 再割り当て JSON ファイル

手順

  1. 後でコマンドを実行するブローカー Pod に reassignment.json ファイルをコピーします。

    cat reassignment.json | \
      oc exec broker-pod -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'

    以下は例になります。

    cat reassignment.json | \
      oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \
      'cat > /tmp/reassignment.json'
  2. 同じブローカー Pod から kafka-reassign-partitions.sh コマンドラインツールを使用して、パーティションの再割り当てを実行します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --execute

    レプリケーションをスロットルで調整する場合、--throttle とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下に例を示します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 5000000 \
      --execute

    このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。

  3. 再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下は例になります。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --throttle 10000000 \
      --execute
  4. ブローカー Pod のいずれかから kafka-reassign-partitions.sh コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute オプションの代わりに --verify オプションを使用します。

    oc exec broker-pod -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify

    以下に例を示します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file /tmp/reassignment.json \
      --verify
  5. --verify コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な --verify によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。
  6. すべてのパーティションの再割り当てが終了すると、削除されるブローカーはクラスター内のいずれのパーティションにも対応しないはずです。これは、ブローカーのデータログディレクトリーにライブパーティションのログが含まれていないことを確認すると検証できます。ブローカーのログディレクトリーに、拡張正規表現 [a-zA-Z0-9.-]+\.[a-z0-9]+-delete$ と一致しないディレクトリーが含まれる場合、ブローカーにはライブパーティションがあるため、停止してはなりません。

    これを確認するには、以下のコマンドを実行します。

    oc exec my-cluster-kafka-0 -c kafka -it -- \
      /bin/bash -c \
      "ls -l /var/lib/kafka/kafka-log_<N>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"

    N は削除された Pod(s) の数に置き換えます。

    上記のコマンドによって出力が生成される場合、ブローカーにはライブパーティションがあります。この場合、再割り当てが終了していないか、再割り当て JSON ファイルが適切ではありません。

  7. ブローカーにライブパーティションがないことが確認できたら、Kafka リソースの Kafka.spec.kafka.replicas を編集できます。これにより、StatefulSet がスケールダウンされ、番号が最も大きいブローカー Pod(s) が削除されます。

3.1.26. Kafka ノードの手動による削除

その他のリソース

この手順では、OpenShift アノテーションを使用して既存の Kafka ノードを削除する方法を説明します。Kafka ノードの削除するには、Kafka ブローカーが稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim が自動的に再作成されます。

警告

PersistentVolumeClaim を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。

前提条件

  • 稼働中の Kafka クラスター。
  • 稼働中の Cluster Operator。

手順

  1. 削除する Pod の名前を見つけます。

    たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-kafka-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。

  2. OpenShift で Pod リソースにアノテーションを付けます。

    oc annotate を使用します。

    oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
  3. 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。

関連情報

3.1.27. ZooKeeper ノードの手動による削除

この手順では、OpenShift アノテーションを使用して既存の ZooKeeper ノードを削除する方法を説明します。ZooKeeper ノードを削除するには、ZooKeeper が稼働している Pod と、関連する PersistentVolumeClaim の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod と関連する PersistentVolumeClaim が自動的に再作成されます。

警告

PersistentVolumeClaim を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。

前提条件

  • 稼働中の ZooKeeper クラスター。
  • 稼働中の Cluster Operator。

手順

  1. 削除する Pod の名前を見つけます。

    たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-zookeeper-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。

  2. OpenShift で Pod リソースにアノテーションを付けます。

    oc annotate を使用します。

    oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
  3. 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。

関連情報

3.1.28. ローリング更新のメンテナーンス時間枠

メンテナーンス時間枠によって、Kafka および ZooKeeper クラスターの特定のローリング更新が便利な時間に開始されるようにスケジュールできます。

3.1.28.1. メンテナーンス時間枠の概要

ほとんどの場合、Cluster Operator は対応する Kafka リソースの変更に対応するために Kafka または ZooKeeper クラスターのみを更新します。これにより、Kafka リソースの変更を適用するタイミングを計画し、Kafka クライアントアプリケーションへの影響を最小限に抑えることができます。

ただし、Kafka リソースの変更がなくても Kafka および ZooKeeper クラスターの更新が発生することがあります。たとえば、Cluster Operator によって管理される CA (認証局) 証明書が期限切れ直前である場合にローリング再起動の実行が必要になります。

サービスの 可用性 は Pod のローリング再起動による影響を受けないはずですが (ブローカーおよびトピックの設定が適切である場合)、Kafka クライアントアプリケーションの パフォーマンス は影響を受ける可能性があります。メンテナーンス時間枠によって、Kafka および ZooKeeper クラスターのこのような自発的な更新が便利な時間に開始されるようにスケジュールできます。メンテナーンス時間枠がクラスターに設定されていない場合は、予測できない高負荷が発生する期間など、不便な時間にこのような自発的なローリング更新が行われる可能性があります。

3.1.28.2. メンテナーンス時間枠の定義

Kafka.spec.maintenanceTimeWindows プロパティーに文字列の配列を入力して、メンテナーンス時間枠を設定します。各文字列は、UTC (協定世界時、Coordinated Universal Time) であると解釈される cron 式 です。UTC は実用的にはグリニッジ標準時と同じです。

以下の例では、日、月、火、水、および木曜日の午前 0 時に開始し、午前 1 時 59 分 (UTC) に終わる、単一のメンテナーンス時間枠が設定されます。

# ...
maintenanceTimeWindows:
  - "* * 0-1 ? * SUN,MON,TUE,WED,THU *"
# ...

実際には、必要な CA 証明書の更新が設定されたメンテナーンス時間枠内で完了できるように、Kafka リソースの Kafka.spec.clusterCa.renewalDays および Kafka.spec.clientsCa.renewalDays プロパティーとともにメンテナーンス期間を設定する必要があります。

注記

AMQ Streams では、指定の期間にしたがってメンテナーンス操作を正確にスケジュールしません。その代わりに、調整ごとにメンテナーンス期間が現在オープンであるかどうかを確認します。これは、特定の時間枠内でのメンテナーンス操作の開始が、最大で Cluster Operator の調整が行われる間隔の長さ分、遅れる可能性があることを意味します。したがって、メンテナーンス時間枠は最低でもその間隔の長さにする必要があります。

関連情報

3.1.28.3. メンテナーンス時間枠の設定

サポートされるプロセスによってトリガーされるローリング更新のメンテナーンス時間枠を設定できます。

前提条件

  • OpenShift クラスター
  • Cluster Operator が稼働中です。

手順

  1. Kafka リソースの maintenanceTimeWindows プロパティー を追加または編集します。たとえば、0800 から 1059 までと、1400 から 1559 までのメンテナーンスを可能にするには、以下のように maintenanceTimeWindows を設定します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      maintenanceTimeWindows:
        - "* * 8-10 * * ?"
        - "* * 14-15 * * ?"
  2. リソースを作成または更新します。

    oc apply を使用して、これを行うことができます。

    oc apply -f your-file

関連情報

3.1.29. CA 証明書の手動更新

Kafka.spec.clusterCa.generateCertificateAuthority および Kafka.spec.clientsCa.generateCertificateAuthority オブジェクトが false に設定されていない限り、クラスターおよびクライアント CA 証明書は、それぞれの証明書の更新期間の開始時に自動で更新されます。セキュリティー上の理由で必要であれば、証明書の更新期間が始まる前に、これらの証明書のいずれかまたは両方を手動で更新できます。更新された証明書は、古い証明書と同じ秘密鍵を使用します。

前提条件

  • Cluster Operator が稼働中です。
  • CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。

手順

  • strimzi.io/force-renew アノテーションを、更新対象の CA 証明書が含まれる Secret に適用します。

    証明書Secretannotate コマンド

    クラスター CA

    <cluster-name>-cluster-ca-cert

    oc annotate secret <cluster-name>-cluster-ca-cert strimzi.io/force-renew=true

    クライアント CA

    <cluster-name>-clients-ca-cert

    oc annotate secret <cluster-name>-clients-ca-cert strimzi.io/force-renew=true

次回の調整で、アノテーションを付けた Secret の新規 CA 証明書が Cluster Operator によって生成されます。メンテナーンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナーンス時間枠内で新規 CA 証明書が生成されます。

Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。

3.1.30. 秘密鍵の交換

クラスター CA およびクライアント CA 証明書によって使用される秘密鍵を交換できます。秘密鍵を交換すると、Cluster Operator は新しい秘密鍵の新規 CA 証明書を生成します。

前提条件

  • Cluster Operator が稼働中です。
  • CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。

手順

  • 更新対象の秘密鍵が含まれる Secretstrimzi.io/force-replace アノテーションを適用します。

    秘密鍵Secretannotate コマンド

    クラスター CA

    <cluster-name>-cluster-ca

    oc annotate secret <cluster-name>-cluster-ca strimzi.io/force-replace=true

    クライアント CA

    <cluster-name>-clients-ca

    oc annotate secret <cluster-name>-clients-ca strimzi.io/force-replace=true

次回の調整時に、Cluster Operator は以下を生成します。

  • アノテーションを付けた Secret の新しい秘密鍵
  • 新規 CA 証明書

メンテナーンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナーンス時間枠内で新しい秘密鍵と CA 証明書が生成されます。

Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。

3.1.31. Kafka クラスターの一部として作成されたリソースの一覧

以下のリソースは、OpenShift クラスターの Cluster Operator によって作成されます。

cluster-name-kafka
Kafka ブローカー Pod の管理を担当する StatefulSet。
cluster-name-kafka-brokers
DNS が Kafka ブローカー Pod の IP アドレスを直接解決するのに必要なサービス。
cluster-name-kafka-bootstrap
サービスは、Kafka クライアントのブートストラップサーバーとして使用できます。
cluster-name-kafka-external-bootstrap
OpenShift クラスター外部から接続するクライアントのブートストラップサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。
cluster-name-kafka-pod-id
トラフィックを OpenShift クラスターの外部から個別の Pod にルーティングするために使用されるサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。
cluster-name-kafka-external-bootstrap
OpenShift クラスターの外部から接続するクライアントのブートストラップルート。このリソースは、外部リスナーが有効になっていて、タイプ route に設定されている場合にのみ作成されます。
cluster-name-kafka-pod-id
OpenShift クラスターの外部から個別の Pod へのトラフィックに対するルート。このリソースは、外部リスナーが有効になっていて、タイプ route に設定されている場合にのみ作成されます。
cluster-name-kafka-config
Kafka 補助設定が含まれ、Kafka ブローカー Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-kafka-brokers
Kafka ブローカーキーのあるシークレット。
cluster-name-kafka
Kafka ブローカーによって使用されるサービスアカウント。
cluster-name-kafka
Kafka ブローカーに設定された Pod の Disruption Budget。
strimzi-namespace-name-cluster-name-kafka-init
Kafka ブローカーによって使用されるクラスターロールバインディング。
cluster-name-zookeeper
ZooKeeper ノード Pod の管理を担当する StatefulSet。
cluster-name-zookeeper-nodes
DNS が ZooKeeper Pod の IP アドレスを直接解決するのに必要なサービス。
cluster-name-zookeeper-client
Kafka ブローカーがクライアントとして ZooKeeper ノードに接続するために使用するサービス。
cluster-name-zookeeper-config
ZooKeeper 補助設定が含まれ、ZooKeeper ノード Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-zookeeper-nodes
ZooKeeper ノードキーがあるシークレット。
cluster-name-zookeeper
ZooKeeper ノードに設定された Pod の Disruption Budget。
cluster-name-entity-operator
Topic および User Operator とのデプロイメント。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-topic-operator-config
Topic Operator の補助設定のある ConfigMap。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-user-operator-config
User Operator の補助設定のある ConfigMap。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-operator-certs
Kafka および ZooKeeper と通信するための Entity Operator キーのあるシークレット。このリソースは、Cluster Operator によって Entity Operator がデプロイされた場合のみ作成されます。
cluster-name-entity-operator
Entity Operator によって使用されるサービスアカウント。
strimzi-cluster-name-topic-operator
Entity Operator によって使用されるロールバインディング。
strimzi-cluster-name-user-operator
Entity Operator によって使用されるロールバインディング。
cluster-name-cluster-ca
クラスター通信の暗号化に使用されるクラスター CA のあるシークレット。
cluster-name-cluster-ca-cert
クラスター CA 公開鍵のあるシークレット。このキーは、Kafka ブローカーのアイデンティティーの検証に使用できます。
cluster-name-clients-ca
Kafka ブローカーと Kafka クライアントとの間の通信を暗号化するために使用されるクライアント CA のあるシークレット。
cluster-name-clients-ca-cert
クライアント CA 公開鍵のあるシークレット。このキーは、Kafka ブローカーのアイデンティティーの検証に使用できます。
cluster-name-cluster-operator-certs
Kafka および ZooKeeper と通信するための Cluster Operator キーのあるシークレット。
data-cluster-name-kafka-idx
Kafka ブローカー Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。
data-id-cluster-name-kafka-idx
Kafka ブローカー Pod idx のデータを保存するために使用されるボリューム id の永続ボリューム要求です。このリソースは、永続ボリュームをプロビジョニングしてデータを保存する場合に、JBOD ボリュームに永続ストレージが選択された場合のみ作成されます。
data-cluster-name-zookeeper-idx
ZooKeeper ノード Pod idx のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。
cluster-name-jmx
Kafka ブローカーポートのセキュア化に使用される JMX ユーザー名およびパスワードのあるシークレット。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.