12.8. Debezium MongoDB 変更イベントから after 状態のソースドキュメント抽出


Debezium MongoDB コネクターは、MongoDB コレクションで発生する各操作を表すデータ変更メッセージを出力します。これらのイベントメッセージの複雑な構造は、元のデータベースイベントの詳細を忠実に表しています。ただし、ダウンストリームのコンシューマーの中には、メッセージを元の形式で処理できない場合があります。たとえば、データコレクション内のネストされたドキュメントを表すために、コネクターは、ネストされたフィールドを含む形式でイベントメッセージを出力します。sink コネクター、または元のメッセージの階層形式を処理できない他のコンシューマーをサポートするには、Debezium MongoDB イベントフラット化 (ExtractNewDocumentState) の単一メッセージ変換 (SMT) を使用できます。SMT は元のメッセージの構造を簡素化し、他の方法でメッセージを変更してデータを処理しやすくすることができます。

イベントフラット化変換は Kafka Connect SMT です。

注記

この章では、Debezium MongoDB ベースのデータベースコネクターのイベントフラット化シングルメッセージ変換 (SMT) について説明します。リレーショナルデータベースで使用する同等の SMT の詳細は、新記録の状態抽出 SMT のドキュメント を参照してください。

詳細は以下のセクションを参照してください。

12.8.1. Debezium MongoDB 変更イベントの構造の説明

Debezium MongoDB コネクターは、構造が複雑な変更イベントを生成します。各イベントメッセージには、以下の部分が含まれます。

ソースメタデータ

以下のフィールドが含まれますが、これに限定されません。

  • コレクション内のデータを変更した操作のタイプ (作成/挿入、更新、または削除)。
  • 変更が発生したデータベースおよびコレクションの名前。
  • 変更が加えられたタイミングを識別するタイムスタンプ。
  • (任意の項目) トランザクション情報
ドキュメントデータ
before データ

このフィールドは、Debezium コネクターの capture.mode が以下のいずれかの値に設定されている場合に MongoDB 6.0 以降を実行する環境に存在します。

After データ

現在の操作後にドキュメントに存在する値を表す JSON 文字列。イベントメッセージに after フィールドが存在するかどうかは、イベントの種類とコネクター設定によって異なります。MongoDB insert 操作の create イベントには、capture.mode 設定に関係なく、常に after フィールドが含まれます。update イベントの場合、after フィールドは capture.mode が以下のいずれかの値に設定されている場合にのみ存在します。

  • change_streams_update_full
  • change_streams_update_full_with_pre_image.

    注記

    変更イベントメッセージの after 値は、必ずしもイベント直後のドキュメントの状態を表すとは限りません。値は動的に計算されず、コネクターが変更イベントをキャプチャーした後、コレクションをクエリーしてドキュメントの現在の値を取得します。

    たとえば、複数の操作 ab、および c がドキュメントを立て続けに変更する状況を想像してみてください。コネクターが変更 a を処理するときに、全ドキュメントのコレクションをクエリーします。その間に bc の変更が発生します。コネクターが変更 a に関する全ドキュメントのクエリーに対する応答を受信すると、後に続く b または c の変更をもとにしたドキュメントのバージョンを受信する可能性があります。詳細は、capture.mode プロパティーのドキュメントを参照してください。

以下のフラグメントは、MongoDB の insert 操作後にコネクターが発行する create 変更イベントの基本構造を示しています。

{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}",
  "source": { ... }
}

前の例の after フィールドの複雑な形式では、ソースデータベースで発生した変更に関する詳細情報を提供します。ただし、一部のコンシューマーは、ネストされた値が含まれるメッセージを処理できません。元のメッセージの複雑なネストされたフィールドを、より単純で普遍的、かつ互換性のある構造に変換するには、MongoDB のイベントフラット化 SMT を使用します。SMT は、以下の例のようにメッセージ内のネストされたフィールドの構造をフラット化します。

{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}

Debezium MongoDB コネクターによって生成されたメッセージのデフォルト構造の詳細は、コネクターのドキュメント を参照してください。

12.8.2. Debezium MongoDB イベントフラット化変換の動作

MongoDB のイベントフラット化 SMT は、Debezium MongoDB コネクターによって発行された create または update 変更イベントメッセージから after フィールドを抽出します。SMT は、元の変更イベントメッセージを処理した後、after フィールドの内容のみを含む、単純化バージョンを生成します。

