検索

第2章 ソースコネクター

download PDF

Debezium は、さまざまなデータベース管理システムから変更をキャプチャーするソースコネクターのライブラリーを提供します。各コネクターは、非常によく似た構造を持つ変更イベントを生成するため、発生元に関係なくアプリケーションがイベントを消費し、応答することが容易になります。

現在、Debezium は以下のデータベースのソースコネクターを提供します。

2.1. Db2 の Debezium コネクター

Debezium の Db2 コネクターは、Db2 データベースのテーブルで行レベルの変更をキャプチャーできます。このコネクターと互換性のある Db2 データベースのバージョンについては、Debezium でサポートされる設定ページを参照してください。

このコネクターは、テーブルを "キャプチャーモード" にする SQL ベースのポーリングモデルを使用する、SQL Server の Debezium 実装から大きく影響を受けます。テーブルがキャプチャーモードの場合、Debezium Db2 コネクターは、そのテーブルへの行レベルの更新ごとに変更イベントを生成し、ストリーミングします。

キャプチャーモードのテーブルには、関連する変更テーブルがあり、このテーブルは Db2 によって作成されます。キャプチャーモードのテーブルに対する変更ごとに、Db2 はその変更に関するデータをテーブルの関連する変更データテーブルに追加します。変更データテーブルには、行の各状態のエントリーが含まれます。また、削除に関する特別なエントリーもあります。Debezium Db2 コネクターは変更イベントを変更データテーブルから読み取り、イベントを Kafka トピックに出力します。

Debezium Db2 コネクターが Db2 データベースに初めて接続すると、コネクターが変更をキャプチャーするように設定されたテーブルの整合性スナップショットを読み取ります。デフォルトでは、システム以外のテーブルがすべて対象になります。キャプチャーモードにするテーブルまたはキャプチャーモードから除外するテーブルを指定できるコネクター設定プロパティーがあります。

スナップショットが完了すると、コネクターはコミットされた更新の変更イベントをキャプチャーモードのテーブルに出力し始めます。デフォルトでは、特定のテーブルの変更イベントは、テーブルと同じ名前を持つ Kafka トピックに移動します。アプリケーションとサービスはこれらのトピックから変更イベントを使用します。

注記

コネクターには、Linux 用の Db2 の標準部分として利用できる抽象構文表記 (ASN) ライブラリーを使用する必要があります。ASN ライブラリーを使用するには、IBM InfoSphere Data Replication (IIDR) のライセンスが必要です。ASN ライブラリーを使用するには、IIDR をインストールする必要はありません。

Debezium Db2 コネクターを使用するための情報および手順は、以下のように設定されています。

2.1.1. Debezium Db2 コネクターの概要

Debezium Db2 コネクターは、Db2 で SQL レプリケーションを有効にする ASN Capture/Apply エージェント をベースにしています。キャプチャーエージェントは以下を行います。

  • キャプチャーモードであるテーブルの変更データテーブルを生成します。
  • キャプチャーモードのテーブルを監視し、更新の変更イベントを対応する変更データテーブルのテーブルに格納します。

Debezium コネクターは SQL インターフェイスを使用して変更イベントの変更データテーブルに対してクエリーを実行します。

データベース管理者は、変更をキャプチャーするテーブルをキャプチャーモードにする必要があります。便宜上およびテストを自動化するために、以下の管理タスクをコンパイルし、実行できる Debezium 管理ユーザー定義機能 (UDF) が C にあります。

  • ASN エージェントの開始、停止、および再初期化。
  • テーブルをキャプチャーモードにする。
  • レプリケーション (ASN) スキーマと変更データテーブルの作成。
  • キャプチャーモードからのテーブルの削除。

また、Db2 制御コマンドを使用してこれらのタスクを実行することもできます。

対象のテーブルがキャプチャーモードになった後、コネクターは対応する変更データテーブルを読み取り、テーブル更新の変更イベントを取得します。コネクターは、変更されたテーブルと同じ名前を持つ Kafka トピックに対して、行レベルの挿入、更新、および削除操作ごとに変更イベントを出力します。これは、変更可能なデフォルトの動作です。クライアントアプリケーションは、対象のデータベーステーブルに対応する Kafka トピックを読み取り、行レベルの各変更イベントに対応できます。

通常、データベース管理者はテーブルのライフサイクルの途中でテーブルをキャプチャーモードにします。つまり、コネクターにはテーブルに加えられたすべての変更の完全な履歴はありません。そのため、Db2 コネクターが最初に特定の Db2 データベースに接続すると、キャプチャーモードである各テーブルで 整合性スナップショット を実行して起動します。コネクターは、スナップショットの完成後に、スナップショットが作成された時点から変更イベントをストリーミングします。これにより、コネクターはキャプチャーモードのテーブルの整合性のあるビューで開始し、スナップショットの実行中に加えられた変更は破棄されません。

Debezium コネクターはフォールトトラレントです。コネクターは変更イベントを読み取りおよび生成すると、変更データテーブルエントリーのログシーケンス番号 (LSN) を記録します。LSN はデータベースログの変更イベントの位置になります。コネクターが何らかの理由で停止した場合 (通信障害、ネットワークの問題、クラッシュなど)、コネクターは再起動後に最後に停止した場所の変更データテーブルの読み取りを続行します。これにはスナップショットが含まれます。つまり、コネクターの停止時にスナップショットが完了しなかった場合、コネクターの再起動時に新しいスナップショットが開始されます。

2.1.2. Debezium Db2 コネクターの仕組み

Debezium Db2 コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびスキーマ変更の処理方法を理解すると便利です。

詳細は以下を参照してください。

2.1.2.1. Debezium Db2 コネクターによるデータベーススナップショットの実行方法

Db2 のレプリケーション機能は、データベース変更の完全な履歴を保存するようには設計されていません。そのため、Debezium Db2 コネクターはログからデータベースの履歴全体を取得できません。コネクターがデータベースの現在の状態のベースラインを確立できるようにするには、コネクターの初回起動時に、キャプチャーボード のテーブルの最初の 整合性スナップショット を実行します。スナップショットが変更をキャプチャーするたびに、コネクターはキャプチャーされたテーブルの Kafka トピックに read イベントを発行します。

スナップショットの詳細は、以下のセクションを参照してください。

2.1.2.1.1. Debezium Db2 コネクターが最初のスナップショットの実行に使用するデフォルトのワークフロー

以下のワークフローでは、Debezium がスナップショットを作成する手順を示しています。この手順では、snapshot.mode 設定プロパティーがデフォルト値 (initial) に設定されている場合のスナップショットのプロセスを説明します。snapshot.mode プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。

  1. データベースへの接続を確立します。
  2. キャプチャーモードで、かつスナップショットに含める必要があるテーブルを決定します。デフォルトでは、コネクターはシステム以外のすべてのテーブルのデータをキャプチャーします。スナップショットが完了した後、コネクターは指定されたテーブルのデータをストリーミングし続けます。コネクターで特定のテーブルからのみデータをキャプチャーする場合は、table.include.listtable.exclude.list などのプロパティーを設定して、テーブルまたはテーブル要素のサブセットのみのデータをキャプチャーするようにコネクターに指示できます。
  3. キャプチャーモードの各テーブルでロックを取得します。このロックを使用して、スナップショットが完了するまで、それらのテーブルでスキーマの変更が行われないようにします。ロックのレベルは、snapshot.isolation.mode コネクター設定プロパティーによって決定されます。
  4. サーバーのトランザクションログで、最上位 (最新) の LSN の位置を読み取ります。
  5. すべてのテーブル、またはキャプチャー対象として指定されたすべてのテーブルのスキーマをキャプチャーします。コネクターは、内部データベースのスキーマ履歴トピックにスキーマ情報を保持します。スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。

    注記

    デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、キャプチャーモードにあるデータベース内の全テーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。

    初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。

  6. 手順 3 で取得したロックをすべてリリースします。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。
  7. 手順 4 で読み取った LSN 位置で、コネクターはキャプチャーするように指定されたテーブルをスキャンします。スキャン中に、コネクターは次のタスクを実行します。

    1. スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
    2. テーブルからキャプチャーされた行ごとに read イベントを生成します。すべての read イベントには、LSN の位置が含まれ、これは手順 4 で取得した LSN の位置と同じです。
    3. ソーステーブルの Kafka トピックに各 read イベントを出力します。
    4. 該当する場合は、データテーブルロックを解放します。
  8. コネクターオフセットにスナップショットの正常な完了を記録します。

作成された初期スナップショットは、キャプチャーされたテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。

スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。

コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、手順 4 で読み取った位置からストリーミングを続行します。

何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。

表2.1 snapshot.mode コネクター設定プロパティーの設定
設定説明

always

コネクターは起動するたびにスナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。

Initial

コネクターは、最初のスナップショット を作成するためのデフォルトのワークフローで説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。

initial_only

コネクターはデータベースのスナップショットを実行します。スナップショットが完了すると、コネクターは停止し、後続のデータベース変更のイベントレコードをストリーミングしません。

schema_only

非推奨。no_data を参照してください。

no_data

コネクターは関連するすべてのテーブルの構造をキャプチャーし、デフォルトのスナップショットワークフロー に記載されているすべてのステップを実行します。ただし、コネクターの起動時(ステップ 7.b)の時点でデータセットを表す READ イベントが作成されない点が異なります。

recovery

損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。

+ 警告:最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合、このモードを使用してスナップショットを実行しないでください。

when_needed

コネクターが起動した後、以下の状況のいずれかを検出する場合にのみスナップショットが実行されます。

  • トピックオフセットを検出できません。
  • 以前に記録されたオフセットは、サーバーで利用できないログの位置を指定します。

詳細は、コネクター設定プロパティーテーブルの snapshot.mode を参照してください。

2.1.2.1.2. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由

コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。

テーブルデータ
コネクターの table.include.list プロパティーにあるテーブルの INSERTUPDATE、および DELETE 操作に関する情報。
スキーマデータ
テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。

初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。

場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。

Additional information

2.1.2.1.3. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)

コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。

テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。

前提条件

手順

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 設定された Kafka Connect offset.storage.topic 内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。

    警告

    オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。

  4. 以下の変更をコネクター設定に適用します。

    1. (オプション) schema.history.internal.captured.tables.ddl の値を false に設定します。この設定により、スナップショットですべてのテーブルのスキーマがキャプチャーされ、今後、コネクターがすべてのテーブルのスキーマ履歴を再構築できるようにします。

      注記

      すべてのテーブルのスキーマをキャプチャーするスナップショットは、完了までにさらに時間がかかります。

    2. コネクターがキャプチャーするテーブルを table.include.list に追加します。
    3. snapshot.mode を次のいずれかの値に設定します。

      Initial
      コネクターを再起動すると、テーブルデータとテーブル構造をキャプチャーするデータベースの完全なスナップショットが作成されます。
      このオプションを選択する場合は、コネクターがすべてのテーブルのスキーマをキャプチャーできるように、schema.history.internal.captured.tables.ddl プロパティーの値を false に設定することを検討してください。
      schema_only
      コネクターを再起動すると、テーブルスキーマのみをキャプチャーするスナップショットが作成されます。完全なデータスナップショットとは異なり、このオプションではテーブルデータはキャプチャーされません。完全なスナップショットが作成される前に、早くコネクターを再起動する場合は、このオプションを使用します。
  5. コネクターを再起動します。コネクターは、snapshot.mode で指定されたタイプのスナップショットを完了します。
  6. (オプション) コネクターが schema_only スナップショットを実行した場合、スナップショットの完了後に 増分スナップショット を開始して、追加したテーブルからデータをキャプチャーします。コネクターは、テーブルからリアルタイムの変更をストリーミングし続けながら、スナップショットを実行します。増分スナップショットを実行すると、次のデータ変更がキャプチャーされます。

    • コネクターが以前にキャプチャーしたテーブルの場合、増分スナップショットは、コネクターが停止している間、つまりコネクターが停止してから現在の再起動までの間に発生した変更をキャプチャーします。
    • 新しく追加されたテーブルの場合、増分スナップショットは既存のテーブル行をすべてキャプチャーします。
2.1.2.1.4. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)

スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。

最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。

前提条件

  • コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
  • スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。

手順

初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (store.only.captured.tables.ddlfalse に設定されました)。
  1. table.include.list プロパティーを編集して、キャプチャーするテーブルを指定します。
  2. コネクターを再起動します。
  3. 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (store.only.captured.tables.ddltrue に設定されています)。

最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。

手順 1: スキーマスナップショット、その後に増分スナップショット

この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 設定された Kafka Connect offset.storage.topic 内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。

    警告

    オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。

  4. 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。

    1. snapshot.mode プロパティーの値を schema_only に設定します。
    2. table.include.list を編集して、キャプチャーするテーブルを追加します。
  5. コネクターを再起動します。
  6. Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
  7. データが損失されないようにするには、増分スナップショット を開始します。
手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット

