3.2. Debezium Db2 コネクターの仕組み


Debezium Db2 コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびスキーマ変更の処理方法を理解すると便利です。

詳細は以下を参照してください。

3.2.1. Debezium Db2 コネクターによるデータベーススナップショットの実行方法

Db2 のレプリケーション機能は、データベース変更の完全な履歴を保存するようには設計されていません。そのため、Debezium Db2 コネクターが初めてデータベースに接続すると、キャプチャーモードのテーブルの整合性スナップショットを作成し、この状態を Kafka にストリーミングします。これにより、テーブルの内容のベースラインが確立されます。

デフォルトでは、Db2 コネクターがスナップショットを実行すると、以下が実行されます。

  1. キャプチャーモードになっているテーブルを判断するため、スナップショットに含まれなければならないテーブルも判断されます。デフォルトでは、システム以外のテーブルはすべてキャプチャーモードになっています。table.exclude.listtable.include.list などのコネクター設定プロパティーを使用すると、キャプチャーモードである必要があるテーブルを指定できます。
  2. キャプチャーモードの各テーブルでロックを取得します。これにより、スナップショットの実行中にこれらのテーブルでスキーマの変更が発生しないようにします。ロックのレベルは、snapshot.isolation.mode コネクター設定プロパティーによって決定されます。
  3. サーバーのトランザクションログで、最大 (最新) の LSN の位置を読み取ります。
  4. キャプチャーモードになっているすべてのテーブルのスキーマをキャプチャーします。コネクターは、内部データベース履歴トピックでこの情報を永続化します。
  5. 必要に応じて、ステップ 2 で取得したロックを解放します。通常、これらのロックは短期間のみ保持されます。
  6. ステップ 3 で読み取られた LSN の位置で、コネクターはキャプチャーモードテーブルとそれらのスキーマをスキャンします。スキャン中、コネクターは以下を行います。

    1. スナップショットの開始前に、テーブルが作成されたことを確認します。そうでない場合は、スナップショットはそのテーブルをスキップします。スナップショットが完了し、コネクターが変更イベントの出力を開始した後、コネクターはスナップショットの実行中に作成されたテーブルの変更イベントを生成します。
    2. キャプチャーモードになっている各テーブルで、各行の 読み取り イベントを生成します。すべての 読み取り イベントには、LSN の位置が含まれ、これはステップ 3 で取得した LSN の位置と同じです。
    3. テーブルと同じ名前を持つ Kafka トピックに各 読み取り イベントを出力します。
  7. コネクターオフセットにスナップショットの正常な完了を記録します。

3.2.1.1. アドホックスナップショット

重要

アドホックスナップショットの使用はテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。

  • コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。

既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。

execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、run-snapshot シグナルのタイプを incremental に設定し、スナップショットに追加するテーブルの名前を指定します。

表3.1 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルト

type

incremental

実行するスナップショットのタイプを指定します。
タイプの設定は任意です。現在要求できるのは、incremental スナップショットのみです。

data-collections

該当なし

スナップショットを作成するテーブルの完全修飾名が含まれる配列。
名前の形式は signal.data.collection 設定オプションと同じです。

アドホックスナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

現在、execute-snapshot アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。

3.2.1.2. 増分スナップショット

重要

増分スナップショットの使用はテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1 KB です。

増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを table.include.list プロパティーに追加した後にスナップショットを再実行します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

増分スナップショットのトリガー

現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。SQL INSERT クエリーとしてテーブルにシグナルを送信します。Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。

送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental だけです。

スナップショットに追加するテーブルを指定するには、テーブルをリスト表示する data-collectionsアレイを指定します (例:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]})。

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。

前提条件

  • シグナルが有効になっている

    • シグナルデータコレクションがソースのデータベースに存在し、コネクターはこれをキャプチャーするように設定されています。
    • シグナルデータコレクションは signal.data.collection プロパティーで指定されます。

手順

  1. SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。

    INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');

    コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、これらのパラメーターについて説明しています。

    表3.2 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明
    説明

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    execute-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    data-collections

    スナップショットに含めるテーブル名の配列を指定するシグナルの dataフィールドの必須コンポーネント。
    配列は、signal.data.collection 設定プロパティーにコネクターのシグナルテーブルの名前を指定するときに使用する形式で、完全修飾名別にテーブルをリスト表示します。

    incremental

    実行するスナップショット操作の種類指定するシグナルの data フィールドの任意のtype コンポーネント。
    現在、唯一の有効なオプションはデフォルト値 incremental だけです。
    シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
    値を指定しない場合には、コネクターは増分スナップショットを実行します。

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例: 増分スナップショットイベントメッセージ

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、唯一の有効なオプションはデフォルト値 incremental だけです。
シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

警告

Db2 の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。

3.2.2. Debezium Db2 コネクターによる変更データテーブルの読み取り方法

スナップショットの完了後、Debezium Db2 コネクターが初めて起動すると、キャプチャーモードである各ソーステーブルの変更データテーブルを識別します。コネクターは各変更データテーブルに対して以下を行います。

  1. 最後に保存された最大 LSN から現在の最大 LSN の間に作成された変更イベントを読み取ります。
  2. 各イベントのコミット LSN および変更 LSN に従って、変更イベントを順序付けます。これにより、コネクターはテーブルが変更された順序で変更イベントを出力します。
  3. コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
  4. コネクターが Kafka Connect に渡した最大 LSN を保存します。

再起動後、コネクターは停止した場所でオフセット (コミット LSN および変更 LSN) から変更イベントの出力を再開します。コネクターが稼働し、変更イベントを出力している間、キャプチャーモードからテーブルを削除したり、テーブルをキャプチャーモードに追加したりすると、コネクターは変更を検出して、それに合わせて動作を変更します。

