9.6. MirrorMaker 2 を専用モードで実行する


MirrorMaker 2 を使用して、設定を通じて Kafka クラスター間でデータを同期します。この手順では、専用の単一ノード MirrorMaker 2 クラスターを設定して実行する方法を示します。専用クラスターは、Kafka Connect ワーカーノードを使用して、Kafka クラスター間でデータをミラーリングします。

注記

MirrorMaker 2 を分散モードで実行することも可能です。MirrorMaker 2 は、専用モードと分散モードの両方でコネクターとして動作します。専用の MirrorMaker クラスターを実行する場合、コネクターは Kafka Connect クラスターで設定されます。結果として、Kafka Connect クラスターへの直接アクセス、追加のコネクターの実行、および REST API の使用が可能になります。詳細は、Apache Kafka のドキュメント を参照してください。

MirrorMaker の以前のバージョンは、MirrorMaker 2 をレガシーモードで実行 することで引き続きサポートされます。

設定では以下を指定する必要があります。

  • 各 Kafka クラスター
  • TLS 認証を含む各クラスターの接続情報
  • レプリケーションのフローおよび方向

    • クラスターからクラスターへ
    • トピックからトピックへ
  • レプリケーションルール
  • コミットされたオフセット追跡間隔

この手順では、プロパティーファイルに設定を作成し、MirrorMaker スクリプトファイルを使用して接続を設定するときにプロパティーを渡すことによって MirrorMaker 2 を実装する方法を説明します。

ソースクラスターからレプリケーションするトピックおよびコンシューマーグループを指定できます。ソースおよびターゲットクラスターの名前を指定し、レプリケーションするトピックとコンシューマーグループを指定します。

以下の例では、クラスター 1 から 2 のレプリケーションに、トピックとコンシューマーグループが指定されます。

特定のトピックおよびコンシューマーグループをレプリケーションする設定例

clusters=cluster-1,cluster-2
cluster-1->cluster-2.topics = topic-1, topic-2
cluster-1->cluster-2.groups = group-1, group-2

名前のリストを指定したり、正規表現を使用したりできます。デフォルトでは、これらのプロパティーを設定しないと、すべてのトピックおよびコンシューマーグループがレプリケーションされます。.* を正規表現として使用し、すべてのトピックおよびコンシューマーグループをレプリケーションすることもできます。ただし、クラスターに不要な負荷が余分にかかるのを避けるため、必要なトピックとコンシューマーグループのみを指定するようにしてください。

作業を開始する前に

設定プロパティーファイルの例は ./config/connect-mirror-maker.properties にあります。

前提条件

  • レプリケートする各 Kafka クラスターノードのホストに Streams for Apache Kafka がインストールされている。

