8.2. Debezium SQL Server コネクターの仕組み


Debezium SQL Server コネクターを最適に設定および実行するには、コネクターによるスナップショットの実行方法、変更イベントのストリーム方法、Kafka トピック名の決定方法、およびメタデータの使用方法を理解すると便利です。

コネクターの仕組みに関する詳細は、以下のセクションを参照してください。

8.2.1. Debezium SQL Sever コネクターによるデータベーススナップショットの実行方法

SQL Server CDC は、データベースの変更履歴を完全に保存するようには設計されていません。Debezium SQL Server コネクターでデータベースの現在の状態のベースラインを確立するためには、snapshotting と呼ばれるプロセスを使用します。

コネクターによるスナップショットの作成方法を設定できます。デフォルトでは、コネクターのスナップショットモードは initial に設定されます。この initial スナップショットモードを基にして、コネクターが最初に起動すると、データベースの最初の 整合性スナップショット が実行されます。この初期スナップショットは、コネクター用に設定されたinclude プロパティーおよび exclude プロパティー (table.include.listcolumn.include.listtable.exclude.list など) で定義された基準に一致するテーブルの構造とデータをキャプチャします。

コネクターがスナップショットを作成すると、以下のタスクを完了します。

  1. キャプチャーするテーブルを決定します。
  2. スナップショットの作成時に構造が変更されないように、CDC が有効になっている SQL Server テーブルのロックを取得します。ロックのレベルは、snapshot.isolation.mode 設定プロパティーによって決定されます。
  3. サーバーのトランザクションログでの最大ログシーケンス番号 (LSN) の位置を読み取ります。
  4. 関連するテーブルすべての構造をキャプチャーします。
  5. 必要な場合は、ステップ 2 で取得したロックを解放します。ほとんどの場合、ロックは短期間のみ保持されます。
  6. ステップ 3 で読み込まれた LSN の位置に基づいてキャプチャーする SQL Server ソーステーブルとスキーマをスキャンし、テーブルの各行の READ イベントを生成して、そのテーブルの Kafka トピックにイベントを書き込みます。
  7. コネクターオフセットにスナップショットの正常な完了を記録します。

作成された最初のスナップショットは、CDC に対して有効になっているテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。

8.2.1.1. アドホックスナップショット

重要

アドホックスナップショットの使用はテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。

  • コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。

既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。

execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。以下の表で説明されているように、run-snapshot シグナルのタイプを incremental に設定し、スナップショットに追加するテーブルの名前を指定します。

表8.1 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルト

type

incremental

実行するスナップショットのタイプを指定します。
タイプの設定は任意です。現在要求できるのは、incremental スナップショットのみです。

data-collections

該当なし

スナップショットを作成するテーブルの完全修飾名が含まれる配列。
名前の形式は signal.data.collection 設定オプションと同じです。

アドホックスナップショットのトリガー

execute-snapshot シグナルタイプのエントリーをシグナルテーブルに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

現在、execute-snapshot アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。

8.2.1.2. 増分スナップショット

重要

増分スナップショットの使用はテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1 KB です。

増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを table.include.list プロパティーに追加した後にスナップショットを再実行します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

増分スナップショットのトリガー

現在、増分スナップショットを開始する唯一の方法は、アドホックスナップショットシグナル をソースデータベースのシグナルテーブルに送信することです。SQL INSERT クエリーとしてテーブルにシグナルを送信します。Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。

送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental だけです。

スナップショットに追加するテーブルを指定するには、テーブルをリスト表示する data-collectionsアレイを指定します (例:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]})。

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。

前提条件

  • シグナルが有効になっている

    • シグナルデータコレクションがソースのデータベースに存在し、コネクターはこれをキャプチャーするように設定されています。
    • シグナルデータコレクションは signal.data.collection プロパティーで指定されます。

