12.8. Debezium MongoDB 変更イベントから after 状態のソースドキュメント抽出
Debezium MongoDB コネクターは、MongoDB コレクションで発生する各操作を表すデータ変更メッセージを出力します。これらのイベントメッセージの複雑な構造は、元のデータベースイベントの詳細を忠実に表しています。ただし、ダウンストリームのコンシューマーの中には、メッセージを元の形式で処理できない場合があります。たとえば、データコレクション内のネストされたドキュメントを表すために、コネクターは、ネストされたフィールドを含む形式でイベントメッセージを出力します。sink コネクター、または元のメッセージの階層形式を処理できない他のコンシューマーをサポートするには、Debezium MongoDB イベントフラット化 (ExtractNewDocumentState) の単一メッセージ変換 (SMT) を使用できます。SMT は元のメッセージの構造を簡素化し、他の方法でメッセージを変更してデータを処理しやすくすることができます。
イベントフラット化変換は Kafka Connect SMT です。
この章では、Debezium MongoDB ベースのデータベースコネクターのイベントフラット化シングルメッセージ変換 (SMT) について説明します。リレーショナルデータベースで使用する同等の SMT の詳細は、新記録の状態抽出 SMT のドキュメント を参照してください。
詳細は以下のセクションを参照してください。
- 「Debezium MongoDB 変更イベントの構造の説明」
- 「Debezium MongoDB イベントフラット化変換の動作」
- 「Debezium MongoDB イベントフラット化変換の設定」
- 「MongoDB イベントメッセージで配列をエンコードするためのオプション」
- 「MongoDB イベントメッセージでネストされた構造のフラット化」
-
「Debezium MongoDB コネクターが
$unset
操作によって削除されたフィールドの名前を報告する方法」 - 「元のデータベース操作のタイプの判別」
- 「MongoDB イベントフラット化 SMT を使用した Debezium メタデータの Kafka レコード追加」
- 「MongoDB 抽出の新しいドキュメント状態変換を選択して適用するオプション」
- 「MongoDB の Debezium イベントフラット化変換の設定オプション」
- 既知の制限
12.8.1. Debezium MongoDB 変更イベントの構造の説明
Debezium MongoDB コネクターは、構造が複雑な変更イベントを生成します。各イベントメッセージには、以下の部分が含まれます。
- ソースメタデータ
以下のフィールドが含まれますが、これに限定されません。
- コレクション内のデータを変更した操作のタイプ (作成/挿入、更新、または削除)。
- 変更が発生したデータベースおよびコレクションの名前。
- 変更が加えられたタイミングを識別するタイムスタンプ。
- (任意の項目) トランザクション情報
- ドキュメントデータ
before
データこのフィールドは、Debezium コネクターの
capture.mode
が以下のいずれかの値に設定されている場合に MongoDB 6.0 以降を実行する環境に存在します。-
change_streams_with_pre_image
. change_streams_update_full_with_pre_image
.詳細は、MongoDB プレイメージサポート を参照してください。
-
After
データ現在の操作後にドキュメントに存在する値を表す JSON 文字列。イベントメッセージに
after
フィールドが存在するかどうかは、イベントの種類とコネクター設定によって異なります。MongoDBinsert
操作のcreate
イベントには、capture.mode
設定に関係なく、常にafter
フィールドが含まれます。update
イベントの場合、after
フィールドはcapture.mode
が以下のいずれかの値に設定されている場合にのみ存在します。-
change_streams_update_full
change_streams_update_full_with_pre_image
.注記変更イベントメッセージの
after
値は、必ずしもイベント直後のドキュメントの状態を表すとは限りません。値は動的に計算されず、コネクターが変更イベントをキャプチャーした後、コレクションをクエリーしてドキュメントの現在の値を取得します。たとえば、複数の操作
a
、b
、およびc
がドキュメントを立て続けに変更する状況を想像してみてください。コネクターが変更a
を処理するときに、全ドキュメントのコレクションをクエリーします。その間にb
とc
の変更が発生します。コネクターが変更a
に関する全ドキュメントのクエリーに対する応答を受信すると、後に続くb
またはc
の変更をもとにしたドキュメントのバージョンを受信する可能性があります。詳細は、capture.mode
プロパティーのドキュメントを参照してください。
-
以下のフラグメントは、MongoDB の insert
操作後にコネクターが発行する create
変更イベントの基本構造を示しています。
{ "op": "c", "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}", "source": { ... } }
前の例の after
フィールドの複雑な形式では、ソースデータベースで発生した変更に関する詳細情報を提供します。ただし、一部のコンシューマーは、ネストされた値が含まれるメッセージを処理できません。元のメッセージの複雑なネストされたフィールドを、より単純で普遍的、かつ互換性のある構造に変換するには、MongoDB のイベントフラット化 SMT を使用します。SMT は、以下の例のようにメッセージ内のネストされたフィールドの構造をフラット化します。
{ "field1" : "newvalue1", "field2" : "newvalue2" }
Debezium MongoDB コネクターによって生成されたメッセージのデフォルト構造の詳細は、コネクターのドキュメント を参照してください。
12.8.2. Debezium MongoDB イベントフラット化変換の動作
MongoDB のイベントフラット化 SMT は、Debezium MongoDB コネクターによって発行された create
または update
変更イベントメッセージから after
フィールドを抽出します。SMT は、元の変更イベントメッセージを処理した後、after
フィールドの内容のみを含む、単純化バージョンを生成します。
ユースケースに応じて、ExtractNewDocumentState SMT を Debezium MongoDB コネクター、または Debezium コネクターが生成するメッセージを消費する sink コネクターに適用できます。SMT を Debezium MongoDB コネクターに適用すると、SMT はコネクターが Apache Kafka に送信する前に出力するメッセージを変更します。Kafka が完全な Debezium 変更イベントメッセージを元の形式で確実に保持するには、SMT を sink コネクターに適用します。
イベントフラット化 SMT を使用して MongoDB コネクターから送信されたメッセージを処理すると、SMT は元のメッセージのレコードの構造を、通常の sink コネクターで使用できるように、適切に型指定された Kafka Connect レコードに変換します。たとえば、SMT は元のメッセージの after
情報を表す JSON 文字列を、任意のコンシューマーが処理できるスキーマ構造に変換します。
オプションとして、MongoDB のイベントフラット化 SMT を設定して、処理中に他の方法でメッセージを変更できます。詳細は 設定トピックを参照してください。
12.8.3. Debezium MongoDB イベントフラット化変換の設定
Debezium MongoDB コネクターによって発行されたメッセージを消費する sink コネクター用に、MongoDB のイベントフラット化 (ExtractNewDocumentState) SMT を設定します。
詳細は以下のセクションを参照してください。
- 「例: Debezium MongoDB イベントフラット化変換の基本設定」
- 「MongoDB イベントメッセージで配列をエンコードするためのオプション」
- 「MongoDB イベントメッセージでネストされた構造のフラット化」
-
「Debezium MongoDB コネクターが
$unset
操作によって削除されたフィールドの名前を報告する方法」 - 「元のデータベース操作のタイプの判別」
- 「MongoDB イベントフラット化 SMT を使用した Debezium メタデータの Kafka レコード追加」
- 「MongoDB 抽出の新しいドキュメント状態変換を選択して適用するオプション」
- 「MongoDB の Debezium イベントフラット化変換の設定オプション」
12.8.3.1. 例: Debezium MongoDB イベントフラット化変換の基本設定
SMT のデフォルト動作を取得するには、以下の例のように、オプションを指定せずに sink コネクターの設定に SMT を追加します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
Kafka Connect コネクター設定と同様に、transforms=
を複数のコンマ区切りの SMT エイリアスに設定できます。Kafka Connect は、リスト順で指定した変換を適用します。
MongoDB イベントフラット化 SMT を使用するコネクターに複数のオプションを設定できます。以下の例は、コネクターの drop.tombstones
、delete.handling.mode
、および add.headers
オプションを指定する設定です。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.drop.tombstones=false transforms.unwrap.delete.handling.mode=drop transforms.unwrap.add.headers=op
前の例の設定オプションの詳細は、設定トピック を参照してください。
設定のカスタマイズ
コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションの変更に関するメタデータメッセージ) を発行する場合があります。イベントのサブセットに変換を適用するには、特定のイベントだけを対象にして 変換を選択して適用する SMT 述語ステートメント を定義できます。
12.8.4. MongoDB イベントメッセージで配列をエンコードするためのオプション
デフォルトでは、イベントフラット化 SMT は MongoDB 配列を Apache Kafka Connect または Apache Avro スキーマと互換性のある配列に変換します。MongoDB 配列には複数のタイプの要素を含めることができますが、Kafka 配列のすべての要素が同じタイプである必要があります。
SMT が環境のニーズを満たす方法で配列をエンコードするようにするには、array.encoding
設定オプションを指定できます。以下の例は、配列エンコーディングを指定するための設定を示しています。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.array.encoding=<array|document>
設定に応じて、SMT は、次のエンコード方法のいずれかを使用して、ソースメッセージ内の配列の各インスタンスを処理します。
- 配列エンコーディング
-
array.encoding
がarray
(デフォルト) に設定されている場合、SMT エンコードはarray
データ型を使用して元のメッセージの配列をエンコードします。正しい処理を確保するには、配列インスタンスのすべての要素が同じタイプである必要があります。このオプションで制限が課されることになりますが、ダウンストリームクライアントが配列を簡単に処理できるようにします。 - ドキュメントのエンコーディング
-
array.encoding
がdocument
に設定されている場合に、BSON シリアル化 と同様の方法で、ソースの各配列を structs の中の struct に変換します。メイン struct には、_0
、_1
、_2
などのフィールドが含まれます。各フィールド名は、元の配列内の要素のインデックスを表します。SMT は、これらの各インデックスフィールドに、ソース配列内の同等の要素に対して取得した値を設定します。Avro エンコーディングでは数字で始まるフィールド名が禁止されているため、インデックス名にはアンダースコアが接頭辞として付けられます。
次の例は、Debezium MongoDB コネクターが、異種データ型で構成される配列を含むデータベースドキュメントを表す方法を示しています。
例12.2 例: 複数のデータ型を含む配列のドキュメントエンコーディング
{ "_id": 1, "a1": [ { "a": 1, "b": "none" }, { "a": "c", "d": "something" } ] }
array.encoding
が document
に設定されている場合、SMT は前述のドキュメントを以下の形式に変換します。
{ "_id": 1, "a1": { "_0": { "a": 1, "b": "none" }, "_1": { "a": "c", "d": "something" } } }
document
エンコーディングオプションを使用すると、SMT は異種要素で設定される任意の配列を処理できます。ただし、このオプションを使用する前に、sink コネクターと他のダウンストリームコンシューマーが複数のデータ型を含む配列を処理できることを常に確認してください。
12.8.5. MongoDB イベントメッセージでネストされた構造のフラット化
データベース操作に埋め込みドキュメントが含まれる場合、Debezium MongoDB コネクターは、元のドキュメントの階層構造を反映する Kafka イベントレコードを発行します。つまり、イベントメッセージは、ネストされたフィールド構造のセットとして、ネスト化ドキュメントを表します。ダウンストリームコネクターがネスト化された構造が含まれるメッセージを処理できない環境では、イベントフラット化 SMT を設定して、メッセージ内の階層構造をフラット化できます。フラットなメッセージ構造は、テーブルのようなストレージに適しています。
ネストされた構造をフラット化するように SMT を設定するには、flatten.struct
設定オプションを true
に設定します。変換されたメッセージでは、フィールド名がドキュメントソースと一致するよう構築されます。SMT は、親ドキュメントフィールドの名前とネストされたドキュメントフィールドの名前を連結して、フラット化された各フィールドの名前を変更します。名前のコンポーネントを区切るには、flatten.struct.delimiter
オプションで定義される区切り文字を使用します。struct.delimiter
のデフォルト値はアンダースコア文字 (_
) です。
次の例は、SMT がネストされた構造をフラット化するかどうかを指定する設定です。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.flatten.struct=<true|false> transforms.unwrap.flatten.struct.delimiter=<string>
以下の例は、MongoDB コネクターが出力するイベントメッセージを示しています。メッセージには、ネストされた 2 つのドキュメント b
と c
のフィールドを含むドキュメント a
のフィールドが含まれます。
{ "_id": 1, "a": { "b": 1, "c": "none" }, "d": 100 }
以下の例のメッセージは、MongoDB の SMT が前のメッセージでネスト化された構造をフラット化した後に表示した出力です。
{ "_id": 1, "a_b": 1, "a_c": "none", "d": 100 }
結果のメッセージでは、元のメッセージでネストされた b
フィールドと c
フィールドがフラット化され、名前が変更されます。名前が変更されたフィールドは、親ドキュメント a
の名前を、ネストされたドキュメントの名前 a_b
および a_c
と連結することによって形成されます。新しいフィールド名のコンポーネントは、struct.delimiter
設定プロパティーの設定で定義されているようにアンダースコアで区切られます。
12.8.6. Debezium MongoDB コネクターが $unset
操作によって削除されたフィールドの名前を報告する方法
MongoDB では、$unset
Operator と $rename
Operator の両方がドキュメントからフィールドを削除します。MongoDB コレクションはスキーマレスであるため、更新でドキュメントからフィールドが削除されると、更新済みドキュメントで抜けているフィールドの名前を推測できません。Debezium は、削除されたフィールドの名前をリストする removedFields
要素を含む更新メッセージを発行して、削除済みフィールドに関する情報を必要とする可能性がある sink コネクターまたはその他のコンシューマーをサポートします。
以下の例は、フィールド a
を削除する操作の更新メッセージの一部を示しています。
"payload": { "op": "u", "ts_ms": "...", "before": "{ ... }", "after": "{ ... }", "updateDescription": { "removedFields": ["a"], "updatedFields": null, "truncatedArrays": null } }
上記の例では、before
および after
はドキュメントが更新される前と後のソースドキュメントの状態を表します。これらのフィールドは、コネクターの capture.mode
が次のリストで説明されているように設定されている場合にのみ、コネクターが発行するイベントメッセージに存在します。
before
フィールド変更前のドキュメントの状態を提供します。このフィールドは、
capture.mode
が以下のいずれかの値に設定されている場合にのみ表示されます。-
change_streams_with_pre_image
. -
change_streams_update_full_with_pre_image
.
-
After
フィールド変更後のドキュメントの完全な状態を提供します。このフィールドは、
capture.mode
が以下のいずれかの値に設定されている場合にのみ表示されます。-
change_streams_update_full
-
change_streams_update_full_with_pre_image
.
-
完全なドキュメントをキャプチャーするようにコネクターが設定されていると仮定すると、ExtractNewDocumentState
SMT が $unset
イベントの update
メッセージを受信すると、SMT は、次の例に示すように、削除されたフィールドに null
値があることを表し、メッセージを再エンコードします。
{ "id": 1, "a": null }
完全なドキュメントをキャプチャーするように設定されていないコネクターの場合、SMT が $unset
操作の更新イベントを受信すると、以下の出力メッセージが生成されます。
{ "a": null }
12.8.7. 元のデータベース操作のタイプの判別
SMT がイベントメッセージをフラット化した後、生成されるメッセージでは、イベントを生成した操作が create
、update
、または初期スナップショットの read
であるかを判断できなくなります。通常、delete
操作に関する情報を公開するか、削除に伴うイベントを書き換えるようにコネクターを設定して、削除操作を識別できます。イベントメッセージで廃棄および書き換えに関する情報を公開するようにコネクターを設定する方法は、drop.tombstones
および delete.handling.mode
プロパティーを参照してください。
イベントメッセージでデータベース操作のタイプを報告させるには、SMT は次の要素のいずれかに op
フィールドを追加します。
- イベントメッセージのボディー。
- メッセージヘッダー
たとえば、元の操作のタイプを示すヘッダープロパティーを追加するには、次の例のように、変換を追加してから、add.headers
プロパティーをコネクター設定に追加します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.add.headers=op
上記の設定に基づいて、SMT は op
ヘッダーをメッセージに追加し、文字列の値を割り当てて操作のタイプを特定することでイベントタイプを報告します。割り当てられた文字列の値は、元の MongoDB 変更イベントメッセージ の op
フィールドの値に基づいています。
12.8.8. MongoDB イベントフラット化 SMT を使用した Debezium メタデータの Kafka レコード追加
MongoDB のイベントフラット化 SMT は、元の変更イベントメッセージから簡素化されたメッセージにメタデータフィールドを追加できます。追加されたメタデータフィールドには、2 つのアンダースコア ("__"
) が接頭辞として付けられます。イベントレコードにメタデータを追加すると、変更イベントが発生したコレクションの名前などのコンテンツを含めたり、レプリカセット名などのコネクター固有のフィールドを含めたりすることができます。現在、SMT は、source
、transaction
、および updateDescription
の変更イベントのサブ構造フィールドのみを追加できます。
MongoDB 変更イベントの構造に関する詳細は、MongoDB コネクターのドキュメント を参照してください。
たとえば、次の設定を指定して、変更イベントのレプリカセット名 (rs
) とコレクション名を、最終的なフラット化されたイベントレコードに追加できます。
transforms=unwrap,... transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState transforms.unwrap.add.fields=rs,collection
上記の設定により、フラット化されたレコードに次のコンテンツが追加されます。
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
SMT でメタデータフィールドを delete
イベントに追加する場合には、delete.handling.mode
オプションの値を rewrite
に設定します。
12.8.9. MongoDB 抽出の新しいドキュメント状態変換を選択して適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。
SMT を選択的に適用する方法の詳細は、変換用の SMT 述語の設定 を参照してください。
12.8.10. MongoDB の Debezium イベントフラット化変換の設定オプション
以下の表は、MongoDB イベントフラット化 SMT の設定オプションを示しています。
プロパティー | デフォルト | 説明 |
---|---|---|
| SMT が元のイベントメッセージから読み取る配列をエンコードする際に使用する形式を指定します。以下のオプションのいずれかを設定します。
| |
| SMT は、メッセージ内のネストされたプロパティーの名前を設定可能な区切り文字で連結して、単純なフィールド名を形成することで、元のイベントメッセージの構造 (structs) をフラット化します。 | |
|
| |
|
Debezium は、 | |
|
SMT が
| |
__ (double-underscore) | このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。 | |
デフォルトなし |
SMT で簡略化されたメッセージのヘッダーに追加するメタデータフィールドのコンマ区切りのリストを、スペースを入れずに指定します。元のメッセージに重複したフィールド名が含まれている場合、struct の名前とフィールドの名前を一緒に指定することで、変更する特定のフィールドを識別できます (例:
必要に応じて、以下の形式のエントリーをリストに追加し、フィールドの元の名前を上書きして新しい名前を割り当てることができます。
以下に例を示します。 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP
指定する新しい名前の値では、大文字と小文字が区別されます。 | |
__ (double-underscore) | フィールド名の前に付けるオプションの文字列を指定します。 | |
デフォルトなし |
このオプションを、簡略化された Kafka メッセージの
以下に例を示します。 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP
指定する新しい名前の値では、大文字と小文字が区別されます。
SMT が簡略化したメッセージの |
既知の制限
- MongoDB はスキーマのないデータベースであるため、Debezium を使用してスキーマベースのデータリレーショナルデータベースに変更をストリーミングするときに一貫した列定義を確保するには、名前が同じコレクション内のフィールドに同じ型のデータを格納する必要があります。
- SMT を設定して、sink コネクターと互換性のある形式でメッセージを生成します。sink コネクターがフラットなメッセージ構造を必要としているにもかかわらず、ソース MongoDB ドキュメント内の配列を Structs の struct としてエンコードするメッセージを受信した場合、 sink コネクターはメッセージを処理できません。