2.6. PostgreSQL の Debezium コネクター
Debezium の PostgreSQL コネクターは、PostgreSQL データベースのスキーマで行レベルの変更をキャプチャーします。このコネクターと互換性のある PostgreSQL のバージョンについては、Debezium Supported Configurations page を参照してください。
PostgreSQL サーバーまたはクラスターに初めて接続すると、コネクターはすべてのスキーマの整合性スナップショットを作成します。スナップショットの完了後、コネクターはデータベースのコンテンツを挿入、更新、および削除する行レベルの変更を継続的にキャプチャーします。これらの行レベルの変更は、PostgreSQL データベースにコミットされています。コネクターはデータの変更イベントレコードを生成し、それらを Kafka トピックにストリーミングします。各テーブルのデフォルトの動作では、コネクターは生成されたすべてのイベントをそのテーブルの個別の Kafka トピックにストリーミングします。アプリケーションとサービスは、そのトピックからのデータ変更イベントレコードを使用します。
Debezium PostgreSQL コネクターを使用するための情報および手順は、以下のように設定されています。
- 「Debezium PostgreSQL コネクターの概要」
- 「Debezium PostgreSQL コネクターの仕組み」
- 「Debezium PostgreSQL コネクターのデータ変更イベントの説明」
- 「Debezium PostgreSQL コネクターによるデータ型のマッピング方法」
- 「Debezium コネクターを実行するための PostgreSQL のセットアップ」
- カスタムコンバーター
- 「Debezium PostgreSQL コネクターのデプロイメント」
- 「Debezium PostgreSQL コネクターのパフォーマンスの監視」
- 「Debezium PostgreSQL コネクターによる障害および問題の処理方法」
2.6.1. Debezium PostgreSQL コネクターの概要 リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL の 論理デコード 機能は、バージョン 9.4 で導入されました。これは、トランザクションログにコミットされた変更の抽出を可能にし、出力プラグイン を用いてユーザーフレンドリーな方法でこれらの変更の処理を可能にするメカニズムです。出力プラグインを使用すると、クライアントは変更を使用できます。
PostgreSQL コネクターには、連携してデータベースの変更を読み取りおよび処理する 2 つの主要部分が含まれています。
-
pgoutputは、PostgreSQL 10+ の標準的な論理デコード出力プラグインです。これは、この Debezium リリースでサポートされている唯一の論理デコード出力プラグインです。このプラグインは PostgreSQL コミュニティーにより維持され、PostgreSQL 自体によって 論理レプリケーション に使用されます。このプラグインは常に存在するため、追加のライブラリーをインストールする必要はありません。Debezium コネクターは、raw レプリケーションイベントストリームを直接変更イベントに変換します。 - PostgreSQL の ストリーミングレプリケーションプロトコル および PostgreSQL JDBC ドライバー を使用して、論理デコード出力プラグインによって生成された変更を読み取る Java コード (実際の Kafka Connect コネクター)。
コネクターは、キャプチャーされた各行レベルの挿入、更新、および削除操作の 変更イベント を生成し、個別の Kafka トピックの各テーブルに対する変更イベントレコードを送信します。クライアントアプリケーションは、対象のデータベーステーブルに対応する Kafka トピックを読み取り、これらのトピックから受け取るすべての行レベルイベントに対応できます。
通常、PostgreSQL は一定期間後にログ先行書き込み (WAL、write-ahead log) をパージします。つまり、コネクターにはデータベースに加えられたすべての変更の完全な履歴はありません。そのため、PostgreSQL コネクターが最初に特定の PostgreSQL データベースに接続すると、データベーススキーマごとに 整合性スナップショット を実行して起動します。コネクターは、スナップショットの完成後に、スナップショットが作成された正確な時点から変更のストリーミングを続行します。これにより、コネクターはすべてのデータの整合性のあるビューで開始し、スナップショットの作成中に加えられた変更は省略されません。
コネクターはフォールトトラレントです。コネクターは変更を読み取り、イベントを生成するため、各イベントの WAL の位置を記録します。コネクターが何らかの理由で停止した場合 (通信障害、ネットワークの問題、クラッシュなど)、コネクターは再起動後に最後に停止した場所から WAL の読み取りを続行します。これにはスナップショットが含まれます。スナップショット中にコネクターが停止した場合、コネクターは再起動時に新しいスナップショットを開始します。
コネクターは PostgreSQL の論理デコード機能に依存および反映します。これには、以下の制限があります。
- 論理デコードは DDL の変更をサポートしません。よって、コネクターは DDL の変更イベントをコンシューマーに報告できません。
- 論理デコードレプリケーションスロットはコミット後ではなくコミット中に変更を公開するため、望ましくない副次的な影響が発生する可能性があります。クライアントが矛盾した状態を観察する可能性がある主なシナリオは 2 つあります。まず、レプリケーションが完了する前にマスターが停止した場合に、コミットされていない変更を公開します。2 番目は、レプリケート中のため一時的に読み取れない変更 (つまり、read-after-write の整合性) を公開します。たとえば、EmbeddedEngine コンシューマーは、作成された行の通知を受信しますが、トランザクションでは読み取ることができません。
さらに、pgoutput 論理デコード出力プラグインは生成された列の値をキャプチャーしないため、これらの列のデータはコネクターの出力から欠落することになります。
問題が発生した場合の動作 では、問題の発生時にコネクターがどのように対応するかが説明されています。
Debezium は現在、UTF-8 文字エンコーディングのデータベースのみをサポートしています。1 バイト文字エンコーディングでは、拡張 ASCII コード文字が含まれる文字列を正しく処理できません。
2.6.2. Debezium PostgreSQL コネクターの仕組み リンクのコピーリンクがクリップボードにコピーされました!
Debezium PostgreSQL コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびメタデータの使用方法を理解すると便利です。
詳細は以下を参照してください。
2.6.2.1. PostgreSQL コネクターのセキュリティー リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターを使用して PostgreSQL データベースから変更をストリーミングするには、コネクターは特定の権限がデータベースで必要になります。必要な権限を付与する方法の 1 つとして、ユーザーに superuser 権限を付与する方法がありますが、これにより PostgreSQL データが不正アクセスによって公開される可能性ああります。Debezium ユーザーに過剰な権限を付与するのではなく、特定の特権を付与する専用の Debezium レプリケーションユーザーを作成することが推奨されます。
Debezium PostgreSQL ユーザーの権限設定の詳細は、パーミッションの設定 を参照してください。PostgreSQL の論理レプリケーションセキュリティーの詳細は、PostgreSQL のドキュメント を参照してください。
2.6.2.2. Debezium PostgreSQL コネクターによるデータベーススナップショットの実行方法 リンクのコピーリンクがクリップボードにコピーされました!
ほとんどの PostgreSQL サーバーは、WAL セグメントにデータベースの完全な履歴を保持しないように設定されています。つまり、PostgreSQL コネクターは WAL のみを読み取ってもデータベースの履歴全体を確認できません。そのため、コネクターが最初に起動すると、データベースの最初の 整合性スナップショット が実行されます。
スナップショットの詳細は、以下のセクションを参照してください。
初期スナップショットのデフォルトのワークフロー動作
次の手順では、コネクターが初期スナップショット中に実行するデフォルトの手順について説明します。この動作を変更するには、snapshot.mode コネクター設定プロパティー を initial 以外の値に設定します。
-
snapshot.isolation.modeプロパティーで指定された分離レベルを使用するトランザクションを開始します。指定されたモードによって、このトランザクションのその後の読み取りがデータの単一の一貫性のあるバージョンに対して行われるかどうかが決まります。モードによっては、他のクライアントによる後続のINSERT、UPDATE、およびDELETE操作の結果として生じたデータの変更が、このトランザクションに表示される場合があります。 - サーバーのトランザクションログの現在の位置を読み取ります。
-
データベーステーブルとスキーマをスキャンし、各行の
READイベントを生成して、そのイベントを適切なテーブル固有の Kafka トピックに書き込みます。 - トランザクションをコミットします。
- コネクターオフセットにスナップショットの正常な完了を記録します。
コネクターに障害が発生した場合、コネクターのリバランスが発生した場合、または 1 の後で 5 の完了前に停止した場合、コネクターは再起動後に新しいスナップショットを開始します。コネクターが最初のスナップショットを完了すると、PostgreSQL コネクターは手順 2 で読み取る位置からストリーミングを続行します。これにより、コネクターが更新を見逃さないようします。何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを続行します。
| オプション | 説明 |
|---|---|
|
|
コネクターは起動時に常にスナップショットを実行します。スナップショットが完了した後、コネクターは上記の手順の 3. から変更のストリーミングを続行します。このモードは次の状況で役立ちます。
|
|
| Kafka オフセットトピックが存在しない場合、コネクターはデータベーススナップショットを実行します。データベースのスナップショットが完了すると、Kafka オフセットトピックが書き込まれます。Kafka オフセットトピックに以前保存された LSN がある場合、コネクターはその位置から変更をストリーミングを続行します。 |
|
| コネクターはデータベースのスナップショットを実行し、変更イベントレコードをストリーミングする前に停止します。コネクターが起動していても、停止前にスナップショットを完了しなかった場合、コネクターはスナップショットプロセスを再起動し、スナップショットの完了時に停止します。 |
|
| コネクターはスナップショットを実行しません。コネクターがこのように設定されている場合、起動後は次のように動作します。 Kafka オフセットトピックに以前保存された LSN がある場合、コネクターはその位置から変更をストリーミングを続行します。LSN が保存されていない場合、コネクターは、サーバー上で PostgreSQL 論理レプリケーションスロットが作成された時点から変更のストリーミングを開始します。対象のすべてのデータが WAL に反映されている場合にのみ、このスナップショットモードを使用します。 |
|
|
非推奨。 |
|
| コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。
|
2.6.2.3. アドホックスナップショット リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。
execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。execute-snapshot シグナルのタイプを incremental または blocking に設定し、スナップショットに含めるテーブルの名前を次の表に示すように指定します。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプを指定します。 |
|
| 該当なし |
スナップショットに含めるテーブルの完全修飾名に一致する正規表現を含む配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
|
|
| 該当なし | スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。 |
アドホック増分スナップショットのトリガー
アドホック増分スナップショットを開始するには、execute-snapshot シグナルタイプのエントリーをシグナリングテーブルに追加するか、シグナルメッセージを Kafka シグナリングトピックに送信します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
詳細は、スナップショットの増分 を参照してください。
アドホックブロッキングスナップショットのトリガー
シグナリングテーブルまたはシグナリングトピックに、execute-snapshot シグナルタイプを持つエントリーを追加することによって、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。
詳細は、ブロッキングスナップショット を参照してください。
2.6.2.4. 増分スナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.listプロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT、UPDATE、DELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
現在、増分スナップショットを開始するには、次のいずれかの方法を使用できます。
PostgreSQL の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。増分スナップショットの開始 前に スキーマの変更が行われ、シグナルが送信された 後に スキーマの変更が行われた場合は、スキーマの変更を正しく処理するために、パススルーの設定オプション database.autosave が conservative に設定されます。
2.6.2.4.1. 増分スナップショットのトリガー リンクのコピーリンクがクリップボードにコピーされました!
増分スナップショットを開始するには、ソースデータベースのシグナリングテーブルに アドホックスナップショットシグナル を送信します。スナップショットシグナルは SQL INSERT クエリーとして送信します。
Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。Debezium は現在、incremental と blocking のスナップショットタイプをサポートしています。
スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections 配列が空の場合、Debezium は空の配列をアクションが必要ないと解釈し、スナップショットは作成しません。
スナップショットに含めるテーブルの名前にドット (.)、スペース、またはその他の英数字以外の文字が含まれている場合は、テーブル名を二重引用符でエスケープする必要があります。
たとえば、public スキーマに存在し、My.Table という名前のテーブルを含めるには、"public.\"My.Table\"" の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットをトリガーする
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コマンドの
id、type、およびdataパラメーターの値は、シグナルテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.130 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 項目 値 説明 1
schema.debezium_signalソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID文字列をウォーターマークシグナルとして生成します。3
execute-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
配列には、schema.table形式を使用してテーブルの完全修飾名と一致する正規表現がリストされます。この形式は、コネクターの シグナリングテーブル の名前を指定するために使用する形式と同じです。5
incremental実行するスナップショット操作のタイプを指定する、シグナルの
dataフィールドのオプションのtypeコンポーネント。
有効な値はincrementalとblockingです。
値を指定しない場合、コネクターはデフォルトで増分スナップショットを実行します。6
additional-conditionsコネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、data-collectionプロパティーとfilterプロパティーを持つオブジェクトです。データの収集単位で異なるフィルターを指定できます。
*data-collectionプロパティーは、フィルターが適用されるデータコレクションの完全修飾名です。additional-conditionsパラメーターの詳細は、「additional-conditions付きでアドホック増分スナップショットを実行する」 を参照してください。
2.6.2.4.2. additional-conditions 付きでアドホック増分スナップショットを実行する リンクのコピーリンクがクリップボードにコピーされました!
スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルに additional-conditions パラメーターを追加してシグナル要求を変更できます。
一般的なスナップショットの SQL クエリーは、以下の形式を取ります。
SELECT * FROM <tableName> ....
SELECT * FROM <tableName> ....
additional-conditions パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。
SELECT * FROM <data-collection> WHERE <filter> ....
SELECT * FROM <data-collection> WHERE <filter> ....
以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
たとえば、以下の列が含まれる products テーブルがあるとします。
-
id(プライマリーキー) -
color -
quantity
products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue"}]}');
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue"}]}');
additional-conditions パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue AND quantity>10"}]}');
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例2.40 増分スナップショットイベントメッセージ
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
実行するスナップショット操作タイプを指定します。 |
| 2 |
|
イベントタイプを指定します。 |
2.6.2.4.3. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。 |
例2.41 execute-snapshot Kafka メッセージ
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
additional-conditions 付きのアドホック増分スナップショット
Debezium は additional-conditions フィールドを使用してテーブルのコンテンツのサブセットを選択します。
通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。
SELECT * FROM <tableName> ….
スナップショット要求に additional-conditions プロパティーが含まれている場合、プロパティーの data-collection および filter パラメーターが SQL クエリーに追加されます。次に例を示します。
SELECT * FROM <data-collection> WHERE <filter> ….
たとえば、列 id (プライマリーキー)、color、および brand を含む products テーブルがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions プロパティーを追加することができます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`
また、additional-conditions プロパティーを使用して、複数の列に基づいて条件を渡すこともできます。たとえば、前の例と同じ products テーブルを使用して、color='blue' および brand='MyBrand' である products テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.6.2.4.4. 増分スナップショットの停止 リンクのコピーリンクがクリップボードにコピーされました!
状況によっては、増分スナップショットを停止する必要がある場合があります。たとえば、スナップショットが正しく設定されていない場合や、他のデータベース操作にリソースが使用可能であるこのとの確認が必要な場合があります。ソースデータベースのシグナリングテーブルにシグナルを送信することで、すでに実行中のスナップショットを停止できます。
スナップショット停止信号をシグナリングテーブルに送信するには、SQL INSERT クエリーで送信します。stop-snapshot シグナルは、スナップショット操作の type を incremental として指定し、オプションで、現在実行中のスナップショットから省略するテーブルを指定します。Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
関連情報
また、JSON メッセージを Kafka シグナリングトピック に送信して、増分スナップショットを停止することもできます。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットを停止する
SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
INSERT INTO myschema.debezium_signal (id, type, data) values ('ad-hoc-1', 'stop-snapshot', '{"data-collections": ["schema1.table1", "schema1.table2"], "type":"incremental"}');INSERT INTO myschema.debezium_signal (id, type, data)1 values ('ad-hoc-1',2 'stop-snapshot',3 '{"data-collections": ["schema1.table1", "schema1.table2"],4 "type":"incremental"}');5 Copy to Clipboard Copied! Toggle word wrap Toggle overflow signal コマンドの
id、type、およびdataパラメーターの値は、シグナリングテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.133 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明 項目 値 説明 1
schema.debezium_signalソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。3
stop-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
配列には、schema.tableの形式で完全修飾名でテーブルに一致する正規表現がリストされます。dataフィールドからこのコンポーネントを省略すると、シグナルによって進行中の増分スナップショット全体が停止されます。5
incremental停止するスナップショット操作のタイプを指定する信号の
dataフィールドの必須コンポーネント。
現在、有効な唯一のオプションはincrementalです。typeの値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
2.6.2.4.5. Kafka シグナリングチャネルを使用して増分スナップショットを停止する リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka シグナリングトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
テーブルの完全修飾名に一致する、コンマで区切られた正規表現のオプションの配列、スナップショットから削除するテーブル名に一致するテーブル名または正規表現の配列。 |
次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
2.6.2.5. ブロッキングスナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。ブロッキングスナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。
次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。
- 新しいテーブルを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
- 大きなテーブルを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。
ブロッキングスナップショットのプロセス
ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。
スナップショットの設定
シグナルの data コンポーネントでは、次のプロパティーを設定できます。
- data-collections: スナップショットする必要のあるテーブルを指定します。
-
data-collections: スナップショットに含めるテーブルを指定します。
このプロパティーは、完全修飾テーブル名に一致する正規表現のコンマ区切りリストを受け入れます。プロパティーの動作は、ブロッキングスナップショットでキャプチャーするテーブルを指定するtable.include.listプロパティーの動作と似ています。 additional-conditions: テーブルごとに異なるフィルターを指定できます。
-
data-collectionプロパティーは、フィルターが適用されるテーブルの完全修飾名であり、データベースに応じて大文字と小文字を区別するか、区別しないかを指定できます。 -
filterプロパティーは、snapshot.select.statement.overridesで使用される値と同じものが設定されます。これは、大文字小文字を区別して一致させる必要があるテーブルの完全修飾名です。
-
以下に例を示します。
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
重複の可能性
スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。
2.6.2.6. Debezium PostgreSQL コネクターによる変更イベントレコードのストリーミング方法 リンクのコピーリンクがクリップボードにコピーされました!
通常、PostgreSQL コネクターは、接続されている PostgreSQL サーバーから変更をストリーミングするのに大半の時間を費やします。このメカニズムは、PostgreSQL のレプリケーションプロトコル に依存します。このプロトコルにより、クライアントはログシーケンス番号 (LSN) と呼ばれる特定の場所で変更がサーバーのトランザクションログにコミットされる際に、サーバーから変更を受信することができます。
サーバーがトランザクションをコミットするたびに、別のサーバープロセスが 論理デコードプラグイン からコールバック関数を呼び出します。この関数はトランザクションからの変更を処理し、特定の形式 (Debezium プラグインの場合は Protobuf または JSON) に変換して、出力ストリームに書き込みます。その後、クライアントは変更を使用できます。
Debezium PostgreSQL コネクターは PostgreSQL クライアントとして動作します。コネクターが変更を受信すると、イベントを Debezium の create、update、または delete イベントに変換します。これには、イベントの LSN が含まれます。PostgreSQL コネクターは、同じプロセスで実行されている Kafka Connect フレームワークにレコードのこれらの変更イベントを転送します。Kafka Connect プロセスは、変更イベントレコードを適切な Kafka トピックに生成された順序で非同期に書き込みます。
Kafka Connect は定期的に最新の オフセット を別の Kafka トピックに記録します。オフセットは、各イベントに含まれるソース固有の位置情報を示します。PostgreSQL コネクターでは、各変更イベントに記録された LSN がオフセットです。
Kafka Connect が正常にシャットダウンすると、コネクターを停止し、すべてのイベントレコードを Kafka にフラッシュして、各コネクターから受け取った最後のオフセットを記録します。Kafka Connect の再起動時に、各コネクターの最後に記録されたオフセットを読み取り、最後に記録されたオフセットで各コネクターを起動します。コネクターを再起動すると、PostgreSQL サーバーにリクエストを送信し、その位置の直後に開始されるイベントを送信します。
PostgreSQL コネクターは、論理デコードプラグインによって送信されるイベントの一部としてスキーマ情報を取得します。ただし、コネクターはプライマリーキーが設定される列に関する情報を取得しません。コネクターは JDBC メタデータ (サイドチャネル) からこの情報を取得します。テーブルのプライマリーキー定義が変更される場合 (プライマリーキー列の追加、削除、または名前変更によって)、変更される場合、JDBC からのプライマリーキー情報が論理デコードプラグインが生成する変更イベントと同期されないごくわずかな期間が発生します。このごくわずかな期間に、キーの構造が不整合な状態でメッセージが作成される可能性があります。不整合にならないようにするには、以下のようにプライマリーキーの構造を更新します。
- データベースまたはアプリケーションを読み取り専用モードにします。
- Debezium に残りのイベントをすべて処理させます。
- Debezium を停止します。
- 関連するテーブルのプライマリーキー定義を更新します。
- データベースまたはアプリケーションを読み取り/書き込みモードにします。
- Debezium を再起動します。
PostgreSQL 10+ 論理デコードサポート (pgoutput)
PostgreSQL 10+ の時点で、PostgreSQL でネイティブにサポートされる pgoutput と呼ばれる論理レプリケーションストリームモードがあります。つまり、Debezium PostgreSQL コネクターは追加のプラグインを必要とせずにそのレプリケーションストリームを使用できます。これは、プラグインのインストールがサポートされないまたは許可されない環境で特に便利です。
詳細は、PostgreSQL のセットアップ を参照してください。
2.6.2.7. Debezium PostgreSQL の変更イベントレコードを受信する Kafka トピックのデフォルト名 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、PostgreSQL コネクターは、テーブルで発生するすべての INSERT、UPDATE、DELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。
topicPrefix.schemaName.tableName
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- topicPrefix
-
topic.prefixコネクター設定プロパティーで指定されたトピック接頭辞。 - schemaName
- 変更イベントが発生したデータベーススキーマの名前。
- tableName
- 変更イベントが発生したデータベーステーブルの名前。
たとえば、postgres データベースと products、products_on_hand、customers、orders の 4 つのテーブルを含む inventory スキーマを持つ PostgreSQL インストレーションの変更をキャプチャーするコネクターの設定において、fulfillment が論理的なサーバー名であるとします。コネクターは以下の 4 つの Kafka トピックにレコードをストリーミングします。
-
fulfillment.inventory.products -
fulfillment.inventory.products_on_hand -
fulfillment.inventory.customers -
fulfillment.inventory.orders
テーブルは特定のスキーマの一部ではなく、デフォルトの public PostgreSQL スキーマで作成されたとします。Kafka トピックの名前は以下になります。
-
fulfillment.public.products -
fulfillment.public.products_on_hand -
fulfillment.public.customers -
fulfillment.public.orders
コネクターは、同様の命名規則を適用して、トランザクションメタデータのトピック をラベル付けします。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。
2.6.2.8. トランザクション境界を表す Debezium PostgreSQL コネクターによって生成されたイベント リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、トランザクション境界を表し、データ変更イベントメッセージを強化するイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
Debezium はすべてのトランザクションの BEGIN および END に対して、以下のフィールドが含まれるイベントを生成します。
status-
BEGINまたはEND id-
Postgres トランザクション ID 自体と、コロンで区切られた特定の操作の LSN で構成される一意のトランザクション識別子の文字列表現。形式は
txID:LSNです。 ts_ms-
データソースでのトランザクション境界イベント (
BEGINまたはENDイベント) の時間。データソースから Debezium にイベント時間を渡されない場合、フィールドは代わりに Debezium がイベントを処理する時間を表します。 event_count(ENDイベント用)- トランザクションによって出力されるイベントの合計数。
data_collections(ENDイベント用)-
data_collectionとevent_count要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
例
topic.transaction オプションで上書きされない限り、トランザクションイベントは <topic.prefix>.transaction という名前のトピックに書き込まれます。
変更データイベントのエンリッチメント
トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドで強化されます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id- 一意のトランザクション識別子の文字列表現。
total_order- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下は、メッセージの例になります。
2.6.3. Debezium PostgreSQL コネクターのデータ変更イベントの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium PostgreSQL コネクターは、行レベルの INSERT、UPDATE、および DELETE 操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。
以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
最初の |
| 2 |
|
最初の |
| 3 |
|
2 つ目の |
| 4 |
|
2 つ目の |
デフォルトの動作では、コネクターによって、変更イベントレコードが イベントの元のテーブル名前が同じトピック にストリーミングされます。
Kafka 0.10 以降では、任意でイベントキーおよび値を タイムスタンプ とともに記録できます。このタイムスタンプはメッセージが作成された (プロデューサーによって記録) 時間または Kafka によってログに買い込まれた時間を示します。
PostgreSQL コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、スキーマ名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または _) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。
論理サーバー名、スキーマ名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。
詳細は以下を参照してください。
2.6.3.1. Debezium PostgreSQL の変更イベントのキー リンクのコピーリンクがクリップボードにコピーされました!
指定のテーブルでは、変更イベントのキーは、イベントが作成された時点でテーブルのプライマリーキーの各列のフィールドが含まれる構造を持ちます。また、テーブルの REPLICA IDENTITY が FULL または USING INDEX に設定されている場合は、各ユニークキー制約のフィールドがあります。
public データベーススキーマに定義されている customers テーブルと、そのテーブルの変更イベントキーの例を見てみましょう。
テーブルの例
変更イベントキーの例
topic.prefix コネクター設定プロパティーに PostgreSQL_server の値がある場合、この定義がある限り customers テーブルの変更イベントはすべて同じキー構造を持ち、JSON では以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
キーのスキーマ部分は、キーの |
| 2 |
|
キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-name.table-name.
|
| 3 |
|
イベントキーの |
| 4 |
|
各フィールドの名前、インデックス、およびスキーマなど、 |
| 5 |
|
この変更イベントが生成された行のキーが含まれます。この例では、キーには値が |
column.exclude.list および column.include.list コネクター設定プロパティーは、テーブル列のサブセットのみをキャプチャーできるようにしますが、プライマリーキーまたは一意キーのすべての列は常にイベントのキーに含まれます。
テーブルにプライマリーキーまたは一意キーがない場合は、変更イベントのキーは null になります。プライマリーキーや一意キーの制約がないテーブルの行は一意に識別できません。
2.6.3.2. Debezium PostgreSQL 変更イベントの値 リンクのコピーリンクがクリップボードにコピーされました!
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。
この表への変更に対する変更イベントの値は、REPLICA IDENTITY 設定およびイベントの目的である操作により異なります。
詳細は、以下を参照してください。
Replica identity
REPLICA IDENTITY は UPDATE および DELETE イベントの論理デコードプラグインで利用可能な情報量を決定する PostgreSQL 固有のテーブルレベルの設定です。具体的には、REPLICA IDENTITY の設定は、UPDATE または DELETE イベントが発生するたびに、関係するテーブル列の以前の値に利用可能な情報 (ある場合) を制御します。
REPLICA IDENTITY には 4 つの可能性があります。
DEFAULT- テーブルにプライマリーキーがある場合に、UPDATEおよびDELETEイベントにテーブルのプライマリーキー列の以前の値が含まれることがデフォルトの動作になります。UPDATEイベントでは、値が変更されたプライマリーキー列のみが存在します。テーブルにプライマリーキーがない場合、コネクターはそのテーブルの
UPDATEまたはDELETEイベントを出力しません。プライマリーキーのないテーブルの場合、コネクターは 作成 イベントのみを出力します。通常、プライマリーキーのないテーブルは、テーブルの最後にメッセージを追加するために使用されます。そのため、UPDATEおよびDELETEイベントは便利ではありません。-
NOTHING:UPDATEおよびDELETE操作の出力されたイベントにはテーブル列の以前の値に関する情報は含まれません。 -
FULL:UPDATEおよびDELETE操作の出力されたイベントには、テーブルの列すべての以前の値が含まれます。 -
INDEXindex-name:UPDATEおよびDELETE操作の発生したイベントには、指定されたインデックスに含まれる列の以前の値が含まれます。UPDATEイベントには、更新された値を持つインデックス化された列も含まれます。
create イベント
以下の例は、customers テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。 |
| 2 |
|
|
| 3 |
|
|
| 4 |
|
|
| 5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
| 6 |
|
イベント発生前の行の状態を指定する任意のフィールド。この例のように、 注記
このフィールドを利用できるかどうかは、各テーブルの |
| 7 |
|
イベント発生後の行の状態を指定する任意のフィールド。この例では、 |
| 8 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。
|
| 9 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
| 10 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
更新 イベント
サンプル customers テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers テーブルでの更新に生成されるイベントの変更イベント値の例になります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
データベースをコミットする前に行にあった値が含まれる任意のフィールド。この例では、テーブルの
update イベントに、行にあるすべての列に指定されたこれまでの値を含めるには、 |
| 2 |
|
イベント発生後の行の状態を指定する任意のフィールド。この例では、 |
| 3 |
|
イベントのソースメタデータを記述する必須のフィールド。
|
| 4 |
|
操作の型を記述する必須の文字列。更新 イベントの値では、 |
| 5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つ のイベントが Debezium によって出力されます。3 つのイベントとは、DELETE イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。詳細は次のセクションで説明します。
プライマリーキーの更新
行のプライマリーキーフィールドを変更する UPDATE 操作は、プライマリーキーの変更と呼ばれます。プライマリーキーの変更では、UPDATE イベントレコードの代わりにコネクターが古いキーの DELETE イベントレコードと、新しい (更新された) キーの CREATE イベントレコードを出力します。これらのイベントには通常の構造と内容があり、イベントごとにプライマリーキーの変更に関連するメッセージヘッダーがあります。
-
DELETEイベントレコードには、メッセージヘッダーとして__debezium.newkeyが含まれます。このヘッダーの値は、更新された行の新しいプライマリーキーです。 -
CREATEイベントレコードには、メッセージヘッダーとして__debezium.oldkeyが含まれます。このヘッダーの値は、更新された行にあった以前の (古い) プライマリーキーです。
delete イベント
削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema の部分になります。サンプル customers テーブルの 削除 イベントの payload 部分は以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
イベント発生前の行の状態を指定する任意のフィールド。delete イベント値の |
| 2 |
|
イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の |
| 3 |
|
イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の
|
| 4 |
|
操作の型を記述する必須の文字列。 |
| 5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。
プライマリーキーを持たないテーブルに対して生成された 削除 イベントをコンシューマーが処理できるようにするには、テーブルの REPLICA IDENTITY を FULL に設定します。テーブルにプライマリーキーがなく、テーブルの REPLICA IDENTITY が DEFAULT または NOTHING に設定されている場合、削除 イベントの before フィールドはありません。
PostgreSQL コネクターイベントは、Kafka のログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
tombstone イベント
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするには、PostgreSQL コネクターは、値が null 値以外の同じキーを持つ特別な 廃棄 イベントが含まれる 削除 イベントに従います。
truncate イベント
truncate 変更イベントは、テーブルが切り捨てられたことを通知します。この場合のメッセージキーは null で、メッセージの値は以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
イベントのソースメタデータを記述する必須のフィールド。truncate イベント値の
|
| 2 |
|
操作の型を記述する必須の文字列。 |
| 3 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
1 つの TRUNCATE 操作が複数テーブルに影響する場合、コネクターは切り捨てられたテーブルごとに 1 つの truncate 変更イベントレコードを出力します。
truncate イベントは、テーブル全体に対して加えられ、メッセージキーを持たない変更を表します。その結果、複数のパーティションがあるトピックでは、変更イベント (create、update など)、またはテーブルに関連する truncate イベントについて、順序は保証されません。たとえば、コンシューマーが複数のパーティションからテーブルのイベントを読み取る場合、別のパーティションからテーブル内のすべてのデータを削除する truncate イベントを受信した後、あるパーティションからテーブルの update イベントを受け取る可能性があります。順序は、単一のパーティションを使用するトピックでのみ保証されます。
コネクターに truncate イベントをキャプチャーさせたくない場合は、skipped.operations オプションを使用して除外します。
このイベントタイプは、Postgres 14+ の pgoutput プラグインでのみサポートされています (Postgres ドキュメント)。
メッセージ イベントは、一般的に pg_logical_emit_message 関数を使用して、汎用の論理デコードメッセージが WAL に直接挿入されたことを通知します。メッセージキーは、ここでは prefix という名前の 1 つのフィールドを持つ Struct で、メッセージを挿入する際に指定された接頭辞を持ちます。トランザクションメッセージの場合、メッセージの値は以下のようになります。
他のイベントタイプとは異なり、非トランザクションメッセージは、関連する BEGIN や END のトランザクションイベントを持ちません。メッセージの値は、非取引メッセージの場合は以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
イベントのソースメタデータを記述する必須のフィールド。メッセージ イベント値では、
|
| 2 |
|
操作の型を記述する必須の文字列。 |
| 3 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。
非トランザクション メッセージ イベントの場合、 |
| 4 |
| メッセージのメタデータを格納するフィールド
|
2.6.4. Debezium PostgreSQL コネクターによるデータ型のマッピング方法 リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL コネクターは、行が存在するテーブルのように構造化されたイベントで行への変更を表します。イベントには、各列の値のフィールドが含まれます。その値がどのようにイベントで示されるかは、列の PostgreSQL のデータ型によって異なります。以下のセクションでは、PostgreSQL データ型をイベントフィールドの リテラル型 および セマンティック型 にマッピングする方法を説明します。
-
literal type は、Kafka Connect スキーマタイプ (
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP、STRUCT) を使用して、値がどのように表現されるかを記述します。 - セマンティック型 は、フィールドの Kafka Connect スキーマの名前を使用して、Kafka Connect スキーマがフィールドの 意味 をキャプチャーする方法を記述します。
デフォルトのデータ型変換が要件に合わない場合は、コネクター用の カスタムコンバーターの作成 が可能です。
詳細は以下を参照してください。
基本型
以下の表は、コネクターによる基本型へのマッピング方法を説明しています。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
| 該当なし |
|
|
| 該当なし |
|
|
|
|
|
|
|
|
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
該当なし |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 該当なし |
|
|
| 該当なし |
|
|
|
該当なし |
|
|
|
n/a |
|
|
|
n/a |
|
|
|
該当なし |
|
|
|
該当なし |
|
|
|
該当なし |
|
|
|
|
時間型
タイムゾーン情報が含まれる PostgreSQL の TIMESTAMPTZ and TIMETZ データ型以外に、時間型がマッピングされる仕組みは time.precision.mode コネクター設定プロパティーの値によって異なります。ここでは、以下のマッピングを説明します。
time.precision.mode=adaptive
time.precision.mode プロパティーがデフォルトの adaptive に設定された場合、コネクターは列のデータ型定義に基づいてリテラル型とセマンティック型を決定します。これにより、イベントがデータベースの値を 正確 に表すようになります。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
time.precision.mode=adaptive_time_microseconds
time.precision.mode 設定プロパティーが adaptive_time_microseconds に設定されている場合には、コネクターは列のデータ型定義に基づいて一時的な型のリテラル型とセマンティック型を決定します。これにより、マイクロ秒としてキャプチャーされた TIME フィールド以外は、イベントがデータベースの値を 正確 に表すようになります。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
time.precision.mode=connect
time.precision.mode 設定プロパティーが connect に設定された場合、コネクターは Kafka Connect の論理型を使用します。これは、コンシューマーが組み込みの Kafka Connect の論理型のみを処理でき、可変精度の時間値を処理できない場合に便利です。ただし、PostgreSQL はマイクロ秒の精度をサポートするため、connect 時間精度を指定してコネクターによって生成されたイベントは、データベース列の 少数秒の精度 値が 3 よりも大きい場合に、精度が失われます。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
TIMESTAMP 型
TIMESTAMP 型は、タイムゾーン情報のないタイムスタンプを表します。このような列は、UTC を基にして同等の Kafka Connect 値に変換されます。たとえば、time.precision.mode が connect に設定されていない場合、TIMESTAMP 値 "2018-06-20 15:13:16.945104" は、io.debezium.time.MicroTimestamp の値 "1529507596945104" で表されます。
Kafka Connect および Debezium を実行している JVM のタイムゾーンは、この変換には影響しません。
PostgreSQL は TIMESTAMP 列に +/-infinite の値を使用することをサポートしています。これらの特殊な値は、正の無限大の場合は 9223372036825200000、負の無限大の場合は -9223372036832400000 の値を持つタイムスタンプに変換されます。この動作は、PostgreSQL JDBC ドライバーの標準的な動作に似ています。詳細は、org.postgresql.PGStatement インターフェイスを参照してください。
10 進数型
PostgreSQL コネクター設定プロパティーの設定 decimal.handling.mode は、コネクターが 10 進数型をマッピングする方法を決定します。
decimal.handling.mode プロパティーが precise に設定されている場合、コネクターは DECIMAL と NUMERIC、MONEY 列すべてに Kafka Connect org.apache.kafka.connect.data.Decimal 論理型を使用します。これはデフォルトのモードです。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
このルールには例外があります。スケーリング制約なしで NUMERIC または DECIMAL 型が使用されると、データベースから取得される値のスケールは値ごとに異なります (可変)。この場合、コネクターは io.debezium.data.VariableScaleDecimal を使用し、これには転送された値とスケールの両方が含まれます。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
decimal.handling.mode プロパティーが double に設定されている場合、コネクターはすべての DECIMAL、NUMERIC、MONEY 値を Java の double 値として表し、次の表のようにエンコードします。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) |
|---|---|---|
|
|
| |
|
|
| |
|
|
|
decimal.handling.mode 設定プロパティーの最後の設定は string です。この場合、コネクターは DECIMAL、NUMERIC および MONEY 値をフォーマットされた文字列表現として表し、それらを以下の表に示すとおりエンコードします。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) |
|---|---|---|
|
|
| |
|
|
| |
|
|
|
PostgreSQL は、decimal.handling.mode の設定が string または double の場合、DECIMAL /NUMERIC 値に格納される特別な値として NaN(not a number) をサポートしています。この場合、コネクターは NaN を Double.NaN または文字列定数 NAN のいずれかとしてエンコードします。
HSTORE 型
PostgreSQL コネクター設定プロパティーの設定 hstore.handling.mode は、コネクターが HSTORE の値をマッピングする方法を決定します。
hstore.handling.mode プロパティーが json (デフォルト) に設定されている場合、コネクターは HSTORE 値を JSON 値の文字列表現として表し、以下の表で示すようにエンコードします。hstore.handling.mode プロパティーが map に設定されている場合、コネクターは HSTORE 値に MAP スキーマタイプを使用します。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
該当なし |
ドメイン型
PostgreSQL は、他の基礎となるタイプに基づいたユーザー定義の型をサポートします。このような列型を使用すると、Debezium は完全な型階層に基づいて列の表現を公開します。
PostgreSQL ドメイン型を使用する列で変更をキャプチャーするには、特別に考慮する必要があります。デフォルトデータベース型の 1 つを拡張するドメインタイプと、カスタムの長さまたはスケールを定義するドメインタイプが含まれるように列が定義されると、生成されたスキーマは定義されたその長さとスケールを継承します。
カスタムの長さまたはスケールを定義するドメインタイプを拡張する別のドメインタイプが含まれるように列が定義されていると、その情報は PostgreSQL ドライバーの列メタデータにはないため、生成されたスキーマは定義された長さやスケールを継承 しません。
ネットワークアドレス型
PostgreSQL には、IPv4、IPv6、および MAC アドレスを保存できるデータ型があります。ネットワークアドレスの格納には、プレーンテキスト型ではなくこの型を使用することが推奨されます。ネットワークアドレス型は、入力エラーチェックと特化した演算子および関数を提供します。
| PostgreSQL のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
該当なし |
|
|
|
該当なし |
|
|
|
該当なし |
|
|
|
該当なし |
PostGIS タイプ
PostgreSQL コネクターは、すべての PostGIS データ型 をサポートします。
| PostGIS データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
詳細は、Open Geospatial Consortium Simple Features Access を参照してください。 |
|
|
|
詳細は、Open Geospatial Consortium Simple Features Access を参照してください。 |
pgvector タイプ
PostgreSQL コネクターは、すべての pgvector エクステンションデータ型 をサポートします。
| pgvector データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
次のフィールドを含む構造体が含まれます。
|
TOAST 化された値
PostgreSQL ではページサイズにハード制限があります。つまり、約 8KB 以上の値は、TOAST ストレージ を使用して保存する必要があるのです。これは、データベースからのレプリケーションメッセージに影響します。TOAST メカニズムを使用して保存され、変更されていない値は、テーブルのレプリカ ID の一部でない限り、メッセージに含まれません。競合が発生する可能性があるため、Debezium が不足している値を直接データベースから読み取る安全な方法はありません。そのため、Debezium は以下のルールに従って、TOAST 化された値を処理します。
-
REPLICA IDENTITY FULL- TOAST 列の値を持つテーブルは、他の列と同様に変更イベントのbeforeおよびafterフィールドの一部となります。 -
REPLICA IDENTITY DEFAULTのあるテーブル - データベースからUPDATEイベントを受信すると、レプリカ ID の一部ではない変更されていない TOAST 列値はイベントに含まれません。同様に、DELETEイベントを受信するときに TOAST 列 (ある場合) はbeforeフィールドにありません。この場合、Debezium は列値を安全に提供できないため、コネクターはコネクター設定プロパティーunavailable.value.placeholderによって定義されたとおりにプレースホルダー値を返します。
デフォルト値
データベーススキーマの列にデフォルト値が指定されている場合、PostgreSQL コネクターは可能な限りこの値を Kafka スキーマに反映させようとします。ほとんどの一般的なデータ型がサポートされています。
-
BOOLEAN -
数値型 (
(INT、FLOAT、NUMERICなど) -
テキストタイプ (
CHAR、VARCHAR、TEXTなど) -
時間の種類 (
DATE、TIME、INTERVAL、TIMESTAMP、TIMESTAMPTZ) -
JSON,JSONB,XML -
UUID
時間型の場合、デフォルト値の解析は PostgreSQL ライブラリーによって提供されることに注意してください。したがって、PostgreSQL で通常サポートされている文字列表現は、コネクターでもサポートされている必要があります。
デフォルト値がインラインで直接指定されるのではなく関数によって生成される場合、コネクターは代わりに、指定されたデータ型の 0 に相当するものをエクスポートします。これらの値は以下の通りです。
-
BOOLEANではFALSE -
数値タイプの場合、適切な精度で
0 - text/XML タイプの場合は空の文字列
-
JSON タイプの場合は
{} -
1970-01-01DATE、TIMESTAMP、TIMESTAMPTZタイプの場合 -
TIME00:00 -
INTERVALのEPOCH -
00000000-0000-0000-0000-000000000000(UUID)
現在、このサポートは、関数の明示的な使用にのみ適用されます。たとえば、CURRENT_TIMESTAMP(6) は括弧付きでサポートされていますが、CURRENT_TIMESTAMP はサポートされていません。
デフォルト値の伝搬のサポートは、主に、スキーマのバージョン間の互換性を強制するスキーマレジストリーを持つ PostgreSQL コネクターを使用する際に、スキーマを安全に進化させるために存在します。この主な問題と、異なるプラグインのリフレッシュ動作のために、Kafka スキーマに存在するデフォルト値は、データベーススキーマのデフォルト値と常に同期していることは保証されません。
- デフォルト値は、あるプラグインがいつ、どのようにインメモリースキーマの更新をトリガーするかによって、Kafka スキーマに '遅れて' 現れることがあります。リフレッシュの間にデフォルトが何度も変更されると、Kafka スキーマに値が現れないか、スキップされることがある。
- コネクターに処理を待機しているレコードがあるときにスキーマの更新がトリガーされた場合、デフォルト値が Kafka スキーマに '早期' に表示されることがあります。これは、列のメタデータがレプリケーションメッセージに含まれているのではなく、リフレッシュ時にデータベースから読み取られるためです。これは、コネクターが遅れていてリフレッシュが発生した場合や、更新がソースデータベースに書き込まれ続けている間にコネクターが一時的に停止した場合に、コネクターの起動時に発生する可能性があります。
この動作は予想外かもしれませんが、それでも安全です。影響を受けるのはスキーマ定義のみで、メッセージに含まれる実際の値はソースデータベースに書き込まれたものと一貫性を保ちます。
カスタムコンバーター
デフォルトでは、Debezium は、SQL CREATE TYPE ステートメントを使用して作成された複合型など、カスタムのデータ型が指定された列からのデータは複製しません。カスタムデータ型の列をレプリケートするには、カスタムコンバーターを作成する の手順に従いますが、重要な注意事項が複数あります。
-
コネクター設定の
include.unknown.datatypesプロパティーをtrueに設定します。デフォルトのfalse設定では、カスタムコンバーターは常にnull値を返します。 コンバーターに渡される値のタイプは、レプリケーションスロットに設定されている論理デコード出力プラグインにより異なります。
-
decoderbufsは、列データのバイト配列 (byte[]) 表現を渡します。 -
pgoutputは、列データの文字列表現を渡します。
-
2.6.5. Debezium コネクターを実行するための PostgreSQL のセットアップ リンクのコピーリンクがクリップボードにコピーされました!
このリリースの Debezium では、ネイティブの pgoutput 論理レプリケーションストリームのみがサポートされます。pgoutput プラグインを使用するように PostgreSQL を設定するには、レプリケーションスロットを有効にし、レプリケーションの実行に必要な権限を持つユーザーを設定します。
詳細は以下を参照してください。
2.6.5.1. Debezium pgoutput プラグインのレプリケーションスロットの設定 リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL の論理デコード機能はレプリケーションスロットを使用します。レプリケーションスロットを設定するには、postgresql.conf ファイルに以下を指定します。
wal_level=logical max_wal_senders=1 max_replication_slots=1
wal_level=logical
max_wal_senders=1
max_replication_slots=1
これらの設定は、PostgreSQL サーバーを以下のように指示します。
-
wal_level- 先行書き込みログで論理デコードを使用します。 -
max_wal_senders- WAL 変更の処理に、1 つの個別プロセスの最大を使用します。 -
max_replication_slots- WAL の変更をストリーミングするために作成される 1 つのレプリケーションスロットの最大を許可します。
レプリケーションスロットは、Debezium の停止中でも Debezium に必要なすべての WAL エントリーを保持することが保証されいます。したがって、以下の点を避けるために、レプリケーションスロットを注意して監視することが重要になります。
- 過剰なディスク消費量。
- レプリケーションスロットが長期間使用されないと発生する可能性がある、あらゆる状態 (カタログの肥大化など)。
詳細は、レプリケーションスロットに関する PostgreSQL のドキュメント を参照してください。
PostgreSQL ログ先行書き込みの設定 や仕組みを理解していると、Debezium PostgreSQL コネクターを使用する場合に役立ちます。
2.6.5.2. Debezium コネクターの PostgreSQL パーミッションのセットアップ リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL サーバーを設定して Debezium コネクターを実行するには、レプリケーションを実行できるデータベースユーザーが必要です。レプリケーションは、適切なパーミッションを持つデータベースユーザーのみが実行でき、設定された数のホストに対してのみ実行できます。
セキュリティー で説明されているように、スーパーユーザーはデフォルトで必要な REPLICATION および LOGIN ロールを持っていますが、Debezium レプリケーションユーザーの権限を昇格しないことが推奨されます。代わりに、必要最低限の特権を持つ Debezium ユーザーを作成します。
前提条件
- PostgreSQL の管理者権限。
手順
ユーザーにレプリケーションの権限を付与するには、少なくとも
REPLICATIONおよびLOGIN権限を持つ PostgreSQL ロールを定義し、そのロールをユーザーに付与します。以下に例を示します。CREATE ROLE <name> REPLICATION LOGIN;
CREATE ROLE <name> REPLICATION LOGIN;Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.6.5.3. Debezium が PostgreSQL パブリケーションを作成できるように権限を設定 リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、PostgreSQL ソーステーブルの変更イベントを、テーブル用に作成された パブリケーション からストリーミングします。パブリケーションには、1 つ以上のテーブルから生成される変更イベントのフィルターされたセットが含まれます。各パブリケーションのデータは、パブリケーションの仕様に基づいてフィルターされます。この仕様は、PostgreSQL データベース管理者または Debezium コネクターが作成できます。Debezium PostgreSQL コネクターに、パブリケーションの作成やレプリケートするデータの指定を許可するには、コネクターはデータベースで特定の権限で操作する必要があります。
パブリケーションの作成方法を決定するオプションは複数あります。通常、コネクターを設定する前に、キャプチャーするテーブルのパブリケーションを手動で作成することが推奨されます。しかし、Debezium がパブリケーションを自動的に作成し、それに追加するデータを指定できるように、ご使用の環境を設定できます。
Debezium は include list および exclude list プロパティーを使用して、データがパブリケーションに挿入される方法を指定します。Debezium がパブリケーションを作成できるようにするオプションの詳細は、publication.autocreate.mode を参照してください。
Debezium が PostgreSQL パブリケーションを作成するには、以下の権限を持つユーザーとして実行する必要があります。
- パブリケーションにテーブルを追加するためのデータベースのレプリケーション権限。
-
パブリケーションを追加するためのデータベースの
CREATE権限。 -
最初のテーブルデータをコピーするためのテーブルの
SELECT権限。テーブルの所有者には、テーブルに対するSELECT権限が自動的に付与されます。
テーブルをパブリケーションに追加する場合は、ユーザーはテーブルの所有者である必要があります。ただし、ソーステーブルはすでに存在するため、元の所有者と所有権を共有する仕組みが必要です。共有所有権を有効にするには、PostgreSQL レプリケーショングループを作成した後、既存のテーブルの所有者とレプリケーションユーザーをそのグループに追加します。
手順
レプリケーショングループを作成します。
CREATE ROLE <replication_group>;
CREATE ROLE <replication_group>;Copy to Clipboard Copied! Toggle word wrap Toggle overflow テーブルの元の所有者をグループに追加します。
GRANT REPLICATION_GROUP TO <original_owner>;
GRANT REPLICATION_GROUP TO <original_owner>;Copy to Clipboard Copied! Toggle word wrap Toggle overflow Debezium レプリケーションユーザーをグループに追加します。
GRANT REPLICATION_GROUP TO <replication_user>;
GRANT REPLICATION_GROUP TO <replication_user>;Copy to Clipboard Copied! Toggle word wrap Toggle overflow テーブルの所有権を
<replication_group>に移します。ALTER TABLE <table_name> OWNER TO REPLICATION_GROUP;
ALTER TABLE <table_name> OWNER TO REPLICATION_GROUP;Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Debezium がキャプチャ設定を指定するためには、の値が publication.autocreate.mode を filtered に設定する必要があります。
2.6.5.4. Debezium コネクターホストでのレプリケーションを許可するように PostgreSQL を設定 リンクのコピーリンクがクリップボードにコピーされました!
Debezium による PostgreSQL データのレプリケーションを可能にするには、データベースを設定し、PostgreSQL コネクターを実行するホストでのレプリケーションを許可する必要があります。データベースとのレプリケーションが許可されるクライアントを指定するには、エントリーを PostgreSQL ホストベースの認証ファイル pg_hba.conf に追加します。pg_hba.conf ファイルの詳細は、the PostgreSQL のドキュメントを参照してください。
手順
pg_hba.confファイルにエントリーを追加して、データベースホストでレプリケートできる Debezium コネクターホストを指定します。以下に例を示します。pg_hba.confファイルの例です。local replication <youruser> trust host replication <youruser> 127.0.0.1/32 trust host replication <youruser> ::1/128 trust
local replication <youruser> trust1 host replication <youruser> 127.0.0.1/32 trust2 host replication <youruser> ::1/128 trust3 Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.154 pg_hba.conf 設定の説明 項目 説明 1
ローカル (つまりサーバーマシン上) で
<youruser>のレプリケーションを許可するようにサーバーに指示します。2
IPV4を使用してレプリケーションの変更を受信することを、localhostの<youruser>に許可するようサーバーに指示します。3
IPV6を使用したレプリケーション変更の受信をlocalhostの<youruser>に許可するようサーバーに指示します。
ネットワークマスクの詳細は、PostgreSQL のドキュメント を参照してください。
2.6.5.5. サポートされる PostgreSQL トポロジー リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL コネクターは、スタンドアロンの PostgreSQL サーバーまたは PostgreSQL サーバーのクラスターで使用できます。
2.6.5.5.1. PostgreSQL 15 以前のクラスター リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL 15 以前を実行する環境に Debezium をデプロイする場合は、クラスター内のプライマリーサーバー上でのみ論理レプリケーションスロットを設定できます。クラスター内のレプリカサーバーで論理レプリケーションを設定することはできません。
したがって、Debezium PostgreSQL コネクターはプライマリーサーバーにのみ接続して通信できます。プライマリーサーバーに障害が発生すると、コネクターは停止します。障害から回復するには、クラスターを修復してから、元のプライマリーサーバーを primary にプロモートするか、別の PostgreSQL サーバーを primary にプロモートする必要があります。詳細は、障害発生後の新しいプライマリーからのデータのキャプチャー を参照してください。
2.6.5.5.2. PostgreSQL 16 以降のクラスター リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL 16 以降のクラスターで Debezium をデプロイする場合は、レプリカサーバーに論理レプリケーションスロットをセットアップできます。この機能により、Debezium はプライマリーサーバー以外のサーバーから変更イベントをキャプチャーできるようになります。ただし、レプリカサーバーへの Debezium 接続では、通常、プライマリーサーバーへの接続よりも待機時間が長くなることに注意してください。
また、PostgreSQL レプリカサーバー上のレプリケーションスロットは、プライマリーサーバー上の対応するスロットと自動的に同期されないことに注意してください。PostgreSQL 16 クラスターで障害発生後のリカバリーを容易にするには、定期的に手動同期を実行して、スタンバイサーバー上のレプリケーションスロットの位置をプライマリーサーバー上の位置に合わせる必要があります。
2.6.5.5.3. PostgreSQL 17 以降のクラスターを搭載した Debezium リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL 17 以降で Debezium をデプロイする場合、プライマリーサーバーに論理レプリケーションスロットを設定し、それらのスロットをフェイルオーバー用に有効化できます。PostgreSQL は、フェイルオーバースロットの状態を 1 つ以上のレプリカサーバーに自動的に伝播できます。自動レプリケーションが有効になっている環境では、障害が発生すると、使用可能なレプリカが自動的にプライマリーにプロモートされます。Debezium は、設定の変更を必要とせずに新しいプライマリーサーバーからの変更を引き続き取り込むことができるため、コネクターがイベントを見逃すことがなくなります。
Debezium と PostgreSQL 17 の使用、およびフェイルオーバーレプリケーションスロットを設定する機能は、テクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
2.6.5.6. Debezium WAL ディスク領域の消費を管理するための PostgreSQL の設定 リンクのコピーリンクがクリップボードにコピーされました!
場合によっては、WAL ファイルによって使用される PostgreSQL ディスク領域が、異常に急上昇したり増加することがあります。このような場合、いくつかの理由が考えられます。
コネクターがデータを受信した最大の LSN は、サーバーの
pg_replication_slotsビューのconfirmed_flush_lsn列で確認できます。この LSN よりも古いデータは利用できず、データベースがディスク領域を解放します。また、
pg_replication_slotsビューのrestart_lsn列には、コネクターが必要とする可能性のある最も古い WAL の LSN が含まれています。confirmed_flush_lsnの値が定期的に増加し、restart_lsnの値に遅延が発生する場合は、データベースは領域を解放する必要があります。データベースは、通常バッチブロックでディスク領域を解放します。これは想定内の動作であり、ユーザーによるアクションは必要ありません。
追跡されるデータベースには多くの更新がありますが、一部の更新のみがコネクターの変更をキャプチャーするテーブルおよびスキーマに関連します。この状況は、定期的なハートビートイベントで簡単に解決できます。コネクターの
heartbeat.interval.msコネクター設定プロパティーを設定します。注記コネクターがハートビートテーブルからイベントを検出して処理するには、publication.name プロパティーで指定された PostgreSQL パブリケーションにテーブルを追加する必要があります。このパブリケーションが Debezium のデプロイメント以前のものである場合、コネクターは定義されたとおりにパブリケーションを使用します。パブリケーションがデータベース内の
FOR ALL TABLESの変更を自動的にレプリケートするように設定されていない場合は、次のように、ハートビートテーブルをパブリケーションに明示的に追加する必要があります。
ALTER PUBLICATION <publicationName> ADD TABLE <heartbeatTableName>;PostgreSQL インスタンスには複数のデータベースが含まれ、その 1 つがトラフィックが多いデータベースです。Debezium は、他のデータベースと比較して、トラフィックが少ない別のデータベースで変更をキャプチャーします。レプリケーションスロットがデータベースごとに機能し、Debezium が呼び出しされないため、Debezium は LSN を確認できません。WAL はすべてのデータベースで共有されているため、Debezium が変更をキャプチャーするデータベースによってイベントが出力されるまで、使用量が増加する傾向にあります。これに対応するには、以下を行う必要があります。
-
heartbeat.interval.msコネクター設定プロパティーを使用して、定期的なハートビートレコードの生成を有効にします。 - Debezium が変更をキャプチャーするデータベースから変更イベントを定期的に送信します。
新しい行を挿入したり、同じ行を定期的に更新することで、別のプロセスがテーブルを定期的に更新します。次に PostgreSQL は Debezium を呼び出して、最新の LSN を確認し、データベースが WAL 領域を解放できるようにします。このタスクは、
heartbeat.action.queryコネクター設定プロパティーを使用して自動化できます。-
同じデータベースサーバーに対する複数のコネクターの設定
Debezium はレプリケーションスロットを使用してデータベースから変更をストリーミングします。これらのレプリケーションスロットは、LSN (ログシーケンス番号) の形式で現在の位置を維持します。LSN は、Debezium コネクターによって使用される WAL 内の場所へのポインターです。これは、PostgreSQL が Debezium によって処理されるまで WAL を利用可能な状態に保つのに役立ちます。1 つのコンシューマーまたはプロセスに対して、1 つのレプリケーションスロットのみ存在できます。異なるコンシューマーごとに状態が異なる、異なる位置からのデータを必要とする可能性があるためです。
レプリケーションスロットは 1 つのコネクターでのみ使用できるため、Debezium コネクターごとに一意のレプリケーションスロットを作成することが重要です。ただし、コネクターがアクティブでない場合、Postgres は他のコネクターがレプリケーションスロットを消費できるようにする場合があります。これは、スロットが各変更を 1 回だけ発行するため、データ損失につながり、危険を伴う可能性があります (詳細参照)。
Debezium は、レプリケーションスロットに加えて、pgoutput プラグインを使用するときにパブリケーションを使用してイベントをストリーミングします。レプリケーションスロットと同様に、パブリケーションはデータベースレベルであり、一連のテーブルに対して定義されます。そのため、コネクターが同じテーブルセットで作業しない限り、各コネクターに固有のパブリケーションが必要になります。Debezium がパブリケーションを作成できるようにするオプションの詳細は、publication.autocreate.mode を参照してください。
各コネクターの一意のレプリケーションスロット名とパブリケーション名を設定する方法は、slot.name および publication.name を参照してください。
2.6.5.7. Debezium がキャプチャーする PostgreSQL データベースのアップグレード リンクのコピーリンクがクリップボードにコピーされました!
Debezium が使用する PostgreSQL データベースをアップグレードする場合は、データ損失を防止し、Debezium が確実に動作し続けるように、特定の手順を実行する必要があります。一般的に、Debezium はネットワーク障害やその他の機能停止による中断に対して耐性があります。たとえば、コネクターが監視しているデータベースサーバーが停止またはクラッシュした場合、コネクターは PostgreSQL サーバーとの通信を再確立した後、ログシーケンス番号 (LSN) オフセットによって記録された最後の位置から読み取りを続けます。コネクターは、最後に記録されたオフセットに関する情報を Kafka Connect オフセットトピックから取得し、設定された PostgreSQL レプリケーションスロットに対して同じ値のログシーケンス番号 (LSN) をクエリーします。
コネクターを起動して PostgreSQL データベースから変更イベントをキャプチャーするには、レプリケーションスロットが存在する必要があります。ただし、PostgreSQL アップグレードプロセスの一部として、レプリケーションスロットは削除され、アップグレードの完了後に元のスロットは復元されません。その結果、コネクターが再起動してレプリケーションスロットからの最後の既知のオフセットを要求すると、PostgreSQL で、情報を返すことができません。
新しいレプリケーションスロットを作成することもできますが、データが損失されないようにするには、新しいスロットを作成する必要があります。新しいレプリケーションスロットは、スロットの作成後に発生した変更に対してのみ LSN を提供できます。アップグレード前に発生したイベントのオフセットは、提供できません。コネクターが再起動すると、最初に Kafka オフセットトピックから最後の既知のオフセットを要求します。次に、要求をレプリケーションスロットに送信し、オフセットトピックから取得したオフセットの情報を返します。ただし、新しいレプリケーションスロットは、コネクターが想定の位置からストリーミングを再開するために必要な情報を提供できません。その後、コネクターはログ内の既存の変更イベントをスキップし、ログ内の最新の位置からのみストリーミングを再開します。これにより、通知なしにデータ損失が発生する可能性があります。コネクターは、スキップされたイベントのレコードを出力せず、イベントがスキップされたことを示す情報も提供しません。
データ損失のリスクを最小限に抑えながら Debezium がイベントのキャプチャーを継続できるように PostgreSQL データベースのアップグレードを実行する方法は、次の手順を参照してください。
手順
- データベースに書き込むアプリケーションを一時的に停止するか、読み取り専用モードにします。
- データベースをバックアップします。
- データベースへの書き込みアクセスを一時的に無効にします。
- 書き込み操作をブロックする前にデータベースで発生した変更が先行書き込みログ (WAL) に保存されていること、および WAL LSN がレプリケーションスロットに反映されていることを確認します。
-
レプリケーションスロットに書き込まれるすべてのイベントレコードをキャプチャーするのに十分な時間をコネクターに提供します。
この手順により、ダウンタイム前に発生したすべての変更イベントが考慮され、Kafka に保存されるようになります。 - フラッシュされた LSN の値をチェックして、コネクターがレプリケーションスロットからのエントリーの消費を終了したことを確認します。
Kafka Connect を停止して、コネクターを正常にシャットダウンします。
Kafka Connect はコネクターを停止し、すべてのイベントレコードを Kafka にフラッシュし、各コネクターから受信した最後のオフセットを記録します。
注記Kafka Connect クラスター全体を停止する代わりに、コネクターを削除することで停止できます。オフセットトピックは、他の Kafka コネクターと共有される可能性があるため、削除しないでください。後で、データベースへの書き込みアクセスを復元し、コネクターを再起動する準備ができたら、コネクターを再作成する必要があります。
-
PostgreSQL 管理者として、プライマリーデータベースサーバーのレプリケーションスロットを削除します。
slot.drop.on.stopプロパティーを使用して、レプリケーションスロットを削除しないでください。このプロパティーはテスト専用です。 - データベースを停止します。
-
pg_upgrade、pg_dump、pg_restoreなど、承認された PostgreSQL アップグレード手順を使用してアップグレードを実行します。 -
(オプション) 標準の Kafka ツールを使用して、オフセットストレージトピックからコネクターオフセットを削除します。
コネクターオフセットを削除する方法の例は、Debezium コミュニティー FAQ の コネクターオフセットを削除する方法 を参照してください。 - データベースを再起動します。
PostgreSQL 管理者として、データベース上に Debezium 論理レプリケーションスロットを作成します。データベースへの書き込みを有効にする前に、スロットを作成する必要があります。そうしないと、Debezium は変更をキャプチャーできず、データが失われます。
レプリケーションスロットの設定に関する詳細は、「Debezium
pgoutputプラグインのレプリケーションスロットの設定」 を参照してください。- Debezium がキャプチャーするテーブルを定義するパブリケーションがアップグレード後も存在することを確認します。パブリケーションが利用できない場合は、PostgreSQL 管理者としてデータベースに接続し、新しいパブリケーションを作成します。
-
前の手順で新しいパブリケーションを作成する必要があった場合は、Debezium コネクター設定を更新して、新しいパブリケーションの名前を
publication.nameプロパティーに追加します。 - コネクター設定で、コネクターの名前を変更します。
-
コネクター設定で、
slot.nameを Debezium レプリケーションスロットの名前に設定します。 - 新規レプリケーションスロットが利用可能であることを確認します。
- データベースへの書き込みアクセスを復元し、データベースに書き込んだアプリケーションを再起動します。
コネクター設定で、
snapshot.modeプロパティーをneverに設定し、コネクターを再起動します。注記手順 6 で Debezium がデータベース変更の読み取りをすべて完了したことを確認できなかった場合は、
snapshot.mode=initialを設定して新しいスナップショットを実行するようにコネクターを設定できます。必要に応じて、アップグレード直前に取得したデータベースのバックアップの内容をチェックして、コネクターがレプリケーションスロットからすべての変更を読み取るかどうかを確認できます。
2.6.6. Debezium PostgreSQL コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
以下の方法のいずれかを使用して Debezium PostgreSQL コネクターをデプロイできます。
Streams for Apache Kafka を使用して、コネクタープラグインを含むイメージを自動的に作成します。
以下は推奨される方法です。
-
Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドします。
この Containerfile デプロイメント方法は非推奨となりました。この方法の手順は、ドキュメントの今後のバージョンで削除される予定です。
2.6.6.1. Streams for Apache Kafka を使用した PostgreSQL コネクターデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターのデプロイで推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnectCR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnectorCR を適用してコネクターを起動します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。
Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnect コンテナーイメージを格納する場所を指定します。コンテナーイメージは、quay.io などのコンテナーレジストリー、または OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の Kafka Connect の設定
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の 新しいコンテナーイメージの自動ビルド
2.6.6.2. Streams for Apache Kafka を使用した Debezium PostgreSQL コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、まずコネクター用の Kafka Connect イメージをビルドする必要がありました。OpenShift にコネクターをデプロイするための現在の推奨方法は、Streams for Apache Kafka のビルド設定を使用して、使用する Debezium コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから、必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector カスタムリソースを作成します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが Streams for Apache Kafka on OpenShift のデプロイと管理 に記載されているとおりにデプロイされている。
- Kafka Connect が Streams for Apache Kafka にデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
ocCLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- 新規コンテナーイメージを保存するために、ImageStream リソースをクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform でのイメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotationsおよびspec.buildプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
例2.42 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義したdbz-connect.yamlファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium PostgreSQL コネクターアーカイブです。
- Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
- Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.155 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションを"true"に設定して、クラスター Operator がKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.outputは、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputを指定する方法の詳細は、Streams for Apache Kafka API リファレンスの スキーマ参照のビルド を参照してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。typeの値は、urlフィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト
typeとurlを指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
typeとurlを指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
typeとurlを指定します。これは、Debezium スクリプト SMT で必要です。重要Streams for Apache Kafka を使用してコネクタープラグインを Kafka Connect イメージに組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.urlに JAR ファイルのロケーションを指定し、artifacts.typeの値もjarに設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy -
groovy-jsr223(スクリプトエージェント) -
groovy-json(JSON 文字列を解析するためのモジュール)
別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、postgresql-inventory-connector.yamlとして保存します。例2.43 Debezium コネクターの
KafkaConnectorカスタムリソースを定義するpostgresql-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.156 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この namespace は、関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの namespace でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f postgresql-inventory-connector.yaml
oc create -n debezium -f postgresql-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium PostgreSQL デプロイメントを検証する 準備が整いました。
2.6.6.3. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium PostgreSQL コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium PostgreSQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnectCR。imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。 -
Debezium PostgreSQL コネクターを定義する
KafkaConnectorCR。この CR をKafkaConnectCR を適用したのと同じ OpenShift インスタンスに適用します。
前提条件
- PostgreSQL が実行され、PostgreSQL を設定して Debezium コネクターを実行する 手順が実行済みである。
- Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、Streams for Apache Kafka on OpenShift のデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.ioやdocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium PostgreSQL コンテナーを作成します。
registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0をベースイメージとして使用する Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-postgresql.yamlという名前の Dockerfile を作成します。前のステップで作成した
debezium-container-for-postgresql.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-postgresql:latest .
podman build -t debezium-container-for-postgresql:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-postgresql:latest .
docker build -t debezium-container-for-postgresql:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow buildコマンドは、debezium-container-for-postgresqlという名前のコンテナーイメージを構築します。カスタムイメージを
quay.ioなどのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。podman push <myregistry.io>/debezium-container-for-postgresql:latest
podman push <myregistry.io>/debezium-container-for-postgresql:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-postgresql:latest
docker push <myregistry.io>/debezium-container-for-postgresql:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium PostgreSQL
KafkaConnectカスタムリソース (CR) を作成します。たとえば、annotationsおよびimageプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
KafkaConnectorリソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotationsは Cluster Operator に示します。2
spec.imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE変数がオーバーライドされます。以下のコマンドを実行して、
KafkaConnectCR を OpenShift Kafka インスタンスに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow これにより、OpenShift の Kafka Connect 環境が更新され、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connector インスタンスが追加されます。
Debezium PostgreSQL コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。通常、コネクター設定プロパティーを設定する
.yamlファイルに Debezium PostgreSQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。Debezium PostgreSQL コネクターに設定できる設定プロパティーの完全リストは PostgreSQL コネクタープロパティー を参照してください。次の例は、ポート
5432で PostgreSQL サーバーホスト192.168.99.100に接続する Debezium コネクターを設定するカスタムリソースからの抜粋です。このホストには、sampledbという名前のデータベース、publicという名前のスキーマ、inventory-connector-postgresqlという論理名のサーバーがあります。
PostgreSQL
inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.157 PostgreSQL の inventory-connector.yaml サンプルの設定の説明 項目 説明 1
コネクターを Kafka Connect に登録するために使用される名前。
2
このコネクターに作成するタスクの最大数。PostgreSQL コネクターは単一のコネクタータスクを使用して PostgreSQL サーバーの
binlogを読み取るため、適切な順序とイベント処理を確保するため、一度に動作できるタスクは 1 つのみです。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始して作業を実行し、実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。3
コネクターの設定。
4
PostgreSQL サーバーを実行するデータベースホストの名前。この例では、データベースのホスト名は
192.168.99.100です。5
一意のトピック接頭辞。トピック接頭辞は、PostgreSQL サーバーまたはサーバーのクラスターの論理識別子です。この文字列は、コネクターから変更イベントレコードを受け取るすべての Kafka トピックの名前の先頭に付加されます。
6
コネクターは
publicスキーマでのみ変更をキャプチャーします。選択したテーブルでのみ変更をキャプチャーするようにコネクターを設定できます。詳細はtable.include.listを参照してください。7
PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。コネクターは
pgoutputプラグインの使用のみをサポートしますが、plugin.nameをpgoutputに明示的に設定する必要があります。Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをinventory-connector.yamlファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは
inventory-connectorを登録して、コネクターがKafkaConnectorCR に定義されているsampledbデータベースに対して実行を開始します。
結果
コネクターが起動すると、コネクターが設定された PostgreSQL サーバーデータベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
2.6.6.4. Debezium PostgreSQL コネクターが実行していることの確認 リンクのコピーリンクがクリップボードにコピーされました!
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが Streams for Apache Kafka on OpenShift にデプロイされている。
-
OpenShift
ocCLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnectorリソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnectorを入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-postgresql)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc describe KafkaConnector inventory-connector-postgresql -n debezium
oc describe KafkaConnector inventory-connector-postgresql -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.44
KafkaConnectorリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopicを入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.45
KafkaTopicリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-postgresql.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-postgresql.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow トピック名を指定する形式は、手順 1 で返された
oc describeコマンドと同じです (例:inventory-connector-postgresql.inventory.addresses)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.46 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の例では、
payload値は、コネクタースナップショットがテーブルinventory.products_on_handから読み込み ("op" ="r") イベントを生成したことを示しています。product_idレコードの"before"状態はnullであり、レコードに以前の値が存在しないことを示しています。"after"状態は、product_id101を持つ項目のquantityが3であることを示しています。
2.6.6.5. Debezium PostgreSQL コネクター設定プロパティーの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium PostgreSQL コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
必要な Debezium PostgreSQL コネクター設定プロパティー
以下の設定プロパティーは、デフォルト値がない場合は 必須 です。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。 | |
| デフォルトなし |
コネクターの Java クラスの名前。PostgreSQL コネクターには、常に | |
|
| このコネクターのために作成する必要のあるタスクの最大数。PostgreSQL コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。 | |
|
| PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。
サポートされている値は | |
|
| 特定のデータベース/スキーマの特定のプラグインから変更をストリーミングするために作成された PostgreSQL 論理デコードスロットの名前。サーバーはこのスロットを使用して、設定する Debezium コネクターにイベントをストリーミングします。 スロット名は PostgreSQL レプリケーションスロットの命名ルール に準拠する必要があり、命名ルールには "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." と記載されています。 | |
|
| コネクターが正常に想定されるように停止した場合に論理レプリケーションスロットを削除するかどうか。デフォルトの動作では、コネクターが停止したときにレプリケーションスロットはコネクターに設定された状態を保持します。コネクターが再起動すると、同じレプリケーションスロットがあるため、コネクターは停止した場所から処理を開始できます。
テストまたは開発環境でのみ | |
|
| コネクターがフェイルオーバースロットを作成するかどうかを指定します。この設定を省略した場合、またはプライマリーサーバーで PostgreSQL 16 以前が実行されている場合、コネクターはフェイルオーバースロットを作成しません。 注記
PostgreSQL は 重要
Debezium と PostgreSQL 17 の使用、およびフェイルオーバーレプリケーションスロットを設定する機能は、テクノロジープレビュー機能です。 | |
|
|
このパブリケーションが存在しない場合は起動時に作成され、すべてのテーブル が含まれます。Debezium は、設定されている場合は、独自の include/exclude リストフィルターを適用し、対象となる特定のテーブルのイベントのみをパブリケーションが変更するように制限します。コネクターユーザーがこのパブリケーションを作成するには、スーパーユーザーの権限が必要であるため、通常はコネクターを初めて開始する前にパブリケーションを作成することを推奨します。 パブリケーションがすでに存在し、すべてのテーブルが含まれてているか、テーブルのサブセットで設定されている場合、Debezium は定義されているようにパブリケーションを使用します。 | |
| デフォルトなし | PostgreSQL データベースサーバーの IP アドレスまたはホスト名。 | |
|
| PostgreSQL データベースサーバーのポート番号 (整数)。 | |
| デフォルトなし | PostgreSQL データベースサーバーに接続するための PostgreSQL データベースユーザーの名前。 | |
| デフォルトなし | PostgreSQL データベースサーバーへの接続時に使用するパスワード。 | |
| デフォルトなし | 変更をストリーミングする PostgreSQL データベースの名前。 | |
| デフォルトなし |
Debezium が変更をキャプチャーする特定の PostgreSQL データベースサーバーまたはクラスターの名前空間を提供するトピック接頭辞。接頭辞は、他のコネクター全体で一意となる必要があります。これは、このコネクターからレコードを受信するすべての Kafka トピックのトピック名接頭辞として使用されるためです。データベースサーバーの論理名には英数字とハイフン、ドット、アンダースコアのみを使用する必要があります。 警告 このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。 | |
| デフォルトなし |
変更をキャプチャーする 対象とする スキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの ID 全体と照合されます。 | |
| デフォルトなし |
変更をキャプチャーする 対象としない スキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。システムスキーマ以外で、
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの ID 全体と照合されます。 | |
| デフォルトなし |
変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。このプロパティーが設定されている場合、コネクターは指定されたテーブルからのみ変更をキャプチャします。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターは変更がキャプチャーされる各スキーマのシステムでないすべてのテーブルの変更をキャプチャーします。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの ID 全体と照合されます。 | |
| デフォルトなし |
変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。各識別子の形式は schemaName.tableName です。このプロパティーが設定されている場合、コネクターは、指定のないすべてのテーブルから変更をキャプチャーします。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの ID 全体と照合されます。 | |
| デフォルトなし |
変更イベントレコード値に含まれる必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、列の名前文字列全体を一致させるために式が使用され、列名に存在する可能性のある部分文字列とは一致しません。 | |
| デフォルトなし |
変更イベントレコード値から除外される必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、列の名前文字列全体を一致させるために式が使用され、列名に存在する可能性のある部分文字列とは一致しません。 | |
|
|
含まれる列に変更がない場合にメッセージの公開をスキップするかどうかを指定します。これは基本的に、含まれる列に変更がない場合、 注記
このプロパティーは、テーブルの REPLICA IDENTITY が | |
|
|
時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。 | |
|
|
コネクターによる | |
|
|
コネクターによる | |
|
|
| |
|
|
PostgreSQL サーバーへの暗号化された接続を使用するかどうか。オプションには以下が含まれます。 | |
| デフォルトなし | クライアントの SSL 証明書が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。 | |
| デフォルトなし | クライアントの SSL 秘密鍵が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。 | |
| デフォルトなし |
| |
| デフォルトなし | サーバーが検証されるルート証明書が含まれるファイルへのパス。詳細は the PostgreSQL のドキュメント を参照してください。 | |
| デフォルトなし |
SSL ソケットを作成するクラスの名前。開発環境で SSL 検証を無効にするには、 | |
|
| TCP keep-alive プローブを有効にして、データベース接続がまだ有効であることを確認します。詳細は the PostgreSQL のドキュメント を参照してください。 | |
|
|
delete イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。 | |
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。
列の完全修飾名は、 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。 列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
|
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は <schemaName>.<tableName>.<columnName> です。
仮名は、指定された hashAlgorithm と salt を適用すると得られるハッシュ化された値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest セクション に記載されています。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。 |
| 該当なし | 列のメタデータを表す追加パラメーターをコネクターに発行させたい列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名は、次のいずれかの形式に従います: databaseName.tableName.columnName、または databaseName.schemaName.tableName.columnName. | |
| 該当なし | データベース内の列に対して定義されているデータ型の完全修飾名を指定する正規表現のオプションのコンマ区切りリスト。このプロパティーが設定されている場合、データ型が一致する列に対して、コネクターはスキーマに次の追加フィールドを含むイベントレコードを発行します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名の形式は、databaseName.tableName.typeName、または databaseName.schemaName.tableName.typeName のいずれかになります。 PostgreSQL 固有のデータ型名の一覧は、PostgreSQL データ型マッピング を参照してください。 | |
| 空の文字列 | 指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
各完全修飾テーブル名は、 カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。 重要
このプロパティーに指定した式がテーブルのプライマリーキーの一部ではない列と一致する場合は、テーブルの | |
| all_tables |
コネクターが パブリケーション を作成するかどうか、また作成する方法を指定します。この設定は、コネクターが 注記 パブリケーションを作成するには、コネクターは特定の権限があるデータベースアカウントを使用して PostgreSQL にアクセスする必要があります。詳細は、Debezium が PostgreSQL パブリケーションを作成できるように権限を設定 を参照してください。 以下のいずれかの値を指定します。
| |
| 空の文字列 | このプロパティーを設定すると、テーブル名に基づいて、コネクターがキャプチャーするテーブルのサブセットに特定の レプリカアイデンティティー 設定が適用されます。プロパティーによって設定されるレプリカアイデンティティー値は、データベースに設定されているレプリカアイデンティティー値を上書きします。 このプロパティーは、キーと値のペアのコンマ区切りのリストを受け入れます。各キーは完全修飾テーブル名に一致する正規表現であり、対応する値はレプリカアイデンティティータイプを指定します。以下に例を示します。
完全修飾テーブル名を指定するには、 レプリカアイデンティティーを次のいずれかの値に設定します。
schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name
重要
| |
| bytes |
変更イベントでバイナリー (
| |
| none | コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。以下のいずれかの値を設定します。
注記
上記の例では、アンダースコア文字 ( | |
| none | コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。以下のいずれかの値を指定します。
注記
上記の例では、アンダースコア文字 ( 詳しい情報は、Avro naming を参照してください。 | |
|
|
Postgres の | |
| デフォルトなし | コネクターでキャプチャーする論理デコードメッセージの接頭辞と一致するコンマ区切りの正規表現 (任意)。デフォルトでは、コネクターはすべての論理デコードメッセージをキャプチャーします。このプロパティーが設定されている場合、コネクターはプロパティーで指定された接頭辞が含まれる論理デコードメッセージのみをキャプチャーします。その他の論理デコードメッセージはすべて除外されます。 メッセージの接頭辞名を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、メッセージ接頭辞文字列全体と照合されます。式は、接頭辞に存在する可能性のある部分文字列と一致しません。
このプロパティーを設定に含める場合は、 メッセージ イベントの構造とその順序付けのセマンティクスについては、メッセージ イベント を参照してください。 | |
| デフォルトなし |
コネクターでキャプチャーしない論理デコードメッセージの接頭辞名と一致するコンマ区切りの正規表現 (任意)。このプロパティーが設定されている場合、コネクターは、指定された接頭部を使用する論理デコードメッセージをキャプチャーしません。その他のメッセージはすべてキャプチャーされます。 メッセージの接頭辞名を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、メッセージ接頭辞文字列全体と照合されます。式は、接頭辞に存在する可能性のある部分文字列と一致しません。
このプロパティーを設定に含める場合は、 メッセージ イベントの構造とその順序付けのセマンティクスについては、メッセージ イベント を参照してください。 |
高度な Debezium PostgreSQL コネクター設定プロパティー
以下の 高度な 設定プロパティーには、ほとんどの状況で機能するデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし |
コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。以下に例を示します。
コネクターがカスタムコンバーターを使用できるようにするには、
コネクターに設定するコンバーターごとに、コンバーターインターフェイスを実装するクラスの完全修飾名を指定する
以下に例を示します。 isbn.type: io.debezium.test.IsbnConverter
設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。追加の設定パラメーターとコンバーターを関連付けるには、パラメーター名の前にコンバーターのシンボリック名を付けます。 isbn.schema.name: io.debezium.postgresql.type.Isbn
| |
|
| 初期スナップショットまたはアドホックブロッキングスナップショット中にデータを読み取る際に、コネクターが適用するトランザクションの分離レベルとロックの種類 (存在する場合) を指定します。 各分離レベルは、並行性とパフォーマンスの最適化と、データの整合性と正確性の最大化との間で異なるバランスを取ります。より厳格な分離レベルを使用するスナップショットでは、データの品質と整合性が向上しますが、その代償として、ロック時間が長くなり、同時トランザクションが減少するため、パフォーマンスが低下します。制限の少ない分離レベルでは効率は上がりますが、データの不整合が発生します。PostgreSQL のトランザクション分離レベルの詳細は、PostgreSQL のドキュメント を参照してください。 次のいずれかの分離レベルを指定します。
| |
| Initial |
コネクターの起動時にスナップショットを実行するための基準を指定します。
Kafka オフセットトピックに以前保存された LSN がある場合、コネクターはその位置から変更をストリーミングを続行します。LSN が保存されていない場合、コネクターは、サーバー上で PostgreSQL 論理レプリケーションスロットが作成された時点から、変更のストリーミングを開始します。対象のすべてのデータが WAL に反映されている場合にのみ、このスナップショットモードを使用します。
詳細は、 | |
|
|
スキーマスナップショットを実行するときにコネクターがテーブルのロックを保持する方法を指定します。
警告 スナップショット中にスキーマの変更が発生する可能性がある場合は、このモードを使用しないでください。 | |
|
|
スナップショットを実行するときにコネクターがデータをクエリーする方法を指定します。
この設定により、 | |
|
|
スナップショットに含めるテーブルの完全修飾名 ( テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
|
| スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。コネクターがこの期間にテーブルロックを取得できないと、スナップショットは失敗します。詳細は、コネクターによるスナップショットの実行方法 を参照してください。 | |
| デフォルトなし | スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
プロパティーには、
スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 ( "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"
作成されるスナップショットでは、コネクターには | |
|
|
イベントの処理中にコネクターが例外に反応する方法を指定します。 | |
|
| コネクターが処理するイベントの各バッチの最大サイズを指定する正の整数値。 | |
|
|
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。 | |
|
|
ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。 | |
|
| コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 500 ミリ秒です。 | |
|
|
コネクターがデータ型が不明なフィールドを見つけたときのコネクターの動作を指定します。コネクターが変更イベントからフィールドを省略し、警告をログに記録するのがデフォルトの動作です。 注記
| |
| デフォルトなし |
データベースへの JDBC 接続を確立するときにコネクターが実行する SQL ステートメントのセミコロン区切りリスト。セミコロンを区切り文字としてではなく、文字として使用する場合は、2 つの連続したセミコロン | |
|
|
レプリケーションの接続状態をサーバーに送信する頻度をミリ秒単位で指定します。 | |
|
|
コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。 | |
| デフォルトなし |
コネクターがハートビートメッセージを送信するときにコネクターがソースデータベースで実行するクエリーを指定します。 | |
|
|
テーブルのインメモリースキーマの更新をトリガーする条件を指定します。 | |
| デフォルトなし | コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。 | |
| 0 |
コネクターがスナップショットを完了した後、ストリーミングプロセスの開始を遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットが完了した直後で、ストリーミングプロセスの開始前に障害が発生した場合に、コネクターがスナップショットを再開できないようにします。Kafka Connect ワーカーに設定されている | |
|
| スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。 | |
| デフォルトなし |
設定された論理デコードプラグインに渡すパラメーターのセミコロン区切りリスト。たとえば、 | |
|
| レプリケーションスロットへの接続に失敗した場合に、連続して接続を試行する最大回数です。 | |
|
| コネクターがレプリケーションスロットへの接続に失敗した場合に再試行を行う間隔 (ミリ秒単位)。 | |
|
|
コネクターが提供する定数を指定して、元の値がデータベースによって提供されていない Toast 化された値であることを示します。 | |
|
|
コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は | |
|
|
WAL ログを削除できるように、コネクターがソース PostgreSQL データベース内の処理済みレコードの LSN をコミットするかどうかを決定します。コネクターが処理されたレコードの LSN をコミットしないようにする場合は、 注記
このプロパティーの値を | |
| 10000 (10 秒) | 再試行可能なエラーが発生した後にコネクターを再起動するまで待機する時間 (ミリ秒単位)。 | |
|
| ストリーミング中にコネクターがスキップする操作タイプをコンマで区切ったリスト。以下のタイプの操作をスキップするようにコネクターを設定することができます。
コネクターに操作をスキップしてほしくない場合は、値を | |
| デフォルト値なし |
シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名。 | |
| source | コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
| デフォルトなし | コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
| 1024 | 増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 | |
|
|
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントを重複排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
| |
|
|
レプリケーションスロットから XMIN が読み込まれる頻度 (ミリ秒単位)。XMIN 値は、新しいレプリケーションスロットの開始位置の下限を示す。デフォルト値の | |
|
|
データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは | |
|
|
トピック名の区切り文字を指定します。デフォルトは | |
|
| トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。 | |
|
|
コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
|
コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
|
初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。 注記
並行初期スナップショットを有効にすると、各テーブルのスナップショットを実行するスレッドの処理完了時間がそれぞれ異なる場合があります。あるテーブルのスナップショットが、他のテーブルよりも完了するまでに大幅に時間がかかる場合、作業を終えたスレッドは待機状態になります。一部の環境では、ロードバランサーやファイアウォールなどのネットワークデバイスは、長時間アイドル状態のままの接続を終了します。スナップショットが完了すると、コネクターがすべてのスナップショットデータを正常に送信した場合でも、コネクターは接続を閉じることができず、例外となり、スナップショットが不完全になります。 | |
|
|
コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します。たとえば、 コネクターは、指定されたタグを基本 MBean オブジェクト名に追加します。タグは、メトリクスデータを整理および分類するのに役立ちます。特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを識別するためのタグを定義できます。詳細は、カスタマイズされた MBean 名 を参照してください。 | |
|
|
接続エラーなど、再試行可能なエラーが発生する操作の後に、コネクターがどのように応答するかを指定します。
| |
|
|
コネクターがクエリーの完了を待機する時間をミリ秒単位で指定します。タイムアウト制限を削除するには、値を |
パススルー PostgreSQL コネクター設定プロパティー
コネクターは pass-through プロパティーをサポートしており、これにより Debezium は Apache Kafka プロデューサーとコンシューマーの動作を微調整するためのカスタム設定オプションを指定できます。Kafka プロデューサーとコンシューマーの全設定プロパティーの詳細は、Kafka ドキュメント を参照してください。
PostgreSQL コネクターが Kafka シグナリングトピックとどのように対話するかを設定するためのパススループロパティー
Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。
以下の表は、Kafka signal プロパティーを説明しています。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| <topic.prefix>-signal | コネクターがアドホックシグナルについて監視する Kafka トピックの名前。 注記 トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナリングトピックが必要です。シグナリングトピックには単一のパーティションが必要です。 | |
| kafka-signal | Kafka コンシューマーによって使用されるグループ ID の名前。 | |
| デフォルトなし | コネクターが Kafka クラスターへの初期接続を確立するために使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。 | |
|
| コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。 |
シグナリングチャネルの Kafka コンシューマークライアントを設定するためのパススループロパティー
Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.* で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL などのプロパティーを Kafka コンシューマーに渡します。
Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。
PostgreSQL コネクター sink notification チャネルを設定するためのパススループロパティー
次の表では、Debezium sink notification チャネルの設定に使用できるプロパティーについて説明します。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし |
Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして |
Debezium コネクターのパススルーデータベースドライバー設定プロパティー
Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは接頭辞 driver.* で始まります。たとえば、コネクターは driver.foobar=false などのプロパティーを JDBC URL に渡します。
Debezium は、プロパティーをデータベースドライバーに渡す前に、プロパティーから接頭辞を削除します。
2.6.7. Debezium PostgreSQL コネクターのパフォーマンスの監視 リンクのコピーリンクがクリップボードにコピーされました!
Debezium PostgreSQL コネクターは、Zookeeper、Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、2 種類のメトリクスを提供します。
- スナップショットメトリクス は、スナップショットの実行中にコネクター操作に関する情報を提供します。
- メトリクスのストリーミング は、コネクターが変更をキャプチャーし、変更イベントレコードをストリーミングする際のコネクター操作に関する情報を提供します。
Debezium の監視に関するドキュメント では、JMX を使用してこれらのメトリクスを公開する方法の詳細を説明しています。
2.6.7.1. PostgreSQL コネクタースナップショットおよびストリーミング MBean オブジェクトのカスタマイズされた名前 リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。これらのメトリクスは各コネクターインスタンスに固有であり、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。
デフォルトでは、正しく設定されたコネクターをデプロイすると、Debezium はさまざまなコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクター設定に依存しており、設定の変更によって MBean 名が変更される場合があります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが切断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開するには、新しい MBean 名を使用するように監視スタックを再設定する必要があります。
MBean 名の変更が原因で監視が中断されないように、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、コネクター設定に custom.metric.tags プロパティーを追加します。このプロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値がそのタグの値を表すキーと値のペアを受け入れます。たとえば、k1=v1,k2=v2 です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。
コネクターの custom.metric.tags プロパティーを設定した後、指定されたタグに関連付けられたメトリクスを取得するように監視スタックを設定できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。
カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。
次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。
例2.47 カスタムタグがコネクター MBean 名を変更する方法
デフォルトでは、PostgreSQL コネクターはストリーミングメトリクスに次の MBean 名を使用します。
debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>
debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>
custom.metric.tags の値を database=salesdb-streaming,table=inventory に設定すると、Debezium は次のカスタム MBean 名を生成します。
debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
2.6.7.2. PostgreSQL データベースのスナップショット作成時の Debezium の監視 リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.postgres:type=connector-metrics,context=snapshot,server=<topic.prefix> です。
スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。
次の表に、使用可能なスナップショットメトリクスを示します。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取りした最後のスナップショットイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| 前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| スナップショットに含まれているテーブルの合計数。 | |
|
| スナップショットによってまだコピーされていないテーブルの数。 | |
|
| スナップショットが起動されたかどうか。 | |
|
| スナップショットが一時停止されたかどうか。 | |
|
| スナップショットが中断されたかどうか。 | |
|
| スナップショットが完了したかどうか。 | |
|
| スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。 | |
|
| スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。 | |
|
| スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
コネクターは、増分スナップショットの実行時に、以下の追加のスナップショットメトリクスも提供します。
2.6.7.3. Debezium PostgreSQL コネクターレコードストリーミングの監視 リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.postgres:type=connector-metrics,context=streaming,server=<topic.prefix> です。
以下の表は、利用可能なストリーミングメトリクスのリストです。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取られた最後のストリーミングイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、ソースデータベースによって報告されたデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された作成イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された更新イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された削除イベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。 | |
|
| 最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値には、データベースサーバーとコネクターが実行されているマシンのクロックの差が組み込まれます。 | |
|
| コミットされた処理済みトランザクションの数。 | |
|
| 最後に受信したイベントの位置。 | |
|
| 最後に処理されたトランザクションのトランザクション識別子。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
2.6.8. Debezium PostgreSQL コネクターによる障害および問題の処理方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、複数のアップストリームデータベースのすべての変更をキャプチャーする分散システムであり、イベントの見逃しや損失は発生しません。システムが正常に操作している場合や、慎重に管理されている場合は、Debezium は変更イベントレコードごとに 1 度だけ 配信します。
PostgreSQL 変更イベントレコードの 1 回限りの配信は、開発者プレビュー機能のみとなっています。開発者プレビューソフトウェアは、Red Hat では一切サポートされておらず、機能的に完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアにはドキュメントが存在しない可能性があり、変更または削除される可能性があります。また、限定的なテストしか行われていません。Red Hat は、関連する SLA なしに、開発者プレビューソフトウェアに対するフィードバックを送信する手段を提供する場合があります。
Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。
障害が発生しても、システムはイベントを失いません。ただし、障害からの回復中に、コネクターが重複した変更イベントを発行する可能性があります。このような正常でない状態では、Debezium は Kafka と同様に、変更イベントを 少なくとも 1 回 配信します。
詳細は以下を参照してください。
設定および起動エラー
以下の状況では、起動時にコネクターが失敗し、エラーまたは例外がログに記録され、実行が停止されます。
- コネクターの設定が無効である。
- 指定の接続パラメーターを使用してコネクターを PostgreSQL に接続できない。
- コネクターは (LSN を使用して) PostgreSQL WAL の以前に記録された位置から再起動され、PostgreSQL ではその履歴が利用できなくなります。
このような場合、エラーメッセージには問題の詳細が含まれ、推奨される回避策も含まれることがあります。設定の修正したり、PostgreSQL の問題に対処した後、コネクターを再起動します。
PostgreSQL コネクターは、最後に処理されたオフセットを PostgreSQL LSN の形式で外部に保存します。コネクターが再起動し、サーバーインスタンスに接続すると、コネクターはサーバーと通信し、その特定のオフセットからストリーミングを続行します。このオフセットは、Debezium レプリケーションスロットがそのままの状態である限り利用できます。プライマリーサーバーでレプリケーションスロットを削除しないでください。削除するとデータが失われます。スロットが削除された場合の障害例は、次のセクションを参照してください。
PostgreSQL 15 以前
PostgreSQL 15 以前のクラスターでは、プライマリーサーバー上にのみ論理レプリケーションスロットを作成できます。その結果、PostgreSQL 15 環境では、Debezium PostgreSQL コネクターはクラスター内のアクティブなプライマリーサーバーからのイベントのみをキャプチャーできるようになります。PostgreSQL 15 クラスターでは、プライマリーノード上のレプリケーションスロットはレプリカサーバーに伝播されません。プライマリーサーバーがダウンした場合は、スタンバイノードをプライマリーにプロモートする必要があります。
PostgreSQL 16 以降
Debezium を PostgreSQL 16 以降で使用する場合、レプリカに論理レプリケーションスロットを作成できますが、レプリカのレプリケーションスロットをプライマリーサーバーの対応するスロットと手動で同期する必要があります。レプリカスロットの同期は自動ではありません。
PostgreSQL 17 以降
Debezium を PostgreSQL 17 以降で使用する場合、自動フェイルオーバー用にプライマリーサーバーでレプリケーションスロットを設定できるため、Debezium が変更イベントを見逃すことはありません。レプリケーションスロットがフェイルオーバー用に設定されている場合、PostgreSQL はレプリケーションスロットをプライマリーからレプリカに自動的に同期し、レプリカがプロモートして新しいプライマリーになった後も Debezium がスロットからの読み取りを続行できるようにします。
Debezium と PostgreSQL 17 の使用、およびフェイルオーバーレプリケーションスロットを設定する機能は、テクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
一部のマネージド PostgreSQL サービス (AWS RDS や GCP CloudSQL など) では、ディスクレプリケーションを使用してスタンバイへのレプリケーションを実装します。その結果、これらのサービスはレプリケーションスロットを自動的にレプリケートし、フェイルオーバー後に使用できるようにします。
新しいプライマリーには、pgoutput プラグインを使用するように設定されたレプリケーションスロットが必要であり、変更をキャプチャーするデータベースが含まれている必要があります。その後でのみ、コネクターが新しいサーバーを示すようにし、コネクターを再起動することができます。
2.6.8.1. PostgreSQL 17 クラスターの障害からの復旧 リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL 17 以降を実行する環境では、フェイルオーバーレプリケーションスロットの使用がサポートされています。PostgreSQL 17 以降のクラスターで障害が発生し、スタンバイにフェイルオーバーレプリケーションスロットが設定されている場合は、次の手順を実行して、Debezium がキャプチャーを再開できるようにします。
- データが失われていない完全なレプリケーションスロットがあることを確認できるまで、Debezium を一時停止します。
- アプリケーションが 新しい プライマリーに書き込めるようにする前に、Debezium レプリケーションスロットを再作成します。レプリケーションスロットを再作成する前に、アプリケーションが新しいプライマリーに書き込むことを許可すると、アプリケーションは変更イベントを見逃す可能性があります。
- コネクターを再起動します。
-
元のプライマリーが失敗する前に発生した変更について、Debezium がレプリケーションスロットから LSN を読み取ることができることを確認します。
たとえば、障害が発生したプライマリーのバックアップを障害発生直前の時点から復元し、スロットに記録されている最後の位置を特定します。バックアップデータの取得は管理上難しい場合がありますが、バックアップを検査することで、Debezium がすべての変更を消費したかどうかを確実に判断するメカニズムが提供されます。
2.6.8.2. PostgreSQL 15 以前のクラスターで障害が発生した後に新しいプライマリーサーバーからデータをキャプチャーする リンクのコピーリンクがクリップボードにコピーされました!
PostgreSQL 15 以前のクラスターでプライマリサーバーが障害を起こした後、元のプライマリーサーバーではなく、以前のレプリカサーバーのいずれかからデータをキャプチャーするように Debezium を設定することが必要になる場合があります。Debezium が以前のレプリカサーバーからデータをキャプチャーできるようにするには、次の手順を実行します。
手順
- クラスターの障害の原因となった状態を修復します。
コネクターが停止している間に、コネクター設定内のプロパティーの値を更新して、新しいサーバーの詳細を反映します。たとえば、次のプロパティーの正しい値が設定に含まれていることを確認します。
次のタスクを完了して、新しいプライマリーサーバーを Debezium で動作するように設定します。
- サーバー上で レプリケーションスロットを設定 します。
Debezium が レプリケーションを実行 し、サーバー上で パブリケーションを作成 できることを確認します。
PostgreSQL サーバーを Debezium で動作するように設定する方法の詳細は、PostgreSQL のセットアップ を参照してください。
-
スタンバイ PostgreSQL ノードを
primaryにプロモートします。 - コネクターを再起動します。
-
スナップショットモード を
alwaysに設定し、新しいプライマリーサーバーでスナップショットを実行して、サーバー上のデータの初期状態をキャプチャーし、データが失われないようにします。
Kafka Connect のプロセスは正常に停止する
Kafka Connect が分散モードで実行され、Kafka Connect プロセスが正常に停止した場合を想定します。Kafka Connect はそのプロセスをシャットダウンする前に、プロセスのコネクタータスクをそのグループの別の Kafka Connect プロセスに移行します。新しいコネクタータスクは、以前のタスクが停止した場所でプロセスを開始します。コネクタータスクが正常に停止され、新しいプロセスで再起動されるまでの間、プロセスに短い遅延が発生します。
Kafka Connect プロセスのクラッシュ
Kafka Connector プロセスが予期せず停止した場合、最後に処理されたオフセットを記録せずに、実行中のコネクタータスクが終了します。Kafka Connect が分散モードで実行されている場合は、Kafka Connect は他のプロセスでこれらのコネクタータスクを再起動します。ただし、PostgreSQL コネクターは、以前のプロセスで最後に 記録された オフセットから再開します。つまり、新しい代替タスクによって、クラッシュの直前に処理された同じ変更イベントが生成される可能性があります。重複するイベントの数は、オフセットのフラッシュ期間とクラッシュの直前のデータ変更の量によって異なります。
障害からの復旧中に一部のイベントが重複された可能性があるため、コンシューマーは常に重複されたイベントがある可能性を想定する必要があります。Debezium の変更はべき等であるため、一連のイベントは常に同じ状態になります。
各変更イベントレコードでは Debezium コネクターは、イベント発生時の PostgreSQL サーバー時間、サーバートランザクションの ID、トランザクションの変更が書き込まれたログ先行書き込みの位置など、イベント発生元に関するソース固有の情報を挿入します。コンシューマーは、LSN を重点としてこの情報を追跡し、イベントが重複しているかどうかを判断します。
コネクターの一定期間の停止
コネクターが正常に停止された場合、データベースを引き続き使用できます。変更はすべて PostgreSQL WAL に記録されます。コネクターが再起動すると、停止した場所で変更のストリーミングが再開されます。つまり、コネクターが停止した間に発生したデータベースのすべての変更に対して変更イベントレコードが生成されます。
適切に設定された Kafka クラスターは大量のスループットを処理できます。Kafka Connect は Kafka のベストプラクティスに従って作成され、十分なリソースがあれば Kafka Connect コネクターも非常に多くのデータベース変更イベントを処理できます。このため、Debezium コネクターがしばらく停止した後に再起動すると、停止中に発生したデータベースの変更に対して処理の遅れを取り戻す可能性が非常に高くなります。遅れを取り戻すのに掛かる時間は、Kafka の機能やパフォーマンス、および PostgreSQL のデータに加えられた変更の量によって異なります。