3.2.3. Debezium Db2 変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトでは、Db2 コネクターは、テーブルで発生するすべての INSERTUPDATEDELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。

databaseName.schemaName.tableName

以下のリストは、デフォルト名のコンポーネントの定義を示しています。

databaseName
database.server.name コネクター設定プロパティーで指定したコネクターの論理名です。
schemaName
操作が発生したスキーマの名前。
tableName
操作が発生したテーブルの名前。

たとえば、MYSCHEMA スキーマに 4 つのテーブル (PRODUCTSPRODUCTS_ON_HANDCUSTOMERSORDERS) を含む mydatabase データベースを使用した Db2 インストールについて考えてみます。コネクターはイベントを以下の 4 つの Kafka トピックに出力します。

  • mydatabase.MYSCHEMA.PRODUCTS
  • mydatabase.MYSCHEMA.PRODUCTS_ON_HAND
  • mydatabase.MYSCHEMA.CUSTOMERS
  • mydatabase.MYSCHEMA.ORDERS

コネクターは同様の命名規則を適用して、内部データベース履歴トピック (スキーマ変更トピックトランザクションメタデータトピック) にラベルを付けます。

デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。

3.2.4. Debezium Db2 コネクターのスキーマ変更トピック

Debezium Db2 コネクターを設定すると、データベースのキャプチャーされたテーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成できます。

Debezium は、以下の場合にスキーマ変更トピックにメッセージを出力します。

  • 新しいテーブルがキャプチャーモードになる。
  • テーブルがキャプチャーモードから削除される。
  • データベーススキーマの更新 中に、キャプチャーモードであるテーブルのスキーマが変更される。

コネクターは、スキーマ変更イベントをすべて <serverName> という名前の Kafka トピックに書き込みます。<serverName>database.server.name 設定プロパティーに指定されたコネクターの名前に置き換えます。コネクターがスキーマ変更トピックに送信するメッセージには以下の要素などのペイロードが含まれます。

databaseName
ステートメントが適用されるデータベースの名前。databaseName の値は、メッセージキーとして機能します。
pos
ステートメントが表示される binlog の位置。
tableChanges
スキーマの変更後のテーブルスキーマ全体の構造化表現。tableChanges フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
重要

キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベース履歴トピックにも格納します。内部データベース履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。

重要

データベース履歴トピックをパーティションに分割しないでください。データベース履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。

トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。

  • データベース履歴トピックを手動で作成する場合は、パーティション数を 1 に指定します。
  • Apache Kafka ブローカーを使用してデータベース履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka num.partitions 設定オプションの値を 1 に設定します。
警告

コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。

例: Db2 コネクターのスキーマ変更トピックに出力されるメッセージ

以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "1.7.2.Final",
      "connector": "db2",
      "name": "db2",
      "ts_ms": 1588252618953,
      "snapshot": "true",
      "db": "testdb",
      "schema": "DB2INST1",
      "table": "CUSTOMERS",
      "change_lsn": null,
      "commit_lsn": "00000025:00000d98:00a2",
      "event_serial_no": null
    },
    "databaseName": "TESTDB", 1
    "schemaName": "DB2INST1",
    "ddl": null, 2
    "tableChanges": [ 3
      {
        "type": "CREATE", 4
        "id": "\"DB2INST1\".\"CUSTOMERS\"", 5
        "table": { 6
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 7
            "ID"
          ],
          "columns": [ 8
            {
              "name": "ID",
              "jdbcType": 4,
              "nativeType": null,
              "typeName": "int identity",
              "typeExpression": "int identity",
              "charsetName": null,
              "length": 10,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ]
        }
      }
    ]
  }
}
表3.3 スキーマ変更トピックに出力されたメッセージのフィールドの説明
項目フィールド名説明

1

databaseName
schemaName

変更が含まれるデータベースとスキーマを識別します。

2

ddl

Db2 コネクターの場合は常に null です。その他のコネクターでは、このフィールドにスキーマの変更を行う DDL が含まれます。この DDL は Db2 コネクターでは使用できません。

3

tableChanges

DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。

4

type

変更の種類を説明します。値は以下のいずれかになります。

  • CREATE - テーブルの作成
  • ALTER - テーブルの変更
  • DROP - テーブルの削除

5

id

作成、変更、または破棄されたテーブルの完全な識別子。

6

table

適用された変更後のテーブルメタデータを表します。

7

primaryKeyColumnNames

テーブルのプライマリーキーを設定する列のリスト。

8

変更されたテーブルの各列のメタデータ。

コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload フィールドにキーが含まれます。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.db2.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "TESTDB"
  }
}

3.2.5. トランザクション境界を表す Debezium Db2 コネクターによって生成されたイベント

Debezium は、トランザクション境界を表し、変更データイベントメッセージをエンリッチするイベントを生成できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
data_collectionevent_count 要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。

{
  "status": "BEGIN",
  "id": "00000025:00000d08:0025",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "00000025:00000d08:0025",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "testDB.dbo.tablea",
      "event_count": 1
    },
    {
      "data_collection": "testDB.dbo.tableb",
      "event_count": 1
    }
  ]
}

コネクターはトランザクションイベントを <database.server.name>.transaction トピックに出力します。

データ変更イベントのエンリッチメント

トランザクションメタデータを有効にすると、コネクターは変更イベント Envelope を新しい transaction フィールドでエンリッチします。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下は、メッセージの例になります。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "00000025:00000d08:0025",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.