7.2. Debezium Oracle コネクターのデータ変更イベントの説明


Oracle コネクターが出力する全データ変更イベントにはキーと値があります。キーと値の構造は、変更イベントの発生元となるテーブルによって異なります。Debezium のトピック名を構築する方法は、トピック名を参照してください。

警告

Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベット文字またはアンダースコア ([a-z,A-Z,_]) で始まり、論理サーバー名の残りの文字と、スキーマおよびテーブル名の残りの文字は英数字またはアンダースコア ([a-z,A-Z,0-9,\_]) で始まる必要があります。コネクターは無効な文字をアンダースコアに自動的に置き換えます。

複数の論理サーバー名、スキーマ名、またはテーブル名の中で区別ができる文字のみが無効な場合に、アンダースコアに置き換えられると、命名で予期しない競合が発生する可能性があります。

Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、トピックコンシューマーによる処理が困難になることがあります。変更可能なイベント構造の処理を容易にするため、Kafka Connect の各イベントは自己完結型となっています。すべてのメッセージキーと値には、スキーマペイロード の 2 つの部分で設定されます。スキーマはペイロードの構造を記述しますが、ペイロードには実際のデータが含まれます。

警告

SYSSYSTEM ユーザーアカウントが加える変更は、コネクターではキャプチャーされません。

データ変更イベントの詳細は、以下を参照してください。

7.2.1. Debezium Oracle コネクター変更イベントのキー

変更されたテーブルごとに変更イベントキーは、イベントの作成時にテーブルのプライマリーキー (または一意のキー制約) の各コラムにフィールドが存在するように設定されます。

たとえば、inventory データベーススキーマに定義されている customers テーブルには、以下の変更イベントキーが含まれる場合があります。

CREATE TABLE customers (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);

<topic.prefix>.transaction 設定プロパティーの値が server1 に設定されている場合は、データベースの customers テーブルで発生するすべての変更イベントの JSON 表現には以下のキー構造が使用されます。

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,
        "name": "server1.INVENTORY.CUSTOMERS.Key"
    },
    "payload": {
        "ID": 1004
    }
}

キーのスキーマ 部分には、キーの部分の内容を記述する Kafka Connect スキーマが含まれます。上記の例では、payload 値はオプションではなく、構造は server1.DEBEZIUM.CUSTOMERS.Key という名前のスキーマで定義され、タイプ int32id という名前の必須フィールドが 1 つあります。キーの payload フィールドの値から、id フィールドが 1 つでその値が 1004 の構造 (JSON では単なるオブジェクト) であることが分かります。

つまり、このキーは、id プライマリーキーの列にある値が 1004 の、inventory.customers テーブルの行 (名前が server1 のコネクターからの出力) を記述していると解釈できます。

7.2.2. Debezium Oracle 変更イベントの値

変更イベントメッセージの値の構造は、メッセージの変更イベントの メッセージキーの構造 をミラーリングし、スキーマ セクションと ペイロード セクションの両方が含まれます。

変更イベント値のペイロード

変更イベント値のペイロードセクションの エンベロープ 構造には、以下のフィールドが含まれます。

op
操作のタイプを記述する文字列値が含まれる必須フィールド。Oracle コネクターの変更イベント値のペイロードの op フィールドには、c (作成または挿入)、u (更新)、d (delete)、または r (スナップショットを示す read) のいずれかの値が含まれます。
before
任意のフィールド。存在する場合は、イベント発生 の行の状態を記述します。この構造は、server1.INVENTORY.CUSTOMERS.Value Kafka Connect スキーマによって記述され、server1 コネクターは inventory.customers テーブルのすべての行に使用します。
after
オプションのフィールドで、存在する場合は、変更が発生した の行の状態が含まれます。構造は、before フィールドに使用される server1.INVENTORY.CUSTOMERS.Value Kafka Connect スキーマによって記述されます。
source

