5.2. Debezium MongoDB コネクターの仕組み


コネクターがサポートする MongoDB トポロジーの概要は、アプリケーションを計画するときに役立ちます。

MongoDB コネクターが設定およびデプロイされると、シードアドレスの MongoDB サーバーに接続して起動し、利用可能な各レプリカセットの詳細を判断します。各レプリカセットには独立した独自の oplog があるため、コネクターはレプリカセットごとに個別のタスクの使用を試みます。コネクターは、使用するタスクの最大数を制限でき、十分なタスクが利用できない場合は、コネクターは各タスクに複数のレプリカセットを割り当てます。ただし、タスクはレプリカセットごとに個別のスレッドを使用します。

注記

シャードクラスターに対してコネクターを実行する場合は、レプリカセットの数よりも大きい tasks.max の値を使用します。これにより、コネクターはレプリカセットごとに 1 つのタスクを作成でき、Kafka Connect が利用可能なワーカープロセス全体でタスクを調整、配布、および管理できるようにします。

Debezium MongoDB コネクターの仕組みの詳細は、以下を参照してください。

5.2.1. Debezium コネクターでサポートされる MongoDB トポロジー

MongoDB コネクターは以下の MongoDB トポロジーをサポートします。

MongoDB レプリカセット

Debezium MongoDB コネクターは単一の MongoDB レプリカセットから変更をキャプチャーできます。実稼働のレプリカセットには、少なくとも 3 つのメンバー が必要です。

レプリカセットで MongoDB コネクターを使用するには、コネクター設定の mongodb.connection.string プロパティーの値を レプリカセットの接続文字列 に設定する必要があります。コネクターが MongoDB 変更ストリームからの変更のキャプチャーを開始する準備ができると、接続タスクが開始されます。次に、接続タスクは、指定された接続文字列を使用して、使用可能なレプリカセットメンバーへの接続を確立します。

警告

コネクターによるデータベース接続の管理方法が変更されたため、Debezium のこのリリースでは、コネクターがメンバーシップ検出を実行できないようにする mongodb.members.auto.discover プロパティーの使用はサポートされなくなりました。

MongoDB のシャードクラスター

MongoDB のシャードクラスター は以下で構成されます。

  • レプリカセットとしてデプロイされる 1 つ以上のシャード
  • クラスターの設定サーバーとして動作する個別のレプリカセット。
  • クライアントが接続し、要求を適切なシャードにルーティングする 1 つ以上の ルーター ( mongos とも呼ばれます)。

    シャードクラスターで MongoDB コネクターを使用するには、コネクター設定で、mongodb.connection.string プロパティーの値を sharded cluster connection string に設定します。

警告

mongodb.connection.string プロパティーは、以前のバージョンのコネクターに configuration server レプリカのホストアドレスを提供するために使用されていた非推奨の mongodb.hosts プロパティーを置き換えます。現在のリリースでは、mongodb.connection.string を使用して、コネクターに MongoDB ルーター (mongos とも呼ばれる) のアドレスを提供します。

注記

コネクターがシャードクラスターに接続すると、クラスター内のシャードを表す各レプリカセットに関する情報が検出されます。コネクターは、個別のタスクを使用して各シャードからの変更をキャプチャーします。シャードがクラスターに追加されるか、またはクラスターから削除されると、コネクターはタスクの数を動的に調整して変更を補正します。

MongoDB スタンドアロンサーバー
スタンドアロンサーバーには oplog がないため、MongoDB コネクターはスタンドアロン MongoDB サーバーの変更を監視できません。スタンドアロンサーバーが 1 つのメンバーを持つレプリカセットに変換されると、コネクターが動作します。
注記

MongoDB は、実稼働でのスタンドアロンサーバーの実行を推奨しません。詳細は MongoDB のドキュメント を参照してください。

5.2.2. Debezium MongoDB コネクターでレプリカセットおよびシャードクラスターに論理名を使用する方法

コネクター設定プロパティー topic.prefix は、MongoDB レプリカセットまたはシャードされたクラスターの 論理名 として提供されます。コネクターは論理名をさまざまな方法で使用します。すべてのトピック名の接頭辞として、各レプリカセットの変更ストリームの位置を記録する際に一意の識別子として使用されます。

各 MongoDB コネクターに、ソース MongoDB システムを意味する一意の論理名を命名する必要があります。論理名は、アルファベットまたはアンダースコアで始まり、残りの文字を英数字またはアンダースコアとすることが推奨されます。

