10.10. Kafka MirrorMaker 2 の設定


KafkaMirrorMaker2 カスタムリソースの spec プロパティーを更新して、MirrorMaker 2 デプロイメントを設定します。MirrorMaker 2 は、データ消費にソースクラスター設定を使用し、データ出力にターゲットクラスター設定を使用します。

MirrorMaker 2 は、クラスター間のデータ転送を管理する コネクター である Kafka Connect フレームワークに基づいています。

MirrorMaker 2 を設定して、ソースクラスターとターゲットクラスターの接続の詳細を含む Kafka Connect デプロイメントを定義し、一連の MirrorMaker 2 コネクターを実行して接続を確立します。

MirrorMaker 2 は、ソースクラスターとターゲットクラスター間のトピック設定の同期をサポートします。MirrorMaker 2 設定でソーストピックを指定します。MirrorMaker 2 はソーストピックを監視します。MirrorMaker 2 は、ソーストピックへの変更を検出し、リモートトピックに伝達します。変更には、欠けているトピックおよびパーティションの自動作成が含まれる場合があります。

注記

ほとんどの場合、ローカルトピックに書き込み、リモートトピックから読み取ります。リモートトピックでは書き込み操作ができないわけではありませんが、使用しないようにしてください。

設定では以下を指定する必要があります。

  • 各 Kafka クラスター
  • 認証を含む各クラスターの接続情報
  • レプリケーションのフローおよび方向

    • クラスターからクラスターへ
    • トピックからトピックへ

Kafka MirrorMaker 2 クラスター設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。

注記

MirrorMaker 2 のリソース設定は、現在非推奨になっている以前のバージョンの MirrorMaker とは異なります。現在、レガシーサポートはないため、リソースは手動で新しい形式に変換する必要があります。

デフォルト設定

MirrorMaker 2 は、レプリケーション係数などのプロパティーのデフォルト設定値を提供します。デフォルトに変更がない最小設定の例は以下のようになります。

MirrorMaker 2 の最小設定

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.9.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}
Copy to Clipboard Toggle word wrap

mTLS または SASL 認証を使用して、ソースおよびターゲットクラスターのアクセス制御を設定できます。この手順では、ソースおよびターゲットクラスターに対して mTLS による暗号化および認証を使用する設定を説明します。

KafkaMirrorMaker2 リソースのソースクラスターからレプリケートするトピックとコンシューマーグループを指定できます。これを行うには、topicsPattern および groupsPattern プロパティーを使用します。名前のリストを指定したり、正規表現を使用したりできます。デフォルトでは、topicsPattern および groupsPattern プロパティーを設定しない場合、すべてのトピックとコンシューマーグループがレプリケートされます。".*" を正規表現として使用して、すべてのトピックとコンシューマーグループを複製することもできます。ただし、クラスターに不要な負荷が余分にかかるのを避けるため、必要なトピックとコンシューマーグループのみを指定するようにしてください。

大量のメッセージ処理

設定を調整して、大量のメッセージを処理できます。詳細は、大量のメッセージの処理 を参照してください。

KafkaMirrorMaker2 カスタムリソース設定の例

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
# Deployment specifications
spec:
  # Replicas (required)
  replicas: 3 
1

  # Connect cluster name (required)
  connectCluster: "my-cluster-target" 
2

  # Cluster configurations (required)
  clusters: 
3

    - alias: "my-cluster-source" 
4

      # Authentication (optional)
      authentication: 
5

        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
        type: tls
      bootstrapServers: my-cluster-source-kafka-bootstrap:9092 
6

      # TLS configuration (optional)
      tls: 
7

        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-source-cluster-ca-cert
    - alias: "my-cluster-target" 
8

      # Authentication (optional)
      authentication: 
9

        certificateAndKey:
          certificate: target.crt
          key: target.key
          secretName: my-user-target
        type: tls
      bootstrapServers: my-cluster-target-kafka-bootstrap:9092 
10

      # Kafka Connect configuration (optional)
      config: 
11

        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
      # TLS configuration (optional)
      tls: 
12

        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-target-cluster-ca-cert
  # Mirroring configurations (required)
  mirrors: 
13

    - sourceCluster: "my-cluster-source" 
14

      targetCluster: "my-cluster-target" 
15

      # Topic and group patterns (required)
      topicsPattern: "topic1|topic2|topic3" 
16

      groupsPattern: "group1|group2|group3" 
17

      # Source connector configuration (required)
      sourceConnector: 
18

        tasksMax: 10 
19

        autoRestart: 
20

          enabled: true
        config:
          replication.factor: 1 
21

          offset-syncs.topic.replication.factor: 1 
22

          sync.topic.acls.enabled: "false" 
23

          refresh.topics.interval.seconds: 60 
24

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" 
25

      # Heartbeat connector configuration (optional)
      heartbeatConnector: 
26

        autoRestart:
          enabled: true
        config:
          heartbeats.topic.replication.factor: 1 
27

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      # Checkpoint connector configuration (optional)
      checkpointConnector: 
28

        autoRestart:
          enabled: true
        config:
          checkpoints.topic.replication.factor: 1 
29

          refresh.groups.interval.seconds: 600 
30

          sync.group.offsets.enabled: true 
31

          sync.group.offsets.interval.seconds: 60 
32

          emit.checkpoints.interval.seconds: 60 
33

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
  # Kafka version (recommended)
  version: 3.9.0 
34

  # Resources requests and limits (recommended)
  resources: 
35

    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # Logging configuration (optional)
  logging: 
36

    type: inline
    loggers:
      connect.root.logger.level: INFO
  # Readiness probe (optional)
  readinessProbe: 
37

    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Liveness probe (optional)
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # JVM options (optional)
  jvmOptions: 
38

    "-Xmx": "1g"
    "-Xms": "1g"
  # Custom image (optional)
  image: my-org/my-image:latest 
39

  # Rack awareness (optional)
  rack:
    topologyKey: topology.kubernetes.io/zone 
40

  # Pod template (optional)
  template: 
41

    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: 
42

      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  # Tracing configuration (optional)
  tracing:
    type: opentelemetry 
43
Copy to Clipboard Toggle word wrap