手順

  1. テキストエディターでサンプルプロパティーファイルを開くか、新しいプロパティーファイルを作成し、ファイルを編集して接続情報と各 Kafka クラスターのレプリケーションフローを追加します。

    以下の例は、cluster-1 および cluster-2 の 2 つのクラスターを双方向に接続する設定を示しています。クラスター名は、clusters プロパティーで設定できます。

    MirrorMaker 2 の設定例

    clusters=cluster-1,cluster-2 1
    
    cluster-1.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_one>:443 2
    cluster-1.security.protocol=SSL 3
    cluster-1.ssl.truststore.password=<truststore_name>
    cluster-1.ssl.truststore.location=<path_to_truststore>/truststore.cluster-1.jks_
    cluster-1.ssl.keystore.password=<keystore_name>
    cluster-1.ssl.keystore.location=<path_to_keystore>/user.cluster-1.p12
    
    cluster-2.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_two>:443 4
    cluster-2.security.protocol=SSL 5
    cluster-2.ssl.truststore.password=<truststore_name>
    cluster-2.ssl.truststore.location=<path_to_truststore>/truststore.cluster-2.jks_
    cluster-2.ssl.keystore.password=<keystore_name>
    cluster-2.ssl.keystore.location=<path_to_keystore>/user.cluster-2.p12
    
    cluster-1->cluster-2.enabled=true 6
    cluster-2->cluster-1.enabled=true 7
    cluster-1->cluster-2.topics=.* 8
    cluster-2->cluster-1.topics=topic-1, topic-2 9
    cluster-1->cluster-2.groups=.* 10
    cluster-2->cluster-1.groups=group-1, group-2 11
    
    replication.policy.separator=- 12
    sync.topic.acls.enabled=false 13
    refresh.topics.interval.seconds=60 14
    refresh.groups.interval.seconds=60 15

    1
    各 Kafka クラスターは、そのエイリアスで識別されます。
    2
    ブートストラップアドレス およびポート 443 を使用した、cluster-1 の接続情報。両方のクラスターはポート 443 を使用し、OpenShift Routes を使用して Kafka に接続します。
    3
    ssl. プロパティーは、cluster-1 の TLS 設定を定義します。
    4
    cluster-2 の接続情報です。
    5
    ssl. プロパティーは、cluster-2 の TLS 設定を定義します。
    6
    cluster-1 から cluster-2 へのレプリケーションフローが有効になっています。
    7
    cluster-2 から cluster-1. へのレプリケーションフローが有効になっています。
    8
    cluster-1 から cluster-2 へのすべてのトピックのレプリケーションです。ソースコネクターは指定のトピックをレプリケーションします。チェックポイントコネクターは、指定されたトピックのオフセットを追跡します。
    9
    cluster-2 から cluster-1 への特定のトピックのレプリケーション。
    10
    cluster-1 から cluster-2 へのすべてのコンシューマーグループのレプリケーション。チェックポイントコネクターは、指定されたコンシューマーグループをレプリケーションします。
    11
    cluster-2 から cluster-1 への特定のコンシューマーグループのレプリケーション。
    12
    リモートトピック名の変更に使用する区切り文字を定義します。
    13
    有効にすると、同期されたトピックに ACL が適用されます。デフォルトは false です。
    14
    新しいトピックの同期をチェックする間隔です。
    15
    新しいコンシューマーグループの同期をチェックする間隔です。
  2. オプション: 必要に応じて、リモートトピックの名前の自動変更をオーバーライドするポリシーを追加します。その名前の前にソースクラスターの名前を追加する代わりに、トピックが元の名前を保持します。

    このオプションの設定は、active/passive バックアップおよびデータ移行に使用されます。

    replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
  3. オプション: コンシューマーグループのオフセットを同期する場合は、設定を追加して同期を有効にし、管理します。

    refresh.groups.interval.seconds=60
    sync.group.offsets.enabled=true 1
    sync.group.offsets.interval.seconds=60 2
    emit.checkpoints.interval.seconds=60 3
    1
    コンシューマーグループのオフセットを同期する任意設定。これは、active/passive 設定でのリカバリーに便利です。同期はデフォルトでは有効になっていません。
    2
    コンシューマーグループオフセットの同期が有効な場合は、同期の頻度を調整できます。
    3
    オフセット追跡のチェック頻度を調整します。オフセット同期の頻度を変更する場合は、これらのチェックの頻度も調整することを推奨します。
  4. ターゲットクラスターで ZooKeeper および Kafka を起動します。

    su - kafka
    /opt/kafka/bin/zookeeper-server-start.sh -daemon \
    /opt/kafka/config/zookeeper.properties
    /opt/kafka/bin/kafka-server-start.sh -daemon \
    /opt/kafka/config/server.properties
  5. プロパティーファイルで定義したクラスター接続設定およびレプリケーションポリシーで MirrorMaker を起動します。

    /opt/kafka/bin/connect-mirror-maker.sh \
    /opt/kafka/config/connect-mirror-maker.properties

    MirrorMaker はクラスター間の接続を設定します。

  6. ターゲットクラスターごとに、トピックがレプリケーションされていることを確認します。

    /opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --list
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.