5.2.3. Debezium MongoDB コネクターでのスナップショットの実行方法

Debezium タスクがレプリカセットを使用して起動すると、コネクターの論理名とレプリカセット名を使用して、コネクターが変更の読み取りを停止した位置を示す オフセット を検出します。オフセットが検出され、oplog に存在する場合、タスクは記録されたオフセットの位置から即座に ストリームの変更 を続行します。

ただし、オフセットが見つからない場合、または oplog にその位置が含まれていない場合、タスクはまず スナップショット を実行してレプリカセットの内容の現在の状態を取得する必要があります。このプロセスは、oplog の現在の位置を記録して開始され、オフセット (スナップショットが開始されたことを示すフラグとともに) として記録します。次に、タスクは各コレクションのコピーに進み、できるだけ多くのスレッド (snapshot.max.threads 設定プロパティーの値まで) を生成して、この作業を並行して実行します。コネクターは、参照したドキュメントごとに個別の 読み取りイベント を記録します。各読み取りイベントには、オブジェクトの識別子、オブジェクトの完全な状態、およびオブジェクトが見つかった MongoDB レプリカセットに関する ソース 情報が含まれます。ソース情報には、イベントがスナップショットの作成中に生成されたことを示すフラグも含まれます。

このスナップショットは、コネクターのフィルターと一致するすべてのコレクションがコピーされるまで継続されます。タスクのスナップショットが完了する前にコネクターが停止した場合は、コネクターを再起動すると、再びスナップショットを開始します。

注記

コネクターがレプリカセットのスナップショットを実行している間、タスクの再割り当てと再設定を回避します。コネクターは、スナップショットの進捗を報告するログメッセージを生成します。最大限の制御を行うために、コネクターごとに個別の Kafka Connect クラスターを実行します。

スナップショットの詳細は、以下のセクションを参照してください。

5.2.4. アドホックスナップショット

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。収集データを再キャプチャーするメカニズムを提供するために、Debezium はアドホックスナップショットを実行するオプションを備えています。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。

  • コネクター設定が変更され、異なるコレクションのセットをキャプチャーします。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

いわゆる アドホックスナップショット を開始することで、以前にスナップショットをキャプチャしたコレクションに対してスナップショットを再実行することができます。アドホックスナップショットでは、シグナリングコレクション を使用する必要があります。シグナルリクエストを Debezium シグナルコレクションに送信して、アドホックスナップショットを開始します。

既存のコレクションのアドホックスナップショットを開始すると、コネクターはコレクションにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するコレクションを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のコレクションのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のコレクションの内容のサブセットをキャプチャーできます。

キャプチャーするコレクションは、シグナリングコレクションに execute-snapshot メッセージを送信することで指定します。execute-snapshot シグナルのタイプを incremental に設定し、スナップショットに含めるコレクション名を次の表に示すように指定します。

表5.1 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルト

type

incremental

実行するスナップショットのタイプを指定します。
タイプの設定は任意です。現在要求できるのは、incremental スナップショットのみです。

data-collections

該当なし

スナップショットされるコレクションの完全修飾名にマッチする正規表現を含む配列。
名前の形式は signal.data.collection 設定オプションと同じです。

additional-condition

該当なし

コレクションの内容のサブセットを取得するために、コレクションの列に基づいて条件を指定するオプションの文字列。

surrogate-key

該当なし

スナップショットプロセス中にコネクターがコレクションのプライマリーキーとして使用する列名を指定するオプションの文字列。

アドホックスナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルコレクションに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各コレクションの開始ポイントおよびエンドポイントとして使用します。コレクションのエントリー数と設定されたチャンクサイズに基づいて、Debezium はコレクションをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

現在、execute-snapshot アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。

5.2.5. 増分スナップショット

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信する ための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各コレクションを段階的にキャプチャーします。スナップショットがキャプチャーするコレクションと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。

