12.6. Debezium MongoDB 変更イベントから状態 後 のソースドキュメントの抽出
Debezium MongoDB コネクターは、MongoDB コレクションで発生する各操作を表すデータ変更メッセージを出力します。これらのイベントメッセージの複雑な構造は、元のデータベースイベントの詳細を表します。ただし、ダウンストリームのコンシューマーの中には、メッセージを元の形式で処理できない場合があります。たとえば、データコレクションでネストされたドキュメントを表すために、コネクターはネストされたフィールドが含まれる形式でイベントメッセージを出力します。シンクコネクター、または元のメッセージの階層形式を処理できない他のコンシューマーをサポートするには、Debezium MongoDB イベントフラット化(ExtractNewDocumentState)の単一メッセージ変換(SMT)を使用できます。SMT は元のメッセージの構造を簡素化し、データの処理を容易にする他の方法でメッセージを変更できます。
イベントフラット化変換は Kafka Connect SMT です。
本章の情報では、Debezium MongoDB コネクターのイベントフラット化単一メッセージ変換(SMT)のみを説明します。リレーショナルデータベースで使用する同等の SMT の詳細は、New Record State Extraction SMT のドキュメント を参照してください。
詳細は以下のセクションを参照してください。
- 「Debezium MongoDB 変更イベントの構造の説明」
- 「Debezium MongoDB イベントフラット化変換の動作」
- 「Debezium MongoDB イベントフラット化変換の設定」
- 「MongoDB イベントメッセージでアレイをエンコードするためのオプション」
- 「MongoDB イベントメッセージでネストされた構造をフラット化」
-
「Debezium MongoDB コネクターが
$unset
操作によって削除されたフィールドの名前を報告する方法」 - 「元のデータベース操作のタイプの決定」
- 「MongoDB イベントフラット化 SMT を使用して Debezium メタデータを Kafka レコードに追加」
- 「MongoDB 抽出の新しいドキュメント状態変換を選択的に適用するオプション」
- 「MongoDB の Debezium イベントフラット化変換の設定オプション」
- 既知の制限
12.6.1. Debezium MongoDB 変更イベントの構造の説明
Debezium MongoDB コネクターは、複雑な構造を持つ変更イベントを生成します。各イベントメッセージには、以下の部分が含まれます。
- ソースメタデータ
以下のフィールドが含まれますが、これに限定されません。
- コレクションでデータが変更された操作のタイプ(作成/挿入、更新、または削除)。
- 変更が発生したデータベースおよびコレクションの名前。
- 変更が加えられたタイミングを識別するタイムスタンプ。
- オプションのトランザクション情報。
- ドキュメントデータ
- データの
前
このフィールドは、Debezium コネクターの
capture.mode
が以下のいずれかの値に設定されている場合に MongoDB 6.0 以降を実行する環境に存在します。-
change_streams_with_pre_image
. change_streams_update_full_with_pre_image
.詳細は、MongoDB pre-image supportを参照してください。
-
After
data現在の操作の後にドキュメントに存在する値を表す JSON 文字列。イベントメッセージに
after
フィールドが存在するかどうかは、イベントの種類とコネクター設定によって異なります。MongoDB挿入
操作の作成
イベントには、capture.mode
設定に関係なく、常にafter
フィールドが含まれます。更新
イベントの場合、after
フィールドはcapture.mode
が以下のいずれかの値に設定されている場合にのみ存在します。-
change_streams_update_full
change_streams_update_full_with_pre_image
.注記変更イベントメッセージの
after
値は、イベント直後にあるドキュメントの状態を表すとは限りません。値は動的に計算されません。代わりに、コネクターが変更イベントをキャプチャーした後、コレクションをクエリーしてドキュメントの現在の値を取得します。たとえば、複数の操作、
b
、およびc
がドキュメントを
すばやく連続して変更する状況を想像してみてください。コネクターが を処理すると
、完全なドキュメントのコレクションをクエリーします。その間、b
とc
の変更が発生します。コネクターが を変更するための完全なドキュメントに対する応答を受信すると、b
またはc
の後続の変更を基にしたドキュメントのバージョンを受け取る可能性があります。詳細は、
capture.mode
プロパティーのドキュメントを参照してください。
-
- データの
以下のフラグメントは、MongoDB の 挿入
操作後にコネクターが発行する create
変更イベントの基本構造を示しています。
{ "op": "c", "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}", "source": { ... } }
上記の例の after
フィールドの複雑な形式は、ソースデータベースで発生する変更に関する詳細情報を提供します。ただし、一部のコンシューマーは、ネストされた値が含まれるメッセージを処理できません。元のメッセージの複雑なネストされたフィールドを、より汎用的な互換性のある構造に変換するには、MongoDB のイベントフラット化 SMT を使用します。SMT は、以下の例のようにメッセージ内のネストされたフィールドの構造をフラット化します。
{ "field1" : "newvalue1", "field2" : "newvalue2" }
Debezium MongoDB コネクターによって生成されたメッセージのデフォルト構造の詳細は、コネクターの ドキュメント を参照してください。
12.6.2. Debezium MongoDB イベントフラット化変換の動作
MongoDB のイベントフラット化 SMT は、Debezium MongoDB コネクターによって出力される変更イベントメッセージの 作成
または 更新
から after
フィールドを抽出します。SMT が元の変更イベントメッセージを処理すると、after
フィールドの内容のみが含まれる簡略化されたバージョンが生成されます。
ユースケースに応じて、ExtractNewDocumentState SMT を Debezium MongoDB コネクターまたは Debezium コネクターが生成するメッセージを消費するシンクコネクターに適用できます。SMT を Debezium MongoDB コネクターに適用すると、SMT はコネクターが Apache Kafka に送信する前に出力するメッセージを変更します。Kafka が完全な Debezium 変更イベントメッセージを元の形式で保持するには、SMT をシンクコネクターに適用します。
イベントフラット化 SMT を使用して MongoDB コネクターから出力されるメッセージを処理する場合、SMT は元のメッセージのレコードの構造を、通常のシンクコネクターが使用できる適切な型指定された Kafka Connect レコードに変換します。たとえば、SMT は元のメッセージの after
情報を表す JSON 文字列を、任意のコンシューマーが処理できるスキーマ構造に変換します。
オプションとして、MongoDB のイベントフラット化 SMT を設定して、処理中に他の方法でメッセージを変更できます。詳細は、設定トピック を参照してください。
12.6.3. Debezium MongoDB イベントフラット化変換の設定
Debezium MongoDB コネクターによって出力されるメッセージを消費するシンクコネクターの MongoDB のイベントフラット化(ExtractNewDocumentState) SMT を設定します。
詳細は以下のセクションを参照してください。
- 「例:Debezium MongoDB イベント flattening-transformation の基本設定」
- 「MongoDB イベントメッセージでアレイをエンコードするためのオプション」
- 「MongoDB イベントメッセージでネストされた構造をフラット化」
-
「Debezium MongoDB コネクターが
$unset
操作によって削除されたフィールドの名前を報告する方法」 - 「元のデータベース操作のタイプの決定」
- 「MongoDB イベントフラット化 SMT を使用して Debezium メタデータを Kafka レコードに追加」
- 「MongoDB 抽出の新しいドキュメント状態変換を選択的に適用するオプション」
- 「MongoDB の Debezium イベントフラット化変換の設定オプション」
12.6.3.1. 例:Debezium MongoDB イベント flattening-transformation の基本設定
SMT のデフォルト動作を取得するには、以下の例のように、オプションを指定せずにシンクコネクターの設定に SMT を追加します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
Kafka Connect コネクター設定と同様に、transforms=
を複数のコンマ区切りの SMT エイリアスに設定できます。Kafka Connect は、リスト順で指定した変換を適用します。
MongoDB イベントフラット化 SMT を使用するコネクターに複数のオプションを設定できます。以下の例は、コネクターの drop.tombstones
、delete.handling.mode
、および add.headers
オプションを設定する設定を示しています。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.drop.tombstones=false transforms.unwrap.delete.handling.mode=drop transforms.unwrap.add.headers=op
前の例の設定オプションの詳細については、設定トピック を参照してください。
設定のカスタマイズ
コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションの変更に関するメタデータメッセージ) を発行する場合があります。イベントのサブセットに変換を適用するには、特定のイベントのみに 変換を選択的に適用する SMT 述語ステートメント を定義できます。
12.6.4. MongoDB イベントメッセージでアレイをエンコードするためのオプション
デフォルトでは、イベントフラット化 SMT は MongoDB アレイを Apache Kafka Connect または Apache Avro スキーマと互換性のあるアレイに変換します。MongoDB アレイには複数のタイプの要素を含めることができますが、Kafka 配列のすべての要素が同じタイプである必要があります。
SMT が環境のニーズを満たす方法でアレイをエンコードするようにするには、array.encoding
設定オプションを指定できます。以下の例は、アレイエンコーディングを設定するための設定を示しています。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.array.encoding=<array|document>
設定に応じて、SMT は以下のエンコーディング方法のいずれかを使用してソースメッセージの配列の各インスタンスを処理します。
- array encoding
-
array.encoding
がarray
(デフォルト)に設定されている場合、SMT エンコードは配列
データ型を使用して元のメッセージにアレイをエンコードします。正しい処理を確保するには、配列インスタンスのすべての要素が同じタイプである必要があります。このオプションは 1 つの制限ですが、ダウンストリームクライアントがアレイを簡単に処理できるようにします。 - ドキュメントのエンコーディング
-
array.encoding
がdocument
に設定されている場合、SMT はソースの各配列を BSON シリアル化 と同様の方法で 構造体 に変換します。メイン 構造に は、_0
、_1
、_2
などのフィールドが含まれます。各フィールド名は、元の配列内の要素のインデックスを表します。SMT は、これらの各インデックスフィールドに、ソースアレイ内の同等の要素を取得する値を投入します。Avro エンコーディングでは数字で始まるフィールド名を禁止するため、インデックス名にはアンダースコアが付けられます。
以下の例は、Debezium MongoDB コネクターが異種データ型を含む配列が含まれるデータベースドキュメントを表す方法を示しています。
例12.1 例:複数のデータ型を含むアレイの文書化エンコーディング
{ "_id": 1, "a1": [ { "a": 1, "b": "none" }, { "a": "c", "d": "something" } ] }
array.encoding
が document
に設定されている場合、SMT は前述のドキュメントを以下の形式に変換します。
{ "_id": 1, "a1": { "_0": { "a": 1, "b": "none" }, "_1": { "a": "c", "d": "something" } } }
document
エンコーディングオプションを使用すると、SMT は異種要素で設定される任意のアレイを処理できます。ただし、このオプションを使用する前に、シンクコネクターおよびその他のダウンストリームコンシューマーが複数のデータ型を含むアレイを処理できることを確認します。
12.6.5. MongoDB イベントメッセージでネストされた構造をフラット化
データベース操作に埋め込みドキュメントが含まれる場合、Debezium MongoDB コネクターは、元のドキュメントの階層構造を反映する構造を持つ Kafka イベントレコードを出力します。つまり、イベントメッセージはネストされたドキュメントをネストされたフィールド構造のセットとして表します。ダウンストリームコネクターがネストされた構造を含むメッセージを処理できない環境では、イベントフラット化 SMT を設定して、メッセージ内の階層構造をフラット化できます。フラットなメッセージ構造は、テーブルのようなストレージに適しています。
SMT がネストされた構造をフラット化するように設定するには、flat ing.struct
設定オプションを true
に設定します。変換されたメッセージでは、フィールド名がドキュメントソースと一致するよう構築されます。SMT は、親ドキュメントフィールドの名前をネストされたドキュメントフィールドの名前と連結して、各フラット化フィールドの名前を変更します。flatten.struct.delimiter
オプションで定義される区切り文字は、名前のコンポーネントを区切ります。struct.delimiter
のデフォルト値はアンダースコア(_
)です。
以下の例は、SMT がネストされた構造をフラット化するかどうかを指定する設定を示しています。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.flatten.struct=<true|false> transforms.unwrap.flatten.struct.delimiter=<string>
以下の例は、MongoDB コネクターによって出力されるイベントメッセージを示しています。このメッセージには、b
および c
の 2 つ の
ネストされたドキュメントのフィールドが含まれるドキュメントのフィールドが含まれます。
{ "_id": 1, "a": { "b": 1, "c": "none" }, "d": 100 }
以下の例のメッセージは、MongoDB の SMT からの出力を示しています。上記のメッセージでネストされた構造をフラット化します。
{ "_id": 1, "a_b": 1, "a_c": "none", "d": 100 }
結果メッセージでは、元のメッセージでネスト化されていた b
フィールドおよび c
フィールドはフラット化され、名前が変更されます。名前が変更されたフィールドは、親ドキュメントの名前
および a
_ba_c
を連結して形成されます。新しいフィールド名のコンポーネントは、struct.delimiter
設定プロパティーの設定で定義されているようにアンダースコアで区切られます。
12.6.6. Debezium MongoDB コネクターが $unset
操作によって削除されたフィールドの名前を報告する方法
MongoDB では、$unset
演算子と $rename
Operator の両方がドキュメントからフィールドを削除します。MongoDB コレクションはスキーマレスであるため、更新後にドキュメントからフィールドを削除すると、更新されたドキュメントから不足しているフィールド名を推測することはできません。削除されたフィールドに関する情報を必要とする可能性のあるシンクコネクターやその他のコンシューマーをサポートするために、Debezium は削除されたフィールドの名前を一覧表示する removedFields
要素を含む更新メッセージを出力します。
以下の例は、のフィールドを削除する操作の更新メッセージの一部を示しています。
"payload": { "op": "u", "ts_ms": "...", "before": "{ ... }", "after": "{ ... }", "updateDescription": { "removedFields": ["a"], "updatedFields": null, "truncatedArrays": null } }
上記の例では、before
および after
はドキュメントの更新前後のソースドキュメントの状態を表します。これらのフィールドは、以下のリストで説明されているようにコネクターの capture.mode
が設定されている場合にのみコネクターが出力するイベントメッセージに存在します。
before
フィールド変更前のドキュメントの状態を提供します。このフィールドは、
capture.mode
が以下のいずれかの値に設定されている場合にのみ表示されます。-
change_streams_with_pre_image
-
change_streams_update_full_with_pre_image
.
-
After
フィールド変更後のドキュメントの完全な状態を提供します。このフィールドは、
capture.mode
が以下のいずれかの値に設定されている場合にのみ表示されます。-
change_streams_update_full
-
change_streams_update_full_with_pre_image
.
-
完全なドキュメントをキャプチャーするように設定されたコネクターを想定し、ExtractNewDocumentState
SMT が $unset
イベントの 更新
メッセージを受信すると、SMT は、以下の例のように、削除されたフィールドに null
値があることを表すことでメッセージをエンコードします。
{ "id": 1, "a": null }
完全なドキュメントをキャプチャーするように設定されていないコネクターの場合、SMT が $unset
操作の更新イベントを受信すると、以下の出力メッセージが生成されます。
{ "a": null }
12.6.7. 元のデータベース操作のタイプの決定
SMT がイベントメッセージをフラット化した後、生成されるメッセージは、イベントを生成した操作が create
、update
、または initial snapshot read
であるかを示すことはなくなりました。通常、削除操作は、削除に付随する tombstone または rewrite イベントに関する情報を公開するようにコネクターを設定することで特定できます。廃棄(tombstone)に関する情報を公開し、イベントメッセージに書き換えるようコネクターを設定する方法は、
drop.tombstones
および delete.handling.mode
プロパティーを参照してください。
イベントメッセージでデータベース操作のタイプを報告するために、SMT は op
フィールドを以下の要素のいずれかに追加できます。
- イベントメッセージのボディー。
- メッセージヘッダー。
たとえば、元の操作のタイプを示す header プロパティーを追加するには、以下の例のように変換を追加し、add.headers
プロパティーをコネクター設定に追加します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.add.headers=op
上記の設定に基づいて、SMT は op
ヘッダーをメッセージに追加し、文字列の値を割り当てて操作のタイプを特定することでイベントタイプを報告します。割り当てられた文字列の値は、元の MongoDB 変更イベントメッセージ の op
フィールドの値に基づいています。
12.6.8. MongoDB イベントフラット化 SMT を使用して Debezium メタデータを Kafka レコードに追加
MongoDB のイベントフラット化 SMT は、元の変更イベントメッセージから簡素化されたメッセージにメタデータフィールドを追加できます。追加されたメタデータフィールドには、二重アンダースコア(__)が接頭辞として付けられ
ます。イベントレコードにメタデータを追加すると、変更イベントが発生したコレクションの名前などのコンテンツや、レプリカセット名などのコネクター固有のフィールドを含めることができます。現在、SMT は、source
、transaction
、および updateDescription
の変更イベントのサブ構造のフィールドのみを追加できます。
MongoDB 変更イベントの構造に関する詳細は、MongoDB コネクターのドキュメント を参照してください。
たとえば、以下の設定を指定して、レプリカセット名(rs
)と変更イベントのコレクション名を最終的なフラット化イベントレコードに追加します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.add.fields=rs,collection
上記の設定により、次のコンテンツがフラット化されたレコードに追加されます。
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
SMT でメタデータフィールドを追加してイベントを削除する場合は、
オプションの値を delete
.handling.moderewrite
に設定します。
12.6.9. MongoDB 抽出の新しいドキュメント状態変換を選択的に適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。
SMT を選択的に適用する方法の詳細は、変換用の SMT 述語の設定 を参照し てください。
12.6.10. MongoDB の Debezium イベントフラット化変換の設定オプション
以下の表は、MongoDB イベントフラット化 SMT の設定オプションを示しています。
プロパティー | デフォルト | 説明 |
---|---|---|
| SMT が元のイベントメッセージから読み取るアレイをエンコードする際に使用する形式を指定します。以下のオプションのいずれかを設定します。
| |
| SMT は、メッセージ内のネストされたプロパティーの名前を設定可能な区切り文字で連結して、単純なフィールド名を形成することで、元のイベントメッセージの構造(structs)をフラット化します。 | |
|
| |
|
Debezium は、 | |
|
SMT が
| |
__ (double-underscore) | このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。 | |
デフォルトなし |
SMT が簡略化されたメッセージのヘッダーに追加するメタデータフィールドのコンマ区切りリストを指定します。元のメッセージに重複したフィールド名が含まれている場合は、フィールドの名前とともに構造体の名前(
必要に応じて、以下の形式のエントリーをリストに追加することで、フィールドの元の名前を上書きして新しい名前を割り当てることができます。
以下はその例です。 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP
指定する新しい名前値は、大文字と小文字が区別されます。 | |
__ (double-underscore) | フィールド名の前に付けるオプションの文字列を指定します。 | |
デフォルトなし |
このオプションをメタデータフィールドのコンマ区切りリスト(スペースなし)に設定し、簡素化された Kafka メッセージの
以下はその例です。 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP
指定する新しい名前値は、大文字と小文字が区別されます。
SMT が簡素化したメッセージの | |
| Avro の命名要件に準拠するためにフィールド名がサニタイズされるかどうか。詳細は Avro の命名 を参照してください。 |
既知の制限
- MongoDB はスキーマレスデータベースであるため、Debezium を使用してスキーマベースのデータリレーショナルデータベースに変更をストリーミングする場合に一貫した列定義を確保するため、同じ名前のコレクション内のフィールドが同じタイプのデータを保存する必要があります。
- SMT を設定して、シンクコネクターと互換性のある形式でメッセージを生成します。シンクコネクターにフラットメッセージ構造が必要で、ソース MongoDB ドキュメントの配列を構造体としてエンコードするメッセージを受信する場合、シンクコネクターはメッセージを処理できません。