8.2. Debezium PostgreSQL コネクターの仕組み


Debezium PostgreSQL コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびメタデータの使用方法を理解すると便利です。

詳細は以下を参照してください。

8.2.1. PostgreSQL コネクターのセキュリティー

Debezium コネクターを使用して PostgreSQL データベースから変更をストリーミングするには、コネクターは特定の権限がデータベースで必要になります。必要な権限を付与する方法の 1 つとして、ユーザーに superuser 権限を付与する方法がありますが、これにより PostgreSQL データが不正アクセスによって公開される可能性ああります。Debezium ユーザーに過剰な権限を付与するのではなく、特定の特権を付与する専用の Debezium レプリケーションユーザーを作成することが推奨されます。

Debezium PostgreSQL ユーザーの権限設定の詳細は、パーミッションの設定 を参照してください。PostgreSQL の論理レプリケーションセキュリティーの詳細は、PostgreSQL のドキュメント を参照してください。

8.2.2. Debezium PostgreSQL コネクターによるデータベーススナップショットの実行方法

ほとんどの PostgreSQL サーバーは、WAL セグメントにデータベースの完全な履歴を保持しないように設定されています。つまり、PostgreSQL コネクターは WAL のみを読み取ってもデータベースの履歴全体を確認できません。そのため、コネクターが最初に起動すると、データベースの最初の 整合性スナップショット が実行されます。

スナップショットの詳細は、以下のセクションを参照してください。

初期スナップショットのデフォルトのワークフロー動作

スナップショットを実行するためのデフォルト動作は、以下の手順で構成されます。この動作を変更するには、snapshot.mode コネクター設定プロパティーinitial 以外の値に設定します。

  1. SERIALIZABLE、READ ONLY、DEFERRABLE 分離レベルでトランザクションを開始し、このトランザクションでの後続の読み取りがデータの単一バージョンに対して行われるようにします。他のクライアントによる後続の INSERTUPDATE、および DELETE 操作によるデータの変更は、このトランザクションでは確認できません。
  2. サーバーのトランザクションログの現在の位置を読み取ります。
  3. データベーステーブルとスキーマをスキャンし、各行の READ イベントを生成し、そのイベントを適切なテーブル固有の Kafka トピックに書き込みます。
  4. トランザクションをコミットします。
  5. コネクターオフセットにスナップショットの正常な完了を記録します。

コネクターに障害が発生した場合、コネクターのリバランスが発生した場合、または 1 の後で 5 の完了前に停止した場合、コネクターは再起動後に新しいスナップショットを開始します。コネクターが最初のスナップショットを完了すると、PostgreSQL コネクターは手順 2 で読み取る位置からストリーミングを続行します。これにより、コネクターが更新を見逃さないようします。何らかの理由でコネクターが再び停止した場合、コネクターは再起動後に最後に停止した位置から変更のストリーミングを続行します。

表8.1 snapshot.mode コネクター設定プロパティーのオプション
オプション説明

always

コネクターは起動時に常にスナップショットを実行します。スナップショットが完了した後、コネクターは上記の手順の 3. から変更のストリーミングを続行します。このモードは、以下のような状況で使用すると便利です。

  • 一部の WAL セグメントが削除され、利用できなくなったことを認識している。
  • クラスターの障害後に、新しいプライマリーが昇格された。always スナップショットモードを使用すると、新しいプライマリーが昇格された後、コネクターが新しいプライマリーで再起動するまでに加えられた変更をコネクターが見逃さないようにすることができます。

never

コネクターはスナップショットを実行しません。このようにコネクターを設定したすると、起動時の動作は次のようになります。Kafka オフセットトピックに以前保存された LSN がある場合、コネクターはその位置から変更をストリーミングを続行します。保存された LSN がない場合、コネクターはサーバーで PostgreSQL の論理レプリケーションスロットが作成された時点で変更のストリーミングを開始します。never スナップショットモードは、対象のすべてのデータが WAL に反映されている場合にのみ便利です。

initial (デフォルト)

Kafka オフセットトピックが存在しない場合、コネクターはデータベーススナップショットを実行します。データベースのスナップショットが完了すると、Kafka オフセットトピックが書き込まれます。Kafka オフセットトピックに以前保存された LSN がある場合、コネクターはその位置から変更をストリーミングを続行します。

initial_only

コネクターはデータベースのスナップショットを実行し、変更イベントレコードをストリーミングする前に停止します。コネクターが起動していても、停止前にスナップショットを完了しなかった場合、コネクターはスナップショットプロセスを再起動し、スナップショットの完了時に停止します。

exported

非推奨、全てのモードがロックレスになります。

8.2.3. アドホックスナップショット

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。

  • コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。

既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。

execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、execute-snapshot シグナルのタイプを incremental に設定し、スナップショットに追加するテーブルの名前を指定します。

表8.2 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルト

type

incremental

実行するスナップショットのタイプを指定します。
タイプの設定は任意です。現在要求できるのは、incremental スナップショットのみです。

data-collections

該当なし

