12.5. Debezium の変更イベントからステート after ソースレコードを抽出する
Debezium のデータ変更イベントは、さまざまな情報を提供する複雑な構造を持ちます。Debezium の変更イベントを伝える Kafka レコードには、このすべての情報が含まれています。ただし、Kafka エコシステムの一部では、フィールド名と値のフラットな構造の Kafka レコードが要求されます。この種のレコードを提供するために、Debezium ではイベントフラット化単一メッセージ変換 (SMT) を利用することができます。Debezium の変更イベントが含まれる Kafka レコードよりも単純なフォーマットの Kafka レコードをコンシューマーが要求する場合に、この変換を設定します。
イベントフラット化変換は Kafka Connect SMT です。
この変換は、SQL データベースコネクターでのみ利用することができます。
詳細は以下のセクションを参照してください。
12.5.1. Debezium 変更イベントの構造について
Debezium は、複雑な構造を持つデータ変更イベントを生成します。それぞれイベントは、以下の 3 つの部分で設定されます。
以下の項目が含まれるメタデータ (ただし、これらに限定されません)
- 変更を加えた操作
- データベースや変更が加えられたテーブルの名前などのソース情報
- 変更が加えられた時刻のタイムスタンプ
- (任意の項目) トランザクション情報
- 変更前の行データ
- 変更後の行データ
例えば、UPDATE
変更イベントの構造の一部は次のようになります。
{ "op": "u", "source": { ... }, "ts_ms" : "...", "before" : { "field1" : "oldvalue1", "field2" : "oldvalue2" }, "after" : { "field1" : "newvalue1", "field2" : "newvalue2" } }
この複雑なフォーマットは、システムで発生する変更に関するほとんどの情報を提供します。しかし、その他のコネクターや Kafka エコシステムの他の要素では、通常、以下のような単純なフォーマットのデータが要求されます。
{ "field1" : "newvalue1", "field2" : "newvalue2" }
コンシューマーが必要とする Kafka レコードのフォーマットを提供するには、イベントフラット化 SMT を設定します。
12.5.2. Debezium イベントフラット化変換の動作
イベントフラットニング SMT は、Kafka レコードの Debezium 変更イベントから after
フィールドを抽出します。SMT は元の変更イベントを after
フィールドのみで置き換え、シンプルな Kafka レコードを作成します。
Debezium コネクターまたは Debezium コネクターから出力されるメッセージを使用するシンクコネクターに、イベントフラット化 SMT を設定することができます。シンクコネクターにイベントフラット化を設定するメリットは、Apache Kafka に保存されるレコードに Debezium の変更イベント全体が含まれることです。SMT を元のコネクターまたはシンクコネクターに適用するかどうかの判断は、特定のユースケースによります。
以下の操作のいずれかを実行するように変換を設定することができます。
- 変更イベントからのメタデータを簡素化した Kafka レコードに追加する。デフォルト動作では、SMT はメタデータを追加しません。
-
DELETE
操作の変更イベントを含む Kafka レコードをストリームに保持します。デフォルトの動作は、SMT がDELETE
操作変更イベントの Kafka レコードをドロップするというもので、ほとんどのコンシューマーがまだ処理できないためです。
データベースの DELETE
操作により、Debezium は 2 つの Kafka レコードを生成します。
-
"op": "d"
、before
行のデータ、その他のフィールドが含まれるレコード。 -
削除された行と同じキーを持ち、値が
null
である墓石のレコード。このレコードは Apache Kafka のマーカーです。これは、ログコンパクション によりこのキーを持つすべてのレコードが削除されることを意味します。
before
行のデータを含むレコードをドロップする代わりに、イベントフラットニング SMT が以下のいずれかを行うように設定することができます。
-
ストリーム内のレコードを保持し、
"value": "null"
フィールドのみを持つように編集します。 -
レコードをストリームに維持し、追加した
"__deleted": "true"
エントリーと共にbefore
フィールドに含まれていたキー/値のペアが含まれるvalue
フィールドを持つようにそのレコードを編集する。
同様に、トゥームストーンレコードをドロップする代わりに、イベントフラット化 SMT を設定してトゥームストーンレコードをストリームに維持することができます。
12.5.3. Debezium イベントフラット化変換の設定
コネクターの設定に SMT 設定の詳細を追加して、Kafka Connect ソースコネクターまたはシンクコネクターに Debezium イベントフラット化 SMT を設定します。デフォルトの動作を得るためには、.properties
ファイルで、以下のように指定します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
他の Kafka Connect のコネクター設定と同様に、transforms=
にコンマで区切られた複数の SMT エイリアスを設定し、Kafka Connect に SMT を適用させたい順番に設定することができます。
次の .properties
の例では、いくつかのイベントフラットニング SMT オプションを設定しています。
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.drop.tombstones=false transforms.unwrap.delete.handling.mode=rewrite transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
-
イベントストリームに
DELETE
操作の墓石の記録を残します。 delete.handling.mode=rewrite
DELETE
操作では、変更イベントにあったvalue
フィールドをフラット化することで、Kafka レコードを編集します。value
フィールドには、before
フィールドにあったキーと値のペアが直接入ります。SMT では、例えば__deleted
を追加して、それをtrue
に設定します。"value": { "pk": 2, "cola": null, "__deleted": "true" }
add.fields=table,lsn
-
table
およびlsn
フィールドの変更イベントメタデータを簡素化した Kafka レコードに追加します。
12.5.4. Kafka レコードに Debezium メタデータを追加する例
イベントフラット化 SMT では、元の変更イベントメタデータを簡素化した Kafka レコードに追加することができます。たとえば、簡素化したレコードのヘッダーまたは値に、次のいずれかの項目を含めることができます。
- 変更を加えた操作のタイプ
- データベースまたは変更が加えられたテーブルの名前
- Postgres LSN フィールド等のコネクター固有のフィールド
簡略化された Kafka レコードのヘッダーにメタデータを追加するには、add.header
オプションを指定します。簡略化された Kafka レコードの値にメタデータを追加するには、add.fields
オプションを指定します。これらのオプションには、それぞれ変更イベントフィールド名のコンマ区切りリストを設定します。スペースは指定しないでください。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します。以下に例を示します。
transforms=unwrap,... transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState transforms.unwrap.add.fields=op,table,lsn,source.ts_ms transforms.unwrap.add.headers=db transforms.unwrap.delete.handling.mode=rewrite
この設定では、簡素化した Kafka レコードには以下のような内容が含まれます。
{ ... "__op" : "c", "__table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ... }
また、簡略化された Kafka のレコードには、__db
ヘッダーが付いています。
簡素化した Kafka レコードでは、SMT はメタデータフィールド名の前にダブルアンダースコアを追加します。また、構造体を指定すると、SMT は構造体名とフィールド名の間にアンダースコアを挿入します。
DELETE
操作用のシンプルな Kafka レコードにメタデータを追加するには、delete.handling.mode=rewrite
も設定する必要があります。
12.5.5. イベントフラット化変換を選択的に適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。
SMT の選択的な適用方法の詳細は、変換用の SMT 述語の設定 を参照してください。
12.5.6. Debezium イベントフラット化変換設定用のオプション
次の表で、イベントフラット化 SMT を設定する際に指定することのできるオプションを説明します。
オプション | デフォルト | 説明 |
---|---|---|
|
Debezium は、 | |
|
Debezium は、 | |
行データを使用してレコードをルーティングするトピックを決定するには、このオプションを | ||
__ (double-underscore) | このオプションの文字列を設定して、フィールドに接頭辞を設定します。 | |
このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードの値に追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: | ||
__ (double-underscore) | このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。 | |
このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードのヘッダーに追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: |