10.9. Kafka MirrorMaker 2 の設定


KafkaMirrorMaker2 カスタムリソースの spec プロパティーを更新して、MirrorMaker 2 デプロイメントを設定します。MirrorMaker 2 は、データ消費にソースクラスター設定を使用し、データ出力にターゲットクラスター設定を使用します。

MirrorMaker 2 は、クラスター間のデータ転送を管理する コネクター である Kafka Connect フレームワークに基づいています。

MirrorMaker 2 を設定して、ソースクラスターとターゲットクラスターの接続の詳細を含む Kafka Connect デプロイメントを定義し、一連の MirrorMaker 2 コネクターを実行して接続を確立します。

MirrorMaker 2 は、ソースクラスターとターゲットクラスター間のトピック設定の同期をサポートします。MirrorMaker 2 設定でソーストピックを指定します。MirrorMaker 2 はソーストピックを監視します。MirrorMaker 2 は、ソーストピックへの変更を検出し、リモートトピックに伝達します。変更には、欠けているトピックおよびパーティションの自動作成が含まれる場合があります。

注記

ほとんどの場合、ローカルトピックに書き込み、リモートトピックから読み取ります。リモートトピックでは書き込み操作ができないわけではありませんが、使用しないようにしてください。

設定では以下を指定する必要があります。

  • 各 Kafka クラスター
  • 認証を含む各クラスターの接続情報
  • レプリケーションのフローおよび方向

    • クラスターからクラスターへ
    • トピックからトピックへ

Kafka MirrorMaker 2 クラスター設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。

デフォルト設定

MirrorMaker 2 は、レプリケーション係数などのプロパティーのデフォルト設定値を提供します。デフォルトに変更がない最小設定の例は以下のようになります。

MirrorMaker 2 の最小設定

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 4.0.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}

mTLS または SASL 認証を使用して、ソースおよびターゲットクラスターのアクセス制御を設定できます。この手順では、ソースおよびターゲットクラスターに対して mTLS による暗号化および認証を使用する設定を説明します。

KafkaMirrorMaker2 リソースのソースクラスターからレプリケートするトピックとコンシューマーグループを指定できます。これを行うには、topicsPattern および groupsPattern プロパティーを使用します。名前のリストを指定したり、正規表現を使用したりできます。デフォルトでは、topicsPattern および groupsPattern プロパティーを設定しない場合、すべてのトピックとコンシューマーグループがレプリケートされます。".*" を正規表現として使用して、すべてのトピックとコンシューマーグループを複製することもできます。ただし、クラスターに不要な負荷が余分にかかるのを避けるため、必要なトピックとコンシューマーグループのみを指定するようにしてください。

大量のメッセージ処理

設定を調整して、大量のメッセージを処理できます。詳細は、大量のメッセージの処理 を参照してください。

KafkaMirrorMaker2 カスタムリソース設定の例

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
# Deployment specifications
spec:
  # Replicas (required)
  replicas: 3 
1

  # Connect cluster name (required)
  connectCluster: "my-cluster-target" 
2

  # Cluster configurations (required)
  clusters: 
3

    - alias: "my-cluster-source" 
4

      # Authentication (optional)
      authentication: 
5

        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
        type: tls
      bootstrapServers: my-cluster-source-kafka-bootstrap:9092 
6

      # TLS configuration (optional)
      tls: 
7

        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-source-cluster-ca-cert
    - alias: "my-cluster-target" 
8

      # Authentication (optional)
      authentication: 
9

        certificateAndKey:
          certificate: target.crt
          key: target.key
          secretName: my-user-target
        type: tls
      bootstrapServers: my-cluster-target-kafka-bootstrap:9092 
10

      # Kafka Connect configuration (optional)
      config: 
11

        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
      # TLS configuration (optional)
      tls: 
12

        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-target-cluster-ca-cert
  # Mirroring configurations (required)
  mirrors: 
13

    - sourceCluster: "my-cluster-source" 
14

      targetCluster: "my-cluster-target" 
15

      # Topic and group patterns (required)
      topicsPattern: "topic1|topic2|topic3" 
16

      groupsPattern: "group1|group2|group3" 
17

      # Source connector configuration (required)
      sourceConnector: 
18

        tasksMax: 10 
19

        autoRestart: 
20

          enabled: true
        config:
          replication.factor: 1 
21

          offset-syncs.topic.replication.factor: 1 
22

          sync.topic.acls.enabled: "false" 
23

          refresh.topics.interval.seconds: 60 
24

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" 
25

      # Heartbeat connector configuration (optional)
      heartbeatConnector: 
26

        autoRestart:
          enabled: true
        config:
          heartbeats.topic.replication.factor: 1 
27

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      # Checkpoint connector configuration (optional)
      checkpointConnector: 
28

        autoRestart:
          enabled: true
        config:
          checkpoints.topic.replication.factor: 1 
29

          refresh.groups.interval.seconds: 600 
30

          sync.group.offsets.enabled: true 
31

          sync.group.offsets.interval.seconds: 60 
32

          emit.checkpoints.interval.seconds: 60 
