3.2. Debezium Db2 コネクターの仕組み


Debezium Db2 コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびスキーマ変更の処理方法を理解すると便利です。

詳細は以下を参照してください。

3.2.1. Debezium Db2 コネクターによるデータベーススナップショットの実行方法

Db2 のレプリケーション機能は、データベース変更の完全な履歴を保存するようには設計されていません。そのため、Debezium Db2 コネクターが初めてデータベースに接続すると、キャプチャーモードのテーブルの整合性スナップショットを作成し、この状態を Kafka にストリーミングします。これにより、テーブルの内容のベースラインが確立されます。

デフォルトでは、Db2 コネクターがスナップショットを実行すると、以下が実行されます。

  1. キャプチャーモードになっているテーブルを判断するため、スナップショットに含まれなければならないテーブルも判断されます。デフォルトでは、システム以外のテーブルはすべてキャプチャーモードになっています。table.exclude.listtable.include.list などのコネクター設定プロパティーを使用すると、キャプチャーモードである必要があるテーブルを指定できます。
  2. キャプチャーモードの各テーブルでロックを取得します。これにより、スナップショットの実行中にこれらのテーブルでスキーマの変更が発生しないようにします。ロックのレベルは、snapshot.isolation.mode コネクター設定プロパティーによって決定されます。
  3. サーバーのトランザクションログで、最大 (最新) の LSN の位置を読み取ります。
  4. キャプチャーモードになっているすべてのテーブルのスキーマをキャプチャーします。コネクターは、内部データベーススキーマ履歴トピックでこの情報を永続化します。
  5. 必要に応じて、ステップ 2 で取得したロックを解放します。通常、これらのロックは短期間のみ保持されます。
  6. ステップ 3 で読み取られた LSN の位置で、コネクターはキャプチャーモードテーブルとそれらのスキーマをスキャンします。スキャン中、コネクターは以下を行います。

    1. スナップショットの開始前に、テーブルが作成されたことを確認します。そうでない場合は、スナップショットはそのテーブルをスキップします。スナップショットが完了し、コネクターが変更イベントの出力を開始した後、コネクターはスナップショットの実行中に作成されたテーブルの変更イベントを生成します。
    2. キャプチャーモードになっている各テーブルで、各行の 読み取り イベントを生成します。すべての 読み取り イベントには、LSN の位置が含まれ、これはステップ 3 で取得した LSN の位置と同じです。
    3. テーブルと同じ名前を持つ Kafka トピックに各 読み取り イベントを出力します。
  7. コネクターオフセットにスナップショットの正常な完了を記録します。

3.2.1.1. アドホックスナップショット

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。

  • コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。

既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。

execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、run-snapshot シグナルのタイプを incremental に設定し、スナップショットに追加するテーブルの名前を指定します。

表3.1 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルトValue

type

incremental

実行するスナップショットのタイプを指定します。
タイプの設定は任意です。現在要求できるのは、incremental スナップショットのみです。

data-collections

該当なし

スナップショットされるテーブルの完全修飾名にマッチする正規表現を含む配列。
名前の形式は signal.data.collection 設定オプションと同じです。

additional-condition

該当なし

テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。

アドホックスナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

現在、execute-snapshot アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。

3.2.1.2. 増分スナップショット

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1 KB です。

増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを table.include.list プロパティーに追加した後にスナップショットを再実行します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

警告

Db2 の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。

3.2.1.3. 増分スナップショットのトリガー

現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。

シグナルを SQL INSERT クエリーとしてシグナルテーブルに送信します。

Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。

送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental だけです。

スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。

注記

スナップショットに追加するテーブルの名前に、データベース、スキーマ、またはテーブルの名前にドット(.)が含まれている場合、テーブルを data-collections 配列に追加するには、名前の各部分を二重引用符でエスケープする必要があります。

たとえば、public スキーマに存在し、My.Table という名前のテーブルを含めるには、"public"."My.Table" の形式を使用します。

前提条件

手順

  1. SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。

    Copy to Clipboard Toggle word wrap
    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

    以下に例を示します。

    Copy to Clipboard Toggle word wrap
    INSERT INTO myschema.debezium_signal (id, type, data) 
    1
    
    values ('ad-hoc-1',   
    2
    
        'execute-snapshot',  
    3
    
        '{"data-collections": ["schema1.table1", "schema2.table2"], 
    4
    
        "type":"incremental"}, 
    5
    
        "additional-condition":"color=blue"}'); 
    6

    コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、例のパラメーターについて説明しています。

    表3.2 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明
    項目Value説明

    1

    myschema.debezium_signal

    ソースデータベースのシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    3

    execute-snapshot

    type パラメーターは、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。

    5

    incremental

    実行するスナップショット操作の種類指定するシグナルの data フィールドの任意のtype コンポーネント。
    現在、唯一の有効なオプションはデフォルト値 incremental だけです。
    値を指定しない場合には、コネクターは増分スナップショットを実行します。

    6

    additional-condition

    テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。additional-condition パラメーターの詳細は、追加条件のあるアドホック増分スナップショット を参照してください。