手順

  1. SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。

    INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');

    以下に例を示します。

    INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');

    コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。

    以下の表では、これらのパラメーターについて説明しています。

    表8.2 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明
    説明

    myschema.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    execute-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    data-collections

    スナップショットに含めるテーブル名の配列を指定するシグナルの dataフィールドの必須コンポーネント。
    配列は、signal.data.collection 設定プロパティーにコネクターのシグナルテーブルの名前を指定するときに使用する形式で、完全修飾名別にテーブルをリスト表示します。

    incremental

    実行するスナップショット操作の種類指定するシグナルの data フィールドの任意のtype コンポーネント。
    現在、唯一の有効なオプションはデフォルト値 incremental だけです。
    シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
    値を指定しない場合には、コネクターは増分スナップショットを実行します。

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例: 増分スナップショットイベントメッセージ

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}

項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、唯一の有効なオプションはデフォルト値 incremental だけです。
シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

警告

SQL Server の Debezium コネクターでは、増分スナップショットの実行中のスキーマの変更はサポートしません。

8.2.2. Debezium SQL Server コネクターによる変更データテーブルの読み取り方法

コネクターが最初に起動すると、キャプチャーされたテーブルの構造のスナップショットを作成し、その情報を内部データベース履歴トピックに永続化します。その後、コネクターは各ソーステーブルの変更テーブルを特定し、以下の手順を完了します。

  1. コネクターは、変更テーブルごとに、最後に保存された最大 LSN と現在の最大 LSN の間に作成された変更をすべて読み取ります。
  2. コネクターは、コミット LSN と変更 LSN の値を基にして、読み取る変更を昇順で並び替えします。この並べ替えの順序により、変更はデータベースで発生した順序で Debezium によって再生されるようになります。
  3. コネクターは、コミット LSN および変更 LSN をオフセットとして Kafka Connect に渡します。
  4. コネクターは最大 LSN を保存し、ステップ 1 からプロセスを再開します。

再開後、コネクターは読み取った最後のオフセット (コミットおよび変更 LSN) から処理を再開します。

コネクターは、含まれるソーステーブルに対して CDC が有効または無効化されているかどうかを検出し、その動作を調整することができます。

8.2.3. Debezium SQL Server 変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトでは、SQL Server コネクターは、テーブルで発生するすべての INSERTUPDATEDELETE 操作のイベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。<serverName>.<schemaName>.<tableName>

以下のリストは、デフォルト名のコンポーネントの定義を示しています。

serverName
database.server.name コネクター設定プロパティーで指定したサーバーの論理名です。
schemaName
変更イベントが発生したデータベーススキーマの名前。
tableName
変更イベントが発生したデータベーステーブルの名前。

たとえば、fulfillment がサーバー名、dbo がスキーマ名で、データベースに productsproducts_on_handcustomersorders という名前のテーブルがある場合、コネクターは変更イベントレコードを次の Kafka トピックにストリーミングします。

  • fulfillment.dbo.products
  • fulfillment.dbo.products_on_hand
  • fulfillment.dbo.customers
  • fulfillment.dbo.orders

コネクターは同様の命名規則を適用して、内部データベース履歴トピック (スキーマ変更トピックトランザクションメタデータトピック) にラベルを付けます。

デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。

8.2.4. Debezium SQL Server コネクターによるスキーマ変更トピックの使用方法

CDC が有効になっている各テーブルについて、Debezium SQL Server コネクターは、データベース内のキャプチャしたテーブルに適用されたスキーマ変更イベントの履歴を保存します。コネクターは、スキーマ変更イベントをすべて <serverName> という名前の Kafka トピックに書き込みます。serverNamedatabase.server.name 設定プロパティーに指定された論理サーバー名になります。

コネクターがスキーマ変更トピックに送信するメッセージには、ペイロードと、任意で変更イベントメッセージのスキーマが含まれます。スキーマ変更イベントメッセージのペイロードには、以下の要素が含まれます。

databaseName
ステートメントが適用されるデータベースの名前。databaseName の値は、メッセージキーとして機能します。
tableChanges
スキーマの変更後のテーブルスキーマ全体の構造化表現。tableChanges フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
重要

コネクターがテーブルをキャプチャするように設定されている場合、テーブルのスキーマ変更の履歴は、スキーマ変更トピックだけでなく、内部データベース履歴トピックにも格納されます。内部データベース履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。

警告

コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。