33

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
  # Kafka version (recommended)
  version: 4.0.0 
34

  # Resources requests and limits (recommended)
  resources: 
35

    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # Logging configuration (optional)
  logging: 
36

    type: inline
    loggers:
      # Kafka 4.0+ uses Log4j2
      rootLogger.level: INFO
  # Readiness probe (optional)
  readinessProbe: 
37

    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Liveness probe (optional)
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # JVM options (optional)
  jvmOptions: 
38

    "-Xmx": "1g"
    "-Xms": "1g"
  # Custom image (optional)
  image: my-org/my-image:latest 
39

  # Rack awareness (optional)
  rack:
    topologyKey: topology.kubernetes.io/zone 
40

  # Pod template (optional)
  template: 
41

    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: 
42

      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  # Tracing configuration (optional)
  tracing:
    type: opentelemetry 
43

1
タスクを実行するワーカーのレプリカノード数。
2
Kafka Connect の Kafka クラスターエイリアス。ターゲット Kafka クラスターを指定する必要があります。Kafka クラスターは、その内部トピックのために Kafka Connect によって使用されます。
3
同期される Kafka クラスターの指定。
4
ソースの Kafka クラスターのクラスターエイリアス。
5
ソースクラスターの認証。tlsscram-sha-256scram-sha-512plain、または oauth として指定されます。認証設定の詳細は、KafkaMirrorMaker2Spec スキーマプロパティー を参照してください。
6
ソース Kafka クラスターに接続するためのブートストラップアドレス。アドレスの形式は <cluster_name>-kafka-bootstrap:<port_number> です。Kafka クラスターは、Streams for Apache Kafka によって管理する必要はなく、OpenShift クラスターにデプロイする必要もありません。
7
指定されたシークレット内に X.509 形式で保存されている信頼できる証明書を使用した、Kafka クラスターへの暗号化接続の TLS 設定。
8
ターゲット Kafka クラスターのクラスターエイリアス。
9
ターゲット Kafka クラスターの認証は、ソース Kafka クラスターと同様に設定されます。
10
ターゲット Kafka クラスターに接続するためのブートストラップアドレス。アドレスの形式は <cluster_name>-kafka-bootstrap:<port_number> です。Kafka クラスターは、Streams for Apache Kafka によって管理する必要はなく、OpenShift クラスターにデプロイする必要もありません。
11
Kafka Connect の設定。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
12
ターゲット Kafka クラスターの TLS による暗号化は、ソース Kafka クラスターと同様に設定されます。
13
MirrorMaker 2 コネクター。
14
MirrorMaker 2 コネクターによって使用されるソースクラスターのクラスターエイリアス。
15
MirrorMaker 2 コネクターによって使用されるターゲットクラスターのクラスターエイリアス。
16
コンマ区切りリストまたは正規表現パターンとして定義されたソースクラスターからのトピックレプリケーション。ソースコネクターは指定のトピックをレプリケーションします。チェックポイントコネクターは、指定されたトピックのオフセットを追跡します。ここでは、3 つのトピックを名前でリクエストします。
17
コンマ区切りリストまたは正規表現パターンとして定義されたソースクラスターからのコンシューマーグループのレプリケーション。チェックポイントコネクターは、指定されたコンシューマーグループをレプリケーションします。ここで、3 つのコンシューマーグループを名前で要求します。
18
リモートトピックを作成する MirrorSourceConnector の設定。デフォルトの設定オプションは config によって上書きされます。
19
コネクターによる作成が可能なタスクの最大数。タスクは、データのレプリケーションを処理し、並行して実行されます。インフラストラクチャーが処理のオーバーヘッドをサポートする場合、この値を大きくするとスループットが向上されます。Kafka Connect は、クラスターのメンバー間でタスクを分散します。ワーカーよりも多くのタスクがある場合は、ワーカーには複数のタスクが割り当てられます。シンクコネクターでは、消費される各トピックパーティションに 1 つタスクがあるようになることを目指します。ソースコネクターでは、並行して実行できるタスクの数は外部システムによって異なる場合もあります。並列処理を実現できない場合、コネクターは最大数より少ないタスクを作成します。
20
失敗したコネクターとタスクの自動再起動を有効にします。デフォルトでは、再起動の回数は無制限ですが、maxRestarts プロパティーを使用して自動再起動の最大回数を設定できます。
21
ターゲットクラスターで作成されるミラーリングされたトピックのレプリケーション係数。
22
ソースおよびターゲットクラスターのオフセットをマップする MirrorSourceConnector offset-syncs 内部トピックのレプリケーション係数。
23
ACL ルールの同期が有効になっていると、同期されたトピックに ACL が適用されます。デフォルトは true です。この機能は User Operator と互換性がありません。User Operator を使用している場合は、このプロパティーを false に設定します。
24
新規トピックのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
25
リモートトピック名の自動変更をオーバーライドするポリシーを追加します。その名前の前にソースクラスターの名前を追加する代わりに、トピックが元の名前を保持します。このオプションの設定は、active/passive バックアップおよびデータ移行に役立ちます。このプロパティーはすべてのコネクターに指定する必要があります。双方向 (アクティブ/アクティブ) レプリケーションの場合、DefaultReplicationPolicy クラスを使用してリモートトピックの名前を自動的に変更し、すべてのコネクターに replication.policy.separator プロパティーを指定してカスタムセパレーターを追加します。
26
接続チェックを実行する MirrorHeartbeatConnector の設定。デフォルトの設定オプションは config によって上書きされます。
27
ターゲットクラスターで作成されたハートビートトピックのレプリケーション係数。
28
オフセットを追跡する MirrorCheckpointConnector の設定。デフォルトの設定オプションは config によって上書きされます。
29
ターゲットクラスターで作成されたチェックポイントトピックのレプリケーション係数。
30
新規コンシューマーグループのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
31
コンシューマーグループのオフセットを同期する任意設定。これは、active/passive 設定でのリカバリーに便利です。同期はデフォルトでは有効になっていません。
32
コンシューマーグループオフセットの同期が有効な場合は、同期の頻度を調整できます。
33
オフセット追跡のチェック頻度を調整します。オフセット同期の頻度を変更する場合は、これらのチェックの頻度も調整することを推奨します。
34
常に同じになる Kafka Connect と MirrorMaker 2 のバージョン。
35
現在 cpu および memory である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。
36
指定された Kafka ロガーおよびログレベルが ConfigMap を介して直接的に (inline) または間接的に (external) に追加されます。カスタム Log4j 設定は、ConfigMaplog4j2.properties キーの下に配置する必要があります。ログレベルは、INFOERRORWARNTRACEDEBUGFATAL、または OFF に設定できます。
37
コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
38
Kafka MirrorMaker を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
39
高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
40
特別なオプション: 展開のための Rack awareness 設定。これは、リージョン間ではなく、同じロケーション内でのデプロイメントを目的とした特殊なオプションです。このオプションは、コネクターがリーダーレプリカではなく、最も近いレプリカから消費する場合に使用できます。場合によっては、最も近いレプリカから消費することで、ネットワークの使用率を改善したり、コストを削減したりできます。topologyKey は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準の topology.kubernetes.io/zone ラベルを使用するゾーンを指定します。最も近いレプリカから消費するには、Kafka ブローカー設定で RackAwareReplicaSelector を有効にします。
41
テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
42
分散トレース用に環境変数が設定されます。
43
分散トレーシングは、OpenTelemetry を使用して有効になります。

