第2章 デプロイメント設定
本章では、サポートされるデプロイメントの異なる側面を設定する方法について説明します。
- Kafka クラスター
- Kafka Connect クラスター
- Source2Image がサポートされる Kafka Connect クラスター
- Kafka Mirror Maker
- Kafka Bridge
- OAuth 2.0 のトークンベースの認証
- OAuth 2.0 のトークンベースの承認
2.1. Kafka クラスターの設定
Kafka
リソースの完全なスキーマは 「Kafka
スキーマ参照」 に記載されています。指定の Kafka
リソースに適用されたすべてのラベルは、Kafka クラスターを構成する OpenShift リソースにも適用されます。そのため、必要に応じてリソースにラベルが適用されるため便利です。
2.1.1. Kafka YAML の設定例
Kafka デプロイメントで利用可能な設定オプションを理解するには、ここに提供されるサンプル YAML ファイルを参照してください。
例では、可能な設定オプションの一部のみを取り上げますが、特に重要なオプションは次のとおりです。
- リソース要求 (CPU/メモリー)
- 最大および最小メモリー割り当ての JVM オプション
- リスナー (および認証)
- 認証
- ストレージ
- ラックアウェアネス (Rack Awareness)
- メトリクス
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: replicas: 3 1 version: 1.6 2 resources: 3 requests: memory: 64Gi cpu: "8" limits: 4 memory: 64Gi cpu: "12" jvmOptions: 5 -Xms: 8192m -Xmx: 8192m listeners: 6 - name: plain 7 port: 9092 8 type: internal 9 tls: false 10 configuration: useServiceDnsDomain: true 11 - name: tls port: 9093 type: internal tls: true authentication: 12 type: tls - name: external 13 port: 9094 type: route tls: true configuration: brokerCertChainAndKey: 14 secretName: my-secret certificate: my-certificate.crt key: my-key.key authorization: 15 type: simple config: 16 auto.create.topics.enable: "false" offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 17 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" storage: 18 type: persistent-claim 19 size: 10000Gi 20 rack: 21 topologyKey: topology.kubernetes.io/zone metrics: 22 lowercaseOutputName: true rules: 23 # Special cases and very specific rules - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value name: kafka_server_$1_$2 type: GAUGE labels: clientId: "$3" topic: "$4" partition: "$5" # ... zookeeper: 24 replicas: 3 resources: requests: memory: 8Gi cpu: "2" limits: memory: 8Gi cpu: "2" jvmOptions: -Xms: 4096m -Xmx: 4096m storage: type: persistent-claim size: 1000Gi metrics: # ... entityOperator: 25 topicOperator: resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" userOperator: resources: requests: memory: 512Mi cpu: "1" limits: memory: 512Mi cpu: "1" kafkaExporter: 26 # ... cruiseControl: 27 # ...
- 1
- レプリカは、ブローカーノードの数を指定します。
- 2
- アップグレード手順 にしたがって変更可能な Kafka バージョン。
- 3
- リソース要求は、指定のコンテナーに対して予約するリソースを指定します。
- 4
- リソースの制限は、コンテナーによって消費可能な最大リソースを指定します。
- 5
- JVM オプションは、JVM の最小 (
-Xms
) および最大 (-Xmx
) メモリー割り当てを指定 できます。 - 6
- リスナーは、ブートストラップアドレスでクライアントが Kafka クラスターに接続する方法を設定します。リスナーは、OpenShift クラスター内部または外部の接続の 内部 または 外部 リスナーとして設定されます。
- 7
- リスナーを識別するための名前。Kafka クラスター内で一意である必要があります。
- 8
- Kafka 内でリスナーによって使用されるポート番号。ポート番号は指定の Kafka クラスター内で一意である必要があります。許可されるポート番号は 9092 以上ですが、すでに Prometheus および JMX によって使用されているポート 9404 および 9999 以外になります。リスナーのタイプによっては、ポート番号は Kafka クライアントに接続するポート番号と同じではない場合があります。
- 9
internal
として、または external リスナーに対して指定されるリスナータイプ(route
、loadbalancer
、nodeport
、またはingress
)。- 10
- 各リスナーの TLS 暗号化を有効にします。デフォルトは
false
です。route
リスナーには TLS 暗号化は必要ありません。 - 11
- クラスターサービスサフィックス(通常は
cluster.local
)を含む完全修飾 DNS 名が割り当てられているかどうかを定義します。 - 12
- 相互 TLS、SCRAM-SHA-512、またはトークンベース OAuth 2.0 として指定される リスナー認証メカニズム。
- 13
- 14
- 外部の認証局によって管理される Kafka リスナー証明書 の任意設定。
brokerCertChainAndKey
プロパティーは、サーバー証明書および秘密鍵を保持するSecret
を指定します。Kafka リスナー証明書も TLS リスナーに対して設定できます。 - 15
- 承認は Kafka ブローカーで簡易な OAUTH 2.0 または OPA 承認を有効にします。簡易承認では、
AclAuthorizer
Kafka プラグインが使用されます。 - 16
- 設定によって、ブローカー設定が指定されます。標準の Apache Kafka 設定が提供されることがありますが、AMQ Streams によって直接管理されないプロパティーに限定されます。
- 17
- 18
- ストレージは、
ephemeral
、persistent-claim
、またはjbod
として設定されます。 - 19
- 20
- 21
- ラックアウェアネスは、異なるラック全体でレプリカを分散 ために設定されます。
topology
キーはクラスターノードのラベルと一致する必要があります。 - 22
- 23
- JMX Exporter でメトリクスを Grafana ダッシュボードにエクスポートする Kafka ルール。AMQ Streams によって提供されるルールのセットは Kafka リソース設定にコピーされることがあります。
- 24
- Kafka 設定と似たプロパティーが含まれる、ZooKeeper 固有の設定。
- 25
- Topic Operator および User Operator の設定を指定する、Entity Operator 設定。
- 26
- データを Prometheus メトリクスとして公開するために使用される Kafka Exporter 設定。
- 27
- Kafka クラスターのリバランス を行うために使用される Cruise Control。
2.1.2. データストレージに関する留意事項
効率的なデータストレージインフラストラクチャーは、AMQ Streams のパフォーマンスを最適化するために不可欠です。
ブロックストレージが必要です。NFS などのファイルストレージは、Kafka では機能しません。
ブロックストレージには、以下などを選択できます。
- Amazon Elastic Block Store (EBS)などのクラウドベースのブロックストレージソリューション。
- ローカルの永続ボリューム。
- ファイバーチャネル や iSCSI などのプロトコルがアクセスする SAN (ストレージネットワークエリア) ボリューム。
AMQ Streams には OpenShift の raw ブロックボリュームは必要ありません。
2.1.2.1. ファイルシステム
XFS ファイルシステムを使用するようにストレージシステムを設定することが推奨されます。AMQ Streams は ext4 ファイルシステムとも互換性がありますが、最適化するには追加の設定が必要になることがあります。
2.1.2.2. Apache Kafka および ZooKeeper ストレージ
Apache Kafka と ZooKeeper には別々のディスクを使用します。
3 つのタイプのデータストレージがサポートされます。
- 一時データストレージ (開発用のみで推奨されます)
- 永続データストレージ
- JBOD (Just a Bunch of Disks、Kafka のみに適しています)
詳細は「Kafka および ZooKeeper ストレージ」を参照してください。
ソリッドステートドライブ (SSD) は必須ではありませんが、複数のトピックに対してデータが非同期的に送受信される大規模なクラスターで Kafka のパフォーマンスを向上させることができます。SSD は、高速で低レイテンシーのデータアクセスが必要な ZooKeeper で特に有効です。
Kafka と ZooKeeper の両方にデータレプリケーションが組み込まれているため、複製されたストレージのプロビジョニングは必要ありません。
2.1.3. Kafka および ZooKeeper のストレージタイプ
Kafka および ZooKeeper はステートフルなアプリケーションであるため、データをディスクに格納する必要があります。AMQ Streams では、3 つのタイプのストレージがサポートされます。
- 一時ストレージ
- 永続ストレージ
- JBOD ストレージ
JBOD ストレージは Kafka でサポートされ、ZooKeeper ではサポートされていません。
Kafka
リソースを設定する場合、Kafka ブローカーおよび対応する ZooKeeper ノードによって使用されるストレージのタイプを指定できます。以下のリソースの storage
プロパティーを使用して、ストレージタイプを設定します。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
ストレージタイプは type
フィールドで設定されます。
Kafka クラスターをデプロイした後に、ストレージタイプを変更することはできません。
その他のリソース
- 一時ストレージの詳細は、「一時ストレージのスキーマ参照」を参照してください。
- 永続ストレージの詳細は、「永続ストレージのスキーマ参照」を参照してください。
- JBOD ストレージの詳細は、「JBODの スキーマ参照」を参照してください。
-
Kafka
のスキーマに関する詳細は、「Kafka
のスキーマ参照」を参照してください。
2.1.3.1. 一時ストレージ
一時ストレージは emptyDir
ボリュームを使用してデータを保存します。一時ストレージを使用するには、type
フィールドを ephemeral
に設定する必要があります。
emptyDir
ボリュームは永続的ではなく、保存されたデータは Pod の再起動時に失われます。新規 Pod の起動後に、クラスターの他のノードからすべてのデータを復元する必要があります。一時ストレージは、単一ノードの ZooKeeper クラスターやレプリケーション係数が 1 の Kafka トピックでの使用には適していません。これはデータが損失する原因となるからです。
一時ストレージの例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: ephemeral # ... zookeeper: # ... storage: type: ephemeral # ...
2.1.3.1.1. ログディレクトリー
一時ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。
/var/lib/kafka/data/kafka-log_idx_
-
idx
は、Kafka ブローカー Pod インデックスです。たとえば、/var/lib/kafka/data/kafka-log0
のようになります。
2.1.3.2. 永続ストレージ
永続ストレージは Persistent Volume Claim (永続ボリューム要求、PVC) を使用して、データを保存するための永続ボリュームをプロビジョニングします。永続ボリューム要求を使用すると、ボリュームのプロビジョニングを行う ストレージクラス に応じて、さまざまなタイプのボリュームをプロビジョニングできます。永続ボリューム要求と使用できるデータタイプには、多くのタイプの SAN ストレージやローカル永続ボリューム などがあります。
永続ストレージを使用するには、type
を persistent-claim
に設定する必要があります。永続ストレージでは、追加の設定オプションがサポートされます。
id
(任意)-
ストレージ ID 番号。このオプションは、JBOD ストレージ宣言で定義されるストレージボリュームには必須です。デフォルトは
0
です。 size
(必須)- 永続ボリューム要求のサイズを定義します (例: 1000Gi)。
class
(任意)- 動的ボリュームプロビジョニングに使用する OpenShift の ストレージクラス。
selector
(任意)- 使用する特定の永続ボリュームを選択できます。このようなボリュームを選択するラベルを表す key:value ペアが含まれます。
deleteClaim
(任意)-
クラスターのアンデプロイ時に永続ボリューム要求を削除する必要があるかどうかを指定するブール値。デフォルトは
false
です。
既存の AMQ Streams クラスターで永続ボリュームのサイズを増やすことは、永続ボリュームのサイズ変更をサポートする OpenShift バージョンでのみサポートされます。サイズを変更する永続ボリュームには、ボリューム拡張をサポートするストレージクラスを使用する必要があります。ボリューム拡張をサポートしないその他のバージョンの OpenShift およびストレージクラスでは、クラスターをデプロイする前に必要なストレージサイズを決定する必要があります。既存の永続ボリュームのサイズを縮小することはできません。
size
が 1000Gi の永続ストレージ設定の例 (抜粋)
# ... storage: type: persistent-claim size: 1000Gi # ...
以下の例は、ストレージクラスの使用例を示しています。
特定のストレージクラスを指定する永続ストレージ設定の例 (抜粋)
# ... storage: type: persistent-claim size: 1Gi class: my-storage-class # ...
最後に、selector
を使用して特定のラベルが付いた永続ボリュームを選択し、SSD などの必要な機能を提供できます。
セレクターを指定する永続ストレージ設定の例 (抜粋)
# ... storage: type: persistent-claim size: 1Gi selector: hdd-type: ssd deleteClaim: true # ...
2.1.3.2.1. ストレージクラスのオーバーライド
デフォルトのストレージクラスを使用する代わりに、1 つ以上の Kafka ブローカー または ZooKeeper ノードに異なるストレージクラスを指定できます。これは、ストレージクラスが、異なるアベイラビリティーゾーンやデータセンターに制限されている場合などに便利です。この場合、overrides
フィールドを使用できます。
以下の例では、デフォルトのストレージクラスの名前は my-storage-class
になります。
ストレージクラスのオーバーライドを使用した AMQ Streams クラスターの例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: labels: app: my-cluster name: my-cluster namespace: myproject spec: # ... kafka: replicas: 3 storage: deleteClaim: true size: 100Gi type: persistent-claim class: my-storage-class overrides: - broker: 0 class: my-storage-class-zone-1a - broker: 1 class: my-storage-class-zone-1b - broker: 2 class: my-storage-class-zone-1c # ... zookeeper: replicas: 3 storage: deleteClaim: true size: 100Gi type: persistent-claim class: my-storage-class overrides: - broker: 0 class: my-storage-class-zone-1a - broker: 1 class: my-storage-class-zone-1b - broker: 2 class: my-storage-class-zone-1c # ...
overrides
プロパティーが設定され、ボリュームによって以下のストレージクラスが使用されます。
-
ZooKeeper ノード 0 の永続ボリュームでは
my-storage-class-zone-1a
が使用されます。 -
ZooKeeper ノード 1 の永続ボリュームでは
my-storage-class-zone-1b
が使用されます。 -
ZooKeeepr ノード 2 の永続ボリュームでは
my-storage-class-zone-1c
が使用されます。 -
Kafka ブローカー 0 の永続ボリュームでは
my-storage-class-zone-1a
が使用されます。 -
Kafka ブローカー 1 の永続ボリュームでは
my-storage-class-zone-1b
が使用されます。 -
Kafka ブローカー 2 の永続ボリュームでは
my-storage-class-zone-1c
が使用されます。
現在、overrides
プロパティーは、ストレージクラスの設定をオーバーライドするためのみに使用されます。他のストレージ設定フィールドのオーバーライドは現在サポートされていません。ストレージ設定の他のフィールドは現在サポートされていません。
2.1.3.2.2. Persistent Volume Claim (永続ボリューム要求、PVC) の命名
永続ストレージが使用されると、以下の名前で Persistent Volume Claim (永続ボリューム要求、PVC) が作成されます。
data-cluster-name-kafka-idx
-
Kafka ブローカー Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。 data-cluster-name-zookeeper-idx
-
ZooKeeper ノード Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。
2.1.3.2.3. ログディレクトリー
永続ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。
/var/lib/kafka/data/kafka-log_idx_
-
idx
は、Kafka ブローカー Pod インデックスです。たとえば、/var/lib/kafka/data/kafka-log0
のようになります。
2.1.3.3. 永続ボリュームのサイズ変更
既存の AMQ Streams クラスターによって使用される永続ボリュームのサイズを増やすことで、ストレージ容量を増やすことができます。永続ボリュームのサイズ変更は、JBOD ストレージ設定で 1 つまたは複数の永続ボリュームが使用されるクラスターでサポートされます。
永続ボリュームのサイズを拡張することはできますが、縮小することはできません。永続ボリュームのサイズ縮小は、現在 OpenShift ではサポートされていません。
前提条件
- ボリュームのサイズ変更をサポートする OpenShift クラスター。
- Cluster Operator が稼働している必要があります。
- ボリューム拡張をサポートするストレージクラスを使用して作成された永続ボリュームを使用する Kafka クラスター。
手順
Kafka
リソースで、Kafka クラスター、ZooKeeper クラスター、またはその両方に割り当てられた永続ボリュームのサイズを増やします。-
Kafka クラスターに割り当てられたボリュームサイズを増やすには、
spec.kafka.storage
プロパティーを編集します。 ZooKeeper クラスターに割り当てたボリュームサイズを増やすには、
spec.zookeeper.storage
プロパティーを編集します。たとえば、ボリュームサイズを
1000Gi
から2000Gi
に増やすには、以下のように編集します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: persistent-claim size: 2000Gi class: my-storage-class # ... zookeeper: # ...
-
Kafka クラスターに割り当てられたボリュームサイズを増やすには、
リソースを作成または更新します。
次のように
oc apply
を使用します。oc apply -f your-file
OpenShift では、Cluster Operator からの要求に応じて、選択された永続ボリュームの容量が増やされます。サイズ変更が完了すると、サイズ変更された永続ボリュームを使用するすべての Pod が Cluster Operator によって再起動されます。これは自動的に行われます。
その他のリソース
OpenShift での永続ボリュームのサイズ変更に関する詳細は、「Resizing Persistent Volumes using Kubernetes」を参照してください。
2.1.3.4. JBOD ストレージの概要
AMQ Streams で、複数のディスクやボリュームのデータストレージ設定である JBOD を使用するように設定できます。JBOD は、Kafka ブローカーのデータストレージを増やす方法の 1 つです。また、パフォーマンスを向上することもできます。
JBOD 設定は 1 つ以上のボリュームによって記述され、各ボリュームは 一時 または 永続 ボリュームのいずれかになります。JBOD ボリューム宣言のルールおよび制約は、一時および永続ストレージのルールおよび制約と同じです。たとえば、永続ストレージのボリュームをプロビジョニング後に変更することはできません。
2.1.3.4.1. JBOD の設定
AMQ Streams で JBOD を使用するには、ストレージ type
をjbod
に設定する必要があります。volumes
プロパティーを使用すると、JBOD ストレージアレイまたは設定を構成するディスクを記述できます。以下は、JBOD 設定例の抜粋になります。
# ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false - id: 1 type: persistent-claim size: 100Gi deleteClaim: false # ...
id は、JBOD ボリュームの作成後に変更することはできません。
ユーザーは JBOD 設定に対してボリュームを追加または削除できます。
2.1.3.4.2. JBOD および 永続ボリューム要求 (PVC)
永続ストレージを使用して JBOD ボリュームを宣言する場合、永続ボリューム要求 (Persistent Volume Claim、PVC) の命名スキームは以下のようになります。
data-id-cluster-name-kafka-idx
-
id
は、Kafka ブローカー Podidx
のデータを保存するために使用されるボリュームの ID に置き換えます。
2.1.3.4.3. ログディレクトリー
JBOD ボリュームは、以下のパスにマウントされるログディレクトリーとして Kafka ブローカーによって使用されます。
/var/lib/kafka/data-id/kafka-log_idx_
-
id
は、Kafka ブローカー Podidx
のデータを保存するために使用されるボリュームの ID に置き換えます。たとえば、/var/lib/kafka/data-0/kafka-log0
のようになります。
2.1.3.5. JBOD ストレージへのボリュームの追加
この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターにボリュームを追加する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。
以前使用され、削除された id
の下に新規ボリュームを追加する場合、以前使用された PersistentVolumeClaims
が必ず削除されているよう確認する必要があります。
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
- JBOD ストレージのある Kafka クラスター。
手順
Kafka
リソースのspec.kafka.storage.volumes
プロパティーを編集します。新しいボリュームをvolumes
アレイに追加します。たとえば、id が2
の新しいボリュームを追加します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false - id: 1 type: persistent-claim size: 100Gi deleteClaim: false - id: 2 type: persistent-claim size: 100Gi deleteClaim: false # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f KAFKA-CONFIG-FILE
- 新しいトピックを作成するか、既存のパーティションを新しいディスクに再度割り当てします。
その他のリソース
トピックの再割り当てに関する詳細は 「パーティションの再割り当て」 を参照してください。
2.1.3.6. JBOD ストレージからのボリュームの削除
この手順では、JBOD ストレージを使用するように設定されている Kafka クラスターからボリュームを削除する方法を説明します。この手順は、他のストレージタイプを使用するように設定されている Kafka クラスターには適用できません。JBOD ストレージには、常に 1 つのボリュームが含まれている必要があります。
データの損失を避けるには、ボリュームを削除する前にすべてのパーティションを移動する必要があります。
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
- 複数のボリュームがある JBOD ストレージのある Kafka クラスター。
手順
- 削除するディスクからすべてのパーティションを再度割り当てます。削除するディスクに割り当てられたままになっているパーティションのデータは削除される可能性があります。
Kafka
リソースのspec.kafka.storage.volumes
プロパティーを編集します。volumes
アレイから 1 つまたは複数のボリュームを削除します。たとえば、ID が1
と2
のボリュームを削除します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
関連情報
トピックの再割り当てに関する詳細は 「パーティションの再割り当て」 を参照してください。
2.1.4. Kafka ブローカーレプリカ
Kafka クラスターは多くのブローカーを使って実行できます。Kafka.spec.kafka.replicas
の Kafka クラスターに使用されるブローカーの数を設定できます。クラスターに最適なブローカー数は、特定のユースケースに基づいて決定する必要があります。
2.1.4.1. ブローカーノード数の設定
この手順では、新規クラスターの Kafka ブローカーノードの数を設定する方法を説明します。これは、パーティションのない新しいクラスターのみに適用できます。クラスターにトピックがすでに定義されている場合は、「クラスターのスケーリング」 を参照してください。
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator が必要です。
- トピックが定義されていない Kafka クラスター。
手順
Kafka
リソースのreplicas
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... replicas: 3 # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
関連情報
クラスターにトピックがすでに定義されている場合は、「クラスターのスケーリング」 を参照してください。
2.1.5. Kafka ブローカーの設定
AMQ Streams では、Kafka クラスターの Kafka ブローカーの設定をカスタマイズできます。Apache Kafka ドキュメント の「Broker Configs」セクションに記載されているほとんどのオプションを指定および設定できます。以下に関係する設定オプションは設定できません。
- セキュリティー (暗号化、認証、および承認)
- リスナーの設定
- Broker ID の設定
- ログデータディレクトリーの設定
- ブローカー間の通信
- ZooKeeper の接続
これらのオプションは AMQ Streams によって自動的に設定されます。
ブローカー設定の詳細は、「 KafkaClusterSpec
スキーマ 」を参照してください。
リスナーの設定
Kafka ブローカーに接続するリスナーを設定します。リスナーの設定に関する詳細は、「リスナーの設定」を参照してください。
Kafka へのアクセスの承認
ユーザーが実行するアクションを許可または拒否するように Kafka クラスターを設定できます。Kafka ブローカーへのアクセスをセキュアにするための詳細は、「Kafka へのアクセス管理」を参照してください。
2.1.5.1. Kafka ブローカーの設定
既存の Kafka ブローカーを設定するか、指定した設定で新しい Kafka ブローカーを作成します。
前提条件
- OpenShift クラスターが利用できる必要があります。
- Cluster Operator が稼働している必要があります。
手順
-
クラスターデプロイメントを指定する
Kafka
リソースが含まれる YAML 設定ファイルを開きます。 Kafka
リソースのspec.kafka.config
プロパティーで、Kafka 設定を 1 つまたは複数入力します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... config: default.replication.factor: 3 offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 1 # ... zookeeper: # ...
新しい設定を適用してリソースを作成または更新します。
次のように
oc apply
を使用します。oc apply -f kafka.yaml
kafka.yaml
は、設定するリソースの YAML 設定ファイルに置き換えます (例:kafka-persistent.yaml
)。
2.1.6. リスナーの設定
リスナーは、Kafka ブローカーへの接続に使用されます。
AMQ Streamsは、Kafka
リソースを介してリスナーを設定するためのプロパティを備えたジェネリックな GenericKafkaListener
スキーマを提供しています。
GenericKafkaListener
は、リスナー設定に柔軟なアプローチを提供します。
プロパティーを指定して、OpenShift クラスター内で接続する 内部 リスナーを設定したり、OpenShift クラスター外部で接続する外部 リスナーを設定したりできます。
汎用リスナーの設定
各リスナーは Kafka
リソースの配列として定義されます。
リスナーの設定に関する詳細は、GenericKafkaListener スキーマ参照
を参照してください。
汎用リスナー設定は、非推奨となった KafkaListeners
スキーマ参照 を使用して、以前のリスナー設定の代わりに使用します。ただし、後方互換性によって、以前の形式を新しい形式に変換 することができます。
KafkaListeners
スキーマは、plain
、tls、および
external
リスナーのサブプロパティーを使用し、それぞれ固定ポートを使用します。スキーマのアーキテクチャー固有の制限により、3 つのリスナーのみを設定でき、設定オプションはリスナーのタイプに制限されます。
GenericKafkaListener スキーマ
を使用すると、名前とポートが一意であれば、必要なリスナーをいくつでも設定できます。
たとえば、異なる認証メカニズムを必要とするネットワークからのアクセスを処理する場合などに、複数の外部リスナーを設定することがあります。また、OpenShift ネットワークを外部ネットワークに参加させる必要があることがあります。この場合、OpenShift サービスの DNS ドメイン(通常は.cluster.local
)が使用されないように内部リスナーを構成することができます(useServiceDnsDomain
プロパティを使用)。
Kafka ブローカーへのアクセスをセキュアにするためのリスナー設定
リスナーを設定して、認証を使用したセキュアな接続を確立できます。Kafka ブローカーへのアクセスをセキュアにするための詳細は、「Kafka へのアクセス管理」を参照してください。
OpenShift 外部のクライアントアクセスに対する外部リスナーの設定
ロードバランサーなどの指定された接続メカニズムを使用して、OpenShift 環境外部のクライアントアクセスに対して外部リスナーを設定できます。外部クライアントを接続するための設定オプションの詳細は、「外部リスナーの設定」を参照してください。
リスナー証明書
TLS 暗号化が有効になっている TLS リスナーまたは外部リスナーの、Kafka リスナー証明書 と呼ばれる独自のサーバー証明書を提供できます。詳細は「Kafka リスナー証明書」を参照してください。
2.1.7. ZooKeeper レプリカ
通常、ZooKeeper クラスターまたはアンサンブルは、一般的に 3、5、7 個の奇数個のノードで実行されます。
効果的なクォーラムを維持するには、過半数のノードが利用可能である必要があります。ZooKeeper クラスターでクォーラムを損失すると、クライアントへの応答が停止し、Kafka ブローカーが機能しなくなります。AMQ Streams では、 ZooKeeper クラスターの安定性および高可用性が重要になります。
- 3 ノードクラスター
- 3 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 2 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 1 つのみであれば対応できます。
- 5 ノードクラスター
- 5 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 3 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 2 つの場合まで対応できます。
- 7 ノードクラスター
- 7 ノードの ZooKeeper クラスターでは、クォーラムを維持するために、少なくとも 4 つのノードが稼働している必要があります。このクラスターは、利用できないノードが 3 つの場合まで対応できます。
開発の目的で、単一ノードの ZooKeeper を実行することも可能です。
クラスターのノードの数が多いほどクォーラムを維持するコストも高くなるため、ノードの数が多いほどパフォーマンスが向上するとは限りません。可用性の要件に応じて、使用するノードの数を決定します。
2.1.7.1. ZooKeeper ノードの数
ZooKeeper ノードの数は、Kafka.spec.zookeeper
の replicas
プロパティーを使用して設定できます。
レプリカの設定を示す例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... replicas: 3 # ...
2.1.7.2. ZooKeeper レプリカの数の変更
前提条件
- OpenShift クラスターが利用できる必要があります。
- Cluster Operator が稼働している必要があります。
手順
-
クラスターデプロイメントを指定する
Kafka
リソースが含まれる YAML 設定ファイルを開きます。 Kafka
リソースのspec.zookeeper.replicas
プロパティーで、複製された ZooKeeper サーバーの数を入力します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... replicas: 3 # ...
新しい設定を適用してリソースを作成または更新します。
次のように
oc apply
を使用します。oc apply -f kafka.yaml
kafka.yaml
は、設定するリソースの YAML 設定ファイルに置き換えます (例:kafka-persistent.yaml
)。
2.1.8. ZooKeeper の設定
AMQ Streams では、Apache ZooKeeper ノードの設定をカスタマイズできます。ZooKeeper のドキュメントに記載されているほとんどのオプションを指定および設定できます。
以下に関連するオプションは設定できません。
- セキュリティー (暗号化、認証、および承認)
- リスナーの設定
- データディレクトリーの設定
- ZooKeeper クラスターの構成
これらのオプションは AMQ Streams によって自動的に設定されます。
2.1.8.1. ZooKeeper の設定
ZooKeeper ノードは、Kafka.spec.zookeeper
の config
プロパティーを使用して設定されます。このプロパティーには、ZooKeeper 設定オプションがキーとして含まれます。値は、以下の JSON タイプの 1 つを使用して記述できます。
- 文字列
- 数値
- ブール値
ユーザーは、AMQ Streams で直接管理されるオプションを除き、ZooKeeper ドキュメント に記載されているオプションを指定および設定できます。以下の文字列の 1 つと同じキーまたは以下の文字列の 1 つで始まるキーを持つ設定オプションはすべて禁止されています。
-
server.
-
dataDir
-
dataLogDir
-
clientPort
-
authProvider
-
quorum.auth
-
requireClientAuthScheme
禁止されているオプションの 1 つが config
プロパティーにある場合、そのオプションは無視され、警告メッセージが Cluster Operator ログファイルに出力されます。その他のオプションは、すべて ZooKeeper に渡されます。
Cluster Operator では、提供された config
オブジェクトのキーまたは値は検証されません。無効な設定を指定すると、ZooKeeper クラスターが起動しなかったり、不安定になる可能性があります。このような場合、Kafka.spec.zookeeper.config
オブジェクトの設定を修正し、Cluster Operator によって新しい設定がすべての ZooKeeper ノードにロールアウトされるようにします。
選択したオプションのデフォルト値は次のとおりです。
-
timeTick
、デフォルト値2000
-
initLimit
、デフォルト値5
-
syncLimit
、デフォルト値2
-
autopurge.purgeInterval
、デフォルト値1
これらのオプションは、Kafka.spec.zookeeper.config
プロパティーにない場合に自動的に設定されます。
TLSバージョンの特定のcipher suiteを使用するクライアント接続には、3つの許可されたssl
設定オプションを使用します。暗号スイートは、セキュアな接続とデータ転送のためのアルゴリズムを組み合わせます。
ZooKeeper の設定例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... zookeeper: # ... config: autopurge.snapRetainCount: 3 autopurge.purgeInterval: 1 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 1 ssl.enabled.protocols: "TLSv1.2" 2 ssl.protocol: "TLSv1.2" 3 # ...
2.1.8.2. ZooKeeper の設定
前提条件
- OpenShift クラスターが利用できる必要があります。
- Cluster Operator が稼働している必要があります。
手順
-
クラスターデプロイメントを指定する
Kafka
リソースが含まれる YAML 設定ファイルを開きます。 Kafka
リソースのspec.zookeeper.config
プロパティーで、1 つまたは複数の ZooKeeper 設定を指定します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... zookeeper: # ... config: autopurge.snapRetainCount: 3 autopurge.purgeInterval: 1 # ...
新しい設定を適用してリソースを作成または更新します。
次のように
oc apply
を使用します。oc apply -f kafka.yaml
kafka.yaml
は、設定するリソースの YAML 設定ファイルに置き換えます (例:kafka-persistent.yaml
)。
2.1.9. ZooKeeper の接続
ZooKeeper サービスは暗号化および認証でセキュア化され、AMQ Streams の一部でない外部アプリケーションでの使用は想定されていません。
ただし、ZooKeeper への接続を必要とする Kafka CLI ツールを使用する場合は、ZooKeeper コンテナー内でターミナルを使用し、ZooKeeper アドレスとして localhost:12181
に接続できます。
2.1.9.1. ターミナルからの ZooKeeper への接続
ほとんどの Kafka CLI ツールは Kafka に直接接続できます。したがって、通常の状況では ZooKeeper に接続する必要はありません。必要な場合には、この手順を実行できます。ZooKeeper コンテナー内でターミナルを開き、ZooKeeper の接続を必要とする Kafka CLI ツールを使用します。
前提条件
- OpenShift クラスターが利用できる必要があります。
- Kafka クラスターが稼働している必要があります。
- Cluster Operator が稼働している必要があります。
手順
OpenShift コンソールを使用してターミナルを開くか、CLI から
exec
コマンドを実行します。以下は例になります。
oc exec -it my-cluster-zookeeper-0 -- bin/kafka-topics.sh --list --zookeeper localhost:12181
必ず
localhost:12181
を使用してください。ZooKeeper に対して Kafka コマンドを実行できるようになりました。
2.1.10. Entitiy Operator
Entity Operator は、実行中の Kafka クラスターで Kafka 関連のエンティティーを管理します。
Entity Operator は以下と構成されます。
- Kafka トピックを管理する Topic Operator
- Kafka ユーザーを管理する User Operator
Cluster Operator は Kafka
リソース設定を介して、Kafka クラスターのデプロイ時に、上記の Operator の 1 つまたは両方を含む Entity Operator をデプロイできます。
デプロイされると、デプロイメント設定に応じて、Entity Operator にオペレーターが含まれます。
これらのオペレーターは、Kafka クラスターのトピックおよびユーザーを管理するために自動的に設定されます。
2.1.10.1. Entity Operator の設定プロパティー
Kafka.spec
の entityOperator
プロパティーを使用して Entity Operator を設定します。
entityOperator
プロパティーでは複数のサブプロパティーがサポートされます。
-
tlsSidecar
-
topicOperator
-
userOperator
-
template
tlsSidecar
プロパティーには、ZooKeeper との通信に使用される TLS サイドカーコンテナーの設定が含まれます。TLS サイドカーコンテナーの設定に関する詳細は、「TLS サイドカー」 を参照してください。
template
プロパティーには、ラベル、アノテーション、アフィニティー、および容認 (Toleration) などの Entity Operator Pod の設定が含まれます。テンプレートの設定に関する詳細は、「OpenShift リソースのカスタマイズ」 を参照してください。
topicOperator
プロパティーには、Topic Operator の設定が含まれます。このオプションがないと、Entity Operator は Topic Operator なしでデプロイされます。
userOperator
プロパティーには、User Operator の設定が含まれます。このオプションがないと、Entity Operator は User Operator なしでデプロイされます。
Entity Operator を設定するためのプロパティーに関する詳細は「EntityUserOperatorSpec
スキーマ参照」を参照してください。
両方の Operator を有効にする基本設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: topicOperator: {} userOperator: {}
topicOperator
および userOperator
に空のオブジェクト ({}
) が使用された場合、すべてのプロパティーでデフォルト値が使用されます。
topicOperator
および userOperator
プロパティーの両方がない場合、Entity Operator はデプロイされません。
2.1.10.2. Topic Operator 設定プロパティー
Topic Operator デプロイメントは、topicOperator
オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。
watchedNamespace
-
Topic Operator によって
KafkaTopics
が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。 reconciliationIntervalSeconds
-
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルト は
90
です。 zookeeperSessionTimeoutSeconds
-
ZooKeeper セッションのタイムアウト (秒単位)。デフォルト は
20
です。 topicMetadataMaxAttempts
-
Kafka からトピックメタデータの取得を試行する回数。各試行の間隔は、指数バックオフとして定義されます。パーティションまたはレプリカの数によって、トピックの作成に時間がかかる可能性がある場合は、この値を増やすことを検討してください。デフォルト は
6
です。 image
-
image
プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」 を参照してください。 resources
-
resources
プロパティーを使用すると、Topic Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、「CPU およびメモリーリソース」 を参照してください。 ログ
-
logging
プロパティーは、Topic Operator のロギングを設定します。詳細は 「Operator ロガー」 を参照してください。
Topic Operator 設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 # ...
2.1.10.3. User Operator 設定プロパティー
User Operator デプロイメントは、userOperator
オブジェクト内で追加オプションを使用すると設定できます。以下のプロパティーがサポートされます。
watchedNamespace
-
User Operator によって
KafkaUsers
が監視される OpenShift namespace。デフォルトは、Kafka クラスターがデプロイされた namespace です。 reconciliationIntervalSeconds
-
定期的な調整 (reconciliation) の間隔 (秒単位)。デフォルト は
120
です。 zookeeperSessionTimeoutSeconds
-
ZooKeeper セッションのタイムアウト (秒単位)。デフォルト は
6
です。 image
-
image
プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」 を参照してください。 resources
-
resources
プロパティーを使用すると、User Operator に割り当てられるリソースの量を設定できます。リソースの要求と制限の設定に関する詳細は、「CPU およびメモリーリソース」 を参照してください。 ログ
-
logging
プロパティーは、User Operator のロギングを設定します。詳細は 「Operator ロガー」 を参照してください。
User Operator 設定の例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... userOperator: watchedNamespace: my-user-namespace reconciliationIntervalSeconds: 60 # ...
2.1.10.4. Operator ロガー
Topic Operator および User Operator には設定可能なロガーがあります。
-
rootLogger.level
これらの Operator では Apache log4j2
ロガー実装が使用されます。
Kafka
リソースの logging
プロパティーを使用して、ロガーおよびロガーレベルを設定します。
ログレベルを設定するには、ロガーとレベルを直接指定 (インライン) するか、またはカスタム (外部) ConfigMap を使用します。ConfigMap を使用する場合、logging.name
プロパティーを外部ロギング設定が含まれる ConfigMap の名前に設定します。ConfigMap 内では、ロギング設定は log4j2.properties
を使用して記述されます。
ここで、inline
および external
ロギングの例を示します。
inline ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: inline loggers: rootLogger.level: INFO # ... userOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: inline loggers: rootLogger.level: INFO # ...
外部ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 logging: type: external name: customConfigMap # ...
その他のリソース
- ガベッジコレクター (GC) ロギングを有効 (または無効) にすることもできます。GC ロギングの詳細は、を参照してください。 「JVM 設定」
- ログレベルの詳細は、「Apache logging services」を参照してください。
2.1.10.5. Entity Operator の設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
Kafka
リソースのentityOperator
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: topicOperator: watchedNamespace: my-topic-namespace reconciliationIntervalSeconds: 60 userOperator: watchedNamespace: my-user-namespace reconciliationIntervalSeconds: 60
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.11. CPU およびメモリーリソース
AMQ Streams では、デプロイされたコンテナーごとに特定のリソースを要求し、これらのリソースの最大消費を定義できます。
AMQ Streams では、以下の 2 つのタイプのリソースがサポートされます。
- CPU
- メモリー
AMQ Streams では、CPU およびメモリーリソースの指定に OpenShift 構文が使用されます。
2.1.11.1. リソースの制限および要求
リソースの制限と要求は、以下のリソースで resources
プロパティーを使用して設定されます。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.entityOperator.tlsSidecar
-
Kafka.spec.kafkaExporter
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaBridge.spec
その他のリソース
- OpenShift におけるコンピュートリソースの管理に関する詳細は、「Managing Compute Resources for Containers」を参照してください。
2.1.11.1.1. リソース要求
要求によって、指定のコンテナーに対して予約するリソースが指定されます。リソースを予約すると、リソースが常に利用できるようになります。
リソース要求が OpenShift クラスターで利用可能な空きリソースを超える場合、Pod はスケジュールされません。
リソース要求は requests
プロパティーで指定されます。AMQ Streams では、現在以下のリソース要求がサポートされます。
-
cpu
-
memory
1 つまたは複数のサポートされるリソースに対してリクエストを設定できます。
すべてのリソースを対象とするリソース要求の設定例
# ... resources: requests: cpu: 12 memory: 64Gi # ...
2.1.11.1.2. リソース制限
制限によって、指定のコンテナーが消費可能な最大リソースが指定されます。制限は予約されず、常に利用できるとは限りません。コンテナーは、リソースが利用できる場合のみ、制限以下のリソースを使用できます。リソース制限は、常にリソース要求よりも高くする必要があります。
リソース制限は limits
プロパティーで指定されます。AMQ Streams では、現在以下のリソース制限がサポートされます。
-
cpu
-
memory
1 つまたは複数のサポートされる制限に対してリソースを設定できます。
リソース制限の設定例
# ... resources: limits: cpu: 12 memory: 64Gi # ...
2.1.11.1.3. サポートされる CPU 形式
CPU の要求および制限は以下の形式でサポートされます。
-
整数値 (
5
) または少数 (2.5
) の CPU コアの数。 -
数値または ミリ CPU / ミリコア (
100m
)。1000 ミリコア は CPU コア1
つと同じです。
CPU ユニットの例
# ... resources: requests: cpu: 500m limits: cpu: 2.5 # ...
1 つの CPU コアのコンピューティング能力は、OpenShift がデプロイされたプラットフォームによって異なることがあります。
その他のリソース
- CPU 仕様の詳細は、「Meaning of CPU」を参照してください。
2.1.11.1.4. サポートされるメモリー形式
メモリー要求および制限は、メガバイト、ギガバイト、メビバイト、およびギビバイトで指定されます。
-
メモリーをメガバイトで指定するには、
M
接尾辞を使用します。たとえば、1000M
のように指定します。 -
メモリーをギガバイトで指定するには、
G
接尾辞を使用します。たとえば、1G
のように指定します。 -
メモリーをメビバイトで指定するには、
Mi
接尾辞を使用します。たとえば、1000Mi
のように指定します。 -
メモリーをギビバイトで指定するには、
Gi
接尾辞を使用します。たとえば、1Gi
のように指定します。
異なるメモリー単位の使用例
# ... resources: requests: memory: 512Mi limits: memory: 2Gi # ...
その他のリソース
- メモリーの指定およびサポートされるその他の単位に関する詳細は、「Meaning of memory」を参照してください。
2.1.11.2. リソース要求および制限の設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
クラスターデプロイメントを指定するリソースの
resources
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... resources: requests: cpu: "8" memory: 64Gi limits: cpu: "12" memory: 128Gi # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
関連情報
- スキーマの詳細は、「ResourceRequirements API reference」を参照してください。
2.1.12. Kafka ロガー
Kafka には独自の設定可能なロガーがあります。
-
log4j.logger.org.I0Itec.zkclient.ZkClient
-
log4j.logger.org.apache.zookeeper
-
log4j.logger.kafka
-
log4j.logger.org.apache.kafka
-
log4j.logger.kafka.request.logger
-
log4j.logger.kafka.network.Processor
-
log4j.logger.kafka.server.KafkaApis
-
log4j.logger.kafka.network.RequestChannel$
-
log4j.logger.kafka.controller
-
log4j.logger.kafka.log.LogCleaner
-
log4j.logger.state.change.logger
-
log4j.logger.kafka.authorizer.logger
ZooKeeper にも設定可能なロガーもあります。
-
zookeeper.root.logger
Kafka と ZooKeeper では Apache log4j
ロガー実装が使用されます。
log4j2.properties
を使用して ConfigMap 内にロギング設定を記述するため、Operator によって Apache log4j2
ロガー実装が使用されます。詳細は、「Operator ロガー」 を参照してください。
logging
プロパティーを使用してロガーおよびロガーレベルを設定します。
ログレベルを設定するには、ロガーとレベルを直接指定 (インライン) するか、またはカスタム (外部) ConfigMap を使用します。ConfigMap を使用する場合、logging.name
プロパティーを外部ロギング設定が含まれる ConfigMap の名前に設定します。ConfigMap 内では、ロギング設定は log4j.properties
を使用して記述されます。
ここで、inline
および external
ロギングの例を示します。
inline ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: # ... logging: type: inline loggers: kafka.root.logger.level: "INFO" # ... zookeeper: # ... logging: type: inline loggers: zookeeper.root.logger: "INFO" # ... entityOperator: # ... topicOperator: # ... logging: type: inline loggers: rootLogger.level: INFO # ... userOperator: # ... logging: type: inline loggers: rootLogger.level: INFO # ...
外部ロギング
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... logging: type: external name: customConfigMap # ...
外部およびインラインロギングレベルの両方への変更は、再起動なしで Kafka ブローカーに適用されます。
その他のリソース
- ガベッジコレクター (GC) ロギングを有効 (または無効) にすることもできます。ガベージコレクションの詳細は、を参照してください。 「JVM 設定」
- ログレベルの詳細は、「Apache logging services」を参照してください。
2.1.13. Kafka のラックアウェアネス (Rack awareness)
AMQ Streams のラックアウェアネス (Rack awareness) 機能は、Kafka ブローカー Pod および Kafka トピックレプリカを異なるラック全体に分散できるようにします。ラック認識を有効にすることで、Kafka ブローカーや Kafka ブローカーがホストしているトピックの可用性を向上できるようにします。
「ラック」(Rack) は、可用性ゾーン、データセンター、またはデータセンターの実際のラックを表す可能性があります。
2.1.13.1. Kafka ブローカーでのラック認識 (Rack awareness) の設定
Kafka のラック認識 (Rack awareness) は、Kafka.spec.kafka
の rack
プロパティーで設定できます。rack
オブジェクトには、topologyKey
という名前の必須フィールドが 1 つあります。このキーは、OpenShift クラスターノードに割り当てられたラベルの 1 つと一致する必要があります。このラベルは、Kafka ブローカー Pod をノードにスケジュールする際に OpenShift によって使用されます。OpenShift クラスターがクラウドプロバイダープラットフォームで稼働している場合、そのラベルはノードが稼働している可用性ゾーンを表す必要があります。通常、ノードには topologyKey
の値として使用できる topology.kubernetes.io/zone
ラベル(または古い OpenShift バージョンの failure-domain.beta.kubernetes.io/zone
)のラベルが付けられます。OpenShift ノードラベルの詳細は、「Well-Known Labels, Annotations and Taints」を参照してください。これにより、ブローカー Pod がゾーン全体に分散され、Kafka ブローカー内にブローカーの broker.rack
設定パラメーターも設定されます。
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
- ノードがデプロイされたゾーンやラックを表すノードラベルについては、OpenShift 管理者に相談します。
ラベルをトポロジーキーとして使用し、
Kafka
リソースのrack
プロパティーを編集します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... rack: topologyKey: topology.kubernetes.io/zone # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
関連情報
- Kafka ラックアウェアネスに init コンテナーイメージを設定するための詳細は、「コンテナーイメージ」 を参照してください。
2.1.14. ヘルスチェック
ヘルスチェックは、アプリケーションの健全性を検証する定期的なテストです。ヘルスチェックプローブが失敗すると、OpenShift によってアプリケーションが正常でないと見なされ、その修正が試行されます。
OpenShift では、以下の 2 つのタイプのおよび ヘルスチェックプローブがサポートされます。
- Liveness プローブ
- Readiness プローブ
プローブの詳細は、「Configure Liveness and Readiness Probes」を参照してください。AMQ Streams コンポーネントでは、両タイプのプローブが使用されます。
ユーザーは、Liveness および Readiness プローブに選択されたオプションを設定できます。
2.1.14.1. Healthcheck の設定
Liveness および Readiness プローブは、以下のリソースの livenessProbe
および readinessProbe
プロパティーを使用して設定できます。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
Kafka.spec.entityOperator.tlsSidecar
-
Kafka.spec.entityOperator.topicOperator
-
Kafka.spec.entityOperator.userOperator
-
Kafka.spec.kafkaExporter
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
-
KafkaMirrorMaker.spec
-
KafkaBridge.spec
livenessProbe
および readinessProbe
の両方で以下のオプションがサポートされます。
-
initialDelaySeconds
-
timeoutSeconds
-
periodSeconds
-
successThreshold
-
failureThreshold
livenessProbe
および readinessProbe
のオプションに関する詳細は、「Probe
スキーマ参照」 を参照してください。
Liveness および Readiness プローブの設定例
# ... readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ...
2.1.14.2. Healthcheck の設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
Kafka
リソースのlivenessProbe
またはreadinessProbe
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.15. Prometheus メトリクス
AMQ Streams では、Apache Kafka および ZooKeeper によってサポートされる JMX メトリクスを Prometheus メトリクスに変換するために、Prometheus JMX エクスポーター を使用した Prometheus メトリクスがサポートされます。有効になったメトリクスは、9404 番ポートで公開されます。
Prometheus および Grafana の設定およびデプロイに関する詳細は、『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「Kafka へのメトリクスの導入」を参照してください。
2.1.15.1. メトリクスの設定
Prometheus メトリクスは、以下のリソースに metrics
プロパティーを設定して有効化されます。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
KafkaConnect.spec
-
KafkaConnectS2I.spec
metrics
プロパティーがリソースに定義されていない場合、Prometheus メトリクスは無効になります。追加設定なしで Prometheus メトリクスのエクスポートを有効にするには、空のオブジェクト ({}
) を設定します。
追加設定なしでメトリクスを有効にする例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... metrics: {} # ... zookeeper: # ...
metrics
プロパティーには、Prometheus JMX エスクポーター の追加設定が含まれることがあります。
追加の Prometheus JMX Exporter 設定を使用したメトリクスを有効化する例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... metrics: lowercaseOutputName: true rules: - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count" name: "kafka_server_$1_$2_total" - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count" name: "kafka_server_$1_$2_total" labels: topic: "$3" # ... zookeeper: # ...
2.1.15.2. Prometheus メトリクスの設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
Kafka
リソースのmetrics
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... metrics: lowercaseOutputName: true # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.16. JMX オプション
AMQ Streams では、JMX ポートを 9999 番で開放することで、Kafka ブローカーから JMX メトリクスを取得することがサポートされます。各 Kafka ブローカーに関するさまざまなメトリクスを取得できます。たとえば、BytesPerSecond
の値やブローカーのネットワークの要求レートなどの、使用データを取得できます。AMQ Streams では、パスワードとユーザー名で保護された JMX ポートの開放や、保護されていない JMX ポートの開放がサポートされます。
2.1.16.1. JMX オプションの設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
以下のリソースで jmxOptions
プロパティーを使用すると JMX オプションを設定できます。
-
Kafka.spec.kafka
Kafka ブローカーで開放された JMX ポートの、ユーザー名とパスワードの保護を設定できます。
JMX ポートのセキュリティー保護
JMX ポートをセキュアにすると、非承認の Pod によるポートへのアクセスを防ぐことができます。現在、JMX ポートをセキュアにする唯一の方法がユーザー名とパスワードを使用することです。JMX ポートのセキュリティーを有効にするには、authentication
フィールドの type
パラメーターを password
に設定します。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jmxOptions: authentication: type: "password" # ... zookeeper: # ...
これにより、ヘッドレスサービスを使用し、対応するブローカーを指定して Pod をクラスター内部にデプロイし、JMX メトリクスを取得することができます。ブローカー 0 から JMX メトリクスを取得するには、指定するヘッドレスサービスの前にブローカー 0 を追加します。
"<cluster-name>-kafka-0-<cluster-name>-<headless-service-name>"
JMX ポートがセキュアである場合、Pod のデプロイメントで JMX シークレットからユーザー名とパスワードを参照すると、そのユーザー名とパスワードを取得できます。
開放された JMX ポートの使用
JMX ポートのセキュリティーを無効にする場合は、authentication
フィールドに何も入力しません。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jmxOptions: {} # ... zookeeper: # ...
これにより、ヘッドレスサービスで JMX ポートを開放し、上記と似た方法で Pod をクラスター内にデプロイすることができます。唯一の違いは、すべての Pod が JMX ポートから読み取りできることです。
2.1.17. JVM オプション
AMQ Streams の以下のコンポーネントは、仮想マシン (VM) 内で実行されます。
- Apache Kafka
- Apache ZooKeeper
- Apache Kafka Connect
- Apache Kafka MirrorMaker
- AMQ Streams Kafka Bridge
JVM 設定オプションによって、さまざまなプラットフォームおよびアーキテクチャーのパフォーマンスが最適化されます。AMQ Streams では、これらのオプションの一部を設定できます。
2.1.17.1. JVM 設定
jvmOptions
プロパティーを使用して、コンポーネントが稼働している JVM のサポートされるオプションを設定します。
サポートされる JVM オプションは、さまざまなプラットフォームやアーキテクチャーのパフォーマンスを最適化するのに便利です。
2.1.17.2. JVM オプションの設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
Kafka
、KafkaConnect
、KafkaConnectS2I
、KafkaMirrorMaker
、またはKafkaBridge
リソースのjvmOptions
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... jvmOptions: "-Xmx": "8g" "-Xms": "8g" # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.18. コンテナーイメージ
AMQ Streams では、コンポーネントに使用されるコンテナーイメージを設定できます。コンテナーイメージのオーバーライドは、別のコンテナーレジストリーを使用する必要がある特別な状況でのみ推奨されます。たとえば、AMQ Streams によって使用されるコンテナーリポジトリーにネットワークがアクセスできない場合などがこれに該当します。そのような場合は、AMQ Streams イメージをコピーするか、ソースからビルドする必要があります。設定したイメージが AMQ Streams イメージと互換性のない場合は、適切に機能しない可能性があります。
2.1.18.1. コンテナーイメージの設定
image
プロパティーを使用して、使用するコンテナーイメージを指定します。
コンテナーイメージのオーバーライドは、特別な状況でのみ推奨されます。
2.1.18.2. コンテナーイメージの設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
Kafka
リソースのimage
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... image: my-org/my-image:latest # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.19. TLS サイドカー
サイドカーは、Pod で実行されるコンテナーですが、サポートの目的で提供されます。AMQ Streams では、TLS サイドカーは TLS を使用して、各種のコンポーネントと ZooKeeper との間のすべての通信を暗号化および復号化します。
TLS サイドカーは以下で使用されます。
- Entitiy Operator
- Cruise Control
2.1.19.1. TLS サイドカー設定
TLS サイドカーは、以下で tlsSidecar
プロパティーを使用して設定できます。
-
Kafka.spec.kafka
-
Kafka.spec.zookeeper
-
Kafka.spec.entityOperator
TLS サイドカーは、以下の追加オプションをサポートします。
-
image
-
resources
-
logLevel
-
readinessProbe
-
livenessProbe
resources
プロパティーを使用すると、TLS サイドカーに割り当てられる メモリーおよび CPU リソース を指定できます。
image
プロパティーを使用すると、使用されるコンテナーイメージを設定できます。カスタムコンテナーイメージの設定に関する詳細は、「コンテナーイメージ」 を参照してください。
logLevel
プロパティーは、ログレベルを指定するために使用されます。以下のログレベルがサポートされます。
- emerg
- alert
- crit
- err
- warning
- notice
- info
- debug
デフォルト値は notice です。
Healthcheck の readinessProbe
および livenessProbe
プロパティーの設定に関する詳細は、「Healthcheck の設定」 を参照してください。
TLS サイドカーの設定例
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... tlsSidecar: image: my-org/my-image:latest resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi logLevel: debug readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 # ... zookeeper: # ...
2.1.19.2. TLS サイドカーの設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
Kafka
リソースのtlsSidecar
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... entityOperator: # ... tlsSidecar: resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi # ... cruiseControl: # ... tlsSidecar: resources: requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.20. Pod スケジューリングの設定
2 つのアプリケーションが同じ OpenShift ノードにスケジュールされた場合、両方のアプリケーションがディスク I/O のように同じリソースを使用し、パフォーマンスに影響する可能性があります。これにより、パフォーマンスが低下する可能性があります。ノードを他の重要なワークロードと共有しないように Kafka Pod をスケジュールする場合、適切なノードを使用したり、Kafka 専用のノードのセットを使用すると、このような問題を適切に回避できます。
2.1.20.1. 他のアプリケーションに基づく Pod のスケジューリング
2.1.20.1.1. 重要なアプリケーションがノードを共有しないようにする
Pod の非アフィニティーを使用すると、重要なアプリケーションが同じディスクにスケジュールされないようにすることができます。Kafka クラスターの実行時に、Pod の非アフィニティーを使用して、Kafka ブローカーがデータベースなどの他のワークロードとノードを共有しないようにすることが推奨されます。
2.1.20.1.2. アフィニティー
以下のリソースで affinity
をプロパティーを使用すると、アフィニティーを設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。
- Pod のアフィニティーおよび非アフィニティー
- ノードのアフィニティー
affinity
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。
2.1.20.1.3. Kafka コンポーネントでの Pod の非アフィニティーの設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
クラスターデプロイメントを指定するリソースの
affinity
プロパティーを編集します。ラベルを使用して、同じノードでスケジュールすべきでない Pod を指定します。topologyKey
をkubernetes.io/hostname
に設定し、選択した Pod が同じホスト名のノードでスケジュールされてはならないことを指定する必要があります。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.20.2. 特定のノードへの Pod のスケジューリング
2.1.20.2.1. ノードのスケジューリング
OpenShift クラスターは、通常多くの異なるタイプのワーカーノードで構成されます。ワークロードが非常に大きい環境の CPU に対して最適化されたものもあれば、メモリー、ストレージ (高速のローカル SSD)、または ネットワークに対して最適化されたものもあります。異なるノードを使用すると、コストとパフォーマンスの両面で最適化しやすくなります。最適なパフォーマンスを実現するには、AMQ Streams コンポーネントのスケジューリングで適切なノードを使用できるようにすることが重要です。
OpenShift はノードのアフィニティーを使用してワークロードを特定のノードにスケジュールします。ノードのアフィニティーにより、Pod がスケジュールされるノードにスケジューリングの制約を作成できます。制約はラベルセレクターとして指定されます。beta.kubernetes.io/instance-type
などの組み込みノードラベルまたはカスタムラベルのいずれかを使用してラベルを指定すると、適切なノードを選択できます。
2.1.20.2.2. アフィニティー
以下のリソースで affinity
をプロパティーを使用すると、アフィニティーを設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。
- Pod のアフィニティーおよび非アフィニティー
- ノードのアフィニティー
affinity
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。
2.1.20.2.3. Kafka コンポーネントでのノードのアフィニティーの設定
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
AMQ Streams コンポーネントをスケジュールする必要のあるノードにラベルを付けます。
oc label
を使用してこれを行うことができます。oc label node your-node node-type=fast-network
または、既存のラベルによっては再利用が可能です。
クラスターデプロイメントを指定するリソースの
affinity
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-type operator: In values: - fast-network # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.20.3. 専用ノードの使用
2.1.20.3.1. 専用ノード
クラスター管理者は、選択した OpenShift ノードをテイントとしてマーク付けできます。テイントのあるノードは、通常のスケジューリングから除外され、通常の Pod はそれらのノードでの実行はスケジュールされません。ノードに設定されたテイントを許容できるサービスのみをスケジュールできます。このようなノードで実行されるその他のサービスは、ログコレクターやソフトウェア定義のネットワークなどのシステムサービスのみです。
テイントは専用ノードの作成に使用できます。専用のノードで Kafka とそのコンポーネントを実行する利点は多くあります。障害の原因になったり、Kafka に必要なリソースを消費するその他のアプリケーションが同じノードで実行されません。これにより、パフォーマンスと安定性が向上します。
専用ノードで Kafka Pod をスケジュールするには、ノードのアフィニティー と 許容 (toleration) を設定します。
2.1.20.3.2. アフィニティー
以下のリソースで affinity
をプロパティーを使用すると、アフィニティーを設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
アフィニティー設定には、さまざまなタイプのアフィニティーを含めることができます。
- Pod のアフィニティーおよび非アフィニティー
- ノードのアフィニティー
affinity
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes のノードおよび Pod のアフィニティーに関するドキュメント を参照してください。
2.1.20.3.3. 許容 (Toleration)
以下のリソースで tolerations
プロパティーを使用すると許容 (Toleration) を設定できます。
-
Kafka.spec.kafka.template.pod
-
Kafka.spec.zookeeper.template.pod
-
Kafka.spec.entityOperator.template.pod
-
KafkaConnect.spec.template.pod
-
KafkaConnectS2I.spec.template.pod
-
KafkaBridge.spec.template.pod
tolerations
プロパティーの形式は、OpenShift の仕様に準拠します。詳細は、Kubernetes の「Taints and Tolerations」を参照してください。
2.1.20.3.4. 専用ノードの設定と Pod のスケジューリング
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
手順
- 専用ノードとして使用するノードを選択します。
- これらのノードにスケジュールされているワークロードがないことを確認します。
選択したノードにテイントを設定します。
oc adm taint
を使用してこれを行うことができます。oc adm taint node your-node dedicated=Kafka:NoSchedule
さらに、選択したノードにラベルも追加します。
oc label
を使用してこれを行うことができます。oc label node your-node dedicated=Kafka
クラスターデプロイメントを指定するリソースの
affinity
およびtolerations
プロパティーを編集します。以下は例になります。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: kafka: # ... template: pod: tolerations: - key: "dedicated" operator: "Equal" value: "Kafka" effect: "NoSchedule" affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: dedicated operator: In values: - Kafka # ... zookeeper: # ...
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
2.1.21. Kafka Exporter
Kafka
リソースを設定すると、クラスターに Kafka Exporter を自動的にデプロイできます。
Kafka Exporter は、Prometheus メトリクス (主にオフセット、コンシューマーグループ、コンシューマーラグおよびトピックに関連するデータ) として分析用にデータを抽出します。
Kafka Exporter の設定と、パフォーマンスのためにコンシューマーラグの監視が重要な理由の詳細は、『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「Kafka Exporter の追加」を参照してください。
2.1.22. Kafka クラスターのローリングアップデートの実行
この手順では、OpenShift アノテーションを使用して、既存の Kafka クラスターのローリングアップデートを手動でトリガーする方法を説明します。
前提条件
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
手順
手動で更新する Kafka Pod を制御する
StatefulSet
の名前を見つけます。たとえば、Kafka クラスターの名前が my-cluster の場合、対応する
StatefulSet
の名前は my-cluster-kafka になります。OpenShift で
StatefulSet
リソースにアノテーションを付けます。たとえば、oc annotate
を使用すると以下のようになります。oc annotate statefulset cluster-name-kafka strimzi.io/manual-rolling-update=true
-
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた
StatefulSet
内のすべての Pod でローリングアップデートがトリガーされます。すべての Pod のローリングアップデートが完了すると、アノテーションはStatefulSet
から削除されます。
2.1.23. ZooKeeper クラスターのローリングアップデートの実行
この手順では、OpenShift アノテーションを使用して、既存の ZooKeeper クラスターのローリングアップデートを手動でトリガーする方法を説明します。
前提条件
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
手順
手動で更新する ZooKeeper Pod を制御する
StatefulSet
の名前を見つけます。たとえば、Kafka クラスターの名前が my-cluster の場合、ZooKeeper の対応する
StatefulSet
の名前は my-cluster-zookeeper になります。OpenShift で
StatefulSet
リソースにアノテーションを付けます。たとえば、oc annotate
を使用すると以下のようになります。oc annotate statefulset cluster-name-zookeeper strimzi.io/manual-rolling-update=true
-
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた
StatefulSet
内のすべての Pod でローリングアップデートがトリガーされます。すべての Pod のローリングアップデートが完了すると、アノテーションはStatefulSet
から削除されます。
2.1.24. クラスターのスケーリング
2.1.24.1. Kafka クラスターのスケーリング
2.1.24.1.1. ブローカーのクラスターへの追加
トピックのスループットを向上させる主な方法は、そのトピックのパーティション数を増やすことです。これにより、追加のパーティションによってクラスター内の異なるブローカー間でトピックの負荷が共有されます。ただし、各ブローカーが特定のリソース (通常は I/O) によって制約される場合、パーティションを増やしてもスループットは向上しません。代わりに、ブローカーをクラスターに追加する必要があります。
追加のブローカーをクラスターに追加する場合、Kafka ではパーティションは自動的に割り当てられません。既存のブローカーから新規のブローカーに移動するパーティションを決定する必要があります。
すべてのブローカー間でパーティションが再分散されたら、各ブローカーのリソース使用率が低下するはずです。
2.1.24.1.2. クラスターからのブローカーの削除
AMQ Streams では StatefulSets
を使用してブローカー Pod を管理されるため、あらゆる Pod を削除できるわけではありません。クラスターから削除できるのは、番号が最も大きい 1 つまたは複数の Pod のみです。たとえば、12 個のブローカーがあるクラスターでは、Pod の名前は cluster-name-kafka-0
から cluster-name-kafka-11
になります。1 つのブローカー分をスケールダウンする場合、cluster-name-kafka-11
が削除されます。
クラスターからブローカーを削除する前に、そのブローカーにパーティションが割り当てられていないことを確認します。また、使用が停止されたブローカーの各パーティションを引き継ぐ、残りのブローカーを決める必要もあります。ブローカーに割り当てられたパーティションがなければ、クラスターを安全にスケールダウンできます。
2.1.24.2. パーティションの再割り当て
現在、Topic Operator はレプリカを別のブローカーに再割当てすることをサポートしないため、ブローカー Pod に直接接続してレプリカをブローカーに再割り当てする必要があります。
ブローカー Pod 内では、kafka-reassign-partitions.sh
ユーティリティーを使用してパーティションを別のブローカーに再割り当てできます。
これには、以下の 3 つのモードがあります。
--generate
- トピックとブローカーのセットを取り、再割り当て JSON ファイル を生成します。これにより、トピックのパーティションがブローカーに割り当てられます。これはトピック全体で動作するため、一部のトピックのパーティションを再割り当てする必要がある場合は使用できません。
--execute
- 再割り当て JSON ファイル を取り、クラスターのパーティションおよびブローカーに適用します。その結果、パーティションを取得したブローカーは、パーティションリーダーのフォロワーになります。新規ブローカーが ISR (In-Sync Replica、同期レプリカ) に参加できたら、古いブローカーはフォロワーではなくなり、そのレプリカが削除されます。
--verify
-
--verify
は、-- execute
ステップと同じ 再割り当て JSON ファイル を使用して、ファイル内のすべてのパーティションが目的のブローカーに移動されたかどうかを確認します。再割り当てが完了すると、--verify は有効な スロットル も削除します。スロットルを削除しないと、再割り当てが完了した後もクラスターは影響を受け続けます。
クラスターでは、1 度に 1 つの再割当てのみを実行でき、実行中の再割当てをキャンセルすることはできません。再割り当てをキャンセルする必要がある場合は、割り当てが完了するのを待ってから別の再割り当てを実行し、最初の再割り当ての結果を元に戻します。kafka-reassign-partitions.sh
によって、元に戻すための再割り当て JSON が出力の一部として生成されます。大規模な再割り当ては、進行中の再割り当てを停止する必要がある場合に備えて、複数の小さな再割り当てに分割するようにしてください。
2.1.24.2.1. 再割り当て JSON ファイル
再割り当て JSON ファイル には特定の構造があります。
{
"version": 1,
"partitions": [
<PartitionObjects>
]
}
ここで <PartitionObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ] }
Kafka は "log_dirs"
プロパティーもサポートしますが、AMQ Streams では使用しないでください。
以下は、トピック topic-a
およびパーティション 4
をブローカー 2
、4
および 7
に割り当て、トピック topic-b
およびパーティション 2
をブローカー 1
、5
、および 7
に割り当てる、再割り当て JSON ファイルの例になります。
{ "version": 1, "partitions": [ { "topic": "topic-a", "partition": 4, "replicas": [2,4,7] }, { "topic": "topic-b", "partition": 2, "replicas": [1,5,7] } ] }
JSON に含まれていないパーティションは変更されません。
2.1.24.2.2. JBOD ボリューム間でのパーティションの再割り当て
Kafka クラスターで JBOD ストレージを使用する場合は、特定のボリュームとログディレクトリー (各ボリュームに単一のログディレクトリーがある) との間でパーティションを再割り当てを選択することができます。パーティションを特定のボリュームに再割り当てするには、再割り当て JSON ファイルで log_dirs
オプションを <PartitionObjects> に追加します。
{ "topic": <TopicName>, "partition": <Partition>, "replicas": [ <AssignedBrokerIds> ], "log_dirs": [ <AssignedLogDirs> ] }
log_dirs
オブジェクトに含まれるログディレクトリーの数は、replicas
オブジェクトで指定されるレプリカ数と同じである必要があります。値は、ログディレクトリーへの絶対パスか、any
キーワードである必要があります。
以下は例になります。
{ "topic": "topic-a", "partition": 4, "replicas": [2,4,7]. "log_dirs": [ "/var/lib/kafka/data-0/kafka-log2", "/var/lib/kafka/data-0/kafka-log4", "/var/lib/kafka/data-0/kafka-log7" ] }
2.1.24.3. 再割り当て JSON ファイルの生成
この手順では、kafka-reassign-partitions.sh
ツールを使用して、指定のトピックセットすべてのパーティションを再割り当てする再割り当て JSON ファイルを生成する方法を説明します。
前提条件
- 稼働中の Cluster Operator。
-
Kafka
リソース。 - パーティションを再割り当てするトピックセット。
手順
移動するトピックを一覧表示する
topics.json
という名前の JSON ファイルを準備します。これには、以下の構造が必要です。{ "version": 1, "topics": [ <TopicObjects> ] }
ここで <TopicObjects> は、以下のようなコンマ区切りのオブジェクトリストになります。
{ "topic": <TopicName> }
たとえば、
topic-a
とtopic-b
のすべてのパーティションを再割り当てするには、以下のようなtopics.json
ファイルを準備する必要があります。{ "version": 1, "topics": [ { "topic": "topic-a"}, { "topic": "topic-b"} ] }
topics.json
ファイルをブローカー Pod の 1 つにコピーします。cat topics.json | oc exec -c kafka <BrokerPod> -i -- \ /bin/bash -c \ 'cat > /tmp/topics.json'
kafka-reassign-partitions.sh
コマンドを使用して、再割り当て JSON を生成します。oc exec <BrokerPod> -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --topics-to-move-json-file /tmp/topics.json \ --broker-list <BrokerList> \ --generate
たとえば、
topic-a
およびtopic-b
のすべてのパーティションをブローカー4
および7
に移動する場合は、以下を実行しmす。oc exec <BrokerPod> -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 4,7 \ --generate
2.1.24.4. 手動による再割り当て JSON ファイルの作成
特定のパーティションを移動したい場合は、再割り当て JSON ファイルを手動で作成できます。
2.1.24.5. 再割り当てスロットル
パーティションの再割り当てには、ブローカーの間で大量のデータを転送する必要があるため、処理が遅くなる可能性があります。クライアントへの悪影響を防ぐため、再割り当て処理をスロットルで調整することができます。これにより、再割り当ての完了に時間がかかる可能性があります。
- スロットルが低すぎると、新たに割り当てられたブローカーは公開されるレコードに遅れずに対応することはできず、再割り当ては永久に完了しません。
- スロットルが高すぎると、クライアントに影響します。
たとえば、プロデューサーの場合は、承認待ちが通常のレイテンシーよりも大きくなる可能性があります。コンシューマーの場合は、ポーリング間のレイテンシーが大きいことが原因でスループットが低下する可能性があります。
2.1.24.6. Kafka クラスターのスケールアップ
この手順では、Kafka クラスターでブローカーの数を増やす方法を説明します。
前提条件
- 既存の Kafka クラスター。
-
拡大されたクラスターでパーティションをブローカーに再割り当てする方法が記述される
reassignment.json
というファイル名の 再割り当て JSON ファイル。
手順
-
kafka.spec.kafka.replicas
設定オプションを増やして、新しいブローカーを必要なだけ追加します。 - 新しいブローカー Pod が起動したことを確認します。
後でコマンドを実行するブローカー Pod に
reassignment.json
ファイルをコピーします。cat reassignment.json | \ oc exec broker-pod -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
以下は例になります。
cat reassignment.json | \ oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
同じブローカー Pod から
kafka-reassign-partitions.sh
コマンドラインツールを使用して、パーティションの再割り当てを実行します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --execute
レプリケーションをスロットルで調整する場合、
--throttle
とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下は例になります。oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 5000000 \ --execute
このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下は例になります。
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 10000000 \ --execute
ブローカー Pod のいずれかから
kafka-reassign-partitions.sh
コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute
オプションの代わりに--verify
オプションを使用します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
以下に例を示します。
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
-
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。
2.1.24.7. Kafka クラスターのスケールダウン
関連情報
この手順では、Kafka クラスターでブローカーの数を減らす方法を説明します。
前提条件
- 既存の Kafka クラスター。
-
最も番号の大きい
Pod(s)
のブローカーが削除された後にクラスターのブローカーにパーティションを再割り当てする方法が記述されている、reassignment.json
という名前の 再割り当て JSON ファイル。
手順
後でコマンドを実行するブローカー Pod に
reassignment.json
ファイルをコピーします。cat reassignment.json | \ oc exec broker-pod -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
以下は例になります。
cat reassignment.json | \ oc exec my-cluster-kafka-0 -c kafka -i -- /bin/bash -c \ 'cat > /tmp/reassignment.json'
同じブローカー Pod から
kafka-reassign-partitions.sh
コマンドラインツールを使用して、パーティションの再割り当てを実行します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --execute
レプリケーションをスロットルで調整する場合、
--throttle
とブローカー間のスロットル率 (バイト/秒単位) を渡すこともできます。以下は例になります。oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 5000000 \ --execute
このコマンドは、2 つの再割り当て JSON オブジェクトを出力します。最初の JSON オブジェクトには、移動されたパーティションの現在の割り当てが記録されます。後で再割り当てを元に戻す必要がある場合に備え、この値をローカルファイル (Pod のファイル以外) に保存します。2 つ目の JSON オブジェクトは、再割り当て JSON ファイルに渡した目的の再割り当てです。
再割り当ての最中にスロットルを変更する必要がある場合は、同じコマンドラインに別のスロットル率を指定して実行します。以下は例になります。
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --throttle 10000000 \ --execute
ブローカー Pod のいずれかから
kafka-reassign-partitions.sh
コマンドラインツールを使用して、再割り当てが完了したかどうかを定期的に確認します。これは先ほどの手順と同じコマンドですが、--execute
オプションの代わりに--verify
オプションを使用します。oc exec broker-pod -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
以下に例を示します。
oc exec my-cluster-kafka-0 -c kafka -it -- \ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \ --reassignment-json-file /tmp/reassignment.json \ --verify
-
--verify
コマンドによって、移動した各パーティションが正常に完了したことが報告されると、再割り当ては終了します。この最終的な--verify
によって、結果的に再割り当てスロットルも削除されます。割り当てを元のブローカーに戻すために JSON ファイルを保存した場合は、ここでそのファイルを削除できます。 すべてのパーティションの再割り当てが終了すると、削除されるブローカーはクラスター内のいずれのパーティションにも対応しないはずです。これは、ブローカーのデータログディレクトリーにライブパーティションのログが含まれていないことを確認すると検証できます。ブローカーのログディレクトリーに、拡張正規表現
[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$
と一致しないディレクトリーが含まれる場合、ブローカーにはライブパーティションがあるため、停止してはなりません。これを確認するには、以下のコマンドを実行します。
oc exec my-cluster-kafka-0 -c kafka -it -- \ /bin/bash -c \ "ls -l /var/lib/kafka/kafka-log_<N>_ | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'"
N は削除された
Pod(s)
の数に置き換えます。上記のコマンドによって出力が生成される場合、ブローカーにはライブパーティションがあります。この場合、再割り当てが終了していないか、再割り当て JSON ファイルが適切ではありません。
-
ブローカーにライブパーティションがないことが確認できたら、
Kafka
リソースのKafka.spec.kafka.replicas
を編集できます。これにより、StatefulSet
がスケールダウンされ、番号が最も大きいブローカーPod(s)
が削除されます。
2.1.25. Kafka ノードの手動による削除
その他のリソース
この手順では、OpenShift アノテーションを使用して既存の Kafka ノードを削除する方法を説明します。Kafka ノードの削除するには、Kafka ブローカーが稼働している Pod
と、関連する PersistentVolumeClaim
の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod
と関連する PersistentVolumeClaim
は自動的に再作成されます。
PersistentVolumeClaim
を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
手順
削除する
Pod
の名前を見つけます。たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-kafka-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。
OpenShift で
Pod
リソースにアノテーションを付けます。oc annotate
を使用します。oc annotate pod cluster-name-kafka-index strimzi.io/delete-pod-and-pvc=true
- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。
2.1.26. ZooKeeper ノードの手動による削除
この手順では、OpenShift アノテーションを使用して既存の ZooKeeper ノードを削除する方法を説明します。ZooKeeper ノードの削除するには、ZooKeeper が稼働している Pod
と、関連する PersistentVolumeClaim
の両方を削除します (クラスターが永続ストレージでデプロイされた場合)。削除後、Pod
と関連する PersistentVolumeClaim
は自動的に再作成されます。
PersistentVolumeClaim
を削除すると、データが永久に失われる可能性があります。以下の手順は、ストレージで問題が発生した場合にのみ実行してください。
前提条件
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
手順
削除する
Pod
の名前を見つけます。たとえば、クラスターの名前が cluster-name の場合、Pod の名前は cluster-name-zookeeper-index になります。index はゼロで始まり、レプリカーの合計数で終わる値です。
OpenShift で
Pod
リソースにアノテーションを付けます。oc annotate
を使用します。oc annotate pod cluster-name-zookeeper-index strimzi.io/delete-pod-and-pvc=true
- 基盤となる永続ボリューム要求 (Persistent Volume Claim) でアノテーションが付けられた Pod が削除され、再作成されるときに、次の調整の実行を待ちます。
2.1.27. ローリングアップデートのメンテナンス時間枠
メンテナンス時間枠によって、Kafka および ZooKeeper クラスターの特定のローリングアップデートが便利な時間に開始されるようにスケジュールできます。
2.1.27.1. メンテナンス時間枠の概要
ほとんどの場合、Cluster Operator は対応する Kafka
リソースの変更に対応するために Kafka または ZooKeeper クラスターのみを更新します。これにより、Kafka
リソースの変更を適用するタイミングを計画し、Kafka クライアントアプリケーションへの影響を最小限に抑えることができます。
ただし、Kafka
リソースの変更がなくても Kafka および ZooKeeper クラスターの更新が発生することがあります。たとえば、Cluster Operator によって管理される CA (認証局) 証明書が期限切れ直前である場合にローリング再起動の実行が必要になります。
サービスの 可用性 は Pod のローリング再起動による影響を受けないはずですが (ブローカーおよびトピックの設定が適切である場合)、Kafka クライアントアプリケーションの パフォーマンス は影響を受ける可能性があります。メンテナンス時間枠によって、Kafka および ZooKeeper クラスターのこのような自発的なアップデートが便利な時間に開始されるようにスケジュールできます。メンテナンス時間枠がクラスターに設定されていない場合は、予測できない高負荷が発生する期間など、不便な時間にこのような自発的なローリングアップデートが行われる可能性があります。
2.1.27.2. メンテナンス時間枠の定義
Kafka.spec.maintenanceTimeWindows
プロパティーに文字列の配列を入力して、メンテナンス時間枠を設定します。各文字列は、UTC (協定世界時、Coordinated Universal Time) であると解釈される cron 式 です。UTC は実用的にはグリニッジ標準時と同じです。
以下の例では、日、月、火、水、および木曜日の午前 0 時に開始し、午前 1 時 59 分 (UTC) に終わる、単一のメンテナンス時間枠が設定されます。
# ... maintenanceTimeWindows: - "* * 0-1 ? * SUN,MON,TUE,WED,THU *" # ...
実際には、必要な CA 証明書の更新が設定されたメンテナンス時間枠内で完了できるように、Kafka
リソースの Kafka.spec.clusterCa.renewalDays
および Kafka.spec.clientsCa.renewalDays
プロパティーとともにメンテナンス期間を設定する必要があります。
AMQ Streams では、指定の期間にしたがってメンテナンス操作を正確にスケジュールしません。その代わりに、調整ごとにメンテナンス期間が現在「オープン」であるかどうかを確認します。これは、特定の時間枠内でのメンテナンス操作の開始が、最大で Cluster Operator の調整が行われる間隔の長さ分、遅れる可能性があることを意味します。したがって、メンテナンス時間枠は最低でもその間隔の長さにする必要があります。
その他のリソース
- Cluster Operator 設定についての詳細は、「Cluster Operator の設定」 を参照してください。
2.1.27.3. メンテナンス時間枠の設定
サポートされるプロセスによってトリガーされるローリングアップデートのメンテナンス時間枠を設定できます。
前提条件
- OpenShift クラスターが必要です。
- Cluster Operator が稼働している必要があります。
手順
Kafka
リソースのmaintenanceTimeWindows
プロパティー を追加または編集します。たとえば、0800 から 1059 までと、1400 から 1559 までのメンテナンスを可能にするには、以下のようにmaintenanceTimeWindows
を設定します。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: kafka: # ... zookeeper: # ... maintenanceTimeWindows: - "* * 8-10 * * ?" - "* * 14-15 * * ?"
リソースを作成または更新します。
oc apply
を使用して、これを行うことができます。oc apply -f your-file
関連情報
- Kafka クラスターのローリングアップデートの実行については、を参照してください。 「Kafka クラスターのローリングアップデートの実行」
- ZooKeeper クラスターのローリングアップデートの実行については、を参照してください。 「ZooKeeper クラスターのローリングアップデートの実行」
2.1.28. CA 証明書の手動更新
クラスターおよびクライアント CA 証明書は、それぞれの証明書の更新期間の開始時に自動更新されます。Kafka.spec.clusterCa.generateCertificateAuthority
および Kafka.spec.clientsCa.generateCertificateAuthority
が false
に設定されている場合、CA 証明書は自動更新されません。
証明書の更新期間が始まる前に、これらの証明書のいずれかまたは両方を手動で更新できます。セキュリティー上の理由や、証明書の更新または有効期間を変更した 場合などに、自動更新を行うことがあります。
更新された証明書は、更新前の証明書と同じ秘密鍵を使用します。
前提条件
- Cluster Operator が稼働している必要があります。
- CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。
手順
strimzi.io/force-renew
アノテーションを、更新対象の CA 証明書が含まれるSecret
に適用します。表2.1 証明書の更新を強制する Secret のアノテーション。 証明書 Secret annotate コマンド クラスター CA
KAFKA-CLUSTER-NAME-cluster-ca-cert
oc annotate secret KAFKA-CLUSTER-NAME-cluster-ca-cert strimzi.io/force-renew=true
クライアント CA
KAFKA-CLUSTER-NAME-clients-ca-cert
oc annotate secret KAFKA-CLUSTER-NAME-clients-ca-cert strimzi.io/force-renew=true
次回の調整で、アノテーションを付けた
Secret
の新規 CA 証明書が Cluster Operator によって生成されます。メンテナンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナンス時間枠内で新規 CA 証明書が生成されます。Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。
CA 証明書が有効である期間を確認します。
たとえば、
openssl
コマンドを使用します。oc get secret CA-CERTIFICATE-SECRET -o 'jsonpath={.data.CA-CERTIFICATE}' | base64 -d | openssl x509 -subject -issuer -startdate -enddate -noout
CA-CERTIFICATE-SECRETは
Secret
の名前で、クラスタCA証明書の場合はKAFKA-CLUSTER-NAME-cluster-ca-cert
であり、クライアントCA証明書の場合はKAFKA-CLUSTER-NAME-clients-ca-cert
となります。CA-CERTIFICATEは、
jsonpath={.data.ca\.crt}
のように、CA証明書の名前です。このコマンドは、CA 証明書の有効期間である
notBefore
およびnotAfter
の日付を返します。たとえば、クラスター CA 証明書の場合は以下のようになります。
subject=O = io.strimzi, CN = cluster-ca v0 issuer=O = io.strimzi, CN = cluster-ca v0 notBefore=Jun 30 09:43:54 2020 GMT notAfter=Jun 30 09:43:54 2021 GMT
Secret から古い証明書を削除します。
コンポーネントで新しい証明書が使用される場合でも、古い証明書がアクティブであることがあります。古い証明書を削除して、潜在的なセキュリティーリスクを取り除きます。
2.1.29. 秘密鍵の交換
クラスター CA およびクライアント CA 証明書によって使用される秘密鍵を交換できます。秘密鍵を交換すると、Cluster Operator は新しい秘密鍵の新規 CA 証明書を生成します。
前提条件
- Cluster Operator が稼働している必要があります。
- CA 証明書と秘密鍵がインストールされている Kafka クラスターが必要です。
手順
更新対象の秘密鍵が含まれる
Secret
にstrimzi.io/force-replace
アノテーションを適用します。表2.2 秘密鍵を置き換えるコマンド 秘密鍵 Secret annotate コマンド クラスター CA
<cluster-name>-cluster-ca
oc annotate secret <cluster-name>-cluster-ca strimzi.io/force-replace=true
クライアント CA
<cluster-name>-clients-ca
oc annotate secret <cluster-name>-clients-ca strimzi.io/force-replace=true
次回の調整時に、Cluster Operator は以下を生成します。
-
アノテーションを付けた
Secret
の新しい秘密鍵 - 新規 CA 証明書
メンテナンス時間枠が設定されている場合、Cluster Operator によって、最初の調整時に次のメンテナンス時間枠内で新しい秘密鍵と CA 証明書が生成されます。
Cluster Operator によって更新されたクラスターおよびクライアント CA 証明書をクライアントアプリケーションがリロードする必要があります。
その他のリソース
2.1.30. Kafka クラスターの一部として作成されたリソースの一覧
以下のリソースは、OpenShift クラスターの Cluster Operator によって作成されます。
共有リソース
cluster-name-cluster-ca
- クラスター通信の暗号化に使用されるクラスター CA のあるシークレット。
cluster-name-cluster-ca-cert
- クラスター CA 公開鍵のあるシークレット。このキーは、Kafka ブローカーのアイデンティティーの検証に使用できます。
cluster-name-clients-ca
- ユーザー証明書に署名するために使用されるクライアント CA 秘密鍵のあるシークレット。
cluster-name-clients-ca-cert
- クライアント CA 公開鍵のあるシークレット。このキーは、Kafka ユーザーのアイデンティティーの検証に使用できます。
cluster-name-cluster-operator-certs
- Kafka および ZooKeeper と通信するための Cluster Operator キーのあるシークレット。
ZooKeeper ノード
cluster-name-zookeeper
- ZooKeeper ノード Pod の管理を担当する StatefulSet。
cluster-name-zookeeper-idx
- Zookeeper StatefulSet によって作成された Pod。
cluster-name-zookeeper-nodes
- DNS が ZooKeeper Pod の IP アドレスを直接解決するのに必要なヘッドレスサービス。
cluster-name-zookeeper-client
- Kafka ブローカーがクライアントとして ZooKeeper ノードに接続するために使用するサービス。
cluster-name-zookeeper-config
- ZooKeeper 補助設定が含まれ、ZooKeeper ノード Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-zookeeper-nodes
- ZooKeeper ノードキーがあるシークレット。
cluster-name-zookeeper
- Zookeeper ノードで使用されるサービスアカウント。
cluster-name-zookeeper
- ZooKeeper ノードに設定された Pod の Disruption Budget。
cluster-name-network-policy-zookeeper
- ZooKeeper サービスへのアクセスを管理するネットワークポリシー。
data-cluster-name-zookeeper-idx
-
ZooKeeper ノード Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。
Kafka ブローカー
cluster-name-kafka
- Kafka ブローカー Pod の管理を担当する StatefulSet。
cluster-name-kafka-idx
- Kafka StatefulSet によって作成された Pod。
cluster-name-kafka-brokers
- DNS が Kafka ブローカー Pod の IP アドレスを直接解決するのに必要なサービス。
cluster-name-kafka-bootstrap
- サービスは、Kafka クライアントのブートストラップサーバーとして使用できます。
cluster-name-kafka-external-bootstrap
- OpenShift クラスター外部から接続するクライアントのブートストラップサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。
cluster-name-kafka-pod-id
- トラフィックを OpenShift クラスターの外部から個別の Pod にルーティングするために使用されるサービス。このリソースは、外部リスナーが有効な場合にのみ作成されます。
cluster-name-kafka-external-bootstrap
-
OpenShift クラスターの外部から接続するクライアントのブートストラップルート。このリソースは、外部リスナーが有効になっていて、タイプ
route
に設定されている場合にのみ作成されます。 cluster-name-kafka-pod-id
-
OpenShift クラスターの外部から個別の Pod へのトラフィックに対するルート。このリソースは、外部リスナーが有効になっていて、タイプ
route
に設定されている場合にのみ作成されます。 cluster-name-kafka-config
- Kafka 補助設定が含まれ、Kafka ブローカー Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-kafka-brokers
- Kafka ブローカーキーのあるシークレット。
cluster-name-kafka
- Kafka ブローカーによって使用されるサービスアカウント。
cluster-name-kafka
- Kafka ブローカーに設定された Pod の Disruption Budget。
cluster-name-network-policy-kafka
- Kafka サービスへのアクセスを管理するネットワークポリシー。
strimzi-namespace-name-cluster-name-kafka-init
- Kafka ブローカーによって使用されるクラスターロールバインディング。
cluster-name-jmx
- Kafka ブローカーポートのセキュア化に使用される JMX ユーザー名およびパスワードのあるシークレット。このリソースは、Kafka で JMX が有効になっている場合にのみ作成されます。
data-cluster-name-kafka-idx
-
Kafka ブローカー Pod
idx
のデータを保存するために使用されるボリュームの永続ボリューム要求です。このリソースは、データを保存するために永続ボリュームのプロビジョニングに永続ストレージが選択された場合のみ作成されます。 data-id-cluster-name-kafka-idx
-
Kafka ブローカー Pod
idx
のデータを保存するために使用されるボリュームid
の永続ボリューム要求です。このリソースは、永続ボリュームをプロビジョニングしてデータを保存するときに、JBOD ボリュームに永続ストレージが選択された場合のみ作成されます。
Entitiy Operator
これらのリソースは、Cluster Operator を使用して Entity Operator がデプロイされる場合にのみ作成されます。
cluster-name-entity-operator
- Topic および User Operator とのデプロイメント。
cluster-name-entity-operator-random-string
- Entity Operator デプロイメントによって作成された Pod。
cluster-name-entity-topic-operator-config
- Topic Operator の補助設定のある ConfigMap。
cluster-name-entity-user-operator-config
- User Operator の補助設定のある ConfigMap。
cluster-name-entity-operator-certs
- Kafka および ZooKeeper と通信するための Entity Operator キーのあるシークレット。
cluster-name-entity-operator
- Entity Operator によって使用されるサービスアカウント。
strimzi-cluster-name-topic-operator
- Entity Operator によって使用されるロールバインディング。
strimzi-cluster-name-user-operator
- Entity Operator によって使用されるロールバインディング。
Kafka Exporter
これらのリソースは、Cluster Operator を使用して Kafka Exporter がデプロイされる場合にのみ作成されます。
cluster-name-kafka-exporter
- Kafka Exporter でのデプロイメント。
cluster-name-kafka-exporter-random-string
- Kafka Exporter デプロイメントによって作成された Pod。
cluster-name-kafka-exporter
- コンシューマーラグメトリクスの収集に使用されるサービス。
cluster-name-kafka-exporter
- Kafka Exporter によって使用されるサービスアカウント。
Cruise Control
これらのリソースは、Cluster Operator を使用して Cruise Control がデプロイされた場合のみ作成されます。
cluster-name-cruise-control
- Cruise Control でのデプロイメント。
cluster-name-cruise-control-random-string
- Cruise Control デプロイメントによって作成された Pod。
cluster-name-cruise-control-config
- Cruise Control の補助設定が含まれ、Cruise Control Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-cruise-control-certs
- Kafka および ZooKeeper と通信するための Cruise Control キーのあるシークレット。
cluster-name-cruise-control
- Cruise Control との通信に使用されるサービス。
cluster-name-cruise-control
- Cruise Control によって使用されるサービスアカウント。
cluster-name-network-policy-cruise-control
- Cruise Control サービスへのアクセスを管理するネットワークポリシー。
JMXTrans
これらのリソースは、Cluster Operator を使用して JMXTrans がデプロイされる場合にのみ作成されます。
cluster-name-jmxtrans
- JMXTrans でのデプロイメント。
cluster-name-jmxtrans-random-string
- JMXTrans デプロイメントによって作成された Pod。
cluster-name-jmxtrans-config
- JMXTrans 補助設定が含まれ、JMXTrans Pod によってボリュームとしてマウントされる ConfigMap。
cluster-name-jmxtrans
- JMXTrans によって使用されるサービスアカウント。