13.7. Debezium の変更イベントからステート after ソースレコードを抽出する


Debezium コネクターは、データ変更メッセージを出力し、ソースデータベースからキャプチャーする各操作を表します。コネクターが Apache Kafka に送信するメッセージの構造は複雑で、元のデータベースイベントの詳細を表します。

この複雑なメッセージ形式は、システムで発生する変更に関する情報を正確に詳述しますが、この形式は一部のダウンストリームコンシューマーには適していない場合があります。sink コネクター、または Kafka エコシステムの他の部分では、フィールド名と値が単純化されたフラットな構造で表示されるようにフォーマットされたメッセージが必要になる場合があります。

Debezium コネクターが生成するイベントレコードの形式を簡素化するために、Debezium イベントフラット化単一メッセージ変換 (SMT) を使用できます。コネクターが生成するデフォルトの形式よりも単純な形式の Kafka レコードを必要とするコンシューマーをサポートするように変換を設定します。特定のユースケースに合わせて、SMT を Debezium コネクター、または Debezium コネクターが生成するメッセージを消費する sink コネクターに適用できます。Apache Kafka が Debezium 変更イベントメッセージを元の形式で保持できるようにするには、sink コネクターの SMT を設定します。

イベントフラット化変換は Kafka Connect SMT です。

注記

この章では、Debezium SQL ベースのデータベースコネクターのイベントフラット化シングルメッセージ変換 (SMT) を説明します。Debezium MongoDB コネクターの同等の SMT の詳細は、MongoDB 新規ドキュメントの状態の抽出 を参照してください。

詳細は以下のセクションを参照してください。

13.7.1. Debezium 変更イベントの構造について

Debezium は、複雑な構造を持つデータ変更イベントを生成します。それぞれイベントは、以下の 3 つの部分で構成されます。

  • 以下の項目が含まれるメタデータ (ただし、これらに限定されません)

    • データを変更した操作のタイプ。
    • データベースの名前や、変更が発生したテーブルなどのソース情報。
    • 変更が加えられたタイミングを識別するタイムスタンプ。
    • (任意の項目) トランザクション情報
  • 変更前の行データ
  • 変更後の行データ

以下の例は、UPDATE 変更イベントのメッセージ構造の一部を示しています。

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

コネクターの変更イベント構造の詳細は、コネクターのドキュメントを参照してください。

イベントフラット化 SMT が前の例のメッセージを処理した後、メッセージ形式が単純化され、次にあるメッセージが生成されます。

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

13.7.2. Debezium イベントフラット化変換の動作

イベントフラットニング SMT は、Kafka レコードの Debezium 変更イベントから after フィールドを抽出します。SMT は元の変更イベントを after フィールドのみで置き換え、シンプルな Kafka レコードを作成します。

Debezium コネクターまたは Debezium コネクターから出力されるメッセージを使用する sink コネクターに、イベントフラット化 SMT を設定することができます。sink コネクターにイベントフラット化を設定するメリットは、Apache Kafka に保存されるレコードに Debezium の変更イベント全体が含まれることです。SMT を元のコネクターまたは sink コネクターに適用するかどうかの判断は、特定のユースケースによります。

以下の操作のいずれかを実行するように変換を設定することができます。

  • 変更イベントからのメタデータを簡素化した Kafka レコードに追加する。デフォルト動作では、SMT はメタデータを追加しません。
  • DELETE 操作の変更イベントを含む Kafka レコードをストリームに保持します。デフォルトの動作は、SMT が DELETE 操作変更イベントの Kafka レコードをドロップするというもので、ほとんどのコンシューマーがまだ処理できないためです。

データベースの DELETE 操作により、Debezium は 2 つの Kafka レコードを生成します。

  • "op": "d"before 行のデータ、その他のフィールドが含まれるレコード。
  • 削除された行と同じキーを持ち、値が null である墓石のレコード。このレコードは Apache Kafka のマーカーです。これは、ログコンパクション によりこのキーを持つすべてのレコードが削除されることを意味します。