1
タスクを実行するワーカーのレプリカノード数。
2
Kafka Connect の Kafka クラスターエイリアス。ターゲット Kafka クラスターを指定する必要があります。Kafka クラスターは、その内部トピックのために Kafka Connect によって使用されます。
3
同期される Kafka クラスターの指定。
4
ソースの Kafka クラスターのクラスターエイリアス。
5
ソースクラスターの認証。mTLS、トークンベースの OAuth、SASL ベース SCRAM-SHA-256/SCRAM-SHA-512、または PLAIN として指定します。
6
ソース Kafka クラスターに接続するためのブートストラップアドレス。アドレスの形式は <cluster_name>-kafka-bootstrap:<port_number> です。Kafka クラスターは、Streams for Apache Kafka によって管理する必要はなく、OpenShift クラスターにデプロイする必要もありません。
7
指定されたシークレット内に X.509 形式で保存されている信頼できる証明書を使用した、Kafka クラスターへの暗号化接続の TLS 設定。
8
ターゲット Kafka クラスターのクラスターエイリアス。
9
ターゲット Kafka クラスターの認証は、ソース Kafka クラスターと同様に設定されます。
10
ターゲット Kafka クラスターに接続するためのブートストラップアドレス。アドレスの形式は <cluster_name>-kafka-bootstrap:<port_number> です。Kafka クラスターは、Streams for Apache Kafka によって管理する必要はなく、OpenShift クラスターにデプロイする必要もありません。
11
Kafka Connect の設定。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
12
ターゲット Kafka クラスターの TLS による暗号化は、ソース Kafka クラスターと同様に設定されます。
13
MirrorMaker 2 コネクター。
14
MirrorMaker 2 コネクターによって使用されるソースクラスターのクラスターエイリアス。
15
MirrorMaker 2 コネクターによって使用されるターゲットクラスターのクラスターエイリアス。
16
コンマ区切りリストまたは正規表現パターンとして定義されたソースクラスターからのトピックレプリケーション。ソースコネクターは指定のトピックをレプリケーションします。チェックポイントコネクターは、指定されたトピックのオフセットを追跡します。ここでは、3 つのトピックを名前でリクエストします。
17
コンマ区切りリストまたは正規表現パターンとして定義されたソースクラスターからのコンシューマーグループのレプリケーション。チェックポイントコネクターは、指定されたコンシューマーグループをレプリケーションします。ここで、3 つのコンシューマーグループを名前で要求します。
18
リモートトピックを作成する MirrorSourceConnector の設定。デフォルトの設定オプションは config によって上書きされます。
19
コネクターによる作成が可能なタスクの最大数。タスクは、データのレプリケーションを処理し、並行して実行されます。インフラストラクチャーが処理のオーバーヘッドをサポートする場合、この値を大きくするとスループットが向上されます。Kafka Connect は、クラスターのメンバー間でタスクを分散します。ワーカーよりも多くのタスクがある場合は、ワーカーには複数のタスクが割り当てられます。シンクコネクターでは、消費される各トピックパーティションに 1 つタスクがあるようになることを目指します。ソースコネクターでは、並行して実行できるタスクの数は外部システムによって異なる場合もあります。並列処理を実現できない場合、コネクターは最大数より少ないタスクを作成します。
20
失敗したコネクターとタスクの自動再起動を有効にします。デフォルトでは、再起動の回数は無制限ですが、maxRestarts プロパティーを使用して自動再起動の最大回数を設定できます。
21
ターゲットクラスターで作成されるミラーリングされたトピックのレプリケーション係数。
22
ソースおよびターゲットクラスターのオフセットをマップする MirrorSourceConnector offset-syncs 内部トピックのレプリケーション係数。
23
ACL ルールの同期が有効になっていると、同期されたトピックに ACL が適用されます。デフォルトは true です。この機能は User Operator と互換性がありません。User Operator を使用している場合は、このプロパティーを false に設定します。
24
新規トピックのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
25
リモートトピック名の自動変更をオーバーライドするポリシーを追加します。その名前の前にソースクラスターの名前を追加する代わりに、トピックが元の名前を保持します。このオプションの設定は、active/passive バックアップおよびデータ移行に役立ちます。このプロパティーはすべてのコネクターに指定する必要があります。双方向 (アクティブ/アクティブ) レプリケーションの場合、DefaultReplicationPolicy クラスを使用してリモートトピックの名前を自動的に変更し、すべてのコネクターに replication.policy.separator プロパティーを指定してカスタムセパレーターを追加します。
26
接続チェックを実行する MirrorHeartbeatConnector の設定。デフォルトの設定オプションは config によって上書きされます。
27
ターゲットクラスターで作成されたハートビートトピックのレプリケーション係数。
28
オフセットを追跡する MirrorCheckpointConnector の設定。デフォルトの設定オプションは config によって上書きされます。
29
ターゲットクラスターで作成されたチェックポイントトピックのレプリケーション係数。
30
新規コンシューマーグループのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
31
コンシューマーグループのオフセットを同期する任意設定。これは、active/passive 設定でのリカバリーに便利です。同期はデフォルトでは有効になっていません。
32
コンシューマーグループオフセットの同期が有効な場合は、同期の頻度を調整できます。
33
オフセット追跡のチェック頻度を調整します。オフセット同期の頻度を変更する場合は、これらのチェックの頻度も調整することを推奨します。
34
常に同じになる Kafka Connect と MirrorMaker 2 のバージョン。
35
現在 cpu および memory である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。
36
指定された Kafka ロガーおよびログレベルが ConfigMap を介して直接的に (inline) または間接的に (external) に追加されます。カスタム Log4j 設定は、ConfigMap の log4j.properties キーまたは log4j2.properties キーの下に配置する必要があります。Kafka Connect log4j.rootLogger ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。
37
コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
38
Kafka MirrorMaker を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
39
高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
40
特別なオプション: 展開のための Rack awareness 設定。これは、リージョン間ではなく、同じロケーション内でのデプロイメントを目的とした特殊なオプションです。このオプションは、コネクターがリーダーレプリカではなく、最も近いレプリカから消費する場合に使用できます。場合によっては、最も近いレプリカから消費することで、ネットワークの使用率を改善したり、コストを削減したりできます。topologyKey は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準の topology.kubernetes.io/zone ラベルを使用するゾーンを指定します。最も近いレプリカから消費するには、Kafka ブローカー設定で RackAwareReplicaSelector を有効にします。
41
テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
42
分散トレース用に環境変数が設定されます。
43
分散トレーシングは、OpenTelemetry を使用して有効になります。

10.10.1. active/active または active/passive モードの設定

MirrorMaker 2 は、active/passive または active/active クラスター設定で使用できます。