10.9.1. active/active または active/passive モードの設定

MirrorMaker 2 は、active/passive または active/active クラスター設定で使用できます。

アクティブ/アクティブのクラスター設定
アクティブ/アクティブ設定には、双方向でデータをレプリケーションするアクティブなクラスターが 2 つあります。アプリケーションはいずれかのクラスターを使用できます。各クラスターは同じデータを提供できます。これにより、地理的に異なる場所で同じデータを利用できるようにします。コンシューマーグループは両方のクラスターでアクティブであるため、レプリケーションされたトピックのコンシューマーオフセットはソースクラスターに同期されません。
active/passive クラスター設定
active/passive 設定には、passive クラスターにデータをレプリケーションする active クラスターがあります。passive クラスターはスタンバイのままになります。システムに障害が発生した場合に、データ復旧に passive クラスターを使用できます。

プロデューサーとコンシューマーがアクティブなクラスターのみに接続することを前提とします。MirrorMaker 2 クラスターはターゲットごとに必要です。

10.9.1.1. 双方向レプリケーション (active/active)

MirrorMaker 2 アーキテクチャーは、アクティブ/アクティブ クラスター設定での双方向レプリケーションをサポートします。

各クラスターは、source および remote トピックの概念を使用して、別のクラスターのデータをレプリケーションします。同じトピックが各クラスターに保存されるため、リモートトピックの名前は MirrorMaker 2 によってソースクラスターを表すように自動的に変更されます。元のクラスターの名前の先頭には、トピックの名前が追加されます。

図10.1 トピック名の変更

MirrorMaker 2 双方向アーキテクチャー

ソースクラスターにフラグを付けると、トピックはそのクラスターにレプリケーションされません。

remote トピックを介したレプリケーションの概念は、データの集約が必要なアーキテクチャーの設定に役立ちます。コンシューマーは、同じクラスター内でソースおよびリモートトピックにサブスクライブできます。これに個別の集約クラスターは必要ありません。

10.9.1.2. 一方向レプリケーション (active/passive)

MirrorMaker 2 アーキテクチャーは、active/passive クラスター設定での一方向レプリケーションをサポートします。

active/passive のクラスター設定を使用してバックアップを作成したり、データを別のクラスターに移行したりできます。この場合、リモートトピックの名前の自動変更は推奨しません。

IdentityReplicationPolicy をソースコネクター設定に追加することで、名前の自動変更をオーバーライドできます。この設定が適用されると、トピックには元の名前が保持されます。

Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

Red Hat ドキュメントについて

Legal Notice

Theme

© 2026 Red Hat
トップに戻る