12.6. Debezium MongoDB 変更イベントから状態 後 のソースドキュメントの抽出


Debezium MongoDB コネクターは、MongoDB コレクションで発生する各操作を表すデータ変更メッセージを出力します。これらのイベントメッセージの複雑な構造は、元のデータベースイベントの詳細を表します。ただし、ダウンストリームのコンシューマーの中には、メッセージを元の形式で処理できない場合があります。たとえば、データコレクションでネストされたドキュメントを表すために、コネクターはネストされたフィールドが含まれる形式でイベントメッセージを出力します。シンクコネクター、または元のメッセージの階層形式を処理できない他のコンシューマーをサポートするには、Debezium MongoDB イベントフラット化(ExtractNewDocumentState)の単一メッセージ変換(SMT)を使用できます。SMT は元のメッセージの構造を簡素化し、データの処理を容易にする他の方法でメッセージを変更できます。

イベントフラット化変換は Kafka Connect SMT です。

注記

本章の情報では、Debezium MongoDB コネクターのイベントフラット化単一メッセージ変換(SMT)のみを説明します。リレーショナルデータベースで使用する同等の SMT の詳細は、New Record State Extraction SMT のドキュメント を参照してください。

詳細は以下のセクションを参照してください。

12.6.1. Debezium MongoDB 変更イベントの構造の説明

Debezium MongoDB コネクターは、複雑な構造を持つ変更イベントを生成します。各イベントメッセージには、以下の部分が含まれます。

ソースメタデータ

以下のフィールドが含まれますが、これに限定されません。

  • コレクションでデータが変更された操作のタイプ(作成/挿入、更新、または削除)。
  • 変更が発生したデータベースおよびコレクションの名前。
  • 変更が加えられたタイミングを識別するタイムスタンプ。
  • オプションのトランザクション情報。
ドキュメントデータ
データの

このフィールドは、Debezium コネクターの capture.mode が以下のいずれかの値に設定されている場合に MongoDB 6.0 以降を実行する環境に存在します。

  • change_streams_with_pre_image.
  • change_streams_update_full_with_pre_image.

    詳細は、MongoDB pre-image supportを参照してください。

After data

現在の操作の後にドキュメントに存在する値を表す JSON 文字列。イベントメッセージに after フィールドが存在するかどうかは、イベントの種類とコネクター設定によって異なります。MongoDB 挿入 操作の 作成 イベントには、capture.mode 設定に関係なく、常に after フィールドが含まれます。更新 イベントの場合、after フィールドは capture.mode が以下のいずれかの値に設定されている場合にのみ存在します。

  • change_streams_update_full
  • change_streams_update_full_with_pre_image.

    注記

    変更イベントメッセージの after 値は、イベント直後にあるドキュメントの状態を表すとは限りません。値は動的に計算されません。代わりに、コネクターが変更イベントをキャプチャーした後、コレクションをクエリーしてドキュメントの現在の値を取得します。

    たとえば、複数の操作、b、および c がドキュメント すばやく連続して変更する状況を想像してみてください。コネクターが を処理する 、完全なドキュメントのコレクションをクエリーします。その間、bc の変更が発生します。コネクターが を変更するための完全なドキュメントに対する応答を受信すると、b または c の後続の変更を基にしたドキュメントのバージョンを受け取る可能性があります。詳細は、capture.mode プロパティーのドキュメントを参照してください。

以下のフラグメントは、MongoDB の 挿入 操作後にコネクターが発行する create 変更イベントの基本構造を示しています。

{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}",
  "source": { ... }
}

上記の例の after フィールドの複雑な形式は、ソースデータベースで発生する変更に関する詳細情報を提供します。ただし、一部のコンシューマーは、ネストされた値が含まれるメッセージを処理できません。元のメッセージの複雑なネストされたフィールドを、より汎用的な互換性のある構造に変換するには、MongoDB のイベントフラット化 SMT を使用します。SMT は、以下の例のようにメッセージ内のネストされたフィールドの構造をフラット化します。

{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}

Debezium MongoDB コネクターによって生成されたメッセージのデフォルト構造の詳細は、コネクターの ドキュメント を参照してください。

12.6.2. Debezium MongoDB イベントフラット化変換の動作

