12.5. Debezium の変更イベントからステート after ソースレコードを抽出する


Debezium のデータ変更イベントは、さまざまな情報を提供する複雑な構造を持ちます。Debezium の変更イベントを伝える Kafka レコードには、このすべての情報が含まれています。ただし、Kafka エコシステムの一部では、フィールド名と値のフラットな構造の Kafka レコードが要求されます。この種のレコードを提供するために、Debezium ではイベントフラット化単一メッセージ変換 (SMT) を利用することができます。Debezium の変更イベントが含まれる Kafka レコードよりも単純なフォーマットの Kafka レコードをコンシューマーが要求する場合に、この変換を設定します。

イベントフラット化変換は Kafka Connect SMT です。

この変換は、SQL データベースコネクターでのみ利用することができます。

詳細は以下のセクションを参照してください。

12.5.1. Debezium 変更イベントの構造について

Debezium は、複雑な構造を持つデータ変更イベントを生成します。それぞれイベントは、以下の 3 つの部分で設定されます。

  • 以下の項目が含まれるメタデータ (ただし、これらに限定されません)

    • 変更を加えた操作
    • データベースや変更が加えられたテーブルの名前などのソース情報
    • 変更が加えられた時刻のタイムスタンプ
    • (任意の項目) トランザクション情報
  • 変更前の行データ
  • 変更後の行データ

例えば、UPDATE 変更イベントの構造の一部は次のようになります。

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

この複雑なフォーマットは、システムで発生する変更に関するほとんどの情報を提供します。しかし、その他のコネクターや Kafka エコシステムの他の要素では、通常、以下のような単純なフォーマットのデータが要求されます。

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

コンシューマーが必要とする Kafka レコードのフォーマットを提供するには、イベントフラット化 SMT を設定します。

12.5.2. Debezium イベントフラット化変換の動作

イベントフラットニング SMT は、Kafka レコードの Debezium 変更イベントから after フィールドを抽出します。SMT は元の変更イベントを after フィールドのみで置き換え、シンプルな Kafka レコードを作成します。

Debezium コネクターまたは Debezium コネクターから出力されるメッセージを使用するシンクコネクターに、イベントフラット化 SMT を設定することができます。シンクコネクターにイベントフラット化を設定するメリットは、Apache Kafka に保存されるレコードに Debezium の変更イベント全体が含まれることです。SMT を元のコネクターまたはシンクコネクターに適用するかどうかの判断は、特定のユースケースによります。

以下の操作のいずれかを実行するように変換を設定することができます。

  • 変更イベントからのメタデータを簡素化した Kafka レコードに追加する。デフォルト動作では、SMT はメタデータを追加しません。
  • DELETE 操作の変更イベントを含む Kafka レコードをストリームに保持します。デフォルトの動作は、SMT が DELETE 操作変更イベントの Kafka レコードをドロップするというもので、ほとんどのコンシューマーがまだ処理できないためです。

データベースの DELETE 操作により、Debezium は 2 つの Kafka レコードを生成します。

  • "op": "d"before 行のデータ、その他のフィールドが含まれるレコード。
  • 削除された行と同じキーを持ち、値が null である墓石のレコード。このレコードは Apache Kafka のマーカーです。これは、ログコンパクション によりこのキーを持つすべてのレコードが削除されることを意味します。

before 行のデータを含むレコードをドロップする代わりに、イベントフラットニング SMT が以下のいずれかを行うように設定することができます。

  • ストリーム内のレコードを保持し、"value": "null" フィールドのみを持つように編集します。
  • レコードをストリームに維持し、追加した "__deleted": "true" エントリーと共に before フィールドに含まれていたキー/値のペアが含まれる value フィールドを持つようにそのレコードを編集する。

同様に、トゥームストーンレコードをドロップする代わりに、イベントフラット化 SMT を設定してトゥームストーンレコードをストリームに維持することができます。

12.5.3. Debezium イベントフラット化変換の設定

コネクターの設定に SMT 設定の詳細を追加して、Kafka Connect ソースコネクターまたはシンクコネクターに Debezium イベントフラット化 SMT を設定します。デフォルトの動作を得るためには、.properties ファイルで、以下のように指定します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

他の Kafka Connect のコネクター設定と同様に、transforms= にコンマで区切られた複数の SMT エイリアスを設定し、Kafka Connect に SMT を適用させたい順番に設定することができます。

次の .properties の例では、いくつかのイベントフラットニング SMT オプションを設定しています。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
イベントストリームに DELETE 操作の墓石の記録を残します。
delete.handling.mode=rewrite

DELETE 操作では、変更イベントにあった value フィールドをフラット化することで、Kafka レコードを編集します。valueフィールドには、before フィールドにあったキーと値のペアが直接入ります。SMT では、例えば __deleted を追加して、それを true に設定します。

"value": {
  "pk": 2,
  "cola": null,
  "__deleted": "true"
}
add.fields=table,lsn
table および lsn フィールドの変更イベントメタデータを簡素化した Kafka レコードに追加します。

12.5.4. Kafka レコードに Debezium メタデータを追加する例