ユースケースに応じて、ExtractNewDocumentState SMT を Debezium MongoDB コネクター、または Debezium コネクターが生成するメッセージを消費する sink コネクターに適用できます。SMT を Debezium MongoDB コネクターに適用すると、SMT はコネクターが Apache Kafka に送信する前に出力するメッセージを変更します。Kafka が完全な Debezium 変更イベントメッセージを元の形式で確実に保持するには、SMT を sink コネクターに適用します。

イベントフラット化 SMT を使用して MongoDB コネクターから送信されたメッセージを処理すると、SMT は元のメッセージのレコードの構造を、通常の sink コネクターで使用できるように、適切に型指定された Kafka Connect レコードに変換します。たとえば、SMT は元のメッセージの after 情報を表す JSON 文字列を、任意のコンシューマーが処理できるスキーマ構造に変換します。

オプションとして、MongoDB のイベントフラット化 SMT を設定して、処理中に他の方法でメッセージを変更できます。詳細は 設定トピックを参照してください。

12.8.3. Debezium MongoDB イベントフラット化変換の設定

Debezium MongoDB コネクターによって発行されたメッセージを消費する sink コネクター用に、MongoDB のイベントフラット化 (ExtractNewDocumentState) SMT を設定します。

詳細は以下のセクションを参照してください。

12.8.3.1. 例: Debezium MongoDB イベントフラット化変換の基本設定

SMT のデフォルト動作を取得するには、以下の例のように、オプションを指定せずに sink コネクターの設定に SMT を追加します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

Kafka Connect コネクター設定と同様に、transforms= を複数のコンマ区切りの SMT エイリアスに設定できます。Kafka Connect は、リスト順で指定した変換を適用します。

MongoDB イベントフラット化 SMT を使用するコネクターに複数のオプションを設定できます。以下の例は、コネクターの drop.tombstonesdelete.handling.mode、および add.headers オプションを指定する設定です。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.add.headers=op

前の例の設定オプションの詳細は、設定トピック を参照してください。

設定のカスタマイズ

コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションの変更に関するメタデータメッセージ) を発行する場合があります。イベントのサブセットに変換を適用するには、特定のイベントだけを対象にして 変換を選択して適用する SMT 述語ステートメント を定義できます。

12.8.4. MongoDB イベントメッセージで配列をエンコードするためのオプション

デフォルトでは、イベントフラット化 SMT は MongoDB 配列を Apache Kafka Connect または Apache Avro スキーマと互換性のある配列に変換します。MongoDB 配列には複数のタイプの要素を含めることができますが、Kafka 配列のすべての要素が同じタイプである必要があります。

SMT が環境のニーズを満たす方法で配列をエンコードするようにするには、array.encoding 設定オプションを指定できます。以下の例は、配列エンコーディングを指定するための設定を示しています。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>

設定に応じて、SMT は、次のエンコード方法のいずれかを使用して、ソースメッセージ内の配列の各インスタンスを処理します。

配列エンコーディング
array.encodingarray (デフォルト) に設定されている場合、SMT エンコードは array データ型を使用して元のメッセージの配列をエンコードします。正しい処理を確保するには、配列インスタンスのすべての要素が同じタイプである必要があります。このオプションで制限が課されることになりますが、ダウンストリームクライアントが配列を簡単に処理できるようにします。
ドキュメントのエンコーディング
array.encodingdocument に設定されている場合に、BSON シリアル化 と同様の方法で、ソースの各配列を structs の中の struct に変換します。メイン struct には、_0_1_2 などのフィールドが含まれます。各フィールド名は、元の配列内の要素のインデックスを表します。SMT は、これらの各インデックスフィールドに、ソース配列内の同等の要素に対して取得した値を設定します。Avro エンコーディングでは数字で始まるフィールド名が禁止されているため、インデックス名にはアンダースコアが接頭辞として付けられます。

次の例は、Debezium MongoDB コネクターが、異種データ型で構成される配列を含むデータベースドキュメントを表す方法を示しています。

例12.2 例: 複数のデータ型を含む配列のドキュメントエンコーディング

{
    "_id": 1,
    "a1": [
        {
            "a": 1,
            "b": "none"
        },
        {
            "a": "c",
            "d": "something"
        }
    ]
}

