第12章 AMQ Streams の管理
本章では、AMQ Streams のデプロイメントを維持するタスクについて説明します。
12.1. カスタムリソースの使用
oc
コマンドを使用して、AMQ Streams カスタムリソースで情報を取得し、他の操作を実行できます。
カスタムリソースの status
サブリソースと oc
を使用すると、リソースに関する情報を取得できます。
12.1.1. カスタムリソースでの oc
操作の実行
get
、describe
、edit
、または delete
などの oc
コマンドを使用して、リソースタイプで操作を実行します。たとえば、oc get kafkatopics
はすべての Kafka トピックのリストを取得し、oc get kafkas
はデプロイされたすべての Kafka クラスターを取得します。
リソースタイプを参照する場合は、単数名および複数名の両方を使用できます。oc get kafkas
は、oc get kafka
と同じ結果を得られます。
リソースの 短縮名 を使用することもできます。短縮名を理解すると、AMQ Streams を管理する時間を節約できます。Kafka
の短縮名は k
であるため、oc get k
を実行してすべての Kafka クラスターを一覧表示することもできます。
oc get k NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS my-cluster 3 3
AMQ Streams リソース | 正式名 | 短縮名 |
---|---|---|
Kafka | kafka | k |
Kafka Topic | kafkatopic | kt |
Kafka User | kafkauser | ku |
Kafka Connect | kafkaconnect | kc |
Kafka Connect S2I | kafkaconnects2i | kcs2i |
Kafka Connector | kafkaconnector | kctr |
Kafka Mirror Maker | kafkamirrormaker | kmm |
Kafka Mirror Maker 2 | kafkamirrormaker2 | kmm2 |
Kafka Bridge | kafkabridge | kb |
Kafka Rebalance | kafkarebalance | kr |
12.1.1.1. リソースカテゴリー
カスタムリソースのカテゴリーは oc
コマンドでも使用することができます。
すべての AMQ Streams カスタムリソースはカテゴリー strimzi
に属するため、strimzi
を使用すると 1 つのコマンドですべての AMQ Streams リソースを取得できます。
たとえば、oc get strimzi
を実行すると、指定の namespace のすべての AMQ Streams カスタムリソースが一覧表示されます。
oc get strimzi NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS kafka.kafka.strimzi.io/my-cluster 3 3 NAME PARTITIONS REPLICATION FACTOR kafkatopic.kafka.strimzi.io/kafka-apps 3 3 NAME AUTHENTICATION AUTHORIZATION kafkauser.kafka.strimzi.io/my-user tls simple
oc get strimzi -o name
コマンドは、すべてのリソースタイプおよびリソース名を返します。-o name
オプションは type/name 形式で出力を取得します。
oc get strimzi -o name kafka.kafka.strimzi.io/my-cluster kafkatopic.kafka.strimzi.io/kafka-apps kafkauser.kafka.strimzi.io/my-user
この strimzi
コマンドを他のコマンドと組み合わせることができます。たとえば、これを oc delete
コマンドに渡して、1 つのコマンドですべてのリソースを削除できます。
oc delete $(oc get strimzi -o name) kafka.kafka.strimzi.io "my-cluster" deleted kafkatopic.kafka.strimzi.io "kafka-apps" deleted kafkauser.kafka.strimzi.io "my-user" deleted
1 つの操作ですべてのリソースを削除することは、AMQ Streams の新機能をテストする場合などに役立ちます。
12.1.1.2. サブリソースのステータスのクエリー
他の値を -o
オプションに渡すことができます。たとえば、-o yaml
を使用すると、YAML 形式で出力されます。-o json
を使用すると JSON で返されます。
すべてのオプションは oc get --help
で確認できます。
最も便利なオプションの 1 つは JSONPath サポート で、JSONPath 式を渡して Kubernetes API にクエリーを実行できます。JSONPath 式は、リソースの特定部分を抽出または操作できます。
たとえば、JSONPath 式 {.status.listeners[?(@.type=="tls")].bootstrapServers}
を使用して、Kafka カスタムリソースのステータスからブートストラップアドレスを取得し、Kafka クライアントで使用できます。
このコマンドは、tls
リスナーの bootstrapServers
の値を見つけます。
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type=="tls")].bootstrapServers}{"\n"}' my-cluster-kafka-bootstrap.myproject.svc:9093
type 条件を @.type=="external"
または @.type=="plain"
に変更すると、他の Kafka リスナーのアドレスを取得することもできます。
oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}{"\n"}' 192.168.1.247:9094
jsonpath
を使用すると、カスタムリソースからその他のプロパティーやプロパティーのグループを抽出することができます。
12.1.2. AMQ Streams カスタムリソースのステータス情報
下記の表のとおり、複数のリソースに status
プロパティーがあります。
AMQ Streams リソース | スキーマ参照 | ステータス情報がパブリッシュされる場所 |
---|---|---|
| Kafka クラスター。 | |
| デプロイされている場合は Kafka Connect クラスター。 | |
| デプロイされている場合は Source-to-Image (S2I) サポートのある Kafka Connect クラスター。 | |
| デプロイされている場合は KafkaConnector リソース。 | |
| デプロイされている場合は Kafka MirrorMakerツール。 | |
| Kafka クラスターの Kafka トピック | |
| Kafka クラスターの Kafka ユーザー。 | |
| デプロイされている場合は AMQ Streams の Kafka Bridge。 |
リソースの status
プロパティーによって、リソースの下記項目の情報が提供されます。
-
status.conditions
プロパティーの Current state (現在の状態)。 -
status.observedGeneration
プロパティーの Last observed generation (最後に確認された生成)。
status
プロパティーによって、リソース固有の情報も提供されます。以下に例を示します。
-
KafkaStatus
によって、リスナーアドレスの情報と Kafka クラスターの ID が提供されます。 -
KafkaConnectStatus
によって、Kafka Connect コネクターの REST API エンドポイントが提供されます。 -
KafkaUserStatus
によって、Kafka ユーザーの名前と、ユーザーのクレデンシャルが保存されるSecret
が提供されます。 -
KafkaBridgeStatus
によって、外部クライアントアプリケーションが Bridge サービスにアクセスできる HTTP アドレスが提供されます。
リソースの Current state (現在の状態) は、spec
プロパティーによって定義される Desired state (望ましい状態) を実現するリソースに関する進捗を追跡するのに便利です。ステータス条件によって、リソースの状態が変更された時間および理由が提供され、Operator によるリソースの望ましい状態の実現を妨げたり遅らせたりしたイベントの詳細が提供されます。
Last observed generation (最後に確認された生成) は、Cluster Operator によって最後に照合されたリソースの生成です。observedGeneration
の値が metadata.generation
の値と異なる場合、リソースの最新の更新が Operator によって処理されていません。これらの値が同じである場合、リソースの最新の変更がステータス情報に反映されます。
AMQ Streams によってカスタムリソースのステータスが作成および維持されます。定期的にカスタムリソースの現在の状態が評価され、その結果に応じてステータスが更新されます。くださいーたとえば、oc edit
を使用してカスタムリソースで更新を行う場合、その status
は編集不可能です。さらに、status
の変更は Kafka クラスターステータスの設定に影響しません。
以下では、Kafka カスタムリソースに status
プロパティーが指定されています。
Kafka カスタムリソースとステータス
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: spec: # ... status: conditions: 1 - lastTransitionTime: 2021-07-23T23:46:57+0000 status: "True" type: Ready 2 observedGeneration: 4 3 listeners: 4 - addresses: - host: my-cluster-kafka-bootstrap.myproject.svc port: 9092 type: plain - addresses: - host: my-cluster-kafka-bootstrap.myproject.svc port: 9093 certificates: - | -----BEGIN CERTIFICATE----- ... -----END CERTIFICATE----- type: tls - addresses: - host: 172.29.49.180 port: 9094 certificates: - | -----BEGIN CERTIFICATE----- ... -----END CERTIFICATE----- type: external clusterId: CLUSTER-ID 5 # ...
- 1
- status の
conditions
は、既存のリソース情報から推測できないステータスに関連する基準や、リソースのインスタンスに固有する基準を記述します。 - 2
Ready
条件は、Cluster Operator が現在 Kafka クラスターでトラフィックの処理が可能であると判断するかどうかを示しています。- 3
observedGeneration
は、最後に Cluster Operator によって照合されたKafka
カスタムリソースの生成を示しています。- 4
listeners
は、現在の Kafka ブートストラップアドレスをタイプ別に示しています。- 5
- Kafka クラスター ID。重要
タイプが
nodeport
の外部リスナーのカスタムリソースステータスにおけるアドレスは、現在サポートされていません。
Kafka ブートストラップアドレスがステータスに一覧表示されても、それらのエンドポイントまたは Kafka クラスターが準備状態であるとは限りません。
ステータス情報のアクセス
リソースのステータス情報はコマンドラインから取得できます。詳細は、「カスタムリソースのステータスの検出」 を参照してください。
12.1.3. カスタムリソースのステータスの検出
この手順では、カスタムリソースのステータスを検出する方法を説明します。
前提条件
- OpenShift クラスターが必要です。
- Cluster Operator が稼働している必要があります。
手順
カスタムリソースを指定し、
-o jsonpath
オプションを使用して標準の JSONPath 式を適用してstatus
プロパティーを選択します。oc get kafka <kafka_resource_name> -o jsonpath='{.status}'
この式は、指定されたカスタムリソースのすべてのステータス情報を返します。
status.listeners
またはstatus.observedGeneration
などのドット表記を使用すると、表示するステータス情報を微調整できます。
その他のリソース
- 「AMQ Streams カスタムリソースのステータス情報」
- JSONPath の使用に関する詳細は、「JSONPath support」を参照してください。
12.2. カスタムリソースの調整の一時停止
修正や更新を実行するために、AMQ Streams Operator によって管理されるカスタムリソースの調整を一時停止すると便利な場合があります。調整が一時停止されると、カスタムリソースに加えられた変更は一時停止が終了するまで Operator によって無視されます。
カスタムリソースの調整を停止する場合は、設定で strimzi.io/pause-reconciliation
アノテーションを true
に設定します。これにより、適切な Operator がカスタムリソースの調整を一時停止するよう指示されます。たとえば、Cluster Operator による調整が一時停止されるように、アノテーションを KafkaConnect
リソースに適用できます。
pause アノテーションを有効にしてカスタムリソースを作成することもできます。カスタムリソースは作成されますが、無視されます。
現在、KafkaTopic
リソースの調整を一時停止することはできません。
前提条件
- カスタムリソースを管理する AMQ Streams Operator が稼働している必要があります。
手順
OpenShift のカスタムリソースにアノテーションを付け、
pause-reconciliation
をtrue
に設定します。oc annotate KIND-OF-CUSTOM-RESOURCE NAME-OF-CUSTOM-RESOURCE strimzi.io/pause-reconciliation="true"
たとえば、
KafkaConnect
カスタムリソースの場合は次のようになります。oc annotate KafkaConnect my-connect strimzi.io/pause-reconciliation="true"
カスタムリソースの status 条件で、
ReconciliationPaused
への変更が表示されていることを確認します。oc describe KIND-OF-CUSTOM-RESOURCE NAME-OF-CUSTOM-RESOURCE
lastTransitionTime
でtype
条件がReconciliationPaused
に変更されています。一時停止された調整条件タイプを持つカスタムリソースの例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: annotations: strimzi.io/pause-reconciliation: "true" strimzi.io/use-connector-resources: "true" creationTimestamp: 2021-03-12T10:47:11Z #... spec: # ... status: conditions: - lastTransitionTime: 2021-03-12T10:47:41.689249Z status: "True" type: ReconciliationPaused
一時停止からの再開
-
調整を再開するには、アノテーションを
false
に設定するか、アノテーションを削除します。
その他のリソース
12.3. Kafka および ZooKeeper クラスターの手動によるローリングアップデートの開始
AMQ Streams は、Cluster Operator 経由で Kafka および ZooKeeper クラスターのローリングアップデートを手動でトリガーするために、StatefulSet
および Pod
リソースのアノテーションの使用をサポートします。ローリングアップデートにより、新しい Pod でリソースの Pod が再起動されます。
通常、例外的な状況でのみ、特定の Pod や同じ StatefulSet
からの Pod のセットを手動で実行する必要があります。ただし、Pod を直接削除せずに、Cluster Operator 経由でローリングアップデートを実行すると、以下を確実に行うことができます。
- Pod を手動で削除しても、他の Pod を並行して削除するなどの、同時に行われる Cluster Operator の操作とは競合しません。
- Cluster Operator ロジックによって、In-Sync レプリカの数などの Kafka 設定で指定された内容が処理されます。
12.3.1. 前提条件
手動でローリングアップデートを実行するには、稼働中の Cluster Operator および Kafka クラスターが必要です。
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
12.3.2. StatefulSet アノテーションを使用したローリングアップデートの実行
この手順では、OpenShift StatefulSet
アノテーションを使用して、既存の Kafka クラスターまたは ZooKeeper クラスターのローリングアップデートを手動でトリガーする方法を説明します。
手順
手動で更新する Kafka または ZooKeeper Pod を制御する
StatefulSet
の名前を見つけます。たとえば、Kafka クラスターの名前が my-cluster の場合、対応する
StatefulSet
の名前は my-cluster-kafka と my-cluster-zookeeper になります。OpenShift で
StatefulSet
リソースにアノテーションを付けます。oc annotate
を使用します。oc annotate statefulset cluster-name-kafka strimzi.io/manual-rolling-update=true oc annotate statefulset cluster-name-zookeeper strimzi.io/manual-rolling-update=true
-
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた
StatefulSet
内のすべての Pod でローリングアップデートがトリガーされます。すべての Pod のローリングアップデートが完了すると、アノテーションはStatefulSet
から削除されます。
12.3.3. Pod アノテーションを使用したローリングアップデートの実行
この手順では、OpenShift Pod
アノテーションを使用して、既存の Kafka クラスターまたは ZooKeeper クラスターのローリングアップデートを手動でトリガーする方法を説明します。同じ StatefulSet
の複数の Pod にアノテーションが付けられると、連続したローリングアップデートは同じ調整実行内で実行されます。
手順
手動で更新する Kafka または ZooKeeper
Pod
の名前を見つけます。たとえば、Kafka クラスターの名前が my-cluster の場合、対応する
Pod
の名前は my-cluster-kafka-index と my-cluster-zookeeper-index になります。インデックス はゼロで始まり、レプリカの総数で終わります。OpenShift で
Pod
リソースにアノテーションを付けます。oc annotate
を使用します。oc annotate pod cluster-name-kafka-index strimzi.io/manual-rolling-update=true oc annotate pod cluster-name-zookeeper-index strimzi.io/manual-rolling-update=true
-
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。アノテーションが調整プロセスで検出されれば、アノテーションが付いた
Pod
のローリングアップデートがトリガーされます。Pod のローリングアップデートが完了すると、アノテーションはPod
から削除されます。
12.4. ラベルおよびアノテーションを使用したサービスの検出
サービスディスカバリーは、AMQ Streams と同じ OpenShift クラスターで稼働しているクライアントアプリケーションの Kafka クラスターとの対話を容易にします。
サービスディスカバリー ラベルおよびアノテーションは、Kafka クラスターにアクセスするために使用されるサービスに対して生成されます。
- 内部 Kafka ブートストラップサービス
- HTTP Bridge サービス
ラベルは、サービスの検出を可能にします。アノテーションは、クライアントアプリケーションが接続を確立するために使用できる接続詳細を提供します。
サービスディスカバリーラベル strimzi.io/discovery
は、Service
リソースに対して true
に設定されています。サービスディスカバリーアノテーションには同じキーがあり、各サービスの接続詳細を JSON 形式で提供します。
内部 Kafka ブートストラップサービスの例
apiVersion: v1 kind: Service metadata: annotations: strimzi.io/discovery: |- [ { "port" : 9092, "tls" : false, "protocol" : "kafka", "auth" : "scram-sha-512" }, { "port" : 9093, "tls" : true, "protocol" : "kafka", "auth" : "tls" } ] labels: strimzi.io/cluster: my-cluster strimzi.io/discovery: "true" strimzi.io/kind: Kafka strimzi.io/name: my-cluster-kafka-bootstrap name: my-cluster-kafka-bootstrap spec: #...
HTTP Bridge サービスの例
apiVersion: v1 kind: Service metadata: annotations: strimzi.io/discovery: |- [ { "port" : 8080, "tls" : false, "auth" : "none", "protocol" : "http" } ] labels: strimzi.io/cluster: my-bridge strimzi.io/discovery: "true" strimzi.io/kind: KafkaBridge strimzi.io/name: my-bridge-bridge-service
12.4.1. サービスの接続詳細の返信
サービスを検出するには、コマンドラインまたは対応する API 呼び出しでサービスを取得するときに、ディスカバリーラベルを指定します。
oc get service -l strimzi.io/discovery=true
サービスディスカバリーラベルの取得時に接続詳細が返されます。
12.5. 永続ボリュームからのクラスターの復元
Kafka クラスターは、永続ボリューム (PV) が存在していれば、そこから復元できます。
たとえば、以下の場合に行います。
- namespace が意図せずに削除された後。
- OpenShift クラスター全体が失われた後でも PV がインフラストラクチャーに残っている場合。
12.5.1. namespace が削除された場合の復元
永続ボリュームと namespace の関係により、namespace の削除から復元することが可能です。PersistentVolume
(PV) は、namespace の外部に存在するストレージリソースです。PV は、namespace 内部に存在する PersistentVolumeClaim
(PVC) を使用して Kafka Pod にマウントされます。
PV の回収 (reclaim) ポリシーは、namespace が削除されるときにクラスターに動作方法を指示します。以下に、回収 (reclaim) ポリシーの設定とその結果を示します。
- Delete (デフォルト) に設定すると、PVC が namespace 内で削除されるときに PV が削除されます。
- Retain に設定すると、namespace の削除時に PV は削除されません。
namespace が意図せず削除された場合に PV から復旧できるようにするには、PV 仕様で persistentVolumeReclaimPolicy
プロパティーを使用してポリシーを Delete から Retain にリセットする必要があります。
apiVersion: v1
kind: PersistentVolume
# ...
spec:
# ...
persistentVolumeReclaimPolicy: Retain
または、PV は、関連付けられたストレージクラスの回収 (reclaim) ポリシーを継承できます。ストレージクラスは、動的ボリュームの割り当てに使用されます。
ストレージクラスの reclaimPolicy
プロパティーを設定することで、ストレージクラスを使用する PV が適切な回収 (reclaim) ポリシー で作成されます。ストレージクラスは、storageClassName
プロパティーを使用して PV に対して設定されます。
apiVersion: v1 kind: StorageClass metadata: name: gp2-retain parameters: # ... # ... reclaimPolicy: Retain
apiVersion: v1
kind: PersistentVolume
# ...
spec:
# ...
storageClassName: gp2-retain
Retain を回収 (reclaim) ポリシーとして使用しながら、クラスター全体を削除する場合は、PV を手動で削除する必要があります。そうしないと、PV は削除されず、リソースに不要な経費がかかる原因になります。
12.5.2. OpenShift クラスター喪失からの復旧
クラスターが失われた場合、ディスク/ボリュームのデータがインフラストラクチャー内に保持されていれば、それらのデータを使用してクラスターを復旧できます。PV が復旧可能でそれらが手動で作成されていれば、復旧の手順は namespace の削除と同じです。
12.5.3. 削除したクラスターの永続ボリュームからの復元
この手順では、削除されたクラスターを永続ボリューム (PV) から復元する方法を説明します。
この状況では、Topic Operator はトピックが Kafka に存在することを認識しますが、KafkaTopic
リソースは存在しません。
クラスター再作成の手順を行うには、2 つの方法があります。
すべての
KafkaTopic
リソースを復旧できる場合は、オプション 1 を使用します。これにより、クラスターが起動する前に
KafkaTopic
リソースを復旧することで、該当するトピックが Topic Operator によって削除されないようにする必要があります。すべての
KafkaTopic
リソースを復旧できない場合は、オプション 2 を使用します。この場合、Topic Operator なしでクラスターをデプロイし、Topic Operator のトピックストアメタデータを削除してから、Topic Operator で Kafka クラスターを再デプロイすることで、該当するトピックから
KafkaTopic
リソースを再作成できるようにします。
Topic Operator がデプロイされていない場合は、PersistentVolumeClaim
(PVC) リソースのみを復旧する必要があります。
作業を始める前に
この手順では、データの破損を防ぐために PV を正しい PVC にマウントする必要があります。volumeName
が PVC に指定されており、それが PV の名前に一致する必要があります。
詳細は以下を参照してください。
この手順には、手動での再作成が必要な KafkaUser
リソースの復旧は含まれません。パスワードと証明書を保持する必要がある場合は、KafkaUser
リソースの作成前にシークレットを再作成する必要があります。
手順
クラスターの PV についての情報を確認します。
oc get pv
PV の情報がデータとともに表示されます。
この手順で重要な列を示す出力例:
NAME RECLAIMPOLICY CLAIM pvc-5e9c5c7f-3317-11ea-a650-06e1eadd9a4c ... Retain ... myproject/data-my-cluster-zookeeper-1 pvc-5e9cc72d-3317-11ea-97b0-0aef8816c7ea ... Retain ... myproject/data-my-cluster-zookeeper-0 pvc-5ead43d1-3317-11ea-97b0-0aef8816c7ea ... Retain ... myproject/data-my-cluster-zookeeper-2 pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c ... Retain ... myproject/data-0-my-cluster-kafka-0 pvc-7e21042e-3317-11ea-9786-02deaf9aa87e ... Retain ... myproject/data-0-my-cluster-kafka-1 pvc-7e226978-3317-11ea-97b0-0aef8816c7ea ... Retain ... myproject/data-0-my-cluster-kafka-2
- NAME は各 PV の名前を示します。
- RECLAIM POLICY は PV が 保持される ことを示します。
- CLAIM は元の PVC へのリンクを示します。
元の namespace を再作成します。
oc create namespace myproject
元の PVC リソース仕様を再作成し、PVC を該当する PV にリンクします。
以下に例を示します。
apiVersion: v1 kind: PersistentVolumeClaim metadata: name: data-0-my-cluster-kafka-0 spec: accessModes: - ReadWriteOnce resources: requests: storage: 100Gi storageClassName: gp2-retain volumeMode: Filesystem volumeName: pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c
PV 仕様を編集して、元の PVC にバインドされた
claimRef
プロパティーを削除します。以下に例を示します。
apiVersion: v1 kind: PersistentVolume metadata: annotations: kubernetes.io/createdby: aws-ebs-dynamic-provisioner pv.kubernetes.io/bound-by-controller: "yes" pv.kubernetes.io/provisioned-by: kubernetes.io/aws-ebs creationTimestamp: "<date>" finalizers: - kubernetes.io/pv-protection labels: failure-domain.beta.kubernetes.io/region: eu-west-1 failure-domain.beta.kubernetes.io/zone: eu-west-1c name: pvc-7e226978-3317-11ea-97b0-0aef8816c7ea resourceVersion: "39431" selfLink: /api/v1/persistentvolumes/pvc-7e226978-3317-11ea-97b0-0aef8816c7ea uid: 7efe6b0d-3317-11ea-a650-06e1eadd9a4c spec: accessModes: - ReadWriteOnce awsElasticBlockStore: fsType: xfs volumeID: aws://eu-west-1c/vol-09db3141656d1c258 capacity: storage: 100Gi claimRef: apiVersion: v1 kind: PersistentVolumeClaim name: data-0-my-cluster-kafka-2 namespace: myproject resourceVersion: "39113" uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: failure-domain.beta.kubernetes.io/zone operator: In values: - eu-west-1c - key: failure-domain.beta.kubernetes.io/region operator: In values: - eu-west-1 persistentVolumeReclaimPolicy: Retain storageClassName: gp2-retain volumeMode: Filesystem
この例では、以下のプロパティーが削除されます。
claimRef: apiVersion: v1 kind: PersistentVolumeClaim name: data-0-my-cluster-kafka-2 namespace: myproject resourceVersion: "39113" uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
Cluster Operator をデプロイします。
oc create -f install/cluster-operator -n my-project
クラスターを再作成します。
クラスターの再作成に必要なすべての
KafkaTopic
リソースがあるかどうかに応じて、以下の手順を実行します。オプション 1: クラスターを失う前に存在した
KafkaTopic
リソースが すべて ある場合 (__consumer_offsets
からコミットされたオフセットなどの内部トピックを含む)。すべての
KafkaTopic
リソースを再作成します。クラスターをデプロイする前にリソースを再作成する必要があります。そうでないと、Topic Operator によってトピックが削除されます。
Kafka クラスターをデプロイします。
以下に例を示します。
oc apply -f kafka.yaml
オプション 2: クラスターを失う前に存在したすべての
KafkaTopic
リソースがない場合。オプション 1 と同様に Kafka クラスターをデプロイしますが、デプロイ前に Kafka リソースから
topicOperator
プロパティーを削除して、Topic Operator がない状態でデプロイします。デプロイメントに Topic Operator が含まれると、Topic Operator によってすべてのトピックが削除されます。
Kafka クラスターから内部トピックストアのトピックを削除します。
oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete
このコマンドは、Kafka クラスターへのアクセスに使用されるリスナーおよび認証のタイプに対応している必要があります。
Kafka クラスターを
topicOperator
プロパティーで再デプロイして TopicOperator を有効にし、KafkaTopic
リソースを再作成します。以下に例を示します。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} 1 #...
- 1
- ここで示すデフォルト設定には、追加のプロパティーはありません。「
EntityTopicOperatorSpec
スキーマ参照」に説明されているプロパティーを使用して、必要な設定を指定します。
KafkaTopic
リソースのリストを表示して、復旧を確認します。oc get KafkaTopic
12.6. クライアント設定のチューニング
設定プロパティーを使用して、Kafka プロデューサーおよびコンシューマーのパフォーマンスを最適化します。
最小セットの設定プロパティーが必要ですが、プロパティーを追加または調整して、プロデューサーとコンシューマーが Kafka と対話する方法を変更できます。たとえば、プロデューサーの場合は、クライアントがリアルタイムでデータに応答できるように、メッセージのレイテンシーおよびスループットをチューニングできます。また、設定を変更して、より強力にメッセージの持続性を保証することもできます。
クライアントメトリックを分析して初期設定を行う場所を判断することから始め、必要な設定になるまで段階的に変更を加え、さらに比較を行うことができます。
12.6.1. Kafka プロデューサー設定のチューニング
特定のユースケースに合わせて調整されたオプションのプロパティーとともに、基本的なプロデューサー設定を使用します。
設定を調整してスループットを最大化すると、レイテンシーが増加する可能性があり、その逆も同様です。必要なバランスを取得するために、プロデューサー設定を実験して調整する必要があります。
12.6.1.1. 基本のプロデューサー設定
接続およびシリアライザープロパティーはすべてのプロデューサーに必要です。通常、追跡用のクライアント ID を追加し、プロデューサーで圧縮してリクエストのバッチサイズを減らすことが推奨されます。
基本的なプロデューサー設定には以下が含まれます。
- パーティション内のメッセージの順序は保証されません。
- ブローカーに到達するメッセージの完了通知は持続性を保証しません。
# ... bootstrap.servers=localhost:9092 1 key.serializer=org.apache.kafka.common.serialization.StringSerializer 2 value.serializer=org.apache.kafka.common.serialization.StringSerializer 3 client.id=my-client 4 compression.type=gzip 5 # ...
- 1
- (必須) Kafka ブローカーの host:port ブートストラップサーバーアドレスを使用して Kafka クラスターに接続するようプロデューサーを指示します。プロデューサーはアドレスを使用して、クラスター内のすべてのブローカーを検出し、接続します。サーバーがダウンした場合に備えて、コンマ区切りリストを使用して 2 つまたは 3 つのアドレスを指定しますが、クラスター内のすべてのブローカーのリストを提供する必要はありません。
- 2
- (必須) メッセージがブローカーに送信される前に、各メッセージの鍵をバイトに変換するシリアライザー。
- 3
- (必須) メッセージがブローカーに送信される前に、各メッセージの値をバイトに変換するシリアライザー。
- 4
- (任意) クライアントの論理名。リクエストのソースを特定するためにログおよびメトリクスで使用されます。
- 5
- (任意) メッセージを圧縮するコーデック。これは、送信され、圧縮された形式で格納された後、コンシューマーへの到達時に圧縮解除される可能性があります。圧縮はスループットを改善し、ストレージの負荷を減らすのに役立ちますが、圧縮や圧縮解除のコストが異常に高い低レイテンシーのアプリケーションには不適切である場合があります。
12.6.1.2. データの持続性
メッセージ配信の完了通知を使用して、データの持続性を適用し、メッセージが失われる可能性を最小限に抑えることができます。
# ...
acks=all 1
# ...
- 1
acks=all
と指定すると、パーティションリーダーは、メッセージリクエストが正常に受信されたことを確認する前に、特定数のフォロワーに対してメッセージをレプリケートすることを強制されます。acks=all
の追加のチェックにより、プルデューサーがメッセージを送信してから完了通知を受信するまでのレイテンシーが増加します。
完了通知がプロデューサーに送信される前にメッセージをログに追加する必要のあるブローカーの数は、トピックの min.insync.replicas
設定によって決定されます。最初に、トピックレプリケーション係数を 3 にし、他のブローカーの In-Sync レプリカを 2 にするのが一般的です。この設定では、単一のブローカーが利用できない場合でもプロデューサーは影響を受けません。2 番目のブローカーが利用できなくなると、プロデューサーは完了通知を受信せず、それ以上のメッセージを生成できなくなります。
acks=all
をサポートするトピック設定
# ...
min.insync.replicas=2 1
# ...
- 1
2
In-Sync レプリカを使用します。デフォルトは1
です。
システムに障害が発生すると、バッファーの未送信データが失われる可能性があります。
12.6.1.3. 順序付き配信
メッセージは 1 度だけ配信されるため、べき等プロデューサーは重複を回避します。障害発生時でも配信の順序が維持されるように、ID とシーケンス番号がメッセージに割り当てられます。データの一貫性を維持するために acks=all
を使用している場合は、順序付き配信にべき等を有効にするのは妥当です。
べき等を使った順序付き配信
# ... enable.idempotence=true 1 max.in.flight.requests.per.connection=5 2 acks=all 3 retries=2147483647 4 # ...
パフォーマンスコストが原因で acks=all
およびべき等を使用しない場合は、インフライト (完了確認されない) リクエストの数を 1 に設定して、順序を保持します。そうしないと、Message-A が失敗し、Message-B がブローカーに書き込まれた後にのみ成功する可能性があります。
べき等を使用しない順序付け配信
# ... enable.idempotence=false 1 max.in.flight.requests.per.connection=1 2 retries=2147483647 # ...
12.6.1.4. 信頼性の保証
べき等は、1 つのパーティションへの書き込みを 1 回だけ行う場合に便利です。トランザクションをべき等と使用すると、複数のパーティション全体で 1 度だけ書き込みを行うことができます。
トランザクションは、同じトランザクション ID を使用するメッセージが 1 度作成され、すべてがそれぞれのログに書き込まれるか、何も書き込まれないかのどちらかになることを保証します。
# ... enable.idempotence=true max.in.flight.requests.per.connection=5 acks=all retries=2147483647 transactional.id=UNIQUE-ID 1 transaction.timeout.ms=900000 2 # ...
トランザクションの保証を維持するには、transactional.id
の選択が重要になります。トランザクション ID は、一意なトピックパーティションセットに使用する必要があります。たとえば、トピックパーティション名からトランザクション ID への外部マッピングを使用したり、競合を回避する関数を使用してトピックパーティション名からトランザクション IDを算出したりすると、これを実現できます。
12.6.1.5. スループットおよびレイテンシーの最適化
通常、システムの要件は、指定のレイテンシー内であるメッセージの割合に対して、特定のスループットのターゲットを達成することです。たとえば、95 % のメッセージが 2 秒以内に完了確認される、1 秒あたり 500,000 個のメッセージをターゲットとします。
プロデューサーのメッセージングセマンティック (メッセージの順序付けと持続性) は、アプリケーションの要件によって定義される可能性があります。たとえば、アプリケーションによって提供される重要なプロパティーや保証に反することなく、acks=0
または acks=1
を使用するオプションがない可能性があります。
ブローカーの再起動は、パーセンタイルの高いの統計に大きく影響します。たとえば、長期間では、99% のレイテンシーはブローカーの再起動に関する動作によるものです。これは、ベンチマークを設計したり、本番環境のパフォーマンスで得られた数字を使ってベンチマークを行い、そのパフォーマンスの数字を比較したりする場合に検討する価値があります。
目的に応じて、Kafka はスループットとレイテンシーのプロデューサーパフォーマンスを調整するために多くの設定パラメーターと設定方法を提供します。
- メッセージのバッチ処理 (
linger.ms
およびbatch.size
) -
メッセージのバッチ処理では、同じブローカー宛のメッセージをより多く送信するために、メッセージの送信を遅らせ、単一の生成リクエストでバッチ処理できるようにします。バッチ処理では、スループットを増やすためにレイテンシーを長くして妥協します。時間ベースのバッチ処理は
linger.ms
を使用して設定され、サイズベースのバッチ処理はbatch.size
を使用して設定されます。 - 圧縮処理 (
compression.type
) -
メッセージ圧縮処理により、プロデューサー (メッセージの圧縮に費やされた CPU 時間) のレイテンシーが追加されますが、リクエスト (および場合によってはディスクの書き込み) を小さくするため、スループットが増加します。圧縮に価値があるかどうか、および使用に最適な圧縮は、送信されるメッセージによって異なります。圧縮処理は
KafkaProducer.send()
を呼び出すスレッドで発生するため、アプリケーションでこの方法のレイテンシーが問題になる場合は、より多くのスレッドを使用するよう検討してください。 - パイプライン処理 (
max.in.flight.requests.per.connection
) - パイプライン処理は、以前のリクエストへの応答を受け取る前により多くのリクエストを送信します。通常、パイプライン処理を増やすと、バッチ処理の悪化などの別の問題がスループットに悪影響を与え始めるしきい値まではスループットが増加します。
レイテンシーの短縮
アプリケーションが KafkaProducer.send()
を呼び出す場合、メッセージには以下が行われます。
- インターセプターによる処理。
- シリアライズ。
- パーティションへの割り当て。
- 圧縮処理。
- パーティションごとのキューでメッセージのバッチに追加。
ここで、send()
メソッドが返されます。そのため、send()
がブロックされる時間は、以下によって決定されます。
- インターセプター、シリアライザー、およびパーティションヤーで費やされた時間。
- 使用される圧縮アルゴリズム。
- 圧縮に使用するバッファーの待機に費やされた時間。
バッチは、以下のいずれかが行われるまでキューに残ります。
-
バッチが満杯になる (
batch.size
による)。 -
linger.ms
によって導入された遅延が経過。 - 送信者は他のパーティションのメッセージバッチを同じブローカーに送信しようとし、このバッチの追加も可能。
- プロデューサーがフラッシュまたは閉じられる。
バッチ処理とバッファーの設定を参照して、レイテンシーをブロックする send()
の影響を軽減します。
# ... linger.ms=100 1 batch.size=16384 2 buffer.memory=33554432 3 # ...
スループットの増加
メッセージの配信および送信リクエストの完了までの最大待機時間を調整して、メッセージリクエストのスループットを向上します。
また、カスタムパーティションを作成してデフォルトを置き換えることで、メッセージを指定のパーティションに転送することもできます。
# ... delivery.timeout.ms=120000 1 partitioner.class=my-custom-partitioner 2 # ...
12.6.2. Kafka コンシューマー設定の調整
特定のユースケースに合わせて調整されたオプションのプロパティーとともに、基本的なコンシューマー設定を使用します。
コンシューマーを調整する場合、最も重要なことは、取得するデータ量に効率的に対処できるようにすることです。プロデューサーのチューニングと同様に、コンシューマーが想定どおりに動作するまで、段階的に変更を加える必要があります。
12.6.2.1. 基本的なコンシューマー設定
接続およびデシリアライザープロパティーはすべてのコンシューマーに必要です。通常、追跡用にクライアント ID を追加することが推奨されます。
コンシューマー設定では、後続の設定に関係なく、以下を行います。
- メッセージをスキップまたは再読み取りするようオフセットを変更しない限り、コンシューマーはメッセージを指定のオフセットから取得し、順番に消費します。
- オフセットはクラスターの別のブローカーに送信される可能性があるため、オフセットを Kafka にコミットした場合でも、ブローカーはコンシューマーが応答を処理したかどうかを認識しません。
# ... bootstrap.servers=localhost:9092 1 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 2 value.deserializer=org.apache.kafka.common.serialization.StringDeserializer 3 client.id=my-client 4 group.id=my-group-id 5 # ...
- 1
- (必須) Kafka ブローカーの host:port ブートストラップサーバーアドレスを使用して、コンシューマーが Kafka クラスターに接続するよう指示しますコンシューマーはアドレスを使用して、クラスター内のすべてのブローカーを検出し、接続します。サーバーがダウンした場合に備えて、コンマ区切りリストを使用して 2 つまたは 3 つのアドレスを指定しますが、クラスター内のすべてのブローカーのリストを提供する必要はありません。ロードバランサーサービスを使用して Kafka クラスターを公開する場合、可用性はロードバランサーによって処理されるため、サービスのアドレスのみが必要になります。
- 2
- (必須) Kafka ブローカーから取得されたバイトをメッセージキーに変換するデシリアライザー。
- 3
- (必須) Kafka ブローカーから取得されたバイトをメッセージ値に変換するデシリアライザー。
- 4
- (任意) クライアントの論理名。リクエストのソースを特定するためにログおよびメトリクスで使用されます。ID は、時間クォータの処理に基づいてコンシューマーにスロットリングを適用するために使用することもできます。
- 5
- (条件) コンシューマーがコンシューマーグループに参加するには、グループ ID が 必要 です。
コンシューマーグループは、特定のトピックから複数のプロデューサーによって生成される、典型的に大量のデータストリームを共有するのに使用します。コンシューマーは group.id
でグループ化され、メッセージをメンバー全体に分散できます。
12.6.2.2. コンシューマーグループを使用したデータ消費のスケーリング
コンシューマーグループは、特定のトピックから 1 つまたは複数のプロデューサーによって生成される、典型的な大量のデータストリームを共有します。group.id
プロパティーが同じコンシューマーは同じグループになります。グループ内のコンシューマーの 1 つがリーダーを選択し、パーティションをグループのコンシューマーにどのように割り当てるかを決定します。各パーティションは 1 つのコンシューマーにのみ割り当てることができます。
コンシューマーの数がパーティションよりも少ない場合、同じ group.id
を持つコンシューマーインスタンスを追加して、データの消費をスケーリングできます。コンシューマーをグループに追加して、パーティションの数より多くしても、スループットは改善されませんが、コンシューマーが機能しなくなったときに予備のコンシューマーを使用できます。より少ないコンシューマーでスループットの目標を達成できれば、リソースを節約できます。
同じコンシューマーグループのコンシューマーは、オフセットコミットとハートビートを同じブローカーに送信します。グループのコンシューマーの数が多いほど、ブローカーのリクエスト負荷が高くなります。
# ...
group.id=my-group-id 1
# ...
- 1
- グループ ID を使用してコンシューマーグループにコンシューマーを追加します。
12.6.2.3. メッセージの順序の保証
Kafka ブローカーは、トピック、パーティション、およびオフセット位置のリストからメッセージを送信するようブローカーに要求するコンシューマーからフェッチリクエストを受け取ります。
コンシューマーは、ブローカーにコミットされたのと同じ順序でメッセージを単一のパーティションで監視します。つまり、Kafka は単一パーティションのメッセージ のみ 順序付けを保証します。逆に、コンシューマーが複数のパーティションからメッセージを消費している場合、コンシューマーによって監視される異なるパーティションのメッセージの順序は、必ずしも送信順序を反映しません。
1 つのトピックからメッセージを厳格に順序付ける場合は、コンシューマーごとに 1 つのパーティションを使用します。
12.6.2.4. スループットおよびレイテンシーの最適化
クライアントアプリケーションが KafkaConsumer.poll()
を呼び出すときに返されるメッセージの数を制御します。
fetch.max.wait.ms
および fetch.min.bytes
プロパティーを使用して、Kafka ブローカーからコンシューマーによって取得されるデータの最小量を増やします。時間ベースのバッチ処理は fetch.max.wait.ms
を使用して設定され、サイズベースのバッチ処理は fetch.min.bytes
を使用して設定されます。
コンシューマーまたはブローカーの CPU 使用率が高い場合、コンシューマーからのリクエストが多すぎる可能性があります。リクエストの数を減らし、メッセージがより大きなバッチで配信されるように、fetch.max.wait.ms
および fetch.min.bytes
プロパティーを調整します。より高い値に調整することでスループットが改善されますが、レイテンシーのコストが発生します。生成されるデータ量が少ない場合、より高い値に調整することもできます。
たとえば、fetch.max.wait.ms
を 500ms に設定し、fetch.min.bytes
を 16384 バイトに設定した場合、Kafka がコンシューマーからフェッチリクエストを受信すると、いずれかのしきい値に最初に到達した時点で応答されます。
逆に、fetch.max.wait.ms
および fetch.min.bytes
プロパティーを低く設定すると、エンドツーエンドのレイテンシーを改善できます。
# ... fetch.max.wait.ms=500 1 fetch.min.bytes=16384 2 # ...
フェッチリクエストサイズの増加によるレイテンシーの短縮
fetch.max.bytes
および max.partition.fetch.bytes
プロパティーを使用して、Kafka ブローカーからコンシューマーによって取得されるデータの最大量を増やします。
fetch.max.bytes
プロパティーは、一度にブローカーから取得されるデータ量の上限をバイト単位で設定します。
max.partition.fetch.bytes
は、各パーティションに返されるデータ量の上限をバイト単位で設定します。これは、常に max.message.bytes
のブローカーまたはトピック設定に設定されたバイト数よりも大きくする必要があります。
クライアントが消費できるメモリーの最大量は、以下のように概算されます。
NUMBER-OF-BROKERS * fetch.max.bytes and NUMBER-OF-PARTITIONS * max.partition.fetch.bytes
メモリー使用量がこれに対応できる場合は、これら 2 つのプロパティーの値を増やすことができます。各リクエストでより多くのデータを許可すると、フェッチリクエストが少なくなるため、レイテンシーが向上されます。
# ... fetch.max.bytes=52428800 1 max.partition.fetch.bytes=1048576 2 # ...
12.6.2.5. オフセットをコミットする際のデータ損失または重複の回避
Kafka の 自動コミットメカニズム により、コンシューマーはメッセージのオフセットを自動的にコミットできます。有効にすると、コンシューマーはブローカーをポーリングして受信したオフセットを 5000ms 間隔でコミットします。
自動コミットのメカニズムは便利ですが、データ損失と重複のリスクが発生します。コンシューマーが多くのメッセージを取得および変換し、自動コミットの実行時にコンシューマーバッファーに処理されたメッセージがある状態でシステムがクラッシュすると、そのデータは失われます。メッセージの処理後、自動コミットの実行前にシステムがクラッシュした場合、リバランス後に別のコンシューマーインスタンスでデータが複製されます。
ブローカーへの次のポーリングの前またはコンシューマーが閉じられる前に、すべてのメッセージが処理された場合は、自動コミットによるデータの損失を回避できます。
データ損失や重複の可能性を最小限にするには、enable.auto.commit
を false
に設定し、クライアントアプリケーションを開発して、オフセットのコミットをさらに制御します。または、auto.commit.interval.ms
を使用して、コミットの間隔を減らすことができます。
# ...
enable.auto.commit=false 1
# ...
- 1
- 自動コミットを false に設定すると、オフセットのコミットの制御が強化されます。
enable.auto.commit
を false
に設定すると、すべて の処理が実行され、メッセージが消費された後にオフセットをコミットできます。たとえば、Kafka commitSync
および commitAsync
コミット API を呼び出すようにアプリケーションを設定できます。
commitSync
API は、ポーリングから返されるメッセージバッチのオフセットをコミットします。バッチのメッセージすべての処理が完了したら API を呼び出します。commitSync
API を使用する場合、アプリケーションはバッチの最後のオフセットがコミットされるまで新しいメッセージをポーリングしません。これがスループットに悪影響する場合は、コミットする頻度を減らすか、commitAsync
API を使用できます。commitAsync
API はブローカーがコミットリクエストに応答するまで待機しませんが、リバランス時にさらに重複が発生するリスクがあります。一般的なアプローチとして、両方のコミット API をアプリケーションで組み合わせ、コンシューマーをシャットダウンまたはリバランスの直前に commitSync
API を使用し、最終コミットが正常に実行されるようにします。
12.6.2.5.1. トランザクションメッセージの制御
プロデューサー側でトランザクション ID を使用し、べき等 (enable.idempotence=true
) を有効にして、1 回のみの配信の保証を検討してください。コンシューマー側で、isolation.level
プロパティーを使用して、コンシューマーによってトランザクションメッセージが読み取られる方法を制御できます。
isolation.level
プロパティーに有効な値は 2 つあります。
-
read_committed
-
read_uncommitted
(デフォルト)
コミットされたトランザクションメッセージのみがコンシューマーによって読み取られるようにするには、read_committed
を使用します。ただし、これによりトランザクションの結果を記録するトランザクションマーカー (committed または aborted) がブローカーによって書き込まれるまで、コンシューマーはメッセージを返すことができないため、エンドツーエンドのレイテンシーが長くなります。
# ...
enable.auto.commit=false
isolation.level=read_committed 1
# ...
- 1
- コミットされたメッセージのみがコンシューマーによって読み取られるように、
read_committed
に設定します。
12.6.2.6. データ損失を回避するための障害からの復旧
session.timeout.ms
および heartbeat.interval.ms
プロパティーを使用して、コンシューマーグループ内のコンシューマー障害をチェックし、復旧するのにかかる時間を設定します。
session.timeout.ms
プロパティーは、コンシューマーグループのコンシュマーが非アクティブであるとみなされ、そのグループのアクティブなコンシューマー間でリバランスがトリガーされる前に、ブローカーと通信できない最大時間をミリ秒単位で指定します。グループのリバランス時に、パーティションはグループのメンバーに再割り当てされます。
heartbeat.interval.ms
プロパティーは、コンシューマーがアクティブで接続されていることを示す、コンシューマーグループコーディネーターへのハートビートチェックの間隔をミリ秒単位で指定します。通常、ハートビートの間隔はセッションタイムアウトの間隔の 3 分の 2 にする必要があります。
session.timeout.ms
プロパティーの値を低く設定すると、失敗するコンシューマーが早期に発見され、リバランスがより迅速に実行されます。ただし、タイムアウトの値を低くしすぎて、ブローカーがハートビートを時間内に受信できず、不必要なリバランスがトリガーされることがないように気を付けてください。
ハートビートの間隔が短くなると、誤ってリバランスを行う可能性が低くなりますが、ハートビートを頻繁に行うとブローカーリソースのオーバーヘッドが増えます。
12.6.2.7. オフセットポリシーの管理
auto.offset.reset
プロパティーを使用して、オフセットをすべてコミットしなかった場合やコミットされたオフセットが有効でないまたは削除された場合の、コンシューマーの動作を制御します。
コンシューマーアプリケーションを初めてデプロイし、既存のトピックからメッセージを読み取る場合について考えてみましょう。group.id
が初めて使用されるため、__consumer_offsets
トピックには、このアプリケーションのオフセット情報は含まれません。新しいアプリケーションは、ログの始めからすべての既存メッセージの処理を開始するか、新しいメッセージのみ処理を開始できます。デフォルトのリセット値は、パーティションの最後から開始する latest
で、一部のメッセージは見逃されることを意味します。データの損失を回避し、処理量を増やすには、auto.offset.reset
を earliest
に設定し、パーティションの最初から開始します。
また、ブローカーに設定されたオフセットの保持期間 (offsets.retention.minutes
) が終了したときにメッセージが失われないようにするため、earliest
オプションを使用することも検討してください。コンシューマーグループまたはスタンドアロンコンシューマーが非アクティブで、保持期間中にオフセットをコミットしない場合、以前にコミットされたオフセットは __consumer_offsets
から削除されます。
# ... heartbeat.interval.ms=3000 1 session.timeout.ms=10000 2 auto.offset.reset=earliest 3 # ...
- 1
- 予想されるリバランスに応じて、ハートビートの間隔を短くして調整します。
- 2
- タイムアウトの期限が切れる前に Kafka ブローカーによってハートビートが受信されなかった場合、コンシューマーはコンシューマーグループから削除され、リバランスが開始されます。ブローカー設定に
group.min.session.timeout.ms
およびgroup.max.session.timeout.ms
がある場合は、セッションタイムアウト値はこの範囲内である必要があります。 - 3
- パーティションの最初に戻り、オフセットがコミットされなかった場合にデータの損失が発生しないようにするには、
earliest
に設定します。
1 つのフェッチリクエストで返されるデータ量が大きい場合、コンシューマーが処理する前にタイムアウトが発生することがあります。この場合は、max.partition.fetch.bytes
の値を低くするか、session.timeout.ms
の値を高くします。
12.6.2.8. リバランスの影響を最小限にする
グループのアクティブなコンシューマー間で行うパーティションのリバランスは、以下にかかる時間です。
- コンシューマーによるオフセットのコミット
- 作成される新しいコンシューマーグループ
- グループリーダーによるグループメンバーへのパーティションの割り当て。
- 割り当てを受け取り、取得を開始するグループのコンシューマー
明らかに、このプロセスは特にコンシューマーグループクラスターのローリング再起動時に繰り返し発生するサービスのダウンタイムを増やします。
このような場合、静的メンバーシップ の概念を使用してリバランスの数を減らすことができます。リバランスによって、コンシューマーグループメンバー全体でトピックパーティションが割り当てられます。静的メンバーシップは永続性を使用し、セッションタイムアウト後の再起動時にコンシューマーインスタンスが認識されるようにします。
コンシューマーグループコーディネーターは、group.instance.id
プロパティーを使用して指定される一意の ID を使用して新しいコンシューマーインスタンスを特定できます。再起動時には、コンシューマーには新しいメンバー ID が割り当てられますが、静的メンバーとして、同じインスタンス ID を使用し、同じトピックパーティションの割り当てが行われます。
コンシューマーアプリケーションが最低でも max.poll.interval.ms
ミリ秒毎にポーリングへの呼び出しを行わない場合、コンシューマーは失敗したと見なされ、リバランスが発生します。アプリケーションがポーリングから返されたすべてレコードを時間内に処理できない場合は、max.poll.interval.ms
プロパティーを使用して、コンシューマーからの新規メッセージのポーリングの間隔をミリ秒単位で指定して、リバランスの発生を防ぎます。または、max.poll.records
プロパティーを使用して、コンシューマーバッファーから返されるレコードの数の上限を設定し、アプリケーションが max.poll.interval.ms
内でより少ないレコードを処理できるようにします。
# ... group.instance.id=UNIQUE-ID 1 max.poll.interval.ms=300000 2 max.poll.records=500 3 # ...
12.7. AMQ Streams のアンインストール
この手順では、AMQ Streams をアンインストールし、デプロイメントに関連するリソースを削除する方法を説明します。
前提条件
この手順を実行するには、デプロイメント用に特別に作成され、AMQ Streams リソースから参照されるリソースを特定します。
このようなリソースには以下があります。
- シークレット (カスタム CA および証明書、Kafka Connect Secrets、その他の Kafka シークレット)
-
ロギング
ConfigMaps
(タイプはexternal
)
これらのリソースは、Kafka
、KafkaConnect
、KafkaConnectS2I
、KafkaMirrorMaker
、または KafkaBridge
設定によって参照されます。
手順
Cluster Operator の
Deployment
、関連するCustomResourceDefinitions
およびRBAC
リソースを削除します。oc delete -f install/cluster-operator
警告CustomResourceDefinitions
を削除すると、対応するカスタムリソース (Kafka
、KafkaConnect
、KafkaConnectS2I
、KafkaMirrorMaker
、またはKafkaBridge
) 、およびそれらに依存するリソース (Deployments、StatefulSets、その他の依存リソース) のガベージコレクションが実行されます。- 前提条件で特定したリソースを削除します。