アクティブ/アクティブのクラスター設定
アクティブ/アクティブ設定には、双方向でデータをレプリケーションするアクティブなクラスターが 2 つあります。アプリケーションはいずれかのクラスターを使用できます。各クラスターは同じデータを提供できます。これにより、地理的に異なる場所で同じデータを利用できるようにします。コンシューマーグループは両方のクラスターでアクティブであるため、レプリケーションされたトピックのコンシューマーオフセットはソースクラスターに同期されません。
active/passive クラスター設定
active/passive 設定には、passive クラスターにデータをレプリケーションする active クラスターがあります。passive クラスターはスタンバイのままになります。システムに障害が発生した場合に、データ復旧に passive クラスターを使用できます。

プロデューサーとコンシューマーがアクティブなクラスターのみに接続することを前提とします。MirrorMaker 2 クラスターはターゲットごとに必要です。

10.10.1.1. 双方向レプリケーション (active/active)

MirrorMaker 2 アーキテクチャーは、アクティブ/アクティブ クラスター設定での双方向レプリケーションをサポートします。

各クラスターは、source および remote トピックの概念を使用して、別のクラスターのデータをレプリケーションします。同じトピックが各クラスターに保存されるため、リモートトピックの名前は MirrorMaker 2 によってソースクラスターを表すように自動的に変更されます。元のクラスターの名前の先頭には、トピックの名前が追加されます。

図10.1 トピック名の変更

MirrorMaker 2 双方向アーキテクチャー

ソースクラスターにフラグを付けると、トピックはそのクラスターにレプリケーションされません。

remote トピックを介したレプリケーションの概念は、データの集約が必要なアーキテクチャーの設定に役立ちます。コンシューマーは、同じクラスター内でソースおよびリモートトピックにサブスクライブできます。これに個別の集約クラスターは必要ありません。

10.10.1.2. 一方向レプリケーション (active/passive)

MirrorMaker 2 アーキテクチャーは、active/passive クラスター設定での一方向レプリケーションをサポートします。

active/passive のクラスター設定を使用してバックアップを作成したり、データを別のクラスターに移行したりできます。この場合、リモートトピックの名前の自動変更は推奨しません。

IdentityReplicationPolicy をソースコネクター設定に追加することで、名前の自動変更をオーバーライドできます。この設定が適用されると、トピックには元の名前が保持されます。

10.10.2. 複数のインスタンス用の MirrorMaker 2 の設定

デフォルトでは、Streams for Apache Kafka は、MirrorMaker 2 が実行される Kafka Connect フレームワークによって使用される内部トピックのグループ ID と名前を設定します。MirrorMaker 2 の複数のインスタンスを実行し、同じ connectCluster 値を共有する場合は、次の config プロパティーを使用してこれらのデフォルト設定を変更する必要があります。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-target"
    config:
      group.id: my-connect-cluster 
1

      offset.storage.topic: my-connect-cluster-offsets 
2

      config.storage.topic: my-connect-cluster-configs 
3

      status.storage.topic: my-connect-cluster-status 
4

      # ...
    # ...
Copy to Clipboard Toggle word wrap
1
Kafka 内の Kafka Connect クラスターグループ ID。
2
コネクターオフセットを保存する Kafka トピック。
3
コネクターおよびタスクステータスの設定を保存する Kafka トピック。
4
コネクターおよびタスクステータスの更新を保存する Kafka トピック。
注記

group.id が同じすべてのインスタンスで、3 つのトピックの値を同じにする必要があります。

connectCluster 設定は、Kafka Connect が内部トピックに使用するターゲットの Kafka クラスターのエイリアスを指定します。その結果、connectCluster、グループ ID、および内部トピックの名前付け設定への変更は、ターゲットの Kafka クラスターに固有のものになります。2 つの MirrorMaker 2 インスタンスが同じソース Kafka クラスターを使用している場合、または MirrorMaker 2 インスタンスごとに connectCluster 設定とターゲットクラスターが異なるアクティブ/アクティブモードを使用している場合は、変更を加える必要はありません。

ただし、複数の MirrorMaker 2 インスタンスが同じ connectCluster を共有する場合、同じターゲット Kafka クラスターに接続する各インスタンスは同じ値でデプロイされます。実際には、これはすべてのインスタンスがクラスターを形成し、同じ内部トピックを使用することを意味します。

複数のインスタンスが同じ内部トピックを使用しようとすると、予期しないエラーが発生するため、インスタンスごとにこれらのプロパティーの値を変更する必要があります。

10.10.3. MirrorMaker 2 コネクターの設定

Kafka クラスター間のデータの同期を調整する内部コネクターには、MirrorMaker 2 コネクター設定を使用します。

MirrorMaker 2 は次のコネクターで構成されます。

MirrorSourceConnector
ソースコネクターは、トピックをソースクラスターからターゲットクラスターにレプリケーションします。また、ACL をレプリケーションし、MirrorCheckpointConnector を実行する必要があります。
MirrorCheckpointConnector
チェックポイントコネクターは定期的にオフセットを追跡します。有効にすると、ソースクラスターとターゲットクラスター間のコンシューマーグループオフセットも同期されます。
MirrorHeartbeatConnector
ハートビートコネクターは、ソースクラスターとターゲットクラスター間の接続を定期的にチェックします。

以下の表は、コネクタープロパティーと、これらを使用するために設定するコネクターを説明しています。

Expand
表10.3 MirrorMaker 2 コネクター設定プロパティー
プロパティーsourceConnectorcheckpointConnectorheartbeatConnector
admin.timeout.ms
新規トピックの検出などの管理タスクのタイムアウト。デフォルトは 60000 (1 分) です。

replication.policy.class
リモートトピックの命名規則を定義するポリシー。デフォルトは org.apache.kafka.connect.mirror.DefaultReplicationPolicy です。

replication.policy.separator
ターゲットクラスターのトピックの命名に使用されるセパレーター。デフォルトでは、区切り文字はドット (.) に設定されています。区切り文字の設定は、リモートトピック名を定義する DefaultReplicationPolicy レプリケーションポリシークラスにのみ適用されます。トピックは元の名前を保持するため、IdentityReplicationPolicy クラスはこのプロパティーを使用しません。