この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 設定された Kafka Connect offset.storage.topic 内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。

    警告

    オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。

  4. table.include.list を編集して、キャプチャーするテーブルを追加します。
  5. 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。

    1. snapshot.mode プロパティーの値を initial に設定します。
    2. (オプション) schema.history.internal.store.only.captured.tables.ddlfalse に設定します。
  6. コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
  7. (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。

2.1.2.2. アドホックスナップショット

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。

  • コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。

既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。

execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。execute-snapshot シグナルのタイプを incremental または blocking に設定し、スナップショットに含めるテーブルの名前を次の表に示すように指定します。

表2.2 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルト

type

incremental

実行するスナップショットのタイプを指定します。
現在、incremental または blocking スナップショットを要求できます。

data-collections

該当なし

スナップショットされるテーブルの完全修飾名にマッチする正規表現を含む配列。
名前の形式は signal.data.collection 設定オプションと同じです。

additional-condition

該当なし

テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。

注記

このプロパティーは非推奨にされています。スナップショットでキャプチャーするデータのサブセットを定義する基準を指定するには、additional-conditions パラメーターを使用します。

additional-conditions

該当なし

コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、アドホックスナップショットがキャプチャーするデータをフィルタリングする基準を指定するオブジェクトです。各追加条件には次のパラメーターを設定できます。

data-collection
フィルターを適用するテーブルの完全修飾名。各テーブルに異なるフィルターを適用できます。
filter
スナップショットに含めるためにデータベースレコードに存在する必要がある列の値を指定します (例: "color='blue'")。

filter パラメーターに割り当てる値は、ブロッキングスナップショットの snapshot.select.statement.overrides プロパティーを設定するときに SELECT ステートメントの WHERE 句で指定する値と同じタイプです。以前の Debezium リリースでは、スナップショットシグナルに対して明示的な filter パラメーターは定義されていませんでした。代わりに、フィルター条件は、現在非推奨となっている additional-condition パラメーターに指定した値によって暗示されていました。

surrogate-key

該当なし

スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。

アドホック増分スナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルテーブルに追加して、アドホック増分スナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

詳細は、スナップショットの増分を参照してください。

アドホックブロッキングスナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。

詳細は、ブロッキングスナップショット を参照してください。

2.1.2.3. 増分スナップショット

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。

増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを table.include.list プロパティーに追加した後にスナップショットを再実行します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

警告

Db2 の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。

2.1.2.3.1. 増分スナップショットのトリガー

現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。

シグナルを SQL INSERT クエリーとしてシグナルテーブルに送信します。

Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。

送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作タイプを指定します。現在、インクリメンタル および ブロッキング タイプをサポートしています。

スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。

注記

スナップショットに含めるテーブルの名前に、データベース、スキーマ、またはテーブルの名前にドット (.) が含まれている場合、そのテーブルを data-collections 配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。

たとえば、以下のようなテーブルを含めるには public スキーマに存在し、その名前が My.Tableのテーブルを含めるには、"public"."My.Table" の形式を使用します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットをトリガーする

  1. SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<tableName>", "filter": "<additional-condition>"}]}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'execute-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental", 5
        "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); 6

    コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表2.3 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明
    項目説明

    1

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    3

    execute-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。

    5

    incremental

    実行するスナップショット操作の タイプ を指定するシグナルの data フィールドの任意の type コンポーネント。
    現在、インクリメンタル および ブロッキング タイプをサポートしています。
    値を指定しない場合には、コネクターは増分スナップショットを実行します。

    6

    additional-conditions

    コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
    各追加条件は、data-collection プロパティーと filter プロパティーを持つオブジェクトです。各データ収集に異なるフィルターを指定できます。
    * data-collection プロパティーは、フィルターが適用されるデータコレクションの完全修飾名です。additional-conditions パラメーターの詳細は、additional-conditions 付きのアドホック増分スナップショット を参照してください。

additional-conditions 付きのアドホック増分スナップショット

スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルに additional-conditions パラメーターを追加してシグナル要求を変更できます。

一般的なスナップショットの SQL クエリーは、以下の形式を取ります。

SELECT * FROM <tableName> ....

additional-conditions パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。

SELECT * FROM <data-collection> WHERE <filter> ....

以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<tableName>", "filter": "<additional-condition>"}]}');

たとえば、以下の列が含まれる products テーブルがあるとします。

  • id (プライマリーキー)
  • color
  • quantity

products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue"}]}');

additional-conditions パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue AND quantity>10"}]}');

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例: 増分スナップショットイベントメッセージ

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}

項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、有効なオプションは blocking および incremental のみです。
シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

2.1.2.3.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする

設定済みの Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットの実行を要求することができます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。

表2.4 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental および blocking タイプをサポートしています。
詳細については、次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

additional-condition

該当なし

コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する条件を指定するオプションの文字列。

注記

このプロパティーは非推奨であり、additional-conditions プロパティーに置き換える必要があります。

additional-conditions

該当なし

コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。
各追加条件は、アドホックスナップショットがキャプチャーするデータをフィルタリングする基準を指定するオブジェクトです。各追加条件には次のパラメーターを設定できます。data-collection:: フィルターが適用されるテーブルの完全修飾名。各テーブルに異なるフィルターを適用できます。filter:: スナップショットに含めるためにデータベースレコードに存在する必要がある列の値を指定します (例: "color='blue'")。

filter パラメーターに割り当てる値は、ブロッキングスナップショットの snapshot.select.statement.overrides プロパティーを設定するときに SELECT ステートメントの WHERE 句で指定する値と同じタイプです。以前の Debezium リリースでは、スナップショットシグナルに対して明示的な filter パラメーターは定義されていませんでした。代わりに、フィルター条件は、現在非推奨となっている additional-condition パラメーターに指定した値によって暗示されていました。

execute-snapshot Kafka メッセージの例:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

additional-conditions 付きのアドホック増分スナップショット

Debezium は additional-conditions フィールドを使用してテーブルのコンテンツのサブセットを選択します。

通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。

SELECT * FROM <tableName> …​.

スナップショット要求に additional-conditions プロパティーが含まれている場合、プロパティーの data-collection および filter パラメーターが SQL クエリーに追加されます。次に例を示します。

SELECT * FROM <data-collection> WHERE <filter> …​.