Debezium は、以下のイベントの発生時にスキーマ変更トピックにメッセージを出力します。

  • テーブルの CDC を有効にします。
  • テーブルの CDC を無効にします。
  • スキーマの進化手順 に従って、CDC が有効になっているテーブルの構造を変更します。

例: SQL Server コネクターのスキーマ変更トピックに送信されるメッセージ

以下の例は、スキーマ変更トピックのメッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "1.7.2.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1588252618953,
      "snapshot": "true",
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": null,
      "commit_lsn": "00000025:00000d98:00a2",
      "event_serial_no": null
    },
    "databaseName": "testDB", 1
    "schemaName": "dbo",
    "ddl": null, 2
    "tableChanges": [ 3
      {
        "type": "CREATE", 4
        "id": "\"testDB\".\"dbo\".\"customers\"", 5
        "table": { 6
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 7
            "id"
          ],
          "columns": [ 8
            {
              "name": "id",
              "jdbcType": 4,
              "nativeType": null,
              "typeName": "int identity",
              "typeExpression": "int identity",
              "charsetName": null,
              "length": 10,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "first_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "last_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "email",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ]
        }
      }
    ]
  }
}
表8.3 スキーマ変更トピックに出力されたメッセージのフィールドの説明
項目フィールド名説明

1

databaseName
schemaName

変更が含まれるデータベースとスキーマを識別します。

2

ddl

SQL Server コネクターの場合は常に null です。その他のコネクターでは、このフィールドにスキーマの変更を行う DDL が含まれます。この DDL は SQL Server コネクターでは使用できません。

3

tableChanges

DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。

4

type

変更の種類を説明します。値は以下のいずれかになります。

  • CREATE - テーブルの作成
  • ALTER - テーブルの変更
  • DROP - テーブルの削除

5

id

作成、変更、または破棄されたテーブルの完全な識別子。

6

table

適用された変更後のテーブルメタデータを表します。

7

primaryKeyColumnNames

テーブルのプライマリーキーを設定する列のリスト。

8

変更されたテーブルの各列のメタデータ。

コネクターがスキーマ変更トピックに送信するメッセージでは、キーはスキーマの変更が含まれるデータベースの名前です。以下の例では、payload フィールドにキーが含まれます。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.sqlserver.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "testDB"
  }
}

8.2.5. Debezium SQL Server コネクターのデータ変更イベントの説明

Debezium SQL Server コネクターは、行レベルの INSERTUPDATE、および DELETE 操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。

Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。

以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。

{
 "schema": { 1
   ...
  },
 "payload": { 2
   ...
 },
 "schema": { 3
   ...
 },
 "payload": { 4
   ...
 },
}
表8.4 変更イベントの基本内容の概要
項目フィールド名説明

1

schema

最初の schema フィールドはイベントキーの一部です。イベントキーの payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、最初の schema フィールドは、変更されたテーブルのプライマリーキーの構造、またはテーブルにプライマリーキーがない場合は変更されたテーブルの一意キーの構造を記述します。

message.key.columnsコネクター設定プロパティー を設定すると、テーブルのプライマリーキーをオーバーライドできます。この場合、最初の schema フィールドはそのプロパティーによって識別されるキーの構造を記述します。

2

payload

最初の payload フィールドはイベントキーの一部です。前述の schema フィールドによって記述された構造を持ち、変更された行のキーが含まれます。

3

schema

2 つ目の schema フィールドはイベント値の一部です。イベント値の payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、2 つ目の schema は変更された行の構造を記述します。通常、このスキーマには入れ子になったスキーマが含まれます。

4

payload

2 つ目の payload フィールドはイベント値の一部です。前述の schema フィールドによって記述された構造を持ち、変更された行の実際のデータが含まれます。

デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。トピック名 を参照してください。

警告

SQL Server コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または \_) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。

論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。

変更イベントの詳細は、以下を参照してください。

8.2.5.1. Debezium SQL Server 変更イベントのキー

変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルのプライマリーキー (または一意なキー制約) に存在した各列のフィールドが含まれます。

以下の customers テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。

テーブルの例

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

変更イベントキーの例