consumer.poll.timeout.ms
ソースクラスターをポーリングする際のタイムアウト。デフォルトは 1000 (1 秒) です。

 
offset-syncs.topic.location
offset-syncs トピックの場所。これは、source (デフォルト) または target クラスターになります。

 
topic.filter.class
レプリケーションするトピックを選択するためのトピックフィルター。デフォルトは org.apache.kafka.connect.mirror.DefaultTopicFilter です。

 
config.property.filter.class
レプリケーションするトピック設定プロパティーを選択するトピックフィルター。デフォルトは org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter です。

  
config.properties.exclude
レプリケーションすべきでないトピック設定プロパティー。コンマ区切りのプロパティー名と正規表現をサポートします。

  
offset.lag.max
リモートパーティションが同期されるまでの最大許容 (同期外) オフセットラグ。デフォルトは 100 です。

  
offset-syncs.topic.replication.factor
内部 offset-syncs トピックのレプリケーション係数。デフォルトは 3 です。

  
refresh.topics.enabled
新しいトピックおよびパーティションの確認を有効にします。デフォルトは true です。

  
refresh.topics.interval.seconds
トピック更新の頻度。デフォルトは 600 (10 分) です。デフォルトでは、ソースクラスターの新規トピックのチェックは 10 分ごとに行われます。頻度は、refresh.topics.interval.seconds をソースコネクター設定に追加することで変更できます。

  
replication.factor
新しいトピックのレプリケーション係数。デフォルトは 2 です。

  
sync.topic.acls.enabled
ソースクラスターからの ACL の同期を有効にします。デフォルトは true です。詳細は、「リモートトピックの ACL ルールの同期」 を参照してください。

  
sync.topic.acls.interval.seconds
ACL 同期の頻度。デフォルトは 600 (10 分) です。

  
sync.topic.configs.enabled
ソースクラスターからのトピック設定の同期を有効にします。デフォルトは true です。

  
sync.topic.configs.interval.seconds
トピック設定の同期頻度。デフォルトは 600 (10 分) です。

  
checkpoints.topic.replication.factor
内部 checkpoints トピックのレプリケーション係数。デフォルトは 3 です。
 

 
emit.checkpoints.enabled
コンシューマーオフセットをターゲットクラスターに同期できるようにします。デフォルトは true です。
 

 
emit.checkpoints.interval.seconds
コンシューマーオフセット同期の頻度。デフォルトは 60 (1 分) です。
 

 
group.filter.class
レプリケーションするコンシューマーグループを選択するためのグループフィルター。デフォルトは org.apache.kafka.connect.mirror.DefaultGroupFilter です。
 

 
refresh.groups.enabled
新規コンシューマーグループの確認を有効にします。デフォルトは true です。
 

 
refresh.groups.interval.seconds
コンシューマーグループ更新の頻度。デフォルトは 600 (10 分) です。
 

 
sync.group.offsets.enabled
ターゲットクラスターの __consumer_offsets トピックへのコンシューマーグループオフセットの同期を有効にします。デフォルトは false です。
 

 
sync.group.offsets.interval.seconds
コンシューマーグループオフセット同期の頻度。デフォルトは 60 (1 分) です。
 

 
emit.heartbeats.enabled
ターゲットクラスターでの接続性チェックを有効にします。デフォルトは true です。
  

emit.heartbeats.interval.seconds
接続性チェックの頻度。デフォルトは 1 (1 秒) です。
  

heartbeats.topic.replication.factor
内部 heartbeats トピックのレプリケーション係数。デフォルトは 3 です。
  

10.10.3.1. コンシューマーグループオフセットの場所の変更トピック

MirrorMaker 2 は、内部トピックを使用してコンシューマーグループのオフセットを追跡します。

offset-syncs トピック
offset-syncs トピックは、レプリケーションされたトピックパーティションのソースおよびターゲットオフセットをレコードメタデータからマッピングします。
checkpoints トピック
checkpoints トピックは、各コンシューマーグループでレプリケーションされたトピックパーティションのソースおよびターゲットクラスターで、最後にコミットされたオフセットをマッピングします。

これらは MirrorMaker 2 によって内部的に使用されるため、これらのトピックと直接対話することはありません。

MirrorCheckpointConnector は、オフセット追跡用の チェックポイント を発行します。checkpoints トピックのオフセットは、設定によって事前に決定された間隔で追跡されます。両方のトピックは、フェイルオーバー時に正しいオフセットの位置からレプリケーションの完全復元を可能にします。

offset-syncs トピックの場所は、デフォルトで source クラスターです。offset-syncs.topic.location コネクター設定を使用して、これを target クラスターに変更することができます。トピックが含まれるクラスターへの読み取り/書き込みアクセスが必要です。ターゲットクラスターを offset-syncs トピックの場所として使用すると、ソースクラスターへの読み取りアクセス権しかない場合でも、MirrorMaker 2 を使用できるようになります。

10.10.3.2. コンシューマーグループオフセットの同期

__consumer_offsets トピックには、各コンシューマーグループのコミットされたオフセットに関する情報が保存されます。オフセットの同期は、ソースクラスターのコンシューマーグループのコンシューマーオフセットをターゲットクラスターのコンシューマーオフセットに定期的に転送します。

オフセットの同期は、特に active/passive 設定で便利です。アクティブなクラスターがダウンした場合、コンシューマーアプリケーションを passive (スタンバイ) クラスターに切り替え、最後に転送されたオフセットの位置からピックアップできます。

トピックオフセットの同期を使用するには、sync.group.offsets.enabled を checkpoint コネクター設定に追加し、プロパティーを true に設定して、同期を有効にします。同期はデフォルトで無効になっています。

ソースコネクターで IdentityReplicationPolicy を使用する場合は、チェックポイントコネクター設定でも設定する必要があります。これにより、ミラーリングされたコンシューマーオフセットが正しいトピックに適用されます。

コンシューマーオフセットは、ターゲットクラスターでアクティブではないコンシューマーグループに対してのみ同期されます。コンシューマーグループがターゲットクラスターにある場合、Synchronization を実行できず、UNKNOWN_MEMBER_ID エラーが返されます。

同期を有効にすると、ソースクラスターからオフセットの同期が定期的に行われます。この頻度は、sync.group.offsets.interval.seconds および emit.checkpoints.interval.seconds をチェックポイントコネクター設定に追加することで変更できます。これらのプロパティーは、コンシューマーグループのオフセットが同期される頻度 (秒単位) と、オフセットを追跡するためにチェックポイントが生成される頻度を指定します。両方のプロパティーのデフォルトは 60 秒です。refresh.groups.interval.seconds プロパティーを使用して、新規コンシューマーグループのチェック頻度を変更することもできます。デフォルトでは 10 分ごとに実行されます。

同期は時間ベースであるため、コンシューマーによって passive クラスターへ切り替えられると、一部のメッセージが重複する可能性があります。