スナップショットされるテーブルの完全修飾名にマッチする正規表現を含む配列。
名前の形式は signal.data.collection 設定オプションと同じです。

additional-condition

該当なし

テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。

surrogate-key

該当なし

スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。

アドホックスナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

現在、execute-snapshot アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。

8.2.4. 増分スナップショット

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信する ための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。

増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを table.include.list プロパティーに追加した後にスナップショットを再実行します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

警告

PostgreSQL の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。増分スナップショットの開始 前にスキーマの変更が行われ、シグナルが送信された後にスキーマの変更が行われた場合は、スキーマの変更を正しく処理するために、パススルーの設定オプション database.autosaveconservative に設定されます。

8.2.4.1. 増分スナップショットのトリガー

現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。

シグナルを SQL INSERT クエリーとしてシグナルテーブルに送信します。

Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。

送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental だけです。

スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。

注記

スナップショットに含めるテーブルの名前に、データベース、スキーマ、またはテーブルの名前にドット (.) が含まれている場合、そのテーブルを data-collections 配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。

たとえば、以下のようなテーブルを含めるには public スキーマに存在し、その名前が My.Tableのテーブルを含めるには、"public"."My.Table" の形式を使用します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットをトリガーする

  1. SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}, 5
        "additional-condition":"color=blue"}'); 6

    コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表8.3 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明
    項目説明

    1

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    3

    execute-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。

    5

    incremental

    実行するスナップショット操作の種類指定するシグナルの data フィールドの任意のtype コンポーネント。
    現在、唯一の有効なオプションはデフォルト値 incremental だけです。
    値を指定しない場合には、コネクターは増分スナップショットを実行します。

    6

    additional-condition

    テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。additional-condition パラメーターの詳細は、additional-condition 付きのアドホック増分スナップショット を参照してください。

additional-condition 付きのアドホック増分スナップショット

スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルシグナルに additional-condition パラメーターを追加してシグナル要求を変更できます。

一般的なスナップショットの SQL クエリーは、以下の形式を取ります。

SELECT * FROM <tableName> ....

additional-condition パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。

SELECT * FROM <tableName> WHERE <additional-condition> ....

以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

たとえば、以下の列が含まれる products テーブルがあるとします。

  • id (プライマリーキー)
  • color
  • quantity

products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

additional-condition パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例: 増分スナップショットイベントメッセージ

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、唯一の有効なオプションはデフォルト値 incremental だけです。
シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

8.2.4.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする

設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。

表8.4 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

additional-condition

該当なし

コネクターがスナップショットに含める列のサブセットを指定するために評価する条件を指定するオプションの文字列。

execute-snapshot Kafka メッセージの例:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

追加条件付きのアドホック増分スナップショット

Debezium は additional-condition フィールドを使用してテーブルのコンテンツのサブセットを選択します。

通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。

SELECT * FROM <tableName> …​.

スナップショットリクエストに additional-condition が含まれる場合、次のように additional-condition が SQL クエリーに追加されます。

SELECT * FROM <tableName> WHERE <additional-condition> …​.

たとえば、列 id (プライマリーキー)、color、および brand を含む products テーブルがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-condition ステートメントを追加することができます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`

additional-condition ステートメントを使用して、複数の列に基づいて条件を渡すことができます。たとえば、前の例と同じ products テーブルを使用して、color='blue' および brand='MyBrand' である products テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`

8.2.4.3. 増分スナップショットの停止

ソースデータベースのテーブルにシグナルを送信して、増分スナップショットを停止することもできます。SQL INSERT クエリーを送信して、停止スナップショットシグナルをテーブルに送信します。

Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。

送信するクエリーは、incremental のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのテーブルを指定します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットを停止する

  1. SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    signal コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表8.5 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明
    項目説明

    1

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。

    3

    stop-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。data フィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。

    5

    incremental

    停止させるスナップショット操作の種類を指定する信号の data フィールドの必須コンポーネント。
    現在、有効な唯一のオプションは incremental です。
    type の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。

8.2.4.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する

設定された Kafka シグナルトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。

表8.6 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現のオプションの配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

8.2.5. Debezium PostgreSQL コネクターによる変更イベントレコードのストリーミング方法

通常、PostgreSQL コネクターは、接続されている PostgreSQL サーバーから変更をストリーミングするのに大半の時間を費やします。このメカニズムは、PostgreSQL のレプリケーションプロトコル に依存します。このプロトコルにより、クライアントはログシーケンス番号 (LSN) と呼ばれる特定の場所で変更がサーバーのトランザクションログにコミットされる際に、サーバーから変更を受信することができます。

サーバーがトランザクションをコミットするたびに、別のサーバープロセスが 論理デコードプラグイン からコールバック関数を呼び出します。この関数はトランザクションからの変更を処理し、特定の形式 (Debezium プラグインの場合は Protobuf または JSON) に変換して、出力ストリームに書き込みます。その後、クライアントは変更を使用できます。