Debezium は、増分スナップショットの進行に伴い、その進捗を追跡するために透かしを使用し、キャプチャした各コレクション行の記録を保持します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセス再開後、スナップショットは、最初からコレクションを再キャプチャーするのではなく、停止したポイントから開始されます。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクターの設定を変更してコレクションをその collection.include.list プロパティーにコレクションを追加します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各コレクションをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてコレクションをチャンクに分割します。チャンクごとに作業し、チャンク内の各コレクション行をキャプチャします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットが進むと、他のプロセスがデータベースにアクセスし続け、コレクションのレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、スナップショットがその行の READ イベントを含むチャンクをキャプチャーする前に、ストリーミングプロセスがコレクション行を変更するイベントを発行する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて到着した READ イベントと、同じコレクション行を変更するストリームイベント間の衝突を解決するために、Debezium はいわゆる スナップショットウィンドウ を採用しています。スナップショットウィンドウは、増分スナップショットが指定されたコレクションチャンクのデータをキャプチャーする間隔を区切ります。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをコレクションの Kafka トピックに発行します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

警告

増分スナップショットでは、プライマリーキーが安定した順序で並べる必要があります。ただし、文字列 はエンコーディングと特殊文字が予期しない動作につながる可能性があるため、順序が安定しない場合があります (Mongo sort String)。増分スナップショットを実行する場合は、プライマリーキーに他のタイプを使用することを検討してください。

シャードクラスターの増分スナップショット

シャードクラスターの増分スナップショットは、Debezium MongoDB コネクターのテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

シャードされた MongoDB クラスターで増分スナップショットを使用するには、次のプロパティーに特定の値を設定する必要があります。

5.2.5.1. 増分スナップショットのトリガー

現在、増分スナップショットを開始する唯一の方法は、ソースのデータベース上のシグナリングコレクションに アドホックなスナップショットシグナル を送信することです。

MongoDB の insert() メソッドを使用して、シグナルコレクションにシグナルを送信します。

Debezium は、信号コレクションの変化を検出した後、信号を読み取り、要求されたスナップショット操作を実行します。

送信するクエリーは、スナップショットに含めるコレクションを指定し、オプションでスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental だけです。

スナップショットに含めるテーブルを指定するには、テーブルをリストアップした data-collections 配列、またはテーブルのマッチングに使用する正規表現の配列を指定します。たとえば、
{"data-collections": ["public.Collection1", "public.Collection2"]}

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。

注記

スナップショットに含めるコレクションの名前に、データベース、スキーマ、またはテーブルの名前にドット (.) が含まれている場合、そのコレクションを data-collections 配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。

たとえば、public データベースに存在し、My.Collection という名前のデータコレクションを含めるには、"public"."My.Collection" という形式を使用します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットをトリガーする

  1. シグナリングコレクションにスナップショットシグナルドキュメントを挿入します。

    <signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});

    以下に例を示します。

    db.debeziumSignal.insert({ 1
    "type" : "execute-snapshot", 2 3
    "data" : {
    "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4
    "type": "incremental"} 5
    });

    コマンドの idtype、および data パラメーターの値は、シグナリングコレクションのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表5.2 シグナリングコレクションに増分スナップショットシグナルを送信するための MongoDB insert() コマンドのフィールドの説明
    項目説明

    1

    db.debeziumSignal

    ソースデータベース上のシグナリングコレクションの完全修飾名を指定します。

    2

    null

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    前述の例の insert メソッドは、オプションの _id パラメーターの使用が省略されています。ドキュメントはパラメーターの値を明示的に割り当てないため、MongoDB が自動的にドキュメントに割り当てる任意の ID がシグナルリクエストの id 識別子になります。
    この文字列を使用して、シグナリングコレクションのエントリーにロギングメッセージを識別します。Debezium はこの識別子文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    3

    execute-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドの必須コンポーネントで、スナップショットに含めるコレクション名の配列またはコレクション名と一致する正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。

    5

    incremental

    実行するスナップショット操作の種類指定するシグナルの data フィールドの任意のtype コンポーネント。
    現在、唯一の有効なオプションはデフォルト値 incremental だけです。
    値を指定しない場合には、コネクターは増分スナップショットを実行します。

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例: 増分スナップショットイベントメッセージ

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、唯一の有効なオプションはデフォルト値 incremental だけです。
シグナルコレクションに送信する SQL クエリーに type 値を指定します。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

5.2.5.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする

設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。

表5.3 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

additional-condition

該当なし

コネクターがスナップショットに含める列のサブセットを指定するために評価する条件を指定するオプションの文字列。

execute-snapshot Kafka メッセージの例:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

追加条件付きのアドホック増分スナップショット

Debezium は additional-condition フィールドを使用してコレクションの内容のサブセットを選択します。

通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。

SELECT * FROM <tableName> …​.

スナップショットリクエストに additional-condition が含まれる場合、次のように additional-condition が SQL クエリーに追加されます。