customers テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers テーブルに前述の定義がある限り、customers テーブルへの変更をキャプチャーする変更イベントのキー構造は、JSON では以下のようになります。

{
    "schema": { 1
        "type": "struct",
        "fields": [ 2
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false, 3
        "name": "server1.dbo.customers.Key" 4
    },
    "payload": { 5
        "id": 1004
    }
}
表8.5 変更イベントキーの説明
項目フィールド名説明

1

schema

キーのスキーマ部分は、キーの payload 部分の内容を記述する Kafka Connect スキーマを指定します。

2

fields

各フィールドの名前、型、および必要かどうかなど、payload で想定される各フィールドを指定します。この例では、型が int32id という名前の必須フィールドが 1 つあります。

3

任意

イベントキーの payload フィールドに値が含まれる必要があるかどうかを示します。この例では、キーのペイロードに値が必要です。テーブルにプライマリーキーがない場合は、キーの payload フィールドの値は任意です。

4

server1.dbo.customers.Key

キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-schema-name.table-name.Key です。この例では、以下のようになります。

  • server1 はこのイベントを生成したコネクターの名前です。
  • dbo は変更されたテーブルのデータベーススキーマです。
  • customers は更新されたテーブルです。

5

payload

この変更イベントが生成された行のキーが含まれます。この例では、キーには値が1004id フィールドが 1 つ含まれます。

8.2.5.2. Debezium SQL Server 変更イベントの値

変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、 Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

このテーブルへの変更に対する変更イベントの値部分には、以下の各イベント型について記述されています。

作成 イベント

以下の例は、customers テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value", 2
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source", 3
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "server1.dbo.customers.Envelope" 4
  },
  "payload": { 5
    "before": null, 6
    "after": { 7
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "source": { 8
      "version": "1.7.2.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729468470,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
      "event_serial_no": "1"
    },
    "op": "c", 9
    "ts_ms": 1559729471739 10
  }
}
表8.6 作成 イベント値フィールドの説明
項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。

2

name

スキーマ セクションで、各 name フィールドは、値のペイロードのフィールドのスキーマを指定します。

server1.dbo.customers.Value はペイロードのbefore および after フィールドのスキーマです。このスキーマは customers テーブルに固有です。

before および after フィールドのスキーマ名はlogicalName.database-schemaName.tableName.Value の形式を取るので、スキーマ名がデータベースで一意になるようにします。つまり、Avro コンバーター を使用する場合、各論理ソースの各テーブルの Avro スキーマには独自の進化と履歴があります。

3

name

io.debezium.connector.sqlserver.Source は、ペイロードの source フィールドのスキーマです。このスキーマは、SQL Server コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

server1.dbo.customers.Envelope は、ペイロードの全体的な構造のスキーマで、server1 はコネクター名、dbo はデータベーススキーマ名、customers はテーブルを指します。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。しかし、Avro converter を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

before

イベント発生前の行の状態を指定する任意のフィールド。この例のように、op フィールドが create (作成) の c である場合、この変更イベントは新しい内容に対するものであるため、beforenull になります。

7

after

イベント発生後の行の状態を指定する任意のフィールド。この例では、after フィールドには、新しい行の idfirst_namelast_name、および email 列の値が含まれます。

8

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • データベースおよびスキーマ名
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるテーブルの名前
  • サーバーログオフセット

9

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によって行が作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read (読み取り、スナップショットのみに適用)

10

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースにコミットされた時刻を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

更新イベント

サンプル customers テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers テーブルでの更新に生成されるイベントの変更イベント値の例になります。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "after": { 2
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "source": { 3
      "version": "1.7.2.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729995937,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000ac0:0002",
      "commit_lsn": "00000027:00000ac0:0007",
      "event_serial_no": "2"
    },
    "op": "u", 4
    "ts_ms": 1559729998706  5
  }
}
表8.7 更新 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の before フィールドには、各テーブル列のフィールドと、データベースのコミット前にその列にあった値が含まれます。この例では、email の値は john.doe@example.org です。

2

after

