2.7. SQL Server の Debezium コネクター
Debezium の SQL Server コネクターは、SQL Server データベースのスキーマで発生する行レベルの変更をキャプチャーします。
このコネクターと互換性のある SQL Server のバージョンについては、Debezium でサポートされる設定ページを参照してください。
Debezium SQL Server コネクターとその使用に関する詳細は、以下を参照してください。
Debezium SQL Server コネクターが SQL Server データベースまたはクラスターに初めて接続すると、データベースのスキーマの整合性スナップショットが作成されます。コネクターは、最初のスナップショットが完了すると、CDC に対して有効になっている SQL Server データベースにコミットされた INSERT、UPDATE または DELETE 操作の行レベルの変更を継続的にキャプチャーします。コネクターは、各データ変更操作のイベントを生成し、それらのイベントを Kafka トピックにストリーミングします。コネクターは、テーブルのすべてのイベントを専用の Kafka トピックにストリーミングします。その後、アプリケーションとサービスは、そのトピックからのデータ変更イベントレコードを使用できます。
2.7.1. Debezium SQL Server コネクターの概要 リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターは、SQL Server 2016 Service Pack 1 (SP1) およびそれ以降 の Standard エディションまたは Enterprise エディションで利用可能な 変更データキャプチャー 機能に基づいています。SQL Server のキャプチャープロセスでは、指定のデータベースおよびテーブルを監視し、その変更をストアドプロシージャーファサードのある特別に作成された 変更テーブル に格納します。
Debezium SQL Server コネクターがデータベース操作の変更イベントレコードをキャプチャーできるようにするには、最初に SQL Server データベースで変更データキャプチャーを有効にする必要があります。データベースと、キャプチャーする各テーブルの両方で、CDC を有効にする必要があります。ソースデータベースで CDC を設定した後、コネクターはデータベースで発生する行レベルの INSERT、UPDATE および DELETE 操作をキャプチャーできます。コネクターは、各ソーステーブルの各レコードを、そのテーブル専用の Kafka トピックに書き込みます。キャプチャーされたテーブルごとに 1 つのトピックが存在します。クライアントアプリケーションは、対象のデータベーステーブルの Kafka トピックを読み取り、これらのトピックから使用する行レベルのイベントに対応できます。
コネクターが SQL Server データベースまたはクラスターに初めて接続すると、変更をキャプチャーするように設定されたすべてのテーブルのスキーマの整合性スナップショットを作成し、この状態を Kafka にストリーミングします。スナップショットの完了後、コネクターは発生する後続の行レベルの変更を継続的にキャプチャーします。最初にすべてのデータの整合性のあるビューを確立することで、コネクターはスナップショットの実行中に行われた変更を失うことなく読み取りを続行します。
Debezium SQL Server コネクターはフォールトトラレントです。コネクターは変更を読み取り、イベントを生成するため、データベースログにイベントの位置を定期的に記録します (LSN / Log Sequence Number)。コネクターが何らかの理由で停止した場合 (通信障害、ネットワークの問題、クラッシュなど)、コネクターは再起動後に最後に読み取りした場所から SQL Server CDC テーブルの読み取りを再開します。
オフセットは定期的にコミットされます。変更イベントの発生時にはコミットされません。その結果、停止後に重複するイベントが生成される可能性があります。
フォールトトレランスはスナップショットにも適用されます。つまり、スナップショット中にコネクターが停止した場合、コネクターは再起動時に新しいスナップショットを開始します。
2.7.2. Debezium SQL Server コネクターの仕組み リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびメタデータの使用方法を理解すると便利です。
コネクターの仕組みに関する詳細は、以下のセクションを参照してください。
- 「Debezium SQL Server コネクターによるデータベーススナップショットの実行方法」
- 「アドホックスナップショット」
- 「増分スナップショット」
- 「Debezium SQL Server コネクターによる変更データテーブルの読み取り方法」
- 「Debezium SQL Server 変更イベントレコードを受信する Kafka トピックのデフォルト名」
- 「Debezium SQL Server コネクターによるスキーマ変更トピックの使用方法」
- 「Debezium SQL Server コネクターのデータ変更イベントの説明」
- 「トランザクション境界を表す Debezium SQL Server コネクターによって生成されたイベント」
2.7.2.1. Debezium SQL Server コネクターによるデータベーススナップショットの実行方法 リンクのコピーリンクがクリップボードにコピーされました!
SQL Server CDC は、データベースの変更履歴を完全に保存するようには設計されていません。Debezium SQL Server コネクターでデータベースの現在の状態のベースラインを確立するためには、snapshotting と呼ばれるプロセスを使用します。最初のスナップショットは、データベース内のテーブルの構造とデータをキャプチャーします。
スナップショットの詳細は、以下のセクションを参照してください。
2.7.2.1.1. Debezium SQL Server コネクターが最初のスナップショットを実行するために使用するデフォルトのワークフロー リンクのコピーリンクがクリップボードにコピーされました!
以下のワークフローでは、Debezium がスナップショットを作成する手順を示しています。この手順では、snapshot.mode 設定プロパティーがデフォルト値 (initial) に設定されている場合のスナップショットのプロセスを説明します。snapshot.mode プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。
- データベースへの接続を確立します。
-
キャプチャーするテーブルを決定します。デフォルトでは、コネクターはすべてのシステム以外のテーブルをキャプチャーします。コネクターにテーブルまたはテーブル要素のサブセットをキャプチャーさせるには、
table.include.listやtable.exclude.listなど、データをフィルタリングするための多数のincludeおよびexcludeプロパティーを設定できます。 -
スナップショットの作成時に構造が変更されないように、CDC が有効になっている SQL Server テーブルのロックを取得します。ロックのレベルは、
snapshot.isolation.mode設定プロパティーによって決まります。 - サーバーのトランザクションログでの最大ログシーケンス番号 (LSN) の位置を読み取ります。
すべての非システム、またはキャプチャー対象として指定されたすべてのテーブルの構造をキャプチャーします。コネクターは、内部データベーススキーマ履歴トピックでこの情報を永続化します。スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。
注記デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、キャプチャーモードにあるデータベース内の全テーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。
- 必要に応じて、手順 3 で取得したロックを解放します。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。
手順 4 で読み取った LSN の位置で、コネクターはキャプチャーするテーブルをスキャンします。スキャン中に、コネクターは次のタスクを実行します。
- スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
-
テーブルからキャプチャーされた行ごとに
readイベントを生成します。すべてのreadイベントには、LSN の位置が含まれ、これは手順 4 で取得した LSN の位置と同じです。 -
テーブルの Kafka トピックに各
readイベントを出力します。
- コネクターオフセットにスナップショットの正常な完了を記録します。
作成された最初のスナップショットは、CDC に対して有効になっているテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。
スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。
コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、手順 4 で読み取った位置からストリーミングを続行します。
何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。
| 設定 | 説明 |
|---|---|
|
| 各コネクターの開始時にスナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
|
| コネクターは 初期スナップショットを作成するためのデフォルトのワークフロー で説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
|
| コネクターはデータベースのスナップショットを実行し、変更イベントレコードをストリーミングする前に停止して、それ以降の変更イベントのキャプチャを許可しません。 |
|
|
非推奨です。 |
|
|
コネクターは、デフォルトのスナップショットワークフロー で説明されているすべての手順を実行して、関連するすべてのテーブルの構造をキャプチャーします。ただし、コネクターの起動時点のデータセットを表す |
|
|
損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。 + 警告: 最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合、このモードを使用してスナップショットを実行しないでください。 |
|
| コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。
|
詳細は、コネクター設定プロパティーの表の snapshot.mode を参照してください。
2.7.2.1.2. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由 リンクのコピーリンクがクリップボードにコピーされました!
コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。
- テーブルデータ
-
コネクターの
table.include.listプロパティーにあるテーブルのINSERT、UPDATE、およびDELETE操作に関する情報。 - スキーマデータ
- テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。
初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。
場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。
関連情報
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)
-
schema.history.internal.store.only.captured.tables.ddlプロパティーを設定して、スキーマ情報をキャプチャーするテーブルを指定します。 -
schema.history.internal.store.only.captured.databases.ddlプロパティーを設定して、スキーマ変更をキャプチャーする論理データベースを指定します。
2.7.2.1.3. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし) リンクのコピーリンクがクリップボードにコピーされました!
コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。
テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- コネクターが読み取った最も初期の変更テーブルエントリーと、最新の変更テーブルエントリーの LSN の間で、スキーマの変更がテーブルに加えられていない。構造が変更された新しいテーブルからデータを取得する方法は、「初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)」 を参照してください。
手順
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティーで指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
以下の変更をコネクター設定に適用します。
(オプション)
schema.history.internal.store.only.captured.tables.ddlの値をfalseに設定します。この設定により、スナップショットですべてのテーブルのスキーマがキャプチャーされ、今後、コネクターがすべてのテーブルのスキーマ履歴を再構築できるようにします。
注記すべてのテーブルのスキーマをキャプチャーするスナップショットは、完了までにさらに時間がかかります。
-
コネクターがキャプチャーするテーブルを
table.include.listに追加します。 snapshot.modeを次のいずれかの値に設定します。Initial-
コネクターを再起動すると、テーブルデータとテーブル構造をキャプチャーするデータベースの完全なスナップショットが作成されます。
このオプションを選択する場合は、コネクターがすべてのテーブルのスキーマをキャプチャーできるように、schema.history.internal.store.only.captured.tables.ddlプロパティーの値をfalseに設定することを検討してください。 schema_only- コネクターを再起動すると、テーブルスキーマのみをキャプチャーするスナップショットが作成されます。完全なデータスナップショットとは異なり、このオプションではテーブルデータはキャプチャーされません。完全なスナップショットが作成される前に、早くコネクターを再起動する場合は、このオプションを使用します。
-
コネクターを再起動します。コネクターは、
snapshot.modeで指定されたタイプのスナップショットを完了します。 (オプション) コネクターが
schema_onlyスナップショットを実行した場合、スナップショットの完了後に 増分スナップショット を開始して、追加したテーブルからデータをキャプチャーします。コネクターは、テーブルからリアルタイムの変更をストリーミングし続けながら、スナップショットを実行します。増分スナップショットを実行すると、次のデータ変更がキャプチャーされます。- コネクターが以前にキャプチャーしたテーブルの場合、増分スナップショットは、コネクターが停止している間、つまりコネクターが停止してから現在の再起動までの間に発生した変更をキャプチャーします。
- 新しく追加されたテーブルの場合、増分スナップショットは既存のテーブル行をすべてキャプチャーします。
2.7.2.1.4. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更) リンクのコピーリンクがクリップボードにコピーされました!
スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。
最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。
手順
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (
store.only.captured.tables.ddlはfalseに設定されました)。 -
table.include.listプロパティーを編集して、キャプチャーするテーブルを指定します。 - コネクターを再起動します。
- 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
-
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (
store.only.captured.tables.ddlがtrueに設定されています)。 最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。
- 手順 1: スキーマスナップショット、その後に増分スナップショット
この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティーで指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.modeプロパティーの値をschema_onlyに設定します。 -
table.include.listを編集して、キャプチャーするテーブルを追加します。
-
- コネクターを再起動します。
- Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
- データが損失されないようにするには、増分スナップショット を開始します。
- 手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット
この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティーで指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
-
table.include.listを編集して、キャプチャーするテーブルを追加します。 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.modeプロパティーの値をinitialに設定します。 -
(オプション)
schema.history.internal.store.only.captured.tables.ddlをfalseに設定します。
-
- コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
- (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。
2.7.2.2. アドホックスナップショット リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。
execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。execute-snapshot シグナルのタイプを incremental または blocking に設定し、スナップショットに含めるテーブルの名前を次の表に示すように指定します。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプを指定します。 |
|
| 該当なし |
スナップショットに含めるテーブルの完全修飾名に一致する正規表現を含む配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
|
|
| 該当なし | スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。 |
アドホック増分スナップショットのトリガー
アドホック増分スナップショットを開始するには、execute-snapshot シグナルタイプのエントリーをシグナリングテーブルに追加するか、シグナルメッセージを Kafka シグナリングトピックに送信します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
詳細は、スナップショットの増分 を参照してください。
アドホックブロッキングスナップショットのトリガー
シグナリングテーブルまたはシグナリングトピックに、execute-snapshot シグナルタイプを持つエントリーを追加することによって、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。
詳細は、ブロッキングスナップショット を参照してください。
2.7.2.3. 増分スナップショット リンクのコピーリンクがクリップボードにコピーされました!
各 SQL Server サーバーまたはデータベースは、特定の 照合順序 を使用するように設定されています。これにより、文字データの保存、並べ替え、比較、および表示方法が決まります。一部の照合順序セット (SQL Server 照合順序 (SQL_*) など) の並べ替えルールは、Unicode 並べ替えアルゴリズムと互換性がありません。場合によっては、互換性のない並べ替えルールが原因で、コネクターによるアドホックスナップショットの実行時にデータが失われる可能性があります。たとえば、SQL Server が文字列を Unicode として送信するように設定されている場合 (つまり、接続プロパティー sendStringParametersAsUnicode が true に設定されている場合)、コネクターはスナップショット中にレコードをスキップする可能性があります。アドホックスナップショット中にデータが失われないようにするには、driver.sendStringParametersAsUnicode 接続文字列プロパティーの値を false に設定します。
sendStringParametersAsUnicode プロパティーの使用方法の詳細は、SQL Server 接続プロパティーのドキュメント を参照してください。
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.listプロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT、UPDATE、DELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
現在、増分スナップショットを開始するには、次のいずれかの方法を使用できます。
SQL Server の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。
2.7.2.3.1. 増分スナップショットのトリガー リンクのコピーリンクがクリップボードにコピーされました!
増分スナップショットを開始するには、ソースデータベースのシグナリングテーブルに アドホックスナップショットシグナル を送信します。スナップショットシグナルは SQL INSERT クエリーとして送信します。
Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。Debezium は現在、incremental と blocking のスナップショットタイプをサポートしています。
スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections 配列が空の場合、Debezium は空の配列をアクションが必要ないと解釈し、スナップショットは作成しません。
スナップショットに含めるテーブルの名前にドット (.)、スペース、またはその他の英数字以外の文字が含まれている場合は、テーブル名を二重引用符でエスケープする必要があります。
たとえば、db1 データベースの public スキーマに存在し、My.Table という名前のテーブルを含めるには、"db1.public.\"My.Table\"" の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットをトリガーする
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コマンドの
id、type、およびdataパラメーターの値は、シグナルテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.164 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 項目 値 説明 1
database.schema.debezium_signalソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID文字列をウォーターマークシグナルとして生成します。3
execute-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
配列には、database.schema.table形式を使用してテーブルの完全修飾名と一致する正規表現がリストされます。この形式は、コネクターの シグナリングテーブル の名前を指定するために使用する形式と同じです。5
incremental実行するスナップショット操作のタイプを指定する、シグナルの
dataフィールドのオプションのtypeコンポーネント。
有効な値はincrementalとblockingです。
値を指定しない場合、コネクターはデフォルトで増分スナップショットを実行します。6
additional-conditionsコネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、data-collectionプロパティーとfilterプロパティーを持つオブジェクトです。データの収集単位で異なるフィルターを指定できます。
*data-collectionプロパティーは、フィルターが適用されるデータコレクションの完全修飾名です。additional-conditionsパラメーターの詳細は、「additional-conditions付きでアドホック増分スナップショットを実行する」 を参照してください。
2.7.2.3.2. additional-conditions 付きでアドホック増分スナップショットを実行する リンクのコピーリンクがクリップボードにコピーされました!
スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルに additional-conditions パラメーターを追加してシグナル要求を変更できます。
一般的なスナップショットの SQL クエリーは、以下の形式を取ります。
SELECT * FROM <tableName> ....
SELECT * FROM <tableName> ....
additional-conditions パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。
SELECT * FROM <data-collection> WHERE <filter> ....
SELECT * FROM <data-collection> WHERE <filter> ....
以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
たとえば、以下の列が含まれる products テーブルがあるとします。
-
id(プライマリーキー) -
color -
quantity
products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
additional-conditions パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例2.48 増分スナップショットイベントメッセージ
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
実行するスナップショット操作タイプを指定します。 |
| 2 |
|
イベントタイプを指定します。 |
2.7.2.3.3. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。 |
例2.49 execute-snapshot Kafka メッセージ
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
additional-conditions 付きのアドホック増分スナップショット
Debezium は additional-conditions フィールドを使用してテーブルのコンテンツのサブセットを選択します。
通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。
SELECT * FROM <tableName> ….
スナップショット要求に additional-conditions プロパティーが含まれている場合、プロパティーの data-collection および filter パラメーターが SQL クエリーに追加されます。次に例を示します。
SELECT * FROM <data-collection> WHERE <filter> ….
たとえば、列 id (プライマリーキー)、color、および brand を含む products テーブルがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions プロパティーを追加することができます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
また、additional-conditions プロパティーを使用して、複数の列に基づいて条件を渡すこともできます。たとえば、前の例と同じ products テーブルを使用して、color='blue' および brand='MyBrand' である products テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.7.2.3.4. 増分スナップショットの停止 リンクのコピーリンクがクリップボードにコピーされました!
状況によっては、増分スナップショットを停止する必要がある場合があります。たとえば、スナップショットが正しく設定されていない場合や、他のデータベース操作にリソースが使用可能であるこのとの確認が必要な場合があります。ソースデータベースのシグナリングテーブルにシグナルを送信することで、すでに実行中のスナップショットを停止できます。
スナップショット停止信号をシグナリングテーブルに送信するには、SQL INSERT クエリーで送信します。stop-snapshot シグナルは、スナップショット操作の type を incremental として指定し、オプションで、現在実行中のスナップショットから省略するテーブルを指定します。Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
関連情報
また、JSON メッセージを Kafka シグナリングトピック に送信して、増分スナップショットを停止することもできます。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットを停止する
SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
INSERT INTO db1.myschema.debezium_signal (id, type, data) values ('ad-hoc-1', 'stop-snapshot', '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type":"incremental"}');INSERT INTO db1.myschema.debezium_signal (id, type, data)1 values ('ad-hoc-1',2 'stop-snapshot',3 '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"],4 "type":"incremental"}');5 Copy to Clipboard Copied! Toggle word wrap Toggle overflow signal コマンドの
id、type、およびdataパラメーターの値は、シグナリングテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.167 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明 項目 値 説明 1
database.schema.debezium_signalソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。3
stop-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
配列には、database.schema.tableの形式で完全修飾名でテーブルに一致する正規表現がリストされます。dataフィールドからこのコンポーネントを省略すると、シグナルによって進行中の増分スナップショット全体が停止されます。5
incremental停止するスナップショット操作のタイプを指定する信号の
dataフィールドの必須コンポーネント。
現在、有効な唯一のオプションはincrementalです。typeの値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
2.7.2.3.5. Kafka シグナリングチャネルを使用して増分スナップショットを停止する リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka シグナリングトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
テーブルの完全修飾名に一致する、コンマで区切られた正規表現のオプションの配列、スナップショットから削除するテーブル名に一致するテーブル名または正規表現の配列。 |
次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`
2.7.2.4. ブロッキングスナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。ブロッキングスナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。
次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。
- 新しいテーブルを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
- 大きなテーブルを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。
ブロッキングスナップショットのプロセス
ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。
スナップショットの設定
シグナルの data コンポーネントでは、次のプロパティーを設定できます。
- data-collections: スナップショットする必要のあるテーブルを指定します。
-
data-collections: スナップショットに含めるテーブルを指定します。
このプロパティーは、完全修飾テーブル名に一致する正規表現のコンマ区切りリストを受け入れます。プロパティーの動作は、ブロッキングスナップショットでキャプチャーするテーブルを指定するtable.include.listプロパティーの動作と似ています。 additional-conditions: テーブルごとに異なるフィルターを指定できます。
-
data-collectionプロパティーは、フィルターが適用されるテーブルの完全修飾名であり、データベースに応じて大文字と小文字を区別するか、区別しないかを指定できます。 -
filterプロパティーは、snapshot.select.statement.overridesで使用される値と同じものが設定されます。これは、大文字小文字を区別して一致させる必要があるテーブルの完全修飾名です。
-
以下に例を示します。
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
重複の可能性
スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。
2.7.2.5. Debezium SQL Server コネクターによる変更データテーブルの読み取り方法 リンクのコピーリンクがクリップボードにコピーされました!
コネクターが最初に起動すると、キャプチャーされたテーブルの構造のスナップショットを作成し、その情報を内部データベーススキーマ履歴トピックに対して永続化します。その後、コネクターは各ソーステーブルの変更テーブルを特定し、以下の手順を完了します。
- コネクターは、変更テーブルごとに、最後に保存された最大 LSN と現在の最大 LSN の間に作成された変更をすべて読み取ります。
- コネクターは、コミット LSN と変更 LSN の値を基にして、読み取る変更を昇順で並び替えします。この並べ替えの順序により、変更はデータベースで発生した順序で Debezium によって再生されるようになります。
- コネクターは、コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
- コネクターは最大 LSN を保存し、ステップ 1 からプロセスを再開します。
再開後、コネクターは読み取った最後のオフセット (コミットおよび変更 LSN) から処理を再開します。
コネクターは、含まれるソーステーブルに対して CDC が有効または無効化されているかどうかを検出し、その動作を調整することができます。
2.7.2.6. データベースでの最大 LSN の記録なし リンクのコピーリンクがクリップボードにコピーされました!
次の理由により、最大 LSN がデータベースに記録されない場合があります。
- SQL Server エージェントが実行されていない
- 変更テーブルにまだ変更が記録されていない
- データベースのアクティビティーが少なく、cdc クリーンアップジョブで cdc テーブルから定期的にエントリーが消去される
これらの可能性のうち、実行中の SQL Server エージェントが前提条件であるため、実際には No 1. は問題です (No 2. と 3. は正常です)。
この問題を軽減し、No 1. と他の問題を区別するために、"SELECT CASE WHEN dss.[status]=4 THEN 1 ELSE 0 END AS isRunning FROM [#db].sys.dm_server_services dss WHERE dss.[servicename] LIKE N’SQL Server Agent (%';". のクエリーを使用して SQL Server エージェントのステータスをチェックします。SQL Server Agent が実行されていない場合に、ログに "No maximum LSN recorded in the database; SQL Server Agent is not running" というエラーが書き込まれます。
ステータスクエリーを実行する SQL Server には、VIEW SERVER STATE のサーバーパーミッションが必要です。設定したユーザーにこのパーミッションを付与する必要がない場合は、database.sqlserver.agent.status.query プロパティーで独自のクエリーを設定できます。SQL Server Agent が実行中 (false または 0) で、What minimum permissions do I need to provide to a user so that it can check the status of SQL Server Agent Service? または Safely and Easily Use High-Level Permissions Without Granting Them to Anyone: Server-level で説明されているように、高度なパーミッションを付与せずに安全に使用している場合に、True または 1 を返す関数を定義できます。クエリープロパティーの設定は、database.sqlserver.agent.status.query=SELECT [#db].func_is_sql_server_agent_running() のようになります。[#db] は、データベース名のプレースホルダーとして使用する必要があります。
2.7.2.7. Debezium SQL Server コネクターの制限事項 リンクのコピーリンクがクリップボードにコピーされました!
SQL Server では、変更キャプチャのインスタンスを作成するために、ベースオブジェクトがテーブルであることが特に必要です。そのため、インデックス付きビュー (別名: マテリアライズドビュー) からの変更の取り込みは、SQL Server ではサポートされておらず、したがって Debezium SQL Server コネクターもサポートされていません。
2.7.2.8. Debezium SQL Server 変更イベントレコードを受信する Kafka トピックのデフォルト名 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、SQL Server コネクターは、テーブルで発生するすべての INSERT、UPDATE、DELETE 操作のイベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは、<topicPrefix>.<schemaName>.<tableName> の規則を使用して変更イベントトピックに名前を付けます。
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- topicPrefix
-
topic.prefix設定プロパティーで指定したサーバーの論理名です。 - schemaName
- 変更イベントが発生したデータベーススキーマの名前。
- tableName
- 変更イベントが発生したデータベーステーブルの名前。
たとえば、fulfillment が論理サーバー名、dbo がスキーマ名で、データベースに products、products_on_hand、customers、orders という名前のテーブルがある場合、コネクターは変更イベントレコードを次の Kafka トピックにストリーミングします。
-
fulfillment.testDB.dbo.products -
fulfillment.testDB.dbo.products_on_hand -
fulfillment.testDB.dbo.customers -
fulfillment.testDB.dbo.orders
コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピック と トランザクションメタデータトピック) にラベルを付けます。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。
2.7.2.9. Debezium SQL Server コネクターがデータベーススキーマの変更を処理する方法 リンクのコピーリンクがクリップボードにコピーされました!
データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。
スキーマ変更後に発生する変更イベントを正しく処理するために、Debezium SQL Server コネクターは、関連するデータテーブルの構造をミラーリングする SQL Server 変更テーブルの構造に基づいて、新しいスキーマのスナップショットを保存します。コネクターは、データベーススキーマ履歴 Kafka トピックに、スキーマ変更の結果 (複数操作の LSN) と合わせてテーブルのスキーマ情報を保存します。コネクターは、保管されたスキーマ表現を使用して、挿入、更新、または削除の各操作時にテーブルの構造を正しくミラーリングする変更イベントを生成します。
クラッシュまたは正常に停止した後にコネクターが再起動すると、最後に読み取った位置から SQL Server CDC テーブル内のエントリーの読み取りを再開します。コネクターがデータベーススキーマ履歴トピックから読み取るスキーマ情報を基に、コネクターが再起動する場所に存在したテーブル構造を適用します。
キャプチャーモードの Db2 テーブルのスキーマを更新する場合は、対応する変更テーブルのスキーマも更新することが重要です。データベーススキーマを更新するには、昇格権限のある SQL Server データベース管理者である必要があります。Debezium 環境での SQL Server データベーススキーマの更新の詳細は、データベーススキーマの進化 を参照してください。
データベーススキーマ履歴トピックは、内部コネクター専用となっています。オプションで、コネクターは コンシューマーアプリケーション向けの別のトピックにスキーマ変更イベントを送信する こともできます。
関連情報
- Debezium イベントレコードを受信する トピックのデフォルト名。
2.7.2.10. Debezium SQL Server コネクターによるスキーマ変更トピックの使用方法 リンクのコピーリンクがクリップボードにコピーされました!
CDC が有効になっているテーブルごとに、Debezium SQL Server コネクターは、データベース内のテーブルに適用されたスキーマ変更イベントの履歴を保存します。コネクターはスキーマ変更イベントを <topicPrefix> という名前の Kafka トピックに書き込みます。ここで、topicPrefix は topic.prefix 設定プロパティーで指定された論理サーバー名です。
コネクターがスキーマ変更トピックに送信するメッセージには、ペイロードと、任意で変更イベントメッセージのスキーマが含まれます。
スキーマ変更イベントのスキーマには、次の要素があります。
name- スキーマ変更イベントメッセージの名前。
type- 変更イベントメッセージのタイプ。
version- スキーマのバージョン。バージョンは整数で、スキーマが変更されるたびに増加します。
fields- 変更イベントメッセージに含まれるフィールド。
例: SQL Server コネクタースキーマ変更トピックのスキーマ
次の例は、JSON 形式の一般的なスキーマを示しています。
スキーマ変更イベントメッセージのペイロードには、以下の要素が含まれます。
databaseName-
ステートメントが適用されるデータベースの名前。
databaseNameの値は、メッセージキーとして機能します。 tableChanges-
スキーマの変更後のテーブルスキーマ全体の構造化表現。
tableChangesフィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
コネクターがテーブルをキャプチャーするように設定されている場合、テーブルのスキーマ変更の履歴は、スキーマ変更トピックだけでなく、内部データベーススキーマの履歴トピックにも格納されます。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。
コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。
Debezium は、以下のイベントの発生時にスキーマ変更トピックにメッセージを出力します。
- テーブルの CDC を有効にします。
- テーブルの CDC を無効にします。
- スキーマの進化手順 に従って、CDC が有効になっているテーブルの構造を変更します。
例: SQL Server コネクターのスキーマ変更トピックに送信されるメッセージ
以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
| コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。 |
| 2 |
| 変更が含まれるデータベースとスキーマを識別します。 |
| 3 |
|
SQL Server コネクターの場合は常に |
| 4 |
| DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。 |
| 5 |
| 変更の種類を説明します。値は以下のいずれかになります。
|
| 6 |
| 作成、変更、または破棄されたテーブルの完全な識別子。 |
| 7 |
| 適用された変更後のテーブルメタデータを表します。 |
| 8 |
| テーブルのプライマリーキーを設定する列のリスト。 |
| 9 |
| 変更されたテーブルの各列のメタデータ。 |
| 10 |
| 各テーブル変更のカスタム属性メタデータ。 |
コネクターがスキーマ変更トピックに送信するメッセージでは、キーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload フィールドにキーが含まれます。
2.7.2.11. Debezium SQL Server コネクターのデータ変更イベントの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターは、行レベルの INSERT、UPDATE、および DELETE 操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。
以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
最初の |
| 2 |
|
最初の |
| 3 |
|
2 つ目の |
| 4 |
|
2 つ目の |
デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。詳細は、トピック名 を参照してください。
SQL Server コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または \_) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。
論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。
変更イベントの詳細は、以下を参照してください。
2.7.2.11.1. Debezium SQL Server 変更イベントのキー リンクのコピーリンクがクリップボードにコピーされました!
変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルのプライマリーキー (または一意なキー制約) に存在した各列のフィールドが含まれます。
以下の customers テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。
テーブルの例
変更イベントキーの例
customers テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers テーブルに前述の定義がある限り、customers テーブルへの変更をキャプチャーする変更イベントのキー構造は、JSON では以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
キーのスキーマ部分は、キーの |
| 2 |
|
各フィールドの名前、型、および必要かどうかなど、 |
| 3 |
|
イベントキーの |
| 4 |
|
キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-schema-name.table-name.
|
| 5 |
|
この変更イベントが生成された行のキーが含まれます。この例では、キーには値が |
2.7.2.11.2. Debezium SQL Server 変更イベントの値 リンクのコピーリンクがクリップボードにコピーされました!
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。
このテーブルへの変更に対する変更イベントの値部分には、以下の各イベント型について記述されています。
create イベント
以下の例は、customers テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。 |
| 2 |
|
|
| 3 |
|
|
| 4 |
|
|
| 5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
| 6 |
|
イベント発生前の行の状態を指定する任意のフィールド。この例のように、 |
| 7 |
|
イベント発生後の行の状態を指定する任意のフィールド。この例では、 |
| 8 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。
|
| 9 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
| 10 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
更新 イベント
サンプル customers テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers テーブルでの更新に生成されるイベントの変更イベント値の例になります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の |
| 2 |
|
イベント発生後の行の状態を指定する任意のフィールド。 |
| 3 |
|
イベントのソースメタデータを記述する必須のフィールド。
|
| 4 |
|
操作の型を記述する必須の文字列。更新 イベントの値では、 |
| 5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つ のイベントが Debezium によって出力されます。3 つのイベントとは、delete イベント、行に古いキーが含まれる tombstone イベント、および行に新しいキーが含まれる create イベントを指します。
delete イベント
削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema の部分になります。サンプル customers テーブルの 削除 イベントの payload 部分は以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の |
| 2 |
|
イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の |
| 3 |
|
イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の
|
| 4 |
|
操作の型を記述する必須の文字列。 |
| 5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
SQL Server コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
tombstone イベント
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の SQL Server コネクターは delete イベントを出力した後に、null 値以外の同じキーを持つ、特別な廃棄 (tombstone) イベントを出力します。
2.7.2.12. トランザクション境界を表す Debezium SQL Server コネクターによって生成されたイベント リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、トランザクション境界を表し、データ変更イベントメッセージを強化するイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
データベーストランザクションは、キーワード BEGIN および END で囲まれたステートメントブロックによって表されます。Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。
status-
BEGINまたはEND id- 一意のトランザクション識別子の文字列表現。
ts_ms-
データソースでのトランザクション境界イベント (
BEGINまたはENDイベント) の時間。データソースから Debezium にイベント時間を渡されない場合、フィールドは代わりに Debezium がイベントを処理する時間を表します。 event_count(ENDイベント用)- トランザクションによって出力されるイベントの合計数。
data_collections(ENDイベント用)-
data_collectionとevent_count要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
Debezium には、トランザクションがいつ終了したかを確実に識別する方法がありません。このように、トランザクション END マーカーは、別のトランザクションの最初のイベントが到着した後にのみ発行されます。これにより、トラフィックの少ないシステムの場合、END マーカーの配信が遅れる可能性があります。
以下の例は、典型的なトランザクション境界メッセージを示しています。
例: SQL Server コネクタートランザクション境界イベント
topic.transaction オプションで上書きされない限り、トランザクションイベントは <topic.prefix>.transaction という名前のトピックに書き込まれます。
2.7.2.12.1. 変更データイベントのエンリッチメント リンクのコピーリンクがクリップボードにコピーされました!
トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドで強化されます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id- 一意のトランザクション識別子の文字列表現。
total_order- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下の例は、典型的なメッセージの例を示しています。
2.7.2.13. Debezium SQL Server コネクターによるデータ型のマッピング方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターは、行が存在するテーブルのように構造化されたイベントを生成して、テーブル行データへの変更を表します。各イベントには、行のコラム値を表すフィールドが含まれます。イベントが操作のコラム値を表す方法は、列の SQL データ型によって異なります。このイベントで、コネクターは各 SQL Server データ型のフィールドを リテラル型 と セマンティック型 の両方にマップします。
コネクターは SQL Server のデータ型を リテラル 型および セマンティック 型の両方にマップできます。
- リテラル型
-
Kafka Connect のスキーマタイプ (
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP、STRUCT) を使用して、値が文字通りどのように表現されるかを記述します。 - セマンティック型
- フィールドの Kafka Connect スキーマの名前を使用して、Kafka Connect スキーマがフィールドの 意味 をキャプチャーする方法を記述します。
デフォルトのデータ型変換が要件に合わない場合は、コネクター用の カスタムコンバーターの作成 が可能です。
データ型マッピングの詳細は、以下を参照してください。
基本型
以下の表は、コネクターによる基本的な SQL Server データ型のマッピング方法を示しています。
| SQL Server のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
| 該当なし |
|
|
|
|
|
|
|
|
その他のデータ型マッピングは、以下のセクションで説明します。
列のデフォルト値がある場合は、対応するフィールドの Kafka Connect スキーマに伝達されます。変更メッセージには、フィールドのデフォルト値が含まれます (明示的な列値が指定されていない場合)。そのため、スキーマからデフォルト値を取得する必要はほとんどありません。
時間値
タイムゾーン情報が含まれる SQL Server の DATETIMEOFFSET 以外の時間型は、time.precision.mode 設定プロパティーの値によって異なります。time.precision.mode 設定プロパティーが adaptive (デフォルト) に設定された場合、コネクターは列のデータ型を基に時間型のリテラルおよびセマンティック型を決定し、イベントが 正確 にデータベースの値を表すようにします。
| SQL Server のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
time.precision.mode 設定プロパティーが connect に設定された場合、コネクターは事前定義された Kafka Connect の論理型を使用します。これは、コンシューマーが組み込みの Kafka Connect の論理型のみを認識し、可変精度の時間値を処理できない場合に便利です。一方で、SQL Server はマイクロ秒の 10 分の 1 の精度をサポートするため、connect 時間精度モードでコネクターによって生成されたイベントは、データ列の 少数秒の精度 値が 3 よりも大きい場合に 精度が失われます。
| SQL Server のデータ型 | リテラル型 (スキーマ型) | セマンティック型 (スキーマ名) および注記 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
タイムスタンプ値
DATETIME、SMALLDATETIME および DATETIME2 タイプは、タイムゾーン情報のないタイムスタンプを表します。このような列は、UTC を基にして同等の Kafka Connect 値に変換されます。たとえば、"2018-06-20 15:13:16.945104" という DATETIME2 の値は、"1529507596945104" という値の io.debezium.time.MicroTimestamp で表されます。
Kafka Connect および Debezium を実行している JVM のタイムゾーンは、この変換には影響しないことに注意してください。
10 進数値
Debezium コネクターは、decimal.handling.mode コネクター設定プロパティー の設定にしたがって 10 進数を処理します。
- decimal.handling.mode=precise
Expand 表2.176 decimal.handling.mode=precise の場合のマッピング SQL Server タイプ リテラル型 (スキーマ型) セマンティック型 (スキーマ名) NUMERIC[(P[,S])]BYTESorg.apache.kafka.connect.data.Decimal
scaleスキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。DECIMAL[(P[,S])]BYTESorg.apache.kafka.connect.data.Decimal
scaleスキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。SMALLMONEYBYTESorg.apache.kafka.connect.data.Decimal
scaleスキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。MONEYBYTESorg.apache.kafka.connect.data.Decimal
scaleスキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。- decimal.handling.mode=double
Expand 表2.177 decimal.handling.mode=double の場合のマッピング SQL Server タイプ リテラル型 セマンティック型 NUMERIC[(M[,D])]FLOAT64該当なし
DECIMAL[(M[,D])]FLOAT64該当なし
SMALLMONEY[(M[,D])]FLOAT64該当なし
MONEY[(M[,D])]FLOAT64該当なし
- decimal.handling.mode=string
Expand 表2.178 decimal.handling.mode=string の場合のマッピング SQL Server タイプ リテラル型 セマンティック型 NUMERIC[(M[,D])]STRING該当なし
DECIMAL[(M[,D])]STRING該当なし
SMALLMONEY[(M[,D])]STRING該当なし
MONEY[(M[,D])]STRING該当なし
2.7.3. Debezium コネクターを実行するための SQL Server のセットアップ リンクのコピーリンクがクリップボードにコピーされました!
Debezium が SQL Server テーブルから変更イベントをキャプチャーするには、必要な権限を持つ SQL Server の管理者が最初にクエリーを実行してデータベースで CDC を有効にします。その後、管理者は Debezium がキャプチャーする各テーブルに対して、CDC を有効にする必要があります。
デフォルトでは、Microsoft SQL Server への JDBC 接続は SSL 暗号化によって保護されています。SQL Server データベースで SSL が有効になっていない場合、または SSL を使用せずにデータベースに接続する場合は、コネクター設定で database.encrypt プロパティーの値を false に設定して SSL を無効にできます。
Debezium コネクターと使用するための SQL Server の設定に関する詳細は、以下を参照してください。
CDC の適用後、CDC が有効になっているテーブルにコミットされる INSERT、UPDATE、および DELETE 操作がすべてキャプチャーされます。その後、Debezium コネクターはこれらのイベントをキャプチャーして Kafka トピックに出力できます。
2.7.3.1. SQL Server データベースでの CDC の有効化 リンクのコピーリンクがクリップボードにコピーされました!
テーブルの CDC を有効にする前に、SQL Server データベースに対して CDC を有効にする必要があります。SQL Server 管理者は、システムストアドプロシージャーを実行して CDC を有効にします。システムストアドプロシージャーは、SQL Server Management Studio または Transact-SQL を使用すると実行できます。
前提条件
- SQL Server の sysadmin 固定サーバーロールのメンバーである。
- データベースの db_owner である。
- SQL Server Agent が稼働している。
SQL Server の CDC 機能は、ユーザーが作成したテーブルでのみ発生する変更を処理します。SQL Server master データベースで CDC を有効にすることはできません。
手順
- SQL Server Management Studio の View メニューから Template Explorer をクリックします。
- Template Browser で、SQL Server Templates をデプロイメントします。
- Change Data Capture > Configuration をデプロイメントした後、Enable Database for CDC をクリックします。
-
テンプレートで、
USEステートメントのデータベース名を、CDC に対して有効にするデータベースの名前に置き換えます。 ストアドプロシージャー
sys.sp_cdc_enable_dbを実行して、CDC 用のデータベースを有効にします。データベースが CDC に対して有効になったら、
cdcという名前のスキーマ、CDC ユーザー、メタデータテーブル、およびその他のシステムオブジェクトが作成されます。以下の例は、データベース
MyDBに対して CDC を有効にする方法を示しています。例: CDC テンプレートに対する SQL Server データベースの有効化
USE MyDB GO EXEC sys.sp_cdc_enable_db GO
USE MyDB GO EXEC sys.sp_cdc_enable_db GOCopy to Clipboard Copied! Toggle word wrap Toggle overflow
2.7.3.2. SQL Server テーブルでの CDC の有効化 リンクのコピーリンクがクリップボードにコピーされました!
SQL Server 管理者は、Debezium がキャプチャーするソーステーブルで変更データキャプチャーを有効にする必要があります。データベースが CDC に対してすでに有効になっている必要があります。テーブルで CDC を有効にするには、SQL Server 管理者はストアドプロシージャー sys.sp_cdc_enable_table をテーブルに対して実行します。ストアドプロシージャーは、SQL Server Management Studio または Transact-SQL を使用すると実行できます。キャプチャーするすべてのテーブルに対して SQL Server の CDC を有効にする必要があります。
前提条件
- CDC が SQL Server データベースで有効になっている。
- SQL Server Agent が稼働している。
-
データベースの
db_owner固定データベースロールのメンバーである。
手順
- SQL Server Management Studio の View メニューから Template Explorer をクリックします。
- Template Browser で、SQL Server Templates をデプロイメントします。
- Change Data Capture > Configuration をデプロイメントした後、Enable Table Specifying Filegroup Option をクリックします。
-
テンプレートで、
USEステートメントのテーブル名を、キャプチャーするテーブルの名前に置き換えます。 ストアドプロシージャー
sys.sp_cdc_enable_tableを実行します。以下の例は、テーブル
MyTableに対して CDC を有効にする方法を示しています。例: SQL Server テーブルに対する CDC の有効化
Copy to Clipboard Copied! Toggle word wrap Toggle overflow <.> キャプチャーするテーブルの名前を指定します。<.> ソーステーブルのキャプチャーされた列で
SELECT権限を付与するユーザーを追加できるMyRoleロールを指定します。sysadminまたはdb_ownerロールのユーザーも、指定された変更テーブルにアクセスできます。@role_nameの値が明示的にNULLに設定されている場合、キャプチャーされた情報へのアクセスを制限するロールは使用されません。<.> キャプチャーされたテーブルの変更テーブルを SQL Server が配置するfilegroupを指定します。指定されたfilegroupは、すでに存在している必要があります。ソーステーブルに使用するのと同じfilegroupに変更テーブルを置かないことが推奨されます。
2.7.3.3. ユーザーが CDC テーブルにアクセスできることの確認 リンクのコピーリンクがクリップボードにコピーされました!
SQL Server 管理者は、システムストアドプロシージャを実行してデータベースまたはテーブルをクエリーし、その CDC 設定情報を取得できます。ストアドプロシージャーは、SQL Server Management Studio または Transact-SQL を使用すると実行できます。
前提条件
-
キャプチャーインスタンスのキャプチャーされたすべての列に対して
SELECT権限を持っている。db_ownerデータベースロールのメンバーは、定義されたすべてのキャプチャーインスタンスの情報を確認できます。 - クエリーに含まれるテーブル情報に定義したゲーティングロールへのメンバーシップがある。
手順
- SQL Server Management Studio の View メニューから Object Explorer をクリックします。
- Object Explorer から Databases をデプロイメントし、MyDB などのデータベースオブジェクトをデプロイメントします。
- Programmability > Stored Procedures > System Stored Procedures をデプロイメントします。
sys.sp_cdc_help_change_data_captureストアドプロシージャを実行して、テーブルを問い合わせます。クエリーは空の結果を返しません。
次の例では、データベース
MyDB上でストアドプリファレンスsys.sp_cdc_help_change_data_captureを実行します。例: CDC 設定情報のテーブルのクエリー
USE MyDB; GO EXEC sys.sp_cdc_help_change_data_capture GO
USE MyDB; GO EXEC sys.sp_cdc_help_change_data_capture GOCopy to Clipboard Copied! Toggle word wrap Toggle overflow クエリーは、CDC に対して有効になっているデータベースの各テーブルの設定情報を返し、呼び出し元のアクセスが許可される変更データが含まれます。結果が空の場合は、ユーザーにキャプチャーインスタンスと CDC テーブルの両方にアクセスできる権限があることを確認します。
2.7.3.4. Azure 上の SQL Server リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターは Azure の SQL Server と併用できます。CDC for SQL Server on Azure の設定と Debezium での使用は、こちらの例 を参考にしてください。
2.7.3.5. SQL Server キャプチャージョブエージェント設定のサーバー負荷およびレイテンシーへの影響 リンクのコピーリンクがクリップボードにコピーされました!
データベース管理者がソーステーブルに対して変更データキャプチャーを有効にすると、キャプチャージョブエージェントの実行が開始されます。エージェントは新しい変更イベントレコードをトランザクションログから読み取り、イベントレコードを変更データテーブルに複製します。変更がソーステーブルにコミットされてから、対応する変更テーブルに変更が反映される間、常に短いレイテンシーが間隔で発生します。この遅延間隔は、ソーステーブルで変更が発生したときから、Debezium がその変更を Apache Kafka にストリーミングできるようになるまでの差を表します。
データの変更に素早く対応する必要があるアプリケーションについては、ソースと変更テーブル間で密接に同期を維持するのが理想的です。キャプチャーエージェントを実行してできるだけ迅速に変更イベントを継続的に処理すると、スループットが増加し、レイテンシーが減少するため、イベントの発生後にほぼリアルタイムで新しいイベントレコードが変更テーブルに入力されることを想像するかもしれません。しかし、これは必ずしもそうであるとは限りません。同期を即時に行うとパフォーマンスに影響します。キャプチャージョブエージェントが新しいイベントレコードについてデータベースにクエリーを実行するたびに、データベースホストの CPU 負荷が増加します。サーバーへの負荷が増えると、データベース全体のパフォーマンスに悪影響を及ぼす可能性があり、特にデータベースの使用がピークに達するときにトランザクションの効率が低下する可能性があります。
データベースメトリクスを監視して、サーバーがキャプチャーエージェントのアクティビティーをサポートできなくなるレベルにデータベースが達した場合に認識できるようにすることが重要となります。パフォーマンスの問題を認識した場合、データベースホストの全体的な CPU 負荷を許容できるレイテンシーで調整するために、SQL Server のキャプチャーエージェント設定を変更できます。
2.7.3.6. SQL Server のキャプチャージョブエージェントの設定パラメーター リンクのコピーリンクがクリップボードにコピーされました!
SQL Server では、キャプチャージョブエージェントの動作を制御するパラメーターは SQL Server テーブル msdb.dbo.cdc_jobs に定義されます。キャプチャージョブエージェントの実行中にパフォーマンスの問題が発生した場合は、sys.sp_cdc_change_job ストアドプロシージャーを実行し、新しい値を指定することで、キャプチャージョブ設定を調整し、CPU の負荷を軽減します。
SQL Server のキャプチャージョブエージェントパラメーターの設定方法に関する具体的なガイダンスは、このドキュメントの範囲外となります。
以下のパラメーターは、Debezium SQL Server コネクターと使用するキャプチャーエージェントの動作を変更する場合に最も重要になります。
pollinginterval- キャプチャーエージェントがログスキャンのサイクルで待機する秒数を指定します。
- 値が大きいほど、データベースホストの負荷が減少し、レイテンシーが増加します。
-
0を値として指定すると、スキャン間の待ち時間はありません。 -
デフォルト値は
5です。
maxtrans-
各ログスキャンサイクル中に処理するトランザクションの最大数を指定します。キャプチャージョブが指定の数のトランザクションを処理したら、次のスキャンを開始する前に
pollingintervalによって指定された期間、一時停止します。 - 値が小さいほど、データベースホストの負荷が減少し、レイテンシーが増加します。
-
デフォルト値は
500です。
-
各ログスキャンサイクル中に処理するトランザクションの最大数を指定します。キャプチャージョブが指定の数のトランザクションを処理したら、次のスキャンを開始する前に
maxscans-
キャプチャージョブが、データベーストランザクションログの完全な内容のキャプチャーを試みるスキャンサイクルの数の制限を指定します。
continuousパラメーターが1に設定されると、ジョブはスキャンを再開する前にpollingintervalで指定された期間一時停止します。 - 値が小さいほど、データベースホストの負荷が減少し、レイテンシーが増加します。
-
デフォルト値は
10です。
-
キャプチャージョブが、データベーストランザクションログの完全な内容のキャプチャーを試みるスキャンサイクルの数の制限を指定します。
関連情報
- キャプチャーエージェントパラメーターの詳細は、SQL Server のドキュメントを参照してください。
2.7.4. Debezium SQL Server コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以下の方法のいずれかを使用して Debezium SQL Server コネクターをデプロイできます。
Streams for Apache Kafka を使用して、コネクタープラグインを含むイメージを自動的に作成します。
以下は推奨される方法です。
-
Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドします。
この Containerfile デプロイメント方法は非推奨となりました。この方法の手順は、ドキュメントの今後のバージョンで削除される予定です。
2.7.4.1. Streams for Apache Kafka を使用した SQL Server コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターのデプロイで推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnectCR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnectorCR を適用してコネクターを起動します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。
Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnect コンテナーイメージを格納する場所を指定します。コンテナーイメージは、quay.io などのコンテナーレジストリー、または OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の Kafka Connect の設定
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の 新しいコンテナーイメージの自動ビルド
2.7.4.2. Streams for Apache Kafka を使用した Debezium SQL Server コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、まずコネクター用の Kafka Connect イメージをビルドする必要がありました。OpenShift にコネクターをデプロイするための現在の推奨方法は、Streams for Apache Kafka のビルド設定を使用して、使用する Debezium コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector カスタムリソースを作成します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが Streams for Apache Kafka on OpenShift のデプロイと管理 に記載されているとおりにデプロイされている。
- Kafka Connect が Streams for Apache Kafka にデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
ocCLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- ImageStream リソースを新規コンテナーイメージを保存するためにクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform ドキュメント イメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotationsおよびspec.buildプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
例2.50 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義したdbz-connect.yamlファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium SQL Server コネクターアーカイブ。
- Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
- Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.179 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションを"true"に設定して、クラスター Operator がKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.outputは、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputを指定する方法の詳細は、Streams for Apache Kafka API リファレンスの スキーマ参照のビルド を参照してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。typeの値は、urlフィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト
typeとurlを指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
typeとurlを指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
typeとurlを指定します。これは、Debezium スクリプト SMT で必要です。重要Streams for Apache Kafka を使用してコネクタープラグインを Kafka Connect イメージに組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.urlに JAR ファイルのロケーションを指定し、artifacts.typeの値もjarに設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy -
groovy-jsr223(スクリプトエージェント) -
groovy-json(JSON 文字列を解析するためのモジュール)
別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、sqlserver-inventory-connector.yamlとして保存します。例2.51 Debezium コネクターの
KafkaConnectorカスタムリソースを定義するsqlserver-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.180 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この namespace は、関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの namespace でも使用されます。10
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f sqlserver-inventory-connector.yaml
oc create -n debezium -f sqlserver-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium SQL ディプロイメントを検証する 準備が整いました。
2.7.4.3. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium SQL Server コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnectCR。imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。 -
Debezium SQL Server コネクターを定義する
KafkaConnectorCR。この CR をKafkaConnectCR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- SQL Server が稼働し、Debezium コネクターと連携するように SQL Server を設定する手順 が完了済みである必要があります。
- Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、Streams for Apache Kafka on OpenShift のデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.ioやdocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium SQL Server コンテナーを作成します。
registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0をベースイメージとして使用する Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-sqlserver.yamlという名前の Dockerfile を作成します。前のステップで作成した
debezium-container-for-sqlserver.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-sqlserver:latest .
podman build -t debezium-container-for-sqlserver:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-sqlserver:latest .
docker build -t debezium-container-for-sqlserver:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
debezium-container-for-sqlserverという名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-container-for-sqlserver:latest
podman push <myregistry.io>/debezium-container-for-sqlserver:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-sqlserver:latest
docker push <myregistry.io>/debezium-container-for-sqlserver:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium SQL Server KafkaConnect カスタムリソース (CR) を作成します。たとえば、
annotationsおよびimageプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
KafkaConnectorリソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotationsは Cluster Operator に示します。2
spec.imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE変数がオーバーライドされます。以下のコマンドを入力して、
KafkaConnectCR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium SQL Server コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yamlファイルに Debezium SQL Server コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
1433で SQL サーバーホスト192.168.99.100に接続する Debezium コネクターを設定します。このホストには、testDBという名前のデータベース、customersという名前のテーブル、inventory-connector-sqlserverという論理名のサーバーがあります。SQL Server
inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.181 コネクター設定の説明 項目 説明 1
Kafka Connect サービスに登録する場合のコネクターの名前。
2
この SQL Server コネクタークラスの名前。
3
SQL Server インスタンスのアドレス。
4
SQL Server インスタンスのポート番号。
5
SQL Server ユーザーの名前。
6
SQL Server ユーザーのパスワード。
7
namespace を形成する SQL Server インスタンス/クラスターのトピック接頭辞で、namespace を形成し、コネクターが書き込む Kafka トピックのすべての名前、Kafka Connect スキーマ名、および Avro コンバーター が使用される場合に対応する Avro スキーマの namespace に使用されます。
8
コネクターは
dbo.customersテーブルからの変更のみをキャプチャーします。9
DDL ステートメントをデータベーススキーマ履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。
10
コネクターが DDL ステートメントを書き、復元するデータベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
11
サーバーの署名者証明書を格納する SSL トラストストアへのパス。データベースの暗号化が無効になっている場合 (
database.encrypt=false) を除き、このプロパティーは必須です。12
SSL トラストストアのパスワード。データベースの暗号化が無効になっている場合 (
database.encrypt=false) を除き、このプロパティーは必須です。Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをinventory-connector.yamlファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
inventory-connectorを登録し、コネクターはKafkaConnectorCR に定義されているtestDBデータベースに対して実行を開始します。
Debezium SQL Server コネクターが実行していることの確認
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが Streams for Apache Kafka on OpenShift にデプロイされている。
-
OpenShift
ocCLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnectorリソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnectorを入力します。 - KafkaConnectors リストから確認するコネクターの名前をクリックします (例: inventory-connector-sqlserver)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc describe KafkaConnector inventory-connector-sqlserver -n debezium
oc describe KafkaConnector inventory-connector-sqlserver -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.52
KafkaConnectorリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopicを入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-sqlserver.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.53
KafkaTopicリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-sqlserver.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-sqlserver.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow トピック名を指定する形式は、手順 1 で返された
oc describeコマンドと同じです (例:inventory-connector-sqlserver.inventory.addresses)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.54 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"sqlserver","name":"inventory-connector-sqlserver","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"sqlserver-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"sqlserver","name":"inventory-connector-sqlserver","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"sqlserver-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の例では、
payload値は、コネクタースナップショットがテーブルinventory.products_on_handから読み込み ("op" ="r") イベントを生成したことを示しています。product_idレコードの"before"状態はnullであり、レコードに以前の値が存在しないことを示しています。"after"状態は、product_id101を持つ項目のquantityが3であることを示しています。
Debezium SQL Server コネクターに設定できる設定プロパティーの完全リストは SQL Server コネクタープロパティー を参照してください。
結果
コネクターが起動すると、コネクターが設定された SQL Server データベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
2.7.4.4. Debezium SQL Server コネクター設定プロパティーの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。
プロパティーに関する情報は、以下のように設定されています。
- 必要なコネクター設定プロパティー
- 高度なコネクター設定プロパティー
- Debezium がデータベース履歴トピックから読み取るイベントを処理する方法を制御する データベース履歴コネクター設定プロパティー。
Debezium SQL Server コネクター設定プロパティー (必須)
以下の設定プロパティーは、デフォルト値がない場合は 必須 です。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。(このプロパティーはすべての Kafka Connect コネクターに必要です) | |
| デフォルトなし |
コネクターの Java クラスの名前。SQL Server コネクターには、常に | |
|
| コネクターがデータベースインスタンスからデータをキャプチャーするために使用できるタスクの最大数を指定します。 | |
| デフォルトなし | SQL Server データベースサーバーの IP アドレスまたはホスト名。 | |
|
|
SQL Server データベースサーバーのポート番号 (整数)。 | |
| デフォルトなし | SQL Server データベースサーバーへの接続時に使用するユーザー名。Kerberos 認証を使用する場合は省略可能で、パススループロパティー を使用して設定することができます。 | |
| デフォルトなし | SQL Server データベースサーバーへの接続時に使用するパスワード。 | |
| デフォルトなし |
SQL Server の名前付きインスタンス のインスタンス名を指定します。 | |
| デフォルトなし |
Debezium にキャプチャーさせる SQL Server データベースサーバーの名前空間を提供するトピック接頭辞。接頭辞は、他のコネクター全体で一意となる必要があります。これは、このコネクターからレコードを受信するすべての Kafka トピックの接頭辞として使用されるためです。データベースサーバーの論理名には英数字とハイフン、ドット、アンダースコアのみを使用する必要があります。 警告 このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。また、コネクターはデータベーススキーマ履歴トピックを復元できません。 | |
| デフォルトなし |
変更をキャプチャーする 対象とする スキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの名前文字列全体と照合されます。 | |
| デフォルトなし |
変更をキャプチャーする 対象としない スキーマの名前と一致する正規表現のコンマ区切りリスト (任意)。システムスキーマ以外で、
スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの名前文字列全体と照合されます。 | |
| デフォルトなし |
Debezium にキャプチャーさせるテーブルの完全修飾テーブル識別子に一致する、コンマ区切りの正規表現のリスト (任意)。デフォルトでは、コネクターは指定のスキーマのシステム以外のテーブルをすべてキャプチャーします。このプロパティーが設定されている場合、コネクターは指定されたテーブルからのみ変更をキャプチャします。各識別子の形式は schemaName.tableName です。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
| デフォルトなし |
キャプチャーから除外するテーブルの完全修飾テーブル識別子に一致するコンマ区切りの正規表現のリスト (任意)。Debezium は
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
| 空の文字列 |
変更イベントメッセージの値に含まれる必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。 注記 Debezium がテーブルに対して発行する各変更イベントレコードには、テーブルのプライマリーキーまたは一意のキーの各列のフィールドを含むイベントキーが含まれます。このプロパティーを設定する場合は、キャプチャーされたテーブルのプライマリーキー列を明示的にリストして、イベントキーが正しく生成されるようにします。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。 | |
| 空の文字列 |
変更イベントメッセージの値から除外される必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。プライマリーキー列は、値から除外される場合でもイベントのキーに常に含まれることに注意してください。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。 | |
|
|
含まれる列に変更がない場合にメッセージの公開をスキップするかどうかを指定します。これは基本的に、含まれる列に変更がない場合、 | |
|
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は `<schemaName>.<tableName>.<columnName>` です。
仮名は、指定された hashAlgorithm と salt を適用すると得られるハッシュ化された値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest セクション に記載されています。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。 |
|
|
時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。 | |
|
|
コネクターによる | |
|
| コネクターがデータベーススキーマの変更をトピック接頭辞と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。コネクターは、データベース名が含まれるキーを持つ各スキーマの変更と、スキーマ更新を記述する JSON 構造の値を記録します。スキーマの変更を記録するこのメカニズムは、データベーススキーマ履歴への変更のコネクターの内部記録とは独立しています。 | |
|
|
delete イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。 | |
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。
列の完全修飾名は、 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
| n/a 列の完全修飾名の形式は schemaName.tableName.columnName です。 |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。 列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。 単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。 | |
| 該当なし | 列のメタデータを表す追加パラメーターをコネクターに発行させたい列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名は、次の形式に従います: schemaName.tableName.columnName. | |
| 該当なし | データベース内の列に対して定義されているデータ型の完全修飾名を指定する正規表現のオプションのコンマ区切りリスト。このプロパティーが設定されている場合、データ型が一致する列に対して、コネクターはスキーマに次の追加フィールドを含むイベントレコードを発行します。
これらのパラメーターは、列の元の型名と長さ (可変幅型の場合) をそれぞれ伝達します。
列の完全修飾名は、schemaName.tableName.typeName の形式に従います。 SQL Server 固有のデータ型名の一覧は、SQL Server データ型マッピング を参照してください。 | |
| 該当なし | 指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
各完全修飾テーブル名は、 カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。 | |
| bytes |
バイナリー ( | |
| none |
コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。設定可能:
| |
| none |
コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。設定可能:
詳しい情報は、Avro naming を参照してください。 |
高度な SQL Server コネクター設定プロパティー
以下の 高度な 設定プロパティーには、ほとんどの状況で機能する適切なデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし |
コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。以下に例を示します。
コネクターがカスタムコンバーターを使用できるようにするには、
コネクターに設定するコンバーターごとに、コンバーターインターフェイスを実装するクラスの完全修飾名を指定する
以下に例を示します。 isbn.type: io.debezium.test.IsbnConverter
設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。追加の設定パラメーターとコンバーターを関連付けるには、パラメーター名の前にコンバーターのシンボリック名を付けます。以下に例を示します。 isbn.schema.name: io.debezium.sqlserver.type.Isbn
| |
| Initial | キャプチャーされたテーブルの構造 (および必要に応じてデータ) の最初のスナップショットを作成するモード。スナップショットが完了すると、コネクターはデータベースのやり直し (redo) ログから変更イベントの読み取りを続行します。以下の値を使用できます。
| |
| exclusive | コネクターがテーブルロックを保持するかどうか、また保持する時間をコントロールします。テーブルロックは、コネクターがスナップショットを実行している間、特定の種類の変更テーブル操作が発生するのを防ぎます。以下の値を設定できます。
| |
|
|
スナップショットを実行するときにコネクターがデータをクエリーする方法を指定します。
この設定により、 | |
|
|
スナップショットに含めるテーブルの完全修飾名 ( テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。 | |
| repeatable_read | 使用されるトランザクション分離レベルと、キャプチャー用に指定されたテーブルをコネクターがロックする期間を制御するモード。以下の値を使用できます。
モードの選択は、データの整合性にも影響します。 | |
|
|
イベントの処理中にコネクターが例外に対応する方法を指定します。 | |
|
|
コネクターが新しい変更イベントについてデータベースに確認する前に、コネクターが待機する時間 (ミリ秒) を指定する正の整数値です。
この設定によりハートビートの送信が遅延しないようにするには、 | |
|
|
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。 | |
|
|
ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。 | |
|
| このコネクターの反復処理中に処理される必要があるイベントの各バッチの最大サイズを指定する正の整数値。 | |
|
|
データベースに変更が発生するかどうかに関係なく、コネクターが Kafka ハートビートトピックにメッセージを送信する頻度を決定する間隔をミリ秒単位で指定します。 注記
ハートビートは、ポーリングサイクル中にのみ発行されます。つまり、Debezium 環境では、ハートビートメッセージの実際の送信間隔は、 | |
| デフォルトなし |
コネクターがハートビートメッセージを送信するときにコネクターがソースデータベースで実行するクエリーを指定します。 | |
| デフォルトなし |
コネクターの起動後、スナップショットを取得するまで待機する間隔 (ミリ秒単位)。 | |
| 0 |
コネクターがスナップショットを完了した後、ストリーミングプロセスの開始を遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットが完了した直後で、ストリーミングプロセスの開始前に障害が発生した場合に、コネクターがスナップショットを再開できないようにします。Kafka Connect ワーカーに設定されている | |
|
| スナップショットの実行中に各テーブルから 1 度に読み取る必要がある行の最大数を指定します。コネクターは、このサイズの複数のバッチでテーブルの内容を読み取ります。デフォルトは 2000 です。 | |
| デフォルトなし | 指定のクエリーのデータベースのラウンドトリップごとにフェッチされる行の数を指定します。デフォルトは、JDBC ドライバーのデフォルトのフェッチサイズです。 | |
|
|
スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する整数値。この時間間隔でテーブルロックを取得できない場合、スナップショットは失敗します (スナップショット も参照してください)。 | |
| デフォルトなし | スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
プロパティーには、
スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 ( "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"
作成されるスナップショットでは、コネクターには | |
|
|
| |
| 10000 (10 秒) | 再試行可能なエラーが発生した後にコネクターを再起動するまで待機する時間 (ミリ秒単位)。 | |
|
| ストリーミング中にコネクターがスキップする操作タイプをコンマで区切ったリスト。以下のタイプの操作をスキップするようにコネクターを設定することができます。
コネクターに操作をスキップしてほしくない場合は、値を | |
| デフォルト値なし |
シグナルをコネクターに送信するために使用されるデータコレクションの完全修飾名 。 | |
| source | コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
| デフォルトなし | コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
|
|
増分スナップショット時のスキーマの変更を許可します。有効にすると、コネクターは増分スナップショットの実行中にスキーマの変更を検出し、ロック DDL を回避するために現在のチャンクを再選択します。 | |
|
| 増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 | |
|
|
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントを重複排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
| |
| 500 |
データベースの複数のテーブルからの変更をストリーミングする際に、メモリーの使用量を削減するために使用する、反復ごとの最大トランザクション数を指定します。 | |
|
| 増分スナップショット時に使用するすべての SELECT ステートメントに OPTION(RECOMPILE) クエリーオプションを使用します。これは、発生しうるパラメータースニッフィング問題を解決するのに役立ちますが、クエリーの実行頻度によっては、ソースデータベースの CPU 負荷が増加する可能性があります。 | |
|
|
データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは | |
|
|
トピック名の区切り文字を指定します。デフォルトは | |
|
| トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。 | |
|
|
コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
|
コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 詳細は、トランザクションメタデータ を参照してください。 | |
|
|
初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。 注記
並行初期スナップショットを有効にすると、各テーブルのスナップショットを実行するスレッドの処理完了時間がそれぞれ異なる場合があります。あるテーブルのスナップショットが、他のテーブルよりも完了するまでに大幅に時間がかかる場合、作業を終えたスレッドは待機状態になります。一部の環境では、ロードバランサーやファイアウォールなどのネットワークデバイスは、長時間アイドル状態のままの接続を終了します。スナップショットが完了すると、コネクターがすべてのスナップショットデータを正常に送信した場合でも、コネクターは接続を閉じることができず、例外となり、スナップショットが不完全になります。 | |
|
|
コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します。たとえば、 コネクターは、指定されたタグを基本 MBean オブジェクト名に追加します。タグは、メトリクスデータを整理および分類するのに役立ちます。特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを識別するためのタグを定義できます。詳細は、カスタマイズされた MBean 名 を参照してください。 | |
|
|
接続エラーなど、再試行可能なエラーが発生する操作の後に、コネクターがどのように応答するかを指定します。
| |
|
| コネクターが CDC データを照会する方法を制御します。以下のモードがサポートされます。
| |
|
|
コネクターがクエリーの完了を待機する時間をミリ秒単位で指定します。タイムアウト制限を削除するには、値を |
Debezium SQL Server コネクターデータベーススキーマ履歴の設定プロパティー
Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する schema.history.internal.* プロパティーのセットが含まれています。
以下の表は、Debezium コネクターを設定するための schema.history.internal プロパティーを説明しています。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし | コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。 | |
| デフォルトなし | Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。 | |
|
| 永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。 | |
|
| Kafka 管理クライアントを使用してクラスター情報を取得する際に、コネクターが待機すべき最大ミリ秒数を指定する整数値です。 | |
|
| Kafka 管理クライアントを使用して kafka 履歴トピックを作成する間、コネクターが待機する最大ミリ秒数を指定する整数値。 | |
|
|
エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、 | |
|
|
コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは | |
|
|
コネクターがスキーマまたはデータベース内のすべてのテーブルからスキーマ構造を記録するか、キャプチャー対象に指定されたテーブルのみからスキーマ構造を記録するかを指定するブール値。
| |
|
|
コネクターがデータベースインスタンス内のすべての論理データベースのスキーマ構造を記録するかどうかを指定するブール値。
|
パススルー SQL Server コネクターの設定プロパティー
コネクターは pass-through プロパティーをサポートしており、これにより Debezium は Apache Kafka プロデューサーとコンシューマーの動作を微調整するためのカスタム設定オプションを指定できます。Kafka プロデューサーとコンシューマーの全設定プロパティーの詳細は、Kafka ドキュメント を参照してください。
プロデューサーとコンシューマーのクライアントがスキーマ履歴トピックと対話する方法を設定するための Pass-through プロパティー
Debezium は、データベーススキーマ履歴トピックへのスキーマ変更を記述するために Apache Kafka プロデューサーに依存しています。同様に、コネクターが起動すると、データベーススキーマ履歴トピックから読み取る Kafka コンシューマーに依存します。schema.history.internal.producer.* および schema.history.internal.consumer.* 接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベーススキーマ履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。
Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。
Kafka プロデューサー設定プロパティー と Kafka コンシューマー設定プロパティー の詳細は、Apache Kafka ドキュメントを参照してください。
SQL Server コネクターが Kafka シグナリングトピックと対話する方法を設定するためのパススループロパティー
Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。
以下の表は、Kafka signal プロパティーを説明しています。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| <topic.prefix>-signal | コネクターがアドホックシグナルについて監視する Kafka トピックの名前。 注記 トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナリングトピックが必要です。シグナリングトピックには単一のパーティションが必要です。 | |
| kafka-signal | Kafka コンシューマーによって使用されるグループ ID の名前。 | |
| デフォルトなし | コネクターが Kafka クラスターへの初期接続を確立するために使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。 | |
|
| コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。 |
シグナリングチャネルの Kafka コンシューマークライアントを設定するためのパススループロパティー
Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.* で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL などのプロパティーを Kafka コンシューマーに渡します。
Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。
SQL Server コネクター sink notification チャネルを設定するためのパススループロパティー
次の表では、Debezium sink notification チャネルの設定に使用できるプロパティーについて説明します。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし |
Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして |
Debezium コネクターのパススルーデータベースドライバー設定プロパティー
Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは接頭辞 driver.* で始まります。たとえば、コネクターは driver.foobar=false などのプロパティーを JDBC URL に渡します。
Debezium は、プロパティーをデータベースドライバーに渡す前に、プロパティーから接頭辞を削除します。
2.7.5. スキーマ変更後のキャプチャーテーブルの更新 リンクのコピーリンクがクリップボードにコピーされました!
SQL Server テーブルに対して変更データキャプチャーが有効になっている場合、テーブルで変更が行われると、イベントレコードがサーバーのキャプチャーテーブルに永続化されます。たとえば、新しい列を追加するなどして、ソーステーブル変更の構造に変更を加えても、その変更は変更テーブルに動的に反映されません。キャプチャーテーブルが古いスキーマを使用し続ける限り、Debezium コネクターはテーブルのデータ変更イベントを正しく出力できません。コネクターが変更イベントの処理を再開できるようにするには、介入してキャプチャーテーブルを更新する必要があります。
CDC を SQL Server に実装する方法により、Debezium を使用してキャプチャーテーブルを更新することはできません。キャプチャーテーブルを更新するには、1 つが昇格された権限を持つ SQL Server データベースオペレーターである必要があります。Debezium ユーザーとして、SQL Server データベース operator とタスクを調整して、スキーマの更新を完了し、Kafka トピックへのストリーミングを復元する必要があります。
以下の方法のいずれかを使用すると、スキーマの変更後にキャプチャーテーブルを更新できます。
- オフラインでスキーマ を更新するには、キャプチャーテーブルを更新する前に Debezium コネクターを停止する必要があります。
- オンラインスキーマの更新 は、Debezium コネクターの実行中にキャプチャーテーブルを更新できます。
各手順には長所と短所があります。
オンライン更新またはオフライン更新のどちらを使用する場合でも、同じソーステーブルに後続のスキーマ更新を適用する前に、スキーマ更新プロセス全体を完了する必要があります。手順を一度に実行できるようにするため、すべての DDL を 1 つのバッチで実行することがベストプラクティスとなります。
CDC が有効になっているソーステーブルでは、一部のスキーマの変更がサポートされていません。たとえば、CDC がテーブルで有効になっている場合、SQL Server で列の名前を変更したり、列型を変更すると、テーブルのスキーマを変更できません。
ソーステーブルの列を NULL から NOT NULL またはその逆に変更した後、SQL Server コネクターは新しいキャプチャーインスタンスが作成されるまで変更された情報を正しくキャプチャーできません。列指定の変更後に新しいキャプチャーテーブルを作成しないと、コネクターによって出力される変更イベントレコードは列が任意であるかどうかを正しく示しません。つまり、以前はオプション (または NULL) として定義されていた列が、現在は NOT NULL として定義されているにもかかわらず、引き続きオプションとして定義されているということです。同様に、必要 (NOT NULL) として定義された列は、NULL として定義されても、以前の指定が保持されます。
sp_rename 関数を使用してテーブルの名前を変更すると、コネクターが再起動されるまで、古いソーステーブル名で変更が発行され続けます。コネクターを再起動すると、新しいソーステーブル名で変更が発行されます。
2.7.5.1. スキーマの変更後のオフライン更新の実行 リンクのコピーリンクがクリップボードにコピーされました!
オフラインでスキーマを更新すると、最も安全にキャプチャーテーブルを更新できます。ただし、オフラインでの更新は、高可用性を必要とするアプリケーションでは使用できないことがあります。
前提条件
- CDC が有効になっている SQL Server テーブルのスキーマに更新がコミット済みである。
- 昇格された権限を持つ SQL Server データベース operator である。
手順
- データベースを更新するアプリケーションを一時停止します。
- Debezium コネクターがストリーミングされていない変更イベントレコードをすべてストリーミングするまで待ちます。
- Debezium コネクターを停止します。
- すべての変更をソーステーブルスキーマに適用します。
-
パラメーター
@capture_instanceの一意の値でsys.sp_cdc_enable_tableの手順を使用して、更新ソーステーブルの新しいキャプチャーテーブルを作成します。 - ステップ 1 で一時停止したアプリケーションを再開します。
- Debezium コネクターを起動します。
-
Debezium コネクターが新しいキャプチャーテーブルからストリーミングを開始したら、古いキャプチャーインスタンス名に設定されたパラメーター
@capture_instanceでストアドプロシージャーsys.sp_cdc_disable_tableを実行して、古いキャプチャーテーブルを削除します。
2.7.5.2. スキーマの変更後のオンライン更新の実行 リンクのコピーリンクがクリップボードにコピーされました!
オンラインでスキーマの更新を完了する手順は、オフラインでスキーマの更新を実行する手順よりも簡単です。また、アプリケーションやデータ処理のダウンタイムなしで完了できます。ただし、オンラインでスキーマを更新すると、ソースデータベースでスキーマを更新した後、新しいキャプチャーインスタンスを作成するまでに、処理の差が生じる可能性があります。この間、変更イベントは変更テーブルの古いインスタンスによって引き続きキャプチャーされ、古いテーブルに保存された変更データは、以前のスキーマの構造を保持します。たとえば、新しい列をソーステーブルに追加した場合は、新しいキャプチャーテーブルの準備が整う前に生成された変更イベントには新しい列のフィールドは含まれません。アプリケーションがこのような移行期間を許容しない場合、オフラインでスキーマの更新を行うことが推奨されます。
前提条件
- CDC が有効になっている SQL Server テーブルのスキーマに更新がコミット済みである。
- 昇格された権限を持つ SQL Server データベース operator である。
手順
- すべての変更をソーステーブルスキーマに適用します。
-
パラメーター
@capture_instanceに一意の値を指定してsys.sp_cdc_enable_tableストアドプロシージャーを実行し、更新元テーブルに新しいキャプチャテーブルを作成します。 -
Debezium が新しいキャプチャーテーブルからのストリーミングを開始したら、パラメーター
@capture_instanceに古いキャプチャーインスタンス名を設定して、sys.sp_cdc_disable_tableストアドプロシージャーを実行することで、古いキャプチャーテーブルを削除することができます。
例: データベーススキーマの変更後のオンラインスキーマ更新の実行
次の例は、customers ソーステーブルに phone_number 列が追加された後、change テーブルでオンラインスキーマ更新を完了する方法を示しています。
次のクエリーを実行して
customersソーステーブルのスキーマを変更し、phone_numberフィールドを追加します。ALTER TABLE customers ADD phone_number VARCHAR(32);
ALTER TABLE customers ADD phone_number VARCHAR(32);Copy to Clipboard Copied! Toggle word wrap Toggle overflow sys.sp_cdc_enable_tableストアドプロシージャーを実行して、新しいキャプチャーインスタンスを作成します。EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2'; GO
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2'; GOCopy to Clipboard Copied! Toggle word wrap Toggle overflow 次のクエリーを実行して、
customersテーブルに新しいデータを挿入します。INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456'); GOINSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456'); GOCopy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka Connect ログは、以下のメッセージのようなエントリーで設定の更新を報告します。
connect_1 | 2019-01-17 10:11:14,924 INFO || Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] connect_1 | 2019-01-17 10:11:14,924 INFO || Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] ... connect_1 | 2019-01-17 10:11:33,719 INFO || Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
connect_1 | 2019-01-17 10:11:14,924 INFO || Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] connect_1 | 2019-01-17 10:11:14,924 INFO || Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] ... connect_1 | 2019-01-17 10:11:33,719 INFO || Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]Copy to Clipboard Copied! Toggle word wrap Toggle overflow 最終的には、
phone_numberフィールドがスキーマに追加され、その値が Kafka トピックに書き込まれたメッセージに表示されます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow sys.sp_cdc_disable_tableストアドプロシージャーを実行して、古いキャプチャーインスタンスを削除します。EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers'; GO
EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers'; GOCopy to Clipboard Copied! Toggle word wrap Toggle overflow
2.7.6. Debezium SQL Server コネクターのパフォーマンスの監視 リンクのコピーリンクがクリップボードにコピーされました!
Debezium SQL Server コネクターは、Zookeeper、Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。コネクターは以下のメトリクスを提供します。
- スナップショットの実行時にコネクターを監視するための、スナップショットメトリクス。
- CDC テーブルデータの読み取り時にコネクターを監視するための、ストリーミングメトリクス。
- コネクターのスキーマ履歴の状態を監視するための、スキーマ履歴メトリクス。
JMX 経由で前述のメトリクスを公開する方法については、Debezium の監視に関するドキュメント を参照してください。
2.7.6.1. SQL Server コネクタースナップショットおよびストリーミング MBean オブジェクトのカスタマイズされた名前 リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。これらのメトリクスは各コネクターインスタンスに固有であり、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。
デフォルトでは、正しく設定されたコネクターをデプロイすると、Debezium はさまざまなコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクター設定に依存しており、設定の変更によって MBean 名が変更される場合があります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが切断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開するには、新しい MBean 名を使用するように監視スタックを再設定する必要があります。
MBean 名の変更が原因で監視が中断されないように、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、コネクター設定に custom.metric.tags プロパティーを追加します。このプロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値がそのタグの値を表すキーと値のペアを受け入れます。たとえば、k1=v1,k2=v2 です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。
コネクターの custom.metric.tags プロパティーを設定した後、指定されたタグに関連付けられたメトリクスを取得するように監視スタックを設定できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。
カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。
次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。
例2.55 カスタムタグがコネクター MBean 名を変更する方法
デフォルトでは、SQL Server コネクターはストリーミングメトリクスに以下の MBean 名を使用します。
debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>
debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>
custom.metric.tags の値を database=salesdb-streaming,table=inventory に設定すると、Debezium は次のカスタム MBean 名を生成します。
debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
2.7.6.2. Debezium SQL Server コネクターのスナップショットメトリクス リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.sql_server:type=connector-metrics,server=<topic.prefix>,task=<task.id>,context=snapshot です。
スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。
次の表に、使用可能なスナップショットメトリクスを示します。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取りした最後のスナップショットイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| 前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| スナップショットに含まれているテーブルの合計数。 | |
|
| スナップショットによってまだコピーされていないテーブルの数。 | |
|
| スナップショットが起動されたかどうか。 | |
|
| スナップショットが一時停止されたかどうか。 | |
|
| スナップショットが中断されたかどうか。 | |
|
| スナップショットが完了したかどうか。 | |
|
| スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。 | |
|
| スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。 | |
|
| スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
コネクターは、増分スナップショットの実行時に、以下の追加のスナップショットメトリクスも提供します。
2.7.6.3. Debezium SQL Server コネクターのストリーミングメトリクス リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.sql_server:type=connector-metrics,server=<topic.prefix>,task=<task.id>,context=streaming です。
以下の表は、利用可能なストリーミングメトリクスのリストです。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取られた最後のストリーミングイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、ソースデータベースによって報告されたデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された作成イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された更新イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された削除イベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。 | |
|
| 最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値には、データベースサーバーとコネクターが実行されているマシンのクロックの差が組み込まれます。 | |
|
| コミットされた処理済みトランザクションの数。 | |
|
| 最後に受信したイベントの位置。 | |
|
| 最後に処理されたトランザクションのトランザクション識別子。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
2.7.6.4. Debezium SQL Server コネクターのスキーマ履歴メトリクス リンクのコピーリンクがクリップボードにコピーされました!
MBean は、debezium.sql_server:type=connector-metrics,context=schema-history,server=<topic.prefix>,task=<task.id> です。
以下の表は、利用可能なスキーマ履歴メトリクスのリストです。
| 属性 | タイプ | 説明 |
|---|---|---|
|
|
データベーススキーマ履歴の状態を示す | |
|
| リカバリーが開始された時点のエポック秒の時間。 | |
|
| リカバリーフェーズ中に読み取られた変更の数。 | |
|
| リカバリーおよびランタイム中に適用されるスキーマ変更の合計数。 | |
|
| 最後の変更が履歴ストアから復元された時点からの経過時間 (ミリ秒単位)。 | |
|
| 最後の変更が適用された時点からの経過時間 (ミリ秒単位)。 | |
|
| 履歴ストアから復元された最後の変更の文字列表現。 | |
|
| 最後に適用された変更の文字列表現。 |