before 行のデータを含むレコードをドロップする代わりに、イベントフラットニング SMT が以下のいずれかを行うように設定することができます。

  • ストリーム内のレコードを保持し、"value": "null" フィールドのみを持つように編集します。
  • レコードをストリームに維持し、追加した "__deleted": "true" エントリーと共に before フィールドに含まれていたキー/値のペアが含まれる value フィールドを持つようにそのレコードを編集する。

同様に、tombstone レコードを破棄する代わりに、イベントフラット化 SMT を設定して tombstone レコードをストリームに維持することができます。

13.7.3. Debezium イベントフラット化変換の設定

コネクターの設定に SMT 設定の詳細を追加して、Kafka Connect ソースコネクターまたは sink コネクターに Debezium イベントフラット化 SMT を設定します。たとえば、変換のデフォルトの動作を取得するには、次の例のように、オプションを指定せずにコネクター設定に追加します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

他の Kafka Connect のコネクター設定と同様に、transforms= にコンマで区切られた複数の SMT エイリアスを設定し、Kafka Connect に SMT を適用させたい順番に設定することができます。

次の .properties の例では、いくつかのイベントフラットニング SMT オプションを設定しています。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
drop.tombstones=false
イベントストリームに DELETE 操作の墓石の記録を残します。
delete.handling.mode=rewrite

DELETE 操作では、変更イベントにあった value フィールドをフラット化することで、Kafka レコードを編集します。valueフィールドには、before フィールドにあったキーと値のペアが直接入ります。SMT では、たとえば __deleted を追加して、それを true に設定します。

"value": {
  "pk": 2,
  "cola": null,
  "__deleted": "true"
}
add.fields=table,lsn
table および lsn フィールドの変更イベントメタデータを簡素化した Kafka レコードに追加します。

設定のカスタマイズ

コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) を発行する場合があります。イベントのサブセットに変換を適用するには、特定のイベントだけを対象にして 変換を選択して適用する SMT 述語ステートメント を義できます。

13.7.4. Kafka レコードに Debezium メタデータを追加する例

イベントフラット化 SMT を設定して、元の変更イベントメタデータを簡素化した Kafka レコードに追加できます。たとえば、簡素化したレコードのヘッダーまたは値に、次のいずれかの項目を含めることができます。

  • 変更を加えた操作のタイプ
  • データベースまたは変更が加えられたテーブルの名前
  • Postgres LSN フィールド等のコネクター固有のフィールド

簡略化された Kafka レコードのヘッダーにメタデータを追加するには、add.header オプションを指定します。簡略化された Kafka レコードの値にメタデータを追加するには、add.fields オプションを指定します。これらのオプションには、それぞれ変更イベントフィールド名のコンマ区切りリストを設定します。スペースは指定しないでください。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します。以下に例を示します。

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite

この設定では、簡素化した Kafka レコードには以下のような内容が含まれます。

{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}

また、簡略化された Kafka のレコードには、__db ヘッダーが付いています。

簡素化した Kafka レコードでは、SMT はメタデータフィールド名の前にダブルアンダースコアを追加します。また、構造体を指定すると、SMT は構造体名とフィールド名の間にアンダースコアを挿入します。

DELETE 操作用のシンプルな Kafka レコードにメタデータを追加するには、delete.handling.mode=rewrite も設定する必要があります。

13.7.5. イベントフラット化変換を選択的に適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。

SMT を選択的に適用する方法の詳細は、変換用の SMT 述語の設定 を参照してください。

13.7.6. Debezium イベントフラット化変換設定用のオプション

次の表で、イベントフラット化 SMT を設定する際に指定することのできるオプションを説明します。

表13.7 イベントフラット化 SMT 設定オプションの説明
オプションデフォルト説明

drop.tombstones

true

Debezium は、DELETE 操作ごとに廃棄レコードを生成します。デフォルト動作では、イベントフラット化 SMT はストリームからトゥームストーンレコードを削除します。廃棄レコードをストリームに残すには、drop.tombstones=false を指定します。

delete.handling​.mode