SELECT * FROM <tableName> WHERE <additional-condition> …​.

たとえば、列 id (プライマリーキー)、color、および brand を含む products コレクションがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-condition ステートメントを追加することができます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`

additional-condition ステートメントを使用して、複数の列に基づいて条件を渡すことができます。たとえば、前の例と同じ products コレクションを使用して、color='blue' および brand='MyBrand'products コレクションのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`

5.2.5.3. 増分スナップショットの停止

ソースデータベースのコレクションにシグナルを送信することで、増分スナップショットを停止することもできます。シグナリングコレクションにドキュメントを挿入して、スナップショット停止のシグナルを送信します。Debezium はシグナルコレクションの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。

送信するクエリーは、incremental のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのコレクションを指定します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットを停止する

  1. シグナリングコレクションにスナップショット停止のシグナルドキュメントを挿入します。

    <signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});

    以下に例を示します。

    db.debeziumSignal.insert({ 1
    "type" : "stop-snapshot", 2 3
    "data" : {
    "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 4
    "type": "incremental"} 5
    });

    signal コマンドの idtype、および data パラメーターの値は、シグナリングコレクションのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表5.4 シグナリングコレクションに増分スナップショットの停止ドキュメントを送信するための insert コマンドのフィールドの説明
    項目説明

    1

    db.debeziumSignal

    ソースデータベース上のシグナリングコレクションの完全修飾名を指定します。

    2

    null

    前述の例の insert メソッドは、オプションの _id パラメーターの使用が省略されています。ドキュメントはパラメーターの値を明示的に割り当てないため、MongoDB が自動的にドキュメントに割り当てる任意の ID がシグナルリクエストの id 識別子になります。
    この文字列を使用して、シグナリングコレクションのエントリーにロギングメッセージを識別します。Debezium はこの識別子文字列を使用しません。

    3

    stop-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドの任意コンポーネントで、スナップショットから削除するコレクション名の配列またはコレクション名と一致する正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。data フィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。

    5

    incremental

    停止させるスナップショット操作の種類を指定する信号の data フィールドの必須コンポーネント。
    現在、有効な唯一のオプションは incremental です。
    type の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。

5.2.5.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する

設定された Kafka シグナルトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。

表5.5 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現のオプションの配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

5.2.6. Debezium MongoDB コネクターでの変更イベントレコードのストリーミング方法

レプリカセットレコードのコネクタータスクがオフセットを取得すると、オフセットを使用して変更のストリーミングを開始する oplog の位置を判断します。その後、タスクは (設定によって) レプリカセットのプライマリーノードに接続するか、レプリカセット全体の変更ストリームに接続し、その位置から変更のストリーミングを開始します。すべての作成、挿入、および削除操作を処理して Debezium の 変更イベント に変換します。各変更イベントには操作が検出された oplog の位置が含まれ、コネクターはこれを最新のオフセットとして定期的に記録します。オフセットが記録される間隔は、Kafka Connect ワーカー設定プロパティーである offset.flush.interval.ms によって制御されます。

コネクターが正常に停止されると、処理された最後のオフセットが記録され、再起動時にコネクターは停止した場所から続行されます。しかし、コネクターのタスクが予期せず終了した場合、最後にオフセットが記録された後、最後のオフセットが記録される前に、タスクによってイベントが処理および生成されることがあります。再起動時に、コネクターは最後に 記録された オフセットから開始し、クラッシュの前に生成された同じイベントを生成する可能性があります。

注記

Kafka パイプライン内のすべてのコンポーネントが正常に動作している場合、Kafka コンシューマーはすべてのメッセージを 1 度だけ 受信します。ただし、問題が発生した場合、Kafka はコンシューマーが 少なくとも 1 回 のみすべてのメッセージを受信することを保証できます。予期しない結果を回避するには、コンシューマーは重複メッセージを処理できる必要があります。

前述のように、コネクタータスクは常にレプリカセットのプライマリーノードを使用して oplog からの変更をストリーミングし、コネクターが可能な限り最新の操作を確認できるようにし、代わりにセカンダリーが使用された場合よりも短いレイテンシーで変更をキャプチャーできるようにします。レプリカセットが新しいプライマリーを選出すると、コネクターは即座に変更のストリーミングを停止し、新しいプライマリーに接続して、同じ場所にある新しいプライマリーノードから変更のストリーミングを開始します。同様に、コネクターとレプリカセットメンバーとの通信で問題が発生した場合は、レプリカセットが過剰にならないように指数バックオフを使用して再接続を試みます。接続の確立後、停止した場所から変更のストリーミングを続行します。これにより、コネクターはレプリカセットメンバーシップの変更を動的に調整でき、通信障害を自動的に処理できます。