MongoDB のイベントフラット化 SMT は、Debezium MongoDB コネクターによって出力される変更イベントメッセージの 作成 または 更新 から after フィールドを抽出します。SMT が元の変更イベントメッセージを処理すると、after フィールドの内容のみが含まれる簡略化されたバージョンが生成されます。

ユースケースに応じて、ExtractNewDocumentState SMT を Debezium MongoDB コネクターまたは Debezium コネクターが生成するメッセージを消費するシンクコネクターに適用できます。SMT を Debezium MongoDB コネクターに適用すると、SMT はコネクターが Apache Kafka に送信する前に出力するメッセージを変更します。Kafka が完全な Debezium 変更イベントメッセージを元の形式で保持するには、SMT をシンクコネクターに適用します。

イベントフラット化 SMT を使用して MongoDB コネクターから出力されるメッセージを処理する場合、SMT は元のメッセージのレコードの構造を、通常のシンクコネクターが使用できる適切な型指定された Kafka Connect レコードに変換します。たとえば、SMT は元のメッセージの after 情報を表す JSON 文字列を、任意のコンシューマーが処理できるスキーマ構造に変換します。

オプションとして、MongoDB のイベントフラット化 SMT を設定して、処理中に他の方法でメッセージを変更できます。詳細は、設定トピック を参照してください。

12.6.3. Debezium MongoDB イベントフラット化変換の設定

Debezium MongoDB コネクターによって出力されるメッセージを消費するシンクコネクターの MongoDB のイベントフラット化(ExtractNewDocumentState) SMT を設定します。

詳細は以下のセクションを参照してください。

12.6.3.1. 例:Debezium MongoDB イベント flattening-transformation の基本設定

SMT のデフォルト動作を取得するには、以下の例のように、オプションを指定せずにシンクコネクターの設定に SMT を追加します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

Kafka Connect コネクター設定と同様に、transforms= を複数のコンマ区切りの SMT エイリアスに設定できます。Kafka Connect は、リスト順で指定した変換を適用します。

MongoDB イベントフラット化 SMT を使用するコネクターに複数のオプションを設定できます。以下の例は、コネクターの drop.tombstonesdelete.handling.mode、および add.headers オプションを設定する設定を示しています。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.add.headers=op

前の例の設定オプションの詳細については、設定トピック を参照してください。

設定のカスタマイズ

コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションの変更に関するメタデータメッセージ) を発行する場合があります。イベントのサブセットに変換を適用するには、特定のイベントのみに 変換を選択的に適用する SMT 述語ステートメント を定義できます。

12.6.4. MongoDB イベントメッセージでアレイをエンコードするためのオプション

デフォルトでは、イベントフラット化 SMT は MongoDB アレイを Apache Kafka Connect または Apache Avro スキーマと互換性のあるアレイに変換します。MongoDB アレイには複数のタイプの要素を含めることができますが、Kafka 配列のすべての要素が同じタイプである必要があります。

SMT が環境のニーズを満たす方法でアレイをエンコードするようにするには、array.encoding 設定オプションを指定できます。以下の例は、アレイエンコーディングを設定するための設定を示しています。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>

設定に応じて、SMT は以下のエンコーディング方法のいずれかを使用してソースメッセージの配列の各インスタンスを処理します。

array encoding
array.encodingarray (デフォルト)に設定されている場合、SMT エンコードは 配列 データ型を使用して元のメッセージにアレイをエンコードします。正しい処理を確保するには、配列インスタンスのすべての要素が同じタイプである必要があります。このオプションは 1 つの制限ですが、ダウンストリームクライアントがアレイを簡単に処理できるようにします。
ドキュメントのエンコーディング
array.encodingdocument に設定されている場合、SMT はソースの各配列を BSON シリアル化 と同様の方法で 構造体 に変換しますメイン 構造に は、_0_1_2 などのフィールドが含まれます。各フィールド名は、元の配列内の要素のインデックスを表します。SMT は、これらの各インデックスフィールドに、ソースアレイ内の同等の要素を取得する値を投入します。Avro エンコーディングでは数字で始まるフィールド名を禁止するため、インデックス名にはアンダースコアが付けられます。

以下の例は、Debezium MongoDB コネクターが異種データ型を含む配列が含まれるデータベースドキュメントを表す方法を示しています。

例12.1 例:複数のデータ型を含むアレイの文書化エンコーディング

{
    "_id": 1,
    "a1": [
        {
            "a": 1,
            "b": "none"
        },
        {
            "a": "c",
            "d": "something"
        }
    ]
}

