2.5. Oracle の Debezium コネクター
Debezium の Oracle コネクターは、Oracle サーバーのデータベースで発生する行レベルの変更をキャプチャーして記録します。これには、コネクターの実行中に追加されたテーブルが含まれます。コネクターを設定して、スキーマおよびテーブルの特定のサブセットの変更イベントを出力したり、特定の列で値を無視、マスク、または切り捨てしたりするように設定できます。
このコネクターと互換性のある Oracle データベースのバージョンについては、Debezium でサポートされる設定ページを参照してください。
Debezium は、ネイティブ LogMiner データベースパッケージまたは XStream API を使用して、Oracle から変更イベントを取り込むことができます。
XStream での Debezium Oracle コネクターの使用は、開発者プレビュー機能です。開発者プレビュー機能は、Red Hat ではいかなる形でもサポートされていません。また、機能的には完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアにはドキュメントが存在しない可能性があり、変更または削除される可能性があります。また、限定的なテストしか行われていません。Red Hat は、関連する SLA なしに、開発者プレビューソフトウェアに対するフィードバックを送信する手段を提供する場合があります。
Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。
Debezium Oracle コネクターの使用に関する情報および手順は、以下のように整理されています。
2.5.1. Debezium Oracle コネクターの仕組み リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターを最適に設定し実行するには、コネクターがどのようにスナップショットを実行し、変更イベントをストリームして、Kafka トピック名を決定し、メタデータを使用して、イベントバッファリングを実装するのかを理解することが役に立ちます。
詳細は、以下のトピックを参照してください。
2.5.1.1. Debezium Oracle コネクターによるデータベーススナップショットの実行方法 リンクのコピーリンクがクリップボードにコピーされました!
通常、Oracle サーバーの redo ログは、データベースの完全な履歴を保持しないように設定されています。そのため、Debezium Oracle コネクターはログからデータベースの履歴全体を取得できません。コネクターがデータベースの現在の状態のベースラインを確立できるようにするには、コネクターの初回起動時に、データベースの最初の 整合性スナップショット を実行します。
初期スナップショットの完了までにかかる時間が、データベースに設定されている UNDO_RETENTION 時間 (デフォルトでは 15 分) を超えると、ORA-01555 例外が発生する可能性があります。エラーに関する詳細情報と、そこから回復するための手順の詳細は、よくある質問 を参照してください。
テーブルのスナップショット中に、Oracle で ORA-01466 例外が発生する可能性があります。これは、ユーザーがテーブルのスキーマを変更したり、スナップショット対象のテーブルに関連付けられたインデックスまたは関連オブジェクトを追加、変更、または削除したりした場合に発生します。このような事態が発生した場合、コネクターが停止し、初期スナップショットを最初から取得する必要があります。
この問題を解決するには、特定のテーブルのスナップショットが再起動されるように、snapshot.database.errors.max.retries プロパティーを 0 より大きい値に設定してください。再試行時にスナップショット全体が最初から開始されることはありませんが、問題の特定のテーブルは最初から再読み取りされ、テーブルのトピックには重複したスナップショットイベントが含まれます。
スナップショットの詳細は、以下のセクションを参照してください。
2.5.1.1.1. Oracle コネクターが初期スナップショットを実行するために使用するデフォルトのワークフロー リンクのコピーリンクがクリップボードにコピーされました!
以下のワークフローでは、Debezium がスナップショットを作成する手順を示しています。この手順では、snapshot.mode 設定プロパティーがデフォルト値 (initial) に設定されている場合のスナップショットのプロセスを説明します。snapshot.mode プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。
スナップショットモードがデフォルトに設定されている場合には、コネクターは以下の作業を完了してスナップショットを作成します。
- データベースへの接続を確立します。
-
キャプチャーするテーブルを決定します。デフォルトでは、コネクターは、キャプチャーから除外するスキーマ が含まれるテーブル以外、すべてのテーブルをキャプチャーします。スナップショットが完了した後、コネクターは指定されたテーブルのデータをストリーミングし続けます。コネクターで特定のテーブルからのみデータをキャプチャーする場合は、
table.include.listやtable.exclude.listなどのプロパティーを設定して、テーブルまたはテーブル要素のサブセットのみのデータをキャプチャーするようにコネクターに指示できます。 -
スナップショットの作成中に構造的な変更が発生しないように、キャプチャした各テーブルの
ROW SHARE MODEロックを取得します。Debezium は短期間のみ、ロックを保持します。 - サーバーの REDO ログから現在のシステム変更番号 (SCN) の位置を読み取ります。
すべてのデータベーステーブル、またはキャプチャー対象として指定されたすべてのテーブルの構造をキャプチャーします。コネクターは、内部データベースのスキーマ履歴トピックにスキーマ情報を保持します。スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。
注記デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、キャプチャーモードにあるデータベース内の全テーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。
- 手順 3 で取得したロックを解放します。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。
手順 4 で読み取った SCN の位置で、コネクターはキャプチャー用に指定されたテーブル (
SELECT * FROM … AS OF SCN 123) をスキャンします。スキャン中に、コネクターは次のタスクを実行します。- スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
-
テーブルからキャプチャーされた行ごとに
readイベントを生成します。すべてのreadイベントには同じ SCN 位置が含まれていますが、これは手順 4 で取得した SCN 位置です。 -
ソーステーブルの Kafka トピックに各
readイベントを出力します。 - 該当する場合は、データテーブルロックを解放します。
- コネクターオフセットにスナップショットの正常な完了を記録します。
作成された初期スナップショットは、キャプチャーされたテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。
スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、ステップ 3 で読み取りした位置からストリーミングを続行します。何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。
| 設定 | 説明 |
|---|---|
|
| 各コネクターの開始時にスナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
|
| コネクターは 初期スナップショットを作成するためのデフォルトのワークフロー で説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
|
| コネクターはデータベースのスナップショットを実行し、変更イベントレコードをストリーミングする前に停止して、それ以降の変更イベントのキャプチャを許可しません。 |
|
|
非推奨です。 |
|
|
コネクターは関連するすべてのテーブルの構造をキャプチャーし、デフォルトのスナップショットワークフロー に記載されているすべてのステップを実行します。ただし、コネクターの起動時 (Step 6) の時点でデータセットを表す |
|
|
非推奨です。 |
|
|
損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。 |
|
| コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。
|
詳細は、コネクター設定プロパティーテーブルの snapshot.mode を参照してください。
2.5.1.1.2. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由 リンクのコピーリンクがクリップボードにコピーされました!
コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。
- テーブルデータ
-
コネクターの
table.include.listプロパティーにあるテーブルのINSERT、UPDATE、およびDELETE操作に関する情報。 - スキーマデータ
- テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。
初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。
場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。
関連情報
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)
-
schema.history.internal.store.only.captured.tables.ddlプロパティーを設定して、スキーマ情報をキャプチャーするテーブルを指定します。 -
schema.history.internal.store.only.captured.databases.ddlプロパティーを設定して、スキーマ変更をキャプチャーする論理データベースを指定します。
2.5.1.1.3. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし) リンクのコピーリンクがクリップボードにコピーされました!
コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。
テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- トランザクションログでは、テーブルのすべてのエントリーが同じスキーマを使用します。構造が変更された新しいテーブルからデータを取得する方法は、「初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)」 を参照してください。
手順
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティーで指定された内部データベーススキーマ履歴トピックを削除します。 コネクター設定で、以下を行います。
-
snapshot.modeをrecoveryに設定します。 -
(オプション)
schema.history.internal.store.only.captured.tables.ddlの値をfalseに設定して、コネクターが現在キャプチャー対象として指定されていないテーブルのデータを今後容易にキャプチャーできるようにします。コネクターは、テーブルのスキーマ履歴が履歴トピックに存在する場合にのみ、テーブルからデータをキャプチャーできます。 -
コネクターがキャプチャーするテーブルを
table.include.listに追加します。
-
- コネクターを再起動します。スナップショットのリカバリープロセスでは、テーブルの現在の構造に基づいてスキーマ履歴が再ビルドされます。
- (オプション) スナップショットが完了したら、新しく追加されたテーブルで 増分スナップショット を開始します。増分スナップショットは、最初に新しく追加されたテーブルの履歴データをストリーミングし、次に、そのコネクターがオフラインの間に発生した変更など、以前に設定されたテーブルの REDO ログとアーカイブログからの変更の読み取りを再開します。
-
(オプション)
snapshot.modeをno_dataにリセットして、今後の再起動後にコネクターが回復を開始しないようにします。
2.5.1.1.4. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更) リンクのコピーリンクがクリップボードにコピーされました!
スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。
最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。
手順
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (
store.only.captured.tables.ddlはfalseに設定されました)。 -
table.include.listプロパティーを編集して、キャプチャーするテーブルを指定します。 - コネクターを再起動します。
- 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
-
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (
store.only.captured.tables.ddlがtrueに設定されています)。 最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。
- 手順 1: スキーマスナップショット、その後に増分スナップショット
この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティーで指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.modeプロパティーの値をno_dataに設定します。 -
table.include.listを編集して、キャプチャーするテーブルを追加します。
-
- コネクターを再起動します。
- Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
- データが損失されないようにするには、増分スナップショット を開始します。
- 手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット
この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティーで指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
-
table.include.listを編集して、キャプチャーするテーブルを追加します。 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.modeプロパティーの値をinitialに設定します。 -
(オプション)
schema.history.internal.store.only.captured.tables.ddlをfalseに設定します。
-
- コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
- (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。
2.5.1.2. アドホックスナップショット リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。
execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。execute-snapshot シグナルのタイプを incremental または blocking に設定し、スナップショットに含めるテーブルの名前を次の表に示すように指定します。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプを指定します。 |
|
| 該当なし |
スナップショットに含めるテーブルの完全修飾名に一致する正規表現を含む配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
|
|
| 該当なし | スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。 |
アドホック増分スナップショットのトリガー
アドホック増分スナップショットを開始するには、execute-snapshot シグナルタイプのエントリーをシグナリングテーブルに追加するか、シグナルメッセージを Kafka シグナリングトピックに送信します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
詳細は、スナップショットの増分 を参照してください。
アドホックブロッキングスナップショットのトリガー
シグナリングテーブルまたはシグナリングトピックに、execute-snapshot シグナルタイプを持つエントリーを追加することによって、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。
詳細は、ブロッキングスナップショット を参照してください。
2.5.1.3. 増分スナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.listプロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT、UPDATE、DELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
現在、増分スナップショットを開始するには、次のいずれかの方法を使用できます。
Oracle の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。
2.5.1.3.1. 増分スナップショットのトリガー リンクのコピーリンクがクリップボードにコピーされました!
増分スナップショットを開始するには、ソースデータベースのシグナリングテーブルに アドホックスナップショットシグナル を送信します。スナップショットシグナルは SQL INSERT クエリーとして送信します。
Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。Debezium は現在、incremental と blocking のスナップショットタイプをサポートしています。
スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections 配列が空の場合、Debezium は空の配列をアクションが必要ないと解釈し、スナップショットは作成しません。
スナップショットに含めるテーブルの名前にドット (.)、スペース、またはその他の英数字以外の文字が含まれている場合は、テーブル名を二重引用符でエスケープする必要があります。
たとえば、db1 データベースの public スキーマに存在し、My.Table という名前のテーブルを含めるには、"db1.public.\"My.Table\"" の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットをトリガーする
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コマンドの
id、type、およびdataパラメーターの値は、シグナルテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.108 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 項目 値 説明 1
database.schema.debezium_signalソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID文字列をウォーターマークシグナルとして生成します。3
execute-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
配列には、database.schema.table形式を使用してテーブルの完全修飾名と一致する正規表現がリストされます。この形式は、コネクターの シグナリングテーブル の名前を指定するために使用する形式と同じです。5
incremental実行するスナップショット操作のタイプを指定する、シグナルの
dataフィールドのオプションのtypeコンポーネント。
有効な値はincrementalとblockingです。
値を指定しない場合、コネクターはデフォルトで増分スナップショットを実行します。6
additional-conditionsコネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、data-collectionプロパティーとfilterプロパティーを持つオブジェクトです。データの収集単位で異なるフィルターを指定できます。
*data-collectionプロパティーは、フィルターが適用されるデータコレクションの完全修飾名です。additional-conditionsパラメーターの詳細は、「additional-conditions付きでアドホック増分スナップショットを実行する」 を参照してください。
2.5.1.3.2. additional-conditions 付きでアドホック増分スナップショットを実行する リンクのコピーリンクがクリップボードにコピーされました!
スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルに additional-conditions パラメーターを追加してシグナル要求を変更できます。
一般的なスナップショットの SQL クエリーは、以下の形式を取ります。
SELECT * FROM <tableName> ....
SELECT * FROM <tableName> ....
additional-conditions パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。
SELECT * FROM <data-collection> WHERE <filter> ....
SELECT * FROM <data-collection> WHERE <filter> ....
以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
たとえば、以下の列が含まれる products テーブルがあるとします。
-
id(プライマリーキー) -
color -
quantity
products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
additional-conditions パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例2.32 増分スナップショットイベントメッセージ
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
実行するスナップショット操作タイプを指定します。 |
| 2 |
|
イベントタイプを指定します。 |
2.5.1.3.3. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。 |
例2.33 execute-snapshot Kafka メッセージ
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
additional-conditions 付きのアドホック増分スナップショット
Debezium は additional-conditions フィールドを使用してテーブルのコンテンツのサブセットを選択します。
通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。
SELECT * FROM <tableName> ….
スナップショット要求に additional-conditions プロパティーが含まれている場合、プロパティーの data-collection および filter パラメーターが SQL クエリーに追加されます。次に例を示します。
SELECT * FROM <data-collection> WHERE <filter> ….
たとえば、列 id (プライマリーキー)、color、および brand を含む products テーブルがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions プロパティーを追加することができます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
また、additional-conditions プロパティーを使用して、複数の列に基づいて条件を渡すこともできます。たとえば、前の例と同じ products テーブルを使用して、color='blue' および brand='MyBrand' である products テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.5.1.3.4. 増分スナップショットの停止 リンクのコピーリンクがクリップボードにコピーされました!
状況によっては、増分スナップショットを停止する必要がある場合があります。たとえば、スナップショットが正しく設定されていない場合や、他のデータベース操作にリソースが使用可能であるこのとの確認が必要な場合があります。ソースデータベースのシグナリングテーブルにシグナルを送信することで、すでに実行中のスナップショットを停止できます。
スナップショット停止信号をシグナリングテーブルに送信するには、SQL INSERT クエリーで送信します。stop-snapshot シグナルは、スナップショット操作の type を incremental として指定し、オプションで、現在実行中のスナップショットから省略するテーブルを指定します。Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
関連情報
また、JSON メッセージを Kafka シグナリングトピック に送信して、増分スナップショットを停止することもできます。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットを停止する
SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
INSERT INTO db1.myschema.debezium_signal (id, type, data) values ('ad-hoc-1', 'stop-snapshot', '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type":"incremental"}');INSERT INTO db1.myschema.debezium_signal (id, type, data)1 values ('ad-hoc-1',2 'stop-snapshot',3 '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"],4 "type":"incremental"}');5 Copy to Clipboard Copied! Toggle word wrap Toggle overflow signal コマンドの
id、type、およびdataパラメーターの値は、シグナリングテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.111 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明 項目 値 説明 1
database.schema.debezium_signalソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。3
stop-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
配列には、database.schema.tableの形式で完全修飾名でテーブルに一致する正規表現がリストされます。dataフィールドからこのコンポーネントを省略すると、シグナルによって進行中の増分スナップショット全体が停止されます。5
incremental停止するスナップショット操作のタイプを指定する信号の
dataフィールドの必須コンポーネント。
現在、有効な唯一のオプションはincrementalです。typeの値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
2.5.1.3.5. Kafka シグナリングチャネルを使用して増分スナップショットを停止する リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka シグナリングトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
テーブルの完全修飾名に一致する、コンマで区切られた正規表現のオプションの配列、スナップショットから削除するテーブル名に一致するテーブル名または正規表現の配列。 |
次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`
2.5.1.4. ブロッキングスナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。ブロッキングスナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。
次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。
- 新しいテーブルを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
- 大きなテーブルを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。
ブロッキングスナップショットのプロセス
ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。
スナップショットの設定
シグナルの data コンポーネントでは、次のプロパティーを設定できます。
- data-collections: スナップショットする必要のあるテーブルを指定します。
-
data-collections: スナップショットに含めるテーブルを指定します。
このプロパティーは、完全修飾テーブル名に一致する正規表現のコンマ区切りリストを受け入れます。プロパティーの動作は、ブロッキングスナップショットでキャプチャーするテーブルを指定するtable.include.listプロパティーの動作と似ています。 additional-conditions: テーブルごとに異なるフィルターを指定できます。
-
data-collectionプロパティーは、フィルターが適用されるテーブルの完全修飾名であり、データベースに応じて大文字と小文字を区別するか、区別しないかを指定できます。 -
filterプロパティーは、snapshot.select.statement.overridesで使用される値と同じものが設定されます。これは、大文字小文字を区別して一致させる必要があるテーブルの完全修飾名です。
-
以下に例を示します。
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
重複の可能性
スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。
2.5.1.5. Debezium Oracle 変更イベントレコードを受信する Kafka トピックのデフォルト名 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Oracle コネクターは、テーブルで発生するすべての INSERT、UPDATE、DELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。
topicPrefix.schemaName.tableName
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- topicPrefix
-
topic.prefixコネクター設定プロパティーで指定されたトピック接頭辞。 - schemaName
- 操作が発生したスキーマの名前。
- tableName
- 操作が発生したテーブルの名前。
たとえば、fulfillment がサーバー名、inventory がスキーマ名で、データベースに orders、customers、products という名前のテーブルが含まれる場合には、Debezium Oracle コネクターは、データベースのテーブルごとに 1 つ、以下の Kafka トピックにイベントを出力します。
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products
fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products
コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピック と トランザクションメタデータトピック) にラベルを付けます。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。
2.5.1.6. Debezium Oracle コネクターによるデータベーススキーマの変更の処理方法 リンクのコピーリンクがクリップボードにコピーされました!
データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。
スキーマ変更後に発生するイベントを正しく処理するために、Oracle には、データに影響を与える行レベルの変更だけでなく、データベースに適用される DDL ステートメントも REDO ログに含めます。コネクターは、redo ログ内でこれらの DDL ステートメントを検出すると、そのステートメントを解析し、各テーブルのスキーマのインメモリー表現を更新します。コネクターはこのスキーマ表現を使用して、挿入、更新、または削除の操作時にテーブルの構造を特定し、適切な変更イベントを生成します。別のデータベーススキーマ履歴 Kafka トピックでは、コネクターは各 DDL ステートメントがある redo ログの場所とともにすべての DDL ステートメントを記録します。
クラッシュするか、正常に停止した後に、コネクターを再起動すると、特定の位置 (特定の時点) から redo ログの読み取りを開始します。コネクターは、データベーススキーマ履歴の Kafka トピックを読み取り、コネクターが起動する redo ログの時点まですべての DDL ステートメントを解析することで、この時点で存在したテーブル構造を再ビルドします。
このデータベーススキーマ履歴トピックは、内部コネクター専用となっています。オプションで、コネクターは コンシューマーアプリケーション向けの別のトピックにスキーマ変更イベントを送信する こともできます。
関連情報
- Debezium イベントレコードを受信する トピックのデフォルト名。
2.5.1.7. Debezium Oracle コネクターによるデータベーススキーマの変更の公開方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターは、データベース内のテーブルに適用される構造的な変更を記述するスキーマ変更イベントを生成するように設定できます。コネクターは、スキーマ変更イベントを <serverName> という名前の Kafka トピックに書き込みます。ここで、serverName は topic.prefix 設定プロパティーに指定した名前空間を指します。
Debezium は、新しいテーブルからデータをストリーミングするとき、またはテーブルの構造が変更されるたびに、新しいメッセージをスキーマ変更トピックに送信します。
コネクターがスキーマ変更トピックに送信するメッセージには、ペイロードと、任意で変更イベントメッセージのスキーマが含まれます。
スキーマ変更イベントのスキーマには、次の要素があります。
name- スキーマ変更イベントメッセージの名前。
type- 変更イベントメッセージのタイプ。
version- スキーマのバージョン。バージョンは整数で、スキーマが変更されるたびに増加します。
fields- 変更イベントメッセージに含まれるフィールド。
例: Oracle コネクターのスキーマ変更トピックのスキーマ
次の例は、JSON 形式の一般的なスキーマを示しています。
スキーマ変更イベントメッセージのペイロードには、以下の要素が含まれます。
ddl-
スキーマの変更につながる SQL
CREATE、ALTER、またはDROPステートメントを提供します。 databaseName-
ステートメントが適用されるデータベースの名前。
databaseNameの値は、メッセージキーとして機能します。 tableChanges-
スキーマの変更後のテーブルスキーマ全体の構造化表現。
tableChangesフィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
デフォルトでは、コネクターは ALL_TABLES データベースビューを使用して、スキーマ履歴トピックに格納するテーブル名を識別します。そのビュー内で、コネクターは、データベースへの接続に使用するユーザーアカウントが使用できるテーブルからのみデータにアクセスできます。
スキーマ履歴トピックが異なるテーブルのサブセットを格納するように設定を変更できます。以下の方法のいずれかを使用して、トピックが保存するテーブルのセットを変更します。
-
Debezium がデータベースへのアクセスに使用するアカウントのパーミッションを変更し、別のテーブルセットが
ALL_TABLESビューに表示されるようにします。 -
コネクタープロパティー
schema.history.internal.store.only.captured.tables.ddlをtrueに設定します。
コネクターがテーブルをキャプチャーするように設定されている場合、テーブルのスキーマ変更の履歴は、スキーマ変更トピックだけでなく、内部データベーススキーマの履歴トピックにも格納されます。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。
データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。
トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。
-
データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を
1に指定します。 -
Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka
num.partitions設定オプションの値を1に設定します。
例: Oracle コネクタースキーマ変更トピックに発行されたメッセージ
以下の例は、JSON 形式の一般的なスキーマ変更メッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
| コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。 |
| 2 |
| 変更が含まれるデータベースとスキーマを識別します。 |
| 3 |
| このフィールドには、スキーマの変更を行う DDL が含まれます。 |
| 4 |
| DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。 |
| 5 |
|
変更の種類を説明します。
|
| 6 |
|
作成、変更、または破棄されたテーブルの完全な識別子。テーブルの名前が変更されると、この識別子は |
| 7 |
| 適用された変更後のテーブルメタデータを表します。 |
| 8 |
| テーブルのプライマリーキーを設定する列のリスト。 |
| 9 |
| 変更されたテーブルの各列のメタデータ。 |
| 10 |
| 各テーブル変更のカスタム属性メタデータ。 |
コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload フィールドに databaseName キーが含まれます。
2.5.1.8. トランザクション境界を表す Debezium Oracle コネクターによって生成されたイベント リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、トランザクションメタデータ境界を表し、データ変更イベントメッセージを強化するイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
データベーストランザクションは、キーワード BEGIN および END で囲まれたステートメントブロックによって表されます。Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。
status-
BEGINまたはEND id- 一意のトランザクション識別子の文字列表現。
ts_ms-
データソースでのトランザクション境界イベント (
BEGINまたはENDイベント) の時間。データソースから Debezium にイベント時間を渡されない場合、フィールドは代わりに Debezium がイベントを処理する時間を表します。 event_count(ENDイベント用)- トランザクションによって出力されるイベントの合計数。
data_collections(ENDイベント用)-
data_collectionとevent_count要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
以下の例は、典型的なトランザクション境界メッセージを示しています。
例: Oracle コネクタートランザクション境界イベント
topic.transaction オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>.transaction トピックに出力します。
2.5.1.8.1. Debezium Oracle コネクターがトランザクションメタデータで変更イベントメッセージを強化する方法 リンクのコピーリンクがクリップボードにコピーされました!
トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドで強化されます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id- 一意のトランザクション識別子の文字列表現。
total_order- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下の例は、典型的なトランザクションのイベントメッセージを示しています。
LogMiner マイニングストラテジー
Oracle redo ログのエントリーには、DML の変更用に送信する元の SQL ステートメントは保存されません。代わりに、redo エントリーには、一連の変更ベクトルと、これらのベクトルに関連するテーブルスペース、テーブル、および列を表すオブジェクト識別子のセットが保持されます。つまり、redo ログエントリーには、DML 変更の影響を受けるスキーマ、テーブル、または列の名前は含まれません。
Debezium Oracle コネクターは、log.mining.strategy 設定プロパティーを使用して、Oracle LogMiner が変更ベクトル内のオブジェクト識別子の検索を処理する方法を制御します。状況によっては、スキーマの変更に関して、あるログマイニングストラテジーが他のログマイニングストラテジーよりも信頼性が高い場合があります。ただし、ログマイニングストラテジーを選択する前に、それがパフォーマンスとオーバーヘッドに及ぼす影響を考慮することが重要です。
REDO ログへのデータディクショナリーの書き込み
redo_log_catalog マイニングストラテジーは、各 redo ログスイッチの直後に、データディクショナリーのコピーを redo ログにフラッシュするようにデータベースに指示します。これは、データの変更と疎結合されたスキーマの変更を追跡する場合に最も信頼性の高いストラテジーです。これは、Oracle LogMiner には、一連の変更ベクトルでデータディクショナリーの開始状態と終了状態の間を補間する方法があるためです。
ただし、redo_log_catalog モードは、機能するためにいくつかの重要な手順を必要とするため、最もコストがかかります。まず、このモードでは、ログスイッチのたびにデータディクショナリーを REDO ログにフラッシュする必要があります。スイッチするたびにログをフラッシュすると、アーカイブログ内の貴重なスペースがすぐに消費され、アーカイブログの量が多すぎてデータベース管理者が想定した数を超える可能性があります。このモードを使用する場合は、データベース管理者と調整して、データベースが適切に設定されていることを確認してください。
redo_log_catalog モードを使用するようにコネクターを設定する場合は、複数の Debezium Oracle コネクターを使用して、同じ論理データベースからの変更をキャプチャーしないでください。
オンラインカタログの直接使用
デフォルトのストラテジーモードである online_catalog は、redo_log_catalog モードとは異なる動作をします。ストラテジーが online_catalog に設定されている場合、データベースはデータディクショナリーを REDO ログにフラッシュしません。代わりに、Oracle LogMiner は常に最新のデータディクショナリーの状態を使用して比較を実行します。このストラテジーでは、常に現在のディクショナリーを使用し、REDO ログへのフラッシュを排除することで、オーバーヘッドが少なくなり、より効率的に動作します。ただし、これらの利点は、疎結合されたスキーマの変更とデータの変更を解析できないため、相殺されます。その結果、このストラテジーではイベントが失敗する場合があります。
LogMiner がスキーマ変更後に SQL の信頼性を再構築できなかった場合は、REDO ログで原因を確認してください。OBJ# 123456 (番号はテーブルのオブジェクト識別子) などの名前のテーブルへの参照、または COL1 や COL2 などの名前の列を探します。online_catalog ストラテジーを使用するようにコネクターを設定する場合は、テーブルスキーマとそのインデックスが静的であり、変更されないことを確認するための手順を実行します。Debezium コネクターが online_catalog モードを使用するように設定されており、スキーマ変更を適用する必要がある場合は、次の手順を実行します。
- コネクターが既存のデータ変更 (DML) をすべてキャプチャーするまで待機します。
- スキーマ (DDL) の変更を実行し、コネクターが変更をキャプチャーするまで待機します。
- テーブルのデータ変更 (DML) を再開します。
この手順を実行することで、Oracle LogMiner がすべてのデータ変更に対して SQL を安全に再構築できるようになります。
ハイブリッドアプローチ
このストラテジーを有効にするには、log.mining.strategy 設定プロパティーの値を hybrid に設定します。このストラテジーの目的は、いずれのストラテジーの欠点も招くことなく、redo_log_catalog ストラテジーの信頼性と online_catalog ストラテジーのパフォーマンスおよび低いオーバーヘッドを提供することです。
hybrid ストラテジーは、主に online_catalog モードで動作し、つまり Debezium Oracle コネクターは最初にイベント再構築を Oracle LogMiner に委譲します。Oracle LogMiner が SQL を正常に再構築すると、Debezium は online_catalog ストラテジーを使用するように設定されているかのようにイベントを通常どおりに処理します。Oracle LogMiner が SQL を再構築できなかったことをコネクターが検出すると、コネクターはそのテーブルオブジェクトのスキーマ履歴を使用して、SQL を直接再構築しようとします。コネクターは、Oracle LogMiner とコネクターの両方が SQL を再構築できない場合にのみ失敗を報告します。
lob.enabled プロパティーが true に設定されている場合は、hybrid マイニングストラテジーを使用できません。CLOB、BLOB、または XML データのストリーミングが必要な場合は、online_catalog または redo_log_catalog ストラテジーのみを使用できます。
クエリーモード
Debezium Oracle コネクターは、デフォルトで Oracle LogMiner と統合されます。このインテグレーションには、トランザクションログに記録された変更を変更イベントとして取り込むための複雑な JDBC SQL クエリーの生成など、特殊な一連の手順が必要です。JDBC SQL クエリーで使用される V$LOGMNR_CONTENTS ビューには、クエリーのパフォーマンスを向上させるための索引がありません。そのため、クエリーの実行を改善する方法として、SQL クエリーの生成方法を制御するさまざまなクエリーモードを使用できます。
log.mining.query.filter.mode コネクタープロパティーを次のいずれかで設定して、JDBC SQL クエリーの生成方法を変更できます。
none-
(デフォルト) このモードでは、データベースレベルでの挿入、更新、削除などのさまざまな操作タイプに基づいてフィルタリングのみを行う JDBC クエリーが作成されます。スキーマ、テーブル、またはユーザー名の包含/除外リストに基づいてデータをフィルター処理する場合、これはコネクター内の処理ループ中に行われます。
このモードは、多くの場合、変更があまり多くないデータベースから少数のテーブルをキャプチャーする場合に役立ちます。生成されるクエリーは非常に単純で、データベースのオーバーヘッドを低く抑えてできるだけ早く読み取ることに主に重点を置いています。 in-
このモードでは、データベースレベルの操作タイプだけでなく、スキーマ、テーブル、およびユーザー名の包含/除外リストもフィルタリングする JDBC クエリーが作成されます。クエリーの述語は、包含/除外リスト設定プロパティーで指定された値に基づいて SQL in 句を使用して生成されます。
このモードは通常、変更が過度に存在するデータベースから多数のテーブルをキャプチャーする場合に役立ちます。生成されるクエリーはnoneモードよりもはるかに複雑で、ネットワークオーバーヘッドを削減し、データベースレベルで可能な限り多くのフィルタリングを実行することに重点を置いています。
最後に、スキーマおよびテーブルの包含/除外設定プロパティーの一部として正規表現を 指定しないでください。正規表現を使用すると、コネクターがこれらの設定プロパティーに基づく変更と一致しなくなり、変更が失われる原因になります。 regex-
このモードでは、データベースレベルの操作タイプだけでなく、スキーマ、テーブル、およびユーザー名の包含/除外リストもフィルタリングする JDBC クエリーが作成されます。ただし、
inモードとは異なり、このモードでは、値が含まれるか除外されるかに応じて結合または論理和を使用する OracleREGEXP_LIKE演算子を使用して SQL クエリーを生成します。
このモードは、少数の正規表現を使用して識別できる可変数のテーブルをキャプチャーする場合に便利です。生成されるクエリーは他のモードよりもはるかに複雑で、ネットワークオーバーヘッドを削減し、データベースレベルで可能な限り多くのフィルタリングを実行することに重点を置いています。
2.5.1.9. Debezium Oracle コネクターのイベントバッファリング使用方法 リンクのコピーリンクがクリップボードにコピーされました!
Oracle は、後でロールバックによって破棄された変更を含め、発生した順序で redo ログにすべての変更を書き込みます。その結果、別のトランザクションからの同時変更はインターットアンドされます。コネクターが最初に変更ストリームを読み取ると、どの変更がコミットまたはロールバックされるかをすぐに判断できないため、変更イベントは内部バッファーに一時的に保存されます。変更がコミットされると、コネクターは変更イベントをバッファーから Kafka に書き込みます。コネクターはロールバックによって破棄される変更イベントを破棄します。
プロパティー log.mining.buffer.type を設定することにより、コネクターが使用するバッファリングメカニズムを設定できます。
ヒープ
デフォルトのバッファータイプは memory を使用して設定されます。デフォルトの memory 設定では、コネクターは JVM プロセスのヒープメモリーを使用してバッファーイベントレコードを割り当て、管理します。memory バッファー設定を使用する場合は、Java プロセスに割り当てるメモリー量が、お使いの環境で長時間実行されるトランザクションや大規模トランザクションに対応することができることを確認してください。
2.5.1.10. Debezium Oracle コネクターが SCN 値のギャップを検出する方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターが LogMiner を使用するよう設定されると、システム変更番号 (SCN) に基づく開始範囲と終了範囲を使用して、Oracle から変更イベントを収集します。コネクターはこの範囲を自動的に管理し、コネクターがほぼリアルタイムで変更を流せるか、データベース内の大規模またはバルクトランザクションの量によって変更のバックログを処理しなければならないかに応じて、範囲を増減させます。
特定の状況下で、Oracle データベースは SCN 値を一定の割合で増加させるのではなく、異常に高い割合で SCN を前進させます。このような SCN 値のジャンプは、特定のインテグレーションがデータベースと対話する方法やホットバックアップなどのイベントの結果により発生する可能性があります。
Debezium Oracle コネクターは、以下の設定プロパティーに依存して SCN ギャップを検出し、マイニング範囲を調整します。
log.mining.scn.gap.detection.gap.size.min- ギャップの最小サイズを指定します。
log.mining.scn.gap.detection.time.interval.max.ms- 最大間隔を指定します。
コネクターは最初に、現在のマイニング範囲で現在の SCN と最大 SCN との間の変更数の違いを比較します。現在の SCN 値と最高 SCN 値の差が最小ギャップサイズより大きい場合、コネクターは SCN ギャップを潜在的に検出していることになる。ギャップが存在するかどうかを確認するために、コネクターは次に前のマイニング範囲の最後に現在の SCN および SCN のタイムスタンプを比較します。タイムスタンプの違いが最大間隔未満の場合、SCN ギャップの存在が確認されます。
SCN ギャップが発生すると、Debezium コネクターは現在のマイマイセッションの範囲のエンドポイントとして現在の SCN を自動的に使用します。これにより、SCN 値が予期せず増加するため、コネクターは変更を返す間で小規模な範囲を減らさずにリアルタイムイベントを迅速にキャッチできます。コネクターが SCN ギャップに応答して前述の手順を実行すると、log.mining.batch.size.max プロパティーで指定された値を無視します。コネクターがマイニングセッションを終了し、リアルタイムのイベントに追いついた後、最大ログマイニングバッチサイズの実施を再開します。
SCN ギャップ検出は、コネクターが動作していて、ほぼリアルタイムでイベントを処理しているときに、大きな SCN 増分が発生した場合にのみ有効です。
2.5.1.11. Debezium は、変更頻度の低いデータベースでオフセットをどのように管理するか ? リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターは、コネクターオフセットでシステム変更番号を追跡するので、コネクターを再起動したときに、中断したところから開始することができます。これらのオフセットは、発行された各変更イベントの一部です。ただし、データベース変更の頻度が低い場合 (数時間または数日ごと)、オフセットが古くなり、トランザクションログでシステム変更番号が利用できなくなると、コネクターの再起動が正常に行われなくなる可能性があります。
Oracle への接続に非 CDB モードを使用するコネクターの場合、オフセットの同期を維持するために、コネクターが一定間隔でハートビートイベントを発信するように強制するために heartbeat.interval.ms を有効にすると、コネクターが定期的にハートビートイベントを発行するようになり、オフセットの同期が維持されます。
Oracle との接続に CDB モードを使用するコネクターの場合、Synchronization のメンテナンスはより複雑になります。heartbeat.interval.ms を設定する必要があるだけでなく heartbeat.interval.ms を設定する必要があります。CDB モードでは、コネクターは PDB 内の変更のみを追跡するため、両方のプロパティーを指定する必要があります。プラガブルデータベース内から変更イベントをトリガーするための補足的なメカニズムが必要である。一定の間隔で、ハートビートアクションクエリーがコネクターに新しいテーブル行を挿入したり、プラガブルデータベースの既存の行を更新したりします。Debezium は、テーブルの変更を検知し、変更イベントを発行することで、変更処理の頻度が低いプラガブルデータベースでもオフセットの同期を確保します。
コネクターのユーザーアカウント が所有していないテーブルで heartbeat.action.query を使用するには、コネクターのユーザーにそれらのテーブルで必要な INSERT または UPDATE クエリーを実行する権限を付与する必要があります。
2.5.2. Debezium Oracle コネクターのデータ変更イベントの説明 リンクのコピーリンクがクリップボードにコピーされました!
Oracle コネクターが出力する全データ変更イベントにはキーと値があります。キーと値の構造は、変更イベントの発生元となるテーブルによって異なります。Debezium のトピック名を構築する方法は、トピック名 を参照してください。
Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名 が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベット文字またはアンダースコア ([a-z,A-Z,_]) で始まり、論理サーバー名の残りの文字と、スキーマおよびテーブル名の残りの文字は英数字またはアンダースコア ([a-z,A-Z,0-9,\_]) で始まる必要があります。コネクターは無効な文字をアンダースコアに自動的に置き換えます。
複数の論理サーバー名、スキーマ名、またはテーブル名の中で区別ができる文字のみが無効な場合に、アンダースコアに置き換えられると、命名で予期しない競合が発生する可能性があります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、トピックコンシューマーによる処理が困難になることがあります。変更可能なイベント構造の処理を容易にするため、Kafka Connect の各イベントは自己完結型となっています。すべてのメッセージキーと値には、スキーマ と ペイロード の 2 つの部分で設定されます。スキーマはペイロードの構造を記述しますが、ペイロードには実際のデータが含まれます。
SYS、SYSTEM ユーザーアカウントが加える変更は、コネクターではキャプチャーされません。
データ変更イベントの詳細は、以下を参照してください。
2.5.2.1. Debezium Oracle コネクター変更イベントのキー リンクのコピーリンクがクリップボードにコピーされました!
変更されたテーブルごとに変更イベントキーは、イベントの作成時にテーブルのプライマリーキー (または一意のキー制約) の各コラムにフィールドが存在するように設定されます。
たとえば、inventory データベーススキーマに定義されている customers テーブルには、以下の変更イベントキーが含まれる場合があります。
<topic.prefix>.transaction 設定プロパティーの値が server1 に設定されている場合は、データベースの customers テーブルで発生するすべての変更イベントの JSON 表現には以下のキー構造が使用されます。
キーの スキーマ 部分には、キーの部分の内容を記述する Kafka Connect スキーマが含まれます。上記の例では、payload 値はオプションではなく、構造は server1.DEBEZIUM.CUSTOMERS.Key という名前のスキーマで定義され、タイプ int32 の id という名前の必須フィールドが 1 つあります。キーの payload フィールドの値から、id フィールドが 1 つでその値が 1004 の構造 (JSON では単なるオブジェクト) であることが分かります。
つまり、このキーは、id プライマリーキーの列にある値が 1004 の、inventory.customers テーブルの行 (名前が server1 のコネクターからの出力) を記述していると解釈できます。
2.5.2.2. Debezium Oracle 変更イベントの値 リンクのコピーリンクがクリップボードにコピーされました!
変更イベントメッセージの値の構造は、メッセージの変更イベントの メッセージキーの構造 をミラーリングし、スキーマ セクションと ペイロード セクションの両方が含まれます。
変更イベント値のペイロード
変更イベント値のペイロードセクションの エンベロープ 構造には、以下のフィールドが含まれます。
op-
操作のタイプを記述する文字列値が含まれる必須フィールド。Oracle コネクターの変更イベント値のペイロードの
opフィールドには、c(作成または挿入)、u(更新)、d(delete)、またはr(スナップショットを示す read) のいずれかの値が含まれます。 before-
任意のフィールド。存在する場合は、イベント発生 前 の行の状態を記述します。この構造は、
server1.INVENTORY.CUSTOMERS.ValueKafka Connect スキーマによって記述され、server1コネクターはinventory.customersテーブルのすべての行に使用します。
after-
オプションのフィールドで、存在する場合は、変更が発生した 後 の行の状態が含まれます。構造は、
beforeフィールドに使用されるserver1.INVENTORY.CUSTOMERS.ValueKafka Connect スキーマによって記述されます。 sourceイベントのソースメタデータを記述する構造体を含む必須フィールド。Oracle コネクターの場合、構造には以下のフィールドが含まれます。
- Debezium のバージョン。
- コネクター名。
- イベントが進行中のスナップショットの一部であるかどうか。
- トランザクション ID (スナップショットには含まれない)。
- 変更がコミットされたときにデータベースが割り当てるシステム変更番号 (SCN) に関連する以下の値。
|
| データベースがトランザクションを追跡する際に使用する一意の識別子。 |
|
| トランザクションの開始時の SCN。 |
|
| トランザクションの開始時刻。 |
|
| トランザクションがコミットされた時刻。
|
|
| タイムスタンプをミリ秒単位で提供します。 |
|
| タイムスタンプをマイクロ秒単位で提供します。 |
|
| タイムスタンプをナノ秒単位で提供します。
|
|
| 任意のフィールド。存在する場合は、コネクターがイベントを処理した時間 (Kafka Connect タスクを実行する JVM のシステムクロックがベース) が含まれます。 |
変更イベント値のスキーマ
イベントメッセージの値の スキーマ 部分には、ペイロードのエンベロープ構造と、その中のネストされたフィールドを記述するスキーマが含まれます。
変更イベント値の詳細は、以下を参照してください。
create イベント
次の例は、イベントキーの変更 例で説明した customers テーブルの create イベントの値を示しています。
上記の例では、イベントが以下のスキーマを定義する方法に注目してください。
-
エンベロープ (
server1.DEBEZIUM.CUSTOMERS.Envelope)。 -
ソース構造 (io.debezium.connector.oracle.Source、Oracle コネクターに固有で、すべてのイベントで再利用)。 -
beforeフィールドおよびafterフィールドのテーブル固有のスキーマ。
before フィールドおよび after フィールドのスキーマ名は <logicalName>.<schemaName>.<tableName>.Value の形式であるため、他のすべてのテーブルのスキーマからは完全に独立しています。その結果、Avro コンバーター を使用した場合、各論理ソースのテーブルの Avro スキーマは、それぞれ独自に進化し、独自の履歴を持つことになります。
このイベントの 値 の payload 部分には、イベントに関する情報が含まれます。行が作成された (op=c) ことを術士、および after フィールドの値に、行の ID、FIRST_NAME、LAST_NAME、および EMAIL 列に挿入された値が含まれていることを表しています。
デフォルトでは、イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。サイズが大きいのは、JSON 表現がメッセージのスキーマ部分とペイロード部分の両方を含んでいるためです。Avro コンバーター を使用すると、コネクターが Kafka トピックに記述するメッセージのサイズを小さくすることができます。
更新 イベント
次の例は、前の create イベントと同じテーブルからコネクターが捕捉した update change イベントを示しています。
ペイロードは create (挿入) イベントのペイロードと同じですが、以下の値は異なります。
-
opフィールドの値がuであり、この行が更新により変更されたことを示す。 -
beforeフィールドは、updateデータベースのコミット前に存在する値が含まれる行の以前の状態を表示します。 -
afterフィールドは行の更新状態を示しており、EMAILの値は現在anne@example.comに設定されています。 -
sourceフィールドの構造は以前と同じフィールドを含みますが、コネクターが REDO ログ内の異なる位置からイベントをキャプチャしたため、値は異なります。 -
ts_msフィールドは、Debezium がいつイベントを処理したかを示すタイムスタンプを示す。
payload セクションでは、他に有用な情報を複数示しています。たとえば、before と after の構造を比較して、コミットの結果として行がどのように変更されたかを判断できます。ソース 構造で、この変更の記録に関する情報がわかるので、追跡が可能です。また、このトピックや他のトピックの他のイベントと関連して、このイベントが発生した場合に、見識を得ることができます。これは、別のイベントと同じコミットの前、後、または一部として発生していましたか ?
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。その結果、Debezium はこのような更新後に 3 つのイベントを出力します。
-
DELETEイベント。 - 行のキーが古い tombstone イベント
-
行に新しいキーを提供する
INSERTイベント。
delete イベント
次の例は、先の create と update のイベント例で示したテーブルの delete イベントを示しています。delete イベントの schema 部分は、これらのイベントの schema 部分と同一です。
イベントの payload 部分は、create または update イベントのペイロードと比較して、いくつかの違いを示しています。
-
opフィールドの値はdであり、行が削除されたことを意味します。 -
beforeフィールドは、データベースのコミットで削除された行の旧状態を示します。 -
afterフィールドの値がnull場合、その行はもはや存在しないことを意味します。 -
sourceフィールドの構造には、create または update イベントに存在する多くのキーが含まれるが、ts_ms、scn、txIdフィールドの値は異なっている。 -
ts_msは、Debezium がこのイベントを処理したタイミングを示すタイムスタンプを示します。
delete イベントは、コンシューマーがこの行の削除を処理するために必要な情報を提供する。
Oracle コネクターのイベントは、Kafka ログコンパクション と連携するように設計されており、すべてのキーで最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
行が削除されると、Kafka は同じキーを使用する以前のメッセージをすべて削除できるため、前の例で示された delete イベント値は、ログ圧縮でまだ機能します。同じキーを共有する メッセージをすべて 削除するように Kafka に指示するには、メッセージの値を null に設定する必要があります。これを可能にするにはデフォルトで、Debezium Oracle コネクターは、値が null 以外で同じキーを持つ特別な 廃棄 イベントが含まれる 削除 イベントに従います。コネクタープロパティー tombstones.on.delete を設定すると、デフォルトの動作を変更できます。
truncate イベント
truncate 変更イベントは、テーブルが切り捨てられたことを通知します。この場合のメッセージキーは null で、メッセージの値は以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
イベントのソースメタデータを記述する必須のフィールド。truncate イベント値の
|
| 2 |
|
操作の型を記述する必須の文字列。 |
| 3 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。
|
1 つの TRUNCATE 操作が複数テーブルに影響する場合、コネクターは切り捨てられたテーブルごとに 1 つの truncate 変更イベントレコードを出力します。
truncate イベントは、テーブル全体に対して加えられ、メッセージキーを持たない変更を表します。その結果、複数のパーティションがあるトピックでは、変更イベント (create、update など)、またはテーブルに関連する truncate イベントについて、順序は保証されません。たとえば、コンシューマーが複数のパーティションからテーブルのイベントを読み取る場合、別のパーティションからテーブル内のすべてのデータを削除する truncate イベントを受信した後、あるパーティションからテーブルの update イベントを受け取る可能性があります。順序は、単一のパーティションを使用するトピックでのみ保証されます。
コネクターに truncate イベントをキャプチャーさせたくない場合は、skipped.operations オプションを使用して除外します。
2.5.3. Debezium Oracle コネクターによるデータ型のマッピング方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターは、テーブル行の値の変化を検出すると、その変化を表す change イベントを発行します。各変更イベントレコードは、元のテーブルと同じように構造化されており、イベントレコードには列値ごとにフィールドが含まれます。テーブル列のデータ型は、以下のセクションの表に示すように、コネクターが変更イベントフィールドで列の値をどのように表現するかを決定します。
テーブルの各列に対して、Debezium はソースデータ型を対応するイベントフィールドの リテラル型、場合によっては セマンティック型 にマッピングします。
- リテラル型
-
以下のカフカコネクトスキーマタイプのいずれかを使用して、値が文字通りどのように表現されるかを記述します。
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP、およびSTRUCT。 - セマンティック型
- フィールドの Kafka Connect スキーマの名前を使用して、Kafka Connect スキーマがフィールドの 意味 をキャプチャーする方法を記述します。
デフォルトのデータ型変換が要件に合わない場合は、コネクター用の カスタムコンバーターの作成 が可能です。
一部の Oracle ラージオブジェクト (CLOB、NCLOB、BLOB) および数値データ型については、デフォルトの設定プロパティー設定を変更することにより、コネクターがタイプマッピングを実行する方法を操作することができます。Debezium プロパティーがこれらのデータ型のマッピングをどのように制御するかの詳細は、Binary and Character LOB types および Numeric types を参照してください。
Debezium コネクターによる Oracle データ型のマッピング方法に関する詳細は、以下を参照してください。
文字タイプ
以下の表は、コネクターによる基本の文字タイプへのマッピング方法を説明しています。
| Oracle データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
バイナリーおよび文字の LOB 型
以下の表は、コネクターによるバイナリーおよび文字 LOB (Large Object) 型へのマッピング方法を説明しています。
| Oracle データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
| 該当なし | このデータ型はサポートされていません。 |
|
|
|
コネクター設定の
|
|
|
| 該当なし |
|
| 該当なし | このデータ型はサポートされていません。 |
|
| 該当なし | このデータ型はサポートされていません。 |
|
|
| 該当なし |
|
| 該当なし |
コネクター設定の
|
Oracle は、CLOB、NCLOB、および BLOB データ型の列値を、SQL ステートメントで明示的に設定または変更された場合にのみ供給します。その結果、変更イベントには、変更されていない CLOB、NCLOB、または BLOB 列の値が含まれることはありません。代わりに、コネクタープロパティー unavailable.value.placeholder で定義されているプレースホルダーが含まれます。
CLOB、NCLOB、または BLOB 列の値が更新されると、対応する更新変更イベントの after 要素に新しい値が追加されます。before 要素には使用できない値プレースホルダーが含まれます。
数字型
以下の表は、Debezium Oracle コネクターによる数値型のマッピング方法を説明しています。
コネクターの decimal.handling.mode 設定プロパティーの値を変更することで、コネクターが Oracle DECIMAL、NUMBER、NUMERIC、および REAL データ型をマッピングする方法を変更できます。このプロパティーをデフォルト値の precise に設定すると、コネクターは、表に示すように、これらの Oracle データ型を Kafka Connect org.apache.kafka.connect.data.Decimal 論理型にマッピングします。プロパティーの値を double または string に設定すると、コネクターは一部の Oracle データ型に別のマッピングを使用します。詳細は、以下の表の セマンティックタイプおよび注意 事項の欄を参照してください。
| Oracle データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
| 該当なし |
|
|
| 該当なし |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
スケールが 0 の
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
前述したように、Oracle では NUMBER 型において負のスケールが可能です。これが原因で、数値が Decimal として表示される場合に、Avro 形式への変換中に問題が発生する可能性があります。10 進数 タイプにはスケール情報が含まれますが、Avro 仕様 ではスケールの正の値しか使用できません。使用されるスキーマレジストリーによっては、Avro シリアル化に失敗する場合があります。この問題を回避するには、NumberToZeroScaleConverter を使用できます。これは、スケールが負で、十分に大きな数値 (P - S >= 19) をスケール 0 の Decimal 型に変換します。以下のように設定できます。
converters=zero_scale zero_scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter zero_scale.decimal.mode=precise
converters=zero_scale
zero_scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
zero_scale.decimal.mode=precise
デフォルトでは、数値は Decimal 型 (zero_scale.decimal.mode=precise) に変換されますが、サポート対象の残りの 2 つの型 (double と string) もサポートすることですべてに対応します。
ブール値型
Oracle は、BOOLEAN データ型のネイティブサポートを提供しません。ただし、論理 BOOLEAN データ型の概念をシミュレートするために、特定のセマンティクスと他のデータ型を使用することが一般的です。
ソース列をブールデータ型に変換できるように、Debezium は NumberOneToBooleanConverter カスタムコンバーター を提供しています。これは、以下のいずれかの方法で使用できます。
-
すべての
NUMBER(1)列をBOOLEANタイプにマッピングします。 正規表現のコンマ区切りリストを使用して、列のサブセットを列挙します。
このタイプの変換を使用するには、以下の例のようにselectorパラメーターを使用してconverters設定プロパティーを設定する必要があります。converters=boolean boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter boolean.selector=.*MYTABLE.FLAG,.*.IS_ARCHIVED
converters=boolean boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter boolean.selector=.*MYTABLE.FLAG,.*.IS_ARCHIVEDCopy to Clipboard Copied! Toggle word wrap Toggle overflow
時間型
Oracle INTERVAL、TIMESTAMP WITH TIME ZONE、および TIMESTAMP WITH LOCAL TIME ZONE データ型以外では、コネクターが時間型を変換する方法は time.precision.mode 設定プロパティーの値に依存します。
time.precision.mode 設定プロパティーが adaptive (デフォルト) に設定された場合、コネクターは列のデータ型を基に時間型のリテラルおよびセマンティック型を決定し、イベントが 正確 にデータベースの値を表すようにします。
| Oracle データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
time.precision.mode 設定プロパティーが connect に設定された場合、コネクターは事前定義された Kafka Connect の論理型を使用します。これは、コンシューマーが組み込みの Kafka Connect の論理型のみを認識し、可変精度の時間値を処理できない場合に便利です。Oracle がサポートする精度レベルは、Kafka Connect サポートの論理型を超過するため、time.precision.mode を connect に設定していて、データベース列の fractional second precision の値が 3 より大きい場合には a loss of precision という結果になります。
| Oracle データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ROWID タイプ
次の表は、コネクターが ROWID (行アドレス) データ型をどのようにマッピングするかを説明したものです。
| Oracle データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
| Oracle XStream を使用する場合、このデータ型はサポートされません。 |
|
| 該当なし | このデータ型はサポートされていません。 |
Debezium Oracle コネクターでの XMLTYPE の使用は、テクノロジープレビュー機能のみとなっています。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
以下の表は、コネクターによる XMLTYPE データ型へのマッピング方法を説明しています。
| Oracle データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
ユーザー定義のタイプ
オラクルでは、組み込みのデータ型では要件を満たせない場合に、カスタムデータ型を定義して柔軟性を持たせることができます。オブジェクト型、REF データ型、Varrays、Nested Tables などのユーザー定義型があります。現時点では、Debezium Oracle コネクターをこれらのユーザー定義タイプで使用することはできません。
Oracle によって提供されたタイプ
Oracle は、組み込み型や ANSI でサポートされている型では不十分な場合に、新しい型を定義するために使用できる SQL ベースのインタフェースを提供しています。Oracle は、Any 型や Spatial 型など、幅広い目的に対応するために一般的に使用されるデータ型をいくつか提供しています。現時点では、Debezium Oracle コネクターはこれらのデータ型では使用できません。
デフォルト値
データベーススキーマの列にデフォルト値が指定されている場合、Oracle コネクターはこの値を対応する Kafka レコードフィールドのスキーマに伝搬させようと試みます。ほとんどの一般的なデータ型がサポートされています。
-
文字型 (
CHAR、NCHAR、VARCHAR、VARCHAR2、NVARCHAR、NVARCHAR2) -
数値型 (
INTEGER、NUMERIC、など) -
時間型 (
DATE、TIMESTAMP、INTERVALなど)。
一時的なタイプが TO_TIMESTAMP や TO_DATE などの関数呼び出しを使用してデフォルト値を表す場合、コネクターは関数を評価するために追加のデータベース呼び出しを行うことでデフォルト値を解決します。たとえば、DATE 列が TO_DATE('2021-01-02', 'YYYY-MM-DD') というデフォルト値で定義されている場合、その列のデフォルト値はその日付の UNIX エポックからの日数、この場合は 18629 となります。
一時的な型がデフォルト値を表すために SYSDATE 定数を使用する場合、コネクターは、列が NOT NULL または NULL として定義されているかどうかに基づいてこれを解決します。列が NULL を指定可能な場合、デフォルト値は設定されません。ただし、列に NULL を指定できない場合、デフォルト値は 0 (DATE または TIMESTAMP(n) データ型の場合) または 1970-01-01T00:00:00Z (TIMESTAMP WITH TIME ZONE または TIMESTAMP WITH LOCAL TIME ZONE データ型の場合) のいずれかに解決されます。デフォルトの値のタイプは数値です。ただし、列が TIMESTAMP WITH TIME ZONE または TIMESTAMP WITH LOCAL TIME ZONE の場合は文字列として出力されます。
カスタムコンバーター
デフォルトでは、Debezium Oracle コネクターは、Oracle データ型に固有の CustomConverter 実装をいくつか提供します。これらのカスタムコンバーターは、コネクター設定に基づいて特定のデータ型に対する代替マッピングを提供します。コネクターに CustomConverter を追加するには、カスタムコンバーターのドキュメント の指示に従ってください。
Debezium Oracle コネクターは、次のカスタムコンバーターを提供します。
NUMBER(1) をブール値に変換する
Oracle データベースバージョン 23 以降では、BOOLEAN の論理データ型を使用できます。以前のバージョンでは、データベースは、False の場合は値 0、True の場合は値 1 に制約するような、NUMBER(1) データ型を使用して BOOLEAN 型をシミュレートしていました。
デフォルトでは、Debezium は NUMBER(1) データ型を使用するソース列の変更イベントを発行すると、データを INT8 リテラル型に変換します。NUMBER(1) データ型のデフォルトマッピングで要件が満たされない場合は、次の例に示すように、NumberOneToBooleanConverter を設定することで、これらの列を発行するときに論理 BOOL 型を使用するようにコネクターを設定できます。
例: NumberOneToBooleanConverter の設定
converters=number-to-boolean number-to-boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter number-to-boolean.selector=.*.MY_TABLE.DATA
converters=number-to-boolean
number-to-boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
number-to-boolean.selector=.*.MY_TABLE.DATA
前の例では、selector プロパティーはオプションです。selector プロパティーは、コンバーターを適用するテーブルまたは列を指定する正規表現を指定します。セレクター プロパティーを省略すると、Debezium がイベントを発行するときに、NUMBER(1) データ型のすべての列が論理 BOOL 型を使用するフィールドに変換されます。
NUMBER の小数点部分を切り捨て/切り上げする
Oracle は、負 (つまり NUMBER(-2)) の NUMBER ベースの列の作成をサポートしています。すべてのシステムが負の値を処理できるわけではないため、これらの値が原因でパイプラインで処理上の問題が発生する可能性があります。たとえば、Apache Avro はこれらの値をサポートしていないため、Debezium がイベントを Avro 形式に変換すると問題が発生する可能性があります。同様に、これらの値をサポートしていないダウンストリームコンシューマーでもエラーが発生する可能性があります。
設定例
converters=number-zero-scale number-zero-scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter number-zero-scale.decimal.mode=precise
converters=number-zero-scale
number-zero-scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
number-zero-scale.decimal.mode=precise
前の例では、decimal.mode プロパティーはコネクターが小数値を出力する方法を指定します。このプロパティーは任意です。decimal.mode プロパティーを省略すると、コンバーターはデフォルトで PRECISE 小数点処理モードを使用します。
RAW から文字列に変換する
Oracle では RAW などの特定のデータ型の使用は推奨されていませんが、レガシーシステムではそのような型が引き続き使用される可能性があります。デフォルトでは、Debezium は RAW 列タイプを論理 BYTES として出力します。これは、バイナリーまたはテキストベースのデータの保存を可能にするタイプです。
場合によっては、RAW 列に文字データが一連のバイトとして格納されることがあります。コンシューマーによる消費を容易にするために、RawToStringConverter を使用するように Debezium を設定できます。RawToStringConverter は、このような RAW 列を簡単にターゲットにして、値をバイトではなく文字列として出力できるようにします。以下の例は、RawToStringConverter をコネクター設定に追加する方法を示しています。
例: RawToStringConverter の設定
converters=raw-to-string raw-to-string.type=io.debezium.connector.oracle.converters.RawToStringConverter raw-to-string.selector=.*.MY_TABLE.DATA
converters=raw-to-string
raw-to-string.type=io.debezium.connector.oracle.converters.RawToStringConverter
raw-to-string.selector=.*.MY_TABLE.DATA
前の例では、selector プロパティーを使用すると、コンバーターが処理するテーブルまたは列を指定する正規表現を定義できます。selector プロパティーを省略すると、コンバーターはすべての RAW 列タイプを論理 STRING フィールドタイプにマップします。
2.5.4. Debezium と連携させるための Oracle の設定 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターと連携するように Oracle を設定するには、以下の手順が必要です。以下の手順では、コンテナーデータベースと少なくとも 1 つのプラグ可能なデータベースでマルチテナンシーの設定を使用することを前提としています。マルチテナント設定を使用しない場合は、以下の手順を調整する必要がある場合があります。
Debezium コネクターと使用するために Oracle を設定する場合の詳細は、以下を参照してください。
2.5.4.1. Oracle インストールタイプとの Debezium Oracle コネクターの互換性 リンクのコピーリンクがクリップボードにコピーされました!
Oracle データベースは、スタンドアロンインスタンスまたは Oracle Real Application Cluster (RAC) を使用してインストールできます。Debezium Oracle コネクターはどちらのタイプのインストールとも互換性があります。
2.5.4.2. 変更イベントをキャプチャーする際に Debezium Oracle コネクターが除外するスキーマ リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターがテーブルをキャプチャーすると、以下のスキーマからテーブルが自動的に除外されます。
-
appqossys -
audsys -
ctxsys -
dvsys -
dbsfwuser -
dbsnmp -
qsmadmin_internal -
lbacsys -
mdsys -
ojvmsys -
olapsys -
orddata -
ordsys -
outln -
sys -
system -
vecsys(Oracle 23+) -
wmsys -
xdb
コネクターがテーブルからの変更をキャプチャできるようにするには、そのテーブルが前述のリストにない名前のスキーマを使用している必要があります。
2.5.4.3. 変更イベントをキャプチャーする際に Debezium Oracle コネクターが除外するテーブル リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターがテーブルをキャプチャーすると、以下のルールに一致するテーブルが自動的に除外されます。
-
CMP[3|4]$[0-9]+に一致する圧縮アドバイザーテーブル。 -
パターン
SYS_IOT_OVER_%に一致するインデックスで整理された表。 -
MDRT_%、MDRS_%、またはMDXT_%パターンと一致する空間テーブル。 - ネストされたテーブル
コネクターが前述のルールのいずれかに一致する名前のテーブルをキャプチャーできるようにするには、テーブルの名前を変更する必要があります。
2.5.4.4. Debezium で使用する Oracle データベースの準備 リンクのコピーリンクがクリップボードにコピーされました!
Oracle LogMiner に必要な設定
Oracle AWS RDS では、上記のコマンドを実行することも、sysdba としてログインすることもできません。AWS には、LogMiner 設定に使用可能な代わりのコマンドが含まれます。これらのコマンドを実行する前に、Oracle AWS RDS インスタンスでバックアップが有効になっていることを確認してください。
Oracle でバックアップが有効になっていることを確認するには、最初に次のコマンドを実行します。LOG_MODE は ARCHIVELOG となるはずです。そうでない場合は、Oracle AWS RDS インスタンスの再起動が必要になる場合があります。
Oracle AWS RDS LogMiner に必要な設定
SQL> SELECT LOG_MODE FROM V$DATABASE; LOG_MODE ------------ ARCHIVELOG
SQL> SELECT LOG_MODE FROM V$DATABASE;
LOG_MODE
------------
ARCHIVELOG
LOG_MODE が ARCHIVELOG に設定されたら、コマンドを実行して LogMiner 設定を完了します。最初のコマンドはデータベースを archivelogs に設定し、2 番目のコマンドは補足ロギングを追加します。
Oracle AWS RDS LogMiner に必要な設定
exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours',24);
exec rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD');
exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours',24);
exec rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD');
Debezium がデータベース行の変更 前 の状態をキャプチャできるようにするには、キャプチャしたテーブルまたはデータベース全体の補足ロギングも有効にする必要があります。次の例は、1 つの inventory.customers テーブルのすべての列に対して補足的なログを設定する方法を示しています。
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
すべてのテーブル列の補助ロギングを有効にすると、Oracle redo ログのボリュームが増えます。ログのサイズに過剰な増加を防ぐには、前述の設定を選択的に適用します。
補完用のロギングは最小限のデータベースレベルで有効にする必要があり、以下のように設定できます。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
2.5.4.5. データディクショナリーに対応するように Oracle redo ログのサイズを変更 リンクのコピーリンクがクリップボードにコピーされました!
データベースの設定によっては、REDO ログのサイズや数が、許容できるパフォーマンスを得るために十分でない場合があります。Debezium Oracle コネクターを設定する前に、REDO ログの容量がデータベースをサポートするのに十分であることを確認してください。
データベースの REDO ログの容量は、そのデータディクショナリーを保存するのに十分でなければなりません。一般的に、データディクショナリーのサイズは、データベースのテーブルや列の数に応じて大きくなります。REDO ログの容量が十分でない場合、データベースと Debezium コネクターの両方でパフォーマンスの問題が発生する可能性があります。
データベースのログ容量を増やす必要があるかどうかは、データベース管理者に相談してください。
2.5.4.6. Debezium Oracle コネクターが使用するアーカイブログの宛先の指定 リンクのコピーリンクがクリップボードにコピーされました!
Oracle データベース管理者は、アーカイブログの宛先を最大 31 個まで設定できます。管理者は、各宛先のパラメーターを設定して、特定の用途 (物理スタンバイのログ送信や、長期のログの保持を可能にする外部ストレージなど) を指定できます。Oracle は、アーカイブログの宛先に関する詳細を V$ARCHIVE_DEST_STATUS ビューで報告します。
Debezium Oracle コネクターは、ステータスが VALID でタイプが LOCAL の宛先のみを使用します。Oracle 環境にその基準を満たす宛先が複数含まれている場合は、Oracle 管理者に相談して、Debezium が使用するアーカイブログの宛先を決定します。
手順
-
Debezium を使用するようにアーカイブログの宛先を指定するには、コネクターの設定で
archive.destination.nameプロパティーを設定します。
たとえば、データベースが 2 つのアーカイブ宛先パス/path/oneと/path/twoで設定されており、V$ARCHIVE_DEST_STATUSテーブルがこれらのパスをDEST_NAMEの列で指定された宛先名に関連付けているとします。両方の宛先が Debezium の基準を満たしている場合 (つまり、statusがVALIDで、typeがLOCALの場合)、データベースが/path/twoに書き込むアーカイブログを使用するようにコネクターを設定するには、archive.destination.nameの値を、V$ARCHIVE_DEST_STATUSテーブルで/path/twoに関連付けられているDEST_NAME列の値に設定します。たとえば、DEST_NAMEが/path/twoのLOG_ARCHIVE_DEST_3の場合は、Debezium を以下のように設定します。
{
"archive.destination.name": "LOG_ARCHIVE_DEST_3"
}
{
"archive.destination.name": "LOG_ARCHIVE_DEST_3"
}
archive.destination.name の値は、データベースがアーカイブログに使用するパスに設定しないでください。アーカイブログ保持ポリシーに準じる V$ARCHIVE_DEST_STATUS テーブルの行に対して、DEST_NAME 列にあるアーカイブログ保存先の名前にプロパティーを設定します。
Oracle 環境に基準を満たす宛先が複数含まれており、優先する宛先を指定しなかった場合、Debezium Oracle コネクターは宛先パスをランダムに選択します。各宛先には異なる保持ポリシーが設定されている場合があります。そのため、要求するログデータが削除されたパスをコネクターが選択した場合、エラーが発生する可能性があります。
2.5.4.7. Debezium Oracle コネクターの Oracle ユーザーの作成 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターが変更イベントをキャプチャーするには、特定のパーミッションを持つ Oracle LogMiner ユーザーとして実行する必要があります。以下の例は、マルチテナントデータベースモデルでコネクターの Oracle ユーザーアカウントを作成する SQL を示しています。
コネクターは、自分の Oracle ユーザーアカウントによって行われたデータベースの変更をキャプチャします。ただし、SYS や SYSTEM のユーザーアカウントで行われた変更は捕捉できません。
コネクターの LogMiner ユーザーの作成
| 項目 | ロール名 | 説明 |
|---|---|---|
| 1 | CREATE SESSION | コネクターが Oracle に接続できるようにします。 |
| 2 | SET CONTAINER | コネクターがプラグ可能なデータベース間の切り替えを可能にします。これは、Oracle インストールでコンテナーデータベースのサポート (CDB) が有効になっている場合にのみ必要です。 |
| 3 | SELECT ON V_$DATABASE |
コネクターによる |
| 4 | FLASHBACK ANY TABLE |
コネクターがデータの初期スナップショットを実行する方法であるフラッシュバッククエリーを実行できるようにします。オプションで、すべてのテーブルに対して |
| 5 | SELECT ANY TABLE |
コネクターで任意のテーブルを読み込めるようにする。オプションで、すべてのテーブルに対して |
| 6 | SELECT_CATALOG_ROLE | Oracle LogMiner セッションで必要とされるデータディクショナリーをコネクターで読み込めるようにします。 |
| 7 | EXECUTE_CATALOG_ROLE | コネクターがデータディクショナリーを Oracle redo ログに書き込むことを可能にします。これは、スキーマの変更を追跡するために必要なものです。 |
| 8 | SELECT ANY TRANSACTION |
スナップショットプロセスで、任意のトランザクションに対してフラッシュバックスナップショットクエリーを実行できるようにします。 |
| 9 | LOGMINING | ロールこのロールは、Oracle LogMiner とそのパッケージへの完全なアクセスを付与する方法として、新しいバージョンの Oracle で追加されました。このロールのない古いバージョンの Oracle では、この付与を無視できます。 |
| 10 | CREATE TABLE | コネクターがそのデフォルトのテーブルスペースにフラッシュテーブルを作成することを有効にします。フラッシュテーブルにより、LGWR 内部バッファーのディスクへのフラッシュをコネクターが明示的に制御することができます。 |
| 11 | LOCK ANY TABLE | スキーマスナップショットの実行中にコネクターがテーブルをロックできるようにします。スナップショットロックが設定により明示的に無効化されている場合、このグラントは安全に無視することができます。 |
| 12 | CREATE SEQUENCE | コネクターがそのデフォルトのテーブルスペースにシーケンスを作成することを有効にします。 |
| 13 | EXECUTE ON DBMS_LOGMNR |
|
| 14 | EXECUTE ON DBMS_LOGMNR_D |
|
| 15 から 25 | SELECT ON V_$…. | コネクターがこれらのテーブルを読み取ることを可能にします。Oracle LogMiner セッションを準備するために、コネクターは Oracle redo およびアーカイブログ、および現在のトランザクションの状態に関する情報を読み取れる必要があります。これらの付与がないと、コネクターは操作できません。 |
2.5.4.8. Oracle スタンバイデータベースでのコネクターの実行 リンクのコピーリンクがクリップボードにコピーされました!
スタンバイデータベースは、プライマリーインスタンスの同期されたコピーを提供します。プライマリーデータベースに障害が発生した場合、スタンバイデータベースが継続的な可用性と障害復旧を提供します。Oracle は、物理スタンバイデータベースと論理スタンバイデータベースの両方を使用します。
フィジカルスタンバイ
フィジカルスタンバイは、プライマリーの実稼働データベースのブロック単位の正確なコピーであり、そのシステム変更番号 (SCN) の値はプライマリーのものと同一です。フィジカルスタンバイは外部接続を受け入れないため、Debezium Oracle コネクターはフィジカルスタンバイデータベースから直接変更イベントをキャプチャーできません。コネクターは、スタンバイがプライマリーデータベースに変換された後にのみ、フィジカルスタンバイからイベントをキャプチャーできます。その後、コネクターは、プライマリーデータベースであるかのように、以前のスタンバイに接続します。
ロジカルスタンバイ
論理スタンバイにはプライマリーと同じ論理データが含まれますが、データは物理的に異なる方法で保存される場合があります。論理スタンバイの SCN オフセットは、プライマリーデータベースのオフセットとは異なります。論理スタンバイデータベースからの変更をキャプチャーするように Debezium Oracle コネクターを設定 できます。
2.5.4.8.1. Oracle フェイルオーバーデータベースからデータをキャプチャーする リンクのコピーリンクがクリップボードにコピーされました!
フェイルオーバーデータベースを設定する場合、通常は論理スタンバイデータベースではなく物理スタンバイデータベースを使用するのがベストプラクティスです。フィジカルスタンバイは、ロジカルスタンバイよりもプライマリーデータベースとの一貫性の高い状態を維持します。フィジカルスタンバイにはプライマリーデータの正確なレプリカが含まれており、スタンバイのシステム変更番号 (SCN) 値はプライマリーのものと同一です。Debezium 環境では、データベースがフィジカルスタンバイにフェイルオーバーした後、一貫性のある SCN 値が存在するため、コネクターが最後に処理された SCN 値を見つけることができます。
フィジカルスタンバイは読み取り専用モードでロックされ、同期を維持するために管理リカバリーが実行されます。データベースがスタンバイモードの場合、クライアントからの外部 JDBC 接続は受け入れられず、外部アプリケーションからアクセスすることはできません。
障害発生後、Debezium が以前のフィジカルスタンバイに接続できるようにするには、DBA がスタンバイへのフェイルオーバーを有効にして、プライマリーデータベースに昇格するためのいくつかのアクションを実行する必要があります。次のリストは、いくつかの重要なアクションを示しています。
- スタンバイ上の管理リカバリーをキャンセルする。
- アクティブな回復プロセスを完了する。
- スタンバイをプライマリーロールに変換する
- クライアントの読み取りおよび書き込み操作に対する新しいプライマリーを開く。
以前のフィジカルスタンバイが通常どおり使用できるようになったら、それに接続するように Debezium Oracle コネクターを設定できます。コネクターが新しいプライマリーからキャプチャーできるようにするには、コネクター設定内のデータベースホスト名を編集し、元のプライマリーのホスト名を新しいプライマリーのホスト名に置き換えます。
2.5.4.8.2. 論理スタンバイからイベントをキャプチャーするための Debezium Oracle コネクターの設定 リンクのコピーリンクがクリップボードにコピーされました!
Oracle 用の Debezium コネクターは、プライマリーデータベースに接続するときに、内部フラッシュテーブルを使用して Oracle Log Writer Buffer (LGWR) プロセスのフラッシュサイクルを管理します。フラッシュプロセスでは、コネクターがデータベースにアクセスするために使用するユーザーアカウントに、このフラッシュテーブルの作成と書き込みの権限が必要です。ただし、論理スタンバイデータベースでは通常、読み取り専用アクセスが許可されるため、コネクターによるデータベースへの書き込みは防止されます。コネクター設定を変更して、コネクターが論理スタンバイからイベントをキャプチャーできるようにしたり、DBA がコネクターがフラッシュテーブルを格納できる新しい書き込み可能な表領域を作成したりすることができます。
Debezium Oracle コネクターが読み取り専用のロジカルスタンバイデータベースから変更を取り込む機能は、開発者プレビュー機能です。開発者プレビュー機能は、Red Hat ではいかなる形でもサポートされていません。また、機能的には完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアにはドキュメントが存在しない可能性があり、変更または削除される可能性があります。また、限定的なテストしか行われていません。Red Hat は、関連する SLA なしに、開発者プレビューソフトウェアに対するフィードバックを送信する手段を提供する場合があります。
Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。
手順
Debezium が Oracle 読み取り専用論理スタンバイデータベースからイベントをキャプチャーできるようにするには、コネクター設定に次のプロパティーを追加して、フラッシュテーブルの作成と管理を無効にします。
internal.log.mining.read.only=true
internal.log.mining.read.only=trueCopy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の設定により、データベースは
LOG_MINING_FLUSHテーブルを作成および更新できなくなります。internal.log.mining.read.onlyプロパティーは、Oracle スタンドアロンデータベースまたは Oracle RAC インストールで使用できます。
拡張された最大文字列サイズ
データベースパラメーター max_string_size は、Oracle データベースと拡張により Debezium が VARCHAR2、NVARCHAR2、および RAW フィールドの値を解釈する方法を制御します。デフォルトの STANDARD は、これらのデータ型の長さが、Oracle 12c より前のリリースと同じ制限と一致することを意味します (VARCHAR2 の場合は 4000 バイト、RAW の場合は NVARCHAR2 と 2000 バイト)。EXTENDED として設定すると、これらの列で最大 32767 バイトのデータを保存できるようになりました。
データベース管理者は max_string_size を STANDARD から EXTENDED に変更できますが、その逆は許可されません。データベースが EXTENDED 文字列サポートに更新されると、元に戻すことはできません。
Debezium Oracle コネクターでは、データベースパラメーター max_string_size が EXTENDED に設定されている場合、VARCHAR2 および NVARCHAR2 フィールドの文字列長が 4000 バイトを超えるフィールド、または 2000 バイトを超える RAW フィールドの変更をキャプチャーするために、lob.enabled コネクター設定オプションを true に設定する必要があります。
EXTENDED に設定すると、文字列データのバイト長がレガシーの最大値を超えると、Oracle は文字データのインフライトの暗黙的な変換を実行します。この暗黙的な変換により、Oracle は文字列データを内部的には CLOB として扱います。これにより、外部からは通常の文字列のようにフィールドを扱うことができますが、データベース層レベルではストレージに関する注意点やリスクが伴います。
Oracle はこれらの文字列を内部的に CLOB として扱うため、redo ログにもそれに応じた特有の操作タイプが記録されます。Debezium Oracle コネクターは、それらの操作タイプをマイニング対象として認識する必要があります。また、これらの操作タイプは CLOB 操作に非常によく似ているため、他の LOB タイプと同じ方法で redo ログエントリーをマイニングする必要があります。そのため、文字列データのバイト長に関係なく redo ログからの変更をキャプチャーするには、lob.enabled を true に設定する必要があります。
Oracle が EXTENDED 文字列サイズを使用するように設定されていると、LogMiner が拡張文字列フィールドに対して SQL を再構築するときに、単一引用符 (') をエスケープできないことがあります。この問題は、拡張文字列フィールドのバイト長がレガシー最大長を超えていない場合に発生する可能性があります。その結果、これらのフィールドの値が切り捨てられ、無効な SQL ステートメントが生成されますが、Oracle コネクターはこれを解析できません。
問題の原因の一部を解決するために、以下のプロパティーを true に設定することで、一重引用符の検出を緩和するようにコネクターを設定できます。
internal.log.mining.sql.relaxed.quote.detection
詳細は、Red Hat Integration 3.0.8 のリリースノート を参照してください。
2.5.5. Debezium Oracle コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
以下の方法のいずれかを使用して Debezium Oracle コネクターをデプロイできます。
Streams for Apache Kafka を使用して、コネクタープラグインを含むイメージを自動的に作成します。
以下は推奨される方法です。
-
Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドします。
この Containerfile デプロイメント方法は非推奨となりました。この方法の手順は、ドキュメントの今後のバージョンで削除される予定です。
Debezium Oracle コネクターのアーカイブには、ライセンスの関係で、Oracle データベースに接続するために必要な Oracle JDBC ドライバーが含まれていません。コネクターがデータベースにアクセスできるようにするには、コネクター環境にドライバーを追加する必要があります。詳細は、Oracle JDBC ドライバーの取得 を参照してください。
関連情報
2.5.5.1. Oracle JDBC ドライバーの取得 リンクのコピーリンクがクリップボードにコピーされました!
Debezium が Oracle データベースに接続するために必要な Oracle JDBC ドライバーファイルは、ライセンスの関係で Debezium Oracle コネクターアーカイブに含まれていません。ドライバーは、Maven Central からダウンロード可能です。使用するデプロイメント方法に応じて、Kafka Connect カスタムリソースまたはコネクターイメージの構築に使用する Dockerfile にコマンドを追加して、ドライバーを取得することができます。
-
Streams for Apache Kafka を使用して Kafka Connect イメージにコネクターを追加する場合は、「Streams for Apache Kafka を使用した Debezium Oracle コネクターのデプロイ」 に示されているように、
KafkaConnectカスタムリソースのbuilds.plugins.artifact.urlにドライバーの Maven Central の場所を追加します。 -
Dockerfile を使用してコネクター用のコンテナーイメージを構築する場合、Dockerfile に
curlコマンドを挿入して、Maven Central から必要なドライバーファイルをダウンロードするための URL を指定します。詳細には、Dockerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium Oracle コネクターをデプロイ するを参照してください。
2.5.5.2. Streams for Apache Kafka を使用した Debezium Oracle コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターのデプロイで推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnectCR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnectorCR を適用してコネクターを起動します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。
Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnect コンテナーイメージを格納する場所を指定します。コンテナーイメージは、quay.io などのコンテナーレジストリー、または OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の Kafka Connect の設定
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の 新しいコンテナーイメージの自動ビルド
2.5.5.3. Streams for Apache Kafka を使用した Debezium Oracle コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、まずコネクター用の Kafka Connect イメージをビルドする必要がありました。OpenShift にコネクターをデプロイするための現在の推奨方法は、Streams for Apache Kafka のビルド設定を使用して、使用する Debezium コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから、必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector カスタムリソースを作成します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが Streams for Apache Kafka on OpenShift のデプロイと管理 に記載されているとおりにデプロイされている。
- Kafka Connect が Streams for Apache Kafka にデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
ocCLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- 新規コンテナーイメージを保存するために、ImageStream リソースをクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform でのイメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotationsおよびspec.buildプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
例2.34 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義したdbz-connect.yamlファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium Oracle コネクターアーカイブ。
- Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
- Debezium スクリプト SMT アーカイブと Debezium コネクターで使用する関連言語の依存関係。SMT アーカイブおよび言語の依存関係は任意のコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
- Oracle JDBC ドライバー。Oracle データベースに接続するために必要ですが、コネクターアーカイブには含まれていません。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.121 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションを"true"に設定して、クラスター Operator がKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.outputは、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputを指定する方法の詳細は、Streams for Apache Kafka API リファレンスの スキーマ参照のビルド を参照してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。JDBC ドライバーファイルは.jar形式です。typeの値は、urlフィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト
typeとurlを指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
typeとurlを指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
typeとurlを指定します。これは、Debezium スクリプト SMT で必要です。重要Streams for Apache Kafka を使用してコネクタープラグインを Kafka Connect イメージに組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.urlに JAR ファイルのロケーションを指定し、artifacts.typeの値もjarに設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy -
groovy-jsr223(スクリプトエージェント) -
groovy-json(JSON 文字列を解析するためのモジュール)
Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
11
Maven Central における Oracle JDBC ドライバーの場所を指定します。必要なドライバーは Debezium Oracle コネクターアーカイブには含まれていません。
以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、oracle-inventory-connector.yamlとして保存します。例2.35 Debezium コネクターの
KafkaConnectorカスタムリソースを定義するoracle-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.122 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この namespace は、関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの namespace でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f oracle-inventory-connector.yaml
oc create -n debezium -f oracle-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium Oracle デプロイメントを検証する 準備が整いました。
2.5.5.4. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium Oracle コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnectCR。imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。 -
Debezium Oracle コネクターを定義する
KafkaConnectorCR。この CR をKafkaConnectCR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- Oracle Database が稼働し、Oracle を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
- Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、Streams for Apache Kafka on OpenShift のデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.ioやdocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。 Kafka Connect サーバーは、Oracle 用の必要な JDBC ドライバーをダウンロードするために、Maven Central にアクセスすることができます。また、ドライバーのローカルコピー、またはローカルの Maven リポジトリーや他の HTTP サーバーから利用可能なものを使用することもできます。
詳細は、Oracle JDBC ドライバーの取得 を参照してください。
手順
Kafka Connect の Debezium Oracle コンテナーを作成します。
registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0をベースイメージとして使用する Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-oracle.yamlという名前の Docker ファイルを作成します。前のステップで作成した
debezium-container-for-oracle.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-oracle:latest .
podman build -t debezium-container-for-oracle:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-oracle:latest .
docker build -t debezium-container-for-oracle:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
debezium-container-for-oracleという名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-container-for-oracle:latest
podman push <myregistry.io>/debezium-container-for-oracle:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-oracle:latest
docker push <myregistry.io>/debezium-container-for-oracle:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium Oracle KafkaConnect カスタムリソース (CR) を作成します。たとえば、
annotationsおよびimageプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
KafkaConnectorリソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotationsは Cluster Operator に示します。2
spec.imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE変数がオーバーライドされます。以下のコマンドを入力して、
KafkaConnectCR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium Oracle コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yamlファイルに Debezium Db2 コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
1521で Oracle ホスト IP アドレスに接続する Debezium コネクターを設定します。このホストにはORCLCDBという名前のデータベースがあり、server1はサーバーの論理名です。Oracle
inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.123 コネクター設定の説明 項目 説明 1
Kafka Connect サービスに登録する場合のコネクターの名前。
2
この Oracle コネクタークラスの名前。
3
Oracle インスタンスのアドレス。
4
Oracle インスタンスのポート番号。
5
コネクターのユーザーの作成 で指定した Oracle ユーザーの名前。
6
コネクターのユーザーの作成 で指定した Oracle ユーザーのパスワード。
7
変更をキャプチャーするデータベースの名前。
8
コネクターが変更をキャプチャーする Oracle のプラグ可能なデータベースの名前。コンテナーデータベース (CDB) のインストールのみで使用されます。
9
topic prefix は、コネクターが変更をキャプチャーする Oracle データベースサーバーの名前空間を識別して提供します。
10
DDL ステートメントをデータベーススキーマの履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。
11
コネクターが DDL ステートメントを書き、復元するデータベーススキーマの履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをinventory-connector.yamlファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
inventory-connectorを登録し、コネクターはKafkaConnectorCR に定義されているserver1データベースに対して実行を開始します。
Debezium Oracle コネクターに設定できる設定プロパティーの完全リストは Oracle コネクタープロパティー を参照してください。
結果
コネクターが起動すると、コネクターが設定された Oracle データベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
2.5.5.5. コンテナーデータベースとノンコンテナーデータベースの設定 リンクのコピーリンクがクリップボードにコピーされました!
Oracle Database は以下のデプロイメントタイプをサポートしています。
- コンテナーデータベース (CDB)
- 複数の PDB (Multiple Pluggable Database) を格納できるデータベース。データベースクライアントは、標準的な非 CDB データベースであるかのように、各 PDB に接続します。
- ノンコンテナーデータベース (非 CDB)
- プラガブルデータベースの作成には対応していない、標準的な Oracle のデータベース。
mTLS セキュアな接続の使用
相互 TLS 認証 (mTLS) を使用して Oracle に接続する場合は、コネクターとデータベースサービスの両方が独自の識別情報を証明する必要があります。Oracle コネクターの Debezium は、mTLS 認証をサポートする Oracle JDBC ドライバーの組み込み機能に依存しています。
以下の方法のいずれかを使用して、Debezium と Oracle との間で mTLS 接続を確立できます。
2.5.5.6. Java キーとトラストストアを使用して、Oracle コネクターが mTLS を使用するように設定する リンクのコピーリンクがクリップボードにコピーされました!
前提条件
- コネクターが設定されたキーおよびトラストストアファイルにアクセスできることを確認している。
- データベースの TNS (Transparent Network Substrate) リスナーが TCPS のセキュアな接続をサポートすることを確認している。
手順
以下の例のように Debezium Oracle コネクターを設定します。
例: Debezium Oracle コネクター TLS 設定
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記Debezium が TLS 暗号化を使用して Oracle と通信するには、サーバーとの TCPS 接続が必要です。TCPS 接続を確立するには、
database.hostプロパティーではなく、database.urlプロパティーを使用するようにコネクターを設定する必要があります。database.hostプロパティーとは異なり、database.urlプロパティーでは、明示的にTCPSプロトコルを使用する必要がある Oracle TNS (Transparent Network Substrate) 接続文字列を定義できます。
2.5.5.7. Oracle Wallet を使用して、コネクターが mTLS を使用するように設定する リンクのコピーリンクがクリップボードにコピーされました!
前提条件
- Oracle データベースサーバー上に Oracle Wallet が設定されていることをデータベース管理者に確認している。
-
Oracle JDBC ドライバーアーカイブ内で
oraclepki.jarを見つけます。
手順
-
$Debezium Debezium Oracle コネクター jar が存在するのと同じ場所に
oraclepki.jarをインストールします。Oracle JDBC ドライバーをダウンロードしてインストールした場合は、oraclepki.jarも同じ場所に配置してください。 -
database.hostnameではなく、database.url設定プロパティーを使用してコネクターを設定します。database.urlを使用すると、Oracle Wallet と対話するための TNS ベースの設定が提供されます。以下は設定例です。 -
Oracle JDBC ドライバープロパティー
oracle.net.wallet_locationを設定して、Oracle JDBC ドライバーで使用する Oracle Wallet 設定を明示的に設定します。
mTLS 設定の例
{
"database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=xxxx)(HOST=xx.xx.xx.xx))(CONNECT_DATA=(SID=xxxx)))",
"driver.oracle.net.wallet_location": "(SOURCE=(METHOD=file)(METHOD_DATA=(DIRECTORY=/opt/kafka/external-configuration/oracle_wallet/)))"
}
{
"database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=xxxx)(HOST=xx.xx.xx.xx))(CONNECT_DATA=(SID=xxxx)))",
"driver.oracle.net.wallet_location": "(SOURCE=(METHOD=file)(METHOD_DATA=(DIRECTORY=/opt/kafka/external-configuration/oracle_wallet/)))"
}
database.url に正しいホスト、ポート、およびサービス識別子 (sid) を設定していることを確認してください。また、driver.oracle.net.wallet_location のディレクトリーが読み取り可能であることを確認してください。
2.5.5.8. Debezium Oracle コネクターが実行していることの確認 リンクのコピーリンクがクリップボードにコピーされました!
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが Streams for Apache Kafka on OpenShift にデプロイされている。
-
OpenShift
ocCLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnectorリソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnectorを入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-oracle)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc describe KafkaConnector inventory-connector-oracle -n debezium
oc describe KafkaConnector inventory-connector-oracle -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.36
KafkaConnectorリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopicを入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-oracle.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.37
KafkaTopicリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-oracle.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-oracle.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow トピック名を指定する形式は、手順 1 で返された
oc describeコマンドと同じです (例:inventory-connector-oracle.inventory.addresses)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.38 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"oracle","name":"inventory-connector-oracle","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"oracle-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"oracle","name":"inventory-connector-oracle","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"oracle-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の例では、
payload値は、コネクタースナップショットがテーブルinventory.products_on_handから読み込み ("op" ="r") イベントを生成したことを示しています。product_idレコードの"before"状態はnullであり、レコードに以前の値が存在しないことを示しています。"after"状態は、product_id101を持つ項目のquantityが3であることを示しています。
2.5.6. Debezium Oracle コネクター設定プロパティーの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
- 必要な Debezium Oracle コネクター設定プロパティー
- Debezium がデータベース履歴トピックから読み取るイベントを処理する方法を制御する データベース履歴コネクター設定プロパティー
必要な Debezium Oracle コネクター設定プロパティー
以下の設定プロパティーは、デフォルト値がない場合は 必須 です。
| プロパティー | デフォルト | 説明 |
| デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。(このプロパティーはすべての Kafka Connect コネクターに必要です) | |
| デフォルトなし |
コネクターの Java クラスの名前。Oracle コネクターには、常に | |
| デフォルトなし |
コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。
コネクターに設定するコンバーターごとに、コンバーターインターフェイスを実装するクラスの完全修飾名を指定する
以下に例を示します。 boolean.type: io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。追加の設定パラメーターとコンバーターを関連付けるには、パラメーター名の前にコンバーターのシンボリック名を付けます。 boolean.selector: .*MYTABLE.FLAG,.*.IS_ARCHIVED
| |
|
| このコネクターに作成するタスクの最大数。Oracle コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。 | |
| デフォルトなし | Oracle データベースサーバーの IP アドレスまたはホスト名。 | |
| デフォルトなし | Oracle データベースサーバーの整数のポート番号。 | |
| デフォルトなし | コネクターが Oracle データベースサーバーへの接続に使用する Oracle ユーザーアカウントの名前。 | |
| デフォルトなし | Oracle データベースサーバーへの接続時に使用するパスワード。 | |
| デフォルトなし | 接続先のデータベースの名前。コンテナーデータベース環境では、含まれるプラグ可能なデータベース (PDB) の名前ではなく、ルートコンテナーデータベース (CDB) の名前を指定します。 | |
| デフォルトなし | raw データベースの JDBC URL を指定します。このプロパティーを使用すると、そのデータベース接続を柔軟に定義できます。有効な値は、raw TNS 名および RAC 接続文字列などです。 | |
| デフォルトなし | 接続先の Oracle のプラグ可能なデータベースの名前。このプロパティーは、コンテナーデータベース (CDB) のインストールでのみ使用してください。 | |
| デフォルトなし |
コネクターが変更をキャプチャーする Oracle データベースサーバーの名前空間を識別して提供するトピック接頭辞。設定した値は、コネクターが出力するすべての Kafka トピック名の接頭辞として使用されます。Debezium 環境のすべてのコネクターで一意のトピック接頭辞を指定します。英数字、ハイフン、ドットおよびアンダースコアの文字が有効です。 警告 このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。また、コネクターはデータベーススキーマ履歴トピックを復元できません。 | |
|
| コネクターがデータベースの変更をストリーミングする際に使用するアダプター実装。以下の値を設定できます。
| |
| Initial | このコネクターがキャプチャーされたテーブルのスナップショットを取得するために使用するモードを指定します。以下の値を設定できます。
スナップショットが完了すると、
詳細は、 | |
| shared | コネクターがテーブルロックを保持するかどうか、また保持する時間をコントロールします。テーブルロックは、コネクターがスナップショットを実行している間、特定の種類の変更テーブル操作が発生するのを防ぎます。以下の値を設定できます。
| |
|
|
スナップショットを実行するときにコネクターがデータをクエリーする方法を指定します。
この設定により、 | |
|
コネクターの |
スナップショットに含めるテーブルの完全修飾名 (
マルチテナントコンテナーデータベース (CDB) 環境では、
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
スナップショットには、コネクターの
このプロパティーは、コネクターの | |
| デフォルトなし | スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
プロパティーには、
スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 ( "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"
作成されるスナップショットでは、コネクターには | |
| デフォルトなし |
変更をキャプチャーする 対象とする スキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。LogMiner 実装を使用する環境では、POSIX 正規表現のみを使用する必要があります。
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの名前文字列全体と照合されます。 | |
|
| コネクターがメタデータオブジェクトでテーブルおよび列のコメントを解析して公開するかどうかを指定するブール値。このオプションを有効にすると、メモリー使用量に影響を及ぼします。論理スキーマオブジェクトの数およびサイズは、Debezium コネクターによって消費されるメモリーの量に大きく影響し、それぞれに大きな文字列データを追加すると、非常に高価になる可能性があります。 | |
| デフォルトなし |
変更をキャプチャーする 対象としない スキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。LogMiner 実装を使用する環境では、POSIX 正規表現のみを使用する必要があります。
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの名前文字列全体と照合されます。 | |
| デフォルトなし |
キャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。LogMiner 実装を使用する場合は、このプロパティーで POSIX 正規表現のみを使用してください。このプロパティーが設定されている場合、コネクターは指定されたテーブルからのみ変更をキャプチャします。各テーブルの識別子は、
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
| デフォルトなし |
監視から除外するテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。LogMiner 実装を使用する場合は、このプロパティーで POSIX 正規表現のみを使用してください。コネクターは除外リストに指定されていないテーブルからの変更をキャプチャーします。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
| デフォルトなし |
変更イベントメッセージの値に含まれる必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。LogMiner 実装を使用する環境では、POSIX 正規表現のみを使用する必要があります。列の完全修飾名は、
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。 | |
| デフォルトなし |
変更イベントメッセージの値から除外される必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。LogMiner 実装を使用する環境では、POSIX 正規表現のみを使用する必要があります。完全修飾の列名は、
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。 | |
|
|
含まれる列に変更がない場合にメッセージの公開をスキップするかどうかを指定します。これは基本的に、含まれる列に変更がない場合、 | |
|
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は
仮名は、指定された hashAlgorithm と salt を適用すると得られるハッシュ化された値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest セクション に記載されています。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。 |
| bytes |
バイナリー ( | |
| none |
コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。設定可能:
| |
| none |
コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。設定可能:
詳細は、Avro の命名 を参照してください。 | |
|
|
コネクターが
| |
|
|
| |
|
| イベントの処理中にコネクターが例外に対応する方法を指定します。以下のオプションのいずれかを使用できます。
| |
|
| このコネクターの反復処理中に処理するイベントの各バッチの最大サイズを指定する正の整数値。 | |
|
|
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。 | |
|
|
ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。 | |
|
| 各反復処理の実行中に新しい変更イベントが表示されるまでコネクターが待機する時間 (ミリ秒単位) を指定する正の整数値。 | |
|
| コネクターがデータベーススキーマの変更をトピック接頭辞と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。コネクターは、データベース名が含まれるキーを持つ各スキーマの変更と、スキーマ更新を記述する JSON 構造の値を記録します。スキーマの変更を記録するこのメカニズムは、データベーススキーマ履歴への変更のコネクターの内部記録とは独立しています。 | |
|
| delete イベントの後に廃棄 (tombstone) イベントを行うかどうかを制御します。以下の値が可能です。
ソースレコードを削除すると、廃棄イベント (デフォルトの動作) により、Kafka が log compaction が有効なトピックで削除した列のキーを共有するイベントをすべて完全に削除できるようになります。 | |
| デフォルトなし | 指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。 | |
| デフォルトなし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。
列の完全修飾名は、 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
| デフォルトなし |
文字をアスタリスク ( | |
| デフォルトなし | 列のメタデータを表す追加パラメーターをコネクターに発行させたい列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名は | |
| デフォルトなし | データベース内の列に対して定義されているデータ型の完全修飾名を指定する正規表現のオプションのコンマ区切りリスト。このプロパティーが設定されている場合、データ型が一致する列に対して、コネクターはスキーマに次の追加フィールドを含むイベントレコードを発行します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名は、 Oracle 固有のデータ型名の一覧は、Oracle データ型マッピング を参照してください。 | |
|
|
コネクターがメッセージをハートビートトピックに送信する頻度を定義する間隔 (ミリ秒単位) を指定します。 | |
| デフォルトなし |
コネクターがハートビートメッセージを送信するときにコネクターがソースデータベースで実行するクエリーを指定します。 このプロパティーを設定し、ハートビートテーブルを作成してハートビートメッセージを受信することで、Debezium が高トラフィックデータベースと同じホスト上にある低トラフィックデータベースのオフセットの同期に失敗 する状況を解決することができます。コネクターは設定されたテーブルにレコードを挿入した後、低トラフィックデータベースから変更を受信し、データベースの SCN 変更を認識することができるので、オフセットをブローカーと同期させることができます。 | |
| デフォルトなし |
スナップショットを作成する前に、コネクターが起動してから待機する間隔をミリ秒単位で指定します。 | |
| 0 |
コネクターがスナップショットを完了した後、ストリーミングプロセスの開始を遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットが完了した直後で、ストリーミングプロセスの開始前に障害が発生した場合に、コネクターがスナップショットを再開できないようにします。Kafka Connect ワーカーに設定されている | |
|
| スナップショットの実行中に各テーブルから 1 度に読み取る必要がある行の最大数を指定します。コネクターは、指定のサイズの複数のバッチでテーブルの内容を読み取ります。 | |
|
|
指定のクエリーのデータベースのラウンドトリップごとにフェッチされる行の数を指定します。 | |
|
|
詳細は、トランザクションメタデータ を参照してください。 | |
|
|
Oracle LogMiner がテーブル ID と列 ID を名前に解決するために特定のデータディクショナリーを構築および使用する方法を制御するマイニングストラテジーを指定します。
| |
|
|
Oracle LogMiner クエリーの構築方法を制御するマイニングクエリーモードを指定します。
| |
|
|
バッファータイプは、コネクターがトランザクションデータのバッファリングをどのように管理するかを制御します。 | |
|
| トランザクションがトランザクションバッファーに格納できるイベントの最大数。このしきい値を超えるイベントカウントを持つトランザクションは出力されず、破棄されます。デフォルトの動作では、トランザクションイベントのしきい値はありません。 | |
|
|
新しいセッションが使用される前に LogMiner セッションをアクティブに保つことができる最大期間 (ミリ秒単位)。 | |
|
|
JDBC 接続を切断して、ログの切り替え時、またはマイニングセッションの最大存続期間のしきい値に到達したときに再開するかを指定します。 | |
|
| このコネクターが redo/archive ログから読み込もうとする最小 SCN 間隔サイズ。 | |
|
| このコネクターが REDO/ARCHIVE ログから読み取るときに使用する最大 SCN インターバルサイズです。 | |
|
| コネクターが redo/archive ログから読み取るために使用する間隔を増減する量。 | |
|
| コネクターが REDO/ARCHIVE ログからデータを読み取る際に使用する開始 SCN 間隔サイズ。これは、バッチサイズを調整するための手段としても機能します。現在の SCN とバッチの開始/終了 SCN の差がこの値よりも大きい場合、バッチサイズが増減されます。 | |
|
| redo/archive ログからデータを読み込んだ後、再びデータの読み込みを開始するまでのコネクターのスリープ時間の最小値です。値はミリ秒単位です。 | |
|
| redo/archive ログからデータを読み込んだ後、再びデータの読み込みを開始するまでのコネクターイルのスリープ時間の最大値。値はミリ秒単位です。 | |
|
| redo/archive ログからデータを読み込んだ後、再びデータの読み込みを開始するまでのコネクターのスリープ時間の開始値。値はミリ秒単位です。 | |
|
| logminer からデータを読み取る際に、コネクターが最適なスリープ時間を調整するために使用する時間の最大値を上下させる。値はミリ秒単位です。 | |
|
|
SYSDATE からアーカイブログを採掘するまでの過去の時間数です。デフォルトの設定 ( | |
|
|
コネクターが変更をアーカイブログだけから挽くのか、オンライン REDO ログとアーカイブログを組み合わせて挽くのか (デフォルト) を制御します。 | |
|
|
開始システムの変更番号がアーカイブログにあるかどうかを判断するためのポーリングの間に、コネクターがスリープするミリ秒数です。 | |
|
|
正の整数値で、redo ログの切り替えの間に長時間実行されるトランザクションを保持する時間 (ミリ秒) を指定します。 デフォルトでは、LogMiner アダプターは、実行中のすべてのトランザクションのインメモリーバッファーを維持します。トランザクションの一部となるすべての DML 操作は、コミットまたはロールバックが検出されるまでバッファーされるため、そのバッファーがオーバーフローしないように長時間実行されるトランザクションを回避する必要があります。設定されたこの値を超えるトランザクションは完全に破棄され、コネクターはトランザクションに含まれていた操作のメッセージを出力しません。 | |
| デフォルトなし |
LogMiner でアーカイブログをマイニングする際に使用する、設定された Oracle のアーカイブ先を指定します。 | |
| デフォルトなし | LogMiner クエリーから包含するデータベースユーザーのリスト。キャプチャープロセスで、指定したユーザーからの変更を含めるようにするには、このプロパティーを設定すると便利です。 | |
| デフォルトなし | LogMiner クエリーから除外するデータベースユーザーのリスト。特定のユーザーが行った変更を常にキャプチャプロセスから除外したい場合は、このプロパティーを設定すると便利です。 | |
|
|
SCN ギャップがあるかどうかを判断するために、コネクターが現在の SCN 値と前回の SCN 値の差と比較する値を指定します。SCN 値の差が指定された値より大きく、時間差が | |
|
|
SCN ギャップがあるかどうかを判断するために、コネクターが現在の SCN タイムスタンプと前回の SCN タイムスタンプの差と比較する値をミリ秒単位で指定します。タイムスタンプの差が指定された値よりも小さく、SCN デルタが指定された値よりも大きい場合、SCN ギャップが検出され、設定された最大バッチよりも大きいマイニングウィンドウを使用します。 | |
|
|
Oracle LogWriter Buffer (LGWR) の REDO ログへのフラッシュを調整するフラッシュテーブルの名前を指定します。この名前は | |
|
|
REDO ログで構築された SQL ステートメントが | |
|
|
ラージオブジェクト (CLOB または BLOB) の列値を変更イベントで出力するかどうかを制御します。 | |
|
| コネクターが提供する定数を指定して、元の値がデータベースによって提供されておらず、また変更されていない値であることを示します。 | |
| デフォルトなし | Oracle Real Application Clusters (RAC) ノードのホスト名またはアドレスをコンマで区切って入力してください。このフィールドは、Oracle RAC のデプロイメントとの互換性を有効にするために必要です。 RAC ノードのリストを以下のいずれかの方法で指定します。
| |
|
| ストリーミング中にコネクターがスキップする操作タイプをコンマで区切ったリスト。以下のタイプの操作をスキップするようにコネクターを設定することができます。
デフォルトでは、省略操作のみがスキップされます。 | |
| デフォルト値なし |
シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名 。このプロパティーを Oracle プラグインデータベース (PDB) で使用する場合、その値にはルートデータベースの名前を設定します。 | |
| source | コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
| デフォルトなし | コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
|
| 増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 | |
|
|
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントを重複排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
| |
|
|
データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは | |
|
|
トピック名の区切り文字を指定します。デフォルトは | |
|
| トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。 | |
|
|
コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
|
コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
|
初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。 注記
並行初期スナップショットを有効にすると、各テーブルのスナップショットを実行するスレッドの処理完了時間がそれぞれ異なる場合があります。あるテーブルのスナップショットが、他のテーブルよりも完了するまでに大幅に時間がかかる場合、作業を終えたスレッドは待機状態になります。一部の環境では、ロードバランサーやファイアウォールなどのネットワークデバイスは、長時間アイドル状態のままの接続を終了します。スナップショットが完了すると、コネクターがすべてのスナップショットデータを正常に送信した場合でも、コネクターは接続を閉じることができず、例外となり、スナップショットが不完全になります。 | |
|
|
データベースエラーが発生したときにテーブルのスナップショットを再試行する回数を指定します。この設定プロパティーは現在、 | |
|
|
コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します。たとえば、 コネクターは、指定されたタグを基本 MBean オブジェクト名に追加します。タグは、メトリクスデータを整理および分類するのに役立ちます。特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを識別するためのタグを定義できます。詳細は、カスタマイズされた MBean 名 を参照してください。 | |
|
|
接続エラーなど、再試行可能なエラーが発生する操作の後に、コネクターがどのように応答するかを指定します。
| |
|
| クエリーの実行を待機する時間 (ミリ秒単位)。デフォルトは 600 秒 (600,000 ミリ秒) です。ゼロは制限がないことを意味します。 |
Debezium Oracle コネクターデータベーススキーマ履歴の設定プロパティー
Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する schema.history.internal.* プロパティーのセットが含まれています。
以下の表は、Debezium コネクターを設定するための schema.history.internal プロパティーを説明しています。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし | コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。 | |
| デフォルトなし | Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。 | |
|
| 永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。 | |
|
| Kafka 管理クライアントを使用してクラスター情報を取得する際に、コネクターが待機すべき最大ミリ秒数を指定する整数値です。 | |
|
| Kafka 管理クライアントを使用して kafka 履歴トピックを作成する間、コネクターが待機する最大ミリ秒数を指定する整数値。 | |
|
|
エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、 | |
|
|
コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは | |
|
|
コネクターがスキーマまたはデータベース内のすべてのテーブルからスキーマ構造を記録するか、キャプチャー対象に指定されたテーブルのみからスキーマ構造を記録するかを指定するブール値。
| |
|
|
コネクターがデータベースインスタンス内のすべての論理データベースのスキーマ構造を記録するかどうかを指定するブール値。
|
パススルー Oracle コネクター設定プロパティー
コネクターは pass-through プロパティーをサポートしており、これにより Debezium は Apache Kafka プロデューサーとコンシューマーの動作を微調整するためのカスタム設定オプションを指定できます。Kafka プロデューサーとコンシューマーの全設定プロパティーの詳細は、Kafka ドキュメント を参照してください。
プロデューサーとコンシューマーのクライアントがスキーマ履歴トピックと対話する方法を設定するための Pass-through プロパティー
Debezium は、データベーススキーマ履歴トピックへのスキーマ変更を記述するために Apache Kafka プロデューサーに依存しています。同様に、コネクターが起動すると、データベーススキーマ履歴トピックから読み取る Kafka コンシューマーに依存します。schema.history.internal.producer.* および schema.history.internal.consumer.* 接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベーススキーマ履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。
Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。
Kafka プロデューサー設定プロパティー と Kafka コンシューマー設定プロパティー の詳細は、Apache Kafka ドキュメントを参照してください。
Oracle コネクターが Kafka シグナリングトピックと対話する方法を設定するためのパススループロパティー
Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。
以下の表は、Kafka signal プロパティーを説明しています。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| <topic.prefix>-signal | コネクターがアドホックシグナルについて監視する Kafka トピックの名前。 注記 トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナリングトピックが必要です。シグナリングトピックには単一のパーティションが必要です。 | |
| kafka-signal | Kafka コンシューマーによって使用されるグループ ID の名前。 | |
| デフォルトなし | コネクターが Kafka クラスターへの初期接続を確立するために使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。 | |
|
| コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。 |
シグナリングチャネルの Kafka コンシューマークライアントを設定するためのパススループロパティー
Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.* で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL などのプロパティーを Kafka コンシューマーに渡します。
Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。
Oracle コネクター sink 通知チャネルを設定するためのパススループロパティー
次の表では、Debezium sink notification チャネルの設定に使用できるプロパティーについて説明します。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし |
Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして |
Debezium コネクターのパススルーデータベースドライバー設定プロパティー
Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは接頭辞 driver.* で始まります。たとえば、コネクターは driver.foobar=false などのプロパティーを JDBC URL に渡します。
Debezium は、プロパティーをデータベースドライバーに渡す前に、プロパティーから接頭辞を削除します。
2.5.7. Debezium Oracle コネクターのパフォーマンスの監視 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターは、Apache Zookeeper、Apache Kafka、および Kafka Connect に含まれる JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。
- スナップショットの実行時にコネクターを監視するための、スナップショットメトリクス。
- 変更イベントの処理時にコネクターを監視するための、ストリーミングメトリクス。
- コネクターのスキーマ履歴の状態を監視するための、スキーマ履歴メトリクス。
JMX 経由でこれらのメトリクスを公開する方法の詳細は、監視に関するドキュメント を参照してください。
2.5.7.1. Oracle コネクタースナップショットおよびストリーミング MBean オブジェクトのカスタマイズされた名前 リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。これらのメトリクスは各コネクターインスタンスに固有であり、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。
デフォルトでは、正しく設定されたコネクターをデプロイすると、Debezium はさまざまなコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクター設定に依存しており、設定の変更によって MBean 名が変更される場合があります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが切断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開するには、新しい MBean 名を使用するように監視スタックを再設定する必要があります。
MBean 名の変更が原因で監視が中断されないように、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、コネクター設定に custom.metric.tags プロパティーを追加します。このプロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値がそのタグの値を表すキーと値のペアを受け入れます。たとえば、k1=v1,k2=v2 です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。
コネクターの custom.metric.tags プロパティーを設定した後、指定されたタグに関連付けられたメトリクスを取得するように監視スタックを設定できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。
カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。
次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。
例2.39 カスタムタグがコネクター MBean 名を変更する方法
デフォルトでは、Oracle コネクターはストリーミングメトリクスに次の MBean 名を使用します。
debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>
debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>
custom.metric.tags の値を database=salesdb-streaming,table=inventory に設定すると、Debezium は次のカスタム MBean 名を生成します。
debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
2.5.7.2. Debezium Oracle コネクターのスナップショットメトリクス リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.oracle:type=connector-metrics,context=snapshot,server=<topic.prefix> です。
スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。
次の表に、使用可能なスナップショットメトリクスを示します。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取りした最後のスナップショットイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| 前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| スナップショットに含まれているテーブルの合計数。 | |
|
| スナップショットによってまだコピーされていないテーブルの数。 | |
|
| スナップショットが起動されたかどうか。 | |
|
| スナップショットが一時停止されたかどうか。 | |
|
| スナップショットが中断されたかどうか。 | |
|
| スナップショットが完了したかどうか。 | |
|
| スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。 | |
|
| スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。 | |
|
| スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
コネクターは、増分スナップショットの実行時に、以下の追加のスナップショットメトリクスも提供します。
2.5.7.3. Debezium Oracle コネクターのストリーミングメトリクス リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix> です。
以下の表は、利用可能なストリーミングメトリクスのリストです。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取られた最後のストリーミングイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、ソースデータベースによって報告されたデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された作成イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された更新イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された削除イベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。 | |
|
| 最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値には、データベースサーバーとコネクターが実行されているマシンのクロックの差が組み込まれます。 | |
|
| コミットされた処理済みトランザクションの数。 | |
|
| 最後に受信したイベントの位置。 | |
|
| 最後に処理されたトランザクションのトランザクション識別子。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
Debezium Oracle コネクターは、以下のストリーミングメトリクスも追加で提供します。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| 処理された最新のシステム変更番号です。 | |
|
| トランザクションバッファー内の最も古いシステム変更番号。 | |
|
|
最も古いシステム変更番号の経過時間 (ミリ秒単位)。バッファーが空の場合、値は | |
|
| トランザクションバッファーからの最後のコミットされたシステム変更番号。 | |
|
| 現在、コネクターのオフセットに書き込まれているシステム変更番号。 | |
|
| 現在採掘されているログファイルの配列。 | |
|
| 任意の LogMiner セッションに指定されたログの最小数です。 | |
|
| 任意の LogMiner セッションに指定されたログの最大数。 | |
|
|
| |
|
| 最終日について、データベースがログスイッチを実行した回数。 | |
|
| 最後の LogMiner セッションクエリーで確認される DML 操作の数。 | |
|
| 単一の LogMiner セッションクエリーの処理中に確認される DML 操作の最大数。 | |
|
| 確認された DML 操作の合計数。 | |
|
| 実行された LogMiner セッションクエリー (別名バッチ) の合計数。 | |
|
| 最後の LogMiner セッションクエリーのフェッチ時間 (ミリ秒単位)。 | |
|
| 任意の LogMiner セッションクエリーのフェッチの最大時間 (ミリ秒単位)。 | |
|
| 最後の LogMiner クエリーバッチ処理の時間 (ミリ秒単位)。 | |
|
| DML イベント SQL ステートメントの解析に費やした時間 (ミリ秒単位)。 | |
|
| 最後の LogMiner セッションを開始する期間 (ミリ秒単位)。 | |
|
| LogMiner セッションを開始する最長期間 (ミリ秒単位)。 | |
|
| コネクターが LogMiner セッションを開始するのに費やす合計期間 (ミリ秒単位)。 | |
|
| 単一の LogMiner セッションからの結果を処理するのに費やされた最小時間 (ミリ秒単位)。 | |
|
| 単一の LogMiner セッションからの結果を処理するのに費やされた最大時間 (ミリ秒単位)。 | |
|
| LogMiner セッションからの結果を処理するのに費やされた合計時間 (ミリ秒単位)。 | |
|
| ログマイニングビューからの処理する次の行を取得する JDBC ドライバーによって費やされた合計期間 (ミリ秒単位)。 | |
|
| すべてのセッションでログマイニングビューから処理される行の合計数。 | |
|
| データベースのラウンドトリップごとにログのマイニングクエリーによって取得されるエントリーの数。 | |
|
| ログマイニングビューから結果の別のバッチを取得する前にコネクターがスリープ状態になる期間 (ミリ秒単位)。 | |
|
| ログマイニングビューから処理される行/秒の最大数。 | |
|
| ログマイニングから処理される行/秒の平均数。 | |
|
| 最後のバッチでログマイニングビューから処理された平均行数/秒。 | |
|
| 検出された接続問題の数。 | |
|
|
トランザクションがコミットやロールバックされずにコネクターのインメモリーバッファーに保持されてから破棄されるまでの時間数。詳細は、 | |
|
| トランザクションバッファーの現在のアクティブなトランザクションの数。 | |
|
| トランザクションバッファーのコミットされたトランザクションの数。 | |
|
|
サイズが | |
|
| トランザクションバッファーのロールバックされたトランザクションの数。 | |
|
| コミットされたトランザクションでロールバックされたイベントの数。これは、ほとんどのユースケースで制約違反を意味します。 | |
|
| トランザクションバッファーのコミットされた 1 秒あたりのトランザクションの平均数。 | |
|
| トランザクションバッファーに登録された DML 操作の数。 | |
|
| トランザクションログに変更が発生した時刻とそれがトランザクションバッファーに追加された時刻の差 (ミリ秒単位)。 | |
|
| トランザクションログに変更が発生した時刻とそれがトランザクションバッファーに追加された時刻の差の最大値 (ミリ秒単位)。 | |
|
| トランザクションログに変更が発生した時刻とそれがトランザクションバッファーに追加された時刻の差の最小値 (ミリ秒単位)。 | |
|
|
古いためにトランザクションバッファーから削除された、最も新しい放棄されたトランザクション識別子の配列。詳細は、 | |
|
| abandoned transactions リストに含まれる現在のエントリー数。 | |
|
| マイニングされトランザクションバッファーにロールバックされたトランザクション識別子の配列。 | |
|
| 最後のトランザクションバッファーコミット操作の期間 (ミリ秒単位)。 | |
|
| 最長のトランザクションバッファーコミット操作の期間 (ミリ秒単位)。 | |
|
| 検出されたエラーの数。 | |
|
| 検出された警告の数。 | |
|
|
システム変更番号の繰り上げチェックが行われ、変更されなかった回数。高い値は、長時間稼働するトランザクションが進行中で、コネクターのオフセットに最近処理されたシステム変更番号をフラッシュするのを妨げていることを示す場合があります。最適な条件であれば、 | |
|
|
検出されたものの、DDL パーサーで解析できなかった DDL レコードの数です。これは常に、 | |
|
| 現在のマイニングセッションのユーザーグローバルエリア (UGA) のメモリー消費量 (単位: バイト)。 | |
|
| すべてのマイニングセッションでの最大のユーザーグローバルエリア (UGA) のメモリー消費量 (バイト)。 | |
|
| 現在のマイニングセッションのプロセスグローバルエリア (PGA) のメモリー消費量 (単位: バイト)。 | |
|
| 全マイニングセッションのプロセスグローバルエリア (PGA) の最大メモリー消費量 (バイト)。 |
2.5.7.4. Debezium Oracle コネクターのスキーマ履歴メトリクス リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.oracle:type=connector-metrics,context=schema-history,server=<topic.prefix> です。
以下の表は、利用可能なスキーマ履歴メトリクスのリストです。
| 属性 | タイプ | 説明 |
|---|---|---|
|
|
データベーススキーマ履歴の状態を示す | |
|
| リカバリーが開始された時点のエポック秒の時間。 | |
|
| リカバリーフェーズ中に読み取られた変更の数。 | |
|
| リカバリーおよびランタイム中に適用されるスキーマ変更の合計数。 | |
|
| 最後の変更が履歴ストアから復元された時点からの経過時間 (ミリ秒単位)。 | |
|
| 最後の変更が適用された時点からの経過時間 (ミリ秒単位)。 | |
|
| 履歴ストアから復元された最後の変更の文字列表現。 | |
|
| 最後に適用された変更の文字列表現。 |
2.5.8. Debezium での Oracle XStream データベースの使用 (開発者プレビュー) リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターは、デフォルトでネイティブの Oracle LogMiner を使用して変更を取り込みます。代わりに Oracle XStream を使用するようにコネクターを切り替えできます。Oracle XStream を使用するようにコネクターを設定するには、LogMiner で使用するものとは異なる特定のデータベースおよびコネクター設定を適用する必要があります。
XStream での Debezium Oracle コネクターの使用は、開発者プレビュー機能です。開発者プレビュー機能は、Red Hat ではいかなる形でもサポートされていません。また、機能的には完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアにはドキュメントが存在しない可能性があり、変更または削除される可能性があります。また、限定的なテストしか行われていません。Red Hat は、関連する SLA なしに、開発者プレビューソフトウェアに対するフィードバックを送信する手段を提供する場合があります。
Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。
前提条件
- XStream API を使用するには、GoldenGate 製品のライセンスが必要です。GoldenGate をインストールする必要はありません。
2.5.8.1. Debezium で使用する Oracle XStream データベースの準備 リンクのコピーリンクがクリップボードにコピーされました!
Oracle XStream に必要な設定
さらに、変更したデータベースの行の before の状態をデータ変更でキャプチャーされるように、キャプチャーしたテーブルとデータベース向けに、補完用のロギングを有効にしておく必要があります。以下は、特定のテーブルでこの設定を行う方法を示しています。これは、Oracle の redo ログでキャプチャーする情報量を最小限に抑えるのに適しています。
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
2.5.8.2. Debezium Oracle コネクターの XStream ユーザーの作成 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターでは、コネクターが変更イベントをキャプチャーできるように、ユーザーアカウントに特定の権限がセットアップされている必要があります。以下の例では、マルチテナントデータベースモデルでユーザー設定を作成する方法を説明します。
2.5.8.2.1. Debezium Oracle コネクターの XStream 管理者の作成 リンクのコピーリンクがクリップボードにコピーされました!
2.5.8.2.2. Debezium Oracle コネクターの XStream ユーザーの作成 リンクのコピーリンクがクリップボードにコピーされました!
2.5.8.3. XStream アウトバウンドサーバーの作成 リンクのコピーリンクがクリップボードにコピーされました!
XStream アウトバウンドサーバー を作成します。
XStream アウトバウンドサーバーの作成
プラグ可能なデータベースから変更をキャプチャーするように XStream アウトバウンドサーバーをセットアップする場合は、source_container_name パラメーターの値としてプラグ可能なデータベース名を指定します。
XStream アウトバウンドサーバーに接続するための XStream ユーザーアカウントを設定する
単一の XStream アウトバウンドサーバーを複数の Debezium Oracle コネクターで共有することはできません。各コネクターには、一意の XStream Outbound コネクターを設定する必要があります。
2.5.8.4. XStream アダプターを使用するための Debezium の設定 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Debezium は Oracle LogMiner を使用して Oracle から変更イベントを取り込みます。コネクター設定を調整して、コネクターが Oracle XStream アダプターを使用できるように有効化できます。
次の設定例では、プロパティー database.connection.adapter および database.out.server.name を追加して、コネクターが XStream API 実装を使用できるようにします。
2.5.8.5. Oracle JDBC ドライバーおよび XStream API ファイルの取得 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターでは、Oracle データベースに接続するために Oracle JDBC ドライバー (ojdbc11.jar) が必要です。コネクターが XStream を使用してデータベースにアクセスする場合は、XStream API (xstreams.jar) も必要です。ライセンス要件により、Debezium はこれらのファイルを Oracle コネクターアーカイブに含めることができません。ただし、必要なファイルは、Oracle Instant Client の一部として無料ダウンロードできます。次の手順では、Oracle Instant Client をダウンロードして必要なファイルを抽出する方法について説明します。
手順
- ブラウザーから、お使いのオペレーティングシステム用の Oracle Instant Client パッケージ をダウンロードします。
アーカイブを展開してから、
instantclient_<version>ディレクトリーを開きます。以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
ojdbc11.jarファイルとxstreams.jarファイルをコピーし、<kafka_home>/libsディレクトリー (例:kafka/libs)に追加します。 環境変数
LD_LIBRARY_PATHを作成し、その値を Instant Client ディレクトリーへのパスに設定します。以下に例を示します。LD_LIBRARY_PATH=/path/to/instant_client/
LD_LIBRARY_PATH=/path/to/instant_client/Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.5.8.6. XStream コネクターのプロパティー リンクのコピーリンクがクリップボードにコピーされました!
デフォルト値がない場合は、XStream を使用する際に以下の設定プロパティーが 必要 です。
| プロパティー | デフォルト | 説明 |
| デフォルトなし | データベースに設定されている XStream アウトバウンドサーバーの名前。 |
2.5.8.7. Oracle Xstream と DBMS_LOB パッケージ リンクのコピーリンクがクリップボードにコピーされました!
Oracle は、Blob、CLOB、および NCLOB 列を操作するためのプログラムのコレクションで構成される DBMS_LOB と呼ばれるデータベースパッケージを提供します。これらのプログラムのほとんどは LOB 列全体を操作しますが、WRITEAPPEND という 1 つのプログラムは LOB データバッファーのサブセットを操作できます。
XStream を使用する場合、WRITEAPPEND は、プログラムの呼び出しごとに論理変更レコード (LCR) イベントを発行します。これらの LCR イベントは、Oracle LogMiner アダプターを使用する場合のように、1 つの変更イベントに統合されることはありません。その結果、トピックのコンシューマーは部分的な列値を持つイベントを受信する可能性があります。
2.5.9. Oracle コネクターのよくある質問 リンクのコピーリンクがクリップボードにコピーされました!
- Oracle 11g はサポートされますか ?
- Oracle 11g はサポート対象外ですが、ベストエフォートベースで Oracle 11g との後方互換性を確保しています。Red Hat は、コミュニティーを頼りに、Oracle 11g との互換性に関する懸念点を伝え、リグレッションが特定された場合にバグ修正を提供します。
- Oracle LogMiner は非推奨となりましたか ?
- いいえ、Oracle は、Oracle 12c で Oracle LogMiner を使用した継続的なマイニングオプションのみを非推奨にし、Oracle 19c からそのオプションを削除しました。Debezium Oracle コネクターは、このオプションに依存せずに機能するため、新しいバージョンの Oracle で問題なく安全に使用できます。
- オフセットの位置を変更するにはどうすればよいですか ?
Debezium Oracle コネクターは、
scnという名前のフィールドとcommit_scnという別の名前の 2 つの重要な値をオフセットに保持します。scnフィールドは、コネクターが変更をキャプチャーするときに使用する low-watermark の開始位置を表す文字列です。-
コネクターオフセットを含むトピックの名前を見つけます。これは、
offset.storage.topic設定プロパティーとして指定された値に基づいて設定されます。 コネクターの最後のオフセット、格納先のキーを見つけ、そのオフセットの保存に使用するパーティションを特定します。これは、Kafka ブローカーのインストールによって提供される
kafkacatユーティリティースクリプトを使用して実行できます。以下に例を示します。kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n' Partition(11) ["inventory-connector",{"server":"server1"}] {"scn":"324567897", "commit_scn":"324567897: 0x2832343233323:1"}kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n' Partition(11) ["inventory-connector",{"server":"server1"}] {"scn":"324567897", "commit_scn":"324567897: 0x2832343233323:1"}Copy to Clipboard Copied! Toggle word wrap Toggle overflow inventory-connectorのキーは["inventory-connector",{"server":"server1"}]、パーティションは11で、最後のオフセットはキーの後にくるコンテンツです。以前のオフセットに戻すには、コネクターを停止し、次のコマンドを発行する必要があります。
echo '["inventory-connector",{"server":"server1"}]|{"scn":"3245675000","commit_scn":"324567500"}' | \ kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11echo '["inventory-connector",{"server":"server1"}]|{"scn":"3245675000","commit_scn":"324567500"}' | \ kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11Copy to Clipboard Copied! Toggle word wrap Toggle overflow これにより、指定のキーとオフセット値を、
my_connect_offsetsトピックのパーティション11に書き込みます。この例では、コネクターは324567897ではなく SCN3245675000に戻します。
-
コネクターオフセットを含むトピックの名前を見つけます。これは、
- コネクターで、特定のオフセット SCN が含まれるログが見つけられない場合はどうなりますか?
Debezium コネクターは、コネクターオフセットに low-watermark および high-watermark SCN 値を維持します。low-watermark SCN は開始位置を表し、コネクターが正常に起動するために利用可能なオンライン redo または archive ログに存在する必要があります。コネクターがこのオフセット SCN を検出できないと報告した場合は、まだ利用可能なログに SCN が含まれていないため、コネクターが中断した場所から変更をマイニングできないことを示しています。
この場合は、2 つのオプションがあります。1 つ目として、コネクターの履歴トピックとオフセットを削除し、コネクターを再起動して、提案どおりに新しいスナップショットを作成します。こうすることで、すべてのトピックコンシューマーでデータ損失が発生しないようにします。2 つ目として、オフセットを手動で操作し、redo または archive ログで利用可能な位置に SCN を進めます。これにより、古い SCN 値と新しく提供された SCN 値の間で発生した変更がなくなり、トピックに書き込まれなくなります。これは、推奨されません。
- さまざまなマイニングストラテジーの違いは何ですか ?
Debezium Oracle コネクターは、
log.mining.strategyに 3 つのオプションがあります。デフォルトは
online_catalogであり、コネクターにデータディクショナリーを redo ログに書き込まないように指示します。代わりに、Oracle LogMiner は、テーブルの構造の現在の状態を含むオンラインデータディクショナリーを常に使用します。つまり、テーブルの構造が変更され、オンラインデータディクショナリーと一致しなくなった場合やテーブルの構造が変更された場合は、Oracle LogMiner がテーブルまたは列の名前を解決できなくなります。キャプチャーされるテーブルに対して頻繁にスキーマの変更が適用される場合は、このマイニングストラテジーのオプションを使用しないでください。テーブルのログから変更がすべてキャプチャーされ、コネクターの停止、スキーマの変更適用、コネクターの再起動を行い、テーブルのデータ変更を再開するように、すべてのデータ変更をスキーマの変更でロックステップすることが重要です。このオプションでは、必要な Oracle データベースメモリーが少なくて済み、LogMiner プロセスによってデータディクショナリーをロードまたは準備する必要がないため、通常、Oracle LogMiner セッションの起動ははるかに早くなります。2 番目のオプション
redo_log_catalogは、ログスイッチが検出されるたびに、Oracle データディクショナリーを redo ログに書き込みます。このデータディクショナリーは、Oracle LogMiner が redo および archive ログを解析するときにスキーマの変更を効果的に追跡するために必要です。このオプションでは、通常よりも多くのアーカイブログが生成されますが、データ変更のキャプチャーに影響を与えずに、キャプチャーされるテーブルをリアルタイムで操作できます。通常、このオプションはより多くの Oracle データベースメモリーを必要とし、各ログスイッチの後、Oracle LogMiner セッションとプロセスを開始するまでに少し時間がかかります。最後のオプションである
hybridは、上記の 2 つのストラテジーの利点を組み合わせ、欠点を取り除きます。このストラテジーは、online_catalogのパフォーマンスとredo_log_catalogのスキーマ追跡の回復力を活用しながら、アーカイブログ生成によるオーバーヘッドとパフォーマンスコストが通常よりも高くならないようにします。このモードではフォールバックモードが利用され、LogMiner がデータベース変更の SQL を再構築できない場合、Debezium コネクターはコネクターによって維持されるメモリー内スキーマモデルに依存して、実行中の SQL を再構築します。このモードは最終的にデフォルトに移行し、今後、唯一の動作モードになる予定です。- LogMiner を使用したハイブリッドマイニングストラテジーには制限がありますか?
-
はい、
log.mining.strategyのハイブリッドモードのストラテジーはまだ開発中であるため、すべてのデータ型をサポートしていません。現時点でこのモードでは、CLOB、NCLOB、Blob、XML、JSONデータ型に対する操作など、SQL ステートメントを再構築することはできません。つまり、lob.enabledをtrueの値に指定して有効にすると、ハイブリッドストラテジーを使用できなくなり、この組み合わせはサポートされていないためコネクターは起動に失敗します。 - コネクターが AWS での変更のキャプチャーを停止しているように見えるのはなぜですか?
タイムアウト AWS Gateway Load Balancer で 350 秒の修正されたアイドルタイムアウト により、完了までに 350 秒を超える JDBC 呼び出しは無期限にハングする可能性があります。
Oracle LogMiner API の呼び出しが完了するまでに 350 秒以上かかる状況では、タイムアウトが発生し、AWS Gateway Load Balancer がハングアップすることがあります。たとえば、大量のデータを処理する LogMiner セッションと Oracle の定期的なチェックポイントタスクが同時に実行された場合、このようなタイムアウトが発生する可能性があります。
AWS Gateway Load Balancer でタイムアウトが発生しないように、Kafka Connect 環境から root または super-user として次の手順を実行して、キープアライブパケットを有効にします。
ターミナルから、以下のコマンドを実行します。
sysctl -w net.ipv4.tcp_keepalive_time=60
sysctl -w net.ipv4.tcp_keepalive_time=60Copy to Clipboard Copied! Toggle word wrap Toggle overflow /etc/sysctl.confを編集し、以下のように以下の変数の値を設定します。net.ipv4.tcp_keepalive_time=60
net.ipv4.tcp_keepalive_time=60Copy to Clipboard Copied! Toggle word wrap Toggle overflow Oracle コネクターが
database.hostnameではなくdatabase.urlプロパティーを使用するように再設定し、以下の例のように Oracle 接続文字列記述子(ENABLE=broken)を追加します。database.url=jdbc:oracle:thin:username/password!@(DESCRIPTION=(ENABLE=broken)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(Host=hostname)(Port=port)))(CONNECT_DATA=(SERVICE_NAME=serviceName)))
database.url=jdbc:oracle:thin:username/password!@(DESCRIPTION=(ENABLE=broken)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(Host=hostname)(Port=port)))(CONNECT_DATA=(SERVICE_NAME=serviceName)))Copy to Clipboard Copied! Toggle word wrap Toggle overflow
前述の手順では、TCP ネットワークスタックが 60 秒ごとにキープアライブパケットを送信するように設定します。その結果、LogMiner API への JDBC 呼び出しが完了するまで 350 秒を超える時間がかかる場合、AWS Gateway ロードバランサーはタイムアウトしません。これにより、コネクターはデータベースのトランザクションログから変更の読み取りを続行します。
- ORA-01555 の原因と対処方法は?
Debezium Oracle コネクターは、最初のスナップショットフェーズの実行時にフラッシュバッククエリーを使用します。フラッシュバッククエリーは、データベースの
UNDO_RETENTIONデータベースパラメーターによって維持されるフラッシュバック領域に依存する特別なタイプのクエリーで、指定の SCN での特定の時点におけるテーブルの内容に基づいてクエリーの結果を返します。デフォルトでは、Oracle は通常、データベース管理者が増減の調節をしない限り、undo または flashback 領域を約 15 分間保持します。大きなテーブルをキャプチャーする設定の場合、最初のスナップショットを実行するのに 15 分以上かかるか、設定したUNDO_RETENTIONの期間分かかる場合があり、最終的に次の例外が発生します。ORA-01555: snapshot too old: rollback segment number 12345 with name "_SYSSMU11_1234567890$" too small
ORA-01555: snapshot too old: rollback segment number 12345 with name "_SYSSMU11_1234567890$" too smallCopy to Clipboard Copied! Toggle word wrap Toggle overflow この例外を扱う方法として、まずデータベース管理者と連携し、
UNDO_RETENTIONデータベースパラメーターを一時的に増やすことができるかどうかを確認します。Oracle データベースの再起動は必要ありません。したがって、データベースの可用性に影響を与えることなく、オンラインで実行できます。ただし、これを変更しても、テーブルスペースに、必要な UNDO データを格納する領域が十分にない場合、上記の例外か、「snapshot too old」の例外が発生する可能性があります。この例外を処理する 2 つ目の方法として、
snapshot.modeをno_dataに設定し、最初のスナップショットではなく、増分スナップショットに依存します。増分スナップショットはフラッシュバッククエリーに依存しないため、ORA-01555 例外の対象ではありません。- ORA-04036 の原因と対処方法は?
データベースの変更が頻繁に発生すると、Debezium Oracle コネクターは ORA-04036 例外を報告する可能性があります。Oracle LogMiner セッションが開始され、ログの切り替えが検出されるまで再利用されます。セッションは、Oracle LogMiner で最適なパフォーマンス使用率を達成できるように再利用されますが、マイニングセッションが長時間実行されると、PGA メモリーが過剰に使用され、最終的に次のような例外が発生する可能性があります。
ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMIT
ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMITCopy to Clipboard Copied! Toggle word wrap Toggle overflow この例外は、Oracle が redo ログを切り替える頻度、または Debezium Oracle コネクターがマイニングセッションを再利用できる期間を指定することで回避できます。Debezium Oracle コネクターには、設定オプション
log.mining.session.max.msがあり、現在の Oracle LogMiner セッションを終了して新しいセッションを開始する前に再利用できる期間を制御します。これにより、データベースで許可されている PGA メモリーを超えることなく、データベースリソースを管理できます。- ORA-01882 の原因と対処方法は?
Debezium Oracle コネクターは、Oracle データベースへの接続時に以下の例外を報告する可能性があります。
ORA-01882: timezone region not found
ORA-01882: timezone region not foundCopy to Clipboard Copied! Toggle word wrap Toggle overflow これは、JDBC ドライバーでタイムゾーン情報を正しく解決できない場合に発生します。このドライバー関連の問題を解決するには、地域を使用してタイムゾーンの詳細を解決しないようにドライバーに指示する必要があります。これには、
driver.oracle.jdbc.timezoneAsRegion=falseを使用してドライバーパススループロパティーを指定します。- ORA-25191 の原因と対処方法は?
Debezium Oracle コネクターは、インデックス設定テーブル (IOT) が Oracle LogMiner でサポートされていないため、IOT を自動的に無視します。ただし、ORA-25191 例外が出力された場合は、そのようなマッピングに固有のまれなケースが原因である可能性があり、自動的に除外するには追加のルールが必要になる場合があります。ORA-25191 例外の例を以下に示します。
ORA-25191: cannot reference overflow table of an index-organized table
ORA-25191: cannot reference overflow table of an index-organized tableCopy to Clipboard Copied! Toggle word wrap Toggle overflow ORA-25191 例外が出力された場合は、テーブルとそのマッピング、他の親テーブルに関連する内容など、Jira で問題を起票してください。回避策として、包含/除外設定オプションを調整して、コネクターがそのようなテーブルにアクセスできないようにします。
- SAX feature external-general-entities not supported を解決する方法
Debezium Oracle コネクターでの
XMLTYPEの使用は、テクノロジープレビュー機能として利用できます。この機能を使用するには、Oraclexdbおよびxmlparserv2の依存関係が必要です。
Oracle のxmlparserv2依存関係は SAX ベースのパーサーを実装しており、ランタイムがクラスパス上の他の実装ではなくこの実装を検出して使用すると、このエラーが発生します。通常使用する SAX 実装を具体的に制御するには、特定の引数を指定して JVM を起動する必要があります。
次の JVM 引数を指定するとこのエラーは発生せず、Oracle コネクターが正常に起動します。-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl
-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImplCopy to Clipboard Copied! Toggle word wrap Toggle overflow