array.encodingdocument に設定されている場合、SMT は前述のドキュメントを以下の形式に変換します。

{
    "_id": 1,
    "a1": {
        "_0": {
            "a": 1,
            "b": "none"
        },
        "_1": {
            "a": "c",
            "d": "something"
        }
    }
}

document エンコーディングオプションを使用すると、SMT は異種要素で設定される任意の配列を処理できます。ただし、このオプションを使用する前に、sink コネクターと他のダウンストリームコンシューマーが複数のデータ型を含む配列を処理できることを常に確認してください。

12.8.5. MongoDB イベントメッセージでネストされた構造のフラット化

データベース操作に埋め込みドキュメントが含まれる場合、Debezium MongoDB コネクターは、元のドキュメントの階層構造を反映する Kafka イベントレコードを発行します。つまり、イベントメッセージは、ネストされたフィールド構造のセットとして、ネスト化ドキュメントを表します。ダウンストリームコネクターがネスト化された構造が含まれるメッセージを処理できない環境では、イベントフラット化 SMT を設定して、メッセージ内の階層構造をフラット化できます。フラットなメッセージ構造は、テーブルのようなストレージに適しています。

ネストされた構造をフラット化するように SMT を設定するには、flatten.struct 設定オプションを true に設定します。変換されたメッセージでは、フィールド名がドキュメントソースと一致するよう構築されます。SMT は、親ドキュメントフィールドの名前とネストされたドキュメントフィールドの名前を連結して、フラット化された各フィールドの名前を変更します。名前のコンポーネントを区切るには、flatten.struct.delimiter オプションで定義される区切り文字を使用します。struct.delimiter のデフォルト値はアンダースコア文字 (_) です。

次の例は、SMT がネストされた構造をフラット化するかどうかを指定する設定です。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>

以下の例は、MongoDB コネクターが出力するイベントメッセージを示しています。メッセージには、ネストされた 2 つのドキュメント bc のフィールドを含むドキュメント a のフィールドが含まれます。

{
    "_id": 1,
    "a": {
            "b": 1,
            "c": "none"
    },
    "d": 100
}

以下の例のメッセージは、MongoDB の SMT が前のメッセージでネスト化された構造をフラット化した後に表示した出力です。

{
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
}

結果のメッセージでは、元のメッセージでネストされた b フィールドと c フィールドがフラット化され、名前が変更されます。名前が変更されたフィールドは、親ドキュメント a の名前を、ネストされたドキュメントの名前 a_b および a_c と連結することによって形成されます。新しいフィールド名のコンポーネントは、struct.delimiter 設定プロパティーの設定で定義されているようにアンダースコアで区切られます。

12.8.6. Debezium MongoDB コネクターが $unset 操作によって削除されたフィールドの名前を報告する方法

MongoDB では、$unset Operator と $rename Operator の両方がドキュメントからフィールドを削除します。MongoDB コレクションはスキーマレスであるため、更新でドキュメントからフィールドが削除されると、更新済みドキュメントで抜けているフィールドの名前を推測できません。Debezium は、削除されたフィールドの名前をリストする removedFields 要素を含む更新メッセージを発行して、削除済みフィールドに関する情報を必要とする可能性がある sink コネクターまたはその他のコンシューマーをサポートします。

以下の例は、フィールド a を削除する操作の更新メッセージの一部を示しています。

"payload": {
  "op": "u",
  "ts_ms": "...",
  "before": "{ ... }",
  "after": "{ ... }",
  "updateDescription": {
    "removedFields": ["a"],
    "updatedFields": null,
    "truncatedArrays": null
  }
}

上記の例では、before および after はドキュメントが更新される前と後のソースドキュメントの状態を表します。これらのフィールドは、コネクターの capture.mode が次のリストで説明されているように設定されている場合にのみ、コネクターが発行するイベントメッセージに存在します。

before フィールド

変更前のドキュメントの状態を提供します。このフィールドは、capture.mode が以下のいずれかの値に設定されている場合にのみ表示されます。

  • change_streams_with_pre_image.
  • change_streams_update_full_with_pre_image.
After フィールド

変更後のドキュメントの完全な状態を提供します。このフィールドは、capture.mode が以下のいずれかの値に設定されている場合にのみ表示されます。

  • change_streams_update_full
  • change_streams_update_full_with_pre_image.