イベント発生後の行の状態を指定する任意のフィールド。beforeafter の構造を比較すると、この行への更新内容を判断できます。この例では、email の値は noreply@example.org です。

3

source

イベントのソースメタデータを記述する必須のフィールド。source フィールド構造には create イベントと同じフィールドがありますが、一部の値が異なります。たとえば、更新 イベントサンプルのオフセットは異なります。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター型および名前
  • データベースおよびスキーマ名
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるテーブルの名前
  • サーバーログオフセット

event_serial_no フィールドは、同じコミットおよび変更 LSN を持つイベントを区別します。このフィールドの値が 1 以外である場合に典型的な状況です。

  • 更新によって SQL Server の CDC 変更テーブルに 2 つのイベントが生成されるため、更新 イベントの値は 2 に設定されています (詳細はソースドキュメントを参照してください)。最初のイベントには古い値が含まれ、2 番目のイベントには新しい値が含まれます。コネクターは最初のイベントの値を使用して 2 つ目のイベントを作成します。コネクターは最初のイベントを破棄します。
  • プライマリーキーが更新されると、SQL Server は 2 つのイベントを生成します。古いプライマリーキーを持つレコードを削除するための 削除 イベントと、新しいプライマリーキーを持つレコードを追加するための 作成 イベント。どちらの操作も同じコミットおよび変更 LSN を共有します。イベント番号はそれぞれ 1 および 2 です。

4

op

操作の型を記述する必須の文字列。更新 イベントの値では、 op フィールドの値は u で、更新によってこの行が変更したことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースにコミットされた時刻を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つ のイベントが Debezium によって出力されます。3 つのイベントとは、削除 イベント、行の古いキーを持つ 廃棄 (tombstone) イベント、および行の新しいキーを持つ 作成 イベントです。

delete イベント

削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema の部分になります。サンプル customers テーブルの 削除 イベントの payload 部分は以下のようになります。

{
  "schema": { ... },
  },
  "payload": {
    "before": { <>
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "after": null, 1
    "source": { 2
      "version": "1.7.2.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559730445243,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000db0:0005",
      "commit_lsn": "00000027:00000db0:0007",
      "event_serial_no": "1"
    },
    "op": "d", 3
    "ts_ms": 1559730450205 4
  }
}
表8.8 削除 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の before フィールドには、データベースのコミットで削除される前に行にあった値が含まれます。

2

after

イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の after フィールドは null で、行が存在しないことを示します。

3

source

イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の source フィールド構造は、同じテーブルの 作成 および 更新 イベントと同じになります。多くの source フィールドの値も同じです。削除 イベント値では、ts_ms および pos フィールドの値や、その他の値が変更された可能性があります。ただし、削除 イベント値の source フィールドは、同じメタデータを提供します。

  • Debezium バージョン
  • コネクター型および名前
  • データベースおよびスキーマ名
  • データベースに変更が加えられた時点のタイムスタンプ
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるテーブルの名前
  • サーバーログオフセット

4

op

操作の型を記述する必須の文字列。op フィールドの値は d で、行が削除されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。イベントメッセージエンベロープでは、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

SQL Server コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。

廃棄 (tombstone) イベント

行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の SQL Server コネクターは 削除 イベントを出力した後に、 null 値以外の同じキーを持つ、特別な廃棄 (tombstone) イベントを出力します。

8.2.6. トランザクション境界を表す Debezium SQL Server コネクターによって生成されたイベント

Debezium は、トランザクション境界を表し、データ変更イベントメッセージをエンリッチするイベントを生成できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

データベーストランザクションは、キーワード BEGIN および END で囲まれたステートメントブロックによって表されます。Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
指定のデータコレクションからの変更によって出力されたイベントの数を提供する data_collection および event_count のペアの配列。
警告

Debezium には、トランザクションがいつ終了したかを確実に識別する方法がありません。このように、トランザクション END マーカーは、別のトランザクションの最初のイベントが到着した後にのみ発行されます。これにより、トラフィックの少ないシステムの場合、END マーカーの配信が遅れる可能性があります。

以下の例は、典型的なトランザクション境界メッセージを示しています。

例: SQL Server コネクタートランザクション境界イベント

