第7章 Oracle の Debezium コネクター


Debezium の Oracle コネクターは、Oracle サーバーのデータベースで発生する行レベルの変更をキャプチャーして記録します。これには、コネクターの実行中に追加されたテーブルが含まれます。コネクターを設定して、スキーマおよびテーブルの特定のサブセットの変更イベントを出力したり、特定の列で値を無視、マスク、または切り捨てしたりするように設定できます。

このコネクターと互換性のある Oracle データベースのバージョンについては、Debezium でサポートされる設定ページを参照してください。

ネイティブの LogMiner データベースパッケージを使用して、Debezium が Oracle から最も新しい変更イベントを取り込みます。

Debezium Oracle コネクターの使用に関する情報および手順は、以下のように整理されています。

7.1. Debezium Oracle コネクターの仕組み

Debezium Oracle コネクターを最適に設定し実行するには、コネクターがどのようにスナップショットを実行し、変更イベントをストリームして、Kafka トピック名を決定し、メタデータを使用して、イベントバッファリングを実装するのかを理解することが役に立ちます。

詳細は、以下のトピックを参照してください。

7.1.1. Debezium Oracle コネクターによるデータベーススナップショットの実行方法

通常、Oracle サーバーの redo ログは、データベースの完全な履歴を保持しないように設定されています。そのため、Debezium Oracle コネクターはログからデータベースの履歴全体を取得できません。コネクターがデータベースの現在の状態のベースラインを確立できるようにするには、コネクターの初回起動時に、データベースの最初の 整合性スナップショット を実行します。

注記

初期スナップショットの完了までにかかる時間が、データベースに設定されている UNDO_RETENTION 時間 (デフォルトでは 15 分) を超えると、ORA-01555 例外が発生する可能性があります。エラーに関する詳細情報と、そこから回復するための手順の詳細は、よくある質問 を参照してください。

スナップショットの詳細は、以下のセクションを参照してください。

Oracle コネクターが初期スナップショットを実行するために使用するデフォルトのワークフロー

以下のワークフローでは、Debezium がスナップショットを作成する手順を示しています。この手順では、snapshot.mode 設定プロパティーがデフォルト値 (initial) に設定されている場合のスナップショットのプロセスを説明します。snapshot.mode プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。

スナップショットモードがデフォルトに設定されている場合には、コネクターは以下の作業を完了してスナップショットを作成します。

  1. データベースへの接続を確立します。
  2. キャプチャーするテーブルを決定します。デフォルトでは、コネクターは、キャプチャーから除外するスキーマ が含まれるテーブル以外、すべてのテーブルをキャプチャーします。スナップショットが完了した後、コネクターは指定されたテーブルのデータをストリーミングし続けます。コネクターで特定のテーブルからのみデータをキャプチャーする場合は、table.include.listtable.exclude.list などのプロパティーを設定して、テーブルまたはテーブル要素のサブセットのみのデータをキャプチャーするようにコネクターに指示できます。
  3. スナップショットの作成中に構造的な変更が発生しないように、キャプチャした各テーブルの ROW SHARE MODE ロックを取得します。Debezium は短期間のみ、ロックを保持します。
  4. サーバーの REDO ログから現在のシステム変更番号 (SCN) の位置を読み取ります。
  5. すべてのデータベーステーブル、またはキャプチャー対象として指定されたすべてのテーブルの構造をキャプチャーします。コネクターは、内部データベースのスキーマ履歴トピックにスキーマ情報を保持します。スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。

    注記

    デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、キャプチャーモードにあるデータベース内の全テーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。

  6. 手順 3 で取得したロックを解放します。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。
  7. 手順 4 で読み取った SCN の位置で、コネクターはキャプチャー用に指定されたテーブル (SELECT * FROM … AS OF SCN 123) をスキャンします。スキャン中に、コネクターは次のタスクを実行します。

    1. スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
    2. テーブルからキャプチャーされた行ごとに read イベントを生成します。すべての read イベントには同じ SCN 位置が含まれていますが、これは手順 4 で取得した SCN 位置です。
    3. ソーステーブルの Kafka トピックに各 read イベントを出力します。
    4. 該当する場合は、データテーブルロックを解放します。
  8. コネクターオフセットにスナップショットの正常な完了を記録します。