完全なドキュメントをキャプチャーするようにコネクターが設定されていると仮定すると、ExtractNewDocumentState SMT が $unset イベントの update メッセージを受信すると、SMT は、次の例に示すように、削除されたフィールドに null 値があることを表し、メッセージを再エンコードします。

{
    "id": 1,
    "a": null
}

完全なドキュメントをキャプチャーするように設定されていないコネクターの場合、SMT が $unset 操作の更新イベントを受信すると、以下の出力メッセージが生成されます。

{
   "a": null
}

12.8.7. 元のデータベース操作のタイプの判別

SMT がイベントメッセージをフラット化した後、生成されるメッセージでは、イベントを生成した操作が createupdate、または初期スナップショットの read であるかを判断できなくなります。通常、delete 操作に関する情報を公開するか、削除に伴うイベントを書き換えるようにコネクターを設定して、削除操作を識別できます。イベントメッセージで廃棄および書き換えに関する情報を公開するようにコネクターを設定する方法は、drop.tombstones および delete.handling.mode プロパティーを参照してください。

イベントメッセージでデータベース操作のタイプを報告させるには、SMT は次の要素のいずれかに op フィールドを追加します。

  • イベントメッセージのボディー。
  • メッセージヘッダー

たとえば、元の操作のタイプを示すヘッダープロパティーを追加するには、次の例のように、変換を追加してから、add.headers プロパティーをコネクター設定に追加します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op

上記の設定に基づいて、SMT は op ヘッダーをメッセージに追加し、文字列の値を割り当てて操作のタイプを特定することでイベントタイプを報告します。割り当てられた文字列の値は、元の MongoDB 変更イベントメッセージop フィールドの値に基づいています。

12.8.8. MongoDB イベントフラット化 SMT を使用した Debezium メタデータの Kafka レコード追加

MongoDB のイベントフラット化 SMT は、元の変更イベントメッセージから簡素化されたメッセージにメタデータフィールドを追加できます。追加されたメタデータフィールドには、2 つのアンダースコア ("__") が接頭辞として付けられます。イベントレコードにメタデータを追加すると、変更イベントが発生したコレクションの名前などのコンテンツを含めたり、レプリカセット名などのコネクター固有のフィールドを含めたりすることができます。現在、SMT は、sourcetransaction、および updateDescription の変更イベントのサブ構造フィールドのみを追加できます。

MongoDB 変更イベントの構造に関する詳細は、MongoDB コネクターのドキュメント を参照してください。

たとえば、次の設定を指定して、変更イベントのレプリカセット名 (rs) とコレクション名を、最終的なフラット化されたイベントレコードに追加できます。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection

上記の設定により、フラット化されたレコードに次のコンテンツが追加されます。

{ "__rs" : "rs0", "__collection" : "my-collection", ... }

SMT でメタデータフィールドを delete イベントに追加する場合には、delete.handling.mode オプションの値を rewrite に設定します。

12.8.9. MongoDB 抽出の新しいドキュメント状態変換を選択して適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。

SMT を選択的に適用する方法の詳細は、変換用の SMT 述語の設定 を参照してください。

12.8.10. MongoDB の Debezium イベントフラット化変換の設定オプション

以下の表は、MongoDB イベントフラット化 SMT の設定オプションを示しています。

プロパティーデフォルト説明

array.encoding

array

SMT が元のイベントメッセージから読み取る配列をエンコードする際に使用する形式を指定します。以下のオプションのいずれかを設定します。

array
SMT は array データ型を使用して、MongoDB 配列を Apache Kafka Connect または Apache Avro スキーマと互換性のある形式にエンコードします。このオプションを設定する場合は、各配列インスタンスの要素が同じタイプであることを確認します。MongoDB では複数のデータ型を含めることができますが、ダウンストリームクライアントによっては配列を処理できません。
document
SMT は、BSON シリアル化 と同様の方法で、各 MongoDB 配列を structsstruct に変換します。メインの struct には、_0_1_2 などの名前のフィールドが含まれています。Avro の命名標準に準拠するために、SMT は各インデックスフィールドの数値名の前にアンダースコアを付けます。各数値フィールド名は、元の配列にある要素のインデックスを表します。SMT は、これらの各インデックスフィールドに、指定された配列要素のソースドキュメントから取得した値を設定します。