{
  "status": "BEGIN",
  "id": "00000025:00000d08:0025",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "00000025:00000d08:0025",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "testDB.dbo.tablea",
      "event_count": 1
    },
    {
      "data_collection": "testDB.dbo.tableb",
      "event_count": 1
    }
  ]
}

トランザクションイベントは、<database.server.name>.transaction という名前のトピックに書き込まれます。

8.2.6.1. 変更データイベントのエンリッチメント

トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下の例は、典型的なメッセージの例を示しています。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "00000025:00000d08:0025",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

8.2.7. Debezium SQL Server コネクターによるデータ型のマッピング方法

Debezium SQL Server コネクターは、行が存在するテーブルのように構造化されたイベントを生成して、テーブル行データへの変更を表します。各イベントには、行のコラム値を表すフィールドが含まれます。イベントが操作のコラム値を表す方法は、列の SQL データ型によって異なります。このイベントで、コネクターは各 SQL Server データ型のフィールドを リテラル型セマンティック型 の両方にマップします。

コネクターは SQL Sever のデータ型を リテラル 型および セマンティック 型の両方にマップできます。

リテラル型
Kafka Connect のスキーマタイプ (INT8INT16INT32INT64FLOAT32FLOAT64BOOLEANSTRINGBYTESARRAYMAPSTRUCT) を使用して、値が文字通りどのように表現されるかを記述します。
セマンティック型
フィールドの Kafka Connect スキーマの名前を使用して、Kafka Connect スキーマがフィールドの 意味 をキャプチャーする方法を記述します。

データ型マッピングの詳細については、以下を参照してください。

基本型

以下の表は、コネクターによる基本的な SQL Server データ型のマッピング方法を示しています。

表8.9 SQL Server コネクターによって使用されるデータ型マッピング
SQL Server のデータ型リテラル型 (スキーマ型)セマンティック型 (スキーマ名) および注記

BIT

BOOLEAN

該当なし

TINYINT

INT16

該当なし

SMALLINT

INT16

該当なし

INT

INT32

該当なし

BIGINT

INT64

該当なし

REAL

FLOAT32

該当なし

FLOAT[(N)]

FLOAT64

該当なし

CHAR[(N)]

STRING

該当なし

VARCHAR[(N)]

STRING

該当なし

TEXT

STRING

該当なし

NCHAR[(N)]

STRING

該当なし

NVARCHAR[(N)]

STRING

該当なし

NTEXT

STRING

該当なし

XML

STRING

io.debezium.data.Xml

XML ドキュメントの文字列表現が含まれます。

DATETIMEOFFSET[(P)]

STRING

io.debezium.time.ZonedTimestamp

タイムゾーン情報を含むタイムスタンプの文字列表現。タイムゾーンは GMT です。

その他のデータ型マッピングは、以下のセクションで説明します。

列のデフォルト値がある場合は、対応するフィールドの Kafka Connect スキーマに伝達されます。変更メッセージには、フィールドのデフォルト値が含まれます (明示的な列値が指定されていない場合)。そのため、スキーマからデフォルト値を取得する必要はほとんどありません。

時間値

タイムゾーン情報が含まれる SQL Server の DATETIMEOFFSET 以外の時間型は、time.precision.mode 設定プロパティーの値によって異なります。time.precision.mode 設定プロパティーが adaptive (デフォルト) に設定された場合、コネクターは列のデータ型を基に時間型のリテラルおよびセマンティック型を決定し、イベントが正確 にデータベースの値を表すようにします。

SQL Server のデータ型リテラル型 (スキーマ型)セマンティック型 (スキーマ名) および注記

DATE

INT32

io.debezium.time.Date

エポックからの日数を表します。

TIME(0), TIME(1), TIME(2), TIME(3)

INT32

io.debezium.time.Time

午前 0 時から経過した時間をミリ秒で表し、タイムゾーン情報は含まれません。

TIME(4), TIME(5), TIME(6)

INT64

io.debezium.time.MicroTime

午前 0 時から経過した時間をマイクロ秒で表し、タイムゾーン情報は含まれません。

TIME(7)

INT64

io.debezium.time.NanoTime

