9.7. Kafka MirrorMaker 2 の設定
KafkaMirrorMaker2
カスタムリソースの spec
プロパティーを更新して、MirrorMaker 2 デプロイメントを設定します。MirrorMaker 2 は、データ消費にソースクラスター設定を使用し、データ出力にターゲットクラスター設定を使用します。
MirrorMaker 2 は、クラスター間のデータ転送を管理する コネクター である Kafka Connect フレームワークに基づいています。
MirrorMaker 2 を設定して、ソースクラスターとターゲットクラスターの接続の詳細を含む Kafka Connect デプロイメントを定義し、一連の MirrorMaker 2 コネクターを実行して接続を確立します。
MirrorMaker 2 は、ソースクラスターとターゲットクラスター間のトピック設定の同期をサポートします。MirrorMaker 2 設定でソーストピックを指定します。MirrorMaker 2 はソーストピックを監視します。MirrorMaker 2 は、ソーストピックへの変更を検出し、リモートトピックに伝達します。変更には、欠けているトピックおよびパーティションの自動作成が含まれる場合があります。
ほとんどの場合、ローカルトピックに書き込み、リモートトピックから読み取ります。リモートトピックでは書き込み操作ができないわけではありませんが、使用しないようにしてください。
設定では以下を指定する必要があります。
- 各 Kafka クラスター
- 認証を含む各クラスターの接続情報
レプリケーションのフローおよび方向
- クラスターからクラスターへ
- トピックからトピックへ
Kafka MirrorMaker 2 クラスター設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。
MirrorMaker 2 のリソース設定は、現在非推奨になっている以前のバージョンの MirrorMaker とは異なります。現在、レガシーサポートはないため、リソースは手動で新しい形式に変換する必要があります。
デフォルト設定
MirrorMaker 2 は、レプリケーション係数などのプロパティーのデフォルト設定値を提供します。デフォルトに変更がない最小設定の例は以下のようになります。
MirrorMaker 2 の最小設定
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 3.7.0 connectCluster: "my-cluster-target" clusters: - alias: "my-cluster-source" bootstrapServers: my-cluster-source-kafka-bootstrap:9092 - alias: "my-cluster-target" bootstrapServers: my-cluster-target-kafka-bootstrap:9092 mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: {}
mTLS または SASL 認証を使用して、ソースおよびターゲットクラスターのアクセス制御を設定できます。この手順では、ソースおよびターゲットクラスターに対して mTLS による暗号化および認証を使用する設定を説明します。
KafkaMirrorMaker2
リソースのソースクラスターからレプリケートするトピックとコンシューマーグループを指定できます。これを行うには、topicsPattern
および groupsPattern
プロパティーを使用します。名前のリストを指定したり、正規表現を使用したりできます。デフォルトでは、topicsPattern
および groupsPattern
プロパティーを設定しない場合、すべてのトピックとコンシューマーグループがレプリケートされます。.*
を正規表現として使用して、すべてのトピックとコンシューマーグループを複製することもできます。ただし、クラスターに不要な負荷が余分にかかるのを避けるため、必要なトピックとコンシューマーグループのみを指定するようにしてください。
大量のメッセージ処理
設定を調整して、大量のメッセージを処理できます。詳細は、大量のメッセージの処理 を参照してください。
KafkaMirrorMaker2
カスタムリソース設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 3.7.0 1 replicas: 3 2 connectCluster: "my-cluster-target" 3 clusters: 4 - alias: "my-cluster-source" 5 authentication: 6 certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source type: tls bootstrapServers: my-cluster-source-kafka-bootstrap:9092 7 tls: 8 trustedCertificates: - certificate: ca.crt secretName: my-cluster-source-cluster-ca-cert - alias: "my-cluster-target" 9 authentication: 10 certificateAndKey: certificate: target.crt key: target.key secretName: my-user-target type: tls bootstrapServers: my-cluster-target-kafka-bootstrap:9092 11 config: 12 config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 tls: 13 trustedCertificates: - certificate: ca.crt secretName: my-cluster-target-cluster-ca-cert mirrors: 14 - sourceCluster: "my-cluster-source" 15 targetCluster: "my-cluster-target" 16 sourceConnector: 17 tasksMax: 10 18 autoRestart: 19 enabled: true config replication.factor: 1 20 offset-syncs.topic.replication.factor: 1 21 sync.topic.acls.enabled: "false" 22 refresh.topics.interval.seconds: 60 23 replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" 24 heartbeatConnector: 25 autoRestart: enabled: true config: heartbeats.topic.replication.factor: 1 26 replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" checkpointConnector: 27 autoRestart: enabled: true config: checkpoints.topic.replication.factor: 1 28 refresh.groups.interval.seconds: 600 29 sync.group.offsets.enabled: true 30 sync.group.offsets.interval.seconds: 60 31 emit.checkpoints.interval.seconds: 60 32 replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" topicsPattern: "topic1|topic2|topic3" 33 groupsPattern: "group1|group2|group3" 34 resources: 35 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 36 type: inline loggers: connect.root.logger.level: INFO readinessProbe: 37 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 jvmOptions: 38 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 39 rack: topologyKey: topology.kubernetes.io/zone 40 template: 41 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 42 env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_OTLP_ENDPOINT value: "http://otlp-host:4317" tracing: type: opentelemetry 43 externalConfiguration: 44 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey
- 1
- 常に同じになる Kafka Connect と Mirror Maker 2 のバージョン。
- 2
- タスクを実行するワーカーのレプリカノード数。
- 3
- Kafka Connect の Kafka クラスターエイリアス。ターゲット Kafka クラスターを指定する必要があります。Kafka クラスターは、その内部トピックのために Kafka Connect によって使用されます。
- 4
- 同期される Kafka クラスターの指定。
- 5
- ソースの Kafka クラスターのクラスターエイリアス。
- 6
- ソースクラスターの認証。mTLS、トークンベースの OAuth、SASL ベース SCRAM-SHA-256/SCRAM-SHA-512、または PLAIN として指定します。
- 7
- ソース Kafka クラスターに接続するためのブートストラップサーバー。
- 8
- ソース Kafka クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
- 9
- ターゲット Kafka クラスターのクラスターエイリアス。
- 10
- ターゲット Kafka クラスターの認証は、ソース Kafka クラスターと同様に設定されます。
- 11
- ターゲット Kafka クラスターに接続するためのブートストラップサーバー。
- 12
- Kafka Connect の設定。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
- 13
- ターゲット Kafka クラスターの TLS による暗号化は、ソース Kafka クラスターと同様に設定されます。
- 14
- MirrorMaker 2 コネクター。
- 15
- MirrorMaker 2 コネクターによって使用されるソースクラスターのクラスターエイリアス。
- 16
- MirrorMaker 2 コネクターによって使用されるターゲットクラスターのクラスターエイリアス。
- 17
- リモートトピックを作成する
MirrorSourceConnector
の設定。デフォルトの設定オプションはconfig
によって上書きされます。 - 18
- コネクターによる作成が可能なタスクの最大数。タスクは、データのレプリケーションを処理し、並行して実行されます。インフラストラクチャーが処理のオーバーヘッドをサポートする場合、この値を大きくするとスループットが向上されます。Kafka Connect は、クラスターのメンバー間でタスクを分散します。ワーカーよりも多くのタスクがある場合は、ワーカーには複数のタスクが割り当てられます。シンクコネクターでは、消費される各トピックパーティションに 1 つタスクがあるようになることを目指します。ソースコネクターでは、並行して実行できるタスクの数は外部システムによって異なる場合もあります。並列処理を実現できない場合、コネクターは最大数より少ないタスクを作成します。
- 19
- 失敗したコネクターとタスクの自動再起動を有効にします。デフォルトでは、再起動の回数は無制限ですが、
maxRestarts
プロパティーを使用して自動再起動の最大回数を設定できます。 - 20
- ターゲットクラスターで作成されるミラーリングされたトピックのレプリケーション係数。
- 21
- ソースおよびターゲットクラスターのオフセットをマップする
MirrorSourceConnector
offset-syncs
内部トピックのレプリケーション係数。 - 22
- ACL ルールの同期が有効になっていると、同期されたトピックに ACL が適用されます。デフォルトは
true
です。この機能は User Operator と互換性がありません。User Operator を使用している場合は、このプロパティーをfalse
に設定します。 - 23
- 新規トピックのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
- 24
- リモートトピック名の自動変更をオーバーライドするポリシーを追加します。その名前の前にソースクラスターの名前を追加する代わりに、トピックが元の名前を保持します。このオプションの設定は、active/passive バックアップおよびデータ移行に役立ちます。このプロパティーはすべてのコネクターに指定する必要があります。双方向 (アクティブ/アクティブ) レプリケーションの場合、
DefaultReplicationPolicy
クラスを使用してリモートトピックの名前を自動的に変更し、すべてのコネクターにreplication.policy.separator
プロパティーを指定してカスタムセパレーターを追加します。 - 25
- 接続チェックを実行する
MirrorHeartbeatConnector
の設定。デフォルトの設定オプションはconfig
によって上書きされます。 - 26
- ターゲットクラスターで作成されたハートビートトピックのレプリケーション係数。
- 27
- オフセットを追跡する
MirrorCheckpointConnector
の設定。デフォルトの設定オプションはconfig
によって上書きされます。 - 28
- ターゲットクラスターで作成されたチェックポイントトピックのレプリケーション係数。
- 29
- 新規コンシューマーグループのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
- 30
- コンシューマーグループのオフセットを同期する任意設定。これは、active/passive 設定でのリカバリーに便利です。同期はデフォルトでは有効になっていません。
- 31
- コンシューマーグループオフセットの同期が有効な場合は、同期の頻度を調整できます。
- 32
- オフセット追跡のチェック頻度を調整します。オフセット同期の頻度を変更する場合は、これらのチェックの頻度も調整することを推奨します。
- 33
- コンマ区切りリストまたは正規表現パターンとして定義されたソースクラスターからのトピックレプリケーション。ソースコネクターは指定のトピックをレプリケーションします。チェックポイントコネクターは、指定されたトピックのオフセットを追跡します。ここでは、3 つのトピックを名前でリクエストします。
- 34
- コンマ区切りリストまたは正規表現パターンとして定義されたソースクラスターからのコンシューマーグループのレプリケーション。チェックポイントコネクターは、指定されたコンシューマーグループをレプリケーションします。ここで、3 つのコンシューマーグループを名前で要求します。
- 35
- 現在
cpu
およびmemory
である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。 - 36
- 指定された Kafka ロガーおよびログレベルが ConfigMap を介して直接的に (
inline
) または間接的に (external
) に追加されます。カスタム Log4j 設定は、ConfigMap のlog4j.properties
キーまたはlog4j2.properties
キーの下に配置する必要があります。Kafka Connectlog4j.rootLogger
ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。 - 37
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 38
- Kafka MirrorMaker を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 39
- 高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
- 40
- 特別なオプション: 展開のための Rack awareness 設定。これは、リージョン間ではなく、同じロケーション内でのデプロイメントを目的とした特殊なオプションです。このオプションは、コネクターがリーダーレプリカではなく、最も近いレプリカから消費する場合に使用できます。場合によっては、最も近いレプリカから消費することで、ネットワークの使用率を改善したり、コストを削減したりできます。
topologyKey
は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準のtopology.kubernetes.io/zone
ラベルを使用するゾーンを指定します。最も近いレプリカから消費するには、Kafka ブローカー設定でRackAwareReplicaSelector
を有効にします。 - 41
- テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
- 42
- 分散トレース用に環境変数が設定されます。
- 43
- 分散トレーシングは、OpenTelemetry を使用して有効になります。
- 44
- 環境変数として Kafka MirrorMaker にマウントされた OpenShift Secret の外部設定。設定プロバイダープラグインを使用して、外部ソースから設定値を読み込むこともできます。
9.7.1. アクティブ/アクティブまたはアクティブ/パッシブモードの設定
MirrorMaker 2 は、active/passive または active/active クラスター設定で使用できます。
- アクティブ/アクティブのクラスター設定
- アクティブ/アクティブ設定には、双方向でデータをレプリケーションするアクティブなクラスターが 2 つあります。アプリケーションはいずれかのクラスターを使用できます。各クラスターは同じデータを提供できます。これにより、地理的に異なる場所で同じデータを利用できるようにします。コンシューマーグループは両方のクラスターでアクティブであるため、レプリケーションされたトピックのコンシューマーオフセットはソースクラスターに同期されません。
- active/passive クラスター設定
- active/passive 設定には、passive クラスターにデータをレプリケーションする active クラスターがあります。passive クラスターはスタンバイのままになります。システムに障害が発生した場合に、データ復旧に passive クラスターを使用できます。
プロデューサーとコンシューマーがアクティブなクラスターのみに接続することを前提とします。MirrorMaker 2 クラスターはターゲットごとに必要です。
9.7.1.1. 双方向レプリケーション (active/active)
MirrorMaker 2 アーキテクチャーは、アクティブ/アクティブ クラスター設定での双方向レプリケーションをサポートします。
各クラスターは、source および remote トピックの概念を使用して、別のクラスターのデータをレプリケーションします。同じトピックが各クラスターに保存されるため、リモートトピックの名前は MirrorMaker 2 によってソースクラスターを表すように自動的に変更されます。元のクラスターの名前の先頭には、トピックの名前が追加されます。
図9.1 トピック名の変更
ソースクラスターにフラグを付けると、トピックはそのクラスターにレプリケーションされません。
remote トピックを介したレプリケーションの概念は、データの集約が必要なアーキテクチャーの設定に役立ちます。コンシューマーは、同じクラスター内でソースおよびリモートトピックにサブスクライブできます。これに個別の集約クラスターは必要ありません。
9.7.1.2. 一方向レプリケーション (active/passive)
MirrorMaker 2 アーキテクチャーは、active/passive クラスター設定での一方向レプリケーションをサポートします。
active/passive のクラスター設定を使用してバックアップを作成したり、データを別のクラスターに移行したりできます。この場合、リモートトピックの名前の自動変更は推奨しません。
IdentityReplicationPolicy
をソースコネクター設定に追加することで、名前の自動変更をオーバーライドできます。この設定が適用されると、トピックには元の名前が保持されます。
9.7.2. 複数のインスタンス用の MirrorMaker 2 の設定
デフォルトでは、Streams for Apache Kafka は、MirrorMaker 2 が実行される Kafka Connect フレームワークによって使用される内部トピックのグループ ID と名前を設定します。MirrorMaker 2 の複数のインスタンスを実行し、同じ connectCluster
値を共有する場合は、次の config
プロパティーを使用してこれらのデフォルト設定を変更する必要があります。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: connectCluster: "my-cluster-target" clusters: - alias: "my-cluster-target" config: group.id: my-connect-cluster 1 offset.storage.topic: my-connect-cluster-offsets 2 config.storage.topic: my-connect-cluster-configs 3 status.storage.topic: my-connect-cluster-status 4 # ... # ...
group.id
が同じすべてのインスタンスで、3 つのトピックの値を同じにする必要があります。
connectCluster
設定は、Kafka Connect が内部トピックに使用するターゲットの Kafka クラスターのエイリアスを指定します。その結果、connectCluster
、グループ ID、および内部トピックの名前付け設定への変更は、ターゲットの Kafka クラスターに固有のものになります。2 つの MirrorMaker 2 インスタンスが同じソース Kafka クラスターを使用している場合、または MirrorMaker 2 インスタンスごとに connectCluster
設定とターゲットクラスターが異なるアクティブ/アクティブモードを使用している場合は、変更を加える必要はありません。
ただし、複数の MirrorMaker 2 インスタンスが同じ connectCluster
を共有する場合、同じターゲット Kafka クラスターに接続する各インスタンスは同じ値でデプロイされます。実際には、これはすべてのインスタンスがクラスターを形成し、同じ内部トピックを使用することを意味します。
複数のインスタンスが同じ内部トピックを使用しようとすると、予期しないエラーが発生するため、インスタンスごとにこれらのプロパティーの値を変更する必要があります。
9.7.3. MirrorMaker 2 コネクターの設定
Kafka クラスター間のデータの同期を調整する内部コネクターには、MirrorMaker 2 コネクター設定を使用します。
MirrorMaker 2 は次のコネクターで構成されます。
MirrorSourceConnector
-
ソースコネクターは、トピックをソースクラスターからターゲットクラスターにレプリケーションします。また、ACL をレプリケーションし、
MirrorCheckpointConnector
を実行する必要があります。 MirrorCheckpointConnector
- チェックポイントコネクターは定期的にオフセットを追跡します。有効にすると、ソースクラスターとターゲットクラスター間のコンシューマーグループオフセットも同期されます。
MirrorHeartbeatConnector
- ハートビートコネクターは、ソースクラスターとターゲットクラスター間の接続を定期的にチェックします。
以下の表は、コネクタープロパティーと、これらを使用するために設定するコネクターについて説明しています。
プロパティー | sourceConnector | checkpointConnector | heartbeatConnector |
---|---|---|---|
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ |
9.7.3.1. コンシューマーグループオフセットの場所の変更トピック
MirrorMaker 2 は、内部トピックを使用してコンシューマーグループのオフセットを追跡します。
offset-syncs
トピック-
offset-syncs
トピックは、レプリケーションされたトピックパーティションのソースおよびターゲットオフセットをレコードメタデータからマッピングします。 checkpoints
トピック-
checkpoints
トピックは、各コンシューマーグループでレプリケーションされたトピックパーティションのソースおよびターゲットクラスターで、最後にコミットされたオフセットをマッピングします。
これらは MirrorMaker 2 によって内部的に使用されるため、これらのトピックと直接対話することはありません。
MirrorCheckpointConnector
は、オフセット追跡用の チェックポイント を発行します。checkpoints
トピックのオフセットは、設定によって事前に決定された間隔で追跡されます。両方のトピックは、フェイルオーバー時に正しいオフセットの位置からレプリケーションの完全復元を可能にします。
offset-syncs
トピックの場所は、デフォルトで source
クラスターです。offset-syncs.topic.location
コネクター設定を使用して、これを target
クラスターに変更することができます。トピックが含まれるクラスターへの読み取り/書き込みアクセスが必要です。ターゲットクラスターを offset-syncs
トピックの場所として使用すると、ソースクラスターへの読み取りアクセス権しかない場合でも、MirrorMaker 2 を使用できるようになります。
9.7.3.2. コンシューマーグループオフセットの同期
__consumer_offsets
トピックには、各コンシューマーグループのコミットされたオフセットに関する情報が保存されます。オフセットの同期は、ソースクラスターのコンシューマーグループのコンシューマーオフセットをターゲットクラスターのコンシューマーオフセットに定期的に転送します。
オフセットの同期は、特に active/passive 設定で便利です。アクティブなクラスターがダウンした場合、コンシューマーアプリケーションを passive (スタンバイ) クラスターに切り替え、最後に転送されたオフセットの位置からピックアップできます。
トピックオフセットの同期を使用するには、sync.group.offsets.enabled
を checkpoint コネクター設定に追加し、プロパティーを true
に設定して、同期を有効にします。同期はデフォルトで無効になっています。
ソースコネクターで IdentityReplicationPolicy
を使用する場合は、チェックポイントコネクター設定でも設定する必要があります。これにより、ミラーリングされたコンシューマーオフセットが正しいトピックに適用されます。
コンシューマーオフセットは、ターゲットクラスターでアクティブではないコンシューマーグループに対してのみ同期されます。コンシューマーグループがターゲットクラスターにある場合、Synchronization を実行できず、UNKNOWN_MEMBER_ID
エラーが返されます。
同期を有効にすると、ソースクラスターからオフセットの同期が定期的に行われます。この頻度は、sync.group.offsets.interval.seconds
および emit.checkpoints.interval.seconds
をチェックポイントコネクター設定に追加することで変更できます。これらのプロパティーは、コンシューマーグループのオフセットが同期される頻度 (秒単位) と、オフセットを追跡するためにチェックポイントが生成される頻度を指定します。両方のプロパティーのデフォルトは 60 秒です。refresh.groups.interval.seconds
プロパティーを使用して、新規コンシューマーグループのチェック頻度を変更することもできます。デフォルトでは 10 分ごとに実行されます。
同期は時間ベースであるため、コンシューマーによって passive クラスターへ切り替えられると、一部のメッセージが重複する可能性があります。
Java で作成されたアプリケーションがある場合は、RemoteClusterUtils.java
ユーティリティーを使用して、アプリケーションを通じてオフセットを同期できます。ユーティリティーは、checkpoints
トピックからコンシューマーグループのリモートオフセットを取得します。
9.7.3.3. ハートビートコネクターを使用するタイミングの決定
ハートビートコネクターはハートビートを出力して、ソース Kafka クラスターとターゲット Kafka クラスター間の接続を確認します。内部 heartbeat
トピックはソースクラスターからレプリケートされます。つまり、ハートビートコネクターがソースクラスターに接続されている必要があります。heartbeat
トピックはターゲットクラスターに配置されているため、次のことが可能になります。
- データのミラーリング元のすべてのソースクラスターを特定します。
- ミラーリングプロセスの稼働状況と遅延を確認する
これは、プロセスが何らかの理由でスタックしたり停止したりしていないことを確認するのに役立ちます。ハートビートコネクターは、Kafka クラスター間のミラーリングプロセスを監視するための貴重なツールですが、必ずしも使用する必要があるわけではありません。たとえば、デプロイメントのネットワーク遅延が低い場合、またはトピックの数が少ない場合は、ログメッセージやその他の監視ツールを使用してミラーリングプロセスを監視することが推奨されます。ハートビートコネクターを使用しない場合は、MirrorMaker 2 設定からハートビートコネクターを省略してください。
9.7.3.4. MirrorMaker 2 コネクターの設定の調整
MirrorMaker 2 コネクターが正しく動作することを確認するには、コネクター全体で特定の設定を調整してください。具体的には、次のプロパティーが該当するすべてのコネクターで同じ値であることを確認してください。
-
replication.policy.class
-
replication.policy.separator
-
offset-syncs.topic.location
-
topic.filter.class
たとえば、replication.policy.class
の値は、ソース、チェックポイント、およびハートビートコネクターで同じである必要があります。設定が一致していないか欠落していると、データレプリケーションやオフセット同期で問題が発生するため、関連するすべてのコネクターを同じ設定で設定しておくことが重要です。
9.7.4. MirrorMaker 2 コネクターのプロデューサとコンシューマーの設定
MirrorMaker 2 コネクターは、内部プロデューサーとコンシューマーを使用します。必要に応じて、これらのプロデューサーおよびコンシューマーを設定して、デフォルト設定を上書きできます。
たとえば、トピックをターゲットの Kafka クラスターに送信するソースプロデューサーの batch.size
を増やして、大量のデータをより適切に対応できます。
プロデューサおよびコンシューマーの設定オプションは MirrorMaker 2 の実装に依存しており、変更される可能性があります。
次の表では、各コネクターのプロデューサーとコンシューマー、および設定を追加できる場所について説明します。
タイプ | 説明 | 設定 |
---|---|---|
プロデューサー | トピックメッセージをターゲット Kafka クラスターに送信します。大量のデータを処理する場合は、このプロデューサーの設定を調整することを検討してください。 |
|
プロデューサー |
レプリケートされたトピックパーティションのソースオフセットとターゲットオフセットをマップする、 |
|
コンシューマー | ソース Kafka クラスターからトピックメッセージを取得します。 |
|
タイプ | 説明 | 設定 |
---|---|---|
プロデューサー | コンシューマーオフセットチェックポイントを発行します。 |
|
コンシューマー |
|
|
offset-syncs.topic.location
を target
に設定して、ターゲット Kafka クラスターを offset-syncs
トピックの場所として使用できます。
タイプ | 説明 | 設定 |
---|---|---|
プロデューサー | ハートビートを生成します。 |
|
次の例は、プロデューサーとコンシューマーを設定する方法を示しています。
コネクターのプロデューサーとコンシューマーの設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 3.7.0 # ... mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: tasksMax: 5 config: producer.override.batch.size: 327680 producer.override.linger.ms: 100 producer.request.timeout.ms: 30000 consumer.fetch.max.bytes: 52428800 # ... checkpointConnector: config: producer.override.request.timeout.ms: 30000 consumer.max.poll.interval.ms: 300000 # ... heartbeatConnector: config: producer.override.request.timeout.ms: 30000 # ...
9.7.5. データ複製タスクの最大数の指定
コネクターは、Kafka にデータを出し入れするタスクを作成します。各コネクターは 1 つ以上のタスクで構成されます。タスクは、タスクを実行するワーカー Pod のグループ全体に分散されます。タスクの数を増やすと、多数のパーティションをレプリケーションするとき、または多数のコンシューマーグループのオフセットを同期するときのパフォーマンスの問題に役立ちます。
タスクは並行して実行されます。ワーカーには 1 つ以上のタスクが割り当てられます。1 つのタスクが 1 つのワーカー Pod によって処理されるため、タスクよりも多くのワーカー Pod は必要ありません。ワーカーよりも多くのタスクがある場合、ワーカーは複数のタスクを処理します。
tasksMax
プロパティーを使用して、MirrorMaker 設定でコネクタータスクの最大数を指定できます。タスクの最大数を指定しない場合、デフォルト設定のタスク数は 1 つです。
ハートビートコネクターは常に単一のタスクを使用します。
ソースおよびチェックポイントコネクターに対して開始されるタスクの数は、可能なタスクの最大数と tasksMax
の値の間の低い値です。ソースコネクターの場合、可能なタスクの最大数は、ソースクラスターからレプリケーションされるパーティションごとに 1 つです。チェックポイントコネクターの場合、可能なタスクの最大数は、ソースクラスターからレプリケーションされるコンシューマーグループごとに 1 つです。タスクの最大数を設定するときは、プロセスをサポートするパーティションの数とハードウェアリソースを考慮してください。
インフラストラクチャーが処理のオーバーヘッドをサポートしている場合、タスクの数を増やすと、スループットと待機時間が向上する可能性があります。たとえば、タスクを追加すると、多数のパーティションまたはコンシューマーグループがある場合に、ソースクラスターのポーリングにかかる時間が短縮されます。
ソースコネクターのタスク数を増やすと、多数のパーティションがある場合に役立ちます。
ソースコネクターのタスク数を増やす
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: # ... mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: tasksMax: 10 # ...
多数のコンシューマーグループがある場合は、チェックポイントコネクターのタスク数を増やすと便利です。
チェックポイントコネクターのタスク数の増加
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: # ... mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" checkpointConnector: tasksMax: 10 # ...
デフォルトでは、MirrorMaker 2 は 10 分ごとに新しいコンシューマーグループをチェックします。refresh.groups.interval.seconds
設定を調整して、頻度を変更できます。低く調整するときは注意してください。より頻繁なチェックは、パフォーマンスに悪影響を及ぼす可能性があります。
9.7.5.1. コネクタータスクの動作の確認
Prometheus と Grafana を使用してデプロイメントを監視している場合は、MirrorMaker 2 のパフォーマンスをチェックできます。Streams for Apache Kafka に付属する MirrorMaker 2 Grafana ダッシュボードの例では、タスクとレイテンシーに関連する次のメトリクスが表示されます。
- タスクの数
- レプリケーションのレイテンシー
- オフセット同期のレイテンシー
9.7.6. リモートトピックの ACL ルールの同期
MirrorMaker 2 を Streams for Apache Kafka と併用すると、リモートトピックの ACL ルールを同期できます。ただし、この機能は User Operator を使用していない場合にのみ使用できます。
User Operator を使用せずに type: simple
認可を使用している場合、ブローカーへのアクセスを管理する ACL ルールはリモートトピックにも適用されます。これは、ソーストピックへの読み取りアクセス権を持つユーザーが、リモートの同等のトピックを読み取ることもできることを意味します。
OAuth 2.0 での認可は、このようなリモートトピックへのアクセスをサポートしません。
9.7.7. Kafka MirrorMaker 2 デプロイメントの保護
この手順では、MirrorMaker 2 デプロイメントを保護するために必要な設定の概要を説明します。
ソース Kafka クラスターとターゲット Kafka クラスターには別々の設定が必要です。また、MirrorMaker がソースおよびターゲットの Kafka クラスターに接続するために必要な認証情報を提供するために、個別のユーザー設定が必要です。
Kafka クラスターの場合、OpenShift クラスター内のセキュア接続用の内部リスナーと、OpenShift クラスター外の接続用の外部リスナーを指定します。
認証および認可メカニズムを設定できます。ソースおよびターゲットの Kafka クラスターに実装されたセキュリティーオプションは、MirrorMaker 2 に実装されたセキュリティーオプションと互換性がある必要があります。
クラスターとユーザー認証情報を作成したら、セキュアな接続のために MirrorMaker 設定でそれらを指定します。
この手順では、Cluster Operator によって生成された証明書が使用されますが、独自の証明書をインストール してそれらを置き換えることができます。外部 CA (認証局) によって管理される Kafka リスナー証明書を使用 するようにリスナーを設定することもできます。
作業を開始する前の注意事項
この手順を開始する前に、Streams for Apache Kafka に付属する サンプル設定ファイル を確認してください。これらには、mTLS または SCRAM-SHA-512 認証を使用して MirrorMaker 2 のデプロイメントを保護するための例が含まれています。例では、OpenShift クラスター内で接続するための内部リスナーを指定しています。
この例では、ソースおよびターゲットの Kafka クラスターでのユーザー操作を許可する ACL を含む、完全な認可の設定も提供します。
ソースおよびターゲット Kafka クラスターへのユーザーアクセスを設定する場合、ACL は内部 MirrorMaker 2 コネクターへのアクセス権と、ターゲットクラスター内の基盤となる Kafka Connect フレームワークによって使用されるクラスターグループおよび内部トピックへの読み取り/書き込みアクセス権を割り当てる必要があります。複数のインスタンスに対して MirrorMaker 2 を設定する など、クラスターグループまたは内部トピックの名前を変更した場合は、ACL 設定でその名前を使用します。
簡易認可は、Kafka AclAuthorizer
および StandardAuthorizer
プラグインで管理される ACL ルールを使用し、適切なアクセスレベルを確保します。KafkaUser
リソースに簡易認証を使用するように設定する方法については、AclRule
スキーマリファレンスを参照してください。
前提条件
- Streams for Apache Kafka が実行中である。
- ソースクラスターとターゲットクラスターの namespace が分離されている。
この手順では、ソースとターゲットの Kafka クラスターが別の namespace にインストールされていることを前提としています。Topic Operator を使用する場合は、こちらを行う必要があります。Topoic Operator は、指定された namespace 内の単一クラスターのみをモニタリングします。
クラスターを namespace に分割することにより、クラスターシークレットをコピーして、namespace の外部からアクセスできるようにする必要があります。MirrorMaker 設定でシークレットを参照する必要があります。
手順
2 つの
Kafka
リソースを設定します。1 つはソース Kafka クラスターを保護するためのもので、もう 1 つはターゲット Kafka クラスターを保護するためのものです。認証用のリスナー設定を追加し、認可を有効にすることができます。
この例の場合、内部リスナーは mTLS 暗号化と認証を使用して Kafka クラスター用に設定されています。Kafka の
simple
認可が有効になっています。TLS 暗号化と mTLS 認証を使用したソース Kafka クラスター設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-source-cluster spec: kafka: version: 3.7.0 replicas: 1 listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls authorization: type: simple config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.7" storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false zookeeper: replicas: 1 storage: type: persistent-claim size: 100Gi deleteClaim: false entityOperator: topicOperator: {} userOperator: {}
TLS 暗号化と mTLS 認証を使用したターゲット Kafka クラスター設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-target-cluster spec: kafka: version: 3.7.0 replicas: 1 listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls authorization: type: simple config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.7" storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false zookeeper: replicas: 1 storage: type: persistent-claim size: 100Gi deleteClaim: false entityOperator: topicOperator: {} userOperator: {}
別の namespace で
Kafka
リソースを作成または更新します。oc apply -f <kafka_configuration_file> -n <namespace>
Cluster Operator はリスナーを作成し、クラスターおよびクライアント認証局 (CA) 証明書を設定して Kafka クラスター内で認証を有効にします。
証明書は、シークレット
<cluster_name>-cluster-ca-cert
に作成されます。2 つの
KafkaUser
リソースを設定します。1 つはソース Kafka クラスターのユーザー用で、もう 1 つはターゲット Kafka クラスターのユーザー用です。-
対応するソースおよびターゲットの Kafka クラスターと同じ認証および認可タイプを設定します。たとえば、ソース Kafka クラスターの
Kafka
設定でtls
認証とsimple
認可タイプを使用した場合は、KafkaUser
設定でも同じものを使用します。 - MirrorMaker 2 に必要な ACL を設定して、ソースおよびターゲットの Kafka クラスターでの操作を許可します。
mTLS 認証のソースユーザー設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-source-user labels: strimzi.io/cluster: my-source-cluster spec: authentication: type: tls authorization: type: simple acls: # MirrorSourceConnector - resource: # Not needed if offset-syncs.topic.location=target type: topic name: mm2-offset-syncs.my-target-cluster.internal operations: - Create - DescribeConfigs - Read - Write - resource: # Needed for every topic which is mirrored type: topic name: "*" operations: - DescribeConfigs - Read # MirrorCheckpointConnector - resource: type: cluster operations: - Describe - resource: # Needed for every group for which offsets are synced type: group name: "*" operations: - Describe - resource: # Not needed if offset-syncs.topic.location=target type: topic name: mm2-offset-syncs.my-target-cluster.internal operations: - Read
mTLS 認証のターゲットユーザー設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-target-user labels: strimzi.io/cluster: my-target-cluster spec: authentication: type: tls authorization: type: simple acls: # cluster group - resource: type: group name: mirrormaker2-cluster operations: - Read # access to config.storage.topic - resource: type: topic name: mirrormaker2-cluster-configs operations: - Create - Describe - DescribeConfigs - Read - Write # access to status.storage.topic - resource: type: topic name: mirrormaker2-cluster-status operations: - Create - Describe - DescribeConfigs - Read - Write # access to offset.storage.topic - resource: type: topic name: mirrormaker2-cluster-offsets operations: - Create - Describe - DescribeConfigs - Read - Write # MirrorSourceConnector - resource: # Needed for every topic which is mirrored type: topic name: "*" operations: - Create - Alter - AlterConfigs - Write # MirrorCheckpointConnector - resource: type: cluster operations: - Describe - resource: type: topic name: my-source-cluster.checkpoints.internal operations: - Create - Describe - Read - Write - resource: # Needed for every group for which the offset is synced type: group name: "*" operations: - Read - Describe # MirrorHeartbeatConnector - resource: type: topic name: heartbeats operations: - Create - Describe - Write
注記type
をtls-external
に設定することにより、User Operator の外部で発行された証明書を使用できます。詳細は、KafkaUserSpec
スキーマリファレンス を参照してください。-
対応するソースおよびターゲットの Kafka クラスターと同じ認証および認可タイプを設定します。たとえば、ソース Kafka クラスターの
ソースおよびターゲットの Kafka クラスター用に作成した各 namespace で、
KafkaUser
リソースを作成または更新します。oc apply -f <kafka_user_configuration_file> -n <namespace>
User Operator はクライアント (MirrorMaker) に対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。
User Operator は、
KafkaUser
リソースと同じ名前の新しいシークレットを作成します。シークレットには、mTLS 認証用の秘密鍵と公開鍵が含まれています。公開鍵は、クライアント CA によって署名されたユーザー証明書に含まれます。ソースおよびターゲットの Kafka クラスターに接続するための認証の詳細を使用して
KafkaMirrorMaker2
リソースを設定します。TLS 暗号化と mTLS 認証を使用した MirrorMaker 2 設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker-2 spec: version: 3.7.0 replicas: 1 connectCluster: "my-target-cluster" clusters: - alias: "my-source-cluster" bootstrapServers: my-source-cluster-kafka-bootstrap:9093 tls: 1 trustedCertificates: - secretName: my-source-cluster-cluster-ca-cert certificate: ca.crt authentication: 2 type: tls certificateAndKey: secretName: my-source-user certificate: user.crt key: user.key - alias: "my-target-cluster" bootstrapServers: my-target-cluster-kafka-bootstrap:9093 tls: 3 trustedCertificates: - secretName: my-target-cluster-cluster-ca-cert certificate: ca.crt authentication: 4 type: tls certificateAndKey: secretName: my-target-user certificate: user.crt key: user.key config: # -1 means it will use the default replication factor configured in the broker config.storage.replication.factor: -1 offset.storage.replication.factor: -1 status.storage.replication.factor: -1 mirrors: - sourceCluster: "my-source-cluster" targetCluster: "my-target-cluster" sourceConnector: config: replication.factor: 1 offset-syncs.topic.replication.factor: 1 sync.topic.acls.enabled: "false" heartbeatConnector: config: heartbeats.topic.replication.factor: 1 checkpointConnector: config: checkpoints.topic.replication.factor: 1 sync.group.offsets.enabled: "true" topicsPattern: "topic1|topic2|topic3" groupsPattern: "group1|group2|group3"
ターゲット Kafka クラスターと同じ namespace で
KafkaMirrorMaker2
リソースを作成または更新します。oc apply -f <mirrormaker2_configuration_file> -n <namespace_of_target_cluster>
9.7.8. MirrorMaker 2 コネクターの手動停止または一時停止
KafkaMirrorMaker2
リソースを使用して内部 MirrorMaker コネクターを設定している場合は、state
設定を使用してコネクターを停止または一時停止します。コネクターとタスクがインスタンス化されたままになる一時停止状態とは対照的に、コネクターを停止すると設定のみが保持され、アクティブなプロセスは保持されません。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。一時停止されたコネクターはすぐに再開されますが、停止されたコネクターにはメモリーとリソースが解放されるという利点があります。
state
設定は、KafkaMirrorMaker2ConnectorSpec
スキーマの (非推奨) pause
設定を置き換えるもので、コネクターでの一時停止を許可します。これまでに pause
設定を使用してコネクターを一時停止した場合には、競合の回避目的にのみ state
設定の使用に移行することを推奨します。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する MirrorMaker 2 コネクターを制御する
KafkaMirrorMaker2
カスタムリソースの名前を見つけます。oc get KafkaMirrorMaker2
KafkaMirrorMaker2
リソースを編集して、コネクターを停止または一時停止します。MirrorMaker 2 コネクターを停止するための設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 3.7.0 replicas: 3 connectCluster: "my-cluster-target" clusters: # ... mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: tasksMax: 10 autoRestart: enabled: true state: stopped # ...
state
設定をstopped
またはpaused
に変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態はrunning
です。変更を
KafkaMirrorMaker2
設定に適用します。state
をrunning
に変更するか、設定を削除して、コネクターを再開できます。
あるいは、Kafka Connect API を公開し、stop
エンドポイントと 一時pause
エンドポイントを使用してコネクターの実行を停止することもできます。たとえば、PUT /connectors/<connector_name>/stop
などです。その後、resume
エンドポイントを使用して再起動できます。
9.7.9. MirrorMaker 2 コネクターの手動での再起動
strimzi.io/restart-connector
アノテーションを使用して、MirrorMaker 2 コネクターの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka MirrorMaker 2 コネクターを制御する
KafkaMirrorMaker2
カスタムリソースの名前を見つけます。oc get KafkaMirrorMaker2
KafkaMirrorMaker2
カスタムリソースから再起動する Kafka MirrorMaker 2 コネクターの名前を見つけます。oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
コネクターの名前を使用して、OpenShift の
KafkaMirrorMaker2
リソースにアノテーションを付けてコネクターを再起動します。oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector=<mirrormaker_connector_name>"
この例では、
my-mirror-maker-2
クラスター内のmy-connector
コネクターが再起動されます。oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector=my-connector"
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスによって検出されているかぎり、MirrorMaker 2 コネクターは再起動されます。MirrorMaker 2 がリクエストを受け入れると、アノテーションは
KafkaMirrorMaker2
カスタムリソースから削除されます。
9.7.10. MirrorMaker 2 コネクタータスクの手動での再起動
strimzi.io/restart-connector-task
アノテーションを使用して、MirrorMaker 2 コネクターの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する MirrorMaker 2 コネクタータスクを制御する
KafkaMirrorMaker2
カスタムリソースの名前を見つけます。oc get KafkaMirrorMaker2
KafkaMirrorMaker2
カスタムリソースから、コネクターの名前と再起動するタスクの ID を検索します。oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
タスク ID は 0 から始まる負の値ではない整数です。
名前と ID を使用して、OpenShift の
KafkaMirrorMaker2
リソースにアノテーションを付けてコネクタータスクを再開します。oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector-task=<mirrormaker_connector_name>:<task_id>"
この例では、
my-mirror-maker-2
クラスター内のmy-connector
コネクターのタスク0
が再起動されます。oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector-task=my-connector:0"
次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスによって検出されているかぎり、MirrorMaker 2 コネクタータスクが再起動されます。MirrorMaker 2 がリクエストを受け入れると、アノテーションは
KafkaMirrorMaker2
カスタムリソースから削除されます。