array.encodingdocument に設定されている場合、SMT は前述のドキュメントを以下の形式に変換します。

{
    "_id": 1,
    "a1": {
        "_0": {
            "a": 1,
            "b": "none"
        },
        "_1": {
            "a": "c",
            "d": "something"
        }
    }
}

document エンコーディングオプションを使用すると、SMT は異種要素で設定される任意のアレイを処理できます。ただし、このオプションを使用する前に、シンクコネクターおよびその他のダウンストリームコンシューマーが複数のデータ型を含むアレイを処理できることを確認します。

12.6.5. MongoDB イベントメッセージでネストされた構造をフラット化

データベース操作に埋め込みドキュメントが含まれる場合、Debezium MongoDB コネクターは、元のドキュメントの階層構造を反映する構造を持つ Kafka イベントレコードを出力します。つまり、イベントメッセージはネストされたドキュメントをネストされたフィールド構造のセットとして表します。ダウンストリームコネクターがネストされた構造を含むメッセージを処理できない環境では、イベントフラット化 SMT を設定して、メッセージ内の階層構造をフラット化できます。フラットなメッセージ構造は、テーブルのようなストレージに適しています。

SMT がネストされた構造をフラット化するように設定するには、flat ing.struct 設定オプションを true に設定します。変換されたメッセージでは、フィールド名がドキュメントソースと一致するよう構築されます。SMT は、親ドキュメントフィールドの名前をネストされたドキュメントフィールドの名前と連結して、各フラット化フィールドの名前を変更します。flatten.struct.delimiter オプションで定義される区切り文字は、名前のコンポーネントを区切ります。struct.delimiter のデフォルト値はアンダースコア(_)です。

以下の例は、SMT がネストされた構造をフラット化するかどうかを指定する設定を示しています。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>

以下の例は、MongoDB コネクターによって出力されるイベントメッセージを示しています。このメッセージには、b および c の 2 つ ネストされたドキュメントのフィールドが含まれるドキュメントのフィールドが含まれます。

{
    "_id": 1,
    "a": {
            "b": 1,
            "c": "none"
    },
    "d": 100
}

以下の例のメッセージは、MongoDB の SMT からの出力を示しています。上記のメッセージでネストされた構造をフラット化します。

{
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
}

結果メッセージでは、元のメッセージでネスト化されていた b フィールドおよび c フィールドはフラット化され、名前が変更されます。名前が変更されたフィールドは、親ドキュメントの名前 a _b および a_c を連結して形成されます。新しいフィールド名のコンポーネントは、struct.delimiter 設定プロパティーの設定で定義されているようにアンダースコアで区切られます。

12.6.6. Debezium MongoDB コネクターが $unset 操作によって削除されたフィールドの名前を報告する方法

MongoDB では、$unset 演算子と $rename Operator の両方がドキュメントからフィールドを削除します。MongoDB コレクションはスキーマレスであるため、更新後にドキュメントからフィールドを削除すると、更新されたドキュメントから不足しているフィールド名を推測することはできません。削除されたフィールドに関する情報を必要とする可能性のあるシンクコネクターやその他のコンシューマーをサポートするために、Debezium は削除されたフィールドの名前を一覧表示する removedFields 要素を含む更新メッセージを出力します。

以下の例は、のフィールドを削除する操作の更新メッセージの一部を示しています。

"payload": {
  "op": "u",
  "ts_ms": "...",
  "before": "{ ... }",
  "after": "{ ... }",
  "updateDescription": {
    "removedFields": ["a"],
    "updatedFields": null,
    "truncatedArrays": null
  }
}

上記の例では、before および after はドキュメントの更新前後のソースドキュメントの状態を表します。これらのフィールドは、以下のリストで説明されているようにコネクターの capture.mode が設定されている場合にのみコネクターが出力するイベントメッセージに存在します。

before フィールド

変更前のドキュメントの状態を提供します。このフィールドは、capture.mode が以下のいずれかの値に設定されている場合にのみ表示されます。

  • change_streams_with_pre_image
  • change_streams_update_full_with_pre_image.
After フィールド

変更後のドキュメントの完全な状態を提供します。このフィールドは、capture.mode が以下のいずれかの値に設定されている場合にのみ表示されます。

  • change_streams_update_full
  • change_streams_update_full_with_pre_image.