要約すると、MongoDB コネクターはほとんどの状況で実行を継続します。通信の問題により、問題が解決されるまでコネクターが待機する可能性があります。

5.2.7. Debezium 変更イベントの before フィールドに入力するための MongoDB サポート

MongoDB 6.0 以降では、変更ストリームを設定して、ドキュメントのイメージ前の状態を出力し、MongoDB 変更イベントの before フィールドにデータを投入できます。MongoDB で事前のイメージを使用できるようにするには、db.createCollection()create、または collMod を使用して、コレクションの changeStreamPreAndPostImages を設定する必要があります。Debezium MongoDB が変更イベントに事前イメージを追加できるようにするには、コネクターの capture.mode*_with_pre_image オプションのいずれかに設定します。

MongoDB 変更イベントのサイズ制限

MongoDB 変更イベントのサイズは 16 メガバイトに制限されます。したがって、事前イメージを使用すると、このしきい値を超過し、障害が発生する可能性があります。変更ストリームの制限を超えないようにする方法は、MongoDB のドキュメント を参照してください。

5.2.8. Debezium MongoDB 変更イベントレコードを受信する Kafka トピックのデフォルト名

MongoDB コネクターは、各コレクションのドキュメントに対するすべての挿入、更新、および削除操作のイベントを 1 つの Kafka トピックに書き込みます。Kafka トピックの名前は常に logicalName.databaseName.collectionName の形式を取ります。logicalName は、topic.prefix 設定プロパティーで指定されるコネクターの 論理名databaseName は操作が発生したデータベースの名前、collectionName は影響を受けるドキュメントが存在する MongoDB コレクションの名前です。

たとえば、products, products_on_hand, customers, and orders の 4 つのコレクションで設定される inventory データベースを含む MongoDB レプリカセットについて考えてみましょう。コネクターが監視するこのデータベースの論理名が fulfillment である場合、コネクターは以下の 4 つの Kafka トピックでイベントを生成します。

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

トピック名には、レプリカセット名やシャード名が含まれないことに注意してください。その結果、シャード化コレクションへの変更 (各シャードにコレクションのドキュメントのサブセットが含まれる) はすべて同じ Kafka トピックに移動します。

Kafka を設定して、必要に応じてトピックを 自動作成 できます。そうでない場合は、Kafka 管理ツールを使用してコネクターを起動する前にトピックを作成する必要があります。

5.2.9. イベントキーが Debezium MongoDB コネクターのトピックパーティション設定を制御する方法

MongoDB コネクターは、イベントのトピックパーティションを明示的に決定しません。代わりに、Kafka はイベントキーに基づいてトピックのパーティションを作成する方法を決定できます。Kafka Connect ワーカー設定に Partitioner 実装の名前を定義することで、Kafka のパーティショニングロジックを変更できます。

Kafka は、1 つのトピックパーティションに書き込まれたイベントのみ、合計順序を維持します。キーでイベントのパーティションを行うと、同じキーを持つすべてのイベントは常に同じパーティションに移動します。これにより、特定のドキュメントのすべてのイベントが常に完全に順序付けされます。

5.2.10. トランザクション境界を表す Debezium MongoDB コネクターによって生成されたイベント

Debezium は、トランザクションメタデータ境界を表すイベントを生成でき、データイベントメッセージを補完できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

Debezium はすべてのトランザクションの BEGIN および END に対して、以下のフィールドが含まれるイベントを生成します。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
指定のデータコレクションからの変更によって出力されたイベントの数を提供する data_collectionevent_count のペアの配列。

以下の例では、一般的なメッセージを示します。

{
  "status": "BEGIN",
  "id": "1462833718356672513",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "1462833718356672513",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "rs0.testDB.collectiona",
      "event_count": 1
    },
    {
      "data_collection": "rs0.testDB.collectionb",
      "event_count": 1
    }
  ]
}

topic.transaction オプションで上書きされない限り、トランザクションイベントは <topic.prefix>.transaction という名前のトピックに書き込まれます。

変更データイベントのエンリッチメント

トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下は、メッセージの内容の例です。

{
  "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "1462833718356672513",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.