追加条件のあるアドホック増分スナップショット

スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルシグナルに additional-condition パラメーターを追加してシグナル要求を変更できます。

一般的なスナップショットの SQL クエリーは、以下の形式を取ります。

Copy to Clipboard Toggle word wrap
SELECT * FROM <tableName> ....

additional-condition パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。

Copy to Clipboard Toggle word wrap
SELECT * FROM <tableName> WHERE <additional-condition> ....

以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。

Copy to Clipboard Toggle word wrap
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');

たとえば、以下の列が含まれる products テーブルがあるとします。

  • id (プライマリーキー)
  • color
  • quantity

products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、以下の SQL ステートメントを使用してスナップショットをトリガーできます。

Copy to Clipboard Toggle word wrap
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');

additional-condition パラメーターを使用すると、列を超える条件を渡すこともできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 の項目のみのデータを含む増分スナップショットをトリガーするクエリーを送信できます。

Copy to Clipboard Toggle word wrap
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例: 増分スナップショットイベントメッセージ

Copy to Clipboard Toggle word wrap
{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 
1

    },
    "op":"r", 
2

    "ts_ms":"1620393591654",
    "transaction":null
}

項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、唯一の有効なオプションはデフォルト値 incremental だけです。
シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

3.2.1.4. 増分スナップショットの停止

ソースデータベースのテーブルにシグナルを送信することで、増分スナップショットを停止することもできます。SQL INSERT クエリーを送信して、停止スナップショットシグナルをテーブルに送信します。

Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。

送信するクエリーは、incremental のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのテーブルを指定します。

前提条件

手順

  1. SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタル スナップショットを停止します。

    Copy to Clipboard Toggle word wrap
    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    以下に例を示します。

    Copy to Clipboard Toggle word wrap
    INSERT INTO myschema.debezium_signal (id, type, data) 
    1
    
    values ('ad-hoc-1',   
    2
    
        'stop-snapshot',  
    3
    
        '{"data-collections": ["schema1.table1", "schema2.table2"], 
    4
    
        "type":"incremental"}'); 
    5

    signal コマンドの idtype、および data パラメーターの値は、シグナル テーブルのフィールドに対応し ます。

    以下の表では、例のパラメーターについて説明しています。

    表3.3 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明
    項目Value説明

    1

    myschema.debezium_signal

    ソースデータベースのシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。

    3

    stop-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。data フィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。

    5

    incremental

    停止させるスナップショット操作の種類を指定する信号の data フィールドの必須コンポーネント。
    現在、有効な唯一のオプションは incremental です。
    type の値を指定しないと、シグナルは増分スナップショットの停止に失敗します。

3.2.2. Debezium Db2 コネクターによる変更データテーブルの読み取り方法

スナップショットの完了後、Debezium Db2 コネクターが初めて起動すると、キャプチャーモードである各ソーステーブルの変更データテーブルを識別します。コネクターは各変更データテーブルに対して以下を行います。

  1. 最後に保存された最大 LSN から現在の最大 LSN の間に作成された変更イベントを読み取ります。
  2. 各イベントのコミット LSN および変更 LSN に従って、変更イベントを順序付けます。これにより、コネクターはテーブルが変更された順序で変更イベントを出力します。
  3. コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
  4. コネクターが Kafka Connect に渡した最大 LSN を保存します。

再起動後、コネクターは停止した場所でオフセット (コミット LSN および変更 LSN) から変更イベントの出力を再開します。コネクターが稼働し、変更イベントを出力している間、キャプチャーモードからテーブルを削除したり、テーブルをキャプチャーモードに追加したりすると、コネクターは変更を検出して、それに合わせて動作を変更します。

3.2.3. Debezium Db2 変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトでは、Db2 コネクターは、テーブルで発生するすべての INSERTUPDATEDELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。

topicPrefix.schemaName.tableName

以下のリストは、デフォルト名のコンポーネントの定義を示しています。

topicPrefix
topic.prefix コネクター設定プロパティーで指定されたトピック接頭辞。
schemaName
操作が発生したスキーマの名前。
tableName
操作が発生したテーブルの名前。

たとえば、MYSCHEMA スキーマに 4 つのテーブル (PRODUCTSPRODUCTS_ON_HANDCUSTOMERSORDERS) を含む mydatabase データベースを使用した Db2 インストールについて考えてみます。コネクターはイベントを以下の 4 つの Kafka トピックに出力します。

  • mydatabase.MYSCHEMA.PRODUCTS
  • mydatabase.MYSCHEMA.PRODUCTS_ON_HAND
  • mydatabase.MYSCHEMA.CUSTOMERS
  • mydatabase.MYSCHEMA.ORDERS

