第2章 ソースコネクター
Debezium は、さまざまなデータベース管理システムからの変更をキャプチャーするソースコネクターのライブラリーを提供します。各コネクターは、構造が非常に類似した変更イベントを生成するため、イベントの発生元に関係なく、アプリケーションがイベントを簡単に使用して応答できるようになります。
現在、Debezium は次のデータベース用のソースコネクターを提供しています。
2.1. Db2 の Debezium コネクター
Debezium の Db2 コネクターは、Db2 データベースのテーブルで行レベルの変更をキャプチャーできます。このコネクターと互換性のある Db2 データベースのバージョンについては、Debezium でサポートされる設定ページを参照してください。
このコネクターは、テーブルを "キャプチャーモード" にする SQL ベースのポーリングモデルを使用する、SQL Server の Debezium 実装から大きく影響を受けます。テーブルがキャプチャーモードの場合、Debezium Db2 コネクターは、そのテーブルへの行レベルの更新ごとに変更イベントを生成し、ストリーミングします。
キャプチャーモードのテーブルには、関連する変更テーブルがあり、このテーブルは Db2 によって作成されます。キャプチャーモードのテーブルに対する変更ごとに、Db2 はその変更に関するデータをテーブルの関連する変更データテーブルに追加します。変更データテーブルには、行の各状態のエントリーが含まれます。また、削除に関する特別なエントリーもあります。Debezium Db2 コネクターは変更イベントを変更データテーブルから読み取り、イベントを Kafka トピックに出力します。
Debezium Db2 コネクターが Db2 データベースに初めて接続すると、コネクターが変更をキャプチャーするように設定されたテーブルの整合性スナップショットを読み取ります。デフォルトでは、システム以外のテーブルがすべて対象になります。キャプチャーモードにするテーブルまたはキャプチャーモードから除外するテーブルを指定できるコネクター設定プロパティーがあります。
スナップショットが完了すると、コネクターはコミットされた更新の変更イベントをキャプチャーモードのテーブルに出力し始めます。デフォルトでは、特定のテーブルの変更イベントは、テーブルと同じ名前を持つ Kafka トピックに移動します。アプリケーションとサービスはこれらのトピックから変更イベントを使用します。
コネクターには、Linux 用の Db2 の標準部分として利用できる抽象構文表記 (ASN) ライブラリーを使用する必要があります。ASN ライブラリーを使用するには、IBM InfoSphere Data Replication (IIDR) のライセンスが必要です。ASN ライブラリーを使用するには、IIDR をインストールする必要はありません。
Debezium Db2 コネクターを使用するための情報および手順は、以下のように設定されています。
2.1.1. Debezium Db2 コネクターの概要
Debezium Db2 コネクターは、Db2 で SQL レプリケーションを有効にする ASN Capture/Apply エージェント をベースにしています。キャプチャーエージェントは以下を行います。
- キャプチャーモードであるテーブルの変更データテーブルを生成します。
- キャプチャーモードのテーブルを監視し、更新の変更イベントを対応する変更データテーブルのテーブルに格納します。
Debezium コネクターは SQL インターフェイスを使用して変更イベントの変更データテーブルに対してクエリーを実行します。
データベース管理者は、変更をキャプチャーするテーブルをキャプチャーモードにする必要があります。便宜上およびテストを自動化するために、以下の管理タスクをコンパイルし、実行できる Debezium 管理ユーザー定義機能 (UDF) が C にあります。
- ASN エージェントの開始、停止、および再初期化。
- テーブルをキャプチャーモードにする。
- レプリケーション (ASN) スキーマと変更データテーブルの作成。
- キャプチャーモードからのテーブルの削除。
また、Db2 制御コマンドを使用してこれらのタスクを実行することもできます。
対象のテーブルがキャプチャーモードになった後、コネクターは対応する変更データテーブルを読み取り、テーブル更新の変更イベントを取得します。コネクターは、変更されたテーブルと同じ名前を持つ Kafka トピックに対して、行レベルの挿入、更新、および削除操作ごとに変更イベントを出力します。これは、変更可能なデフォルトの動作です。クライアントアプリケーションは、対象のデータベーステーブルに対応する Kafka トピックを読み取り、行レベルの各変更イベントに対応できます。
通常、データベース管理者はテーブルのライフサイクルの途中でテーブルをキャプチャーモードにします。つまり、コネクターにはテーブルに加えられたすべての変更の完全な履歴はありません。そのため、Db2 コネクターが最初に特定の Db2 データベースに接続すると、キャプチャーモードである各テーブルで 整合性スナップショット を実行して起動します。コネクターは、スナップショットの完成後に、スナップショットが作成された時点から変更イベントをストリーミングします。これにより、コネクターはキャプチャーモードのテーブルの整合性のあるビューで開始し、スナップショットの実行中に加えられた変更は破棄されません。
Debezium コネクターはフォールトトラレントです。コネクターは変更イベントを読み取りおよび生成すると、変更データテーブルエントリーのログシーケンス番号 (LSN) を記録します。LSN はデータベースログの変更イベントの位置になります。コネクターが何らかの理由で停止した場合 (通信障害、ネットワークの問題、クラッシュなど)、コネクターは再起動後に最後に停止した場所の変更データテーブルの読み取りを続行します。これにはスナップショットが含まれます。つまり、コネクターの停止時にスナップショットが完了しなかった場合、コネクターの再起動時に新しいスナップショットが開始されます。
2.1.2. Debezium Db2 コネクターの仕組み
Debezium Db2 コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびスキーマ変更の処理方法を理解すると便利です。
詳細は以下を参照してください。
2.1.2.1. Debezium Db2 コネクターによるデータベーススナップショットの実行方法
Db2 のレプリケーション機能は、データベース変更の完全な履歴を保存するようには設計されていません。そのため、Debezium Db2 コネクターはログからデータベースの履歴全体を取得できません。コネクターがデータベースの現在の状態のベースラインを確立できるようにするには、コネクターの初回起動時に、キャプチャーボード のテーブルの最初の 整合性スナップショット を実行します。スナップショットが変更をキャプチャーするたびに、コネクターはキャプチャーされたテーブルの Kafka トピックに read
イベントを発行します。
スナップショットの詳細は、以下のセクションを参照してください。
2.1.2.1.1. Debezium Db2 コネクターが最初のスナップショットの実行に使用するデフォルトのワークフロー
以下のワークフローでは、Debezium がスナップショットを作成する手順を示しています。この手順では、snapshot.mode
設定プロパティーがデフォルト値 (initial)
に設定されている場合のスナップショットのプロセスを説明します。snapshot.mode
プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。
- データベースへの接続を確立します。
-
キャプチャーモードで、かつスナップショットに含める必要があるテーブルを決定します。デフォルトでは、コネクターはシステム以外のすべてのテーブルのデータをキャプチャーします。スナップショットが完了した後、コネクターは指定されたテーブルのデータをストリーミングし続けます。コネクターで特定のテーブルからのみデータをキャプチャーする場合は、
table.include.list
やtable.exclude.list
などのプロパティーを設定して、テーブルまたはテーブル要素のサブセットのみのデータをキャプチャーするようにコネクターに指示できます。 -
キャプチャーモードの各テーブルでロックを取得します。このロックを使用して、スナップショットが完了するまで、それらのテーブルでスキーマの変更が行われないようにします。ロックのレベルは、
snapshot.isolation.mode
コネクター設定プロパティーによって決定されます。 - サーバーのトランザクションログで、最上位 (最新) の LSN の位置を読み取ります。
すべてのテーブル、またはキャプチャー対象として指定されたすべてのテーブルのスキーマをキャプチャーします。コネクターは、内部データベースのスキーマ履歴トピックにスキーマ情報を保持します。スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。
注記デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、キャプチャーモードにあるデータベース内の全テーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。
初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。
- 手順 3 で取得したロックをすべてリリースします。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。
手順 4 で読み取った LSN 位置で、コネクターはキャプチャーするように指定されたテーブルをスキャンします。スキャン中に、コネクターは次のタスクを実行します。
- スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
-
テーブルからキャプチャーされた行ごとに
read
イベントを生成します。すべてのread
イベントには、LSN の位置が含まれ、これは手順 4 で取得した LSN の位置と同じです。 -
ソーステーブルの Kafka トピックに各
read
イベントを出力します。 - 該当する場合は、データテーブルロックを解放します。
- コネクターオフセットにスナップショットの正常な完了を記録します。
作成された初期スナップショットは、キャプチャーされたテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。
スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。
コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、手順 4 で読み取った位置からストリーミングを続行します。
何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。
設定 | 説明 |
---|---|
| コネクターは起動するたびにスナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
| コネクターは 初期スナップショットを作成するためのデフォルトのワークフロー で説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
| コネクターはデータベーススナップショットを実行します。スナップショットが完了すると、コネクターは停止し、後続のデータベース変更のイベントレコードをストリーミングしなくなります。 |
|
非推奨です。 |
|
コネクターは、デフォルトのスナップショットワークフロー で説明されているすべての手順を実行して、関連するすべてのテーブルの構造をキャプチャーします。ただし、コネクターの起動時点のデータセットを表す |
| 損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。 警告 最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合、このモードを使用してスナップショットを実行しないでください。 |
| コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。
|
詳細は、コネクター設定プロパティーテーブルの snapshot.mode
を参照してください。
2.1.2.1.2. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由
コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。
- テーブルデータ
-
コネクターの
table.include.list
プロパティーにあるテーブルのINSERT
、UPDATE
、およびDELETE
操作に関する情報。 - スキーマデータ
- テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。
初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。
場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。
関連情報
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)
-
schema.history.internal.store.only.captured.tables.ddl
プロパティーを設定して、スキーマ情報をキャプチャーするテーブルを指定します。 -
schema.history.internal.store.only.captured.databases.ddl
プロパティーを設定して、スキーマ変更をキャプチャーする論理データベースを指定します。
2.1.2.1.3. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)
コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。
テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- コネクターが読み取った最も初期の変更テーブルエントリーと、最新の変更テーブルエントリーの LSN の間で、スキーマの変更がテーブルに加えられていない。構造が変更された新しいテーブルからデータを取得する方法は、「初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)」 を参照してください。
手順
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
以下の変更をコネクター設定に適用します。
(オプション)
schema.history.internal.captured.tables.ddl
の値をfalse
に設定します。この設定により、スナップショットですべてのテーブルのスキーマがキャプチャーされ、今後、コネクターがすべてのテーブルのスキーマ履歴を再構築できるようにします。
注記すべてのテーブルのスキーマをキャプチャーするスナップショットは、完了までにさらに時間がかかります。
-
コネクターがキャプチャーするテーブルを
table.include.list
に追加します。 snapshot.mode
を次のいずれかの値に設定します。Initial
-
コネクターを再起動すると、テーブルデータとテーブル構造をキャプチャーするデータベースの完全なスナップショットが作成されます。
このオプションを選択する場合は、コネクターがすべてのテーブルのスキーマをキャプチャーできるように、schema.history.internal.captured.tables.ddl
プロパティーの値をfalse
に設定することを検討してください。 schema_only
- コネクターを再起動すると、テーブルスキーマのみをキャプチャーするスナップショットが作成されます。完全なデータスナップショットとは異なり、このオプションではテーブルデータはキャプチャーされません。完全なスナップショットが作成される前に、早くコネクターを再起動する場合は、このオプションを使用します。
-
コネクターを再起動します。コネクターは、
snapshot.mode
で指定されたタイプのスナップショットを完了します。 (オプション) コネクターが
schema_only
スナップショットを実行した場合、スナップショットの完了後に 増分スナップショット を開始して、追加したテーブルからデータをキャプチャーします。コネクターは、テーブルからリアルタイムの変更をストリーミングし続けながら、スナップショットを実行します。増分スナップショットを実行すると、次のデータ変更がキャプチャーされます。- コネクターが以前にキャプチャーしたテーブルの場合、増分スナップショットは、コネクターが停止している間、つまりコネクターが停止してから現在の再起動までの間に発生した変更をキャプチャーします。
- 新しく追加されたテーブルの場合、増分スナップショットは既存のテーブル行をすべてキャプチャーします。
2.1.2.1.4. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)
スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。
最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。
手順
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (
store.only.captured.tables.ddl
はfalse
に設定されました)。 -
table.include.list
プロパティーを編集して、キャプチャーするテーブルを指定します。 - コネクターを再起動します。
- 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
-
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (
store.only.captured.tables.ddl
がtrue
に設定されています)。 最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。
- 手順 1: スキーマスナップショット、その後に増分スナップショット
この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.mode
プロパティーの値をschema_only
に設定します。 -
table.include.list
を編集して、キャプチャーするテーブルを追加します。
-
- コネクターを再起動します。
- Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
- データが損失されないようにするには、増分スナップショット を開始します。
- 手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット
この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
-
table.include.list
を編集して、キャプチャーするテーブルを追加します。 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.mode
プロパティーの値をinitial
に設定します。 -
(オプション)
schema.history.internal.store.only.captured.tables.ddl
をfalse
に設定します。
-
- コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
- (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。
2.1.2.2. アドホックスナップショット
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。
execute-snapshot
メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。execute-snapshot
シグナルのタイプを incremental
または blocking
に設定し、スナップショットに含めるテーブルの名前を次の表に示すように指定します。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプを指定します。 |
| 該当なし |
スナップショットに含めるテーブルの完全修飾名に一致する正規表現を含む配列。 |
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
|
| 該当なし | スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。 |
アドホック増分スナップショットのトリガー
アドホック増分スナップショットを開始するには、execute-snapshot
シグナルタイプのエントリーをシグナリングテーブルに追加するか、シグナルメッセージを Kafka シグナリングトピックに送信します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
詳細は、スナップショットの増分を参照してください。
アドホックブロッキングスナップショットのトリガー
シグナリングテーブルまたはシグナリングトピックに、execute-snapshot
シグナルタイプを持つエントリーを追加することによって、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。
詳細は、ブロッキングスナップショット を参照してください。
2.1.2.3. 増分スナップショット
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するため の Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.list
プロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ
イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT
、UPDATE
、DELETE
操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE
または DELETE
イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ
イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ
イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ
イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ
操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE
または DELETE
操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ
イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ
イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ
イベントのみです。Debezium は、これらの残りの READ
イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
現在、増分スナップショットを開始するには、次のいずれかの方法を使用できます。
Db2 の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。
2.1.2.3.1. 増分スナップショットのトリガー
増分スナップショットを開始するには、ソースデータベースのシグナリングテーブルに アドホックスナップショットシグナル を送信します。スナップショットシグナルは SQL INSERT
クエリーとして送信します。
Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。Debezium は現在、incremental
と blocking
のスナップショットタイプをサポートしています。
スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections
配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
増分スナップショットシグナルの data-collections
アレイにはデフォルト値がありません。data-collections
配列が空の場合、Debezium は空の配列をアクションが必要ないと解釈し、スナップショットは作成しません。
スナップショットに含めるテーブルの名前にドット (.
)、スペース、またはその他の英数字以外の文字が含まれている場合は、テーブル名を二重引用符でエスケープする必要があります。
たとえば、 public
スキーマに存在し、My.Table
という名前のテーブルを含めるには、"public.\"My.Table\""
の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collection
プロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットをトリガーする
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
以下に例を示します。
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema1.table2"], 4 "type":"incremental", 5 "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); 6
コマンドの
id
、type
、およびdata
パラメーターの値は、シグナルテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。表2.3 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 項目 値 説明 1
schema.debezium_signal
ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID
文字列をウォーターマークシグナルとして生成します。3
execute-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
配列には、schema.table
形式を使用してテーブルの完全修飾名と一致する正規表現がリストされます。この形式は、コネクターの シグナリングテーブル の名前を指定するために使用する形式と同じです。5
incremental
実行するスナップショット操作のタイプを指定する、シグナルの
data
フィールドのオプションのtype
コンポーネント。
有効な値はincremental
とblocking
です。
値を指定しない場合、コネクターはデフォルトで増分スナップショットを実行します。6
additional-conditions
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、data-collection
プロパティーとfilter
プロパティーを持つオブジェクトです。データの収集単位で異なるフィルターを指定できます。
*data-collection
プロパティーは、フィルターが適用されるデータコレクションの完全修飾名です。additional-conditions
パラメーターの詳細は、「additional-conditions
付きでアドホック増分スナップショットを実行する」 を参照してください。
2.1.2.3.2. additional-conditions
付きでアドホック増分スナップショットを実行する
スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルに additional-conditions
パラメーターを追加してシグナル要求を変更できます。
一般的なスナップショットの SQL クエリーは、以下の形式を取ります。
SELECT * FROM <tableName> ....
additional-conditions
パラメーターを追加して、以下の例のように WHERE
条件を SQL クエリーに追加します。
SELECT * FROM <data-collection> WHERE <filter> ....
以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
たとえば、以下の列が含まれる products
テーブルがあるとします。
-
id
(プライマリーキー) -
color
-
quantity
products
テーブルの増分スナップショットに color=blue
のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue"}]}');
additional-conditions
パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products
テーブルを使用して、color=blue
および quantity>10
だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例2.1 増分スナップショットイベントメッセージ
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "ts_us":"1620393591654547", "ts_ns":"1620393591654547920", "transaction":null }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
実行するスナップショット操作タイプを指定します。 |
2 |
|
イベントタイプを指定します。 |
2.1.2.3.3. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする
設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type
と data
フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは execute-snapshot
で、data
フィールドには以下のフィールドが必要です。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。 |
例2.2 execute-snapshot
Kafka メッセージ
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
additional-conditions 付きのアドホック増分スナップショット
Debezium は additional-conditions
フィールドを使用してテーブルのコンテンツのサブセットを選択します。
通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。
SELECT * FROM <tableName> ….
スナップショット要求に additional-conditions
プロパティーが含まれている場合、プロパティーの data-collection
および filter
パラメーターが SQL クエリーに追加されます。次に例を示します。
SELECT * FROM <data-collection> WHERE <filter> ….
たとえば、列 id
(プライマリーキー)、color
、および brand
を含む products
テーブルがある場合、スナップショットに color='blue'
のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions
プロパティーを追加することができます。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`
また、additional-conditions
プロパティーを使用して、複数の列に基づいて条件を渡すこともできます。たとえば、前の例と同じ products
テーブルを使用して、color='blue'
および brand='MyBrand'
である products
テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.1.2.3.4. 増分スナップショットの停止
状況によっては、増分スナップショットを停止する必要がある場合があります。たとえば、スナップショットが正しく設定されていない場合や、他のデータベース操作にリソースが使用可能であるこのとの確認が必要な場合があります。ソースデータベースのシグナリングテーブルにシグナルを送信することで、すでに実行中のスナップショットを停止できます。
スナップショット停止信号をシグナリングテーブルに送信するには、SQL INSERT
クエリーで送信します。stop-snapshot シグナルは、スナップショット操作の type
を incremental
として指定し、オプションで、現在実行中のスナップショットから省略するテーブルを指定します。Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
関連情報
また、JSON メッセージを Kafka シグナリングトピック に送信して、増分スナップショットを停止することもできます。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collection
プロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットを停止する
SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');
以下に例を示します。
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema1.table2"], 4 "type":"incremental"}'); 5
signal コマンドの
id
、type
、およびdata
パラメーターの値は、シグナルテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。表2.6 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明 項目 値 説明 1
schema.debezium_signal
ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。3
stop-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
配列には、schema.table
の形式で完全修飾名でテーブルに一致する正規表現がリストされます。data
フィールドからこのコンポーネントを省略すると、シグナルによって進行中の増分スナップショット全体が停止されます。5
incremental
停止するスナップショット操作のタイプを指定する信号の
data
フィールドの必須コンポーネント。
現在、有効な唯一のオプションはincremental
です。type
の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
2.1.2.3.5. Kafka シグナリングチャネルを使用して増分スナップショットを停止する
設定された Kafka シグナルトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type
と data
フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは stop-snapshot
で、data
フィールドには以下のフィールドが必要です。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
テーブルの完全修飾名に一致する、コンマで区切られた正規表現のオプションの配列、スナップショットから削除するテーブル名に一致するテーブル名または正規表現の配列。 |
次の例は、典型的な stop-snapshot
の Kafka メッセージを示しています。
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
2.1.2.4. ブロッキングスナップショット
スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。ブロッキングスナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。
次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。
- 新しいテーブルを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
- 大きなテーブルを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。
ブロッキングスナップショットのプロセス
ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。
スナップショットの設定
シグナルの data
コンポーネントでは、次のプロパティーを設定できます。
- data-collections: スナップショットする必要のあるテーブルを指定します。
additional-conditions: テーブルごとに異なるフィルターを指定できます。
-
data-collection
プロパティーは、フィルターが適用されるテーブルの完全修飾名です。 -
filter
プロパティーは、snapshot.select.statement.overrides
で使用される値と同じになります。
-
以下に例を示します。
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
重複の可能性
スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。
2.1.2.5. Debezium Db2 コネクターによる変更データテーブルの読み取り方法
スナップショットの完了後、Debezium Db2 コネクターが初めて起動すると、キャプチャーモードである各ソーステーブルの変更データテーブルを識別します。コネクターは各変更データテーブルに対して以下を行います。
- 最後に保存された最大 LSN から現在の最大 LSN の間に作成された変更イベントを読み取ります。
- 各イベントのコミット LSN および変更 LSN に従って、変更イベントを順序付けます。これにより、コネクターはテーブルが変更された順序で変更イベントを出力します。
- コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
- コネクターが Kafka Connect に渡した最大 LSN を保存します。
再起動後、コネクターは停止した場所でオフセット (コミット LSN および変更 LSN) から変更イベントの出力を再開します。コネクターが稼働し、変更イベントを出力している間、キャプチャーモードからテーブルを削除したり、テーブルをキャプチャーモードに追加したりすると、コネクターは変更を検出して、それに合わせて動作を変更します。
2.1.2.6. Debezium Db2 変更イベントレコードを受信する Kafka トピックのデフォルト名
デフォルトでは、Db2 コネクターは、テーブルで発生するすべての INSERT
、UPDATE
、DELETE
操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。
topicPrefix.schemaName.tableName
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- topicPrefix
-
topic.prefix
コネクター設定プロパティーで指定されたトピック接頭辞。 - schemaName
- 操作が発生したスキーマの名前。
- tableName
- 操作が発生したテーブルの名前。
たとえば、MYSCHEMA
スキーマに 4 つのテーブル (PRODUCTS
、PRODUCTS_ON_HAND
、CUSTOMERS
、ORDERS
) を含む mydatabase
データベースを使用した Db2 インストールについて考えてみます。コネクターはイベントを以下の 4 つの Kafka トピックに出力します。
-
mydatabase.MYSCHEMA.PRODUCTS
-
mydatabase.MYSCHEMA.PRODUCTS_ON_HAND
-
mydatabase.MYSCHEMA.CUSTOMERS
-
mydatabase.MYSCHEMA.ORDERS
コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピック と トランザクションメタデータトピック) にラベルを付けます。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。
2.1.2.7. Debezium Db2 コネクターによるデータベーススキーマの変更の処理方法
データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。
スキーマ変更後に発生するイベントを正しく処理するために、Debezium Db2 コネクターは、関連するデータテーブルの構造をミラーリングする Db2 変更データテーブルの構造に基づいて、新しいスキーマのスナップショットを保存します。コネクターは、データベーススキーマ履歴 Kafka トピックに、スキーマ変更の結果 (複数操作の LSN) と合わせてテーブルのスキーマ情報を保存します。コネクターは、保管されたスキーマ表現を使用して、挿入、更新、または削除の各操作時にテーブルの構造を正しくミラーリングする変更イベントを生成します。
クラッシュまたは正常に停止した後にコネクターが再起動すると、最後に読み取った位置から Db2 変更データテーブル内のエントリーの読み取りを再開します。コネクターがデータベーススキーマ履歴トピックから読み取るスキーマ情報を基に、コネクターが再起動する場所に存在したテーブル構造を適用します。
キャプチャーモードの Db2 テーブルのスキーマを更新する場合は、対応する変更テーブルのスキーマも更新することが重要です。データベーススキーマを更新するには、昇格権限のある Db2 データベース管理者である必要があります。Debezium 環境で Db2 データベーススキーマを更新する方法は、スキーマ履歴の進化 を参照してください。
データベーススキーマ履歴トピックは、内部コネクター専用となっています。オプションで、コネクターは コンシューマーアプリケーション向けの別のトピックにスキーマ変更イベントを送信する こともできます。
関連情報
- Debezium イベントレコードを受信する トピックのデフォルト名。
2.1.2.8. Debezium Db2 コネクターのスキーマ変更トピック
Debezium Db2 コネクターを設定すると、データベーステーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成できます。
Debezium は、以下の場合にスキーマ変更トピックにメッセージを出力します。
- 新しいテーブルがキャプチャーモードになる。
- テーブルがキャプチャーモードから削除される。
- データベーススキーマの更新 中に、キャプチャーモードであるテーブルのスキーマが変更される。
コネクターはスキーマ変更イベントを、<topicPrefix>
という名前の Kafka スキーマ変更トピックに書き込みます。ここで <topicPrefix>
は、topic.prefix
コネクター設定プロパティーで指定されたトピック接頭辞です。
スキーマ変更イベントのスキーマには、次の要素があります。
name
- スキーマ変更イベントメッセージの名前。
type
- 変更イベントメッセージのタイプ。
version
- スキーマのバージョン。バージョンは整数で、スキーマが変更されるたびに増加します。
fields
- 変更イベントメッセージに含まれるフィールド。
例: Db2 コネクターのスキーマ変更トピックのスキーマ
次の例は、JSON 形式の一般的なスキーマを示しています。
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.db2.SchemaChangeKey", "version": 1 }, "payload": { "databaseName": "inventory" } }
コネクターがスキーマ変更トピックに送信するメッセージには以下の要素などのペイロードが含まれます。
databaseName
-
ステートメントが適用されるデータベースの名前。
databaseName
の値は、メッセージキーとして機能します。 pos
- ステートメントが表示されるトランザクションログ内の位置。
tableChanges
-
スキーマの変更後のテーブルスキーマ全体の構造化表現。
tableChanges
フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベーススキーマ履歴トピックにも格納します。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。
データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。
トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。
-
データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を
1
に指定します。 -
Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka
num.partitions
設定オプションの値を1
に設定します。
コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。
例: Db2 コネクターのスキーマ変更トピックに出力されるメッセージ
以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。
{ "schema": { ... }, "payload": { "source": { "version": "2.7.3.Final", "connector": "db2", "name": "db2", "ts_ms": 0, "snapshot": "true", "db": "testdb", "schema": "DB2INST1", "table": "CUSTOMERS", "change_lsn": null, "commit_lsn": "00000025:00000d98:00a2", "event_serial_no": null }, "ts_ms": 1588252618953, 1 "databaseName": "TESTDB", 2 "schemaName": "DB2INST1", "ddl": null, 3 "tableChanges": [ 4 { "type": "CREATE", 5 "id": "\"DB2INST1\".\"CUSTOMERS\"", 6 "table": { 7 "defaultCharsetName": null, "primaryKeyColumnNames": [ 8 "ID" ], "columns": [ 9 { "name": "ID", "jdbcType": 4, "nativeType": null, "typeName": "int identity", "typeExpression": "int identity", "charsetName": null, "length": 10, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "FIRST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "LAST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "EMAIL", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 10 { "customAttribute": "attributeValue" } ] } } ] } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。 |
2 |
| 変更が含まれるデータベースとスキーマを識別します。 |
3 |
|
Db2 コネクターの場合は常に |
4 |
| DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。 |
5 |
| 変更の種類を説明します。値は以下のいずれかになります。
|
6 |
| 作成、変更、または破棄されたテーブルの完全な識別子。 |
7 |
| 適用された変更後のテーブルメタデータを表します。 |
8 |
| テーブルのプライマリーキーを設定する列のリスト。 |
9 |
| 変更されたテーブルの各列のメタデータ。 |
10 |
| 各テーブル変更のカスタム属性メタデータ。 |
コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload
フィールドにキーが含まれます。
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.db2.SchemaChangeKey", "version": 1 }, "payload": { "databaseName": "TESTDB" } }
2.1.2.9. トランザクション境界を表す Debezium Db2 コネクターによって生成されたイベント
Debezium は、トランザクション境界を表し、変更データイベントメッセージを強化するイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
Debezium は、すべてのトランザクションで BEGIN
および END
区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。
status
-
BEGIN
またはEND
id
- 一意のトランザクション識別子の文字列表現。
ts_ms
-
データソースでのトランザクション境界イベント (
BEGIN
またはEND
イベント) の時間。データソースから Debezium にイベント時間を渡されない場合、フィールドは代わりに Debezium がイベントを処理する時間を表します。 event_count
(END
イベント用)- トランザクションによって出力されるイベントの合計数。
data_collections
(END
イベント用)-
data_collection
とevent_count
要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
例
{ "status": "BEGIN", "id": "00000025:00000d08:0025", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "00000025:00000d08:0025", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "testDB.dbo.tablea", "event_count": 1 }, { "data_collection": "testDB.dbo.tableb", "event_count": 1 } ] }
topic.transaction
オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>
.transaction
トピックに出力します。
データ変更イベントのエンリッチメント
トランザクションメタデータを有効にすると、コネクターは変更イベント Envelope
を新しい transaction
フィールドで強化します。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id
- 一意のトランザクション識別子の文字列表現。
total_order
- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下は、メッセージの例になります。
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "ts_us": "1580390884335875", "ts_ns": "1580390884335875412", "transaction": { "id": "00000025:00000d08:0025", "total_order": "1", "data_collection_order": "1" } }
2.1.3. Debezium Db2 コネクターのデータ変更イベントの説明
Debezium Db2 コネクターは、行レベルの INSERT
、UPDATE
、および DELETE
操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。
以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema
フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。
{ "schema": { 1 ... }, "payload": { 2 ... }, "schema": { 3 ... }, "payload": { 4 ... }, }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
最初の |
2 |
|
最初の |
3 |
|
2 つ目の |
4 |
|
2 つ目の |
デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。詳細は、トピック名 を参照してください。
Debezium Db2 コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または \_) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。
論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。
また、データベース、スキーマ、およびテーブルの Db2 名では、大文字と小文字を区別することができます。つまり、コネクターは同じ Kafka トピックに複数のテーブルのイベントレコードを出力できます。
詳細は以下を参照してください。
2.1.3.1. Debezium db2 変更イベントのキー
変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルの PRIMARY KEY
(または一意の制約) に存在した各列のフィールドが含まれます。
以下の customers
テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。
テーブルの例
CREATE TABLE customers ( ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(255) NOT NULL, LAST_NAME VARCHAR(255) NOT NULL, EMAIL VARCHAR(255) NOT NULL UNIQUE );
変更イベントキーの例
customers
テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers
テーブルに前述の定義がある限り、customers
テーブルへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。
{ "schema": { 1 "type": "struct", "fields": [ 2 { "type": "int32", "optional": false, "field": "ID" } ], "optional": false, 3 "name": "mydatabase.MYSCHEMA.CUSTOMERS.Key" 4 }, "payload": { 5 "ID": 1004 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
キーのスキーマ部分は、キーの |
2 |
|
各フィールドの名前、型、および必要かどうかなど、 |
3 |
|
イベントキーの |
4 |
|
キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-name.table-name.
|
5 |
|
この変更イベントが生成された行のキーが含まれます。この例では、キーには値が |
2.1.3.2. Debezium Db2 変更イベントの値
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema
セクションと payload
セクションがあります。schema
セクションには、入れ子のフィールドを含む、Envelope
セクションの payload
構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。
テーブルの例
CREATE TABLE customers ( ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(255) NOT NULL, LAST_NAME VARCHAR(255) NOT NULL, EMAIL VARCHAR(255) NOT NULL UNIQUE );
customers
テーブルのすべての変更イベントのイベント値部分は同じスキーマを指定します。イベント値のペイロードは、イベント型によって異なります。
create イベント
以下の例は、customers
テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。
{ "schema": { 1 "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value", 2 "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "mydatabase.MYSCHEMA.CUSTOMERS.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "int64", "optional": false, "field": "ts_us" }, { "type": "int64", "optional": false, "field": "ts_ns" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "string", "optional": true, "field": "change_lsn" }, { "type": "string", "optional": true, "field": "commit_lsn" }, ], "optional": false, "name": "io.debezium.connector.db2.Source", 3 "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "int64", "optional": true, "field": "ts_us" }, { "type": "int64", "optional": true, "field": "ts_ns" } ], "optional": false, "name": "mydatabase.MYSCHEMA.CUSTOMERS.Envelope" 4 }, "payload": { 5 "before": null, 6 "after": { 7 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "john.doe@example.org" }, "source": { 8 "version": "2.7.3.Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559729468470, "ts_us": 1559729468470476, "ts_ns": 1559729468470476000, "snapshot": false, "db": "mydatabase", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000758:0003", "commit_lsn": "00000027:00000758:0005", }, "op": "c", 9 "ts_ms": 1559729471739, 10 "ts_us": 1559729471739762, 11 "ts_ns": 1559729471739762314 12 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。 |
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
6 |
|
イベント発生前の行の状態を指定する任意のフィールド。この例のように、 |
7 |
|
イベント発生後の行の状態を指定する任意のフィールド。この例では、 |
8 |
|
イベントのソースメタデータを記述する必須のフィールド。
|
9 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
10 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
更新イベント
サンプル customers
テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、更新イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers
テーブルでの更新に生成されるイベントの変更イベント値の例になります。
{ "schema": { ... }, "payload": { "before": { 1 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "john.doe@example.org" }, "after": { 2 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "noreply@example.org" }, "source": { 3 "version": "2.7.3.Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559729995937, "ts_us": 1559729995937497, "ts_ns": 1559729995937497000, "snapshot": false, "db": "mydatabase", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000ac0:0002", "commit_lsn": "00000027:00000ac0:0007", }, "op": "u", 4 "ts_ms": 1559729998706, 5 "ts_us": 1559729998706647, 6 "ts_ns": 1559729998706647825 7 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。 |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。
|
4 |
|
操作の型を記述する必須の文字列。更新 イベントの値では、 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つのイベントが Debezium によって出力されます。3 つのイベントとは、DELETE
イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。
delete イベント
削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema
の部分になります。サンプル customers
テーブルの 削除 イベントのイベント値 payload
は以下のようになります。
{ "schema": { ... }, }, "payload": { "before": { 1 "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "noreply@example.org" }, "after": null, 2 "source": { 3 "version": "2.7.3.Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559730445243, "ts_us": 1559730445243482, "ts_ns": 1559730445243482000, "snapshot": false, "db": "mydatabase", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000db0:0005", "commit_lsn": "00000027:00000db0:0007" }, "op": "d", 4 "ts_ms": 1559730450205, 5 "ts_us": 1559730450205521, 6 "ts_ns": 1559730450205521475 7 } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の
|
4 |
|
操作の型を記述する必須の文字列。 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。
Db2 コネクターイベントは、Kafka のログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null
である必要があります。これを可能にするために、Debezium の Db2 コネクターは 削除 イベントを出力した後に、null
値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。
2.1.4. Debezium Db2 コネクターによるデータ型のマッピング方法
Db2 がサポートするデータ型の詳細は、Db2 ドキュメントの Data Types を参照してください。
Db2 コネクターは、行が存在するテーブルのように構造化されたイベントで行への変更を表します。イベントには、各列の値のフィールドが含まれます。その値がどのようにイベントで示されるかは、列の Db2 のデータ型によって異なります。ここでは、これらのマッピングを説明します。デフォルトのデータ型変換が要件に合わない場合は、コネクター用の カスタムコンバーターの作成 が可能です。
詳細は以下を参照してください。
基本型
以下の表では、各 Db2 データ型をイベントフィールドの リテラル型 および セマンティック型にマッピングする方法を説明します。
-
literal type は、Kafka Connect スキーマタイプ (
INT8
、INT16
、INT32
、INT64
、FLOAT32
、FLOAT64
、BOOLEAN
、STRING
、BYTES
、ARRAY
、MAP
、STRUCT
) を使用して、値がどのように表現されるかを記述します。 - セマンティック型 は、フィールドの Kafka Connect スキーマの名前を使用して、Kafka Connect スキーマがフィールドの 意味 をキャプチャーする方法を記述します。
DB2 データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
---|---|---|
|
| BOOLEAN 型の列のあるテーブルからのみスナップショットを作成できます。現在、Db2 での SQL レプリケーションは BOOLEAN をサポートしないため、Debezium はこれらのテーブルで CDC を実行できません。別の型の使用を検討してください。 |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
|
|
|
|
|
|
|
|
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
|
|
|
|
|
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
|
|
列のデフォルト値がある場合は、対応するフィールドの Kafka Connect スキーマに伝達されます。明示的な列値が指定されない限り、変更イベントにはフィールドのデフォルト値が含まれます。そのため、スキーマからデフォルト値を取得する必要はほとんどありません。
時間型
タイムゾーン情報を含む DATETIMEOFFSET
データタイプを除き、Db2 は time.precision.mode
コネクター設定プロパティーの値に基づいて時間型をマップします。ここでは、以下のマッピングを説明します。
time.precision.mode=adaptive
time.precision.mode
設定プロパティーがデフォルトの adaptive
に設定された場合、コネクターは列のデータ型定義に基づいてリテラル型とセマンティック型を決定します。これにより、イベントがデータベースの値を 正確 に表すようになります。
DB2 データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
time.precision.mode=connect
time.precision.mode
設定プロパティーが connect
に設定された場合、コネクターは Kafka Connect の論理型を使用します。これは、コンシューマーが組み込みの Kafka Connect の論理型のみを処理でき、可変精度の時間値を処理できない場合に便利です。ただし、Db2 はマイクロ秒の 10 分の 1 の精度をサポートするため、connect
時間精度を指定してコネクターによって生成されたイベントは、データベース列の少数秒の精度値が 3 よりも大きい場合に、精度が失われます。
DB2 データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
---|---|---|
|
|
|
|
|
|
|
|
|
タイムスタンプ型
DATETIME
タイプは、タイムゾーン情報のないタイムスタンプを表します。このような列は、UTC を基にして同等の Kafka Connect 値に変換されます。たとえば、"2018-06-20 15:13:16.945104" という DATETIME
値は、"1529507596000" という値の io.debezium.time.Timestamp
で表されます。
Kafka Connect および Debezium を実行している JVM のタイムゾーンは、この変換には影響しません。
DB2 データ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
---|---|---|
|
|
|
|
|
|
2.1.5. Debezium コネクターを実行するための Db2 の設定
Db2 テーブルにコミットされた変更イベントを Debezium がキャプチャーするには、必要な権限を持つ Db2 データベース管理者が、変更データキャプチャーのデータベースでテーブルを設定する必要があります。Debezium の実行を開始した後、キャプチャーエージェントの設定を調整してパフォーマンスを最適化できます。
Debezium コネクターと使用するために Db2 を設定する場合の詳細は、以下を参照してください。
2.1.5.1. 変更データキャプチャーの Db2 テーブルの設定
テーブルをキャプチャーモードにするために、Debezium ではユーザー定義関数 (UDF) のセットが提供されます。ここでは、これらの管理 UDF をインストールおよび実行する手順を説明します。また、Db2 制御コマンドを実行してテーブルをキャプチャーモードにすることもできます。その後、管理者は Debezium がキャプチャーする各テーブルに対して、CDC を有効にする必要があります。
前提条件
-
db2instl
ユーザーとして Db2 にログインしている。 - Db2 ホストの $HOME/asncdctools/src ディレクトリーで Debezium 管理 UDF を使用できる。UDF は Debezium サンプルリポジトリー から入手できます。
-
Db2 コマンド
bldrtn
が PATH 上にある。たとえば、export PATH=$PATH:/opt/ibm/db2/V11.5.0.0/samples/c/
を Db2 11.5 で実行する。
手順
Db2 で提供される
bldrtn
コマンドを使用して、Db2 サーバーホストで Debezium 管理 UDF をコンパイルします。cd $HOME/asncdctools/src
bldrtn asncdc
データベースが稼働していない場合は起動します。
DB_NAME
は、Debezium が接続するデータベースの名前に置き換えます。db2 start db DB_NAME
JDBC が Db2 メタデータカタログを読み取りできるようにします。
cd $HOME/sqllib/bnd
db2 connect to DB_NAME db2 bind db2schema.bnd blocking all grant public sqlerror continue
データベースが最近バックアップされたことを確認します。ASN エージェントには、読み取りを始める最新の開始点が必要です。バックアップを実行する必要がある場合は、以下のコマンドを実行して、最新のバージョンのみを利用できるようにデータをプルーニングします。古いバージョンのデータを保持する必要がない場合は、バックアップの場所に
dev/null
を指定します。データベースをバックアップします。
DB_NAME
およびBACK_UP_LOCATION
を適切な値に置き換えます。db2 backup db DB_NAME to BACK_UP_LOCATION
データベースを再起動します。
db2 restart db DB_NAME
データベースに接続して、Debezium 管理 UDF をインストールします。
db2instl
ユーザーとしてログインしていることを前提とするため、UDF がdb2inst1
ユーザーにインストールされている必要があります。db2 connect to DB_NAME
Debezium 管理 UDF をコピーし、その権限を設定します。
cp $HOME/asncdctools/src/asncdc $HOME/sqllib/function
chmod 777 $HOME/sqllib/function
ASN キャプチャーエージェントを開始および停止する Debezium UDF を有効にします。
db2 -tvmf $HOME/asncdctools/src/asncdc_UDF.sql
ASN 制御テーブルを作成します。
$ db2 -tvmf $HOME/asncdctools/src/asncdctables.sql
テーブルをキャプチャーモードに追加し、キャプチャーモードからテーブルを削除する Debezium UDF を有効にします。
$ db2 -tvmf $HOME/asncdctools/src/asncdcaddremove.sql
Db2 サーバーを設定したら、UDF を使用して SQL コマンドで Db2 レプリケーション (ASN) を制御します。UDF によっては戻り値が必要な場合があります。この場合、SQL の
VALUE
ステートメントを使用して呼び出します。その他の UDF には、SQL のCALL
ステートメントを使用します。SQL クライアントから ASN エージェントを起動します。
VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');
または、シェルから以下を行います。
db2 "VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');"
前述のステートメントは、以下のいずれかの結果を返します。
-
asncap is already running
start -->
<COMMAND>
この場合は、以下の例のように、ターミナルウィンドウに指定の
<COMMAND>
を入力します。/database/config/db2inst1/sqllib/bin/asncap capture_schema=asncdc capture_server=SAMPLE &
-
テーブルをキャプチャーモードにします。キャプチャーする各テーブルに対して、以下のステートメントを呼び出します。
MYSCHEMA
は、キャプチャーモードにするテーブルが含まれるスキーマの名前に置き換えます。同様に、MYTABLE
は、キャプチャーモードにするテーブルの名前に置き換えます。CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE');
ASN サービスを再初期化します。
VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');
2.1.5.2. Db2 キャプチャーエージェント設定のサーバー負荷およびレイテンシーへの影響
データベース管理者がソーステーブルに対して変更データキャプチャーを有効にすると、キャプチャーエージェントの実行が開始されます。エージェントは新しい変更イベントレコードをトランザクションログから読み取り、イベントレコードをキャプチャーテーブルに複製します。変更がソーステーブルにコミットされてから、対応する変更テーブルに変更が反映される間、常に短いレイテンシーが間隔で発生します。この遅延間隔は、ソーステーブルで変更が発生したときから、Debezium がその変更を Apache Kafka にストリーミングできるようになるまでの差を表します。
データの変更に素早く対応する必要があるアプリケーションについては、ソースとキャプチャーテーブル間で密接に同期を維持するのが理想的です。キャプチャーエージェントを実行してできるだけ迅速に変更イベントを継続的に処理すると、スループットが増加し、レイテンシーが減少するため、イベントの発生後にほぼリアルタイムで新しいイベントレコードが変更テーブルに入力されることを想像するかもしれません。しかし、これは必ずしもそうであるとは限りません。同期を即時に行うとパフォーマンスに影響します。変更エージェントが新しいイベントレコードについてデータベースにクエリーを実行するたびに、データベースホストの CPU 負荷が増加します。サーバーへの負荷が増えると、データベース全体のパフォーマンスに悪影響を及ぼす可能性があり、特にデータベースの使用がピークに達するときにトランザクションの効率が低下する可能性があります。
データベースメトリクスを監視して、サーバーがキャプチャーエージェントのアクティビティーをサポートできなくなるレベルにデータベースが達した場合に認識できるようにすることが重要となります。キャプチャーエージェントの実行中にパフォーマンスの問題が発生した場合は、キャプチャーエージェント設定を調整して CPU の負荷を減らします。
2.1.5.3. DB2 キャプチャーエージェントの設定パラメーター
Db2 では、IBMSNAP_CAPPARMS
テーブルにはキャプチャーエージェントの動作を制御するパラメーターが含まれています。これらのパラメーターの値を調整して、キャプチャープロセスの設定を調整すると、CPU の負荷を減らしながら許容レベルのレイテンシーを維持することができます。
Db2 のキャプチャーエージェントパラメーターの設定方法に関する具体的なガイダンスは、このドキュメントの範囲外となります。
IBMSNAP_CAPPARMS
テーブルでは、CPU 負荷の削減に最も影響を与えるパラメーターは以下のとおりです。
COMMIT_INTERVAL
- キャプチャーエージェントがデータを変更データテーブルにコミットするまで待つ期間を秒単位で指定します。
- 値が大きいほど、データベースホストの負荷が減少し、レイテンシーが増加します。
-
デフォルト値は
30
です。
SLEEP_INTERVAL
- キャプチャーエージェントがアクティブなトランザクションログの最後に到達した後に、新しいコミットサイクルの開始まで待つ期間を秒単位で指定します。
- 値が大きいほど、サーバーの負荷が減少し、レイテンシーが増加します。
-
デフォルト値は
5
です。
関連情報
- キャプチャーエージェントパラメーターの詳細は、Db2 のドキュメントを参照してください。
2.1.6. Debezium Db2 コネクターのデプロイ
以下の方法のいずれかを使用して Debezium Db2 コネクターをデプロイできます。
ライセンス要件のため、Debezium Db2 コネクターアーカイブには、Debezium が Db2 データベースに接続するために必要な Db2 JDBC ドライバーは含まれていません。コネクターがデータベースにアクセスできるようにするには、コネクター環境にドライバーを追加する必要があります。ドライバーの入手方法については、Db2JDBC ドライバーの入手を参照してください。
2.1.6.1. Db2 JDBC ドライバーの取得
Debezium が Db2 データベースに接続するために必要な Db2 JDBC ドライバーファイルは、ライセンスの関係で Debezium Db2 コネクターアーカイブに含まれていません。ドライバーは、Maven Central からダウンロード可能です。使用するデプロイメント方法に応じて、Kafka Connect カスタムリソースまたはコネクターイメージの構築に使用する Dockerfile にコマンドを追加して、ドライバーを取得することができます。
-
Streams for Apache Kafka を使用して Kafka Connect イメージにコネクターを追加する場合は、「Streams for Apache Kafka を使用した Debezium Db2 コネクターのデプロイ」 に示されているように、
KafkaConnect
カスタムリソースのbuilds.plugins.artifact.url
にドライバーの Maven Central の場所を追加します。 -
Dockerfile を使用してコネクター用のコンテナーイメージを構築する場合、Dockerfile に
curl
コマンドを挿入して、Maven Central から必要なドライバーファイルをダウンロードするための URL を指定します。詳細は、「Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium Db2 コネクターのデプロイ」 を参照してください。
2.1.6.2. Streams for Apache Kafka を使用した Db2 コネクターのデプロイメント
Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージをビルドすることです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnect
CR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnector
CR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnector
CR を適用してコネクターを起動します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。
Kafka Connect CR の spec.build.output
パラメーターは、生成される KafkaConnect
コンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect
リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- 「OpenShift 上の Streams for Apache Kafka のデプロイと管理」の Kafka Connect の設定
- 「OpenShift 上の Streams for Apache Kafka のデプロイと管理」の 新しいコンテナーイメージの自動ビルド
2.1.6.3. Streams for Apache Kafka を使用した Debezium Db2 コネクターのデプロイ
以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、まずコネクター用の Kafka Connect イメージをビルドする必要がありました。OpenShift にコネクターをデプロイするための現在の推奨方法は、Streams for Apache Kafka のビルド設定を使用して、使用する Debezium コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output
に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector
カスタムリソースを作成します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが OpenShift 上の Streams for Apache Kafka のデプロイと管理 に記載されているとおりにデプロイされている。
- Kafka Connect が Streams for Apache Kafka にデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
oc
CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- 新規コンテナーイメージを保存するために、ImageStream リソースをクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform でのイメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotations
およびspec.build
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。以下の例は、KafkaConnect
カスタムリソースを記述するdbz-connect.yaml
ファイルからの抜粋を示しています。
例2.3 Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義したdbz-connect.yaml
ファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium Db2 コネクターアーカイブ。
- Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
- Debezium スクリプト SMT アーカイブと Debezium コネクターで使用する関連言語の依存関係。SMT アーカイブおよび言語の依存関係は任意のコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
- Db2 JDBC ドライバー。Db2 データベースに接続するために必要ですが、コネクターアーカイブには含まれていません。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.6.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-db2 artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-db2/2.7.3.Final-redhat-00001/debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip 8 - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.7.3.Final-redhat-00001/debezium-scripting-2.7.3.Final-redhat-00001.zip 9 - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar 10 - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar - type: jar url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar - type: jar 11 url: https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.0.0/jcc-11.5.0.0.jar bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093 ...
表2.18 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションを"true"
に設定して、クラスター Operator がKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.output
は、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker
、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
を指定する方法の詳細は、{NameConfiguringStreamsOpenShift} の Streams for Apache Kafka ビルドスキーマリファレンス を参照してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。JDBC ドライバーファイルは.jar
形式です。type
の値は、url
フィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト
type
とurl
を指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
type
とurl
を指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
type
とurl
を指定します。これは、Debezium スクリプト SMT で必要です。重要Streams for Apache Kafka を使用してコネクタープラグインを Kafka Connect イメージに組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.url
に JAR ファイルのロケーションを指定し、artifacts.type
の値もjar
に設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy
-
groovy-jsr223
(スクリプトエージェント) -
groovy-json
(JSON 文字列を解析するためのモジュール)
Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
11
Maven Central にある Db2 JDBC ドライバーの場所を指定します。必要なドライバーが Debezium Db2 コネクターアーカイブに含まれていない。
以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnector
CR を作成し、db2-inventory-connector.yaml
として保存します。例2.4 Debezium コネクターの
KafkaConnector
カスタムリソースを定義するdb2-inventory-connector.yaml
ファイルapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-db2 1 spec: class: io.debezium.connector.db2.Db2ConnectorConnector 2 tasksMax: 1 3 config: 4 schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092 schema.history.internal.kafka.topic: schema-changes.inventory database.hostname: db2.debezium-db2.svc.cluster.local 5 database.port: 50000 6 database.user: debezium 7 database.password: dbz 8 database.dbname: mydatabase 9 topic.prefix: inventory-connector-db2 10 table.include.list: public.inventory 11 ...
表2.19 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この名前空間は、関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
以下に例を示します。
oc create -n debezium -f db2-inventory-connector.yaml
コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium Db2 のデプロイメントを確認 する準備が整いました。
2.1.6.4. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium Db2 コネクターのデプロイ
Debezium Db2 コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnect
CR。image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。 -
Debezium Db2 コネクターを定義する
KafkaConnector
CR。この CR をKafkaConnect
CR を適用したのと同じ OpenShift インスタンスに適用します。
前提条件
- Db2 が実行中で、Db2 をセットアップして Debezium コネクターと連携する 手順を完了している。
- Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、OpenShift 上の Streams for Apache Kafka のデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
- Kafka Connect サーバーは、Db2 用の必要な JDBC ドライバーをダウンロードするために、Maven Central にアクセスすることができます。また、ドライバーのローカルコピー、またはローカルの Maven リポジトリーや他の HTTP サーバーから利用可能なものを使用することもできます。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.io
やdocker.io
など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium Db2 コンテナーを作成します。
registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。cat <<EOF >debezium-container-for-db2.yaml 1 FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 USER root:root RUN mkdir -p /opt/kafka/plugins/debezium 2 RUN cd /opt/kafka/plugins/debezium/ \ && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-db2/2.7.3.Final-redhat-00001/debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip \ && unzip debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip \ && rm debezium-connector-db2-2.7.3.Final-redhat-00001-plugin.zip RUN cd /opt/kafka/plugins/debezium/ \ && curl -O https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.0.0/jcc-11.5.0.0.jar USER 1001 EOF
項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-db2.yaml
という名前の Docker ファイルを作成します。前のステップで作成した
debezium-container-for-db2.yaml
Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-db2:latest .
docker build -t debezium-container-for-db2:latest .
上記のコマンドは、
debezium-container-for-db2
という名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-container-for-db2:latest
docker push <myregistry.io>/debezium-container-for-db2:latest
新しい Debezium Db2
KafkaConnect
カスタムリソース (CR) を作成します。たとえば、annotations
およびimage
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。以下の例は、KafkaConnect
カスタムリソースを記述するdbz-connect.yaml
ファイルからの抜粋を示しています。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: #... image: debezium-container-for-db2 2 ...
項目 説明 1
KafkaConnector
リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations
は Cluster Operator に示します。2
spec.image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数がオーバーライドされます。以下のコマンドを入力して、
KafkaConnect
CR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium Db2 コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yaml
ファイルに Debezium Db2 コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
50000
で Db2 サーバーホスト192.168.99.100
に接続する Debezium コネクターを設定します。このホストには、データベース名がmydatabase
、テーブル名がinventory
、サーバーの論理名がinventory-connector-db2
があります。Db2
inventory-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector-db2 1 labels: strimzi.io/cluster: my-connect-cluster annotations: strimzi.io/use-connector-resources: 'true' spec: class: io.debezium.connector.db2.Db2Connector 2 tasksMax: 1 3 config: 4 database.hostname: 192.168.99.100 5 database.port: 50000 6 database.user: db2inst1 7 database.password: Password! 8 database.dbname: mydatabase 9 topic.prefix: inventory-connector-db2 10 table.include.list: public.inventory 11 ...
表2.20 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録する場合のコネクターの名前。
2
この Db2 コネクタークラスの名前。
3
一度に実行できるタスクは 1 つだけです。
4
コネクターの設定。
5
Db2 インスタンスのアドレスであるデータベースホスト。
6
Db2 インスタンスのポート番号。
7
Db2 ユーザーの名前。
8
Db2 ユーザーのパスワード。
9
変更をキャプチャーするデータベースの名前。
10
名前空間を形成する Db2 インスタンス/クラスターの論理名で、コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマの名前、および Avro コネクター が使用される場合の対応する Avro スキーマの名前空間で使用されます。
11
コネクターは
public.inventory
テーブルからのみ変更をキャプチャーします。Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをinventory-connector.yaml
ファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
上記のコマンドは
inventory-connector
を登録し、コネクターはKafkaConnector
CR に定義されているmydatabase
データベースに対して実行を開始します。
Debezium Db2 コネクターに設定できる設定プロパティーの完全リストは、Db2 コネクタープロパティー を参照してください。
結果
コネクターが起動すると、コネクターが変更をキャプチャーするように設定された Db2 データベーステーブルの 整合性スナップショット が実行されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
2.1.6.5. Debezium Db2 コネクターが実行していることの確認
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが OpenShift 上の Streams for Apache Kafka にデプロイされている。
-
OpenShift
oc
CLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnector
リソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnector
を入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-db2)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc describe KafkaConnector <connector-name> -n <project>
以下に例を示します。
oc describe KafkaConnector inventory-connector-db2 -n debezium
このコマンドは、以下の出力のようなステータス情報を返します。
例2.5
KafkaConnector
リソースのステータスName: inventory-connector-db2 Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-db2 Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory-connector-db2.inventory inventory-connector-db2.inventory.addresses inventory-connector-db2.inventory.customers inventory-connector-db2.inventory.geom inventory-connector-db2.inventory.orders inventory-connector-db2.inventory.products inventory-connector-db2.inventory.products_on_hand Events: <none>
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopic
を入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-db2.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc get kafkatopics
このコマンドは、以下の出力のようなステータス情報を返します。
例2.6
KafkaTopic
リソースのステータスNAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-db2--a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-db2.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-db2.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-db2.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-db2.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-db2.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-db2.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-db2.inventory.products_on_hand
トピック名を指定する形式は、手順 1 で返された
oc describe
コマンドと同じです (例:inventory_connector_db2.inventory.addresses
)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.7 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-db2.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-db2.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-db2.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.db2.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-db2.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.7.3.Final-redhat-00001","connector":"db2","name":"inventory-connector-db2","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"db2-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から読み込み ("op" ="r"
) イベントを生成したことを示しています。product_id
レコードの"before"
状態はnull
であり、レコードに以前の値が存在しないことを示しています。"after"
状態は、product_id
101
を持つ項目のquantity
が3
であることを示しています。
2.1.6.6. Debezium Db2 コネクター設定プロパティーの説明
Debezium Db2 コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
- 必要な設定プロパティー
- 高度な設定プロパティー
- Debezium がデータベース履歴トピックから読み取るイベントを処理する方法を制御する データベース履歴コネクター設定プロパティー
必要な Debezium Db2 コネクター設定プロパティー
以下の設定プロパティーは、デフォルト値がない場合は必須です。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。 | |
デフォルトなし |
コネクターの Java クラスの名前。Db2 コネクターには、常に | |
| このコネクターのために作成する必要のあるタスクの最大数。Db2 コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。 | |
デフォルトなし | Db2 データベースサーバーの IP アドレスまたはホスト名。 | |
| Db2 データベースサーバーの整数のポート番号。 | |
デフォルトなし | Db2 データベースサーバーに接続するための Db2 データベースユーザーの名前。 | |
デフォルトなし | Db2 データベースサーバーへの接続時に使用するパスワード。 | |
デフォルトなし | 変更をストリーミングする Db2 データベースの名前 | |
デフォルトなし |
Debezium が変更をキャプチャーするデータベースをホストする特定の Db2 データベースサーバーの namespace を提供するトピック接頭辞。トピックの接頭辞名には、英数字、ハイフン、ドット、およびアンダースコアのみを使用する必要があります。このトピック接頭辞は、このコネクターからレコードを受け取るすべての Kafka トピックに使用されるため、トピック接頭辞は他のすべてのコネクターで一意である必要があります。 警告 このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。また、コネクターはデータベーススキーマ履歴トピックを復元できません。 | |
デフォルトなし |
コネクターで変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。このプロパティーが設定されている場合、コネクターは指定されたテーブルからのみ変更をキャプチャします。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターはシステム以外のテーブルすべての変更をキャプチャーします。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
デフォルトなし |
コネクターで変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。コネクターは exclude リストに含まれていないシステム以外のテーブルごとに変更をキャプチャーします。各識別子の形式は schemaName.tableName です。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
空の文字列 |
変更イベントレコード値に含める列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。このプロパティーを設定に含める場合は、 | |
空の文字列 |
変更イベント値から除外する列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。プライマリーキー列は、値から除外された場合でも、イベントのキーに常に含まれます。このプロパティーを設定に含める場合は、 | |
該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。
仮名は、指定された hashAlgorithm と salt を適用すると得られるハッシュ化された値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest セクションに説明されています。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。 | |
|
時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。 | |
|
delete イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。 | |
| コネクターがデータベーススキーマの変更を、データベースサーバー ID と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。各スキーマの変更は、データベース名が含まれるキーと、スキーマ更新を記述する JSON 構造である値で記録されます。これは、コネクターがデータベーススキーマ履歴を内部で記録する方法には依存しません。 | |
該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。 列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。
列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName. 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
該当なし | 列のメタデータを表す追加パラメーターをコネクターに発行させたい列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名は、次のいずれかの形式に従います: databaseName.tableName.columnName、または databaseName.schemaName.tableName.columnName. | |
該当なし | データベース内の列に対して定義されているデータ型の完全修飾名を指定する正規表現のオプションのコンマ区切りリスト。このプロパティーが設定されている場合、データ型が一致する列に対して、コネクターはスキーマに次の追加フィールドを含むイベントレコードを発行します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名の形式は、databaseName.tableName.typeName、または databaseName.schemaName.tableName.typeName のいずれかになります。 Db2 固有のデータ型名の一覧は、Db2 データ型マッピング を参照してください。 | |
空の文字列 | 指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
プロパティーは複数のテーブルのエントリーをリストできます。リスト内の異なるテーブルのエントリーは、セミコロンを使用して、区切ります。 | |
none |
コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。設定可能:
| |
none |
コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。設定可能:
詳細は、Avro の命名 を参照してください。 |
高度なコネクター設定プロパティー
以下の 高度な 設定プロパティーには、ほとんどの状況で機能するデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし |
コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名をコンマ区切りリストで列挙します。以下に例を示します。
コネクターがカスタムコンバーターを使用できるようにするには、
コネクターに設定するコンバーターごとに、コンバーターインターフェイスを実装するクラスの完全修飾名を指定する
以下に例を示します。 isbn.type: io.debezium.test.IsbnConverter
設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。追加の設定パラメーターとコンバーターを関連付けるには、パラメーター名の前にコンバーターのシンボリック名を付けます。 isbn.schema.name: io.debezium.db2.type.Isbn | |
Initial |
コネクターの起動時にスナップショットを実行するための基準を指定します。
| |
exclusive | コネクターがテーブルロックを保持するかどうか、また保持する時間をコントロールします。テーブルロックは、スナップショット中に他のデータベースクライアントが特定のテーブル操作を実行できないようにします。以下の値を設定できます。
| |
|
スナップショットを実行するときにコネクターがデータをクエリーする方法を指定します。
この設定により、 | |
|
スナップショットの実行中に、トランザクション分離レベルとキャプチャーモードのテーブルをロックする期間を制御します。使用できる値は次のとおりです。 | |
|
イベントの処理中にコネクターが例外を処理する方法を指定します。使用できる値は次のとおりです。 | |
| コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 500 ミリ秒 (0.5 秒) です。 | |
| コネクターが処理するイベントの各バッチの最大サイズを指定する正の整数値。 | |
|
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。 | |
|
ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。 | |
|
コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。 | |
デフォルトなし | コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。 | |
0 |
コネクターがスナップショットを完了した後、ストリーミングプロセスの開始を遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットが完了した直後で、ストリーミングプロセスの開始前に障害が発生した場合に、コネクターがスナップショットを再開できないようにします。Kafka Connect ワーカーに設定されている | |
|
スナップショットに含めるテーブルの完全修飾名 ( テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
| スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。 | |
|
スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。コネクターがこの間隔でテーブルロックを取得できないと、スナップショットは失敗します。詳細は、コネクターによるスナップショットの実行方法 を参照してください。その他の可能な設定は次のとおりです。 | |
デフォルトなし | スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
プロパティーには、
スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 ( "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"
作成されるスナップショットでは、コネクターには | |
|
コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は | |
|
ストリーミング中にスキップされる操作タイプのコンマ区切りリスト。挿入/作成は | |
デフォルトなし |
シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名 。コレクション名の指定には | |
source | コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
デフォルトなし | コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
| 増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 | |
|
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントを重複排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
| |
|
データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは | |
|
トピック名の区切り文字を指定します。デフォルトは | |
| トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。 | |
|
コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
| 初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。 重要 並列初期スナップショットはテクノロジープレビュー機能のみとなっています。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。 | |
|
コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します。たとえば、 コネクターは、指定されたタグを基本 MBean オブジェクト名に追加します。タグは、メトリクスデータを整理および分類するのに役立ちます。特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを識別するためのタグを定義できます。詳細は、カスタマイズされた MBean 名 を参照してください。 | |
|
接続エラーなど、再試行可能なエラーが発生する操作の後に、コネクターがどのように応答するかを指定します。
| |
|
コネクターがクエリーの完了を待機する時間をミリ秒単位で指定します。タイムアウト制限を削除するには、値を |
Debezium Db2 コネクターデータベーススキーマ履歴設定プロパティー
Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する schema.history.internal.*
プロパティーのセットが含まれています。
以下の表は、Debezium コネクターを設定するための schema.history.internal
プロパティーを説明しています。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし | コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。 | |
デフォルトなし | Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。 | |
| 永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。 | |
| Kafka 管理クライアントを使用してクラスター情報を取得する際に、コネクターが待機すべき最大ミリ秒数を指定する整数値です。 | |
| Kafka 管理クライアントを使用して kafka 履歴トピックを作成する間、コネクターが待機する最大ミリ秒数を指定する整数値。 | |
|
エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、 | |
|
コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは | |
|
コネクターがスキーマまたはデータベース内のすべてのテーブルからスキーマ構造を記録するか、キャプチャー対象に指定されたテーブルのみからスキーマ構造を記録するかを指定するブール値。
| |
|
コネクターがデータベースインスタンス内のすべての論理データベースのスキーマ構造を記録するかどうかを指定するブール値。
|
パススルー Db2 コネクター設定プロパティー
コネクターは pass-through プロパティーをサポートしており、これにより Debezium は Apache Kafka プロデューサーとコンシューマーの動作を微調整するためのカスタム設定オプションを指定できます。Kafka プロデューサーとコンシューマーの全設定プロパティーの詳細は、Kafka ドキュメント を参照してください。
プロデューサーとコンシューマーのクライアントがスキーマ履歴トピックと対話する方法を設定するための Pass-through プロパティー
Debezium は、データベーススキーマ履歴トピックへのスキーマ変更を記述するために Apache Kafka プロデューサーに依存しています。同様に、コネクターが起動すると、データベーススキーマ履歴トピックから読み取る Kafka コンシューマーに依存します。schema.history.internal.producer.*
および schema.history.internal.consumer.*
接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベーススキーマ履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。
schema.history.internal.producer.security.protocol=SSL schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks schema.history.internal.producer.ssl.keystore.password=test1234 schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks schema.history.internal.producer.ssl.truststore.password=test1234 schema.history.internal.producer.ssl.key.password=test1234 schema.history.internal.consumer.security.protocol=SSL schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks schema.history.internal.consumer.ssl.keystore.password=test1234 schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks schema.history.internal.consumer.ssl.truststore.password=test1234 schema.history.internal.consumer.ssl.key.password=test1234
Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。
Kafka プロデューサー設定プロパティー と Kafka コンシューマー設定プロパティー の詳細は、Apache Kafka ドキュメントを参照してください。
Db2 コネクターが Kafka シグナリングトピックと対話する方法を設定するための Pass-through プロパティー
Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.*
プロパティーのセットを提供します。
以下の表は、Kafka signal
プロパティーを説明しています。
プロパティー | デフォルト | 説明 |
---|---|---|
<topic.prefix>-signal | コネクターがアドホックシグナルについて監視する Kafka トピックの名前。 注記 トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナルトピックが必要です。シグナリングトピックには単一のパーティションが必要です。 | |
kafka-signal | Kafka コンシューマーによって使用されるグループ ID の名前。 | |
デフォルトなし | コネクターが Kafka クラスターへの初期接続を確立するために使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。 | |
| コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。 | |
| Kafka コンシューマーがシグナリングトピックからメッセージを読み取った後にオフセットコミットを書き込むかどうかを指定します。このプロパティーに割り当てる値によって、コネクターがオフラインのときに、シグナリングトピックが受信する要求をコネクターが処理できるかどうかが決まります。次のいずれかの設定を選択します。
|
シグナリングチャネルの Kafka コンシューマークライアントを設定するためのパススループロパティー
Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.*
で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL
などのプロパティーを Kafka コンシューマーに渡します。
Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。
Db2 コネクター sink notification チャネルを設定するためのパススループロパティー
次の表では、Debezium sink notification
チャネルの設定に使用できるプロパティーについて説明します。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし |
Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして |
Debezium コネクターのパススルーデータベースドライバー設定プロパティー
Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは接頭辞 driver.*
で始まります。たとえば、コネクターは driver.foobar=false
などのプロパティーを JDBC URL に渡します。
Debezium は、プロパティーをデータベースドライバーに渡す前に、プロパティーから接頭辞を削除します。
2.1.7. Debezium Db2 コネクターのパフォーマンスの監視
Debezium Db2 コネクターは、Apache ZooKeeper、Apache Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。
- スナップショットメトリクス は、スナップショットの実行中にコネクター操作に関する情報を提供します。
- メトリクスのストリーミング は、コネクターが変更をキャプチャーし、変更イベントレコードをストリーミングする際のコネクター操作に関する情報を提供します。
- スキーマ履歴メトリクス は、コネクターのスキーマ履歴の状態に関する情報を提供します。
Debezium モニタリングのドキュメント では、JMX を使用してこれらのメトリクスを公開する方法の詳細を説明しています。
2.1.7.1. Db2 コネクタースナップショットおよびストリーミング MBean オブジェクトのカスタマイズされた名前
Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。これらのメトリクスは各コネクターインスタンスに固有であり、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。
デフォルトでは、正しく設定されたコネクターをデプロイすると、Debezium はさまざまなコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクター設定に依存しており、設定の変更によって MBean 名が変更される場合があります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが切断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開するには、新しい MBean 名を使用するように監視スタックを再設定する必要があります。
MBean 名の変更が原因で監視が中断されないように、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、コネクター設定に custom.metric.tags
プロパティーを追加します。このプロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値がそのタグの値を表すキーと値のペアを受け入れます。たとえば、k1=v1,k2=v2
です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。
コネクターの custom.metric.tags
プロパティーを設定した後、指定されたタグに関連付けられたメトリクスを取得するように監視スタックを設定できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix
が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。
カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc
)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。
次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。
例2.8 カスタムタグがコネクター MBean 名を変更する方法
デフォルトでは、Db2 コネクターはストリーミングメトリクスに次の MBean 名を使用します。
debezium.db2:type=connector-metrics,context=streaming,server=<topic.prefix>
custom.metric.tags
の値を database=salesdb-streaming,table=inventory
に設定すると、Debezium は次のカスタム MBean 名を生成します。
debezium.db2:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
2.1.7.2. Db2 データベースのスナップショット作成時の Debezium の監視
MBean は debezium.db2:type=connector-metrics,context=snapshot,server= <topic.prefix>
です。
スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。
次の表に、使用可能なスナップショットメトリクスを示します。
属性 | タイプ | 説明 |
---|---|---|
| コネクターが読み取りした最後のスナップショットイベント。 | |
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
| 前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。 | |
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
| コネクターによって取得されるテーブルのリスト。 | |
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
| スナップショットに含まれているテーブルの合計数。 | |
| スナップショットによってまだコピーされていないテーブルの数。 | |
| スナップショットが起動されたかどうか。 | |
| スナップショットが一時停止されたかどうか。 | |
| スナップショットが中断されたかどうか。 | |
| スナップショットが完了したかどうか。 | |
| スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。 | |
| スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。 | |
| スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。 | |
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
| キュー内のレコードの現在の容量 (バイト単位)。 |
コネクターは、増分スナップショットの実行時に、以下の追加のスナップショットメトリクスも提供します。
属性 | タイプ | 説明 |
---|---|---|
| 現在のスナップショットチャンクの識別子。 | |
| 現在のチャンクを定義するプライマリーキーセットの下限。 | |
| 現在のチャンクを定義するプライマリーキーセットの上限。 | |
| 現在スナップショットされているテーブルのプライマリーキーセットの下限。 | |
| 現在スナップショットされているテーブルのプライマリーキーセットの上限。 |
2.1.7.3. Debezium Db2 コネクターレコードストリーミングの監視
MBean は debezium.db2:type=connector-metrics,context=streaming,server= <topic.prefix>
です。
以下の表は、利用可能なストリーミングメトリクスのリストです。
属性 | タイプ | 説明 |
---|---|---|
| コネクターが読み取られた最後のストリーミングイベント。 | |
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、ソースデータベースによって報告されたデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された作成イベントの合計数。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された更新イベントの合計数。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された削除イベントの合計数。 | |
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
| コネクターによって取得されるテーブルのリスト。 | |
| ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
| ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
| コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。 | |
| 最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値には、データベースサーバーとコネクターが実行されているマシンのクロックの差が組み込まれます。 | |
| コミットされた処理済みトランザクションの数。 | |
| 最後に受信したイベントの位置。 | |
| 最後に処理されたトランザクションのトランザクション識別子。 | |
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
| キュー内のレコードの現在の容量 (バイト単位)。 |
2.1.7.4. Debezium Db2 コネクターのスキーマ履歴の監視
MBean は debezium.db2:type=connector-metrics,context=schema-history,server= <topic.prefix>
です。
以下の表は、利用可能なスキーマ履歴メトリクスのリストです。
属性 | タイプ | 説明 |
---|---|---|
|
データベーススキーマ履歴の状態を示す | |
| リカバリーが開始された時点のエポック秒の時間。 | |
| リカバリーフェーズ中に読み取られた変更の数。 | |
| リカバリーおよびランタイム中に適用されるスキーマ変更の合計数。 | |
| 最後の変更が履歴ストアから復元された時点からの経過時間 (ミリ秒単位)。 | |
| 最後の変更が適用された時点からの経過時間 (ミリ秒単位)。 | |
| 履歴ストアから復元された最後の変更の文字列表現。 | |
| 最後に適用された変更の文字列表現。 |
2.1.8. Debezium Db2 コネクターの管理
Debezium Db2 コネクターをデプロイしたら、Debezium 管理 UDF を使用して、SQL コマンドで Db2 レプリケーション (ASN) を制御します。UDF によっては戻り値が必要な場合があります。この場合、SQL の VALUE
ステートメントを使用して呼び出します。その他の UDF には、SQL の CALL
ステートメントを使用します。
タスク | コマンドおよび注記 |
---|---|
| |
| |
| |
| |
| |
|
2.1.9. Debezium コネクターでのキャプチャーモードの Db2 テーブルのスキーマの更新
Debezium Db2 コネクターはスキーマ変更をキャプチャーできますが、スキーマを更新するには、データベース管理者と協力してコネクターが変更イベントの生成を継続するようにする必要があります。これは、Db2 がレプリケーションを実装する方法に必要です。
Db2 のレプリケーション機能は、キャプチャーモードのテーブルごとに、すべての変更が含まれる変更データテーブルをそのソーステーブルに作成します。ただし、変更データテーブルスキーマは静的です。キャプチャーモードのテーブルのスキーマを更新する場合は、対応する変更データテーブルのスキーマを更新する必要もあります。Debezium Db2 コネクターはこれを実行できません。昇格された権限を持つデータベース管理者は、キャプチャーモードのテーブルのスキーマを更新する必要があります。
同じテーブルの新しいスキーマ更新の前に、スキーマ更新の手順を完全に実行することが重要です。そのため、スキーマ更新の手順を 1 度で完了するために、すべての DDL を 1 つのバッチで実行することが推奨されます。
通常、テーブルスキーマを更新する手順は 2 つあります。
それぞれの方法に長所と短所があります。
2.1.9.1. Debezium Db2 コネクターでのオフラインスキーマ更新の実行
オフラインでスキーマの更新を行う前に、Debezium Db2 コネクターを停止します。これはより安全なスキーマ更新の手順ですが、高可用性の要件のあるアプリケーションには実現できない可能性があります。
前提条件
- スキーマの更新が必要なキャプチャーモードのテーブル 1 つ以上。
手順
- データベースを更新するアプリケーションを一時停止します。
- Debezium コネクターがストリーミングされていない変更イベントレコードをすべてストリーミングするまで待ちます。
- Debezium コネクターを停止します。
- すべての変更をソーステーブルスキーマに適用します。
-
ASN レジスターテーブルで、スキーマが更新されたテーブルを
INACTIVE
でマーク付けします。 - ASN キャプチャーサービスを再初期化します。
- キャプチャーモードからテーブルを削除するために Debezium UDF を実行 して、キャプチャーモードから古いスキーマを含まれるソーステーブルを削除します。
- テーブルをキャプチャーモードに追加するために Debezium UDF を実行 して、スキーマが新しいソーステーブルをキャプチャーモードに追加します。
-
ASN レジスターテーブルで、更新されたソーステーブルを
ACTIVE
としてマーク付けします。 - ASN キャプチャーサービスを再初期化します。
- データベースを更新するアプリケーションを再開します。
- Debezium コネクターを再起動します。
2.1.9.2. Debezium Db2 コネクターでのオンラインスキーマ更新の実行
オンラインスキーマの更新ではアプリケーションやデータ処理のダウンタイムは必要ありません。そのため、オンラインスキーマの更新を実行する前に Debezium Db2 コネクターを停止しません。また、オンラインスキーマの更新手順は、オフラインスキーマの更新手順よりも簡単です。
ただし、テーブルがキャプチャーモードの場合は、列名の変更後も Db2 レプリケーション機能は引き続き古い列名を使用します。新しい列名は、Debezium の変更イベントでは表示されません。変更イベントにある新しい列名を確認するには、コネクターを再起動する必要があります。
前提条件
- スキーマの更新が必要なキャプチャーモードのテーブル 1 つ以上。
テーブルの最後に列を追加する場合の手順
- 変更するスキーマのソーステーブルをロックします。
-
ASN レジスターテーブルで、ロックされたテーブルを
INACTIVE
としてマーク付けします。 - ASN キャプチャーサービスを再初期化します。
- ソーステーブルのスキーマにすべての変更を適用します。
- 対応する変更データテーブルのスキーマにすべての変更を適用します。
-
ASN レジスターテーブルで、ソーステーブルを
ACTIVE
としてマーク付けします。 - ASN キャプチャーサービスを再初期化します。
- 任意手順:コネクターを再起動して、変更イベントにある更新された列名を確認します。
テーブルの中に列を追加する場合の手順
- 変更するソーステーブルをロックします。
-
ASN レジスターテーブルで、ロックされたテーブルを
INACTIVE
としてマーク付けします。 - ASN キャプチャーサービスを再初期化します。
変更するソーステーブルごとに以下を行います。
- ソーステーブルのデータをエクスポートします。
- ソーステーブルを切り捨てます。
- ソーステーブルを変更して列を追加します。
- エクスポートしたデータを変更したソーステーブルに読み込みます。
- ソーステーブルの対応する変更データテーブルのデータをエクスポートします。
- 変更データテーブルを切り捨てます。
- 変更データテーブルを変更して、列を追加します。
- エクスポートしたデータを変更した変更データテーブルに読み込みます。
-
ASN レジスターテーブルで、テーブルを
INACTIVE
としてマーク付けします。これにより、古い変更データテーブルが非アクティブとしてマーク付けされるため、それらのテーブルにあるデータは保持されますが、更新されなくなります。 - ASN キャプチャーサービスを再初期化します。
- 任意手順:コネクターを再起動して、変更イベントにある更新された列名を確認します。