作成された初期スナップショットは、キャプチャーされたテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。

スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、ステップ 3 で読み取りした位置からストリーミングを続行します。何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。

表7.1 snapshot.mode コネクター設定プロパティーの設定
設定説明

always

各コネクターの開始時にスナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。

Initial

コネクターは、最初のスナップショットを作成するためのデフォルトのワークフローで説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。

initial_only

コネクターはデータベースのスナップショットを実行し、変更イベントレコードをストリーミングする前に停止して、それ以降の変更イベントのキャプチャを許可しません。

schema_only

コネクターは関連するすべてのテーブルの構造をキャプチャーし、デフォルトのスナップショットワークフロー に記載されているすべてのステップを実行します。ただし、コネクターの起動時 (Step 6) の時点でデータセットを表す READ イベントが作成されない点が異なります。

schema_only_recovery

損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。

警告: 最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合、このモードを使用してスナップショットを実行しないでください。

詳細は、コネクター設定プロパティーテーブルの snapshot.mode を参照してください。

7.1.1.1. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由

コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。

テーブルデータ
コネクターの table.include.list プロパティーにあるテーブルの INSERTUPDATE、および DELETE 操作に関する情報。
スキーマデータ
テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。

初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。

場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。

7.1.1.2. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)

コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。

テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。

前提条件

手順

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. コネクター設定で、以下を行います。

    1. snapshot.modeschema_only_recovery に設定します。
    2. schema.history.internal.store.only.captured.tables.ddl の値を false に設定します。
    3. コネクターがキャプチャーするテーブルを table.include.list に追加します。これにより、コネクターは今後すべてのテーブルのスキーマ履歴を再構築できます。
  4. コネクターを再起動します。スナップショットのリカバリープロセスでは、テーブルの現在の構造に基づいてスキーマ履歴が再ビルドされます。
  5. (オプション) スナップショットが完了したら、増分スナップショット を開始して、コネクターがオフラインだった間に発生した他のテーブルへの変更とともに、新しく追加されたテーブルの既存のデータをキャプチャーします。
  6. (オプション) snapshot.modeschema_only にリセットして、今後の再起動後にコネクターが回復を開始しないようにします。

7.1.1.3. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)

スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。

最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。

前提条件

  • コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
  • スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。

手順

初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (store.only.captured.tables.ddlfalse に設定されました)。
  1. table.include.list プロパティーを編集して、キャプチャーするテーブルを指定します。
  2. コネクターを再起動します。
  3. 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (store.only.captured.tables.ddltrue に設定されています)。

最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。

手順 1: スキーマスナップショット、その後に増分スナップショット

この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 設定された Kafka Connect offset.storage.topic 内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。

    警告

    オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。

  4. 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。

    1. snapshot.mode プロパティーの値を schema_only に設定します。
    2. table.include.list を編集して、キャプチャーするテーブルを追加します。
  5. コネクターを再起動します。
  6. Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
  7. データが損失されないようにするには、増分スナップショット を開始します。
手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット

この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 設定された Kafka Connect offset.storage.topic 内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。

    警告

    オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。

  4. table.include.list を編集して、キャプチャーするテーブルを追加します。
  5. 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。

    1. snapshot.mode プロパティーの値を initial に設定します。
    2. (オプション) schema.history.internal.store.only.captured.tables.ddlfalse に設定します。
  6. コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
  7. (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。

7.1.2. アドホックスナップショット

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。

  • コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。

既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。

execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、execute-snapshot シグナルのタイプを incremental に設定し、スナップショットに追加するテーブルの名前を指定します。

表7.2 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルト

type

incremental

実行するスナップショットのタイプを指定します。
タイプの設定は任意です。現在要求できるのは、incremental スナップショットのみです。

data-collections

該当なし

スナップショットされるテーブルの完全修飾名にマッチする正規表現を含む配列。
名前の形式は signal.data.collection 設定オプションと同じです。

additional-condition

該当なし

テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。

surrogate-key

該当なし

スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。

アドホックスナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

現在、execute-snapshot アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。

7.1.3. 増分スナップショット

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信する ための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。

増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを table.include.list プロパティーに追加した後にスナップショットを再実行します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

警告

Oracle の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。

7.1.3.1. 増分スナップショットのトリガー

現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。

シグナルを SQL INSERT クエリーとしてシグナルテーブルに送信します。

Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。

送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental だけです。

スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。

注記

スナップショットに含めるテーブルの名前に、データベース、スキーマ、またはテーブルの名前にドット (.) が含まれている場合、そのテーブルを data-collections 配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。

たとえば、以下のようなテーブルを含めるには public スキーマに存在し、その名前が My.Tableのテーブルを含めるには、"public"."My.Table" の形式を使用します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットをトリガーする

  1. SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}, 5
        "additional-condition":"color=blue"}'); 6

    コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表7.3 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明
    項目説明

    1

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    3

    execute-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。

    5

    incremental

    実行するスナップショット操作の種類指定するシグナルの data フィールドの任意のtype コンポーネント。
    現在、唯一の有効なオプションはデフォルト値 incremental だけです。
    値を指定しない場合には、コネクターは増分スナップショットを実行します。

    6

    additional-condition

    テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。additional-condition パラメーターの詳細は、additional-condition 付きのアドホック増分スナップショット を参照してください。

additional-condition 付きのアドホック増分スナップショット

スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルシグナルに additional-condition パラメーターを追加してシグナル要求を変更できます。

一般的なスナップショットの SQL クエリーは、以下の形式を取ります。

SELECT * FROM <tableName> ....

additional-condition パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。

SELECT * FROM <tableName> WHERE <additional-condition> ....

以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

たとえば、以下の列が含まれる products テーブルがあるとします。

  • id (プライマリーキー)
  • color
  • quantity

products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

additional-condition パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例: 増分スナップショットイベントメッセージ

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、唯一の有効なオプションはデフォルト値 incremental だけです。
シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

7.1.3.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする

設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。

表7.4 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

additional-condition

該当なし

コネクターがスナップショットに含める列のサブセットを指定するために評価する条件を指定するオプションの文字列。

execute-snapshot Kafka メッセージの例:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

追加条件付きのアドホック増分スナップショット

Debezium は additional-condition フィールドを使用してテーブルのコンテンツのサブセットを選択します。

通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。

SELECT * FROM <tableName> …​.

スナップショットリクエストに additional-condition が含まれる場合、次のように additional-condition が SQL クエリーに追加されます。

SELECT * FROM <tableName> WHERE <additional-condition> …​.

たとえば、列 id (プライマリーキー)、color、および brand を含む products テーブルがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-condition ステートメントを追加することができます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`

additional-condition ステートメントを使用して、複数の列に基づいて条件を渡すことができます。たとえば、前の例と同じ products テーブルを使用して、color='blue' および brand='MyBrand' である products テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`

7.1.3.3. 増分スナップショットの停止

ソースデータベースのテーブルにシグナルを送信して、増分スナップショットを停止することもできます。SQL INSERT クエリーを送信して、停止スナップショットシグナルをテーブルに送信します。

Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。

送信するクエリーは、incremental のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのテーブルを指定します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットを停止する

  1. SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    signal コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表7.5 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明
    項目説明

    1

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。

    3

    stop-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。data フィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。

    5

    incremental

    停止させるスナップショット操作の種類を指定する信号の data フィールドの必須コンポーネント。
    現在、有効な唯一のオプションは incremental です。
    type の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。

7.1.3.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する

設定された Kafka シグナルトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。

表7.6 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現のオプションの配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

7.1.4. Debezium Oracle 変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトでは、Oracle コネクターは、テーブルで発生するすべての INSERTUPDATEDELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。