たとえば、列 id (プライマリーキー)、color、および brand を含む products テーブルがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions プロパティーを追加することができます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`

additional-conditions プロパティーを使用して、複数の列に基づいて条件を渡すことができます。たとえば、前の例と同じ products テーブルを使用して、color='blue' および brand='MyBrand' である products テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.1.2.3.3. 増分スナップショットの停止

ソースデータベースのテーブルにシグナルを送信して、増分スナップショットを停止することもできます。SQL INSERT クエリーを送信して、停止スナップショットシグナルをテーブルに送信します。

Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。

送信するクエリーは、incremental のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのテーブルを指定します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットを停止する

  1. SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) 1
    values ('ad-hoc-1',   2
        'stop-snapshot',  3
        '{"data-collections": ["schema1.table1", "schema2.table2"], 4
        "type":"incremental"}'); 5

    signal コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、この例のパラメーターを説明しています。

    表2.5 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明
    項目説明

    1

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。

    3

    stop-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
    この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection 設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。data フィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。

    5

    incremental

    停止するスナップショット操作のタイプを指定するシグナルの data フィールドの必須コンポーネント。
    現在、有効な唯一のオプションは incremental です。
    type の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。

2.1.2.3.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する

シグナルメッセージを 設定済みの Kafka シグナリングトピック に送信して、アドホック増分スナップショットを停止できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。

表2.6 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現のオプションの配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

2.1.2.4. ブロッキングスナップショット

スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。スナップショットをブロックするには、Debezium コネクターにシグナルを送信するための Debezium メカニズムに 依存します。

ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。

次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。

  • 新しいテーブルを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
  • 大きなテーブルを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。

ブロッキングスナップショットのプロセス

ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。

スナップショットの設定

シグナルの data コンポーネントでは、次のプロパティーを設定できます。

  • data-collections: スナップショットする必要のあるテーブルを指定します。
  • additional-conditions: テーブルごとに異なるフィルターを指定できます。

    • data-collection プロパティーは、フィルターが適用されるテーブルの完全修飾名です。
    • filter プロパティーは、snapshot.select.statement.overrides で使用される値と同じになります。

以下に例を示します。

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}

重複の可能性

スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。

2.1.2.5. Debezium Db2 コネクターによる変更データテーブルの読み取り方法

スナップショットの完了後、Debezium Db2 コネクターが初めて起動すると、キャプチャーモードである各ソーステーブルの変更データテーブルを識別します。コネクターは各変更データテーブルに対して以下を行います。

  1. 最後に保存された最大 LSN から現在の最大 LSN の間に作成された変更イベントを読み取ります。
  2. 各イベントのコミット LSN および変更 LSN に従って、変更イベントを順序付けます。これにより、コネクターはテーブルが変更された順序で変更イベントを出力します。
  3. コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
  4. コネクターが Kafka Connect に渡した最大 LSN を保存します。

再起動後、コネクターは停止した場所でオフセット (コミット LSN および変更 LSN) から変更イベントの出力を再開します。コネクターが稼働し、変更イベントを出力している間、キャプチャーモードからテーブルを削除したり、テーブルをキャプチャーモードに追加したりすると、コネクターは変更を検出して、それに合わせて動作を変更します。

2.1.2.6. Debezium Db2 変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトでは、Db2 コネクターは、テーブルで発生するすべての INSERTUPDATEDELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。

topicPrefix.schemaName.tableName

以下のリストは、デフォルト名のコンポーネントの定義を示しています。

topicPrefix
topic.prefix コネクター設定プロパティーで指定されたトピック接頭辞。
schemaName
操作が発生したスキーマの名前。
tableName
操作が発生したテーブルの名前。

たとえば、MYSCHEMA スキーマに 4 つのテーブル (PRODUCTSPRODUCTS_ON_HANDCUSTOMERSORDERS) を含む mydatabase データベースを使用した Db2 インストールについて考えてみます。コネクターはイベントを以下の 4 つの Kafka トピックに出力します。

  • mydatabase.MYSCHEMA.PRODUCTS
  • mydatabase.MYSCHEMA.PRODUCTS_ON_HAND
  • mydatabase.MYSCHEMA.CUSTOMERS
  • mydatabase.MYSCHEMA.ORDERS

コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピックトランザクションメタデータトピック) にラベルを付けます。

デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピック ルーティング を参照し てください。

2.1.2.7. Debezium Db2 コネクターによるデータベーススキーマの変更の処理方法

データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。

スキーマ変更後に発生するイベントを正しく処理するために、Debezium Db2 コネクターは、関連するデータテーブルの構造をミラーリングする Db2 変更データテーブルの構造に基づいて、新しいスキーマのスナップショットを保存します。コネクターは、データベーススキーマ履歴 Kafka トピックに、スキーマ変更の結果 (複数操作の LSN) と合わせてテーブルのスキーマ情報を保存します。コネクターは、保管されたスキーマ表現を使用して、挿入、更新、または削除の各操作時にテーブルの構造を正しくミラーリングする変更イベントを生成します。

クラッシュまたは正常に停止した後にコネクターが再起動すると、最後に読み取った位置から Db2 変更データテーブル内のエントリーの読み取りを再開します。コネクターがデータベーススキーマ履歴トピックから読み取るスキーマ情報を基に、コネクターが再起動する場所に存在したテーブル構造を適用します。

キャプチャーモードの Db2 テーブルのスキーマを更新する場合は、対応する変更テーブルのスキーマも更新することが重要です。データベーススキーマを更新するには、昇格権限のある Db2 データベース管理者である必要があります。Debezium 環境で Db2 データベーススキーマを更新する方法は、スキーマ履歴の進化 を参照してください。

データベーススキーマ履歴トピックは、内部コネクター専用となっています。コネクターは任意で コンシューマーアプリケーションを対象とした別のトピックにスキーマ変更イベントを発行する こともできます。

関連情報

2.1.2.8. Debezium Db2 コネクターのスキーマ変更トピック

Debezium Db2 コネクターを設定すると、データベーステーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成できます。

Debezium は、以下の場合にスキーマ変更トピックにメッセージを出力します。

  • 新しいテーブルがキャプチャーモードになる。
  • テーブルがキャプチャーモードから削除される。
  • データベーススキーマの更新 中に、キャプチャーモードであるテーブルのスキーマが変更される。

コネクターはスキーマ変更イベントを、<topicPrefix> という名前の Kafka スキーマ変更トピックに書き込みます。ここで <topicPrefix> は、topic.prefix コネクター設定プロパティーで指定されたトピック接頭辞です。

スキーマ変更イベントのスキーマには、次の要素があります。

name
スキーマ変更イベントメッセージの名前。
type
変更イベントメッセージのタイプ。
version
スキーマのバージョン。バージョンは整数で、スキーマが変更されるたびに増加します。
fields
変更イベントメッセージに含まれるフィールド。

例: Db2 コネクターのスキーマ変更トピックのスキーマ

次の例は、JSON 形式の一般的なスキーマを示しています。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.db2.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "inventory"
  }
}

コネクターがスキーマ変更トピックに送信するメッセージには以下の要素などのペイロードが含まれます。

databaseName
ステートメントが適用されるデータベースの名前。databaseName の値は、メッセージキーとして機能します。
pos
ステートメントが表示されるトランザクションログ内の位置。
tableChanges
スキーマの変更後のテーブルスキーマ全体の構造化表現。tableChanges フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
重要

キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベーススキーマ履歴トピックにも格納します。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。

重要

データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。

トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。

  • データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を 1 に指定します。
  • Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka num.partitions 設定オプションの値を 1 に設定します。
警告

コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。

例: Db2 コネクターのスキーマ変更トピックに出力されるメッセージ

以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "2.7.3.Final",
      "connector": "db2",
      "name": "db2",
      "ts_ms": 0,
      "snapshot": "true",
      "db": "testdb",
      "schema": "DB2INST1",
      "table": "CUSTOMERS",
      "change_lsn": null,
      "commit_lsn": "00000025:00000d98:00a2",
      "event_serial_no": null
    },
    "ts_ms": 1588252618953, 1
    "databaseName": "TESTDB", 2
    "schemaName": "DB2INST1",
    "ddl": null, 3
    "tableChanges": [ 4
      {
        "type": "CREATE", 5
        "id": "\"DB2INST1\".\"CUSTOMERS\"", 6
        "table": { 7
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 8
            "ID"
          ],
          "columns": [ 9
            {
              "name": "ID",
              "jdbcType": 4,
              "nativeType": null,
              "typeName": "int identity",
              "typeExpression": "int identity",
              "charsetName": null,
              "length": 10,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ 10
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表2.7 スキーマ変更トピックに出力されたメッセージのフィールドの説明
項目フィールド名説明

1

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

2

databaseName
schemaName

変更が含まれるデータベースとスキーマを識別します。

3

ddl

Db2 コネクターの場合は常に null です。その他のコネクターでは、このフィールドにスキーマの変更を行う DDL が含まれます。この DDL は Db2 コネクターでは使用できません。

4

tableChanges

DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。

5

type

変更の種類を説明します。値は以下のいずれかになります。

  • CREATE - テーブルの作成
  • ALTER - テーブルの変更
  • DROP - テーブルの削除

6

id

作成、変更、または破棄されたテーブルの完全な識別子。

7

table

適用された変更後のテーブルメタデータを表します。

8

primaryKeyColumnNames

テーブルのプライマリーキーを設定する列のリスト。

9

変更されたテーブルの各列のメタデータ。

10

attributes

各テーブル変更のカスタム属性メタデータ。

コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload フィールドにキーが含まれます。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.db2.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "TESTDB"
  }
}

2.1.2.9. トランザクション境界を表す Debezium Db2 コネクターによって生成されたイベント

Debezium は、トランザクション境界を表し、変更データイベントメッセージをエンリッチするイベントを生成できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
ts_ms
データソースでのトランザクション境界イベント (BEGIN または END イベント) の時間。もしデータソースが Debezium にイベント時間を提供しないなら、このフィールドは代わりに Debezium がイベントを処理する時間を表します。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
data_collectionevent_count 要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。

{
  "status": "BEGIN",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "00000025:00000d08:0025",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "testDB.dbo.tablea",
      "event_count": 1
    },
    {
      "data_collection": "testDB.dbo.tableb",
      "event_count": 1
    }
  ]
}

topic.transaction オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>.transaction トピックに出力します。

データ変更イベントのエンリッチメント

トランザクションメタデータを有効にすると、コネクターは変更イベント Envelope を新しい transaction フィールドでエンリッチします。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下は、メッセージの例になります。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335875",
  "ts_ns": "1580390884335875412",
  "transaction": {
    "id": "00000025:00000d08:0025",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

2.1.3. Debezium Db2 コネクターのデータ変更イベントの説明

Debezium Db2 コネクターは、行レベルの INSERTUPDATE、および DELETE 操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。

Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。

以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表2.8 変更イベントの基本内容の概要
項目フィールド名説明

1

schema

最初の schema フィールドはイベントキーの一部です。イベントキーの payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、最初の schema フィールドは、変更されたテーブルのプライマリーキーの構造、またはテーブルにプライマリーキーがない場合は変更されたテーブルの一意キーの構造を記述します。

message.key.columnsコネクター設定プロパティー を設定すると、テーブルのプライマリーキーをオーバーライドできます。この場合、最初の schema フィールドはそのプロパティーによって識別されるキーの構造を記述します。

2

payload

最初の payload フィールドはイベントキーの一部です。前述の schema フィールドによって記述された構造を持ち、変更された行のキーが含まれます。

3

schema

2 つ目の schema フィールドはイベント値の一部です。イベント値の payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、2 つ目の schema は変更された行の構造を記述します。通常、このスキーマには入れ子になったスキーマが含まれます。

4

payload

2 つ目の payload フィールドはイベント値の一部です。前述の schema フィールドによって記述された構造を持ち、変更された行の実際のデータが含まれます。

デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。詳細は、トピック名 を参照してください。

警告

Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または \_) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。

論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。

また、データベース、スキーマ、およびテーブルの Db2 名では、大文字と小文字を区別することができます。つまり、コネクターは同じ Kafka トピックに複数のテーブルのイベントレコードを出力できます。

詳細は以下を参照してください。

2.1.3.1. Debezium db2 変更イベントのキー

変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルの PRIMARY KEY (または一意の制約) に存在した各列のフィールドが含まれます。

以下の customers テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。

テーブルの例

CREATE TABLE customers (
 ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
 FIRST_NAME VARCHAR(255) NOT NULL,
 LAST_NAME VARCHAR(255) NOT NULL,
 EMAIL VARCHAR(255) NOT NULL UNIQUE
);

変更イベントキーの例

customers テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers テーブルに前述の定義がある限り、customers テーブルへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。

{
    "schema": {  1
        "type": "struct",
        "fields": [  2
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,  3
        "name": "mydatabase.MYSCHEMA.CUSTOMERS.Key"  4
    },
    "payload": {  5
        "ID": 1004
    }
}
表2.9 変更イベントキーの説明
項目フィールド名説明

1

schema

キーのスキーマ部分は、キーの payload 部分の内容を記述する Kafka Connect スキーマを指定します。

2

fields

各フィールドの名前、型、および必要かどうかなど、payload で想定される各フィールドを指定します。

3

任意

イベントキーの payload フィールドに値が含まれる必要があるかどうかを示します。この例では、キーのペイロードに値が必要です。テーブルにプライマリーキーがない場合は、キーの payload フィールドの値は任意です。

4

mydatabase.MYSCHEMA.CUSTOMERS.Key

キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-name.table-name.Key です。この例では、以下のようになります。

  • mydatabase はこのイベントを生成したコネクターの名前です。
  • MYSCHEMA は変更されたテーブルが含まれるデータベーススキーマです。
  • CUSTOMERS は更新されたテーブルです。

5

payload

この変更イベントが生成された行のキーが含まれます。この例では、キーには値が 1004 の 1 つの ID フィールドが含まれます。

2.1.3.2. Debezium Db2 変更イベントの値

変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。

テーブルの例

CREATE TABLE customers (
 ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
 FIRST_NAME VARCHAR(255) NOT NULL,
 LAST_NAME VARCHAR(255) NOT NULL,
 EMAIL VARCHAR(255) NOT NULL UNIQUE
);

customers テーブルのすべての変更イベントのイベント値部分は同じスキーマを指定します。イベント値のペイロードは、イベント型によって異なります。

作成 イベント

以下の例は、customers テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。

{
  "schema": {  1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "ID"
          },
          {
            "type": "string",
            "optional": false,
            "field": "FIRST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "LAST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "EMAIL"
          }
        ],
        "optional": true,
        "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value",  2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "ID"
          },
          {
            "type": "string",
            "optional": false,
            "field": "FIRST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "LAST_NAME"
          },
          {
            "type": "string",
            "optional": false,
            "field": "EMAIL"
          }
        ],
        "optional": true,
        "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_us"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ns"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
        ],
        "optional": false,
        "name": "io.debezium.connector.db2.Source",  3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_us"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ns"
      }
    ],
    "optional": false,
    "name": "mydatabase.MYSCHEMA.CUSTOMERS.Envelope"  4
  },
  "payload": {  5
    "before": null,  6
    "after": {  7
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "john.doe@example.org"
    },
    "source": {  8
      "version": "2.7.3.Final",
      "connector": "db2",
      "name": "myconnector",
      "ts_ms": 1559729468470,
      "ts_us": 1559729468470476,
      "ts_ns": 1559729468470476000,
      "snapshot": false,
      "db": "mydatabase",
      "schema": "MYSCHEMA",
      "table": "CUSTOMERS",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
    },
    "op": "c",  9
    "ts_ms": 1559729471739,  10
    "ts_us": 1559729471739762,  11
    "ts_ns": 1559729471739762314  12
  }
}
表2.10 作成 イベント値フィールドの説明
項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。

2

name

スキーマ セクションで、各 name フィールドは、値のペイロードのフィールドのスキーマを指定します。

mydatabase.MYSCHEMA.CUSTOMERS.Value はペイロードのbefore および after フィールドのスキーマです。このスキーマは customers テーブルに固有です。コネクターは、MYSCHEMA.CUSTOMERS テーブルのすべての行に対してこのスキーマを使用します。

before フィールドおよび after フィールドのスキーマ名はlogicalName.schemaName.tableName.Value の形式を取るので、スキーマ名がデータベースで一意になるようにします。つまり、Avro コンバーター を使用する場合、各論理ソースの各テーブルの Avro スキーマには独自の進化と履歴があります。

3

name

io.debezium.connector.db2.Source は、ペイロードの source フィールドのスキーマです。このスキーマは Db2 コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

mydatabase.MYSCHEMA.CUSTOMERS.Envelope は、ペイロードの全体的な構造のスキーマです。mydatabase はデータベース、MYSCHEMA はスキーマ、CUSTOMERS はテーブルです。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。しかし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

before

イベント発生前の行の状態を指定する任意のフィールド。この例のように、op フィールドが create (作成) の c である場合、この変更イベントは新しい内容に対するものであるため、beforenull になります。

7

after

イベント発生後の行の状態を指定する任意のフィールド。この例では、after フィールドには、新しい行の IDFIRST_NAMELAST_NAME、および EMAIL 列の値が含まれます。

8

source

イベントのソースメタデータを記述する必須のフィールド。source 構造には、この変更に関する Db2 の情報が示され、トレーサビリティーが提供されます。また、同じトピックや他のトピックの他のイベントと比較する情報もあり、このイベントが他のイベントの前または後に発生したか、あるいはこのイベントが他のイベントと同じコミットの一部であるかを認識できます。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントが進行中のスナップショットの一部であるかどうか
  • 新しい行が含まれるデータベース、スキーマ、およびテーブルの名前
  • 変更 LSN
  • コミット LSN (このイベントがスナップショットの一部である場合は省略)

9

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によって行が作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read (読み取り、スナップショットのみに適用)

10

ts_ms, ts_us, ts_ns

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

更新イベント

サンプル customers テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、更新イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers テーブルでの更新に生成されるイベントの変更イベント値の例になります。

{
  "schema": { ... },
  "payload": {
    "before": {  1
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "john.doe@example.org"
    },
    "after": {  2
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "noreply@example.org"
    },
    "source": {  3
      "version": "2.7.3.Final",
      "connector": "db2",
      "name": "myconnector",
      "ts_ms": 1559729995937,
      "ts_us": 1559729995937497,
      "ts_ns": 1559729995937497000,
      "snapshot": false,
      "db": "mydatabase",
      "schema": "MYSCHEMA",
      "table": "CUSTOMERS",
      "change_lsn": "00000027:00000ac0:0002",
      "commit_lsn": "00000027:00000ac0:0007",
    },
    "op": "u",  4
    "ts_ms": 1559729998706,  5
    "ts_us": 1559729998706647,  6
    "ts_ns": 1559729998706647825  7
  }
}
表2.11 更新 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の before フィールドには、各テーブル列のフィールドと、データベースのコミット前にその列にあった値が含まれます。この例では、EMAIL の値が EMAIL value is john.doe@example.com であることに注意してください。

2

after

イベント発生後の行の状態を指定する任意のフィールド。beforeafter の構造を比較すると、この行への更新内容を判断できます。この例では、EMAILの値が noreply@example.com となっています。

3

source

イベントのソースメタデータを記述する必須のフィールド。source フィールド構造には 作成 イベントと同じフィールドが含まれますが、一部の値が異なります。たとえば、更新 イベントサンプルの LSN は異なります。この情報を使用して、このイベントを他のイベントと比較し、このイベントが他のイベントの前または後に発生したか、あるいはこのイベントが他のイベントと同じコミットの一部であるかを認識できます。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントが進行中のスナップショットの一部であるかどうか
  • 新しい行が含まれるデータベース、スキーマ、およびテーブルの名前
  • 変更 LSN
  • コミット LSN (このイベントがスナップショットの一部である場合は省略)

4

op

操作の型を記述する必須の文字列。更新 イベントの値では、op フィールドの値は u で、更新によってこの行が変更したことを示します。

5

ts_ms, ts_us, ts_ns

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つのイベントが Debezium によって出力されます。3 つのイベントとは、DELETE イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。

削除 イベント

削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema の部分になります。サンプル customers テーブルの 削除 イベントのイベント値 payload は以下のようになります。

{
  "schema": { ... },
  },
  "payload": {
    "before": {  1
      "ID": 1005,
      "FIRST_NAME": "john",
      "LAST_NAME": "doe",
      "EMAIL": "noreply@example.org"
    },
    "after": null,  2
    "source": {  3
      "version": "2.7.3.Final",
      "connector": "db2",
      "name": "myconnector",
      "ts_ms": 1559730445243,
      "ts_us": 1559730445243482,
      "ts_ns": 1559730445243482000,
      "snapshot": false,
      "db": "mydatabase",
      "schema": "MYSCHEMA",
      "table": "CUSTOMERS",
      "change_lsn": "00000027:00000db0:0005",
      "commit_lsn": "00000027:00000db0:0007"
    },
    "op": "d",  4
    "ts_ms": 1559730450205,  5
    "ts_us": 1559730450205521,  6
    "ts_ns": 1559730450205521475  7
  }
}
表2.12 削除 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の before フィールドには、データベースのコミットで削除される前に行にあった値が含まれます。

2

after

イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の after フィールドは null で、行が存在しないことを示します。

3

source

イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の source フィールド構造は、同じテーブルの 作成 および 更新 イベントと同じになります。多くの source フィールドの値も同じです。削除 イベント値では、ts_ms および LSN フィールドの値や、その他の値が変更された可能性があります。ただし、削除 イベント値の source フィールドは、同じメタデータを提供します。

  • Debezium バージョン
  • コネクター型および名前
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントが進行中のスナップショットの一部であるかどうか
  • 新しい行が含まれるデータベース、スキーマ、およびテーブルの名前
  • 変更 LSN
  • コミット LSN (このイベントがスナップショットの一部である場合は省略)

4

op

操作の型を記述する必須の文字列。op フィールドの値は d で、行が削除されたことを示します。

5

ts_ms, ts_us, ts_ns

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。

Db2 コネクターイベントは、Kafka のログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。

行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の Db2 コネクターは 削除 イベントを出力した後に、null 値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。

2.1.4. Debezium Db2 コネクターによるデータ型のマッピング方法

Db2 がサポートするデータ型の詳細は、Db2 ドキュメントの Data Types を参照してください。

Db2 コネクターは、行が存在するテーブルのように構造化されたイベントで行への変更を表します。イベントには、各列の値のフィールドが含まれます。その値がどのようにイベントで示されるかは、列の Db2 のデータ型によって異なります。ここでは、これらのマッピングを説明します。デフォルトのデータ型変換がニーズを満たさない場合は、コネクター用 の カスタムコンバーター を作成 できます。

詳細は以下を参照してください。

基本型

以下の表では、各 Db2 データ型をイベントフィールドの リテラル型 および セマンティック型にマッピングする方法を説明します。

  • literal type は、Kafka Connect スキーマタイプ (INT8INT16INT32INT64FLOAT32FLOAT64BOOLEANSTRINGBYTESARRAYMAPSTRUCT) を使用して、値がどのように表現されるかを記述します。
  • セマンティック型 は、フィールドの Kafka Connect スキーマの名前を使用して、Kafka Connect スキーマがフィールドの 意味 をキャプチャーする方法を記述します。
表2.13 Db2 の基本データ型のマッピング
DB2 データ型リテラル型 (スキーマ型)セマンティック型 (スキーマ名) および注記

BOOLEAN

BOOLEAN

BOOLEAN 型の列のあるテーブルからのみスナップショットを作成できます。現在、Db2 での SQL レプリケーションは BOOLEAN をサポートしないため、Debezium はこれらのテーブルで CDC を実行できません。別の型の使用を検討してください。

BIGINT

INT64

該当なし

BINARY

BYTES

該当なし

BLOB

BYTES

該当なし

CHAR[(N)]

STRING

該当なし

CLOB

STRING

該当なし

DATE

INT32

io.debezium.time.Date

タイムゾーン情報のないタイムスタンプの文字列表現

DECFLOAT

BYTES

org.apache.kafka.connect.data.Decimal

DECIMAL

BYTES

org.apache.kafka.connect.data.Decimal

DBCLOB

STRING

該当なし

DOUBLE

FLOAT64

該当なし

INTEGER

INT32

該当なし

REAL

FLOAT32

該当なし

SMALLINT

INT16

該当なし

TIME

INT32

io.debezium.time.Time

タイムゾーン情報のない時刻の文字列表現

TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

タイムゾーン情報のないタイムスタンプの文字列表現

VARBINARY

BYTES

該当なし

VARCHAR[(N)]

STRING

該当なし

VARGRAPHIC

STRING

該当なし

XML

STRING

io.debezium.data.Xml

XML ドキュメントの文字列表現が含まれます。

列のデフォルト値がある場合は、対応するフィールドの Kafka Connect スキーマに伝達されます。明示的な列値が指定されない限り、変更イベントにはフィールドのデフォルト値が含まれます。そのため、スキーマからデフォルト値を取得する必要はほとんどありません。

時間型

タイムゾーン情報を含む DATETIMEOFFSET データタイプを除き、Db2 は time.precision.mode コネクター設定プロパティーの値に基づいて時間型をマップします。ここでは、以下のマッピングを説明します。

time.precision.mode=adaptive

time.precision.mode 設定プロパティーがデフォルトの adaptive に設定された場合、コネクターは列のデータ型定義に基づいてリテラル型とセマンティック型を決定します。これにより、イベントがデータベースの値を 正確 に表すようになります。

表2.14 time.precision.mode が adaptive の場合のマッピング
DB2 データ型リテラル型 (スキーマ型)セマンティック型 (スキーマ名) および注記

DATE

INT32

io.debezium.time.Date

エポックからの日数を表します。

TIME(0), TIME(1), TIME(2), TIME(3)

INT32

io.debezium.time.Time

午前 0 時から経過した時間をミリ秒で表し、タイムゾーン情報は含まれません。

TIME(4), TIME(5), TIME(6)

INT64

io.debezium.time.MicroTime

午前 0 時から経過した時間をマイクロ秒で表し、タイムゾーン情報は含まれません。

TIME(7)

INT64

io.debezium.time.NanoTime

午前 0 時から経過した時間をナノ秒で表し、タイムゾーン情報は含まれません。

DATETIME

INT64

io.debezium.time.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

time.precision.mode=connect

time.precision.mode 設定プロパティーが connect に設定された場合、コネクターは Kafka Connect の論理型を使用します。これは、コンシューマーが組み込みの Kafka Connect の論理型のみを処理でき、可変精度の時間値を処理できない場合に便利です。ただし、Db2 はマイクロ秒の 10 分の 1 の精度をサポートするため、connect 時間精度を指定してコネクターによって生成されたイベントは、データベース列の少数秒の精度値が 3 よりも大きい場合に、精度が失われます

表2.15 time.precision.mode がconnect の場合のマッピング
DB2 データ型リテラル型 (スキーマ型)セマンティック型 (スキーマ名) および注記

DATE

INT32

org.apache.kafka.connect.data.Date

エポックからの日数を表します。

TIME([P])

INT64

org.apache.kafka.connect.data.Time

午前 0 時からの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。Db2 では、範囲が 0 - 7 の P が許可され、マイクロ秒の 10 分の 1 の精度まで保存されますが、P が 3 よりも大きい場合は、このモードでは精度が失われます。

DATETIME

INT64

org.apache.kafka.connect.data.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

タイムスタンプ型

DATETIME タイプは、タイムゾーン情報のないタイムスタンプを表します。このような列は、UTC を基にして同等の Kafka Connect 値に変換されます。たとえば、"2018-06-20 15:13:16.945104" という DATETIME 値は、"1529507596000" という値の io.debezium.time.Timestamp で表されます。

Kafka Connect および Debezium を実行している JVM のタイムゾーンは、この変換には影響しません。

表2.16 10 進数型
DB2 データ型リテラル型 (スキーマ型)セマンティック型 (スキーマ名) および注記

NUMERIC[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal

scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。connect.decimal.precision スキーマパラメーターには、指定の 10 進数値の精度を表す整数が含まれます。

DECIMAL[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal

scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。connect.decimal.precision スキーマパラメーターには、指定の 10 進数値の精度を表す整数が含まれます。

2.1.5. Debezium コネクターを実行するための Db2 の設定

Db2 テーブルにコミットされた変更イベントを Debezium がキャプチャーするには、必要な権限を持つ Db2 データベース管理者が、変更データキャプチャー (CDC) のデータベースでテーブルを設定する必要があります。Debezium の実行を開始した後、キャプチャーエージェントの設定を調整してパフォーマンスを最適化できます。

Debezium コネクターと使用するために Db2 を設定する場合の詳細は、以下を参照してください。

2.1.5.1. 変更データキャプチャーの Db2 テーブルの設定

テーブルをキャプチャーモードにするために、Debezium ではユーザー定義関数 (UDF) のセットが提供されます。ここでは、これらの管理 UDF をインストールおよび実行する手順を説明します。また、Db2 制御コマンドを実行してテーブルをキャプチャーモードにすることもできます。その後、管理者は Debezium がキャプチャーする各テーブルに対して、CDC を有効にする必要があります。

前提条件

  • db2instl ユーザーとして Db2 にログインしている。
  • Db2 ホストの $HOME/asncdctools/src ディレクトリーで Debezium 管理 UDF を使用できる。UDF は Debezium サンプルリポジトリー から入手できます。
  • Db2 コマンド bldrtn が PATH 上にある。たとえば、export PATH=$PATH:/opt/ibm/db2/V11.5.0.0/samples/c/ を Db2 11.5 で実行する。

手順

  1. Db2 で提供される bldrtn コマンドを使用して、Db2 サーバーホストで Debezium 管理 UDF をコンパイルします。

    cd $HOME/asncdctools/src
    bldrtn asncdc
  2. データベースが稼働していない場合は起動します。DB_NAME は、Debezium が接続するデータベースの名前に置き換えます。

    db2 start db DB_NAME
  3. JDBC が Db2 メタデータカタログを読み取りできるようにします。

    cd $HOME/sqllib/bnd
    db2 connect to DB_NAME
    db2 bind db2schema.bnd blocking all grant public sqlerror continue
  4. データベースが最近バックアップされたことを確認します。ASN エージェントには、読み取りを始める最新の開始点が必要です。バックアップを実行する必要がある場合は、以下のコマンドを実行して、最新のバージョンのみを利用できるようにデータをプルーニングします。古いバージョンのデータを保持する必要がない場合は、バックアップの場所に dev/null を指定します。

    1. データベースをバックアップします。DB_NAME および BACK_UP_LOCATION を適切な値に置き換えます。

      db2 backup db DB_NAME to BACK_UP_LOCATION
    2. データベースを再起動します。

      db2 restart db DB_NAME
  5. データベースに接続して、Debezium 管理 UDF をインストールします。db2instl ユーザーとしてログインしていることを前提とするため、UDF が db2inst1 ユーザーにインストールされている必要があります。

    db2 connect to DB_NAME
  6. Debezium 管理 UDF をコピーし、その権限を設定します。

    cp $HOME/asncdctools/src/asncdc $HOME/sqllib/function
    chmod 777 $HOME/sqllib/function
  7. ASN キャプチャーエージェントを開始および停止する Debezium UDF を有効にします。

    db2 -tvmf $HOME/asncdctools/src/asncdc_UDF.sql
  8. ASN 制御テーブルを作成します。

    $ db2 -tvmf $HOME/asncdctools/src/asncdctables.sql
  9. テーブルをキャプチャーモードに追加し、キャプチャーモードからテーブルを削除する Debezium UDF を有効にします。

    $ db2 -tvmf $HOME/asncdctools/src/asncdcaddremove.sql

    Db2 サーバーを設定したら、UDF を使用して SQL コマンドで Db2 レプリケーション (ASN) を制御します。UDF によっては戻り値が必要な場合があります。この場合、SQL の VALUE ステートメントを使用して呼び出します。その他の UDF には、SQL の CALL ステートメントを使用します。

  10. SQL クライアントから ASN エージェントを起動します。

    VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');

    または、シェルから以下を行います。

    db2 "VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');"

    前述のステートメントは、以下のいずれかの結果を返します。

    • asncap is already running
    • start --> <COMMAND>

      この場合は、以下の例のように、ターミナルウィンドウに指定の <COMMAND> を入力します。

      /database/config/db2inst1/sqllib/bin/asncap capture_schema=asncdc capture_server=SAMPLE &
  11. テーブルをキャプチャーモードにします。キャプチャーする各テーブルに対して、以下のステートメントを呼び出します。MYSCHEMA は、キャプチャーモードにするテーブルが含まれるスキーマの名前に置き換えます。同様に、MYTABLE は、キャプチャーモードにするテーブルの名前に置き換えます。

    CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE');
  12. ASN サービスを再初期化します。

    VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');

2.1.5.2. Db2 キャプチャーエージェント設定のサーバー負荷およびレイテンシーへの影響

データベース管理者がソーステーブルに対して変更データキャプチャーを有効にすると、キャプチャーエージェントの実行が開始されます。エージェントは新しい変更イベントレコードをトランザクションログから読み取り、イベントレコードをキャプチャーテーブルに複製します。変更がソーステーブルにコミットされてから、対応する変更テーブルに変更が反映される間、常に短いレイテンシーが間隔で発生します。この遅延間隔は、ソーステーブルで変更が発生したときから、Debezium がその変更を Apache Kafka にストリーミングできるようになるまでの差を表します。

データの変更に素早く対応する必要があるアプリケーションについては、ソースとキャプチャーテーブル間で密接に同期を維持するのが理想的です。キャプチャーエージェントを実行してできるだけ迅速に変更イベントを継続的に処理すると、スループットが増加し、レイテンシーが減少するため、イベントの発生後にほぼリアルタイムで新しいイベントレコードが変更テーブルに入力されることを想像するかもしれません。しかし、これは必ずしもそうであるとは限りません。同期を即時に行うとパフォーマンスに影響します。変更エージェントが新しいイベントレコードについてデータベースにクエリーを実行するたびに、データベースホストの CPU 負荷が増加します。サーバーへの負荷が増えると、データベース全体のパフォーマンスに悪影響を及ぼす可能性があり、特にデータベースの使用がピークに達するときにトランザクションの効率が低下する可能性があります。

データベースメトリクスを監視して、サーバーがキャプチャーエージェントのアクティビティーをサポートできなくなるレベルにデータベースが達した場合に認識できるようにすることが重要となります。キャプチャーエージェントの実行中にパフォーマンスの問題が発生した場合は、キャプチャーエージェント設定を調整して CPU の負荷を減らします。

2.1.5.3. DB2 キャプチャーエージェントの設定パラメーター

Db2 では、IBMSNAP_CAPPARMS テーブルにはキャプチャーエージェントの動作を制御するパラメーターが含まれています。これらのパラメーターの値を調整して、キャプチャープロセスの設定を調整すると、CPU の負荷を減らしながら許容レベルのレイテンシーを維持することができます。

注記

Db2 のキャプチャーエージェントパラメーターの設定方法に関する具体的なガイダンスは、このドキュメントの範囲外となります。

IBMSNAP_CAPPARMS テーブルでは、CPU 負荷の削減に最も影響を与えるパラメーターは以下のとおりです。

COMMIT_INTERVAL
  • キャプチャーエージェントがデータを変更データテーブルにコミットするまで待つ期間を秒単位で指定します。
  • 値が大きいほど、データベースホストの負荷が減少し、レイテンシーが増加します。
  • デフォルト値は 30 です。
SLEEP_INTERVAL
  • キャプチャーエージェントがアクティブなトランザクションログの最後に到達した後に、新しいコミットサイクルの開始まで待つ期間を秒単位で指定します。
  • 値が大きいほど、サーバーの負荷が減少し、レイテンシーが増加します。
  • デフォルト値は 5 です。

関連情報

  • キャプチャーエージェントパラメーターの詳細は、Db2 のドキュメントを参照してください。

2.1.6. Debezium Db2 コネクターのデプロイ

以下の方法のいずれかを使用して Debezium Db2 コネクターをデプロイできます。

重要

ライセンス要件のため、Debezium Db2 コネクターアーカイブには、Debezium が Db2 データベースに接続するために必要な Db2 JDBC ドライバーは含まれていません。コネクターがデータベースにアクセスできるようにするには、コネクター環境にドライバーを追加する必要があります。ドライバーの入手方法については、Db2JDBC ドライバーの入手を参照してください。

2.1.6.1. Db2 JDBC ドライバーの取得

Debezium が Db2 データベースに接続するために必要な Db2 JDBC ドライバーファイルは、ライセンスの関係で Debezium Db2 コネクターアーカイブに含まれていません。ドライバーは、Maven Central からダウンロード可能です。使用するデプロイメント方法に応じて、Kafka Connect カスタムリソースまたはコネクターイメージの構築に使用する Dockerfile にコマンドを追加して、ドライバーを取得することができます。

  • Streams for Apache Kafka を使用して Kafka Connect イメージにコネクターを追加する場合は、に示すように、KafkaConnect カスタムリソースの builds.plugins.artifact.url にドライバーの Maven Central の場所を追加します。
  • Dockerfile を使用してコネクター用のコンテナーイメージを構築する場合、Dockerfile に curl コマンドを挿入して、Maven Central から必要なドライバーファイルをダウンロードするための URL を指定します。詳細は、「Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium Db2 コネクターのデプロイ」 を参照してください。

2.1.6.2. Streams for Apache Kafka を使用した Db2 コネクターデプロイメント

Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、Streams for Apache Kafka を使用してコネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。

デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。

  • Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある KafkaConnect CR。
  • コネクターがソースデータベースにアクセスするために使用する情報を提供する KafkaConnector CR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnector CR を適用してコネクターを開始します。

Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka が Kafka Connect イメージをビルドすると、指定のアーティファクトをダウンロードし、イメージに組み込みます。

Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnectコンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。

注記

KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。

2.1.6.3. Streams for Apache Kafka を使用した Debezium Db2 コネクターのデプロイ

以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイするのに現在推奨される方法は、Streams for Apache Kafka でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的に構築することです。

ビルドプロセス中、Streams for Apache Kafka Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。

新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、KafkaConnector カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。

前提条件

  • クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
  • Streams for Apache Kafka Operator が稼働している。
  • Deploying and Managing Streams for Apache Kafka on OpenShift に記載されているように、Apache Kafka クラスターがデプロイされている。
  • Kafka Connect が Streams for Apache Kafka にデプロイされている。
  • Red Hat build of Debezium のライセンスを所有している。
  • OpenShift oc CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。
  • Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。

    ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
    • レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
    ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。

手順

  1. OpenShift クラスターにログインします。
  2. コネクターの Debezium KafkaConnect カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotations および spec.build プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。以下の例は、KafkaConnect カスタムリソースを記述する dbz-connect.yaml ファイルからの抜粋を示しています。

    例2.1 Debezium コネクターを含む KafkaConnect カスタムリソースを定義した dbz-connect.yaml ファイル

    次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。

    • Debezium Db2 コネクターアーカイブ。
    • Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
    • Debezium スクリプト SMT アーカイブと Debezium コネクターで使用する関連言語の依存関係。SMT アーカイブおよび言語の依存関係は任意のコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
    • Db2 JDBC ドライバー。Db2 データベースに接続するために必要ですが、コネクターアーカイブには含まれていません。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.6.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-db2
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-db2/2.7.3.Final-redhat-00001/debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.7.3.Final-redhat-00001/debezium-scripting-2.7.3.Final-redhat-00001.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
              - type: jar          11
                url: https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.0.0/jcc-11.5.0.0.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    表2.17 Kafka Connect 設定の説明
    項目説明

    1

    strimzi.io/use-connector-resources アノテーションを "true" に設定して、クラスター Operator が KafkaConnector リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。

    2

    spec.build 設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。

    3

    build.output は、新しくビルドされたイメージを保存するレジストリーを指定します。

    4

    イメージ出力の名前およびイメージ名を指定します。output.type の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合は docker、内部の OpenShift ImageStream にイメージをプッシュする場合は imagestream です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定で build.output を指定する方法の詳細は、{NameConfiguringStreamsOpenShift} の Streams for Apache Kafka Build schema reference を参照してください。

    5

    plugins 設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグイン name と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。

    6

    artifacts.type の値は、artifacts.url で指定するアーティファクトのファイルタイプを指定します。有効なタイプは ziptgz、または jar です。Debezium コネクターアーカイブは、.zip ファイル形式で提供されます。JDBC ドライバーファイルは .jar 形式です。type の値は、url フィールドで参照されるファイルのタイプと一致する必要があります。

    7

    artifacts.url の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。

    8

    (オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト typeurl を指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。

    9

    (オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト typeurl を指定します。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。

    10

    (オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト typeurl を指定します。これは、Debezium スクリプト SMT で必要です。

    重要

    Streams for Apache Kafka を使用して Kafka Connect イメージにコネクタープラグインを組み込む場合は、必要なスクリプト言語コンポーネントごとに、artifacts.url に JAR ファイルの場所を指定し、artifacts.type の値も jar に設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。

    スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。

    • groovy
    • groovy-jsr223 (スクリプトエージェント)
    • groovy-json (JSON 文字列を解析するためのモジュール)

    Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。

    11

    Maven Central にある Db2 JDBC ドライバーの場所を指定します。必要なドライバーが Debezium Db2 コネクターアーカイブに含まれていない。

  3. 以下のコマンドを入力して、KafkaConnect ビルド仕様を OpenShift クラスターに適用します。

    oc create -f dbz-connect.yaml

    Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
    ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。

  4. KafkaConnector リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
    たとえば、以下の KafkaConnector CR を作成し、db2-inventory-connector.yaml として保存します。

    例2.2 Debezium コネクターの KafkaConnector カスタムリソースを定義する db2-inventory-connector.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-db2 1
    spec:
      class: io.debezium.connector.db2.Db2ConnectorConnector 2
      tasksMax: 1  3
      config:  4
        schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
        schema.history.internal.kafka.topic: schema-changes.inventory
        database.hostname: db2.debezium-db2.svc.cluster.local 5
        database.port: 50000   6
        database.user: debezium  7
        database.password: dbz  8
        database.dbname: mydatabase 9
        topic.prefix: inventory-connector-db2 10
        table.include.list: public.inventory  11
    
        ...
    表2.18 コネクター設定の説明
    項目説明

    1

    Kafka Connect クラスターに登録するコネクターの名前。

    2

    コネクタークラスの名前。

    3

    同時に動作できるタスクの数。

    4

    コネクターの設定。

    5

    ホストデータベースインスタンスのアドレス。

    6

    データベースインスタンスのポート番号。

    7

    Debezium がデータベースへの接続に使用するアカウントの名前。

    8

    Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。

    9

    変更をキャプチャーするデータベースの名前。

    10

    データベースインスタンスまたはクラスターのトピック接頭辞。
    指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
    トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
    コネクターを Avro コネクター と統合する場合、この名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。https://docs.redhat.com/documentation/en/red_hat_build_of_debezium/2.7.0/html-single/debezium_user_guide/index#configuring-debezium-connectors-to-use-avro-serialization

    11

    コネクターが変更イベントをキャプチャーするテーブルのリスト。

  5. 以下のコマンドを実行してコネクターリソースを作成します。

    oc create -n <namespace> -f <kafkaConnector>.yaml

    以下に例を示します。

    oc create -n debezium -f db2-inventory-connector.yaml

    コネクターは Kafka Connect クラスターに登録され、KafkaConnector CR の spec.config.database.dbname で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。

これで、Debezium Db2 のデプロイメントを確認 する準備が整いました。

2.1.6.4. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium Db2 コネクターのデプロイ

Debezium Db2 コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。

  • Kafka Connect インスタンスを定義する KafkaConnect CR。image は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。
  • Debezium Db2 コネクターを定義する KafkaConnector CR。この CR を KafkaConnect CR を適用したのと同じ OpenShift インスタンスに適用します。

前提条件

  • Db2 が実行中で、Db2 を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
  • Streams for Apache Kafka は OpenShift にデプロイされ、Apache Kafka および Kafka Connect を実行している。詳細は、OpenShift での Streams for Apache Kafka のデプロイおよび管理 を参照してください。
  • Podman または Docker がインストールされている。
  • Kafka Connect サーバーは、Db2 用の必要な JDBC ドライバーをダウンロードするために、Maven Central にアクセスすることができます。また、ドライバーのローカルコピー、またはローカルの Maven リポジトリーや他の HTTP サーバーから利用可能なものを使用することもできます。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (quay.iodocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。

手順

  1. Kafka Connect の Debezium Db2 コンテナーを作成します。

    1. registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。

      cat <<EOF >debezium-container-for-db2.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-db2/2.7.3.Final-redhat-00001/debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip \
      && unzip debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip \
      && rm debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.0.0/jcc-11.5.0.0.jar
      USER 1001
      EOF
      項目説明

      1

      任意のファイル名を指定できます。

      2

      Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。

      このコマンドは、現在のディレクトリーに debezium-container-for-db2.yaml という名前の Docker ファイルを作成します。

    2. 前のステップで作成した debezium-container-for-db2.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。

      podman build -t debezium-container-for-db2:latest .
      docker build -t debezium-container-for-db2:latest .

      上記のコマンドは、debezium-container-for-db2 という名前のコンテナーイメージを構築します。

    3. カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。

      podman push <myregistry.io>/debezium-container-for-db2:latest
      docker push <myregistry.io>/debezium-container-for-db2:latest
    4. 新しい Debezium Db2 KafkaConnect カスタムリソース (CR) を作成します。たとえば、annotations および image プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。以下の例は、KafkaConnect カスタムリソースを記述する dbz-connect.yaml ファイルからの抜粋を示しています。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        #...
        image: debezium-container-for-db2  2
      
        ...
      項目説明

      1

      KafkaConnector リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations は Cluster Operator に示します。

      2

      spec.image は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。

    5. 以下のコマンドを入力して、KafkaConnect CR を OpenShift Kafka Connect 環境に適用します。

      oc create -f dbz-connect.yaml

      このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。

  2. Debezium Db2 コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    通常、コネクターに使用できる設定プロパティーを使用して、.yaml ファイルに Debezium Db2 コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。

    以下の例では、ポート 50000 で Db2 サーバーホスト 192.168.99.100 に接続する Debezium コネクターを設定します。このホストには、データベース名が mydatabase、テーブル名が inventory、サーバーの論理名が inventory-connector-db2 があります。

    Db2 inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-db2  1
        labels:
          strimzi.io/cluster: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: 'true'
      spec:
        class: io.debezium.connector.db2.Db2Connector 2
        tasksMax: 1  3
        config:  4
          database.hostname: 192.168.99.100   5
          database.port: 50000 6
          database.user: db2inst1 7
          database.password: Password! 8
          database.dbname: mydatabase 9
          topic.prefix: inventory-connector-db2   10
          table.include.list: public.inventory   11
    
          ...

    表2.19 コネクター設定の説明
    項目説明

    1

    Kafka Connect クラスターに登録する場合のコネクターの名前。

    2

    この Db2 コネクタークラスの名前。

    3

    一度に実行できるタスクは 1 つだけです。

    4

    コネクターの設定。

    5

    Db2 インスタンスのアドレスであるデータベースホスト。

    6

    Db2 インスタンスのポート番号。

    7

    Db2 ユーザーの名前。

    8

    Db2 ユーザーのパスワード。

    9

    変更をキャプチャーするデータベースの名前。

    10

    namespace を形成する Db2 インスタンス/クラスターの論理名で、コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Arvo コネクター が使用される場合に対応する Avro スキーマの名前空間で使用 れます。

    11

    コネクターは public.inventory テーブルからのみ変更をキャプチャーします。

  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを inventory-connector.yaml ファイルに保存した場合は、以下のコマンドを実行します。

    oc apply -f inventory-connector.yaml

    上記のコマンドは inventory-connector を登録し、コネクターは KafkaConnector CR に定義されている mydatabase データベースに対して実行を開始します。

Debezium Db2 コネクターに設定できる設定プロパティーの完全リストは、Db2 コネクタープロパティー を参照してください。

Results

コネクターが起動すると、コネクターが変更をキャプチャーするように設定された Db2 データベーステーブルの 整合性スナップショット が実行されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。

2.1.6.5. Debezium Db2 コネクターが実行していることの確認

コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。

コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。

  • コネクターのステータスを確認します。
  • コネクターがトピックを生成していることを確認します。
  • 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。

前提条件

  • Debezium コネクターは Streams for Apache Kafka on OpenShift にデプロイされている。
  • OpenShift oc CLI クライアントがインストールされている。
  • OpenShift Container Platform Web コンソールにアクセスできる。

手順

  1. 以下の方法のいずれかを使用して KafkaConnector リソースのステータスを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaConnector を入力します。
      3. KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-db2)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを入力します。

        oc describe KafkaConnector <connector-name> -n <project>

        以下に例を示します。

        oc describe KafkaConnector inventory-connector-db2 -n debezium

        このコマンドは、以下の出力のようなステータス情報を返します。

        例2.3 KafkaConnector リソースのステータス

        Name:         inventory-connector-db2
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-db2
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-db2.inventory
            inventory-connector-db2.inventory.addresses
            inventory-connector-db2.inventory.customers
            inventory-connector-db2.inventory.geom
            inventory-connector-db2.inventory.orders
            inventory-connector-db2.inventory.products
            inventory-connector-db2.inventory.products_on_hand
        Events:  <none>
  2. コネクターによって Kafka トピックが作成されたことを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaTopic を入力します。
      3. KafkaTopics リストから確認するトピックの名前をクリックします (例: inventory-connector-db2.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを入力します。

        oc get kafkatopics

        このコマンドは、以下の出力のようなステータス情報を返します。

        例2.4 KafkaTopic リソースのステータス

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-db2--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-db2.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-db2.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-db2.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-db2.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-db2.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-db2.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. トピックの内容を確認します。

    • ターミナルウィンドウから、以下のコマンドを入力します。
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    以下に例を示します。

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-db2.inventory.products_on_hand

    トピック名を指定する形式は、手順 1 で返された oc describe コマンドと同じです (例: inventory_connector_db2.inventory.addresses)。

    トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。

    例2.5 Debezium 変更イベントの内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-db2.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-db2.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-db2.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.db2.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-db2.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.7.3.Final-redhat-00001","connector":"db2","name":"inventory-connector-db2","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"db2-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}

    上記の例では、payload 値は、コネクタースナップショットがテーブル inventory.products_on_hand から読み込み ("op" ="r") イベントを生成したことを示しています。product_id レコードの "before" 状態は null であり、レコードに以前の値が存在しないことを示しています。"after" 状態は、product_id 101 を持つ項目の quantity3 であることを示しています。

2.1.6.6. Debezium Db2 コネクター設定プロパティーの説明

Debezium Db2 コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。

必要な Debezium Db2 コネクター設定プロパティー

以下の設定プロパティーは、デフォルト値がない場合は必須です。

プロパティーデフォルト説明

name

デフォルトなし

コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。

connector.class

デフォルトなし

コネクターの Java クラスの名前。Db2 コネクターには、常に io.debezium.connector.db2.Db2Connector の値を使用します。

tasks.max

1

このコネクターのために作成する必要のあるタスクの最大数。Db2 コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。

database.hostname

デフォルトなし

Db2 データベースサーバーの IP アドレスまたはホスト名。

database.port

50000

Db2 データベースサーバーの整数のポート番号。

database.user

デフォルトなし

Db2 データベースサーバーに接続するための Db2 データベースユーザーの名前。

database.password

デフォルトなし

Db2 データベースサーバーへの接続時に使用するパスワード。

database.dbname

デフォルトなし

変更をストリーミングする Db2 データベースの名前

topic.prefix

デフォルトなし

Debezium が変更をキャプチャーするデータベースをホストする特定の Db2 データベースサーバーの namespace を提供するトピック接頭辞。トピックの接頭辞名には、英数字、ハイフン、ドット、およびアンダースコアのみを使用する必要があります。このトピック接頭辞は、このコネクターからレコードを受け取るすべての Kafka トピックに使用されるため、トピック接頭辞は他のすべてのコネクターで一意である必要があります。

警告

このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。また、コネクターはデータベーススキーマ履歴トピックを復元できません。

table.include.list

デフォルトなし

コネクターで変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。このプロパティーが設定されている場合、コネクターは指定されたテーブルからのみ変更をキャプチャします。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターはシステム以外のテーブルすべての変更をキャプチャーします。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、table.exclude.list プロパティーも設定しないでください。

table.exclude.list

デフォルトなし

コネクターで変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。コネクターは exclude リストに含まれていないシステム以外のテーブルごとに変更をキャプチャーします。各識別子の形式は schemaName.tableName です。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、table.include.list プロパティーも設定しないでください。

column.include.list

空の文字列

変更イベントレコード値に含める列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。

列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。このプロパティーを設定に含める場合は、column.exclude.list プロパティーを設定しないでください。

column.exclude.list

空の文字列

変更イベント値から除外する列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。

列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。プライマリーキー列は、値から除外された場合でも、イベントのキーに常に含まれます。このプロパティーを設定に含める場合は、column.include.list プロパティーを設定しないでください。

column.mask.hash.hashAlgorithm.with.salt.salt

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。

仮名は、指定された hashAlgorithmsalt を適用すると得られるハッシュ化された値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest セクションに説明されています。

以下の例では、CzQMA0cB5K が無作為に選択された salt になります。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。

使用される hashAlgorithm、選択された salt、および実際のデータセットによっては、結果として得られるデータセットが完全にマスクされないことがあります。

time.precision.mode

adaptive

時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。

adaptive は、データベース列の型を基にして、ミリ秒、マイクロ秒、またはナノ秒の精度値のいずれかを使用して、データベースの値と全く同じように時間とタイムスタンプをキャプチャーします。

connect は、Kafka Connect の TimeDate、および Timestamp の組み込み表現を使用して、常に時間とタイムスタンプ値を表し、データベース列の精度に関わらず、ミリ秒の精度を使用します。詳細は 一時的な型 を参照してください。

tombstones.on.delete

true

削除 イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。

true: 削除操作は、削除 イベントと後続の破棄 (tombstone) イベントで表されます。

false - delete イベントのみが出力されます。

log compaction がトピックで有効になっている場合には、ソースレコードの削除後に廃棄 (tombstone) イベントを出力すると (デフォルト動作)、Kafka は削除された行のキーに関連するすべてのイベントを完全に削除できます。

include.schema.changes

true

コネクターがデータベーススキーマの変更を、データベースサーバー ID と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。各スキーマの変更は、データベース名が含まれるキーと、スキーマ更新を記述する JSON 構造である値で記録されます。これは、コネクターがデータベーススキーマ履歴を内部で記録する方法には依存しません。

column.truncate.to.length.chars

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。length を正の整数値に設定します (例: column.truncate.to.20.chars)

列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。

column.mask.with.length.chars

該当なし

文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。length を正の整数に設定して、指定された列のデータをプロパティー名の 長さ で指定されたアスタリスク (*) 文字数で置き換えます。指定した列のデータを空の文字列に置き換えるには、長さ0 (ゼロ) に設定します。

列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName.
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。

column.propagate.source.type

該当なし

列のメタデータを表す追加パラメーターをコネクターに発行させたい列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
コネクターがこの余分なデータを発行できるようにすると、シンクデータベース内の特定の数値または文字ベースの列のサイズを適切に設定するのに役立ちます。

列の完全修飾名は、次のいずれかの形式に従います: databaseName.tableName.columnName、または databaseName.schemaName.tableName.columnName.
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

datatype.propagate.source.type

該当なし

データベース内の列に対して定義されているデータ型の完全修飾名を指定する正規表現のオプションのコンマ区切りリスト。このプロパティーが設定されている場合、データ型が一致する列に対して、コネクターはスキーマに次の追加フィールドを含むイベントレコードを発行します。

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
コネクターがこの余分なデータを発行できるようにすると、シンクデータベース内の特定の数値または文字ベースの列のサイズを適切に設定するのに役立ちます。

列の完全修飾名の形式は、databaseName.tableName.typeName、または databaseName.schemaName.tableName.typeName のいずれかになります。
データ型の名前を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、データ型の名前文字列全体に対して照合されます。式は、型名に存在する可能性のある部分文字列と一致しません。

Db2 固有のデータ型名の一覧は、Db2 データ型マッピング を参照してください。

message.key.columns

空の文字列

指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。

デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。

テーブルにカスタムメッセージキーを設定するには、テーブルを列挙した後、メッセージキーとして使用する列を列挙します。各リストエントリーは以下の形式を取ります。

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

複数の列名をベースにテーブルキーを作成するには、列名の間にコンマを挿入します。
各完全修飾テーブル名は、以下の形式の正規表現です。

<schemaName>.<tableName>

プロパティーは複数のテーブルのエントリーをリストできます。リスト内の異なるテーブルのエントリーは、セミコロンを使用して、区切ります。

以下の例は、テーブル inventory.customers and purchaseorders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

のメッセージキーを設定します。上記の例では、列 pk1pk2 がテーブル inventory.customer のメッセージキーとして指定されます。スキーマで purchaseorders を解決する場合には、列 pk3pk4 はメッセージキーとして機能します。

schema.name.adjustment.mode

none

コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。設定可能:

  • none は、調整を適用しません。
  • Avro は Avro タイプ名で使用できない文字をアンダースコアに置き換えます。
  • avro_unicode は、Avro タイプ名で使用できないアンダースコアまたは文字を、_uxxxx などの対応する Unicode に置き換えます。注: _ は Java のバックスラッシュなどのエスケープシーケンスです。

field.name.adjustment.mode

none

コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。設定可能:

  • none は、調整を適用しません。
  • Avro は Avro タイプ名で使用できない文字をアンダースコアに置き換えます。
  • avro_unicode は、Avro タイプ名で使用できないアンダースコアまたは文字を、_uxxxx などの対応する Unicode に置き換えます。注: _ は Java のバックスラッシュなどのエスケープシーケンスです。

詳細は、Avro の命名 を参照してください。

高度なコネクター設定プロパティー

以下の 高度な 設定プロパティーには、ほとんどの状況で機能するデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。

プロパティーデフォルト説明

converters

デフォルトなし

コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。以下に例を示します。

isbn

コネクターがカスタムコンバーターを使用できるようにするには、converters タプロパティーを設定する必要があります。

コネクターに設定するコンバーターごとに、コンバーターインターフェイスを実装するクラスの完全修飾名を指定する .type プロパティーも追加する必要があります。.type プロパティーでは、以下の形式を使用します。

<converterSymbolicName>.type

以下に例を示します。

isbn.type: io.debezium.test.IsbnConverter

設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。追加の設定パラメーターとコンバーターを関連付けるには、パラメーター名の前にコンバーターのシンボリック名を付けます。
以下に例を示します。

isbn.schema.name: io.debezium.db2.type.Isbn

snapshot.mode

Initial

コネクターの起動時にスナップショットを実行する基準を指定します。

always
コネクターは起動するたびにスナップショットを実行します。スナップショットには、キャプチャーされたテーブルの構造およびデータが含まれます。この値を指定して、コネクターが起動するたびにキャプチャーされたテーブルからのデータの完全な表現をトピックに入力します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。
Initial
コネクターは、最初のスナップショット を作成するためのデフォルトのワークフローで説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。
initial_only
コネクターは、論理サーバー名に対してオフセットが記録されていない場合にのみ、データベースにスナップショットを実行します。スナップショットが完了すると、コネクターは停止します。後続のデータベース変更に対するイベントレコードのストリーミングに移行しません。
schema_only
非推奨。no_data を参照してください。
no_data
コネクターは、関連するすべてのテーブルの構造をキャプチャーするスナップショットを実行し、デフォルトのスナップショットワークフロー に記載されているすべてのステップを実行します。ただし、コネクターの起動時(ステップ 7.b)時にデータセットを表す READ イベントが作成されない点が異なります。
recovery

損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。

警告

最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合は、このモードを使用してスナップショットを実行しないでください。

when_needed

コネクターが起動した後、以下の状況のいずれかを検出する場合にのみスナップショットが実行されます。

  • トピックオフセットを検出できません。
  • 以前に記録されたオフセットは、サーバーで利用できないログの位置を指定します。

snapshot.locking.mode

exclusive

コネクターがテーブルロックを保持するかどうか、また保持する時間をコントロールします。テーブルロックは、スナップショットの実行中に他のデータベースクライアントが特定のテーブル操作を実行できないようにします。以下の値を設定できます。

exclusive
snapshot.isolation.modeREPEATABLE_READ または EXCLUSIVE の場合、スキーマスナップショットの実行中にコネクターがテーブルでロックを保持する方法を制御します。
コネクターはテーブルロックを保持し、コネクターがデータベーススキーマと他のメタデータを読み取るスナップショットの最初のフェーズでのみ排他的なテーブルアクセスを確保します。スナップショットの後続のフェーズでは、コネクターはロックを必要としないフラッシュバッククエリーを使用して、各テーブルからすべての行を選択します。ロックを設定せずにスナップショットを実行する場合は、以下のオプション none を設定します。
none
スナップショット中にコネクターがテーブルロックを取得するのを防ぎます。この設定は、スナップショットの作成中にスキーマの変更が発生しない場合にのみ使用します。

snapshot.query.mode

select_all

スナップショットの実行中にコネクターがデータをクエリーする方法を指定します。
以下のオプションのいずれかを設定します。

select_all
コネクターは、デフォルトで select all query を実行します。任意で、列 include および exclude リストの設定に基づいて選択した列を調整します。

この設定により、snapshot.select.statement.overrides プロパティーを使用する場合と比較して、より柔軟にスナップショットコンテンツを管理できます。

snapshot.isolation.mode

repeatable_read

スナップショットの実行中に、トランザクション分離レベルとキャプチャーモードのテーブルをロックする期間を制御します。使用できる値は次のとおりです。

read_uncommitted - 最初のスナップショットの実行中に、他のトランザクションによるテーブル行の更新を防ぎません。このモードでは、データの整合性は保証されず、一部のデータが損失または破損する可能性があります。

read_committed - 最初のスナップショットの実行中に、他のトランザクションによるテーブル行の更新を防ぎません。新しいレコードが初回のスナップショットで 1 回、ストリーミングフェーズで 1 回の計 2 回発生する可能性があります。しかし、この整合性レベルはデータのミラーリングに適しています。

repeatable_read - 最初のスナップショットの実行中に、他のトランザクションがテーブル行を更新しないようにします。新しいレコードが初回のスナップショットで 1 回、ストリーミングフェーズで 1 回の計 2 回発生する可能性があります。しかし、この整合性レベルはデータのミラーリングに適しています。

exclusive - 繰り返し可能な読み取り分離レベルを使用しますが、すべてのテーブルを読み取るために排他的ロックを使用します。このモードは、最初のスナップショットの実行中に他のトランザクションがテーブル行を更新しないようにします。exclusive モードのみが完全な整合性を保証し、最初のスナップショットとログのストリーミングが履歴の線形を構成します。

event.processing.failure.handling.mode

fail

イベントの処理中にコネクターが例外を処理する方法を指定します。使用できる値は次のとおりです。

fail- コネクターは問題のあるイベントのオフセットをログに記録し、処理を停止します。

warn - コネクターは問題のあるイベントのオフセットをログに記録し、次のイベントの処理を続行します。

skip - コネクターは問題のあるイベントをスキップし、次のイベントの処理を続行します。

poll.interval.ms

500

コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 500 ミリ秒 (0.5 秒) です。

max.batch.size

2048

コネクターが処理するイベントの各バッチの最大サイズを指定する正の整数値。

max.queue.size

8192

ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。max.queue.size の値を、max.batch.size の値よりも大きくなるように設定します。

max.queue.size.in.bytes

0

ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。
max.queue.size も設定されている場合、キューのサイズがどちらかのプロパティーで指定された上限に達すると、キューへの書き込みがブロックされます。たとえば、max.queue.size=1000max.queue.size.in.bytes=5000 と設定した場合、キューに 1000 レコードが入った後、あるいはキュー内のレコードの量が 5000 バイトに達した後、キューへの書き込みがブロックされます。

heartbeat.interval.ms

0

コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。

ハートビートメッセージは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するのに便利です。ハートビートメッセージは、コネクターの再起動時に再送信する必要がある変更イベントの数を減らすのに役立つ可能性があります。ハートビートメッセージを送信するには、このプロパティーを、ハートビートメッセージの間隔をミリ秒単位で示す正の整数に設定します。

ハートビートメッセージは、追跡されているデータベースには多くの更新があるにも関わらず、キャプチャーモードのテーブルにある更新はわずかである場合に便利です。この場合、コネクターは通常どおりにデータベーストランザクションログから読み取りしますが、変更レコードを Kafka に出力することはほとんどありません。そのため、コネクターが最新のオフセットを Kafka に送信することはほとんどありません。ハートビートメッセージを送信すると、コネクターは最新のオフセットを Kafka に送信できます。

snapshot.delay.ms

デフォルトなし

コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。

streaming.delay.ms

0

スナップショットの完了後にストリーミングプロセスの開始をコネクターが遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットの完了直後、ストリーミングプロセスが開始される前であれば、コネクターがスナップショットを再起動できないようにすることができます。Kafka Connect ワーカーに設定された offset.flush.interval.ms プロパティーの値よりも高い遅延値を設定します。

snapshot.include.collection.list

table.include.listに指定したすべてのテーブル

スナップショットに含めるテーブルの完全修飾名 (<schemaName>.<tableName>) と一致する正規表現のコンマ区切りリスト (オプション) です。指定する項目は、コネクターの table.include.list プロパティーで名前を付ける必要があります。このプロパティーは、コネクターの snapshot.mode プロパティーが never 以外の値に設定されている場合にのみ有効です。
このプロパティーは増分スナップショットの動作には影響しません。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。

snapshot.fetch.size

2000

スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。

snapshot.lock.timeout.ms

10000

スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。コネクターがこの間隔でテーブルロックを取得できないと、スナップショットは失敗します。詳細は、コネクターによるスナップショットの実行方法 を参照してください。その他の可能な設定は次のとおりです。

0 - ロックを取得できないとすぐに失敗します。

-1 - コネクターは永久に待機します。

snapshot.select.statement.overrides

デフォルトなし

スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。

プロパティーには、<schemaName>.<tableName> の形式で完全修飾テーブル名のコンマ区切りリストが含まれます。たとえば、

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

をリスト内の各テーブルに対して、スナップショットを作成する場合には、その他の設定プロパティーを追加して、コネクターがテーブルで実行するように SELECT ステートメントを指定します。指定した SELECT ステートメントは、スナップショットに追加するテーブル行のサブセットを決定します。以下の形式を使用して、この SELECT ステートメントプロパティーの名前 (

snapshot.select.statement.overrides.<schemaName>.<tableName>) を指定します。例: snapshot.select.statement.overrides.customers.orders.

以下に例を示します。

スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 (delete_flag ) を含む customers.orders テーブルから、以下のプロパティーを追加します。

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"

作成されるスナップショットでは、コネクターには delete_flag = 0 のレコードのみが含まれます。

provide.transaction.metadata

false

コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は true を指定します。詳細は、トランザクションメタデータ を参照してください。

skipped.operations

t

ストリーミング中にスキップされる操作タイプのコンマ区切りリスト。挿入/作成は c、更新は u、削除は d、切り捨ては t、操作をスキップしない場合は none と なります。デフォルトでは、省略操作はスキップされます (このコネクターによる出力ではない)。

signal.data.collection

デフォルトなし

シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名 コレクション名の指定には
<schemaName>.<tableName> の形式を使用します。

signal.enabled.channels

source

コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

デフォルトなし

コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。

  • sink
  • log
  • jmx

incremental.snapshot.chunk.size

1024

増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。

incremental.snapshot.watermarking.strategy

insert_insert

増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントを重複排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
以下のオプションのいずれかを指定することができます。

insert_insert
増分スナップショットを開始するシグナルを送信すると、スナップショット中に Debezium が読み取るチャンクごとに、スナップショットウィンドウを開くシグナルを記録するエントリーがシグナリングデータコレクションに書き込まれます。スナップショットが完了すると、Debezium はウィンドウを閉じるシグナルを記録する 2 番目のエントリーを挿入します。
insert_delete
増分スナップショットを開始するシグナルを送信すると、Debezium が読み取るチャンクごとに、スナップショットウィンドウを開くシグナルを記録する 1 つのエントリーがシグナリングデータコレクションに書き込まれます。スナップショットが完了すると、このエントリーは削除されます。スナップショットウィンドウを閉じるシグナルのエントリーは作成されません。シグナリングデータコレクションの急増を防ぐには、このオプションを設定します。

topic.naming.strategy

io.debezium.schema.SchemaTopicNamingStrategy

データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは SchemaTopicNamingStrategy

topic.delimiter

.

トピック名の区切り文字を指定します。デフォルトは . です。

topic.cache.size

10000

トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。

topic.heartbeat.prefix

__debezium-heartbeat

コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、

topic.heartbeat.prefix.topic.prefix

です。たとえば、トピックの接頭辞が fulfillment の場合は、デフォルトのトピック名は __debezium-heartbeat.fulfillment になります。

topic.transaction

transaction

コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、

topic.prefix.topic.transaction

です。たとえば、トピックの接頭辞が fulfillment の場合、デフォルトのトピック名は fulfillment.transaction になります。

snapshot.max.threads

1

初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。

重要

並列初期スナップショットはテクノロジープレビュー機能のみとなっています。テクノロジープレビュー機能は、Red Hat 製品サポートのサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではない場合があります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

custom.metric.tags

デフォルトなし

コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します(例:
k1=v1,k2=v2

コネクターは、指定されたタグをベース MBean オブジェクト名に追加します。タグは、メトリクスデータの整理および分類に役立ちます。タグを定義して、特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを特定できます。詳細は、カスタマイズされた MBean 名 を参照してください。

errors.max.retries

-1

接続エラーなどの再試行可能なエラーが発生する操作の後にコネクターがどのように応答するかを指定します。
以下のオプションのいずれかを設定します。

-1
制限なし。以前の失敗回数に関係なく、コネクターは常に自動的に再起動し、操作を再試行する。
0
Disabledコネクターは即座に失敗し、操作を再試行しません。コネクターの再起動にはユーザーの介入が必要です。
> 0
コネクターは、指定された最大再試行回数に達するまで自動的に再起動します。次の障害後、コネクターは停止し、ユーザーは再起動する必要があります。

database.query.timeout.ms

600000 (10 分)

コネクターがクエリーが完了するまで待機する時間をミリ秒単位で指定します。タイムアウト制限を削除するには、値を 0 (ゼロ)に設定します。

Debezium コネクターデータベーススキーマ履歴設定プロパティー

Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する schema.history.internal.* プロパティーのセットが含まれています。

以下の表は、Debezium コネクターを設定するための schema.history.internal プロパティーを説明しています。

表2.20 コネクターデータベーススキーマ履歴の設定プロパティー
プロパティーデフォルト説明

schema.history.internal.kafka.topic

デフォルトなし

コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。

schema.history.internal.kafka.bootstrap.servers

デフォルトなし

Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。

schema.history.internal.kafka.recovery.poll.interval.ms

100

永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。

schema.history.internal.kafka.query.timeout.ms

3000

Kafka 管理クライアントを使用してクラスター情報を取得する際に、コネクターが待機すべき最大ミリ秒数を指定する整数値です。

schema.history.internal.kafka.create.timeout.ms

30000

Kafka 管理クライアントを使用して kafka 履歴トピックを作成する間、コネクターが待機する最大ミリ秒数を指定する整数値。

schema.history.internal.kafka.recovery.attempts

100

エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、recovery.attempts × recovery.poll.interval.ms です。

schema.history.internal.skip.unparseable.ddl

false

コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは false です。スキップは、binlog の処理中にデータの損失や分割を引き起こす可能性があるため、必ず注意して使用する必要があります。

schema.history.internal.store.only.captured.tables.ddl

false

コネクターがスキーマまたはデータベース内のすべてのテーブルからスキーマ構造を記録するか、キャプチャー対象に指定されたテーブルのみからスキーマ構造を記録するかを指定するブール値。
以下のいずれかの値を指定します。

false (デフォルト)
データベースのスナップショット中に、コネクターは、キャプチャー対象として指定されていないテーブルを含む、データベース内のシステム以外のテーブルのスキーマデータをすべて記録します。デフォルト設定を保持することを推奨します。後で、最初にキャプチャー対象として指定しなかったテーブルから変更をキャプチャーすることにした場合、コネクターはそれらのテーブルからのデータのキャプチャーを簡単に開始できます。これは、テーブルのスキーマ構造がすでにスキーマ履歴トピックに格納されているためです。Debezium では、変更イベントが発生した時点で存在していた構造を識別できるように、テーブルのスキーマ履歴が必要です。
true
データベースのスナップショット中に、コネクターは、Debezium が変更イベントをキャプチャーするテーブルのテーブルスキーマのみを記録します。デフォルト値を変更して、後でデータベース内の他のテーブルからデータをキャプチャーするようにコネクターを設定すると、コネクターには、テーブルから変更イベントをキャプチャーするために必要なスキーマ情報がなくなります。

schema.history.internal.store.only.captured.databases.ddl

false

コネクターがデータベースインスタンス内のすべての論理データベースのスキーマ構造を記録するかどうかを指定するブール値。
以下のいずれかの値を指定します。

true
コネクターは、論理データベース内のテーブルのスキーマ構造と、Debezium が変更イベントをキャプチャーするスキーマのみを記録します。
false
コネクターは、すべての論理データベースのスキーマ構造を記録します。
注記

MySQL Connector のデフォルト値は true です。

プロデューサーおよびコンシューマークライアントを設定するためのパススルーデータベーススキーマ履歴プロパティー


Debezium は、Kafka プロデューサーを使用して、データベーススキーマ履歴トピックにスキーマの変更を書き込みます。同様に、コネクターが起動すると、データベーススキーマ履歴トピックから読み取る Kafka コンシューマーに依存します。schema.history.internal.producer.* および schema.history.internal.consumer.* 接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベーススキーマ履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。

schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234

schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234

Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。

Kafka プロデューサー設定プロパティー および Kafka コンシューマー設定プロパティーの詳細は、Kafka のドキュメントを参照してください。

Debezium コネクター Kafka は設定プロパティーをシグナル化します。

Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。

以下の表は、Kafka signal プロパティーを説明しています。

表2.21 Kafka のシグナル設定プロパティー
プロパティーデフォルト説明

signal.kafka.topic

<topic.prefix>-signal

コネクターがアドホックシグナルについて監視する Kafka トピックの名前。

注記

トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナルトピックが必要です。シグナリングトピックには単一のパーティションが必要です。

signal.kafka.groupId

kafka-signal

Kafka コンシューマーによって使用されるグループ ID の名前。

signal.kafka.bootstrap.servers

デフォルトなし

Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。

signal.kafka.poll.timeout.ms

100

コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。

kafka.consumer.offset.commit.enabled

false

少なくとも 1 回は確実に配信するために、シグナルトピックのオフセットコミットを有効にします。無効にすると、コンシューマーの稼働中に受信したシグナルのみが処理されます。コンシューマーがダウンしているときに受信したシグナルはすべて失われます。

Debezium コネクターのパススルーは Kafka コンシューマークライアント設定プロパティーを示唆します。

Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.* で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL などのプロパティーを Kafka コンシューマーに渡します。

Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。

Debezium コネクターの sink 通知設定プロパティー

以下の表は、notification プロパティーを説明しています。

表2.22 Sink 通知設定プロパティー
プロパティーデフォルト説明

notification.sink.topic.name

デフォルトなし

Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして sink を含めるように notification.enabled.channels プロパティーを設定する場合に必要です。

Debezium コネクターのパススルーデータベースドライバー設定プロパティー

Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは接頭辞 driver.* で始まります。たとえば、コネクターは driver.foobar=false などのプロパティーを JDBC URL に渡します。

データベーススキーマ履歴クライアントのパススループロパティー の場合のように、Debezium はプロパティーから接頭辞を削除してからデータベースドライバーに渡します。

2.1.7. Debezium Db2 コネクターのパフォーマンスの監視

Debezium Db2 コネクターは、Apache ZooKeeper、Apache Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。

Debezium モニタリングのドキュメント では、JMX を使用してこれらのメトリクスを公開する方法の詳細を説明します。

2.1.7.1. Db2 コネクタースナップショットおよびストリーミング MBean オブジェクトのカスタマイズ名

Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。各コネクターインスタンスに固有のこれらのメトリクスは、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。

デフォルトでは、適切に設定されたコネクターをデプロイすると、Debezium は異なるコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、その MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクターの設定によって異なります。設定の変更によって MBean 名が変更されることがあります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが中断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開する場合は、可観測性スタックを新しい MBean 名を使用するように再設定する必要があります。

MBean の名前の変更が原因で生じる中断の監視を防ぐには、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、custom.metric.tags プロパティーをコネクター設定に追加します。プロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値はそのタグの値を表します。たとえば、k1=v1,k2=v2 です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。

コネクターの custom.metric.tags プロパティーを設定した後、可観測性スタックを設定して、指定されたタグに関連付けられたメトリクスを取得できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。

カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。

次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。

例2.6 カスタムタグがコネクター MBean 名を変更する方法

デフォルトでは、Db2 コネクターはストリーミングメトリクスに以下の MBean 名を使用します。

debezium.db2:type=connector-metrics,context=streaming,server=<topic.prefix>

custom.metric.tags の値を database=salesdb-streaming,table=inventory に設定すると、Debezium は次のカスタム MBean 名を生成します。

debezium.db2:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

2.1.7.2. Db2 データベースのスナップショット作成時の Debezium の監視

MBeandebezium.db2:type=connector-metrics,context=snapshot,server= <topic.prefix> です。

スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。

以下の表は、利用可能なスナップショットのメトリクスの一覧です。

属性タイプ説明

LastEvent

string

コネクターが読み取りした最後のスナップショットイベント。

MilliSecondsSinceLastEvent

long

コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。

TotalNumberOfEventsSeen

long

前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。

NumberOfEventsFiltered

long

コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。

CapturedTables

string[]

コネクターによって取得されるテーブルのリスト。

QueueTotalCapacity

int

snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。

QueueRemainingCapacity

int

snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。

TotalTableCount

int

スナップショットに含まれているテーブルの合計数。

RemainingTableCount

int

スナップショットによってまだコピーされていないテーブルの数。

SnapshotRunning

boolean

スナップショットが起動されたかどうか。

SnapshotPaused

boolean

スナップショットが一時停止されたかどうか。

SnapshotAborted

boolean

スナップショットが中断されたかどうか。

SnapshotCompleted

boolean

スナップショットが完了したかどうか。

SnapshotDurationInSeconds

long

スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。

SnapshotPausedDurationInSeconds

long

スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。

RowsScanned

Map<String, Long>

スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。

MaxQueueSizeInBytes

long

キューの最大バッファー (バイト単位)。このメトリクスは max.queue.size.in.bytes が正の長さの値に設定されている場合に利用可能です。

CurrentQueueSizeInBytes

long

キュー内のレコードの現在の容量 (バイト単位)。

コネクターは、増分スナップショットの実行時に、以下の追加のスナップショットメトリクスも提供します。

属性タイプ説明

ChunkId

string

現在のスナップショットチャンクの識別子。

ChunkFrom

string

現在のチャンクを定義するプライマリーキーセットの下限。

ChunkTo

string

現在のチャンクを定義するプライマリーキーセットの上限。

TableFrom

string

現在スナップショットされているテーブルのプライマリーキーセットの下限。

TableTo

string

現在スナップショットされているテーブルのプライマリーキーセットの上限。

2.1.7.3. Debezium Db2 コネクターレコードストリーミングの監視

MBeandebezium.db2:type=connector-metrics,context=streaming,server= <topic.prefix> です。

以下の表は、利用可能なストリーミングメトリクスのリストです。

属性タイプ説明

LastEvent

string

コネクターが読み取られた最後のストリーミングイベント。

MilliSecondsSinceLastEvent

long

コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。

TotalNumberOfEventsSeen

long

最後のコネクターの起動後にソースデータベースが報告するデータ変更イベントの合計数、またはメトリクスがリセットされてから報告されるデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。

TotalNumberOfCreateEventsSeen

long

最後の起動またはメトリックリセット以降にコネクターによって処理される 作成 イベントの合計数。

TotalNumberOfUpdateEventsSeen

long

最後の起動またはメトリックリセット以降にコネクターによって処理される更新イベントの合計数。

TotalNumberOfDeleteEventsSeen

long

最後の起動またはメトリックリセット以降にコネクターによって処理される削除イベントの合計数。

NumberOfEventsFiltered

long

コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。

CapturedTables

string[]

コネクターによって取得されるテーブルのリスト。

QueueTotalCapacity

int

ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。

QueueRemainingCapacity

int

ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。

Connected

boolean

コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。

MilliSecondsBehindSource

long

最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値は、データベースサーバーとコネクターが稼働しているマシンのクロック間の差異を組み込みます。

NumberOfCommittedTransactions

long

コミットされた処理済みトランザクションの数。

SourceEventPosition

Map<String, String>

最後に受信したイベントの位置。

LastTransactionId

string

最後に処理されたトランザクションのトランザクション識別子。

MaxQueueSizeInBytes

long

キューの最大バッファー (バイト単位)。このメトリクスは max.queue.size.in.bytes が正の長さの値に設定されている場合に利用可能です。

CurrentQueueSizeInBytes

long

キュー内のレコードの現在の容量 (バイト単位)。

2.1.7.4. Debezium Db2 コネクターのスキーマ履歴の監視

MBeandebezium.db2:type=connector-metrics,context=schema-history,server= <topic.prefix> です。

以下の表は、利用可能なスキーマ履歴メトリクスのリストです。

属性タイプ説明

Status

string

データベーススキーマ履歴の状態を示す STOPPEDRECOVERING (ストレージから履歴を復元)、または RUNNING のいずれか。

RecoveryStartTime

long

リカバリーが開始された時点のエポック秒の時間。

ChangesRecovered

long

リカバリーフェーズ中に読み取られた変更の数。

ChangesApplied

long

リカバリーおよびランタイム中に適用されるスキーマ変更の合計数。

MilliSecondsSinceLast​RecoveredChange

long

最後の変更が履歴ストアから復元された時点からの経過時間 (ミリ秒単位)。

MilliSecondsSinceLast​AppliedChange

long

最後の変更が適用された時点からの経過時間 (ミリ秒単位)。

LastRecoveredChange

string

履歴ストアから復元された最後の変更の文字列表現。

LastAppliedChange

string

最後に適用された変更の文字列表現。

2.1.8. Debezium Db2 コネクターの管理

Debezium Db2 コネクターをデプロイしたら、Debezium 管理 UDF を使用して、SQL コマンドで Db2 レプリケーション (ASN) を制御します。UDF によっては戻り値が必要な場合があります。この場合、SQL の VALUE ステートメントを使用して呼び出します。その他の UDF には、SQL の CALL ステートメントを使用します。

表2.23 Debezium 管理 UDF の説明
タスクコマンドおよび注記

ASN エージェントを起動する

VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');

ASN エージェントを停止する

VALUES ASNCDC.ASNCDCSERVICES('stop','asncdc');

Check ASN エージェントのステータスを確認する

VALUES ASNCDC.ASNCDCSERVICES('status','asncdc');

テーブルをキャプチャーモードにする

CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE');

MYSCHEMA は、キャプチャーモードにするテーブルが含まれるスキーマの名前に置き換えます。同様に、MYTABLE は、キャプチャーモードにするテーブルの名前に置き換えます。

テーブルのキャプチャーモードを解除する

CALL ASNCDC.REMOVETABLE('MYSCHEMA', 'MYTABLE');

Reinitialize the ASN service

VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');

テーブルをキャプチャーモードにした後か、キャプチャーモードからテーブルを削除した後に、これを行います。

2.1.9. Debezium コネクターでのキャプチャーモードの Db2 テーブルのスキーマの更新

Debezium Db2 コネクターはスキーマ変更をキャプチャーできますが、スキーマを更新するには、データベース管理者と協力してコネクターが変更イベントの生成を継続するようにする必要があります。これは、Db2 がレプリケーションを実装する方法に必要です。

Db2 のレプリケーション機能は、キャプチャーモードのテーブルごとに、すべての変更が含まれる変更データテーブルをそのソーステーブルに作成します。ただし、変更データテーブルスキーマは静的です。キャプチャーモードのテーブルのスキーマを更新する場合は、対応する変更データテーブルのスキーマを更新する必要もあります。Debezium Db2 コネクターはこれを実行できません。昇格された権限を持つデータベース管理者は、キャプチャーモードのテーブルのスキーマを更新する必要があります。

警告

同じテーブルの新しいスキーマ更新の前に、スキーマ更新の手順を完全に実行することが重要です。そのため、スキーマ更新の手順を 1 度で完了するために、すべての DDL を 1 つのバッチで実行することが推奨されます。

通常、テーブルスキーマを更新する手順は 2 つあります。

それぞれの方法に長所と短所があります。

2.1.9.1. Debezium Db2 コネクターでのオフラインスキーマ更新の実行

オフラインでスキーマの更新を行う前に、Debezium Db2 コネクターを停止します。これはより安全なスキーマ更新の手順ですが、高可用性の要件のあるアプリケーションには実現できない可能性があります。

前提条件

  • スキーマの更新が必要なキャプチャーモードのテーブル 1 つ以上。

手順

  1. データベースを更新するアプリケーションを一時停止します。
  2. Debezium コネクターがストリーミングされていない変更イベントレコードをすべてストリーミングするまで待ちます。
  3. Debezium コネクターを停止します。
  4. すべての変更をソーステーブルスキーマに適用します。
  5. ASN レジスターテーブルで、スキーマが更新されたテーブルを INACTIVE でマーク付けします。
  6. ASN キャプチャーサービスを再初期化します。
  7. キャプチャーモードからテーブルを削除するために Debezium UDF を実行 して、キャプチャーモードから古いスキーマを含まれるソーステーブルを削除します。
  8. テーブルをキャプチャーモードに追加するために Debezium UDF を実行 して、スキーマが新しいソーステーブルをキャプチャーモードに追加します。
  9. ASN レジスターテーブルで、更新されたソーステーブルを ACTIVE としてマーク付けします。
  10. ASN キャプチャーサービスを再初期化します。
  11. データベースを更新するアプリケーションを再開します。
  12. Debezium コネクターを再起動します。

2.1.9.2. Debezium Db2 コネクターでのオンラインスキーマ更新の実行

オンラインスキーマの更新ではアプリケーションやデータ処理のダウンタイムは必要ありません。そのため、オンラインスキーマの更新を実行する前に Debezium Db2 コネクターを停止しません。また、オンラインスキーマの更新手順は、オフラインスキーマの更新手順よりも簡単です。

ただし、テーブルがキャプチャーモードの場合は、列名の変更後も Db2 レプリケーション機能は引き続き古い列名を使用します。新しい列名は、Debezium の変更イベントでは表示されません。変更イベントにある新しい列名を確認するには、コネクターを再起動する必要があります。

前提条件

  • スキーマの更新が必要なキャプチャーモードのテーブル 1 つ以上。

テーブルの最後に列を追加する場合の手順

  1. 変更するスキーマのソーステーブルをロックします。
  2. ASN レジスターテーブルで、ロックされたテーブルを INACTIVE としてマーク付けします。
  3. ASN キャプチャーサービスを再初期化します。
  4. ソーステーブルのスキーマにすべての変更を適用します。
  5. 対応する変更データテーブルのスキーマにすべての変更を適用します。
  6. ASN レジスターテーブルで、ソーステーブルを ACTIVE としてマーク付けします。
  7. ASN キャプチャーサービスを再初期化します。
  8. 任意手順:コネクターを再起動して、変更イベントにある更新された列名を確認します。

テーブルの中に列を追加する場合の手順

  1. 変更するソーステーブルをロックします。
  2. ASN レジスターテーブルで、ロックされたテーブルを INACTIVE としてマーク付けします。
  3. ASN キャプチャーサービスを再初期化します。
  4. 変更するソーステーブルごとに以下を行います。

    1. ソーステーブルのデータをエクスポートします。
    2. ソーステーブルを切り捨てます。
    3. ソーステーブルを変更して列を追加します。
    4. エクスポートしたデータを変更したソーステーブルに読み込みます。
    5. ソーステーブルの対応する変更データテーブルのデータをエクスポートします。
    6. 変更データテーブルを切り捨てます。
    7. 変更データテーブルを変更して、列を追加します。
    8. エクスポートしたデータを変更した変更データテーブルに読み込みます。
  5. ASN レジスターテーブルで、テーブルを INACTIVE としてマーク付けします。これにより、古い変更データテーブルが非アクティブとしてマーク付けされるため、それらのテーブルにあるデータは保持されますが、更新されなくなります。
  6. ASN キャプチャーサービスを再初期化します。
  7. 任意手順:コネクターを再起動して、変更イベントにある更新された列名を確認します。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.