注記

Java で作成されたアプリケーションがある場合は、RemoteClusterUtils.java ユーティリティーを使用して、アプリケーションを通じてオフセットを同期できます。ユーティリティーは、checkpoints トピックからコンシューマーグループのリモートオフセットを取得します。

10.10.3.3. ハートビートコネクターを使用するタイミングの決定

ハートビートコネクターはハートビートを出力して、ソース Kafka クラスターとターゲット Kafka クラスター間の接続を確認します。内部 heartbeat トピックはソースクラスターからレプリケートされます。つまり、ハートビートコネクターがソースクラスターに接続されている必要があります。heartbeat トピックはターゲットクラスターに配置されているため、次のことが可能になります。

  • データのミラーリング元のすべてのソースクラスターを特定します。
  • ミラーリングプロセスの稼働状況と遅延を確認する

これは、プロセスが何らかの理由でスタックしたり停止したりしていないことを確認するのに役立ちます。ハートビートコネクターは、Kafka クラスター間のミラーリングプロセスを監視するための貴重なツールですが、必ずしも使用する必要があるわけではありません。たとえば、デプロイメントのネットワーク遅延が低い場合、またはトピックの数が少ない場合は、ログメッセージやその他の監視ツールを使用してミラーリングプロセスを監視することが推奨されます。ハートビートコネクターを使用しない場合は、MirrorMaker 2 設定からハートビートコネクターを省略してください。

10.10.3.4. MirrorMaker 2 コネクターの設定の調整

MirrorMaker 2 コネクターが正しく動作することを確認するには、コネクター全体で特定の設定を調整してください。具体的には、次のプロパティーが該当するすべてのコネクターで同じ値であることを確認してください。

  • replication.policy.class
  • replication.policy.separator
  • offset-syncs.topic.location
  • topic.filter.class

たとえば、replication.policy.class の値は、ソース、チェックポイント、およびハートビートコネクターで同じである必要があります。設定が一致していないか欠落していると、データレプリケーションやオフセット同期で問題が発生するため、関連するすべてのコネクターを同じ設定にしておくことが重要です。

10.10.3.5. MirrorMaker 2 コネクターのオフセットのリスト

内部 MirrorMaker 2 コネクターのオフセット位置をリスト表示するには、Kafka Connect コネクターの管理に使用する設定と同じ設定を使用します。設定とオフセットのリスト表示の詳細は、「コネクターオフセットのリスト表示」 を参照してください。

この例では、sourceConnector 設定が更新され、コネクターのオフセット位置が返されます。オフセット情報は、指定した config map に書き込まれます。

MirrorMaker 2 コネクターの設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.9.0
  # ...
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      listOffsets:
        toConfigMap:
          name: my-connector-offsets
        # ...
Copy to Clipboard Toggle word wrap

コネクターオフセットの管理を可能にするには、KafkaMirrorMaker2 リソースに次のアノテーションを適用する必要があります。

  • strimzi.io/connector-offsets
  • strimzi.io/mirrormaker-connector

strimzi.io/mirrormaker-connector アノテーションを、コネクターの名前に設定する必要があります。これらのアノテーションは、操作が成功するか、リソースから手動で削除されるまで残ります。

MirrorMaker 2 コネクターの名前は、ソースクラスターおよびターゲットクラスターのエイリアスと、その後にコネクタータイプを追加したもの (<source_alias>-><target_alias>.<connector_type>) になります。

次の例では、my-cluster-source->my-cluster-target.MirrorSourceConnector という名前のコネクターにアノテーションが適用されています。

コネクターのアノテーションの適用例

oc annotate kafkamirrormaker2 my-mirror-maker-2 strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="my-cluster-source->my-cluster-target.MirrorSourceConnector" -n kafka
Copy to Clipboard Toggle word wrap

オフセットは指定された config map 内にリスト表示されます。Streams for Apache Kafka は、コネクターの名前を使用して名付けられた .json プロパティーにオフセット情報を配置します。既存 config map の更新時に他のプロパティーは上書きされません。

ソースコネクターオフセットリストの例

apiVersion: v1
kind: ConfigMap
metadata:
  # ...
  ownerReferences: 
1

  - apiVersion: kafka.strimzi.io/v1beta2
    blockOwnerDeletion: false
    controller: false
    kind: KafkaMirrorMaker2
    name: my-mirror-maker2
    uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0
data:
  my-cluster-source--my-cluster-target.MirrorSourceConnector.json: |- 
2

    {
      "offsets": [
        {
          "partition": {
            "cluster": "east-kafka",
            "partition": 0,
            "topic": "mirrormaker2-cluster-configs"
          },
          "offset": {
            "offset": 0
          }
        }
      ]
    }
Copy to Clipboard Toggle word wrap

1
KafkaMirrorMaker2 リソースを指す所有者参照。カスタムの所有者参照を指定するには、事前に config map を作成して所有者参照を設定します。
2
.json プロパティーはコネクター名を使用します。-> 文字は config map キーで使用できないため、コネクター名では ->-- に変更されます。
注記

設定を使用してコネクターオフセットを 変更 または リセット できますが、これが必要になることはほとんどありません。

10.10.4. MirrorMaker 2 コネクターのプロデューサとコンシューマーの設定

MirrorMaker 2 コネクターは、内部プロデューサーとコンシューマーを使用します。必要に応じて、これらのプロデューサーおよびコンシューマーを設定して、デフォルト設定を上書きできます。

たとえば、トピックをターゲットの Kafka クラスターに送信するソースプロデューサーの batch.size を増やして、大量のデータをより適切に対応できます。

重要

プロデューサおよびコンシューマーの設定オプションは MirrorMaker 2 の実装に依存しており、変更される可能性があります。

次の表では、各コネクターのプロデューサーとコンシューマー、および設定を追加できる場所を説明します。

Expand
表10.4 ソースコネクターのプロデューサーとコンシューマー
説明設定

プロデューサー

トピックメッセージをターゲット Kafka クラスターに送信します。大量のデータを処理する場合は、このプロデューサーの設定を調整することを検討してください。

mirrors.sourceConnector.config: producer.override.*

プロデューサー

レプリケートされたトピックパーティションのソースオフセットとターゲットオフセットをマップする、offset-syncs トピックに書き込みます。

mirrors.sourceConnector.config: producer.*

Consumer

ソース Kafka クラスターからトピックメッセージを取得します。

mirrors.sourceConnector.config: consumer.*

Expand
表10.5 チェックポイントコネクターのプロデューサーとコンシューマー
説明設定