Debezium PostgreSQL コネクターは PostgreSQL クライアントとして動作します。コネクターが変更を受信すると、イベントを Debezium の createupdate、または delete イベントに変換します。これには、イベントの LSN が含まれます。PostgreSQL コネクターは、同じプロセスで実行されている Kafka Connect フレームワークにレコードのこれらの変更イベントを転送します。Kafka Connect プロセスは、変更イベントレコードを適切な Kafka トピックに生成された順序で非同期に書き込みます。

Kafka Connect は定期的に最新の オフセット を別の Kafka トピックに記録します。オフセットは、各イベントに含まれるソース固有の位置情報を示します。PostgreSQL コネクターでは、各変更イベントに記録された LSN がオフセットです。

Kafka Connect が正常にシャットダウンすると、コネクターを停止し、すべてのイベントレコードを Kafka にフラッシュして、各コネクターから受け取った最後のオフセットを記録します。Kafka Connect の再起動時に、各コネクターの最後に記録されたオフセットを読み取り、最後に記録されたオフセットで各コネクターを起動します。コネクターを再起動すると、PostgreSQL サーバーにリクエストを送信し、その位置の直後に開始されるイベントを送信します。

注記

PostgreSQL コネクターは、論理デコードプラグインによって送信されるイベントの一部としてスキーマ情報を取得します。ただし、コネクターはプライマリーキーが設定される列に関する情報を取得しません。コネクターは JDBC メタデータ (サイドチャネル) からこの情報を取得します。テーブルのプライマリーキー定義が変更される場合 (プライマリーキー列の追加、削除、または名前変更によって)、変更される場合、JDBC からのプライマリーキー情報が論理デコードプラグインが生成する変更イベントと同期されないごくわずかな期間が発生します。このごくわずかな期間に、キーの構造が不整合な状態でメッセージが作成される可能性があります。不整合にならないようにするには、以下のようにプライマリーキーの構造を更新します。

  1. データベースまたはアプリケーションを読み取り専用モードにします。
  2. Debezium に残りのイベントをすべて処理させます。
  3. Debezium を停止します。
  4. 関連するテーブルのプライマリーキー定義を更新します。
  5. データベースまたはアプリケーションを読み取り/書き込みモードにします。
  6. Debezium を再起動します。

PostgreSQL 10+ 論理デコードサポート (pgoutput)

PostgreSQL 10+ の時点で、PostgreSQL でネイティブにサポートされる pgoutput と呼ばれる論理レプリケーションストリームモードがあります。つまり、Debezium PostgreSQL コネクターは追加のプラグインを必要とせずにそのレプリケーションストリームを使用できます。これは、プラグインのインストールがサポートされないまたは許可されない環境で特に便利です。

詳細は、PostgreSQL の設定 を参照してください。

8.2.6. Debezium PostgreSQL の変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトでは、PostgreSQL コネクターは、テーブルで発生するすべての INSERTUPDATEDELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。

topicPrefix.schemaName.tableName

以下のリストは、デフォルト名のコンポーネントの定義を示しています。

topicPrefix
topic.prefix コネクター設定プロパティーで指定されたトピック接頭辞。
schemaName
変更イベントが発生したデータベーススキーマの名前。
tableName
変更イベントが発生したデータベーステーブルの名前。

たとえば、postgres データベースとproductsproducts_on_handcustomersorders の 4 つのテーブルを含む inventory スキーマを持つ PostgreSQL インストレーションの変更をキャプチャーするコネクターの設定において、fulfillment が論理的なサーバー名であるとします。コネクターは以下の 4 つの Kafka トピックにレコードをストリーミングします。

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

テーブルは特定のスキーマの一部ではなく、デフォルトの public PostgreSQL スキーマで作成されたとします。Kafka トピックの名前は以下になります。

  • fulfillment.public.products
  • fulfillment.public.products_on_hand
  • fulfillment.public.customers
  • fulfillment.public.orders

コネクターは、同様の命名規則を適用して、トランザクションメタデータのトピック をラベル付けします。

デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。

8.2.7. トランザクション境界を表す Debezium PostgreSQL コネクターによって生成されたイベント

Debezium は、トランザクション境界を表し、データ変更イベントメッセージをエンリッチするイベントを生成できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

Debezium はすべてのトランザクションの BEGIN および END に対して、以下のフィールドが含まれるイベントを生成します。

status
BEGIN または END
id
Postgres トランザクション ID 自体と、コロンで区切られた特定の操作の LSN で構成される一意のトランザクション識別子の文字列表現。形式は txID:LSN です。
ts_ms
データソースでのトランザクション境界イベント (BEGIN または END イベント) の時間。もしデータソースが Debezium にイベント時間を提供しないなら、このフィールドは代わりに Debezium がイベントを処理する時間を表します。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
data_collectionevent_count 要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。

{
  "status": "BEGIN",
  "id": "571:53195829",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "571:53195832",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

topic.transaction オプションで上書きされない限り、トランザクションイベントは <topic.prefix>.transaction という名前のトピックに書き込まれます。

変更データイベントのエンリッチメント

トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下は、メッセージの例になります。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
   ...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "571:53195832",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.