9.2. MirrorMaker 2 コネクターの設定


Kafka クラスター間のデータの同期を調整する内部コネクターには、MirrorMaker 2 コネクター設定を使用します。

MirrorMaker 2 は次のコネクターで構成されます。

MirrorSourceConnector
ソースコネクターは、トピックをソースクラスターからターゲットクラスターにレプリケーションします。また、ACL をレプリケーションし、MirrorCheckpointConnector を実行する必要があります。
MirrorCheckpointConnector
チェックポイントコネクターは定期的にオフセットを追跡します。有効にすると、ソースクラスターとターゲットクラスター間のコンシューマーグループオフセットも同期されます。
MirrorHeartbeatConnector
ハートビートコネクターは、ソースクラスターとターゲットクラスター間の接続を定期的にチェックします。

以下の表は、コネクタープロパティーと、これらを使用するために設定するコネクターを説明しています。

表9.1 MirrorMaker 2 コネクター設定プロパティー
プロパティーsourceConnectorcheckpointConnectorheartbeatConnector
admin.timeout.ms
新規トピックの検出などの管理タスクのタイムアウト。デフォルトは 60000 (1 分) です。

replication.policy.class
リモートトピックの命名規則を定義するポリシー。デフォルトは org.apache.kafka.connect.mirror.DefaultReplicationPolicy です。

replication.policy.separator
ターゲットクラスターのトピックの命名に使用されるセパレーター。デフォルトでは、区切り文字はドット (.) に設定されています。区切り文字の設定は、リモートトピック名を定義する DefaultReplicationPolicy レプリケーションポリシークラスにのみ適用されます。トピックは元の名前を保持するため、IdentityReplicationPolicy クラスはこのプロパティーを使用しません。

consumer.poll.timeout.ms
ソースクラスターをポーリングする際のタイムアウト。デフォルトは 1000 (1 秒) です。

 
offset-syncs.topic.location
offset-syncs トピックの場所。これは、source (デフォルト) または target クラスターになります。

 
topic.filter.class
レプリケーションするトピックを選択するためのトピックフィルター。デフォルトは org.apache.kafka.connect.mirror.DefaultTopicFilter です。

 
config.property.filter.class
レプリケーションするトピック設定プロパティーを選択するトピックフィルター。デフォルトは org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter です。

  
config.properties.exclude
レプリケーションすべきでないトピック設定プロパティー。コンマ区切りのプロパティー名と正規表現をサポートします。

  
offset.lag.max
リモートパーティションが同期されるまでの最大許容 (同期外) オフセットラグ。デフォルトは 100 です。

  
offset-syncs.topic.replication.factor
内部 offset-syncs トピックのレプリケーション係数。デフォルトは 3 です。

  
refresh.topics.enabled
新しいトピックおよびパーティションの確認を有効にします。デフォルトは true です。

  
refresh.topics.interval.seconds
トピック更新の頻度。デフォルトは 600 (10 分) です。デフォルトでは、ソースクラスターの新規トピックのチェックは 10 分ごとに行われます。頻度は、refresh.topics.interval.seconds をソースコネクター設定に追加することで変更できます。

  
replication.factor
新しいトピックのレプリケーション係数。デフォルトは 2 です。

  
sync.topic.acls.enabled
ソースクラスターからの ACL の同期を有効にします。デフォルトは true です。詳細は、「ACL ルールの同期」 を参照してください。

  
sync.topic.acls.interval.seconds
ACL 同期の頻度。デフォルトは 600 (10 分) です。

  
sync.topic.configs.enabled
ソースクラスターからのトピック設定の同期を有効にします。デフォルトは true です。

  
sync.topic.configs.interval.seconds
トピック設定の同期頻度。デフォルトは 600 (10 分) です。

  
checkpoints.topic.replication.factor
内部 checkpoints トピックのレプリケーション係数。デフォルトは 3 です。
 

 
emit.checkpoints.enabled
コンシューマーオフセットをターゲットクラスターに同期できるようにします。デフォルトは true です。
 

 
emit.checkpoints.interval.seconds
コンシューマーオフセット同期の頻度。デフォルトは 60 (1 分) です。
 

 
group.filter.class
レプリケーションするコンシューマーグループを選択するためのグループフィルター。デフォルトは org.apache.kafka.connect.mirror.DefaultGroupFilter です。
 

 
refresh.groups.enabled
新規コンシューマーグループの確認を有効にします。デフォルトは true です。
 

 
refresh.groups.interval.seconds
コンシューマーグループ更新の頻度。デフォルトは 600 (10 分) です。
 

 
sync.group.offsets.enabled
ターゲットクラスターの __consumer_offsets トピックへのコンシューマーグループオフセットの同期を有効にします。デフォルトは false です。
 

 
sync.group.offsets.interval.seconds
コンシューマーグループオフセット同期の頻度。デフォルトは 60 (1 分) です。
 

 
emit.heartbeats.enabled
ターゲットクラスターでの接続性チェックを有効にします。デフォルトは true です。
  

emit.heartbeats.interval.seconds
接続性チェックの頻度。デフォルトは 1 (1 秒) です。
  

heartbeats.topic.replication.factor
内部 heartbeats トピックのレプリケーション係数。デフォルトは 3 です。
  

9.2.1. コンシューマーグループオフセットの場所の変更トピック