コネクターは同様の命名規則を適用して、内部データベーススキーマ履歴トピック、スキーマ変更トピック、およびトランザクションメタデータトピック にラベル 付けます。

デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピック ルーティング を参照し てください。

3.2.4. Debezium Db2 コネクターのスキーマ変更トピック

Debezium Db2 コネクターを設定すると、データベースのキャプチャーされたテーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成できます。

Debezium は、以下の場合にスキーマ変更トピックにメッセージを出力します。

  • 新しいテーブルがキャプチャーモードになる。
  • テーブルがキャプチャーモードから削除される。
  • データベーススキーマの更新 中に、キャプチャーモードのテーブルのスキーマに変更が加えられます。

コネクターはスキーマ変更イベントを、<topicPrefix> という名前の Kafka スキーマ変更トピックに書き込みます。ここで <topicPrefix> は、topic.prefix コネクター設定プロパティーで指定されたトピック接頭辞です。コネクターがスキーマ変更トピックに送信するメッセージには以下の要素などのペイロードが含まれます。

databaseName
ステートメントが適用されるデータベースの名前。databaseName の値は、メッセージキーとして機能します。
pos
ステートメントが表示される binlog の位置。
tableChanges
スキーマの変更後のテーブルスキーマ全体の構造化表現。tableChanges フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
重要

キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベーススキーマ履歴トピックにも格納します。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。

重要

データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。

トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。

  • データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を 1 に指定します。
  • Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka num.partitions 設定オプションの値を 1 に設定します。
警告

コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。

例: Db2 コネクターのスキーマ変更トピックに出力されるメッセージ

以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。

Copy to Clipboard Toggle word wrap
{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "2.1.4.Final",
      "connector": "db2",
      "name": "db2",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "testdb",
      "schema": "DB2INST1",
      "table": "CUSTOMERS",
      "change_lsn": null,
      "commit_lsn": "00000025:00000d98:00a2",
      "event_serial_no": null
    },
    "ts_ms": 1588252618953, 
1

    "databaseName": "TESTDB", 
2

    "schemaName": "DB2INST1",
    "ddl": null, 
3

    "tableChanges": [ 
4

      {
        "type": "CREATE", 
5

        "id": "\"DB2INST1\".\"CUSTOMERS\"", 
6

        "table": { 
7

          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 
8

            "ID"
          ],
          "columns": [ 
9

            {
              "name": "ID",
              "jdbcType": 4,
              "nativeType": null,
              "typeName": "int identity",
              "typeExpression": "int identity",
              "charsetName": null,
              "length": 10,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 
10

            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表3.4 スキーマ変更トピックに出力されたメッセージのフィールドの説明
項目フィールド名説明

1

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

2

databaseName
schemaName

変更が含まれるデータベースとスキーマを識別します。

3

ddl

Db2 コネクターの場合は常に null です。その他のコネクターでは、このフィールドにスキーマの変更を行う DDL が含まれます。この DDL は Db2 コネクターでは使用できません。

4

tableChanges

DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。

5

type

変更の種類を説明します。値は以下のいずれかになります。

  • CREATE - テーブルの作成
  • ALTER - テーブルの変更
  • DROP - テーブルの削除

6

id

作成、変更、または破棄されたテーブルの完全な識別子。

7

table

適用された変更後のテーブルメタデータを表します。

8

primaryKeyColumnNames

テーブルのプライマリーキーを設定する列のリスト。

9

変更されたテーブルの各列のメタデータ。

10

attributes

各テーブル変更のカスタム属性メタデータ。

コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload フィールドにキーが含まれます。

Copy to Clipboard Toggle word wrap
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.db2.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "TESTDB"
  }
}

3.2.5. トランザクション境界を表す Debezium Db2 コネクターによって生成されたイベント

Debezium は、トランザクション境界を表し、変更データイベントメッセージをエンリッチするイベントを生成できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
ts_ms
データソースでのトランザクション境界イベント (BEGIN または END イベント) の時間。もしデータソースが Debezium にイベント時間を提供しないなら、このフィールドは代わりに Debezium がイベントを処理する時間を表します。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
data_collectionevent_count 要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。

Copy to Clipboard Toggle word wrap
{
  "status": "BEGIN",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "testDB.dbo.tablea",
      "event_count": 1
    },
    {
      "data_collection": "testDB.dbo.tableb",
      "event_count": 1
    }
  ]
}

topic.transaction オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>.transaction トピックに出力します。

データ変更イベントのエンリッチメント

トランザクションメタデータを有効にすると、コネクターは変更イベント Envelope を新しい transaction フィールドでエンリッチします。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下は、メッセージの例になります。

Copy to Clipboard Toggle word wrap
{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "00000025:00000d08:0025",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat, Inc.