topicPrefix.schemaName.tableName

以下のリストは、デフォルト名のコンポーネントの定義を示しています。

topicPrefix
topic.prefix コネクター設定プロパティーで指定されたトピック接頭辞。
schemaName
操作が発生したスキーマの名前。
tableName
操作が発生したテーブルの名前。

たとえば、fulfillment がサーバー名、inventory がスキーマ名で、データベースに orderscustomersproducts という名前のテーブルが含まれる場合には、Debezium Oracle コネクターは、データベースのテーブルごとに 1 つ、以下の Kafka トピックにイベントを出力します。

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピックトランザクションメタデータトピック) にラベルを付けます。

デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。

7.1.5. Debezium Oracle コネクターによるデータベーススキーマの変更の処理方法

データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。

スキーマ変更後に発生するイベントを正しく処理するために、Oracle には、データに影響を与える行レベルの変更だけでなく、データベースに適用される DDL ステートメントも REDO ログに含めます。コネクターは、redo ログ内でこれらの DDL ステートメントを検出すると、そのステートメントを解析し、各テーブルのスキーマのインメモリー表現を更新します。コネクターはこのスキーマ表現を使用して、挿入、更新、または削除の操作時にテーブルの構造を特定し、適切な変更イベントを生成します。別のデータベーススキーマ履歴 Kafka トピックでは、コネクターは各 DDL ステートメントがある redo ログの場所とともにすべての DDL ステートメントを記録します。

クラッシュするか、正常に停止した後に、コネクターを再起動すると、特定の位置 (特定の時点) から redo ログの読み取りを開始します。コネクターは、データベーススキーマ履歴の Kafka トピックを読み取り、コネクターが起動する redo ログの時点まですべての DDL ステートメントを解析することで、この時点で存在したテーブル構造を再ビルドします。

このデータベーススキーマ履歴トピックは、内部コネクター専用となっています。コネクターは任意で コンシューマーアプリケーションを対象とした別のトピックにスキーマ変更イベントを発行する こともできます。

関連情報

7.1.6. Debezium Oracle コネクターによるデータベーススキーマの変更の公開方法

Debezium Oracle コネクターは、データベース内のテーブルに適用される構造的な変更を記述するスキーマ変更イベントを生成するように設定できます。コネクターは、スキーマ変更イベントを <serverName> という名前の Kafka トピックに書き込みます。ここで、topicNametopic.prefix 設定プロパティーに指定した名前空間を指します。

Debezium は、新しいテーブルからデータをストリーミングするとき、またはテーブルの構造が変更されるたびに、新しいメッセージをスキーマ変更トピックに送信します。

コネクターがスキーマ変更トピックに送信するメッセージには、ペイロードと、任意で変更イベントメッセージのスキーマが含まれます。スキーマ変更イベントメッセージのペイロードには、以下の要素が含まれます。

ddl
スキーマの変更につながる SQL CREATEALTER、または DROP ステートメントを提供します。
databaseName
ステートメントが適用されるデータベースの名前。databaseName の値は、メッセージキーとして機能します。
tableChanges
スキーマの変更後のテーブルスキーマ全体の構造化表現。tableChanges フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
重要

デフォルトでは、コネクターは ALL_TABLES データベースビューを使用して、スキーマ履歴トピックに格納するテーブル名を識別します。そのビュー内で、コネクターは、データベースへの接続に使用するユーザーアカウントが使用できるテーブルからのみデータにアクセスできます。

スキーマ履歴トピックが異なるテーブルのサブセットを格納するように設定を変更できます。以下の方法のいずれかを使用して、トピックが保存するテーブルのセットを変更します。

  • Debezium がデータベースへのアクセスに使用するアカウントのパーミッションを変更し、別のテーブルセットが ALL_TABLES ビューに表示されるようにします。
  • コネクタープロパティー schema.history.internal.store.only.captured.tables.ddltrue に設定します。
重要

コネクターがテーブルをキャプチャーするように設定されている場合、テーブルのスキーマ変更の履歴は、スキーマ変更トピックだけでなく、内部データベーススキーマの履歴トピックにも格納されます。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。