イベントのソースメタデータを記述する構造体を含む必須フィールド。Oracle コネクターの場合、構造には以下のフィールドが含まれます。

  • Debezium のバージョン。
  • コネクター名。
  • イベントが進行中のスナップショットの一部であるかどうか。
  • トランザクション ID(スナップショットの含まない)。
  • 変更の SCN。
  • ソースデータベースのレコードがいつ変更されたかを示すタイムスタンプ (スナップショットの場合、タイムスタンプはスナップショットの発生時刻を示す)。
  • 変更を加えたユーザー名

    ヒント

    commit_scn フィールドは任意で、変更イベントが参加するトランザクションコミットの SCN を記述します。

ts_ms
任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックがベース) が含まれます。

変更イベント値のスキーマ

イベントメッセージの値の スキーマ 部分には、ペイロードのエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。

変更イベント値の詳細は、以下を参照してください。

作成 イベント

次の例は、イベントキーの変更 例で説明した customers テーブルの create イベントの値を示しています。

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "rs_id"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ssn"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "redo_thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "user_name"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "snapshot"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.oracle.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "source": {
            "version": "2.3.4.Final",
            "name": "server1",
            "ts_ms": 1520085154000,
            "txId": "6.28.807",
            "scn": "2122185",
            "commit_scn": "2122185",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "c",
        "ts_ms": 1532592105975
    }
}

上記の例では、イベントが以下のスキーマを定義する方法に注目してください。

  • エンベロープ (server1.DEBEZIUM.CUSTOMERS.Envelope)。
  • ソース 構造 (io.debezium.connector.oracle.Source、Oracle コネクターに固有で、すべてのイベントで再利用)。
  • before フィールドおよび after フィールドのテーブル固有のスキーマ。
ヒント

before フィールドおよび after フィールドのスキーマ名は <logicalName>.<schemaName>.<tableName>.Value の形式であるため、他のすべてのテーブルのスキーマからは完全に独立しています。その結果、Avro コンバーター を使用した場合、各論理ソースのテーブルの Avro スキーマは、それぞれ独自に進化し、独自の履歴を持つことになります。

このイベントの payload 部分には、イベントに関する情報が含まれます。行が作成された (op=c) ことを術士、および after フィールドの値に、行の IDFIRST_NAMELAST_NAME、および EMAIL 列に挿入された値が含まれていることを表しています。

ヒント

デフォルトでは、イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。サイズが大きいのは、JSON 表現がメッセージのスキーマ部分とペイロード部分の両方を含んでいるためです。Avro コンバーター を使用すると、コネクターが Kafka トピックに記述するメッセージのサイズを小さくすることができます。

更新イベント

次の例は、前の create イベントと同じテーブルからコネクターが捕捉した 更新 変更イベントを示しています。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "source": {
            "version": "2.3.4.Final",
            "name": "server1",
            "ts_ms": 1520085811000,
            "txId": "6.9.809",
            "scn": "2125544",
            "commit_scn": "2125544",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "u",
        "ts_ms": 1532592713485
    }
}

ペイロードは create (挿入) イベントのペイロードと同じですが、以下の値は異なります。

  • op フィールドの値が u であり、この行が更新により変更されたことを示す。
  • before フィールドは、update データベースのコミット前に存在する値が含まれる行の以前の状態を表示します。
  • after フィールドは行の更新状態を示しており、EMAIL の値は現在 anne@example.com に設定されています。
  • source フィールドの構造は以前と同じフィールドを含みますが、コネクターが REDO ログ内の異なる位置からイベントをキャプチャしたため、値は異なります。
  • ts_ms フィールドは、Debezium がいつイベントを処理したかを示すタイムスタンプを示す。

payload セクションでは、他に有用な情報を複数示しています。たとえば、beforeafter の構造を比較して、コミットの結果として行がどのように変更されたかを判断できます。ソース 構造で、この変更の記録に関する情報がわかるので、追跡が可能です。また、このトピックや他のトピックの他のイベントと関連して、このイベントが発生した場合に、見識を得ることができます。これは、別のイベントと同じコミットの前、後、または一部として発生していましたか ?

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。その結果、Debezium はこのような更新後に 3 つのイベントを出力します。

  • DELETE イベント。
  • 行のキーが古い tombstone イベント
  • 行に新しいキーを提供する INSERT イベント。