drop

Debezium は、DELETE 操作ごとに変更イベントレコードを生成します。デフォルト動作では、イベントフラット化 SMT はストリームからこれらのレコードを削除します。DELETE 操作の Kafka レコードをストリームに残すには、delete.handling.modenone または rewrite に設定します。

ストリームに変更イベントの記録を残す場合は、none を指定します。レコードには "value": "null" のみが含まれています。

rewrite を指定して変更イベントレコードをストリームに残し、レコードを編集して、before フィールドにあったキーと値のペアを含む フィールドを作成し、さらに __deleted: truevalue に追加します。これは、レコードが削除されていることを示す別の方法です。

rewrite を指定すると、DELETE 操作の更新された簡素化したレコードだけで、削除されたレコードを追跡できます。Debezium コネクターが作成するトゥームストーンレコードをドロップするデフォルトの動作を受け入れることを検討できます。

route.by.field

 

行データを使用してレコードをルーティングするトピックを決定するには、このオプションを after フィールド属性に設定します。SMT は、指定した after フィールド属性の値にマッチする名前のトピックにレコードをルーティングします。DELETE 操作の場合は、このオプションを before フィールド属性に設定します。

たとえば、設定が route.by.field=destination の場合、名前が after.destination の値のトピックにレコードがルーティングされます。Debezium コネクターのデフォルト動作では、名前がデータベースおよび変更が加えられたテーブルの名前で設定されるトピックに、それぞれの変更イベントレコードが送信されます。

sink コネクターにイベントフラット化 SMT を設定する場合、このオプションを設定すると、ルーティング先トピックの名前が簡素化した変更イベントレコードで更新されるデータベーステーブルの名前に優先する場合に役立ちます。トピック名が実際のユースケースに適しない場合は、route.by.field を設定してイベントを再ルーティングすることができます。

add.fields.prefix

__ (double-underscore)

このオプションの文字列を設定して、フィールドに接頭辞を設定します。

add.fields

 

このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードの値に追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: source.ts_ms)。

オプションとして、<field name>:<new field name> でフィールド名を上書きできます。たとえば、新しいフィールド名は version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP のようになります。new field name は、大文字と小文字が区別されることに注意してください。

SMT が簡素化したレコードの値にメタデータフィールドを追加する場合、それぞれのメタデータフィールド名の前にダブルアンダースコアが追加されます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

変更イベントレコードにないフィールドを指定した場合でも、SMT はレコードの値にそのフィールドを追加します。

add.headers.prefix

__ (double-underscore)

このオプションの文字列を設定して、ヘッダーに接頭辞を設定します。

add.headers

 

このオプションをメタデータフィールドのコンマ区切りリスト (スペースなし) に設定し、簡素化した Kafka レコードのヘッダーに追加します。フィールド名が重複している場合、それらのフィールドの 1 つのメタデータを追加するには、フィールドと共に構造体を指定します (例: source.ts_ms)。

オプションとして、<field name>:<new field name> でフィールド名を上書きできます。たとえば、新しいフィールド名は version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP のようになります。new field name は、大文字と小文字が区別されることに注意してください。

SMT が簡素化したレコードのヘッダーにメタデータフィールドを追加する場合、それぞれのメタデータフィールド名の前にダブルアンダースコアが追加されます。構造体の指定に関して、SMT は構造体名とフィールド名の間にもアンダースコアを挿入します。

変更イベントレコードにないフィールドを指定した場合、SMT はヘッダーにそのフィールドを追加しません。

drop.fields.header.name

 

出力メッセージから削除するソースメッセージ内のフィールド名をリストするために使用する Kafka メッセージヘッダー名。

drop.fields.from.key

false

SMT でイベントのキーから drop.fields.header.name に記載されているフィールドを削除するかどうかを指定します。

drop.fields.keep.schema.compatible

true

SMT で、drop.fields.header.name 設定プロパティーに含まれるオプション以外のフィールドを削除するかどうかを指定します。

デフォルトでは、SMT は optional とマークされたフィールドのみを削除します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.