重要

データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。

トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。

  • データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を 1 に指定します。
  • Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka num.partitions 設定オプションの値を 1 に設定します。

例: Oracle コネクタースキーマ変更トピックに発行されたメッセージ

以下の例は、JSON 形式の一般的なスキーマ変更メッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "2.3.4.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1588252618953,
      "snapshot": "true",
      "db": "ORCLPDB1",
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId" : null,
      "scn" : "1513734",
      "commit_scn": "1513754",
      "lcr_position" : null,
      "rs_id": "001234.00012345.0124",
      "ssn": 1,
      "redo_thread": 1,
      "user_name": "user"
    },
    "ts_ms": 1588252618953, 1
    "databaseName": "ORCLPDB1", 2
    "schemaName": "DEBEZIUM", //
    "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n   (    \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n    \"FIRST_NAME\" VARCHAR2(255), \n    \"LAST_NAME" VARCHAR2(255), \n    \"EMAIL\" VARCHAR2(255), \n     PRIMARY KEY (\"ID\") ENABLE, \n     SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n   ) SEGMENT CREATION IMMEDIATE \n  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n  TABLESPACE \"USERS\" ", 3
    "tableChanges": [ 4
      {
        "type": "CREATE", 5
        "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", 6
        "table": { 7
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 8
            "ID"
          ],
          "columns": [ 9
            {
              "name": "ID",
              "jdbcType": 2,
              "nativeType": null,
              "typeName": "NUMBER",
              "typeExpression": "NUMBER",
              "charsetName": null,
              "length": 9,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 10
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表7.7 スキーマ変更トピックに出力されたメッセージのフィールドの説明
項目フィールド名説明

1

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

2

databaseName
schemaName

変更が含まれるデータベースとスキーマを識別します。

3

ddl

このフィールドには、スキーマの変更を行う DDL が含まれます。

4

tableChanges

DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。

5

type

変更の種類を説明します。type は以下の値のいずれかに設定できます。

CREATE
テーブルの作成
ALTER
テーブルの変更
DROP
テーブルの削除

6

id

作成、変更、または破棄されたテーブルの完全な識別子。テーブルの名前が変更されると、この識別子は<old>,<new> のテーブル名が連結されます。

7

table

適用された変更後のテーブルメタデータを表します。

8

primaryKeyColumnNames

テーブルのプライマリーキーを設定する列のリスト。

9

変更されたテーブルの各列のメタデータ。

10

attributes

各テーブル変更のカスタム属性メタデータ。

コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload フィールドに databaseName キーが含まれます。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "ORCLPDB1"
  }
}

7.1.7. トランザクション境界を表す Debezium Oracle コネクターによって生成されたイベント

Debezium は、トランザクションメタデータ境界を表し、データ変更イベントメッセージをエンリッチするイベントを生成できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

データベーストランザクションは、キーワード BEGIN および END で囲まれたステートメントブロックによって表されます。Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
ts_ms
データソースでのトランザクション境界イベント (BEGIN または END イベント) の時間。もしデータソースが Debezium にイベント時間を提供しないなら、このフィールドは代わりに Debezium がイベントを処理する時間を表します。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
data_collectionevent_count 要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。

以下の例は、典型的なトランザクション境界メッセージを示しています。

例: Oracle コネクタートランザクション境界イベント

{
  "status": "BEGIN",
  "id": "5.6.641",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "5.6.641",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
      "event_count": 1
    },
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
      "event_count": 1
    }
  ]
}

topic.transaction オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>.transaction トピックに出力します。

7.1.7.1. Debezium Oracle コネクターがトランザクションメタデータで変更イベントメッセージを強化する方法

トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下の例は、典型的なトランザクションのイベントメッセージを示しています。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "5.6.641",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

クエリーモード