delete イベント

次の例は、先の createupdate のイベント例で示したテーブルの delete イベントを示しています。delete イベントの schema 部分は、これらのイベントの schema 部分と同一です。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "after": null,
        "source": {
            "version": "2.3.4.Final",
            "name": "server1",
            "ts_ms": 1520085153000,
            "txId": "6.28.807",
            "scn": "2122184",
            "commit_scn": "2122184",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "d",
        "ts_ms": 1532592105960
    }
}

イベントの payload 部分は、create または update イベントのペイロードと比較して、いくつかの違いを示しています。

  • op フィールドの値は d であり、行が削除されたことを意味します。
  • before フィールドは、データベースのコミットで削除された行の旧状態を示します。
  • after フィールドの値が null 場合、その行はもはや存在しないことを意味します。
  • source フィールドの構造には、create または update イベントに存在する多くのキーが含まれるが、ts_msscntxId フィールドの値は異なっている。
  • ts_ms は、Debezium がこのイベントを処理したタイミングを示すタイムスタンプを示します。

delete イベントは、コンシューマーがこの行の削除を処理するために必要な情報を提供する。

Oracle コネクターのイベントは、Kafka ログコンパクション と連携するように設計されており、すべてのキーで最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。

行が削除されると、Kafka は同じキーを使用する以前のメッセージをすべて削除できるため、前の例で示された delete イベント値は、ログ圧縮でまだ機能します。同じキーを共有する メッセージをすべて 削除するように Kafka に指示するには、メッセージの値を null に設定する必要があります。これを可能にするにはデフォルトで、Debezium Oracle コネクターは、値が null 以外で同じキーを持つ特別な 廃棄 イベントが含まれる 削除 イベントに従います。コネクタープロパティー tombstones.on.delete を設定すると、デフォルトの動作を変更できます。

切り捨て (truncate) イベント

切り捨て (truncate) 変更イベントは、テーブルが切り捨てられていることを伝えます。この場合のメッセージキーは null で、メッセージの値は以下のようになります。

{
    "schema": { ... },
    "payload": {
        "before": null,
        "after": null,
        "source": { 1
            "version": "2.3.4.Final",
            "connector": "oracle",
            "name": "oracle_server",
            "ts_ms": 1638974535000,
            "snapshot": "false",
            "db": "ORCLPDB1",
            "sequence": null,
            "schema": "DEBEZIUM",
            "table": "TEST_TABLE",
            "txId": "02000a0037030000",
            "scn": "13234397",
            "commit_scn": "13271102",
            "lcr_position": null,
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user"
        },
        "op": "t", 2
        "ts_ms": 1638974558961, 3
        "transaction": null
    }
}
表7.8 切り捨て (truncate) イベント値フィールドの説明
項目フィールド名説明

1

source

イベントのソースメタデータを記述する必須のフィールド。切り捨て (truncate) イベント値の source フィールド構造は、同じテーブルの 作成更新、および 削除 イベントと同じで、以下のメタデータを提供します。

  • Debezium バージョン
  • コネクター型および名前
  • 新しい行が含まれるデータベースおよびテーブル
  • スキーマ名
  • イベントがスナップショットの一部である場合 (truncateイベントでは常にfalse)
  • 操作が実行されたトランザクションの ID
  • 操作の SCN
  • データベースに変更が加えられた時点のタイムスタンプ
  • 変更を行ったユーザー名

2

op

操作の型を記述する必須の文字列。op フィールドの値は t で、このテーブルが切り捨てされたことを示します。

3

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

切り捨て (truncate) イベントは、テーブル全体に加えられた変更を表し、メッセージキーを持たないため、コンシューマーが 切り捨て られたイベントや変更イベントを受信する保証はありません (作成更新 など)。たとえば、コンシューマーが異なるパーティションからイベントを読み込む場合、同じテーブルの truncate イベントを受信した後に、テーブルの update イベントを受信することがあります。順序は、トピックが単一のパーティションを使用する場合にのみ保証されます。

切り捨て (truncate) イベントをキャプチャーしない場合は、skipped.operations オプションを使用して除外します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.