完全なドキュメントをキャプチャーするように設定されたコネクターを想定し、ExtractNewDocumentState SMT が $unset イベントの 更新 メッセージを受信すると、SMT は、以下の例のように、削除されたフィールドに null 値があることを表すことでメッセージをエンコードします。

{
    "id": 1,
    "a": null
}

完全なドキュメントをキャプチャーするように設定されていないコネクターの場合、SMT が $unset 操作の更新イベントを受信すると、以下の出力メッセージが生成されます。

{
   "a": null
}

12.6.7. 元のデータベース操作のタイプの決定

SMT がイベントメッセージをフラット化した後、生成されるメッセージは、イベントを生成した操作が createupdate、または initial snapshot read であるかを示すことはなくなりました。通常、削除操作は、削除に付随する tombstone または rewrite イベントに関する情報を公開するようにコネクターを設定することで特定できます。廃棄(tombstone)に関する情報を公開し、イベントメッセージに書き換えるようコネクターを設定する方法は、drop.tombstones および delete.handling.mode プロパティーを参照してください。

イベントメッセージでデータベース操作のタイプを報告するために、SMT は op フィールドを以下の要素のいずれかに追加できます。

  • イベントメッセージのボディー。
  • メッセージヘッダー。

たとえば、元の操作のタイプを示す header プロパティーを追加するには、以下の例のように変換を追加し、add.headers プロパティーをコネクター設定に追加します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op

上記の設定に基づいて、SMT は op ヘッダーをメッセージに追加し、文字列の値を割り当てて操作のタイプを特定することでイベントタイプを報告します。割り当てられた文字列の値は、元の MongoDB 変更イベントメッセージop フィールドの値に基づいています。

12.6.8. MongoDB イベントフラット化 SMT を使用して Debezium メタデータを Kafka レコードに追加

MongoDB のイベントフラット化 SMT は、元の変更イベントメッセージから簡素化されたメッセージにメタデータフィールドを追加できます。追加されたメタデータフィールドには、二重アンダースコア(__)が接頭辞として付けられます。イベントレコードにメタデータを追加すると、変更イベントが発生したコレクションの名前などのコンテンツや、レプリカセット名などのコネクター固有のフィールドを含めることができます。現在、SMT は、sourcetransaction、および updateDescription の変更イベントのサブ構造のフィールドのみを追加できます。

MongoDB 変更イベントの構造に関する詳細は、MongoDB コネクターのドキュメント を参照してください。

たとえば、以下の設定を指定して、レプリカセット名(rs)と変更イベントのコレクション名を最終的なフラット化イベントレコードに追加します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection

上記の設定により、次のコンテンツがフラット化されたレコードに追加されます。

{ "__rs" : "rs0", "__collection" : "my-collection", ... }

SMT でメタデータフィールドを追加してイベントを削除する場合は、 delete.handling.mode オプションの値を rewrite に設定します。

12.6.9. MongoDB 抽出の新しいドキュメント状態変換を選択的に適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。

SMT を選択的に適用する方法の詳細は、変換用の SMT 述語の設定 を参照し てください。

12.6.10. MongoDB の Debezium イベントフラット化変換の設定オプション

以下の表は、MongoDB イベントフラット化 SMT の設定オプションを示しています。

プロパティーデフォルト説明

array.encoding

array

SMT が元のイベントメッセージから読み取るアレイをエンコードする際に使用する形式を指定します。以下のオプションのいずれかを設定します。

array
SMT は array データ型を使用して、MongoDB アレイを Apache Kafka Connect または Apache Avro スキーマと互換性のある形式に変換します。このオプションを設定する場合は、各アレイインスタンスの要素が同じタイプであることを確認します。MongoDB では複数のデータ型を含めることができますが、ダウンストリームクライアントによってはアレイを処理できません。
document
SMT は、BSON シリアル化と同様の方法で、各 MongoDB 配列を 構造体 に変換しますメイン 構造に は、_0_1_2 などの名前のフィールドが含まれます。Avro の命名標準に準拠するために、SMT は各インデックスフィールドの数値名の前にアンダースコアを付けます。各数値フィールド名は、元の配列内の要素のインデックスを表します。SMT は、これらの各インデックスフィールドに、指定された配列要素のソースドキュメントから取得する値を入力します。

array.coding オプションの詳細は、MongoDB イベントメッセージでアレイをエンコードするためのオプション を参照してください。

