3.2. Debezium Db2 コネクターの仕組み
Debezium Db2 コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびスキーマ変更の処理方法を理解すると便利です。
詳細は以下を参照してください。
3.2.1. Debezium Db2 コネクターによるデータベーススナップショットの実行方法
Db2 のレプリケーション機能は、データベース変更の完全な履歴を保存するようには設計されていません。そのため、Debezium Db2 コネクターはログからデータベースの履歴全体を取得できません。コネクターがデータベースの現在の状態のベースラインを確立できるようにするには、コネクターの初回起動時に、キャプチャーボード のテーブルの最初の 整合性スナップショット を実行します。スナップショットが変更をキャプチャーするたびに、コネクターはキャプチャーされたテーブルの Kafka トピックに read
イベントを発行します。
スナップショットの詳細は、以下のセクションを参照してください。
Debezium Db2 コネクターが最初のスナップショットの実行に使用するデフォルトのワークフロー
以下のワークフローでは、Debezium がスナップショットを作成する手順を示しています。この手順では、snapshot.mode
設定プロパティーがデフォルト値 (initial)
に設定されている場合のスナップショットのプロセスを説明します。snapshot.mode
プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。
- データベースへの接続を確立します。
-
キャプチャーモードで、かつスナップショットに含める必要があるテーブルを決定します。デフォルトでは、コネクターはシステム以外のすべてのテーブルのデータをキャプチャーします。スナップショットが完了した後、コネクターは指定されたテーブルのデータをストリーミングし続けます。コネクターで特定のテーブルからのみデータをキャプチャーする場合は、
table.include.list
やtable.exclude.list
などのプロパティーを設定して、テーブルまたはテーブル要素のサブセットのみのデータをキャプチャーするようにコネクターに指示できます。 -
キャプチャーモードの各テーブルでロックを取得します。このロックを使用して、スナップショットが完了するまで、それらのテーブルでスキーマの変更が行われないようにします。ロックのレベルは、
snapshot.isolation.mode
コネクター設定プロパティーによって決定されます。 - サーバーのトランザクションログで、最上位 (最新) の LSN の位置を読み取ります。
すべてのテーブル、またはキャプチャー対象として指定されたすべてのテーブルのスキーマをキャプチャーします。コネクターは、内部データベースのスキーマ履歴トピックにスキーマ情報を保持します。スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。
注記デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、キャプチャーモードにあるデータベース内の全テーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。
初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。
- 手順 3 で取得したロックをすべてリリースします。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。
手順 4 で読み取った LSN 位置で、コネクターはキャプチャーするように指定されたテーブルをスキャンします。スキャン中に、コネクターは次のタスクを実行します。
- スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
-
テーブルからキャプチャーされた行ごとに
read
イベントを生成します。すべてのread
イベントには、LSN の位置が含まれ、これは手順 4 で取得した LSN の位置と同じです。 -
ソーステーブルの Kafka トピックに各
read
イベントを出力します。 - 該当する場合は、データテーブルロックを解放します。
- コネクターオフセットにスナップショットの正常な完了を記録します。
作成された初期スナップショットは、キャプチャーされたテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。
スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。
コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、手順 4 で読み取った位置からストリーミングを続行します。
何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。
3.2.1.1. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由
コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。
- テーブルデータ
-
コネクターの
table.include.list
プロパティーにあるテーブルのINSERT
、UPDATE
、およびDELETE
操作に関する情報。 - スキーマデータ
- テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。
初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。
場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。
関連情報
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)
-
schema.history.internal.store.only.captured.tables.ddl
プロパティーを設定して、スキーマ情報をキャプチャーするテーブルを指定します。 -
schema.history.internal.store.only.captured.databases.ddl
プロパティーを設定して、スキーマ変更をキャプチャーする論理データベースを指定します。
3.2.1.2. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)
コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。
テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- コネクターが読み取った最も初期の変更テーブルエントリーと、最新の変更テーブルエントリーの LSN の間で、スキーマの変更がテーブルに加えられていない。構造が変更された新しいテーブルからデータを取得する方法は、「初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)」 を参照してください。
手順
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
以下の変更をコネクター設定に適用します。
(オプション)
schema.history.internal.captured.tables.ddl
の値をfalse
に設定します。この設定により、スナップショットですべてのテーブルのスキーマがキャプチャーされ、今後、コネクターがすべてのテーブルのスキーマ履歴を再構築できるようにします。
注記すべてのテーブルのスキーマをキャプチャーするスナップショットは、完了までにさらに時間がかかります。
-
コネクターがキャプチャーするテーブルを
table.include.list
に追加します。 snapshot.mode
を次のいずれかの値に設定します。Initial
-
コネクターを再起動すると、テーブルデータとテーブル構造をキャプチャーするデータベースの完全なスナップショットが作成されます。
このオプションを選択する場合は、コネクターがすべてのテーブルのスキーマをキャプチャーできるように、schema.history.internal.captured.tables.ddl
プロパティーの値をfalse
に設定することを検討してください。 schema_only
- コネクターを再起動すると、テーブルスキーマのみをキャプチャーするスナップショットが作成されます。完全なデータスナップショットとは異なり、このオプションではテーブルデータはキャプチャーされません。完全なスナップショットが作成される前に、早くコネクターを再起動する場合は、このオプションを使用します。
-
コネクターを再起動します。コネクターは、
snapshot.mode
で指定されたタイプのスナップショットを完了します。 (オプション) コネクターが
schema_only
スナップショットを実行した場合、スナップショットの完了後に 増分スナップショット を開始して、追加したテーブルからデータをキャプチャーします。コネクターは、テーブルからリアルタイムの変更をストリーミングし続けながら、スナップショットを実行します。増分スナップショットを実行すると、次のデータ変更がキャプチャーされます。- コネクターが以前にキャプチャーしたテーブルの場合、増分スナップショットは、コネクターが停止している間、つまりコネクターが停止してから現在の再起動までの間に発生した変更をキャプチャーします。
- 新しく追加されたテーブルの場合、増分スナップショットは既存のテーブル行をすべてキャプチャーします。
3.2.1.3. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)
スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。
最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。
手順
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (
store.only.captured.tables.ddl
はfalse
に設定されました)。 -
table.include.list
プロパティーを編集して、キャプチャーするテーブルを指定します。 - コネクターを再起動します。
- 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
-
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (
store.only.captured.tables.ddl
がtrue
に設定されています)。 最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。
- 手順 1: スキーマスナップショット、その後に増分スナップショット
この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.mode
プロパティーの値をschema_only
に設定します。 -
table.include.list
を編集して、キャプチャーするテーブルを追加します。
-
- コネクターを再起動します。
- Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
- データが損失されないようにするには、増分スナップショット を開始します。
- 手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット
この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
-
table.include.list
を編集して、キャプチャーするテーブルを追加します。 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.mode
プロパティーの値をinitial
に設定します。 -
(オプション)
schema.history.internal.store.only.captured.tables.ddl
をfalse
に設定します。
-
- コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
- (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。
3.2.2. アドホックスナップショット
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。
execute-snapshot
メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、execute-snapshot
シグナルのタイプを incremental
に設定し、スナップショットに追加するテーブルの名前を指定します。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプを指定します。 |
| 該当なし |
スナップショットされるテーブルの完全修飾名にマッチする正規表現を含む配列。 |
| 該当なし | テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。 |
| 該当なし | スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。 |
アドホックスナップショットのトリガー
execute-snapshot
シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
現在、execute-snapshot
アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。
3.2.3. 増分スナップショット
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信する ための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.list
プロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ
イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT
、UPDATE
、DELETE
操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE
または DELETE
イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ
イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ
イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ
イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ
操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE
または DELETE
操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ
イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ
イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ
イベントのみです。Debezium は、これらの残りの READ
イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
Db2 の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。
3.2.3.1. 増分スナップショットのトリガー
現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。
シグナルを SQL INSERT
クエリーとしてシグナルテーブルに送信します。
Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental
だけです。
スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections
配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
増分スナップショットシグナルの data-collections
アレイにはデフォルト値がありません。data-collections
アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。
スナップショットに含めるテーブルの名前に、データベース、スキーマ、またはテーブルの名前にドット (.
) が含まれている場合、そのテーブルを data-collections
配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。
たとえば、以下のようなテーブルを含めるには public
スキーマに存在し、その名前が My.Table
のテーブルを含めるには、"public"."My.Table"
の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collection
プロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットをトリガーする
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
以下に例を示します。
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}, 5 "additional-condition":"color=blue"}'); 6
コマンドの
id
、type
、およびdata
パラメーターの値は、シグナルテーブルのフィールド に対応します。以下の表では、この例のパラメーターを説明しています。
表3.2 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 項目 値 説明 1
myschema.debezium_signal
ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID
文字列をウォーターマークシグナルとして生成します。3
execute-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection
設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。5
incremental
実行するスナップショット操作の種類指定するシグナルの
data
フィールドの任意のtype
コンポーネント。
現在、唯一の有効なオプションはデフォルト値incremental
だけです。
値を指定しない場合には、コネクターは増分スナップショットを実行します。6
additional-condition
テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。
additional-condition
パラメーターの詳細は、additional-condition
付きのアドホック増分スナップショット を参照してください。
additional-condition
付きのアドホック増分スナップショット
スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルシグナルに additional-condition
パラメーターを追加してシグナル要求を変更できます。
一般的なスナップショットの SQL クエリーは、以下の形式を取ります。
SELECT * FROM <tableName> ....
additional-condition
パラメーターを追加して、以下の例のように WHERE
条件を SQL クエリーに追加します。
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
たとえば、以下の列が含まれる products
テーブルがあるとします。
-
id
(プライマリーキー) -
color
-
quantity
products
テーブルの増分スナップショットに color=blue
のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products
テーブルを使用して、color=blue
および quantity>10
だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例: 増分スナップショットイベントメッセージ
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
実行するスナップショット操作タイプを指定します。 |
2 |
|
イベントタイプを指定します。 |
3.2.3.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする
設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type
と data
フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは execute-snapshot
で、data
フィールドには以下のフィールドが必要です。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
| 該当なし | コネクターがスナップショットに含める列のサブセットを指定するために評価する条件を指定するオプションの文字列。 |
execute-snapshot Kafka メッセージの例:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
追加条件付きのアドホック増分スナップショット
Debezium は additional-condition
フィールドを使用してテーブルのコンテンツのサブセットを選択します。
通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。
SELECT * FROM <tableName> ….
スナップショットリクエストに additional-condition
が含まれる場合、次のように additional-condition
が SQL クエリーに追加されます。
SELECT * FROM <tableName> WHERE <additional-condition> ….
たとえば、列 id
(プライマリーキー)、color
、および brand
を含む products
テーブルがある場合、スナップショットに color='blue'
のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-condition
ステートメントを追加することができます。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`
additional-condition
ステートメントを使用して、複数の列に基づいて条件を渡すことができます。たとえば、前の例と同じ products
テーブルを使用して、color='blue'
および brand='MyBrand'
である products
テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`
3.2.3.3. 増分スナップショットの停止
ソースデータベースのテーブルにシグナルを送信して、増分スナップショットを停止することもできます。SQL INSERT
クエリーを送信して、停止スナップショットシグナルをテーブルに送信します。
Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
送信するクエリーは、incremental
のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのテーブルを指定します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collection
プロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットを停止する
SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
以下に例を示します。
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}'); 5
signal コマンドの
id
、type
、およびdata
パラメーターの値は、シグナルテーブルのフィールド に対応します。以下の表では、この例のパラメーターを説明しています。
表3.4 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明 項目 値 説明 1
myschema.debezium_signal
ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。3
stop-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection
設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。data
フィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。5
incremental
停止させるスナップショット操作の種類を指定する信号の
data
フィールドの必須コンポーネント。
現在、有効な唯一のオプションはincremental
です。type
の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
3.2.3.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する
設定された Kafka シグナルトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type
と data
フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは stop-snapshot
で、data
フィールドには以下のフィールドが必要です。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現のオプションの配列。 |
次の例は、典型的な stop-snapshot
の Kafka メッセージを示しています。
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
3.2.4. Debezium Db2 コネクターによる変更データテーブルの読み取り方法
スナップショットの完了後、Debezium Db2 コネクターが初めて起動すると、キャプチャーモードである各ソーステーブルの変更データテーブルを識別します。コネクターは各変更データテーブルに対して以下を行います。
- 最後に保存された最大 LSN から現在の最大 LSN の間に作成された変更イベントを読み取ります。
- 各イベントのコミット LSN および変更 LSN に従って、変更イベントを順序付けます。これにより、コネクターはテーブルが変更された順序で変更イベントを出力します。
- コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
- コネクターが Kafka Connect に渡した最大 LSN を保存します。
再起動後、コネクターは停止した場所でオフセット (コミット LSN および変更 LSN) から変更イベントの出力を再開します。コネクターが稼働し、変更イベントを出力している間、キャプチャーモードからテーブルを削除したり、テーブルをキャプチャーモードに追加したりすると、コネクターは変更を検出して、それに合わせて動作を変更します。
3.2.5. Debezium Db2 変更イベントレコードを受信する Kafka トピックのデフォルト名
デフォルトでは、Db2 コネクターは、テーブルで発生するすべての INSERT
、UPDATE
、DELETE
操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。
topicPrefix.schemaName.tableName
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- topicPrefix
-
topic.prefix
コネクター設定プロパティーで指定されたトピック接頭辞。 - schemaName
- 操作が発生したスキーマの名前。
- tableName
- 操作が発生したテーブルの名前。
たとえば、MYSCHEMA
スキーマに 4 つのテーブル (PRODUCTS
、PRODUCTS_ON_HAND
、CUSTOMERS
、ORDERS
) を含む mydatabase
データベースを使用した Db2 インストールについて考えてみます。コネクターはイベントを以下の 4 つの Kafka トピックに出力します。
-
mydatabase.MYSCHEMA.PRODUCTS
-
mydatabase.MYSCHEMA.PRODUCTS_ON_HAND
-
mydatabase.MYSCHEMA.CUSTOMERS
-
mydatabase.MYSCHEMA.ORDERS
コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピック と トランザクションメタデータトピック) にラベルを付けます。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。
3.2.6. Debezium Db2 コネクターによるデータベーススキーマの変更の処理方法
データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。
スキーマ変更後に発生するイベントを正しく処理するために、Debezium Db2 コネクターは、関連するデータテーブルの構造をミラーリングする Db2 変更データテーブルの構造に基づいて、新しいスキーマのスナップショットを保存します。コネクターは、データベーススキーマ履歴 Kafka トピックに、スキーマ変更の結果 (複数操作の LSN) と合わせてテーブルのスキーマ情報を保存します。コネクターは、保管されたスキーマ表現を使用して、挿入、更新、または削除の各操作時にテーブルの構造を正しくミラーリングする変更イベントを生成します。
クラッシュまたは正常に停止した後にコネクターが再起動すると、最後に読み取った位置から Db2 変更データテーブル内のエントリーの読み取りを再開します。コネクターがデータベーススキーマ履歴トピックから読み取るスキーマ情報を基に、コネクターが再起動する場所に存在したテーブル構造を適用します。
キャプチャーモードの Db2 テーブルのスキーマを更新する場合は、対応する変更テーブルのスキーマも更新することが重要です。データベーススキーマを更新するには、昇格権限のある Db2 データベース管理者である必要があります。Debezium 環境で Db2 データベーススキーマを更新する方法は、スキーマ履歴の進化 を参照してください。
データベーススキーマ履歴トピックは、内部コネクター専用となっています。コネクターは任意で コンシューマーアプリケーションを対象とした別のトピックにスキーマ変更イベントを発行する こともできます。
関連情報
- Debezium イベントレコードを受信する トピックのデフォルト名。
3.2.7. Debezium Db2 コネクターのスキーマ変更トピック
Debezium Db2 コネクターを設定すると、データベーステーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成できます。
Debezium は、以下の場合にスキーマ変更トピックにメッセージを出力します。
- 新しいテーブルがキャプチャーモードになる。
- テーブルがキャプチャーモードから削除される。
- データベーススキーマの更新 中に、キャプチャーモードであるテーブルのスキーマが変更される。
コネクターはスキーマ変更イベントを、<topicPrefix>
という名前の Kafka スキーマ変更トピックに書き込みます。ここで <topicPrefix>
は、topic.prefix
コネクター設定プロパティーで指定されたトピック接頭辞です。コネクターがスキーマ変更トピックに送信するメッセージには以下の要素などのペイロードが含まれます。
databaseName
-
ステートメントが適用されるデータベースの名前。
databaseName
の値は、メッセージキーとして機能します。 pos
- ステートメントが表示されるトランザクションログ内の位置。
tableChanges
-
スキーマの変更後のテーブルスキーマ全体の構造化表現。
tableChanges
フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベーススキーマ履歴トピックにも格納します。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。
データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。
トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。
-
データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を
1
に指定します。 -
Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka
num.partitions
設定オプションの値を1
に設定します。
コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。
例: Db2 コネクターのスキーマ変更トピックに出力されるメッセージ
以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。
{ "schema": { ... }, "payload": { "source": { "version": "2.3.4.Final", "connector": "db2", "name": "db2", "ts_ms": 0, "snapshot": "true", "db": "testdb", "schema": "DB2INST1", "table": "CUSTOMERS", "change_lsn": null, "commit_lsn": "00000025:00000d98:00a2", "event_serial_no": null }, "ts_ms": 1588252618953, 1 "databaseName": "TESTDB", 2 "schemaName": "DB2INST1", "ddl": null, 3 "tableChanges": [ 4 { "type": "CREATE", 5 "id": "\"DB2INST1\".\"CUSTOMERS\"", 6 "table": { 7 "defaultCharsetName": null, "primaryKeyColumnNames": [ 8 "ID" ], "columns": [ 9 { "name": "ID", "jdbcType": 4, "nativeType": null, "typeName": "int identity", "typeExpression": "int identity", "charsetName": null, "length": 10, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "FIRST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "LAST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "EMAIL", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 10 { "customAttribute": "attributeValue" } ] } } ] } }
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。 |
2 |
| 変更が含まれるデータベースとスキーマを識別します。 |
3 |
|
Db2 コネクターの場合は常に |
4 |
| DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。 |
5 |
| 変更の種類を説明します。値は以下のいずれかになります。
|
6 |
| 作成、変更、または破棄されたテーブルの完全な識別子。 |
7 |
| 適用された変更後のテーブルメタデータを表します。 |
8 |
| テーブルのプライマリーキーを設定する列のリスト。 |
9 |
| 変更されたテーブルの各列のメタデータ。 |
10 |
| 各テーブル変更のカスタム属性メタデータ。 |
コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload
フィールドにキーが含まれます。
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.db2.SchemaChangeKey" }, "payload": { "databaseName": "TESTDB" } }
3.2.8. トランザクション境界を表す Debezium Db2 コネクターによって生成されたイベント
Debezium は、トランザクション境界を表し、変更データイベントメッセージをエンリッチするイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
Debezium は、すべてのトランザクションで BEGIN
および END
区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。
status
-
BEGIN
またはEND
id
- 一意のトランザクション識別子の文字列表現。
ts_ms
-
データソースでのトランザクション境界イベント (
BEGIN
またはEND
イベント) の時間。もしデータソースが Debezium にイベント時間を提供しないなら、このフィールドは代わりに Debezium がイベントを処理する時間を表します。 event_count
(END
イベント用)- トランザクションによって出力されるイベントの合計数。
data_collections
(END
イベント用)-
data_collection
とevent_count
要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
例
{ "status": "BEGIN", "id": "00000025:00000d08:0025", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "00000025:00000d08:0025", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "testDB.dbo.tablea", "event_count": 1 }, { "data_collection": "testDB.dbo.tableb", "event_count": 1 } ] }
topic.transaction
オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>
.transaction
トピックに出力します。
データ変更イベントのエンリッチメント
トランザクションメタデータを有効にすると、コネクターは変更イベント Envelope
を新しい transaction
フィールドでエンリッチします。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id
- 一意のトランザクション識別子の文字列表現。
total_order
- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下は、メッセージの例になります。
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "00000025:00000d08:0025", "total_order": "1", "data_collection_order": "1" } }