午前 0 時から経過した時間をナノ秒で表し、タイムゾーン情報は含まれません。

DATETIME

INT64

io.debezium.time.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

SMALLDATETIME

INT64

io.debezium.time.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

DATETIME2(0), DATETIME2(1), DATETIME2(2), DATETIME2(3)

INT64

io.debezium.time.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

DATETIME2(4), DATETIME2(5), DATETIME2(6)

INT64

io.debezium.time.MicroTimestamp

エポックからの経過時間をマイクロ秒で表し、タイムゾーン情報は含まれません。

DATETIME2(7)

INT64

io.debezium.time.NanoTimestamp

エポックからの経過時間をナノ秒で表し、タイムゾーン情報は含まれません。

time.precision.mode 設定プロパティーが connect に設定された場合、コネクターは事前定義された Kafka Connect の論理型を使用します。これは、コンシューマーが組み込みの Kafka Connect の論理型のみを認識し、可変精度の時間値を処理できない場合に便利です。一方で、SQL Server はマイクロ秒の 10 分の 1 の精度をサポートするため、connect 時間精度モードでコネクターによって生成されたイベントは、データ列の 少数秒の精度 値が 3 よりも大きい場合に 精度が失われます

SQL Server のデータ型リテラル型 (スキーマ型)セマンティック型 (スキーマ名) および注記

DATE

INT32

org.apache.kafka.connect.data.Date

エポックからの日数を表します。

TIME([P])

INT64

org.apache.kafka.connect.data.Time

午前 0 時からの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。SQL Server では、範囲 が 0 - 7 の P が許可され、マイクロ秒の 10 分の 1 の精度まで保存されますが、P が 3 よりも大きい場合は、このモードでは精度が失われます。

DATETIME

INT64

org.apache.kafka.connect.data.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

SMALLDATETIME

INT64

org.apache.kafka.connect.data.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

DATETIME2

INT64

org.apache.kafka.connect.data.Timestamp

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。SQL Server では、範囲 が 0 - 7 の P が許可され、マイクロ秒の 10 分の 1 の精度まで保存されますが、P が 3 よりも大きい場合は、このモードでは精度が失われます。

タイムスタンプ値

DATETIMESMALLDATETIME および DATETIME2 タイプは、タイムゾーン情報のないタイムスタンプを表します。このような列は、UTC を基にして同等の Kafka Connect 値に変換されます。たとえば、2018-06-20 15:13:16.945104 という DATETIME2 の値は、1529507596945104 という値の io.debezium.time.MicroTimestamp で表されます。

Kafka Connect および Debezium を実行している JVM のタイムゾーンは、この変換には影響しないことに注意してください。

10 進数値

Debezium コネクターは、decimal.handling.mode コネクター設定プロパティー の設定にしたがって 10 進数を処理します。

decimal.handling.mode=precise
表8.10 decimal.handing.mode=precise の場合のマッピング
SQL Server タイプリテラル型 (スキーマ型)セマンティック型 (スキーマ名)

NUMERIC[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal
scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

DECIMAL[(P[,S])]

BYTES

org.apache.kafka.connect.data.Decimal
scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

SMALLMONEY

BYTES

org.apache.kafka.connect.data.Decimal
scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

MONEY

BYTES

org.apache.kafka.connect.data.Decimal
scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

decimal.handling.mode=double
表8.11 decimal.handing.mode=double の場合のマッピング
SQL Server タイプリテラル型セマンティック型

NUMERIC[(M[,D])]

FLOAT64

該当なし

DECIMAL[(M[,D])]

FLOAT64

該当なし

SMALLMONEY[(M[,D])]

FLOAT64

該当なし

MONEY[(M[,D])]

FLOAT64

該当なし

decimal.handling.mode=string
表8.12 decimal.handing.mode=string の場合のマッピング
SQL Server タイプリテラル型セマンティック型

NUMERIC[(M[,D])]

STRING

該当なし

DECIMAL[(M[,D])]

STRING

該当なし

SMALLMONEY[(M[,D])]

STRING

該当なし

MONEY[(M[,D])]

STRING

該当なし

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.