プロデューサー

コンシューマーオフセットチェックポイントを発行します。

mirrors.checkpointConnector.config: producer.override.*

Consumer

offset-syncs トピックを読み込みます。

mirrors.checkpointConnector.config: consumer.*

注記

offset-syncs.topic.locationtarget に設定して、ターゲット Kafka クラスターを offset-syncs トピックの場所として使用できます。

Expand
表10.6 ハートビートコネクタープロデューサー
説明設定

プロデューサー

ハートビートを生成します。

mirrors.heartbeatConnector.config: producer.override.*

次の例は、プロデューサーとコンシューマーを設定する方法を示しています。

コネクターのプロデューサーとコンシューマーの設定例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.9.0
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 5
      config:
        producer.override.batch.size: 327680
        producer.override.linger.ms: 100
        producer.request.timeout.ms: 30000
        consumer.fetch.max.bytes: 52428800
        # ...
    checkpointConnector:
      config:
        producer.override.request.timeout.ms: 30000
        consumer.max.poll.interval.ms: 300000
        # ...
    heartbeatConnector:
      config:
        producer.override.request.timeout.ms: 30000
        # ...
Copy to Clipboard Toggle word wrap

10.10.5. データ複製タスクの最大数の指定

コネクターは、Kafka にデータを出し入れするタスクを作成します。各コネクターは 1 つ以上のタスクで構成されます。タスクは、タスクを実行するワーカー Pod のグループ全体に分散されます。タスクの数を増やすと、多数のパーティションをレプリケーションするとき、または多数のコンシューマーグループのオフセットを同期するときのパフォーマンスの問題に役立ちます。

タスクは並行して実行されます。ワーカーには 1 つ以上のタスクが割り当てられます。1 つのタスクが 1 つのワーカー Pod によって処理されるため、タスクよりも多くのワーカー Pod は必要ありません。ワーカーよりも多くのタスクがある場合、ワーカーは複数のタスクを処理します。

tasksMax プロパティーを使用して、MirrorMaker 設定でコネクタータスクの最大数を指定できます。タスクの最大数を指定しない場合、デフォルト設定のタスク数は 1 つです。

ハートビートコネクターは常に単一のタスクを使用します。

ソースおよびチェックポイントコネクターに対して開始されるタスクの数は、可能なタスクの最大数と tasksMax の値の間の低い値です。ソースコネクターの場合、可能なタスクの最大数は、ソースクラスターからレプリケーションされるパーティションごとに 1 つです。チェックポイントコネクターの場合、可能なタスクの最大数は、ソースクラスターからレプリケーションされるコンシューマーグループごとに 1 つです。タスクの最大数を設定するときは、プロセスをサポートするパーティションの数とハードウェアリソースを考慮してください。

インフラストラクチャーが処理のオーバーヘッドをサポートしている場合、タスクの数を増やすと、スループットと待機時間が向上する可能性があります。たとえば、タスクを追加すると、多数のパーティションまたはコンシューマーグループがある場合に、ソースクラスターのポーリングにかかる時間が短縮されます。

ソースコネクターのタスク数を増やすと、多数のパーティションがある場合に役立ちます。

ソースコネクターのタスク数を増やす

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 10
  # ...
Copy to Clipboard Toggle word wrap

多数のコンシューマーグループがある場合は、チェックポイントコネクターのタスク数を増やすと便利です。

チェックポイントコネクターのタスク数の増加

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    checkpointConnector:
      tasksMax: 10
  # ...
Copy to Clipboard Toggle word wrap

デフォルトでは、MirrorMaker 2 は 10 分ごとに新しいコンシューマーグループをチェックします。refresh.groups.interval.seconds 設定を調整して、頻度を変更できます。低く調整するときは注意してください。より頻繁なチェックは、パフォーマンスに悪影響を及ぼす可能性があります。

10.10.5.1. コネクタータスクの動作の確認

Prometheus と Grafana を使用してデプロイメントを監視している場合は、MirrorMaker 2 のパフォーマンスをチェックできます。Streams for Apache Kafka に付属する MirrorMaker 2 Grafana ダッシュボードの例では、タスクとレイテンシーに関連する次のメトリクスが表示されます。

  • タスクの数
  • レプリケーションのレイテンシー
  • オフセット同期のレイテンシー

10.10.6. リモートトピックの ACL ルールの同期

MirrorMaker 2 を Streams for Apache Kafka と併用すると、リモートトピックの ACL ルールを同期できます。ただし、この機能は User Operator を使用していない場合にのみ使用できます。

User Operator を使用せずに type: simple 認可を使用している場合、ブローカーへのアクセスを管理する ACL ルールはリモートトピックにも適用されます。これは、ソーストピックへの読み取りアクセス権を持つユーザーが、リモートの同等のトピックを読み取ることもできることを意味します。

注記

OAuth 2.0 での認可は、このようなリモートトピックへのアクセスをサポートしません。

10.10.7. Kafka MirrorMaker 2 デプロイメントの保護

この手順では、MirrorMaker 2 デプロイメントを保護するために必要な設定の概要を説明します。

ソース Kafka クラスターとターゲット Kafka クラスターには別々の設定が必要です。また、MirrorMaker がソースおよびターゲットの Kafka クラスターに接続するために必要な認証情報を提供するために、個別のユーザー設定が必要です。

Kafka クラスターの場合、OpenShift クラスター内のセキュア接続用の内部リスナーと、OpenShift クラスター外の接続用の外部リスナーを指定します。

認証および認可メカニズムを設定できます。ソースおよびターゲットの Kafka クラスターに実装されたセキュリティーオプションは、MirrorMaker 2 に実装されたセキュリティーオプションと互換性がある必要があります。

クラスターとユーザー認証情報を作成したら、セキュアな接続のために MirrorMaker 設定でそれらを指定します。

注記

この手順では、Cluster Operator によって生成された証明書が使用されますが、独自の証明書をインストール してそれらを置き換えることができます。外部 CA (認証局) によって管理される Kafka リスナー証明書を使用 するようにリスナーを設定することもできます。

作業を開始する前の注意事項

この手順を開始する前に、Streams for Apache Kafka に付属する サンプル設定ファイル を確認してください。これらには、mTLS または SCRAM-SHA-512 認証を使用して MirrorMaker 2 のデプロイメントを保護するための例が含まれています。例では、OpenShift クラスター内で接続するための内部リスナーを指定しています。

