3.2. Debezium Db2 コネクターの仕組み
Debezium Db2 コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびスキーマ変更の処理方法を理解すると便利です。
詳細は以下を参照してください。
3.2.1. Debezium Db2 コネクターによるデータベーススナップショットの実行方法
Db2 のレプリケーション機能は、データベース変更の完全な履歴を保存するようには設計されていません。そのため、Debezium Db2 コネクターが初めてデータベースに接続すると、キャプチャーモードのテーブルの整合性スナップショットを作成し、この状態を Kafka にストリーミングします。これにより、テーブルの内容のベースラインが確立されます。
デフォルトでは、Db2 コネクターがスナップショットを実行すると、以下が実行されます。
-
キャプチャーモードになっているテーブルを判断するため、スナップショットに含まれなければならないテーブルも判断されます。デフォルトでは、システム以外のテーブルはすべてキャプチャーモードになっています。
table.exclude.list
やtable.include.list
などのコネクター設定プロパティーを使用すると、キャプチャーモードである必要があるテーブルを指定できます。 -
キャプチャーモードの各テーブルでロックを取得します。これにより、スナップショットの実行中にこれらのテーブルでスキーマの変更が発生しないようにします。ロックのレベルは、
snapshot.isolation.mode
コネクター設定プロパティーによって決定されます。 - サーバーのトランザクションログで、最大 (最新) の LSN の位置を読み取ります。
- キャプチャーモードになっているすべてのテーブルのスキーマをキャプチャーします。コネクターは、内部データベーススキーマ履歴トピックでこの情報を永続化します。
- 必要に応じて、ステップ 2 で取得したロックを解放します。通常、これらのロックは短期間のみ保持されます。
ステップ 3 で読み取られた LSN の位置で、コネクターはキャプチャーモードテーブルとそれらのスキーマをスキャンします。スキャン中、コネクターは以下を行います。
- スナップショットの開始前に、テーブルが作成されたことを確認します。そうでない場合は、スナップショットはそのテーブルをスキップします。スナップショットが完了し、コネクターが変更イベントの出力を開始した後、コネクターはスナップショットの実行中に作成されたテーブルの変更イベントを生成します。
- キャプチャーモードになっている各テーブルで、各行の 読み取り イベントを生成します。すべての 読み取り イベントには、LSN の位置が含まれ、これはステップ 3 で取得した LSN の位置と同じです。
- テーブルと同じ名前を持つ Kafka トピックに各 読み取り イベントを出力します。
- コネクターオフセットにスナップショットの正常な完了を記録します。
3.2.1.1. アドホックスナップショット
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。
execute-snapshot
メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、run-snapshot
シグナルのタイプを incremental
に設定し、スナップショットに追加するテーブルの名前を指定します。
フィールド | デフォルト | Value |
---|---|---|
|
|
実行するスナップショットのタイプを指定します。 |
| 該当なし |
スナップショットされるテーブルの完全修飾名にマッチする正規表現を含む配列。 |
| 該当なし | テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。 |
アドホックスナップショットのトリガー
execute-snapshot
シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
現在、execute-snapshot
アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。
3.2.1.2. 増分スナップショット
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1 KB です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.list
プロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ
イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT
、UPDATE
、DELETE
操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE
または DELETE
イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ
イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ
イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ
イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ
操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE
または DELETE
操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ
イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ
イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ
イベントのみです。Debezium は、これらの残りの READ
イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
Db2 の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。
3.2.1.3. 増分スナップショットのトリガー
現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。
シグナルを SQL INSERT
クエリーとしてシグナルテーブルに送信します。
Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental
だけです。
スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections
配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
増分スナップショットシグナルの data-collections
アレイにはデフォルト値がありません。data-collections
アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。
スナップショットに追加するテーブルの名前に、データベース、スキーマ、またはテーブルの名前にドット(.
)が含まれている場合、テーブルを data-collections
配列に追加するには、名前の各部分を二重引用符でエスケープする必要があります。
たとえば、public
スキーマに存在し、My.Table
という名前のテーブルを含めるには、"public"."My.Table"
の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在します。
-
シグナルデータコレクションは
signal.data.collection
プロパティーで指定されます。
手順
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow INSERT INTO myschema.debezium_signal (id, type, data) values ('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"], "type":"incremental"}, "additional-condition":"color=blue"}');
INSERT INTO myschema.debezium_signal (id, type, data)
1 values ('ad-hoc-1',
2 'execute-snapshot',
3 '{"data-collections": ["schema1.table1", "schema2.table2"],
4 "type":"incremental"},
5 "additional-condition":"color=blue"}');
6 コマンドの
id
、type
、およびdata
パラメーターの値は、シグナルテーブルのフィールド に対応します。以下の表では、例のパラメーターについて説明しています。
表3.2 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 項目 Value 説明 1
myschema.debezium_signal
ソースデータベースのシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID
文字列をウォーターマークシグナルとして生成します。3
execute-snapshot
type
パラメーターは、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection
設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。5
incremental
実行するスナップショット操作の種類指定するシグナルの
data
フィールドの任意のtype
コンポーネント。
現在、唯一の有効なオプションはデフォルト値incremental
だけです。
値を指定しない場合には、コネクターは増分スナップショットを実行します。6
additional-condition
テーブルの内容のサブセットを取得するために、テーブルの列に基づいて条件を指定するオプションの文字列。
additional-condition
パラメーターの詳細は、追加条件
のあるアドホック増分スナップショット を参照してください。
追加条件
のあるアドホック増分スナップショット
スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルシグナルに additional-condition
パラメーターを追加してシグナル要求を変更できます。
一般的なスナップショットの SQL クエリーは、以下の形式を取ります。
SELECT * FROM <tableName> ....
SELECT * FROM <tableName> ....
additional-condition
パラメーターを追加して、以下の例のように WHERE
条件を SQL クエリーに追加します。
SELECT * FROM <tableName> WHERE <additional-condition> ....
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
たとえば、以下の列が含まれる products
テーブルがあるとします。
-
id
(プライマリーキー) -
color
-
quantity
products
テーブルの増分スナップショットに color=blue
のデータ項目のみを含める場合は、以下の SQL ステートメントを使用してスナップショットをトリガーできます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
パラメーターを使用すると、列を超える条件を渡すこともできます。たとえば、前述の例の products
テーブルを使用して、color=blue
および quantity>10
の項目のみのデータを含む増分スナップショットをトリガーするクエリーを送信できます。
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例: 増分スナップショットイベントメッセージ
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" }, "op":"r", "ts_ms":"1620393591654", "transaction":null }
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental"
},
"op":"r",
"ts_ms":"1620393591654",
"transaction":null
}
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
実行するスナップショット操作タイプを指定します。 |
2 |
|
イベントタイプを指定します。 |
3.2.1.4. 増分スナップショットの停止
ソースデータベースのテーブルにシグナルを送信することで、増分スナップショットを停止することもできます。SQL INSERT
クエリーを送信して、停止スナップショットシグナルをテーブルに送信します。
Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
送信するクエリーは、incremental
のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのテーブルを指定します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在します。
-
シグナルデータコレクションは
signal.data.collection
プロパティーで指定されます。
手順
SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタル スナップショットを停止します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow INSERT INTO myschema.debezium_signal (id, type, data) values ('ad-hoc-1', 'stop-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"], "type":"incremental"}');
INSERT INTO myschema.debezium_signal (id, type, data)
1 values ('ad-hoc-1',
2 'stop-snapshot',
3 '{"data-collections": ["schema1.table1", "schema2.table2"],
4 "type":"incremental"}');
5 signal コマンドの
id
、type
、およびdata
パラメーターの値は、シグナル テーブルのフィールドに対応し ます。以下の表では、例のパラメーターについて説明しています。
表3.3 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明 項目 Value 説明 1
myschema.debezium_signal
ソースデータベースのシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。3
stop-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection
設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。data
フィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。5
incremental
停止させるスナップショット操作の種類を指定する信号の
data
フィールドの必須コンポーネント。
現在、有効な唯一のオプションはincremental
です。type
の値を指定しないと、シグナルは増分スナップショットの停止に失敗します。
3.2.2. Debezium Db2 コネクターによる変更データテーブルの読み取り方法
スナップショットの完了後、Debezium Db2 コネクターが初めて起動すると、キャプチャーモードである各ソーステーブルの変更データテーブルを識別します。コネクターは各変更データテーブルに対して以下を行います。
- 最後に保存された最大 LSN から現在の最大 LSN の間に作成された変更イベントを読み取ります。
- 各イベントのコミット LSN および変更 LSN に従って、変更イベントを順序付けます。これにより、コネクターはテーブルが変更された順序で変更イベントを出力します。
- コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
- コネクターが Kafka Connect に渡した最大 LSN を保存します。
再起動後、コネクターは停止した場所でオフセット (コミット LSN および変更 LSN) から変更イベントの出力を再開します。コネクターが稼働し、変更イベントを出力している間、キャプチャーモードからテーブルを削除したり、テーブルをキャプチャーモードに追加したりすると、コネクターは変更を検出して、それに合わせて動作を変更します。
3.2.3. Debezium Db2 変更イベントレコードを受信する Kafka トピックのデフォルト名
デフォルトでは、Db2 コネクターは、テーブルで発生するすべての INSERT
、UPDATE
、DELETE
操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。
topicPrefix.schemaName.tableName
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- topicPrefix
-
topic.prefix
コネクター設定プロパティーで指定されたトピック接頭辞。 - schemaName
- 操作が発生したスキーマの名前。
- tableName
- 操作が発生したテーブルの名前。
たとえば、MYSCHEMA
スキーマに 4 つのテーブル (PRODUCTS
、PRODUCTS_ON_HAND
、CUSTOMERS
、ORDERS
) を含む mydatabase
データベースを使用した Db2 インストールについて考えてみます。コネクターはイベントを以下の 4 つの Kafka トピックに出力します。
-
mydatabase.MYSCHEMA.PRODUCTS
-
mydatabase.MYSCHEMA.PRODUCTS_ON_HAND
-
mydatabase.MYSCHEMA.CUSTOMERS
-
mydatabase.MYSCHEMA.ORDERS
コネクターは同様の命名規則を適用して、内部データベーススキーマ履歴トピック、スキーマ変更トピック、およびトランザクションメタデータトピック にラベル を 付けます。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピック ルーティング を参照し てください。
3.2.4. Debezium Db2 コネクターのスキーマ変更トピック
Debezium Db2 コネクターを設定すると、データベースのキャプチャーされたテーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成できます。
Debezium は、以下の場合にスキーマ変更トピックにメッセージを出力します。
- 新しいテーブルがキャプチャーモードになる。
- テーブルがキャプチャーモードから削除される。
- データベーススキーマの更新 中に、キャプチャーモードのテーブルのスキーマに変更が加えられます。
コネクターはスキーマ変更イベントを、<topicPrefix>
という名前の Kafka スキーマ変更トピックに書き込みます。ここで <topicPrefix>
は、topic.prefix
コネクター設定プロパティーで指定されたトピック接頭辞です。コネクターがスキーマ変更トピックに送信するメッセージには以下の要素などのペイロードが含まれます。
databaseName
-
ステートメントが適用されるデータベースの名前。
databaseName
の値は、メッセージキーとして機能します。 pos
- ステートメントが表示される binlog の位置。
tableChanges
-
スキーマの変更後のテーブルスキーマ全体の構造化表現。
tableChanges
フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベーススキーマ履歴トピックにも格納します。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。
データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。
トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。
-
データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を
1
に指定します。 -
Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka
num.partitions
設定オプションの値を1
に設定します。
コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。
例: Db2 コネクターのスキーマ変更トピックに出力されるメッセージ
以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。
{ "schema": { ... }, "payload": { "source": { "version": "2.1.4.Final", "connector": "db2", "name": "db2", "ts_ms": 0, "snapshot": "true", "db": "testdb", "schema": "DB2INST1", "table": "CUSTOMERS", "change_lsn": null, "commit_lsn": "00000025:00000d98:00a2", "event_serial_no": null }, "ts_ms": 1588252618953, "databaseName": "TESTDB", "schemaName": "DB2INST1", "ddl": null, "tableChanges": [ { "type": "CREATE", "id": "\"DB2INST1\".\"CUSTOMERS\"", "table": { "defaultCharsetName": null, "primaryKeyColumnNames": [ "ID" ], "columns": [ { "name": "ID", "jdbcType": 4, "nativeType": null, "typeName": "int identity", "typeExpression": "int identity", "charsetName": null, "length": 10, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false }, { "name": "FIRST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "LAST_NAME", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false }, { "name": "EMAIL", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ { "customAttribute": "attributeValue" } ] } } ] } }
{
"schema": {
...
},
"payload": {
"source": {
"version": "2.1.4.Final",
"connector": "db2",
"name": "db2",
"ts_ms": 0,
"snapshot": "true",
"db": "testdb",
"schema": "DB2INST1",
"table": "CUSTOMERS",
"change_lsn": null,
"commit_lsn": "00000025:00000d98:00a2",
"event_serial_no": null
},
"ts_ms": 1588252618953,
"databaseName": "TESTDB",
"schemaName": "DB2INST1",
"ddl": null,
"tableChanges": [
{
"type": "CREATE",
"id": "\"DB2INST1\".\"CUSTOMERS\"",
"table": {
"defaultCharsetName": null,
"primaryKeyColumnNames": [
"ID"
],
"columns": [
{
"name": "ID",
"jdbcType": 4,
"nativeType": null,
"typeName": "int identity",
"typeExpression": "int identity",
"charsetName": null,
"length": 10,
"scale": 0,
"position": 1,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "FIRST_NAME",
"jdbcType": 12,
"nativeType": null,
"typeName": "varchar",
"typeExpression": "varchar",
"charsetName": null,
"length": 255,
"scale": null,
"position": 2,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "LAST_NAME",
"jdbcType": 12,
"nativeType": null,
"typeName": "varchar",
"typeExpression": "varchar",
"charsetName": null,
"length": 255,
"scale": null,
"position": 3,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "EMAIL",
"jdbcType": 12,
"nativeType": null,
"typeName": "varchar",
"typeExpression": "varchar",
"charsetName": null,
"length": 255,
"scale": null,
"position": 4,
"optional": false,
"autoIncremented": false,
"generated": false
}
],
"attributes": [
{
"customAttribute": "attributeValue"
}
]
}
}
]
}
}
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。 |
2 |
| 変更が含まれるデータベースとスキーマを識別します。 |
3 |
|
Db2 コネクターの場合は常に |
4 |
| DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。 |
5 |
| 変更の種類を説明します。値は以下のいずれかになります。
|
6 |
| 作成、変更、または破棄されたテーブルの完全な識別子。 |
7 |
| 適用された変更後のテーブルメタデータを表します。 |
8 |
| テーブルのプライマリーキーを設定する列のリスト。 |
9 |
| 変更されたテーブルの各列のメタデータ。 |
10 |
| 各テーブル変更のカスタム属性メタデータ。 |
コネクターがスキーマ変更トピックに送信するメッセージでは、メッセージキーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload
フィールドにキーが含まれます。
{ "schema": { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "databaseName" } ], "optional": false, "name": "io.debezium.connector.db2.SchemaChangeKey" }, "payload": { "databaseName": "TESTDB" } }
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "databaseName"
}
],
"optional": false,
"name": "io.debezium.connector.db2.SchemaChangeKey"
},
"payload": {
"databaseName": "TESTDB"
}
}
3.2.5. トランザクション境界を表す Debezium Db2 コネクターによって生成されたイベント
Debezium は、トランザクション境界を表し、変更データイベントメッセージをエンリッチするイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
Debezium は、すべてのトランザクションで BEGIN
および END
区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。
status
-
BEGIN
またはEND
id
- 一意のトランザクション識別子の文字列表現。
ts_ms
-
データソースでのトランザクション境界イベント (
BEGIN
またはEND
イベント) の時間。もしデータソースが Debezium にイベント時間を提供しないなら、このフィールドは代わりに Debezium がイベントを処理する時間を表します。 event_count
(END
イベント用)- トランザクションによって出力されるイベントの合計数。
data_collections
(END
イベント用)-
data_collection
とevent_count
要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
例
{ "status": "BEGIN", "id": "00000025:00000d08:0025", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "00000025:00000d08:0025", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "testDB.dbo.tablea", "event_count": 1 }, { "data_collection": "testDB.dbo.tableb", "event_count": 1 } ] }
{
"status": "BEGIN",
"id": "00000025:00000d08:0025",
"ts_ms": 1486500577125,
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "00000025:00000d08:0025",
"ts_ms": 1486500577691,
"event_count": 2,
"data_collections": [
{
"data_collection": "testDB.dbo.tablea",
"event_count": 1
},
{
"data_collection": "testDB.dbo.tableb",
"event_count": 1
}
]
}
topic.transaction
オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>
.transaction
トピックに出力します。
データ変更イベントのエンリッチメント
トランザクションメタデータを有効にすると、コネクターは変更イベント Envelope
を新しい transaction
フィールドでエンリッチします。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id
- 一意のトランザクション識別子の文字列表現。
total_order
- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下は、メッセージの例になります。
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "00000025:00000d08:0025", "total_order": "1", "data_collection_order": "1" } }
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"transaction": {
"id": "00000025:00000d08:0025",
"total_order": "1",
"data_collection_order": "1"
}
}