Debezium Oracle コネクターは、デフォルトで Oracle LogMiner と統合されます。このインテグレーションには、トランザクションログに記録された変更を変更イベントとして取り込むための複雑な JDBC SQL クエリーの生成など、特殊な一連の手順が必要です。JDBC SQL クエリーで使用される V$LOGMNR_CONTENTS ビューには、クエリーのパフォーマンスを向上させるための索引がありません。そのため、クエリーの実行を改善する方法として、SQL クエリーの生成方法を制御するさまざまなクエリーモードを使用できます。

log.mining.query.filter.mode コネクタープロパティーを次のいずれかで設定して、JDBC SQL クエリーの生成方法を変更できます。

none
(デフォルト) このモードでは、データベースレベルでの挿入、更新、削除などのさまざまな操作タイプに基づいてフィルタリングのみを行う JDBC クエリーが作成されます。スキーマ、テーブル、またはユーザー名の包含/除外リストに基づいてデータをフィルター処理する場合、これはコネクター内の処理ループ中に行われます。

このモードは、多くの場合、変更があまり多くないデータベースから少数のテーブルをキャプチャーする場合に役立ちます。生成されるクエリーは非常に単純で、データベースのオーバーヘッドを低く抑えてできるだけ早く読み取ることに主に重点を置いています。
in
このモードでは、データベースレベルの操作タイプだけでなく、スキーマ、テーブル、およびユーザー名の包含/除外リストもフィルタリングする JDBC クエリーが作成されます。クエリーの述語は、包含/除外リスト設定プロパティーで指定された値に基づいて SQL in 句を使用して生成されます。

このモードは通常、変更が過度に存在するデータベースから多数のテーブルをキャプチャーする場合に役立ちます。生成されるクエリーは none モードよりもはるかに複雑で、ネットワークオーバーヘッドを削減し、データベースレベルで可能な限り多くのフィルタリングを実行することに重点を置いています。

最後に、スキーマおよびテーブルの包含/除外設定プロパティーの一部として正規表現を 指定しないでください。正規表現を使用すると、コネクターがこれらの設定プロパティーに基づく変更と一致しなくなり、変更が失われる原因になります。
regex
このモードでは、データベースレベルの操作タイプだけでなく、スキーマ、テーブル、およびユーザー名の包含/除外リストもフィルタリングする JDBC クエリーが作成されます。ただし、in モードとは異なり、このモードでは、値が含まれるか除外されるかに応じて結合または論理和を使用する Oracle REGEXP_LIKE 演算子を使用して SQL クエリーを生成します。

このモードは、少数の正規表現を使用して識別できる可変数のテーブルをキャプチャーする場合に便利です。生成されるクエリーは他のモードよりもはるかに複雑で、ネットワークオーバーヘッドを削減し、データベースレベルで可能な限り多くのフィルタリングを実行することに重点を置いています。

7.1.8. Debezium Oracle コネクターのイベントバッファリング使用方法

Oracle は、後でロールバックによって破棄された変更を含め、発生した順序で再実行ログにすべての変更を書き込みます。その結果、別のトランザクションからの同時変更はインターットアンドされます。コネクターが最初に変更ストリームを読み取ると、どの変更がコミットまたはロールバックされるかをすぐに判断できないため、変更イベントは内部バッファーに一時的に保存されます。変更がコミットされると、コネクターは変更イベントをバッファーから Kafka に書き込みます。コネクターはロールバックによって破棄される変更イベントを破棄します。

プロパティー log.mining.buffer.type を設定することにより、コネクターが使用するバッファリングメカニズムを設定できます。

ヒープ

デフォルトのバッファータイプは memory を使用して設定されます。デフォルトの memory 設定では、コネクターは JVM プロセスのヒープメモリーを使用してバッファーイベントレコードを割り当て、管理します。memory バッファー設定を使用する場合は、Java プロセスに割り当てるメモリー量が、お使いの環境で長時間実行されるトランザクションや大規模トランザクションに対応することができることを確認してください。

7.1.9. Debezium Oracle コネクターが SCN 値のギャップを検出する方法