イベントフラット化 SMT では、元の変更イベントメタデータを簡素化した Kafka レコードに追加することができます。たとえば、簡素化したレコードのヘッダーまたは値に、次のいずれかの項目を含めることができます。

  • 変更を加えた操作のタイプ
  • データベースまたは変更が加えられたテーブルの名前
  • Postgres LSN フィールド等のコネクター固有のフィールド

簡略化された Kafka レコードのヘッダーにメタデータを追加するには、add.header オプションを指定します。簡略化された Kafka レコードの値にメタデータを追加するには、add.fields オプションを指定します。これらのオプションには、それぞれ変更イベントフィールド名のコンマ区切りリストを設定します。スペースは指定しないでください。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します。以下に例を示します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite

この設定では、簡素化した Kafka レコードには以下のような内容が含まれます。

{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}

また、簡略化された Kafka のレコードには、__db ヘッダーが付いています。

簡素化した Kafka レコードでは、SMT はメタデータフィールド名の前にダブルアンダースコアを追加します。また、構造体を指定すると、SMT は構造体名とフィールド名の間にアンダースコアを挿入します。

DELETE 操作用のシンプルな Kafka レコードにメタデータを追加するには、delete.handling.mode=rewrite も設定する必要があります。

12.5.5. イベントフラット化変換を選択的に適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。

SMT の選択的な適用方法の詳細は、変換用の SMT 述語の設定 を参照してください。

12.5.6. Debezium イベントフラット化変換設定用のオプション

次の表で、イベントフラット化 SMT を設定する際に指定することのできるオプションを説明します。

表12.5 イベントフラット化 SMT 設定オプションの説明
オプションデフォルト説明

drop.tombstones

true

Debezium は、DELETE 操作ごとに廃棄レコードを生成します。デフォルト動作では、イベントフラット化 SMT はストリームからトゥームストーンレコードを削除します。廃棄レコードをストリームに残すには、drop.tombstones=false を指定します。

削除処理モード

drop

Debezium は、DELETE 操作ごとに変更イベントレコードを生成します。デフォルト動作では、イベントフラット化 SMT はストリームからこれらのレコードを削除します。DELETE 操作の Kafka レコードをストリームに残すには、delete.handling.modenone または rewrite に設定します。

ストリームに変更イベントの記録を残す場合は、none を指定します。レコードには "value": "null" のみが含まれています。

rewrite を指定して変更イベントのレコードをストリームに残し、レコードを編集して、beforeフィールドにあったキーと値のペアを含む value フィールドを持ち、さらに __deleted: truevalue に追加します。これは、レコードが削除されていることを示す別の方法です。

rewrite を指定すると、DELETE 操作の更新された簡素化したレコードだけで、削除されたレコードを追跡することができます。Debezium コネクターが作成するトゥームストーンレコードをドロップするデフォルトの動作を受け入れることを検討できます。

route.by.field

 

行データを使用してレコードをルーティングするトピックを決定するには、このオプションを after フィールド属性に設定します。SMT は、指定した after フィールド属性の値にマッチする名前のトピックにレコードをルーティングします。DELETE 操作の場合は、このオプションを before フィールド属性に設定します。

たとえば、設定が route.by.field=destination の場合、名前が after.destination の値のトピックにレコードがルーティングされます。Debezium コネクターのデフォルト動作では、名前がデータベースおよび変更が加えられたテーブルの名前で設定されるトピックに、それぞれの変更イベントレコードが送信されます。

シンクコネクターにイベントフラット化 SMT を設定する場合、このオプションを設定すると、ルーティング先トピックの名前が簡素化した変更イベントレコードで更新されるデータベーステーブルの名前に優先する場合に役立ちます。トピック名が実際のユースケースに適しない場合は、route.by.field を設定してイベントを再ルーティングすることができます。

add.fields.prefix

__ (double-underscore)

このオプションの文字列を設定して、フィールドに接頭辞を設定します。

add.fields

 

このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードの値に追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: source.ts_ms)。

オプションとして、<field name>:<new field name> でフィールド名を上書きすることができます。例えば、以下のように、新しいフィールド名は version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP のようになります。new field name は、大文字と小文字が区別されることに注意してください。

SMT が簡素化したレコードの値にメタデータフィールドを追加する場合、それぞれのメタデータフィールド名の前にダブルアンダースコアが追加されます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

変更イベントレコードにないフィールドを指定した場合でも、SMT はレコードの値にそのフィールドを追加します。

add.headers.prefix

__ (double-underscore)

このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。

add.headers

 

このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードのヘッダーに追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: source.ts_ms)。

オプションとして、<field name>:<new field name> でフィールド名を上書きすることができます。例えば、以下のように、新しいフィールド名は version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP のようになります。new field name は、大文字と小文字が区別されることに注意してください。

SMT が簡素化したレコードのヘッダーにメタデータフィールドを追加する場合、それぞれのメタデータフィールド名の前にダブルアンダースコアが追加されます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

変更イベントレコードにないフィールドを指定した場合、SMT はヘッダーにそのフィールドを追加しません。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.