第6章 Oracle の Debezium コネクター
Debezium の Oracle コネクターは、Oracle サーバーのデータベースで発生する行レベルの変更をキャプチャーして記録します。これには、コネクターの実行中に追加されたテーブルが含まれます。コネクターを設定して、スキーマおよびテーブルの特定のサブセットの変更イベントを出力したり、特定の列で値を無視、マスク、または切り捨てしたりするように設定できます。
このコネクターと互換性のある Oracle データベースのバージョンについては、Debezium でサポートされる設定ページを参照してください。
ネイティブの LogMiner データベースパッケージを使用して、Debezium が Oracle から最も新しい変更イベントを取り込みます。
Debezium Oracle コネクターの使用に関する情報および手順は、以下のように整理されています。
6.1. Debezium Oracle コネクターの仕組み
Debezium Oracle コネクターを最適に設定し実行するには、コネクターがどのようにスナップショットを実行し、変更イベントをストリームして、Kafka トピック名を決定し、メタデータを使用して、イベントバッファーリングを実装するのかを理解することが役に立ちます。
詳細は、以下のトピックを参照してください。
6.1.1. Debezium Oracle コネクターによるデータベーススナップショットの実行方法
通常、Oracle サーバーの redo ログは、WAL セグメントにデータベースの全履歴を保持するようには設定されていません。そのため、Debezium Oracle コネクターはログからデータベースの履歴全体を取得できません。コネクターがデータベースの現在の状態のベースラインを確立できるようにするには、コネクターの初回起動時に、データベースの最初の 整合性スナップショット を実行します。
snapshot.mode
コネクター設定プロパティーの値を設定することで、コネクターがスナップショットを作成する方法をカスタマイズできます。デフォルトでは、コネクターのスナップショットモードは initial
に設定されます。
初期スナップショットを作成するデフォルトのコネクターワークフロー
スナップショットモードがデフォルトに設定されている場合には、コネクターは以下の作業を完了してスナップショットを作成します。
- キャプチャーするテーブルを決定します。
-
スナップショットの作成時に構造が変更されないように監視されているテーブルごとに
ROW SHARE MODE
ロックを取得します。Debezium は短期間のみ、ロックを保持します。 - サーバーの redo ログから現在のシステム変更番号 (SCN) の位置を読み取ります。
- 関連するテーブルすべての構造をキャプチャーします。
- ステップ 2 で取得したロックを解放します。
-
手順 3 で読み込まれた SCN の位置で有効なものとして、関連するデータベーステーブルとスキーマをすべてスキャンして (
SELECT * FROM … AS OF SCN 123
)、各行にREAD
イベントを生成し、イベントレコードをテーブル固有の Kafka トピックに書き込みます。 - コネクターオフセットにスナップショットの正常な完了を記録します。
スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、ステップ 3 で読み取りした位置からストリーミングを続行します。何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。
設定 | 説明 |
---|---|
| コネクターは、最初のスナップショット を作成するためのデフォルトのワークフローで説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
| コネクターはデータベースのスナップショットを実行し、変更イベントレコードをストリーミングする前に停止して、それ以降の変更イベントのキャプチャを許可しません。 |
|
コネクターは関連するすべてのテーブルの構造をキャプチャーし、デフォルトのスナップショットワークフロー に記載されているすべてのステップを実行します。ただし、コネクターの起動時 (Step 6) の時点でデータセットを表す |
|
失われたまたは破損するデータベース履歴トピックを復元するには、このオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベース履歴トピックを定期的にプルーニングすることもできます。 |
詳細については、コネクター設定プロパティーテーブルの snapshot.mode
をご覧ください。
6.1.1.1. アドホックスナップショット
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。
execute-snapshot
メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、run-snapshot
シグナルのタイプを incremental
に設定し、スナップショットに追加するテーブルの名前を指定します。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプを指定します。 |
| 該当なし |
スナップショットを作成するテーブルの完全修飾名が含まれる配列。 |
アドホックスナップショットのトリガー
execute-snapshot
シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
現在、execute-snapshot
アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。
6.1.1.2. 増分スナップショット
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1 KB です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.list
プロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ
イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT
、UPDATE
、DELETE
操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE
または DELETE
イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ
イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ
イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ
イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ
操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE
または DELETE
操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ
イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ
イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ
イベントのみです。Debezium は、これらの残りの READ
イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
増分スナップショットのトリガー
現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。SQL INSERT
クエリーとしてテーブルにシグナルを送信します。Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental
だけです。
スナップショットに追加するテーブルを指定するには、テーブルを一覧表示する data-collections
アレイを指定します (例:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
)。
増分スナップショットシグナルの data-collections
アレイにはデフォルト値がありません。data-collections
アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。
スナップショットに含めるテーブルの名前に、データベース、スキーマ、またはテーブルの名前にドット (.
) が含まれている場合、そのテーブルを data-collections
配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。
たとえば、以下のようなテーブルを含めるには public
スキーマに存在し、その名前が My.Table
を持つテーブルを含めるには、次の形式を使用します。"public"."My.Table"
前提条件
- シグナルデータコレクションがソースのデータベースに存在し、コネクターはこれをキャプチャーするように設定されています。
-
シグナルデータコレクションは
signal.data.collection
プロパティーで指定されます。
手順
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');
以下に例を示します。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');
コマンドの
id
、type
、およびdata
パラメーターの値は、シグナルテーブルのフィールド に対応します。以下の表では、これらのパラメーターについて説明しています。
表6.3 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 値 説明 myschema.debezium_signal
ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID
文字列をウォーターマークシグナルとして生成します。execute-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
data-collections
スナップショットに含めるテーブル名の配列を指定するシグナルの
data
フィールドの必須コンポーネント。
配列は、signal.data.collection
設定プロパティーにコネクターのシグナルテーブルの名前を指定するときに使用する形式で、完全修飾名別にテーブルを一覧表示します。incremental
実行するスナップショット操作の種類指定するシグナルの
data
フィールドの任意のtype
コンポーネント。
現在、唯一の有効なオプションはデフォルト値incremental
だけです。
シグナルテーブルに送信する SQL クエリーでのtype
値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例: 増分スナップショットイベントメッセージ
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
実行するスナップショット操作タイプを指定します。 |
2 |
|
イベントタイプを指定します。 |
Oracle の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。
6.1.2. Debezium Oracle 変更イベントレコードを受信する Kafka トピックのデフォルト名
デフォルトでは、Oracle コネクターは、テーブルで発生するすべての INSERT
、UPDATE
、DELETE
操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。
serverName.schemaName.tableName
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- serverName
-
database.server.name
コネクター設定プロパティーで指定したサーバーの論理名です。 - schemaName
- 操作が発生したスキーマの名前。
- tableName
- 操作が発生したテーブルの名前。
たとえば、fulfillment
がサーバー名、inventory
がスキーマ名で、データベースに orders
、customers
、products
という名前のテーブルが含まれる場合には、Debezium Oracle コネクターは、データベースのテーブルごとに 1 つ、以下の Kafka トピックにイベントを出力します。
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products
コネクターは同様の命名規則を適用して、内部データベース履歴トピック (スキーマ変更トピック と トランザクションメタデータトピック) にラベルを付けます。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。
6.1.3. Debezium Oracle コネクターによるデータベーススキーマの変更の公開方法
Debezium Oracle コネクターは、データベース内のキャプチャされたテーブルに適用される構造的な変更を記述するスキーマ変更イベントを生成するように設定することができます。コネクターは、スキーマ変更イベントをすべて <serverName>
という名前の Kafka トピックに書き込みます。serverName
は database.server.name
設定プロパティーに指定された論理サーバー名になります。
Debezium は、新しいテーブルからデータをストリーミングするたびに、このトピックに新しいメッセージを出力します。
コネクターがスキーマ変更トピックに送信するメッセージには、ペイロードと、任意で変更イベントメッセージのスキーマが含まれます。スキーマ変更イベントメッセージのペイロードには、以下の要素が含まれます。
ddl
-
スキーマの変更につながる SQL
CREATE
、ALTER
、またはDROP
ステートメントを提供します。 databaseName
-
ステートメントが適用されるデータベースの名前。
databaseName
の値は、メッセージキーとして機能します。 tableChanges
-
スキーマの変更後のテーブルスキーマ全体の構造化表現。
tableChanges
フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
デフォルトでは、コネクターは ALL_TABLES
データベースビューを使用して、スキーマ履歴トピックに格納するテーブル名を識別します。そのビュー内で、コネクターは、データベースへの接続に使用するユーザーアカウントが使用できるテーブルからのみデータにアクセスできます。スキーマ履歴トピックが異なるテーブルのサブセットを格納するように設定を変更できます。次のいずれかの方法を使用して、トピックが格納するテーブルセットを変更します。* Debezium がデータベースへのアクセスに使用するアカウントの権限を変更して、別のテーブルセットが ALL_TABLES
ビューに表示されるようにします。* コネクタープロパティー database.history.store.only.captured.tables.ddl
を true
に設定します。
コネクターがテーブルをキャプチャするように設定されている場合、テーブルのスキーマ変更の履歴は、スキーマ変更トピックだけでなく、内部データベース履歴トピックにも格納されます。内部データベース履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。
データベース履歴トピックをパーティションに分割しないでください。データベース履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。
トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。
-
データベース履歴トピックを手動で作成する場合は、パーティション数を
1
に指定します。 -
Apache Kafka ブローカーを使用してデータベース履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka
num.partitions
設定オプションの値を1
に設定します。
例: Oracle コネクタースキーマ変更トピックに発行されたメッセージ
以下の例は、JSON 形式の一般的なスキーマ変更メッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。
{ "schema": { ... }, "payload": { "source": { "version": "1.9.5.Final", "connector": "oracle", "name": "server1", "ts_ms": 1588252618953, "snapshot": "true", "db": "ORCLPDB1", "schema": "DEBEZIUM", "table": "CUSTOMERS", "txId" : null, "scn" : "1513734", "commit_scn": "1513754", "lcr_position" : null, "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1 }, "databaseName": "ORCLPDB1", 1 "schemaName": "DEBEZIUM", // "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n ( \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n \"FIRST_NAME\" VARCHAR2(255), \n \"LAST_NAME" VARCHAR2(255), \n \"EMAIL\" VARCHAR2(255), \n PRIMARY KEY (\"ID\") ENABLE, \n SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n ) SEGMENT CREATION IMMEDIATE \n PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n TABLESPACE \"USERS\" ", 2 "tableChanges": [ 3 { "type": "CREATE", 4 "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 5 "table": { 6 "defaultCharsetName": null, "primaryKeyColumnNames": [ 7 "ID" ], "columns": [ 8 { "name": "ID", "jdbcType": 2, "nativeType": null, "typeName": "NUMBER", "typeExpression": "NUMBER", "charsetName": null, "length": 9, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "FIRST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "LAST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "EMAIL", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR2", "typeExpression": "VARCHAR2", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ] } } ] } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| 変更が含まれるデータベースとスキーマを識別します。 |
2 |
| このフィールドには、スキーマの変更を行う DDL が含まれます。 |
3 |
| DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。 |
4 |
|
変更の種類を説明します。
|
5 |
|
作成、変更、または破棄されたテーブルの完全な識別子。テーブルの名前が変更されると、この識別子は |
6 |
| 適用された変更後のテーブルメタデータを表します。 |
7 |
| テーブルのプライマリーキーを設定する列のリスト。 |
8 |
| 変更されたテーブルの各列のメタデータ。 |
コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload
フィールドに databaseName
キーが含まれます。
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.oracle.SchemaChangeKey" }, "payload": { "databaseName": "ORCLPDB1" } }
6.1.4. トランザクション境界を表す Debezium Oracle コネクターによって生成されたイベント
Debezium は、トランザクションメタデータ境界を表し、データ変更イベントメッセージをエンリッチするイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
データベーストランザクションは、キーワード BEGIN
および END
で囲まれたステートメントブロックによって表されます。Debezium は、すべてのトランザクションで BEGIN
および END
区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。
status
-
BEGIN
またはEND
id
- 一意のトランザクション識別子の文字列表現。
event_count
(END
イベント用)- トランザクションによって出力されるイベントの合計数。
data_collections
(END
イベント用)-
data_collection
とevent_count
要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
以下の例は、典型的なトランザクション境界メッセージを示しています。
例: Oracle コネクタートランザクション境界イベント
{ "status": "BEGIN", "id": "5.6.641", "event_count": null, "data_collections": null } { "status": "END", "id": "5.6.641", "event_count": 2, "data_collections": [ { "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER", "event_count": 1 }, { "data_collection": "ORCLPDB1.DEBEZIUM.ORDER", "event_count": 1 } ] }
transaction.topic
オプションで上書きされない限り、コネクターはトランザクションイベントを <database.server.name>
.transaction
トピックに出力します。
6.1.4.1. Debezium Oracle コネクターがトランザクションメタデータで変更イベントメッセージを強化する方法
トランザクションメタデータを有効にすると、データメッセージ Envelope
は新しい transaction
フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id
- 一意のトランザクション識別子の文字列表現。
total_order
- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下の例は、典型的なトランザクションのイベントメッセージを示しています。
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "5.6.641", "total_order": "1", "data_collection_order": "1" } }
6.1.5. Debezium Oracle コネクターのイベントバッファーリング使用方法
Oracle は、後でロールバックによって破棄された変更を含め、発生した順序で再実行ログにすべての変更を書き込みます。その結果、別のトランザクションからの同時変更はインターットアンドされます。コネクターが最初に変更ストリームを読み取ると、どの変更がコミットまたはロールバックされるかをすぐに判断できないため、変更イベントは内部バッファーに一時的に保存されます。変更がコミットされると、コネクターは変更イベントをバッファーから Kafka に書き込みます。コネクターはロールバックによって破棄される変更イベントを破棄します。
プロパティー log.mining.buffer.type
を設定することにより、コネクターが使用するバッファーリングメカニズムを設定できます。
ヒープ
デフォルトのバッファータイプは memory
を使用して設定されます。デフォルトの memory
設定では、コネクターは JVM プロセスのヒープメモリーを使用してバッファーイベントレコードを割り当て、管理します。memory
バッファー設定を使用する場合は、Java プロセスに割り当てるメモリー量が、お使いの環境で長時間実行されるトランザクションや大規模トランザクションに対応することができることを確認してください。
6.1.6. Debezium Oracle コネクターが SCN 値のギャップを検出する方法
Debezium Oracle コネクターが LogMiner を使用するよう設定されると、システム変更番号 (SCN) に基づく開始範囲と終了範囲を使用して、Oracle から変更イベントを収集します。コネクターはこの範囲を自動的に管理し、コネクターがほぼリアルタイムで変更を流せるか、データベース内の大規模またはバルクトランザクションの量によって変更のバックログを処理しなければならないかに応じて、範囲を増減させます。
特定の状況下で、Oracle データベースは SCN 値を一定の割合で増加させるのではなく、異常に高い割合で SCN を前進させます。このような SCN 値のジャンプは、特定のインテグレーションがデータベースと対話する方法やホットバックアップなどのイベントの結果により発生する可能性があります。
Debezium Oracle コネクターは、以下の設定プロパティーに依存して SCN ギャップを検出し、マイニング範囲を調整します。
log.mining.scn.gap.detection.gap.size.min
- ギャップの最小サイズを指定します。
log.mining.scn.gap.detection.time.interval.max.ms
- 最大間隔を指定します。
コネクターは最初に、現在のマイニング範囲で現在の SCN と最大 SCN との間の変更数の違いを比較します。現在の SCN 値と最高 SCN 値の差が最小ギャップサイズより大きい場合、コネクターは SCN ギャップを潜在的に検出していることになる。ギャップが存在するかどうかを確認するために、コネクターは次に前のマイニング範囲の最後に現在の SCN および SCN のタイムスタンプを比較します。タイムスタンプの違いが最大間隔未満の場合、SCN ギャップの存在が確認されます。
SCN ギャップが発生すると、Debezium コネクターは現在のマイマイセッションの範囲のエンドポイントとして現在の SCN を自動的に使用します。これにより、SCN 値が予期せず増加するため、コネクターは変更を返す間で小規模な範囲を減らさずにリアルタイムイベントを迅速にキャッチできます。コネクターが SCN ギャップに応答して前述の手順を実行すると、log.mining.batch.size.max プロパティーで指定された値を無視します。コネクターがマイニングセッションを終了し、リアルタイムのイベントに追いついた後、最大ログマイニングバッチサイズの実施を再開します。
SCN ギャップ検出は、コネクターが動作していて、ほぼリアルタイムでイベントを処理しているときに、大きな SCN 増分が発生した場合にのみ有効です。
6.1.7. Debezium は、変更頻度の低いデータベースでオフセットをどのように管理するか ?
Debezium Oracle コネクターは、コネクターオフセットでシステム変更番号を追跡するので、コネクターを再起動したときに、中断したところから開始することができます。これらのオフセットは、発行された各変更イベントの一部です。ただし、データベース変更の頻度が低い場合 (数時間または数日ごと)、オフセットが古くなり、トランザクションログでシステム変更番号が利用できなくなると、コネクターの再起動が正常に行われなくなる可能性があります。
Oracle への接続に非 CDB モードを使用するコネクターの場合、オフセットの同期を維持するために、コネクターが一定間隔でハートビートイベントを発信するように強制するために heartbeat.interval.ms
を有効にすると、コネクターが定期的にハートビートイベントを発行するようになり、オフセットの同期が維持されます。
Oracle との接続に CDB モードを使用するコネクターの場合、Synchronization のメンテナーンスはより複雑になります。heartbeat.interval.ms
を設定する必要があるだけでなく heartbeat.interval.ms
を設定する必要があります。CDB モード では、コネクターは PDB 内の変更のみを追跡するため、両方のプロパティーを指定する必要があります。プラガブルデータベース内から変更イベントをトリガーするための補足的なメカニズムが必要である。一定の間隔で、ハートビートアクションクエリーがコネクターに新しいテーブル行を挿入したり、プラガブルデータベースの既存の行を更新したりします。Debezium は、テーブルの変更を検知し、変更イベントを発行することで、変更処理の頻度が低いプラガブルデータベースでもオフセットの同期を確保します。
コネクターのユーザーアカウント が所有していないテーブルで heartbeat.action.query
を使用するには、コネクターのユーザーにそれらのテーブルで必要な INSERT
または UPDATE
クエリーを実行する権限を付与する必要があります。