Debezium Oracle コネクターが LogMiner を使用するよう設定されると、システム変更番号 (SCN) に基づく開始範囲と終了範囲を使用して、Oracle から変更イベントを収集します。コネクターはこの範囲を自動的に管理し、コネクターがほぼリアルタイムで変更を流せるか、データベース内の大規模またはバルクトランザクションの量によって変更のバックログを処理しなければならないかに応じて、範囲を増減させます。

特定の状況下で、Oracle データベースは SCN 値を一定の割合で増加させるのではなく、異常に高い割合で SCN を前進させます。このような SCN 値のジャンプは、特定のインテグレーションがデータベースと対話する方法やホットバックアップなどのイベントの結果により発生する可能性があります。

Debezium Oracle コネクターは、以下の設定プロパティーに依存して SCN ギャップを検出し、マイニング範囲を調整します。

log.mining.scn.gap.detection.gap.size.min
ギャップの最小サイズを指定します。
log.mining.scn.gap.detection.time.interval.max.ms
最大間隔を指定します。

コネクターは最初に、現在のマイニング範囲で現在の SCN と最大 SCN との間の変更数の違いを比較します。現在の SCN 値と最高 SCN 値の差が最小ギャップサイズより大きい場合、コネクターは SCN ギャップを潜在的に検出していることになる。ギャップが存在するかどうかを確認するために、コネクターは次に前のマイニング範囲の最後に現在の SCN および SCN のタイムスタンプを比較します。タイムスタンプの違いが最大間隔未満の場合、SCN ギャップの存在が確認されます。

SCN ギャップが発生すると、Debezium コネクターは現在のマイマイセッションの範囲のエンドポイントとして現在の SCN を自動的に使用します。これにより、SCN 値が予期せず増加するため、コネクターは変更を返す間で小規模な範囲を減らさずにリアルタイムイベントを迅速にキャッチできます。コネクターが SCN ギャップに応答して前述の手順を実行すると、log.mining.batch.size.max プロパティーで指定された値を無視します。コネクターがマイニングセッションを終了し、リアルタイムのイベントに追いついた後、最大ログマイニングバッチサイズの実施を再開します。

警告

SCN ギャップ検出は、コネクターが動作していて、ほぼリアルタイムでイベントを処理しているときに、大きな SCN 増分が発生した場合にのみ有効です。

7.1.10. Debezium は、変更頻度の低いデータベースでオフセットをどのように管理するか ?

Debezium Oracle コネクターは、コネクターオフセットでシステム変更番号を追跡するので、コネクターを再起動したときに、中断したところから開始することができます。これらのオフセットは、発行された各変更イベントの一部です。ただし、データベース変更の頻度が低い場合 (数時間または数日ごと)、オフセットが古くなり、トランザクションログでシステム変更番号が利用できなくなると、コネクターの再起動が正常に行われなくなる可能性があります。

Oracle への接続に非 CDB モードを使用するコネクターの場合、オフセットの同期を維持するために、コネクターが一定間隔でハートビートイベントを発信するように強制するために heartbeat.interval.ms を有効にすると、コネクターが定期的にハートビートイベントを発行するようになり、オフセットの同期が維持されます。

Oracle との接続に CDB モードを使用するコネクターの場合、Synchronization のメンテナンスはより複雑になります。heartbeat.interval.ms を設定する必要があるだけでなく heartbeat.interval.ms を設定する必要があります。CDB モードでは、コネクターは PDB 内の変更のみを追跡するため、両方のプロパティーを指定する必要があります。プラガブルデータベース内から変更イベントをトリガーするための補足的なメカニズムが必要である。一定の間隔で、ハートビートアクションクエリーがコネクターに新しいテーブル行を挿入したり、プラガブルデータベースの既存の行を更新したりします。Debezium は、テーブルの変更を検知し、変更イベントを発行することで、変更処理の頻度が低いプラガブルデータベースでもオフセットの同期を確保します。

注記

コネクターのユーザーアカウント が所有していないテーブルで heartbeat.action.query を使用するには、コネクターのユーザーにそれらのテーブルで必要な INSERT または UPDATE クエリーを実行する権限を付与する必要があります。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.