3.5.4. MirrorMaker 2.0 を使用した Kafka クラスター間でのデータの同期


MirrorMaker 2.0 を使用して、設定を介して Kafka クラスター間のデータを同期します。

設定では以下を指定する必要があります。

  • 各 Kafka クラスター
  • TLS 認証を含む各クラスターの接続情報
  • レプリケーションのフローおよび方向

    • クラスター対クラスター
    • トピック対トピック

KafkaMirrorMaker2 リソースのプロパティーを使用して、Kafka MirrorMaker 2.0 のデプロイメントを設定します。

注記

従来のバージョンの MirrorMaker は継続してサポートされます。従来のバージョンに設定したリソースを使用する場合は、MirrorMaker 2.0 でサポートされる形式に更新する必要があります。

MirrorMaker 2.0 によって、レプリケーション係数などのプロパティーのデフォルト設定値が提供されます。デフォルトに変更がない最小設定の例は以下のようになります。

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 2.5.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}

TLS または SASL 認証を使用して、ソースおよびターゲットクラスターのアクセス制御を設定できます。この手順では、ソースおよびターゲットクラスターに対して TLS による暗号化および認証を使用する設定を説明します。

前提条件

手順

  1. KafkaMirrorMaker2 リソースの spec プロパティーを編集します。

    設定可能なプロパティーは以下の例のとおりです。

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 2.5.0 1
      replicas: 3 2
      connectCluster: "my-cluster-target" 3
      clusters: 4
      - alias: "my-cluster-source" 5
        authentication: 6
          certificateAndKey:
            certificate: source.crt
            key: source.key
            secretName: my-user-source
          type: tls
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092 7
        tls: 8
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-source-cluster-ca-cert
      - alias: "my-cluster-target" 9
        authentication: 10
          certificateAndKey:
            certificate: target.crt
            key: target.key
            secretName: my-user-target
          type: tls
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092 11
        config: 12
          config.storage.replication.factor: 1
          offset.storage.replication.factor: 1
          status.storage.replication.factor: 1
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 13
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
        tls: 14
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-target-cluster-ca-cert
      mirrors: 15
      - sourceCluster: "my-cluster-source" 16
        targetCluster: "my-cluster-target" 17
        sourceConnector: 18
          config:
            replication.factor: 1 19
            offset-syncs.topic.replication.factor: 1 20
            sync.topic.acls.enabled: "false" 21
        heartbeatConnector: 22
          config:
            heartbeats.topic.replication.factor: 1 23
        checkpointConnector: 24
          config:
            checkpoints.topic.replication.factor: 1 25
        topicsPattern: ".*" 26
        groupsPattern: "group1|group2|group3" 27
      resources: 28
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 29
        type: inline
        loggers:
          connect.root.logger.level: "INFO"
      readinessProbe: 30
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      jvmOptions: 31
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 32
      template: 33
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 34
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger 35
      externalConfiguration: 36
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
    1
    Kafka Connect のバージョン。
    2
    レプリカノードの数。
    3
    Kafka Connect のクラスターエイリアス。
    4
    同期される Kafka クラスターの指定。
    5
    ソースの Kafka クラスターのクラスターエイリアス。
    6
    OAuth ベアラートークン、SASL ベースの SCRAM-SHA-512 または PLAIN メカニズムを使用し、ここで示された TLS メカニズム を使用する、ソースクラスターの認証。
    7
    ソース Kafka クラスターに接続するためのブートストラップサーバー。
    8
    ソース Kafka クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。詳細は、KafkaMirrorMaker2Tls のスキーマ参照 を参照してください。
    9
    ターゲット Kafka クラスターのクラスターエイリアス。
    10
    ターゲット Kafka クラスターの認証は、ソース Kafka クラスターと同様に設定されます。
    11
    ターゲット Kafka クラスターに接続するためのブートストラップサーバー。
    12
    Kafka Connect の設定。標準の Apache Kafka 設定が提供されることがあり、AMQ Streams によって直接管理されないプロパティーに限定されます。
    13
    14
    ターゲット Kafka クラスターの TLS による暗号化は、ソース Kafka クラスターと同様に設定されます。
    15
    MirrorMaker 2.0 コネクター。
    16
    MirrorMaker 2.0 コネクターによって使用されるソースクラスターのエイリアス。
    17
    MirrorMaker 2.0 コネクターによって使用されるターゲットクラスターのエイリアス。
    18
    リモートトピックを作成する MirrorSourceConnector の設定。デフォルトの設定オプションは config によって上書きされます。
    19
    ターゲットクラスターで作成されるミラーリングされたトピックのレプリケーション係数。
    20
    ソースおよびターゲットクラスターのオフセットをマップする MirrorSourceConnector offset-syncs 内部トピックのレプリケーション係数。
    21
    有効にすると、同期されたトピックに ACL が適用されます。デフォルトは true です。
    22
    接続性チェックを実行する MirrorHeartbeatConnector の設定。デフォルトの設定オプションは config によって上書きされます。
    23
    ターゲットクラスターで作成されたハートビートトピックのレプリケーション係数。
    24
    オフセットを追跡する MirrorCheckpointConnector の設定。デフォルトの設定オプションは config によって上書きされます。
    25
    ターゲットクラスターで作成されたチェックポイントトピックのレプリケーション係数。
    26
    正規表現パターンとして定義されたソースクラスターからのトピックレプリケーション。ここで、すべてのトピックを要求します。
    27
    正規表現パターンとして定義されたソースクラスターからのコンシューマーグループレプリケーション。ここで、3 つのコンシューマーグループを名前で要求します。コンマ区切りリストを使用できます。
    28
    現在 cpu および memory である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。
    29
    ConfigMap より直接的 (inline) または間接的 (external) に追加されたロガーおよびログレベルを指定します。カスタム ConfigMap は、log4j.properties または log4j2.properties キー下に配置する必要があります。Kafka Connect には connect.root.logger.level という単一のロガーがあります。ログレベルは INFO、ERROR、WARN、TRACE、DEBUG、FATAL、または OFF に設定できます。
    30
    コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
    31
    Kafka MirrorMaker を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
    32
    高度な任意手順: 特別な場合のみ推奨される コンテナーイメージの設定。
    33
    テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
    34
    35
    36
    環境変数として Kafka MirrorMaker にマウントされた OpenShift Secret。
  2. リソースを作成または更新します。

    oc apply -f <your-file>
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.