この例では、ソースおよびターゲットの Kafka クラスターでのユーザー操作を許可する ACL を含む、完全な認可の設定も提供します。

ソースおよびターゲット Kafka クラスターへのユーザーアクセスを設定する場合、ACL は内部 MirrorMaker 2 コネクターへのアクセス権と、ターゲットクラスター内の基盤となる Kafka Connect フレームワークによって使用されるクラスターグループおよび内部トピックへの読み取り/書き込みアクセス権を割り当てる必要があります。複数のインスタンスに対して MirrorMaker 2 を設定する など、クラスターグループまたは内部トピックの名前を変更した場合は、ACL 設定でその名前を使用します。

簡易認可は、Kafka AclAuthorizer および StandardAuthorizer プラグインで管理される ACL ルールを使用し、適切なアクセスレベルを確保します。KafkaUser リソースに簡易認証を使用するように設定する方法は、AclRule スキーマリファレンスを参照してください。

前提条件

  • Streams for Apache Kafka が実行中である。
  • ソースクラスターとターゲットクラスターの namespace が分離されている。

この手順では、ソースとターゲットの Kafka クラスターが別の namespace にインストールされていることを前提としています。Topic Operator を使用する場合は、こちらを行う必要があります。Topic Operator は、指定された namespace 内の単一クラスターのみをモニタリングします。

クラスターを namespace に分割することにより、クラスターシークレットをコピーして、namespace の外部からアクセスできるようにする必要があります。MirrorMaker 設定でシークレットを参照する必要があります。

手順

  1. 2 つの Kafka リソースを設定します。1 つはソース Kafka クラスターを保護するためのもので、もう 1 つはターゲット Kafka クラスターを保護するためのものです。

    認証用のリスナー設定を追加し、認可を有効にすることができます。

    この例の場合、内部リスナーは mTLS 暗号化と認証を使用して Kafka クラスター用に設定されています。Kafka の simple 認可が有効になっています。

    TLS 暗号化と mTLS 認証を使用したソース Kafka クラスター設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-source-cluster
    spec:
      kafka:
        version: 3.9.0
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.9"
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}
    Copy to Clipboard Toggle word wrap

    TLS 暗号化と mTLS 認証を使用したターゲット Kafka クラスター設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-target-cluster
    spec:
      kafka:
        version: 3.9.0
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.9"
        storage:
          type: jbod
          volumes:
            - id: 0
              type: persistent-claim
              size: 100Gi
              deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}
    Copy to Clipboard Toggle word wrap

  2. 別の namespace で Kafka リソースを作成または更新します。

    oc apply -f <kafka_configuration_file> -n <namespace>
    Copy to Clipboard Toggle word wrap

    Cluster Operator はリスナーを作成し、クラスターおよびクライアント認証局 (CA) 証明書を設定して Kafka クラスター内で認証を有効にします。

    証明書は、シークレット <cluster_name>-cluster-ca-cert に作成されます。

  3. 2 つの KafkaUser リソースを設定します。1 つはソース Kafka クラスターのユーザー用で、もう 1 つはターゲット Kafka クラスターのユーザー用です。

    1. 対応するソースおよびターゲットの Kafka クラスターと同じ認証および認可タイプを設定します。たとえば、ソース Kafka クラスターの Kafka 設定で tls 認証と simple 認可タイプを使用した場合は、KafkaUser 設定でも同じものを使用します。
    2. MirrorMaker 2 に必要な ACL を設定して、ソースおよびターゲットの Kafka クラスターでの操作を許可します。

    mTLS 認証のソースユーザー設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-source-user
      labels:
        strimzi.io/cluster: my-source-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # MirrorSourceConnector
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Create
              - DescribeConfigs
              - Read
              - Write
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - DescribeConfigs
              - Read
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource: # Needed for every group for which offsets are synced
              type: group
              name: "*"
            operations:
              - Describe
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Read
    Copy to Clipboard Toggle word wrap

    mTLS 認証のターゲットユーザー設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-target-user
      labels:
        strimzi.io/cluster: my-target-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # cluster group
          - resource:
              type: group
              name: mirrormaker2-cluster
            operations:
              - Read
          # access to config.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # access to status.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # access to offset.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # MirrorSourceConnector
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - Create
              - Alter
              - AlterConfigs
              - Write
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource:
              type: topic
              name: my-source-cluster.checkpoints.internal
            operations:
              - Create
              - Describe
              - Read
              - Write
          - resource: # Needed for every group for which the offset is synced
              type: group
              name: "*"
            operations:
              - Read
              - Describe
          # MirrorHeartbeatConnector
          - resource:
              type: topic
              name: heartbeats
            operations:
              - Create
              - Describe
              - Write
    Copy to Clipboard Toggle word wrap

    注記

    typetls-external に設定することにより、User Operator の外部で発行された証明書を使用できます。詳細は、KafkaUserSpec スキーマリファレンス を参照してください。

  4. ソースおよびターゲットの Kafka クラスター用に作成した各 namespace で、KafkaUser リソースを作成または更新します。

    oc apply -f <kafka_user_configuration_file> -n <namespace>
    Copy to Clipboard Toggle word wrap

    User Operator はクライアント (MirrorMaker) に対応するユーザーを作成すると共に、選択した認証タイプに基づいて、クライアント認証に使用されるセキュリティークレデンシャルを作成します。

    User Operator は、KafkaUser リソースと同じ名前の新しいシークレットを作成します。シークレットには、mTLS 認証用の秘密鍵と公開鍵が含まれています。公開鍵は、クライアント CA によって署名されたユーザー証明書に含まれます。

  5. ソースおよびターゲットの Kafka クラスターに接続するための認証の詳細を使用して KafkaMirrorMaker2 リソースを設定します。

    TLS 暗号化と mTLS 認証を使用した MirrorMaker 2 設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker-2
    spec:
      version: 3.9.0
      replicas: 1
      connectCluster: "my-target-cluster"
      clusters:
        - alias: "my-source-cluster"
          bootstrapServers: my-source-cluster-kafka-bootstrap:9093
          tls: 
    1
    
            trustedCertificates:
              - secretName: my-source-cluster-cluster-ca-cert
                pattern: "*.crt"
          authentication: 
    2
    
            type: tls
            certificateAndKey:
              secretName: my-source-user
              certificate: user.crt
              key: user.key
        - alias: "my-target-cluster"
          bootstrapServers: my-target-cluster-kafka-bootstrap:9093
          tls: 
    3
    
            trustedCertificates:
              - secretName: my-target-cluster-cluster-ca-cert
                pattern: "*.crt"
          authentication: 
    4
    
            type: tls
            certificateAndKey:
              secretName: my-target-user
              certificate: user.crt
              key: user.key
          config:
            # -1 means it will use the default replication factor configured in the broker
            config.storage.replication.factor: -1
            offset.storage.replication.factor: -1
            status.storage.replication.factor: -1
      mirrors:
        - sourceCluster: "my-source-cluster"
          targetCluster: "my-target-cluster"
          sourceConnector:
            config:
              replication.factor: 1
              offset-syncs.topic.replication.factor: 1
              sync.topic.acls.enabled: "false"
          heartbeatConnector:
            config:
              heartbeats.topic.replication.factor: 1
          checkpointConnector:
            config:
              checkpoints.topic.replication.factor: 1
              sync.group.offsets.enabled: "true"
          topicsPattern: "topic1|topic2|topic3"
          groupsPattern: "group1|group2|group3"
    Copy to Clipboard Toggle word wrap

    1
    ソース Kafka クラスターの TLS 証明書。それらが別の namespace にある場合は、Kafka クラスターの namespace からクラスターシークレットをコピーします。
    2
    TLS mechanism を使用してソース Kafka クラスターにアクセスするためのユーザー認証。
    3
    ターゲット Kafka クラスターの TLS 証明書。
    4
    ターゲット Kafka クラスターにアクセスするためのユーザー認証。
  6. ターゲット Kafka クラスターと同じ namespace で KafkaMirrorMaker2 リソースを作成または更新します。

    oc apply -f <mirrormaker2_configuration_file> -n <namespace_of_target_cluster>
    Copy to Clipboard Toggle word wrap

