13.4. Debezium イベントレコードからのフィールドレベルの変更抽出
Debezium のデータ変更イベントは、さまざまな情報を提供する複雑な構造を持ちます。ただし、場合によっては、元のデータベース変更から生じるフィールドレベルの変更に関する追加情報がないと、ダウンストリームコンシューマーが Debezium 変更イベントメッセージを処理できません。Debezium には、データベース操作がソースデータベースのフィールドを変更する方法を含むイベントメッセージを強化するために、ExtractChangedRecordState
単一メッセージ変換 (SMT) が同梱されています。
イベント変更変換は Kafka Connect SMT です。
13.4.1. Debezium 変更イベントの構造について
Debezium は、複雑な構造を持つデータ変更イベントを生成します。各イベントは以下の部分で構成されています。
メタデータには、次のタイプが含まれますが、限定されません。
- データを変更した操作のタイプ。
- データベースの名前や、変更が発生したテーブルなどのソース情報。
- 変更が加えられたタイミングを識別するタイムスタンプ。
- (任意の項目) トランザクション情報
- 変更前の行データ。
- 変更後の行データ。
次の例は、典型的な Debezium UPDATE
変更イベントの構造の一部を示しています。
{ "op": "u", "source": { ... }, "ts_ms" : "...", "before" : { "field1" : "oldvalue1", "field2" : "oldvalue2" }, "after" : { "field1" : "newvalue1", "field2" : "newvalue2" } }
前の例の複雑なメッセージ形式により、ソースデータベースで発生した変更に関する詳細情報が提供されます。ただし、この形式は一部のダウンストリームコンシューマーには適していない可能性があります。sink コネクター、または Kafka エコシステムの他の部分では、データベース操作によって変更または変更されないフィールドを明示的に識別するメッセージが必要な場合があります。ExtractChangedRecordState
SMT は、変更イベントメッセージにヘッダーを追加して、データベース操作によって変更されるフィールドと変更されないフィールドを識別します。
13.4.2. Debezium イベントの動作による SMT の変更
イベント変更 SMT は、Kafka レコードの Debezium UPDATE
変更イベントから before
と after
フィールドを抽出します。変換では、before
と after
イベントの状態構造を調べて、操作によって変更されたフィールドと変更されないフィールドを識別します。コネクターの設定に応じて、変換により、変更されたフィールド、変更されていないフィールド、またはその両方をリストするメッセージヘッダーを追加する変更されたイベントメッセージが生成されます。イベントが INSERT
または DELETE
を表す場合、この単一のメッセージ変換は効果がありません。
イベント変更 SMT は、Debezium コネクター、または Debezium コネクターによって発行されたメッセージを消費する sink コネクターに対して設定できます。Apache Kafka で元の Debezium 変更イベント全体を保持する場合は、sink コネクターのイベント変更 SMT を設定します。SMT を元のコネクターまたは sink コネクターに適用するかどうかの判断は、特定のユースケースによります。
ユースケースに応じて、以下のタスクのいずれかまたは両方を実行して、元のメッセージを変更するように変換を設定できます。
-
UPDATE
イベントによって変更されたフィールドを、ユーザーが設定したheader.changed.name
ヘッダーに一覧表示することで識別します。 -
UPDATE
イベントによって変更されていないフィールドを、ユーザーが設定したheader.unchanged.name
ヘッダーに一覧表示することで識別します。
13.4.3. Debezium イベント変更 SMT の設定
Kafka Connect ソースまたは sink コネクターの Debezium イベント変更 SMT を設定するには、SMT 設定の詳細をコネクターの設定に追加します。ヘッダーを追加しないデフォルトの動作を取得するには、次の例のように、コネクター設定に変換を追加します。
transforms=changes,... transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
他の Kafka Connect のコネクター設定と同様に、transforms=
にコンマで区切られた複数の SMT エイリアスを設定し、Kafka Connect に SMT を適用させたい順番に設定することができます。
次の例のコネクター設定では、イベント変更 SMT のオプションを複数設定します。
transforms=changes,... transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState transforms.changes.header.changed.name=Changed transforms.changes.header.unchanged.name=Unchanged
header.changed.name
- データベース操作によって変更されるフィールドのコンマ区切りリストを保存するために使用する Kafka メッセージヘッダー名。
header.unchanged.name
- データベース操作後に変更されないフィールドのコンマ区切りリストを保存するために使用する Kafka メッセージヘッダー名。
設定のカスタマイズ
コネクターは、多くの種類のイベントメッセージ (ハートビートメッセージ、廃棄メッセージ、またはトランザクションまたはスキーマの変更に関するメタデータメッセージ) を発行する場合があります。イベントのサブセットに変換を適用するには、特定のイベントだけを対象にして 変換を選択して適用する SMT 述語ステートメント を定義できます。
13.4.4. イベント変更変換を選択的に適用するためのオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。
SMT を選択的に適用する方法の詳細は、変換用の SMT 述語の設定 を参照してください。
13.4.5. Debezium イベント変更 SMT の設定オプションの説明
次の表で、イベントフラット化 SMT を設定する際に指定できるオプションを説明します。
オプション | デフォルト | 説明 |
---|---|---|
データベース操作によって変更されるフィールドのコンマ区切りリストを保存するために使用する Kafka メッセージヘッダー名。 | ||
データベース操作後に変更されないフィールドのコンマ区切りリストを保存するために使用する Kafka メッセージヘッダー名。 |