MirrorMaker 2 は、内部トピックを使用してコンシューマーグループのオフセットを追跡します。

offset-syncs トピック
offset-syncs トピックは、レプリケーションされたトピックパーティションのソースおよびターゲットオフセットをレコードメタデータからマッピングします。
checkpoints トピック
checkpoints トピックは、各コンシューマーグループでレプリケーションされたトピックパーティションのソースおよびターゲットクラスターで、最後にコミットされたオフセットをマッピングします。

これらは MirrorMaker 2 によって内部的に使用されるため、これらのトピックと直接対話することはありません。

MirrorCheckpointConnector は、オフセット追跡用の チェックポイント を発行します。checkpoints トピックのオフセットは、設定によって事前に決定された間隔で追跡されます。両方のトピックは、フェイルオーバー時に正しいオフセットの位置からレプリケーションの完全復元を可能にします。

offset-syncs トピックの場所は、デフォルトで source クラスターです。offset-syncs.topic.location コネクター設定を使用して、これを target クラスターに変更することができます。トピックが含まれるクラスターへの読み取り/書き込みアクセスが必要です。ターゲットクラスターを offset-syncs トピックの場所として使用すると、ソースクラスターへの読み取りアクセス権しかない場合でも、MirrorMaker 2 を使用できるようになります。

9.2.2. コンシューマーグループオフセットの同期

__consumer_offsets トピックには、各コンシューマーグループのコミットされたオフセットに関する情報が保存されます。オフセットの同期は、ソースクラスターのコンシューマーグループのコンシューマーオフセットをターゲットクラスターのコンシューマーオフセットに定期的に転送します。

オフセットの同期は、特に active/passive 設定で便利です。アクティブなクラスターがダウンした場合、コンシューマーアプリケーションを passive (スタンバイ) クラスターに切り替え、最後に転送されたオフセットの位置からピックアップできます。

トピックオフセットの同期を使用するには、sync.group.offsets.enabled を checkpoint コネクター設定に追加し、プロパティーを true に設定して、同期を有効にします。同期はデフォルトで無効になっています。

ソースコネクターで IdentityReplicationPolicy を使用する場合は、チェックポイントコネクター設定でも設定する必要があります。これにより、ミラーリングされたコンシューマーオフセットが正しいトピックに適用されます。

コンシューマーオフセットは、ターゲットクラスターでアクティブではないコンシューマーグループに対してのみ同期されます。コンシューマーグループがターゲットクラスターにある場合、Synchronization を実行できず、UNKNOWN_MEMBER_ID エラーが返されます。

同期を有効にすると、ソースクラスターからオフセットの同期が定期的に行われます。この頻度は、sync.group.offsets.interval.seconds および emit.checkpoints.interval.seconds をチェックポイントコネクター設定に追加することで変更できます。これらのプロパティーは、コンシューマーグループのオフセットが同期される頻度 (秒単位) と、オフセットを追跡するためにチェックポイントが生成される頻度を指定します。両方のプロパティーのデフォルトは 60 秒です。refresh.groups.interval.seconds プロパティーを使用して、新規コンシューマーグループのチェック頻度を変更することもできます。デフォルトでは 10 分ごとに実行されます。

同期は時間ベースであるため、コンシューマーによって passive クラスターへ切り替えられると、一部のメッセージが重複する可能性があります。

注記

Java で作成されたアプリケーションがある場合は、RemoteClusterUtils.java ユーティリティーを使用して、アプリケーションを通じてオフセットを同期できます。ユーティリティーは、checkpoints トピックからコンシューマーグループのリモートオフセットを取得します。

9.2.3. ハートビートコネクターを使用するタイミングの決定

ハートビートコネクターはハートビートを出力して、ソース Kafka クラスターとターゲット Kafka クラスター間の接続を確認します。内部 heartbeat トピックはソースクラスターからレプリケートされます。つまり、ハートビートコネクターがソースクラスターに接続されている必要があります。heartbeat トピックはターゲットクラスターに配置されているため、次のことが可能になります。

  • データのミラーリング元のすべてのソースクラスターを特定します。
  • ミラーリングプロセスの稼働状況と遅延を確認する

これは、プロセスが何らかの理由でスタックしたり停止したりしていないことを確認するのに役立ちます。ハートビートコネクターは、Kafka クラスター間のミラーリングプロセスを監視するための貴重なツールですが、必ずしも使用する必要があるわけではありません。たとえば、デプロイメントのネットワーク遅延が低い場合、またはトピックの数が少ない場合は、ログメッセージやその他の監視ツールを使用してミラーリングプロセスを監視することが推奨されます。ハートビートコネクターを使用しない場合は、MirrorMaker 2 設定からハートビートコネクターを省略してください。

9.2.4. MirrorMaker 2 コネクターの設定の調整

MirrorMaker 2 コネクターが正しく動作することを確認するには、コネクター全体で特定の設定を調整してください。具体的には、次のプロパティーが該当するすべてのコネクターで同じ値であることを確認してください。

  • replication.policy.class
  • replication.policy.separator
  • offset-syncs.topic.location
  • topic.filter.class

たとえば、replication.policy.class の値は、ソース、チェックポイント、およびハートビートコネクターで同じである必要があります。設定が一致していないか欠落していると、データレプリケーションやオフセット同期で問題が発生するため、関連するすべてのコネクターを同じ設定にしておくことが重要です。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.