13.7. Debezium の変更イベントからステート after ソースレコードを抽出する
Debezium コネクターは、データ変更メッセージを出力し、ソースデータベースからキャプチャーする各操作を表します。コネクターが Apache Kafka に送信するメッセージの構造は複雑で、元のデータベースイベントの詳細を表します。
この複雑なメッセージ形式は、システムで発生する変更に関する情報を正確に詳述しますが、この形式は一部のダウンストリームコンシューマーには適していない場合があります。sink コネクター、または Kafka エコシステムの他の部分では、フィールド名と値が単純化されたフラットな構造で表示されるようにフォーマットされたメッセージが必要になる場合があります。
Debezium コネクターが生成するイベントレコードの形式を簡素化するために、Debezium イベントフラット化単一メッセージ変換 (SMT) を使用できます。コネクターが生成するデフォルトの形式よりも単純な形式の Kafka レコードを必要とするコンシューマーをサポートするように変換を設定します。特定のユースケースに合わせて、SMT を Debezium コネクター、または Debezium コネクターが生成するメッセージを消費する sink コネクターに適用できます。Apache Kafka が Debezium 変更イベントメッセージを元の形式で保持できるようにするには、sink コネクターの SMT を設定します。
イベントフラット化変換は Kafka Connect SMT です。
この章では、Debezium SQL ベースのデータベースコネクターのイベントフラット化シングルメッセージ変換 (SMT) を説明します。Debezium MongoDB コネクターの同等の SMT の詳細は、MongoDB 新規ドキュメントの状態の抽出 を参照してください。
詳細は以下のセクションを参照してください。
13.7.1. Debezium 変更イベントの構造について
Debezium は、複雑な構造を持つデータ変更イベントを生成します。それぞれイベントは、以下の 3 つの部分で構成されます。
以下の項目が含まれるメタデータ (ただし、これらに限定されません)
- データを変更した操作のタイプ。
- データベースの名前や、変更が発生したテーブルなどのソース情報。
- 変更が加えられたタイミングを識別するタイムスタンプ。
- (任意の項目) トランザクション情報
- 変更前の行データ
- 変更後の行データ
以下の例は、UPDATE
変更イベントのメッセージ構造の一部を示しています。
{ "op": "u", "source": { ... }, "ts_ms" : "...", "before" : { "field1" : "oldvalue1", "field2" : "oldvalue2" }, "after" : { "field1" : "newvalue1", "field2" : "newvalue2" } }
コネクターの変更イベント構造の詳細は、コネクターのドキュメントを参照してください。
イベントフラット化 SMT が前の例のメッセージを処理した後、メッセージ形式が単純化され、次にあるメッセージが生成されます。
{ "field1" : "newvalue1", "field2" : "newvalue2" }
13.7.2. Debezium イベントフラット化変換の動作
イベントフラットニング SMT は、Kafka レコードの Debezium 変更イベントから after
フィールドを抽出します。SMT は元の変更イベントを after
フィールドのみで置き換え、シンプルな Kafka レコードを作成します。
Debezium コネクターまたは Debezium コネクターから出力されるメッセージを使用する sink コネクターに、イベントフラット化 SMT を設定することができます。sink コネクターにイベントフラット化を設定するメリットは、Apache Kafka に保存されるレコードに Debezium の変更イベント全体が含まれることです。SMT を元のコネクターまたは sink コネクターに適用するかどうかの判断は、特定のユースケースによります。
以下の操作のいずれかを実行するように変換を設定することができます。
- 変更イベントからのメタデータを簡素化した Kafka レコードに追加する。デフォルト動作では、SMT はメタデータを追加しません。
-
DELETE
操作の変更イベントを含む Kafka レコードをストリームに保持します。デフォルトの動作は、SMT がDELETE
操作変更イベントの Kafka レコードをドロップするというもので、ほとんどのコンシューマーがまだ処理できないためです。
データベースの DELETE
操作により、Debezium は 2 つの Kafka レコードを生成します。
-
"op": "d"
、before
行のデータ、その他のフィールドが含まれるレコード。 -
削除された行と同じキーを持ち、値が
null
である墓石のレコード。このレコードは Apache Kafka のマーカーです。これは、ログコンパクション によりこのキーを持つすべてのレコードが削除されることを意味します。
before
行のデータを含むレコードをドロップする代わりに、イベントフラットニング SMT が以下のいずれかを行うように設定することができます。
-
ストリーム内のレコードを保持し、
"value": "null"
フィールドのみを持つように編集します。 -
レコードをストリームに維持し、追加した
"__deleted": "true"
エントリーと共にbefore
フィールドに含まれていたキー/値のペアが含まれるvalue
フィールドを持つようにそのレコードを編集する。
同様に、tombstone レコードを破棄する代わりに、イベントフラット化 SMT を設定して tombstone レコードをストリームに維持することができます。
13.7.3. Debezium イベントフラット化変換の設定
コネクターの設定に SMT 設定の詳細を追加して、Kafka Connect ソースコネクターまたは sink コネクターに Debezium イベントフラット化 SMT を設定します。たとえば、変換のデフォルトの動作を取得するには、次の例のように、オプションを指定せずにコネクター設定に追加します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
他の Kafka Connect のコネクター設定と同様に、transforms=
にコンマで区切られた複数の SMT エイリアスを設定し、Kafka Connect に SMT を適用させたい順番に設定することができます。
次の .properties
の例では、いくつかのイベントフラットニング SMT オプションを設定しています。
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.drop.tombstones=false transforms.unwrap.delete.handling.mode=rewrite transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
-
イベントストリームに
DELETE
操作の墓石の記録を残します。 delete.handling.mode=rewrite
DELETE
操作では、変更イベントにあったvalue
フィールドをフラット化することで、Kafka レコードを編集します。value
フィールドには、before
フィールドにあったキーと値のペアが直接入ります。SMT では、たとえば__deleted
を追加して、それをtrue
に設定します。"value": { "pk": 2, "cola": null, "__deleted": "true" }
add.fields=table,lsn
-
table
およびlsn
フィールドの変更イベントメタデータを簡素化した Kafka レコードに追加します。
設定のカスタマイズ
コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) を発行する場合があります。イベントのサブセットに変換を適用するには、特定のイベントだけを対象にして 変換を選択して適用する SMT 述語ステートメント を義できます。
13.7.4. Kafka レコードに Debezium メタデータを追加する例
イベントフラット化 SMT を設定して、元の変更イベントメタデータを簡素化した Kafka レコードに追加できます。たとえば、簡素化したレコードのヘッダーまたは値に、次のいずれかの項目を含めることができます。
- 変更を加えた操作のタイプ
- データベースまたは変更が加えられたテーブルの名前
- Postgres LSN フィールド等のコネクター固有のフィールド
簡略化された Kafka レコードのヘッダーにメタデータを追加するには、add.header
オプションを指定します。簡略化された Kafka レコードの値にメタデータを追加するには、add.fields
オプションを指定します。これらのオプションには、それぞれ変更イベントフィールド名のコンマ区切りリストを設定します。スペースは指定しないでください。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します。以下に例を示します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.add.fields=op,table,lsn,source.ts_ms transforms.unwrap.add.headers=db transforms.unwrap.delete.handling.mode=rewrite
この設定では、簡素化した Kafka レコードには以下のような内容が含まれます。
{ ... "__op" : "c", "__table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ... }
また、簡略化された Kafka のレコードには、__db
ヘッダーが付いています。
簡素化した Kafka レコードでは、SMT はメタデータフィールド名の前にダブルアンダースコアを追加します。また、構造体を指定すると、SMT は構造体名とフィールド名の間にアンダースコアを挿入します。
DELETE
操作用のシンプルな Kafka レコードにメタデータを追加するには、delete.handling.mode=rewrite
も設定する必要があります。
13.7.5. イベントフラット化変換を選択的に適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。
SMT を選択的に適用する方法の詳細は、変換用の SMT 述語の設定 を参照してください。
13.7.6. Debezium イベントフラット化変換設定用のオプション
次の表で、イベントフラット化 SMT を設定する際に指定することのできるオプションを説明します。
オプション | デフォルト | 説明 |
---|---|---|
|
Debezium は、 | |
|
Debezium は、 | |
行データを使用してレコードをルーティングするトピックを決定するには、このオプションを | ||
__ (double-underscore) | このオプションの文字列を設定して、フィールドに接頭辞を設定します。 | |
このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードの値に追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: | ||
__ (double-underscore) | このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。 | |
このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードのヘッダーに追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: | ||
出力メッセージから削除するソースメッセージ内のフィールド名をリストするために使用する Kafka メッセージヘッダー名。 | ||
|
SMT でイベントのキーから | |
|
SMT で、 |