10.10.8. MirrorMaker 2 コネクターの手動停止または一時停止

KafkaMirrorMaker2 リソースを使用して内部 MirrorMaker コネクターを設定している場合は、state 設定を使用してコネクターを停止または一時停止します。コネクターとタスクがインスタンス化されたままになる一時停止状態とは対照的に、コネクターを停止すると設定のみが保持され、アクティブなプロセスは保持されません。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。一時停止されたコネクターはすぐに再開されますが、停止されたコネクターにはメモリーとリソースが解放されるという利点があります。

注記

state 設定は、KafkaMirrorMaker2ConnectorSpec スキーマの (非推奨) pause 設定を置き換えるもので、コネクターでの一時停止を許可します。これまでに pause 設定を使用してコネクターを一時停止した場合には、競合の回避目的にのみ state 設定の使用に移行することを推奨します。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する MirrorMaker 2 コネクターを制御する KafkaMirrorMaker2 カスタムリソースの名前を見つけます。

    oc get KafkaMirrorMaker2
    Copy to Clipboard Toggle word wrap
  2. KafkaMirrorMaker2 リソースを編集して、コネクターを停止または一時停止します。

    MirrorMaker 2 コネクターを停止するための設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 3.9.0
      replicas: 3
      connectCluster: "my-cluster-target"
      clusters:
        # ...
      mirrors:
      - sourceCluster: "my-cluster-source"
        targetCluster: "my-cluster-target"
        sourceConnector:
          tasksMax: 10
          autoRestart:
            enabled: true
          state: stopped
      # ...
    Copy to Clipboard Toggle word wrap

    state 設定を stopped または paused に変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態は running です。

  3. 変更を KafkaMirrorMaker2 設定に適用します。

    staterunning に変更するか、設定を削除して、コネクターを再開できます。

注記

または、Kafka Connect API を公開 し、stop エンドポイントと pause エンドポイントを使用してコネクターの実行を停止することもできます。たとえば、PUT /connectors/<connector_name>/stop などです。その後、resume エンドポイントを使用して再起動できます。

10.10.9. MirrorMaker 2 コネクターの手動での再起動

strimzi.io/restart-connector アノテーションを使用して、MirrorMaker 2 コネクターの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する Kafka MirrorMaker 2 コネクターを制御する KafkaMirrorMaker2 カスタムリソースの名前を見つけます。

    oc get KafkaMirrorMaker2
    Copy to Clipboard Toggle word wrap
  2. KafkaMirrorMaker2 カスタムリソースから再起動する Kafka MirrorMaker 2 コネクターの名前を見つけます。

    oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
    Copy to Clipboard Toggle word wrap
  3. コネクターの名前を使用して、OpenShift の KafkaMirrorMaker2 リソースにアノテーションを付けてコネクターを再起動します。

    oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector=<mirrormaker_connector_name>"
    Copy to Clipboard Toggle word wrap

    この例では、my-mirror-maker-2 クラスター内の my-connector コネクターが再起動されます。

    oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector=my-connector"
    Copy to Clipboard Toggle word wrap
  4. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスによって検出されているかぎり、MirrorMaker 2 コネクターは再起動されます。MirrorMaker 2 がリクエストを受け入れると、アノテーションは KafkaMirrorMaker2 カスタムリソースから削除されます。

10.10.10. MirrorMaker 2 コネクタータスクの手動での再起動

strimzi.io/restart-connector-task アノテーションを使用して、MirrorMaker 2 コネクターの再起動を手動でトリガーします。

前提条件

  • Cluster Operator が稼働中である。

手順

  1. 再起動する MirrorMaker 2 コネクタータスクを制御する KafkaMirrorMaker2 カスタムリソースの名前を見つけます。

    oc get KafkaMirrorMaker2
    Copy to Clipboard Toggle word wrap
  2. KafkaMirrorMaker2 カスタムリソースから、コネクターの名前と再起動するタスクの ID を検索します。

    oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
    Copy to Clipboard Toggle word wrap

    タスク ID は 0 から始まる負の値ではない整数です。

  3. 名前と ID を使用して、OpenShift の KafkaMirrorMaker2 リソースにアノテーションを付けてコネクタータスクを再開します。

    oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector-task=<mirrormaker_connector_name>:<task_id>"
    Copy to Clipboard Toggle word wrap

    この例では、my-mirror-maker-2 クラスター内の my-connector コネクターのタスク 0 が再起動されます。

    oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector-task=my-connector:0"
    Copy to Clipboard Toggle word wrap
  4. 次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。

    アノテーションが調整プロセスによって検出されているかぎり、MirrorMaker 2 コネクタータスクが再起動されます。MirrorMaker 2 がリクエストを受け入れると、アノテーションは KafkaMirrorMaker2 カスタムリソースから削除されます。

Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2026 Red Hat
トップに戻る