flatten.struct

false

SMT は、メッセージ内のネストされたプロパティーの名前を設定可能な区切り文字で連結して、単純なフィールド名を形成することで、元のイベントメッセージの構造(structs)をフラット化します。

flatten.struct.delimiter

_

flatten.structtrue に設定されている場合、出力レコードにフィールド名を生成するために、入力レコードから連結したフィールド名間で変換が挿入される区切り文字を指定します。

drop.tombstones

true

Debezium は、削除 操作ごとに廃棄レコードを生成します。デフォルト動作では、イベントフラット化 SMT はストリームからトゥームストーンレコードを削除します。ストリームで tombstone レコードを保持するには、drop.tombstones=false を指定します。

delete.handling.mode

drop

SMT が 削除 操作用に生成する変更イベントレコードを処理する方法を指定します。以下のオプションのいずれかを設定します。

drop
SMT は、イベントストリームから 削除 操作のレコードを削除します。
none
SMT は、イベントストリームからの元の変更イベントレコードを保持します。レコードには "value": "null" のみが含まれています。
rewrite
SMT は、ストリームからの変更イベントレコードの変更バージョンを保持します。レコードが削除されたことを示す別の方法を提供するために、変更されたレコードには、元のレコードから送信されたキー/値のペアが含まれる値フィールドが含まれ、__deleted: true に追加します。

rewrite オプションを設定すると、削除されたレコードを追跡するのに、DELETE 操作の更新された簡素化されたレコードが十分にある可能性があります。このような場合には、SMT で 廃棄(tombstone)レコード を削除しなければならない場合があります。

add.headers.prefix

__ (double-underscore)

このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。

add.headers

デフォルトなし

SMT が簡略化されたメッセージのヘッダーに追加するメタデータフィールドのコンマ区切りリストを指定します。元のメッセージに重複したフィールド名が含まれている場合は、フィールドの名前とともに構造体の名前( source.ts_ms など)を指定して、変更する特定のフィールドを特定できます。

必要に応じて、以下の形式のエントリーをリストに追加することで、フィールドの元の名前を上書きして新しい名前を割り当てることができます。

<field_name>:<new_field_name>.

以下はその例です。

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

指定する新しい名前値は、大文字と小文字が区別されます。

SMT が簡素化したメッセージのヘッダーにメタデータフィールドを追加すると、各メタデータフィールド名の前にダブルアンダースコアが追加されます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

変更イベントの元のメッセージにないフィールドを指定すると、SMT はヘッダーに フィールドを追加しません。

add.fields.prefix

__ (double-underscore)

フィールド名の前に付けるオプションの文字列を指定します。

add.fields

デフォルトなし

このオプションをメタデータフィールドのコンマ区切りリスト(スペースなし)に設定し、簡素化された Kafka メッセージの value 要素に追加します。元のメッセージに重複したフィールド名が含まれている場合は、フィールドの名前とともに構造体の名前( source.ts_ms など)を指定して、変更する特定のフィールドを特定できます。
必要に応じて、以下の形式のエントリーをリストに追加することで、フィールドの元の名前を上書きして新しい名前を割り当てることができます。

<field_name>:<new_field_name>.

以下はその例です。

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

指定する新しい名前値は、大文字と小文字が区別されます。

SMT が簡素化したメッセージの value 要素にメタデータフィールドを追加すると、各メタデータフィールド名の前にダブルアンダースコアが追加されます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

元の変更イベントメッセージに存在しないフィールドを指定した場合でも、SMT は指定されたフィールドを変更されたメッセージの value 要素に追加します。

sanitize.field.names

false

Avro の命名要件に準拠するためにフィールド名がサニタイズされるかどうか。詳細は Avro の命名 を参照してください。

既知の制限

  • MongoDB はスキーマレスデータベースであるため、Debezium を使用してスキーマベースのデータリレーショナルデータベースに変更をストリーミングする場合に一貫した列定義を確保するため、同じ名前のコレクション内のフィールドが同じタイプのデータを保存する必要があります。
  • SMT を設定して、シンクコネクターと互換性のある形式でメッセージを生成します。シンクコネクターにフラットメッセージ構造が必要で、ソース MongoDB ドキュメントの配列を構造体としてエンコードするメッセージを受信する場合、シンクコネクターはメッセージを処理できません。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.