array.coding オプションの詳細は、MongoDB イベントメッセージで配列をエンコードするためのオプション を参照してください。

flatten.struct

false

SMT は、メッセージ内のネストされたプロパティーの名前を設定可能な区切り文字で連結して、単純なフィールド名を形成することで、元のイベントメッセージの構造 (structs) をフラット化します。

flatten.struct.delimiter

_

flatten.structtrue に設定されている場合、出力レコードにフィールド名を生成するために、変換が入力レコードから連結するフィールド名の間に挿入する区切り文字を指定します。

drop.tombstones

true

Debezium は、DELETE 操作ごとに tombstone レコードを生成します。デフォルト動作では、イベントフラット化 SMT はストリームからトゥームストーンレコードを削除します。tombstone レコードをストリームに残すには、drop.tombstones=false を指定します。

delete.handling.mode

drop

SMT が delete 操作用に生成する変更イベントレコードを処理する方法を指定します。以下のオプションのいずれかを設定します。

drop
SMT は、イベントストリームから delete 操作のレコードを削除します。
none
SMT は、イベントストリームからの元の変更イベントレコードを保持します。レコードには "value": "null" のみが含まれています。
rewrite
SMT は、ストリームからの変更イベントレコードの変更バージョンを保持します。レコードが削除されたことを示す別の方法を提供するために、変更されたレコードには、元のレコードからのキーと値のペアを含む value フィールドなど、value__deleted: true が追加されます。

rewrite オプションを設定すると、削除されたレコードを追跡するには、DELETE 操作の更新された単純化されたレコードで十分であると感じる場合があります。このような場合には、SMT で tombstone レコードを削除する ことが推奨されます。

add.headers.prefix

__ (double-underscore)

このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。

add.headers

デフォルトなし

SMT で簡略化されたメッセージのヘッダーに追加するメタデータフィールドのコンマ区切りのリストを、スペースを入れずに指定します。元のメッセージに重複したフィールド名が含まれている場合、struct の名前とフィールドの名前を一緒に指定することで、変更する特定のフィールドを識別できます (例: source.ts_ms)。

必要に応じて、以下の形式のエントリーをリストに追加し、フィールドの元の名前を上書きして新しい名前を割り当てることができます。

<field_name>:<new_field_name>.

以下に例を示します。

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

指定する新しい名前の値では、大文字と小文字が区別されます。

SMT が簡略化したメッセージのヘッダーにメタデータフィールドを追加するとき、SMT は各メタデータフィールド名の前に 2 つのアンダースコアを付けます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

変更イベントの元のメッセージにないフィールドを指定した場合、SMT ではヘッダーにそのフィールドが追加されません。

add.fields.prefix

__ (double-underscore)

フィールド名の前に付けるオプションの文字列を指定します。

add.fields

デフォルトなし

このオプションを、簡略化された Kafka メッセージの value 要素に追加するメタデータフィールドのコンマ区切りリスト (スペースなし) に設定します。元のメッセージに重複したフィールド名が含まれている場合、struct の名前とフィールドの名前を一緒に指定することで、変更する特定のフィールドを識別できます (例: source.ts_ms)。
必要に応じて、以下の形式のエントリーをリストに追加し、フィールドの元の名前を上書きして新しい名前を割り当てることができます。

<field_name>:<new_field_name>.

以下に例を示します。

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

指定する新しい名前の値では、大文字と小文字が区別されます。

SMT が簡略化したメッセージの value 要素にメタデータフィールドを追加するとき、SMT は各メタデータフィールド名の前に 2 つのアンダースコアを付けます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

元の変更イベントメッセージに存在しないフィールドを指定した場合でも、SMT は指定されたフィールドを変更されたメッセージの value 要素に追加します。

既知の制限

  • MongoDB はスキーマのないデータベースであるため、Debezium を使用してスキーマベースのデータリレーショナルデータベースに変更をストリーミングするときに一貫した列定義を確保するには、名前が同じコレクション内のフィールドに同じ型のデータを格納する必要があります。
  • SMT を設定して、sink コネクターと互換性のある形式でメッセージを生成します。sink コネクターがフラットなメッセージ構造を必要としているにもかかわらず、ソース MongoDB ドキュメント内の配列を Structs の struct としてエンコードするメッセージを受信した場合、 sink コネクターはメッセージを処理できません。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.