2.2. MariaDB 用 Debezium コネクター


重要

MariaDB 用の Debezium コネクターは、テクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat では、実稼働環境での使用を推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。

Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

MariaDB には、データベースにコミットされた順序ですべての操作を記録するバイナリーログ (binlog) があります。これには、テーブルスキーマの変更やテーブルのデータの変更が含まれます。MariaDB はレプリケーションとリカバリーに binlog を使用します。

Debezium MariaDB コネクターは binlog を読み取り、行レベルの INSERTUPDATE、および DELETE 操作の変更イベントを生成し、変更イベントを Kafka トピックに出力します。クライアントアプリケーションはこれらの Kafka トピックを読み取ります。

MariaDB は通常、指定期間後に binlogs をパージするように設定されているため、MariaDB コネクターは各データベースの最初の 整合性スナップショット を実行します。MariaDB コネクターは、スナップショットが作成された時点から binlog を読み取ります。

このコネクターと互換性のある MariaDB データベースのバージョンについては、Debezium でサポートされる設定ページを参照してください。

Debezium MariaDB コネクターの使用に関する情報と手順は、次のように設定されています。

2.2.1. Debezium MariaDB コネクターの仕組み

コネクターがサポートする MariaDB トポロジーの概要は、アプリケーションの計画に役立ちます。Debezium MariaDB コネクターを最適に設定および実行するには、コネクターによるテーブルの構造の追跡方法、スキーマ変更の公開方法、スナップショットの実行方法、および Kafka トピック名の決定方法を理解しておくと便利です。

詳細は以下を参照してください。

2.2.1.1. Debezium コネクターでサポートされる MariaDB トポロジー

Debezium MariaDB コネクターは、次の MariaDB トポロジーをサポートします。

Standalone
単一の MariaDB サーバーを使用する場合、Debezium MariaDB コネクターがサーバーを監視できるように、サーバーで binlog を有効にする必要があります。
バイナリーログも増分 バックアップ として使用できるため、これは多くの場合で許容されます。
この場合、MariaDB コネクターは常にこのスタンドアロン MariaDB サーバーインスタンスに接続し、それに従います。
Primary and replica

Debezium MariaDB コネクターは、プライマリーサーバーの 1 つ、またはレプリカの 1 つ (そのレプリカの binlog が有効になっている場合) をフォローできますが、コネクターはそのサーバーに表示されるクラスター内の変更のみを検出します。通常、これはマルチプライマリートポロジー以外では問題ではありません。

コネクターは、サーバーの binlog の位置を記録します。この位置は、クラスターの各サーバーごとに異なります。したがって、コネクターは 1 つの MariaDB サーバーインスタンスのみに従う必要があります。このサーバーに障害が発生した場合、サーバーを再起動またはリカバリーしないと、コネクターは継続できません。

High available clusters
MariaDB にはさまざまな 高可用性 ソリューションが存在し、問題や障害の耐性をつけ、即座に回復することが大変容易になります。HA MariaDB クラスターは GTID を使用するため、レプリカはプライマリーサーバーで発生するすべての変更を追跡できます。
Multi-primary
Galera クラスターのレプリケーション は、それぞれが複数のプライマリーサーバーからレプリケートする 1 つ以上の MariaDB レプリカノードを使用します。クラスターレプリケーションは、複数の MariaDB クラスターのレプリケーションを集約する強力な方法を提供します。
Debezium MariaDB コネクターは、これらのマルチプライマリー MariaDB レプリカをソースとして使用し、新しいレプリカが古いレプリカに追いついている限り、異なるマルチプライマリー MariaDB レプリカにフェイルオーバーできます。つまり、新しいレプリカには最初のレプリカで確認されたすべてのトランザクションが含まれます。これは、コネクターがデータベースやテーブルのサブセットのみを使用している場合でも機能します。これは、新しいマルチプライマリー MariaDB レプリカに再接続して binlog 内の正しい位置を見つけようとする場合に、特定の GTID ソースを含めるか除外するようにコネクターを設定できるためです。
Hosted

Debezium MariaDB コネクターは、Amazon RDS や Amazon Aurora などのホスト型データベースオプションを使用できます。

これらのホストオプションではグローバル読み取りロックの使用が許可されていないため、コネクターは一貫性のあるスナップショットを作成するときにテーブルレベルのロックを使用します。

2.2.1.2. Debezium MariaDB コネクターがデータベーススキーマの変更を処理する方法

データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。

スキーマ変更後に発生するイベントを正しく処理するために、MariaDB には、データに影響を与える行レベルの変更だけでなく、データベースに適用される DDL ステートメントもトランザクションログに含めます。コネクターは、binlog 内でこれらの DDL ステートメントを検出すると、そのステートメントを解析し、各テーブルのスキーマのインメモリー表現を更新します。コネクターはこのスキーマ表現を使用して、挿入、更新、または削除の操作時にテーブルの構造を特定し、適切な変更イベントを生成します。別のデータベーススキーマ履歴 Kafka トピックでは、コネクターは各 DDL ステートメントがある binlog の場所とともにすべての DDL ステートメントを記録します。

クラッシュするか、正常に停止した後に、コネクターを再起動すると、特定の位置 (特定の時点) から binlog の読み取りを開始します。コネクターは、データベーススキーマ履歴の Kafka トピックを読み取り、コネクターが起動する binlog の時点まですべての DDL ステートメントを解析することで、この時点で存在したテーブル構造を再ビルドします。

このデータベーススキーマ履歴トピックは、内部コネクター専用となっています。オプションで、コネクターは コンシューマーアプリケーション向けの別のトピックにスキーマ変更イベントを送信する こともできます。

MariaDB コネクターが gh-ostpt-online-schema-change などのスキーマ変更ツールが適用されたテーブルの変更をキャプチャーすると、移行プロセス中にヘルパーテーブルが作成されます。これらのヘルパーテーブルで発生する変更をキャプチャーするようにコネクターを設定する必要があります。コネクターがヘルパーテーブル用に生成するレコードをコンシューマーが必要としない場合は、単一メッセージ変換 (SMT) を設定して、コネクターが発行するメッセージからこれらのレコードを削除します。

関連情報

2.2.1.3. Debezium MariaDB コネクターがデータベーススキーマの変更を公開する方法

Debezium MariaDB コネクターを設定して、データベース内のテーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成することができます。コネクターは、スキーマ変更イベントを <topicPrefix> という名前の Kafka トピックに書き込みます。ここで、topicPrefixtopic.prefix コネクター設定プロパティーで指定された名前空間です。コネクターがスキーマ変更トピックに送信するメッセージには、ペイロードと、任意で変更イベントメッセージのスキーマが含まれます。

スキーマ変更イベントのスキーマには、次の要素があります。

name
スキーマ変更イベントメッセージの名前。
type
変更イベントメッセージのタイプ。
version
スキーマのバージョン。バージョンは整数で、スキーマが変更されるたびに増加します。
fields
変更イベントメッセージに含まれるフィールド。

例: MariaDB コネクタースキーマ変更トピックのスキーマ

次の例は、JSON 形式の一般的なスキーマを示しています。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.mariadb.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "inventory"
  }
}
Copy to Clipboard Toggle word wrap

スキーマ変更イベントメッセージのペイロードには、以下の要素が含まれます。

ddl
スキーマの変更につながる SQL CREATEALTER、または DROP ステートメントを提供します。
databaseName
DDL ステートメントが適用されるデータベースの名前。databaseName の値は、メッセージキーとして機能します。
pos
ステートメントが表示される binlog の位置。
tableChanges
スキーマの変更後のテーブルスキーマ全体の構造化表現。tableChanges フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
重要

キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベーススキーマ履歴トピックにも格納します。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。

重要

データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。

トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。

  • データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を 1 に指定します。
  • Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka num.partitions 設定オプションの値を 1 に設定します。
警告

コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。

例: MariaDB コネクタースキーマ変更トピックに送信されるメッセージ

以下の例は、JSON 形式の一般的なスキーマ変更メッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。

{
  "schema": { },
  "payload": {
      "source": {  
1

        "version": "3.0.8.Final",
        "connector": "mariadb",
        "name": "mariadb",
        "ts_ms": 1651535750218, 
2

        "ts_us": 1651535750218000, 
3

        "ts_ns": 1651535750218000000, 
4

        "snapshot": "false",
        "db": "inventory",
        "sequence": null,
        "table": "customers",
        "server_id": 223344,
        "gtid": null,
        "file": "mariadb-bin.000003",
        "pos": 570,
        "row": 0,
        "thread": null,
        "query": null
      },
      "databaseName": "inventory", 
5

      "schemaName": null,
      "ddl": "ALTER TABLE customers ADD middle_name varchar(255) AFTER first_name", 
6

      "tableChanges": [  
7

        {
          "type": "ALTER", 
8

          "id": "\"inventory\".\"customers\"", 
9

          "table": {    
10

            "defaultCharsetName": "utf8mb4",
            "primaryKeyColumnNames": [  
11

              "id"
            ],
            "columns": [  
12

              {
                "name": "id",
                "jdbcType": 4,
                "nativeType": null,
                "typeName": "INT",
                "typeExpression": "INT",
                "charsetName": null,
                "length": null,
                "scale": null,
                "position": 1,
                "optional": false,
                "autoIncremented": true,
                "generated": true
              },
              {
                "name": "first_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 2,
                "optional": false,
                "autoIncremented": false,
                "generated": false
              },
              {
                "name": "middle_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 3,
                "optional": true,
                "autoIncremented": false,
                "generated": false
              },
              {
                "name": "last_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 4,
                "optional": false,
                "autoIncremented": false,
                "generated": false
              },
              {
                "name": "email",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 5,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            }
          ],
          "attributes": [ 
13

            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
Copy to Clipboard Toggle word wrap
Expand
表2.25 スキーマ変更トピックに出力されたメッセージのフィールドの説明
項目フィールド名説明

1

source

source フィールドは、コネクターがテーブル固有のトピックに書き込む標準のデータ変更イベントとして設定されます。このフィールドは、異なるトピックでイベントを関連付けるのに役立ちます。

2

ts_ms, ts_us, ts_ns

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

ソースオブジェクトの ts_ms は、データベースで変更が行われた時刻を示す。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

3

databaseName
schemaName

変更が含まれるデータベースとスキーマを識別します。databaseName フィールドの値は、レコードのメッセージキーとして使用されます。

4

ddl

このフィールドには、スキーマの変更を行う DDL が含まれます。ddl フィールドには複数の DDL ステートメントが含まれることがあります。各ステートメントは、databaseName フィールドのデータベースに適用されます。ステートメントは、データベースに適用された順序で示されます。

クライアントは、複数のデータベースに適用される複数の DDL ステートメントを送信できます。MariaDB がそれらをアトミックに適用する場合、コネクターは DDL ステートメントを順番に取得し、データベースごとにグループ化し、各グループに対してスキーマ変更イベントを作成します。MariaDB がそれらを個別に適用する場合、コネクターはステートメントごとに個別のスキーマ変更イベントを作成します。

5

tableChanges

DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。

6

type

変更の種類を説明します。値は以下のいずれかになります。

CREATE
テーブルの作成
ALTER
テーブルの変更
DROP
テーブルの削除

7

id

作成、変更、または破棄されたテーブルの完全な識別子。テーブルの名前が変更されると、この識別子は <old>,<new> のテーブル名が連結されます。

8

table

適用された変更後のテーブルメタデータを表します。

9

primaryKeyColumnNames

テーブルのプライマリーキーを設定する列のリスト。

10

変更されたテーブルの各列のメタデータ。

11

attributes

各テーブル変更のカスタム属性メタデータ。

詳細は、スキーマ履歴トピック を参照してください。

2.2.1.4. Debezium MariaDB コネクターがデータベーススナップショットを実行する方法

Debezium MariaDB コネクターが初めて起動されると、データベースの初期 整合性スナップショット が実行されます。このスナップショットにより、コネクターはデータベースの現在の状態のベースラインを確立できます。

Debezium はスナップショットを実行するときにさまざまなモードを使用できます。スナップショットモードは、snapshot.mode 設定プロパティーによって決まります。プロパティーのデフォルト値は initial です。snapshot.mode プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。

スナップショットの詳細は、以下のセクションを参照してください。

コネクターは、スナップショットを実行するときに一連のタスクを完了します。正確な手順は、スナップショットモードと、データベースに対して有効なテーブルロックポリシーによって異なります。Debezium MariaDB コネクターは、グローバル読み取りロック または テーブルレベルロック を使用する初期スナップショットを実行するときに、さまざまな手順を実行します。

2.2.1.4.1. グローバル読み取りロックを使用する初期スナップショット

snapshot.mode プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。グローバル読み取りロックが許可されていない環境でのスナップショットプロセスは、テーブルレベルロックのスナップショットワークフロー を参照してください。

Debezium MariaDB コネクターがグローバル読み取りロックを使用して初期スナップショットを実行するために使用するデフォルトのワークフロー

以下の表は、Debezium がグローバル読み取りロックでスナップショットを作成する際のワークフローの手順を示しています。

Expand
手順アクション

1

データベースへの接続を確立します。

2

キャプチャーするテーブルを決定します。デフォルトでは、コネクターはシステム以外のすべてのテーブルのデータをキャプチャーします。スナップショットが完了した後、コネクターは指定されたテーブルのデータをストリーミングし続けます。コネクターで特定のテーブルからのみデータをキャプチャーする場合は、table.include.listtable.exclude.list などのプロパティーを設定して、テーブルまたはテーブル要素のサブセットのみのデータをキャプチャーするようにコネクターに指示できます。

3

キャプチャーするテーブルに対してグローバル読み取りロックを取得し、他のデータベースクライアントによる writes をブロックします。

スナップショット自体は、コネクターによる binlog の位置やテーブルスキーマの読み取りを妨害する可能性のある DDL を他のクライアントが適用しないように防ぐことはありません。コネクターは binlog の位置を読み取る間にグローバル読み取りロックを保持し、後のステップで説明するように、ロックを解除します。

4

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。

注記

これらの分離セマンティクスを使用すると、スナップショットの進行が遅くなる可能性があります。スナップショットの完了に時間がかかりすぎる場合は、別の分離設定の使用を検討するか、最初のスナップショットをスキップして、代わりに 増分スナップショット を実行します。

5

現在の binlog の位置を読み取ります。

6

データベース内のすべてのテーブル、またはキャプチャー対象として指定されたすべてのテーブルの構造をキャプチャーします。コネクターは、必要なすべての DROP… および CREATE… DDL ステートメントなど、スキーマ情報を内部データベーススキーマ履歴トピックに保持します。
スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。

注記

デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、データベース内のすべてのテーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。

初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。

7

手順 3 で取得したグローバル読み取りロックを解放します。他のデータベースクライアントがデータベースに書き込みできるようになりました。

8

コネクターが手順 5 で読み取った binlog の位置で、コネクターはキャプチャー用に指定されたテーブルのスキャンを開始します。スキャン中に、コネクターは次のタスクを実行します。

  1. スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
  2. テーブルからキャプチャーされた行ごとに read イベントを生成します。すべての read イベントには、同じバイナリーログ位置 (手順 5 で取得した位置) が含まれています。
  3. ソーステーブルの Kafka トピックに各 read イベントを出力します。
  4. 該当する場合は、データテーブルロックを解放します。

9

トランザクションをコミットします。

10

コネクターオフセットにスナップショットの正常な完了を記録します。

作成された初期スナップショットは、キャプチャーされたテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。

スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。

コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、手順 5 で読み取りした位置からストリーミングを続行します。

何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。

コネクターの再起動後、ログがプルーニングされている場合、ログ内のコネクターの位置が使用できなくなる可能性があります。その後、コネクターは失敗し、新しいスナップショットが必要であることを示すエラーを返します。この状況でスナップショットを自動的に開始するようにコネクターを設定するには、snapshot.mode プロパティーの値を when_needed に設定します。Debezium MariaDB コネクターのトラブルシューティングに関する詳細は、問題が発生したときの動作 を参照してください。

2.2.1.4.2. テーブルレベルロックを使用する初期スナップショット

一部のデータベース環境では、管理者がグローバル読み取りロックを許可していません。Debezium MariaDB コネクターがグローバル読み取りロックが許可されていないことを検出した場合、コネクターはスナップショットを実行するときにテーブルレベルのロックを使用します。コネクターがテーブルレベルロックを使用するスナップショットを実行するには、Debezium コネクターが MariaDB への接続に使用するデータベースアカウントで LOCK TABLES 権限が必要です。

Debezium MariaDB コネクターがテーブルレベルのロックを使用して初期スナップショットを実行するために使用するデフォルトのワークフロー

次の表は、テーブルレベルの読み取りロックを使用してスナップショットを作成するために Debezium が実行するワークフローの手順を示しています。グローバル読み取りロックが許可されていない環境でのスナップショットプロセスについては、グローバル読み取りロックのスナップショットワークフロー を参照してください。

Expand
手順アクション

1

データベースへの接続を確立します。

2

キャプチャーするテーブルを決定します。デフォルトでは、コネクターはすべてのシステム以外のテーブルをキャプチャーします。コネクターにテーブルまたはテーブル要素のサブセットをキャプチャーさせるには、table.include.listtable.exclude.list など、データをフィルタリングするための多数の include および exclude プロパティーを設定できます。

3

テーブルレベルロックを取得します。

4

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。

5

現在の binlog の位置を読み取ります。

6

コネクターが変更をキャプチャーするように設定されたデータベースとテーブルのスキーマを読み取ります。コネクターは、必要なすべての DROP… および CREATE… DDL ステートメントなど、スキーマ情報を内部データベーススキーマ履歴トピックに保持します。
スキーマ履歴は、変更イベントの発生時に有効な構造に関する情報を提供します。

注記

デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、データベース内のすべてのテーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。

初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。

7

コネクターが手順 5 で読み取った binlog の位置で、コネクターはキャプチャー用に指定されたテーブルのスキャンを開始します。スキャン中に、コネクターは次のタスクを実行します。

  1. スナップショットが開始される前に、テーブルが作成されたことを確認します。スナップショットの開始後にテーブルが作成された場合、コネクターはテーブルをスキップします。スナップショットが完了し、コネクターがストリーミングに移行すると、スナップショットの開始後に作成されたテーブルに対して変更イベントが発行されます。
  2. テーブルからキャプチャーされた行ごとに read イベントを生成します。すべての read イベントには、同じバイナリーログ位置 (手順 5 で取得した位置) が含まれています。
  3. ソーステーブルの Kafka トピックに各 read イベントを出力します。
  4. 該当する場合は、データテーブルロックを解放します。

8

トランザクションをコミットします。

9

テーブルレベルロックを解除します。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。

10

コネクターオフセットにスナップショットの正常な完了を記録します。

Expand
表2.26 snapshot.mode コネクター設定プロパティーの設定
設定説明

always

コネクターは起動するたびにスナップショットを実行します。スナップショットには、キャプチャーされたテーブルの構造およびデータが含まれます。この値を指定すると、コネクターが起動するたびに、キャプチャーされたテーブルからのデータの完全な表現がトピックに入力されます。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。

Initial

コネクターは 初期スナップショットを作成するためのデフォルトのワークフロー で説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。

initial_only

コネクターはデータベーススナップショットを実行します。スナップショットが完了すると、コネクターは停止し、後続のデータベース変更のイベントレコードをストリーミングしなくなります。

schema_only

非推奨です。no_data を参照してください。

no_data

コネクターは、初期スナップショットを作成するためのデフォルトのワークフロー で説明されているすべての手順を実行して、関連するすべてのテーブルの構造をキャプチャーします。ただし、コネクターの起動時点のデータセットを表す READ イベントは作成しません (手順 7.2)。

never

コネクターが起動すると、スナップショットを実行するのではなく、後続のデータベース変更のイベントレコードのストリーミングがすぐに開始されます。no_data オプションが優先的に使用されるようになり、このオプションは、今後非推奨にするか検討中です。

schema_only_recovery

非推奨です。recovery を参照してください。

recovery

損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。

警告: 最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合、このモードを使用してスナップショットを実行しないでください。

when_needed

コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。

  • トピックのオフセットを検出できません。
  • 以前に記録されたオフセットは、サーバー上で利用できないログ位置を指定します。

詳細は、コネクター設定プロパティーテーブルの snapshot.mode を参照してください。

2.2.1.4.3. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由

コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。

テーブルデータ
コネクターの table.include.list プロパティーにあるテーブルの INSERTUPDATE、および DELETE 操作に関する情報。
スキーマデータ
テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。

初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。

場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。

コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。

テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。

前提条件

手順

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 以下の変更をコネクター設定に適用します。

    1. snapshot.moderecovery に設定します。
    2. schema.history.internal.store.only.captured.tables.ddl の値を false に設定します。
    3. コネクターがキャプチャーするテーブルを table.include.list に追加します。これにより、コネクターは今後すべてのテーブルのスキーマ履歴を再構築できます。
  4. コネクターを再起動します。スナップショットのリカバリープロセスでは、テーブルの現在の構造に基づいてスキーマ履歴が再ビルドされます。
  5. (オプション) スナップショットが完了したら、増分スナップショット を開始して、コネクターがオフラインだった間に発生した他のテーブルへの変更とともに、新しく追加されたテーブルの既存のデータをキャプチャーします。
  6. (オプション) snapshot.modeno_data にリセットして、今後の再起動後にコネクターが回復を開始しないようにします。

スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。

最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。

前提条件

  • コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
  • スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。

手順

初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (store.only.captured.tables.ddlfalse に設定されました)。
  1. table.include.list プロパティーを編集して、キャプチャーするテーブルを指定します。
  2. コネクターを再起動します。
  3. 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (store.only.captured.tables.ddltrue に設定されています)。

最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。

手順 1: スキーマスナップショット、その後に増分スナップショット

この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 設定された Kafka Connect offset.storage.topic 内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。

    警告

    オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。

  4. 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。

    1. snapshot.mode プロパティーの値を no_data に設定します。
    2. table.include.list を編集して、キャプチャーするテーブルを追加します。
  5. コネクターを再起動します。
  6. Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
  7. データが損失されないようにするには、増分スナップショット を開始します。
手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット

この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。

  1. コネクターを停止します。
  2. schema.history.internal.kafka.topic プロパティー で指定された内部データベーススキーマ履歴トピックを削除します。
  3. 設定された Kafka Connect offset.storage.topic 内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。

    警告

    オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。

  4. table.include.list を編集して、キャプチャーするテーブルを追加します。
  5. 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。

    1. snapshot.mode プロパティーの値を initial に設定します。
    2. (オプション) schema.history.internal.store.only.captured.tables.ddlfalse に設定します。
  6. コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
  7. (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。

2.2.1.5. アドホックスナップショット

デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。

ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。

  • コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
  • Kafka トピックを削除して、再構築する必要があります。
  • 設定エラーや他の問題が原因で、データの破損が発生します。

アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。

既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。

アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。

execute-snapshot メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。execute-snapshot シグナルのタイプを incremental または blocking に設定し、スナップショットに含めるテーブルの名前を次の表に示すように指定します。

Expand
表2.27 アドホックの execute-snapshot シグナルレコードの例
フィールドデフォルト

type

incremental

実行するスナップショットのタイプを指定します。
現在、incremental または blocking スナップショットを要求できます。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名に一致する正規表現を含む配列。
MariaDB コネクターの場合、テーブルの完全修飾名を指定するには、database.table の形式を使用します。

additional-conditions

該当なし

コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、アドホックスナップショットがキャプチャーするデータをフィルタリングする基準を指定するオブジェクトです。各追加条件には次のパラメーターを設定できます。

data-collection
フィルターを適用するテーブルの完全修飾名。各テーブルに異なるフィルターを適用できます。
filter
スナップショットに含めるためにデータベースレコードに存在する必要がある列の値を指定します (例: "color='blue'")。

filter パラメーターに割り当てる値は、ブロッキングスナップショットの snapshot.select.statement.overrides プロパティーを設定するときに SELECT ステートメントの WHERE 句で指定する値と同じタイプです。

surrogate-key

該当なし

スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。

アドホック増分スナップショットのトリガー

アドホック増分スナップショットを開始するには、execute-snapshot シグナルタイプのエントリーをシグナリングテーブルに追加するか、シグナルメッセージを Kafka シグナリングトピックに送信します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。

詳細は、スナップショットの増分 を参照してください。

アドホックブロッキングスナップショットのトリガー

シグナリングテーブルまたはシグナリングトピックに、execute-snapshot シグナルタイプを持つエントリーを追加することによって、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。

詳細は、ブロッキングスナップショット を参照してください。

2.2.1.6. 増分スナップショット

スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。

増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。

増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。

  • スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
  • 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
  • いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを table.include.list プロパティーに追加した後にスナップショットを再実行します。

増分スナップショットプロセス

増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。

スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERTUPDATEDELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。

Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法

場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。

スナップショットウィンドウ

遅れて入ってきた READ イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。

データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。

スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをテーブルの Kafka トピックに出力します。

コネクターは各スナップショットチャンクにプロセスを繰り返します。

現在、増分スナップショットを開始するには、次のいずれかの方法を使用できます。

2.2.1.6.1. 増分スナップショットのトリガー

増分スナップショットを開始するには、ソースデータベースのシグナリングテーブルに アドホックスナップショットシグナル を送信します。スナップショットシグナルは SQL INSERT クエリーとして送信します。

Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。

送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。Debezium は現在、incrementalblocking のスナップショットタイプをサポートしています。

スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections 配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections 配列が空の場合、Debezium は空の配列をアクションが必要ないと解釈し、スナップショットは作成しません。

注記

スナップショットに含めるテーブルの名前にドット (.)、スペース、またはその他の英数字以外の文字が含まれている場合は、テーブル名を二重引用符でエスケープする必要があります。
たとえば、db1 データベースに存在し、My.Table という名前のテーブルを含めるには、"db1.\"My.Table\"" の形式を使用します。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットをトリガーする

  1. SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
    Copy to Clipboard Toggle word wrap

    以下に例を示します。

    INSERT INTO db1.debezium_signal (id, type, data) 
    1
    
    values ('ad-hoc-1',   
    2
    
        'execute-snapshot',  
    3
    
        '{"data-collections": ["db1.table1", "db1.table2"], 
    4
    
        "type":"incremental", 
    5
    
        "additional-conditions":[{"data-collection": "db1.table1" ,"filter":"color=\'blue\'"}]}'); 
    6
    Copy to Clipboard Toggle word wrap

    コマンドの idtype、および data パラメーターの値は、シグナルテーブルのフィールド に対応します。
    以下の表では、この例のパラメーターを説明しています。

    Expand
    表2.28 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明
    項目説明

    1

    database.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自の ID 文字列をウォーターマークシグナルとして生成します。

    3

    execute-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
    配列には、database.table 形式を使用してテーブルの完全修飾名と一致する正規表現がリストされます。この形式は、コネクターの シグナリングテーブル の名前を指定するために使用する形式と同じです。

    5

    incremental

    実行するスナップショット操作のタイプを指定する、シグナルの data フィールドのオプションの type コンポーネント。
    有効な値は incrementalblocking です。
    値を指定しない場合、コネクターはデフォルトで増分スナップショットを実行します。

    6

    additional-conditions

    コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
    各追加条件は、data-collection プロパティーと filter プロパティーを持つオブジェクトです。データの収集単位で異なるフィルターを指定できます。
    * data-collection プロパティーは、フィルターが適用されるデータコレクションの完全修飾名です。additional-conditions パラメーターの詳細は、additional-conditions 付きでアドホック増分スナップショットを実行する を参照してください。

additional-conditions 付きでアドホック増分スナップショットを実行する

スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルに additional-conditions パラメーターを追加してシグナル要求を変更できます。

一般的なスナップショットの SQL クエリーは、以下の形式を取ります。

SELECT * FROM <tableName> ....
Copy to Clipboard Toggle word wrap

additional-conditions パラメーターを追加して、以下の例のように WHERE 条件を SQL クエリーに追加します。

SELECT * FROM <data-collection> WHERE <filter> ....
Copy to Clipboard Toggle word wrap

以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
Copy to Clipboard Toggle word wrap

たとえば、以下の列が含まれる products テーブルがあるとします。

  • id (プライマリーキー)
  • color
  • quantity

products テーブルの増分スナップショットに color=blue のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。

INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue"}]}');
Copy to Clipboard Toggle word wrap

additional-conditions パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products テーブルを使用して、color=blue および quantity>10 だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。

INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue AND quantity>10"}]}');
Copy to Clipboard Toggle word wrap

以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。

例2.9 増分スナップショットイベントメッセージ

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 
1

    },
    "op":"r", 
2

    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
Copy to Clipboard Toggle word wrap
Expand
表2.29 増分スナップショットイベントメッセージのフィールドの説明
項目フィールド名説明

1

snapshot

実行するスナップショット操作タイプを指定します。
現在、有効なオプションは、blockingincremental のみ です。
シグナルテーブルに送信する SQL クエリーでの type 値の指定は任意です。
値を指定しない場合には、コネクターは増分スナップショットを実行します。

2

op

イベントタイプを指定します。
スナップショットイベントの値は r で、READ 操作を示します。

2.2.1.6.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする

設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。

Expand
表2.30 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型と blocking 型をサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

additional-conditions

該当なし

コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。
各追加条件は、アドホックスナップショットがキャプチャーするデータをフィルタリングする基準を指定するオブジェクトです。各追加条件には次のパラメーターを設定できます。data-collection:: フィルターが適用されるテーブルの完全修飾名。各テーブルに異なるフィルターを適用できます。filter:: スナップショットに含めるためにデータベースレコードに存在する必要がある列の値を指定します (例: "color='blue'")。

filter パラメーターに割り当てる値は、ブロッキングスナップショットの snapshot.select.statement.overrides プロパティーを設定するときに SELECT ステートメントの WHERE 句で指定する値と同じタイプです。

例2.10 execute-snapshot Kafka メッセージ

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Copy to Clipboard Toggle word wrap

additional-conditions 付きのアドホック増分スナップショット

Debezium は additional-conditions フィールドを使用してテーブルのコンテンツのサブセットを選択します。

通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。

SELECT * FROM <tableName> …​.

スナップショット要求に additional-conditions プロパティーが含まれている場合、プロパティーの data-collection および filter パラメーターが SQL クエリーに追加されます。次に例を示します。

SELECT * FROM <data-collection> WHERE <filter> …​.

たとえば、列 id (プライマリーキー)、color、および brand を含む products テーブルがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions プロパティーを追加することができます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
Copy to Clipboard Toggle word wrap

また、additional-conditions プロパティーを使用して、複数の列に基づいて条件を渡すこともできます。たとえば、前の例と同じ products テーブルを使用して、color='blue' および brand='MyBrand' である products テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Copy to Clipboard Toggle word wrap
2.2.1.6.3. 増分スナップショットの停止

状況によっては、増分スナップショットを停止する必要がある場合があります。たとえば、スナップショットが正しく設定されていない場合や、他のデータベース操作にリソースが使用可能であるこのとの確認が必要な場合があります。ソースデータベースのシグナリングテーブルにシグナルを送信することで、すでに実行中のスナップショットを停止できます。

スナップショット停止信号をシグナリングテーブルに送信するには、SQL INSERT クエリーで送信します。stop-snapshot シグナルは、スナップショット操作の typeincremental として指定し、オプションで、現在実行中のスナップショットから省略するテーブルを指定します。Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。

関連情報

また、JSON メッセージを Kafka シグナリングトピック に送信して、増分スナップショットを停止することもできます。

前提条件

ソースシグナリングチャネルを使用して増分スナップショットを停止する

  1. SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');
    Copy to Clipboard Toggle word wrap

    以下に例を示します。

    INSERT INTO db1.debezium_signal (id, type, data) 
    1
    
    values ('ad-hoc-1',   
    2
    
        'stop-snapshot',  
    3
    
        '{"data-collections": ["db1.table1", "db1.table2"], 
    4
    
        "type":"incremental"}'); 
    5
    Copy to Clipboard Toggle word wrap

    signal コマンドの idtype、および data パラメーターの値は、シグナリングテーブルのフィールド に対応します。
    以下の表では、この例のパラメーターを説明しています。

    Expand
    表2.31 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明
    項目説明

    1

    database.debezium_signal

    ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。

    2

    ad-hoc-1

    id パラメーターは、シグナルリクエストの ID 識別子として割り当てられる任意の文字列を指定します。
    この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。

    3

    stop-snapshot

    type パラメーターを指定し、シグナルがトリガーする操作を指定します。

    4

    data-collections

    シグナルの data フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
    配列には、database.table の形式で完全修飾名でテーブルに一致する正規表現がリストされます。

    data フィールドからこのコンポーネントを省略すると、シグナルによって進行中の増分スナップショット全体が停止されます。

    5

    incremental

    停止するスナップショット操作のタイプを指定する信号の data フィールドの必須コンポーネント。
    現在、有効な唯一のオプションは incremental です。
    type の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。

2.2.1.6.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する

設定された Kafka シグナリングトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

メッセージの値は、typedata フィールドが含まれる JSON オブジェクトとなっています。

シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。

Expand
表2.32 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型のみをサポートしています。
詳細は次のセクションを参照してください。

data-collections

該当なし

テーブルの完全修飾名に一致する、コンマで区切られた正規表現のオプションの配列、スナップショットから削除するテーブル名に一致するテーブル名または正規表現の配列。
database.table の形式を使用してテーブル名を指定します。

次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
Copy to Clipboard Toggle word wrap

2.2.1.7. ブロッキングスナップショット

スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。ブロッキングスナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。

ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。

次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。

  • 新しいテーブルを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
  • 大きなテーブルを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。

ブロッキングスナップショットのプロセス

ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。

スナップショットの設定

シグナルの data コンポーネントでは、次のプロパティーを設定できます。

  • data-collections: スナップショットする必要のあるテーブルを指定します。
  • data-collections: スナップショットに含めるテーブルを指定します。
    このプロパティーは、完全修飾テーブル名に一致する正規表現のコンマ区切りリストを受け入れます。プロパティーの動作は、ブロッキングスナップショットでキャプチャーするテーブルを指定する table.include.list プロパティーの動作と似ています。
  • additional-conditions: テーブルごとに異なるフィルターを指定できます。

    • data-collection プロパティーは、フィルターが適用されるテーブルの完全修飾名であり、データベースに応じて大文字と小文字を区別するか、区別しないかを指定できます。
    • filter プロパティーは、snapshot.select.statement.overrides で使用される値と同じものが設定されます。これは、大文字小文字を区別して一致させる必要があるテーブルの完全修飾名です。

以下に例を示します。

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
Copy to Clipboard Toggle word wrap

重複の可能性

スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。

2.2.1.8. Debezium MariaDB 変更イベントレコードを受信する Kafka トピックのデフォルト名

デフォルトでは、MariaDB コネクターは、テーブルで発生するすべての INSERTUPDATE、および DELETE 操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。

コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。

topicPrefix.databaseName.tableName

fulfillment はトピック接頭辞、inventory はデータベース名で、データベースに orderscustomers、および products という名前のテーブルが含まれるとします。Debezium MariaDB コネクターは、データベース内の各テーブルに 1 つずつ、3 つの Kafka トピックにイベントを送信します。

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products
Copy to Clipboard Toggle word wrap

以下のリストは、デフォルト名のコンポーネントの定義を示しています。

topicPrefix
topic.prefix コネクター設定プロパティーで指定されたトピック接頭辞。
schemaName
操作が発生したスキーマの名前。
tableName
操作が発生したテーブルの名前。

コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピックトランザクションメタデータトピック) にラベルを付けます。

デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。

トランザクションメタデータ

Debezium は、トランザクション境界を表し、データ変更イベントメッセージを強化するイベントを生成できます。

Debezium がトランザクションメタデータを受信する場合の制限

Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。

Debezium は、すべてのトランザクションで BEGIN および END 区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。

status
BEGIN または END
id
一意のトランザクション識別子の文字列表現。
ts_ms
データソースでのトランザクション境界イベント (BEGIN または END イベント) の時間。データソースから Debezium にイベント時間を渡されない場合、フィールドは代わりに Debezium がイベントを処理する時間を表します。
event_count (END イベント用)
トランザクションによって出力されるイベントの合計数。
data_collections (END イベント用)
data_collectionevent_count 要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。

{
  "status": "BEGIN",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}
Copy to Clipboard Toggle word wrap

topic.transaction オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>.transaction トピックに出力します。

変更データイベントのエンリッチメント

トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドで強化されます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。

id
一意のトランザクション識別子の文字列表現。
total_order
トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。

以下は、メッセージの例になります。

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335472",
  "ts_ns": "1580390884335472987",
  "transaction": {
    "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
    "total_order": "1",
    "data_collection_order": "1"
  }
}
Copy to Clipboard Toggle word wrap

2.2.2. Debezium MariaDB コネクターデータ変更イベントの説明

Debezium MariaDB コネクターは、行レベルの INSERTUPDATE、および DELETE 操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。

Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。

以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。

{
 "schema": { 
1

   ...
  },
 "payload": { 
2

   ...
 },
 "schema": { 
3

   ...
 },
 "payload": { 
4

   ...
 },
}
Copy to Clipboard Toggle word wrap
Expand
表2.33 変更イベントの基本内容の概要
項目フィールド名説明

1

schema

最初の schema フィールドはイベントキーの一部です。イベントキーの payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、最初の schema フィールドは、変更されたテーブルのプライマリーキーの構造、またはテーブルにプライマリーキーがない場合は変更されたテーブルの一意キーの構造を記述します。

message.key.columns コネクター設定プロパティー を設定すると、テーブルのプライマリーキーをオーバーライドできます。この場合、最初の schema フィールドはそのプロパティーによって識別されるキーの構造を記述します。

2

payload

最初の payload フィールドはイベントキーの一部です。前述の schema フィールドによって記述された構造を持ち、変更された行のキーが含まれます。

3

schema

2 つ目の schema フィールドはイベント値の一部です。イベント値の payload の部分の内容を記述する Kafka Connect スキーマを指定します。つまり、2 つ目の schema は変更された行の構造を記述します。通常、このスキーマには入れ子になったスキーマが含まれます。

4

payload

2 つ目の payload フィールドはイベント値の一部です。前述の schema フィールドによって記述された構造を持ち、変更された行の実際のデータが含まれます。

デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。トピック名 を参照してください。

警告

MariaDB コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名形式 に準拠していることを確認します。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または _) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。

論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。

詳細は以下を参照してください。

2.2.2.1. Debezium MariaDB 変更イベントのキーについて

変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルの PRIMARY KEY (または一意の制約) に存在した各列のフィールドが含まれます。

以下の customers テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
Copy to Clipboard Toggle word wrap

customers テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers テーブルに前述の定義がある限り、customers テーブルへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。

{
 "schema": { 
1

    "type": "struct",
    "name": "mariadb-server-1.inventory.customers.Key", 
2

    "optional": false, 
3

    "fields": [ 
4

      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { 
5

    "id": 1001
  }
}
Copy to Clipboard Toggle word wrap
Expand
表2.34 変更イベントキーの説明
項目フィールド名説明

1

schema

キーのスキーマ部分は、キーの payload 部分の内容を記述する Kafka Connect スキーマを指定します。

2

mariadb-server-1.inventory.customers.Key

キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-name.table-name.Key です。この例では、以下のようになります。

  • mariadb-server-1 は、このイベントを生成したコネクターの名前です。
  • inventory は変更されたテーブルが含まれるデータベースです。
  • customers は更新されたテーブルです。

3

optional

イベントキーの payload フィールドに値が含まれる必要があるかどうかを示します。この例では、キーのペイロードに値が必要です。テーブルにプライマリーキーがない場合は、キーの payload フィールドの値は任意です。

4

fields

各フィールドの名前、型、および必要かどうかなど、payload で想定される各フィールドを指定します。

5

payload

この変更イベントが生成された行のキーが含まれます。この例では、キーには値が 1001 の 1 つの id フィールドが含まれます。

2.2.2.2. Debezium MariaDB 変更イベントの値について

変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。

変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
Copy to Clipboard Toggle word wrap

このテーブルへの変更に対する変更イベントの値部分には以下について記述されています。

create イベント

以下の例は、customers テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。

{
  "schema": { 
1

    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mariadb-server-1.inventory.customers.Value", 
2

        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mariadb-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_us"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ns"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mariadb.Source", 
3

        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_us"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ns"
      }
    ],
    "optional": false,
    "name": "mariadb-server-1.inventory.customers.Envelope" 
4

  },
  "payload": { 
5

    "op": "c", 
6

    "ts_ms": 1465491411815, 
7

    "ts_us": 1465491411815437, 
8

    "ts_ns": 1465491411815437158, 
9

    "before": null, 
10

    "after": { 
11

      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 
12

      "version": "3.0.8.Final",
      "connector": "mariadb",
      "name": "mariadb-server-1",
      "ts_ms": 0,
      "ts_us": 0,
      "ts_ns": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mariadb-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}
Copy to Clipboard Toggle word wrap
Expand
表2.35 作成 イベント値フィールドの説明
項目フィールド名説明

1

schema

値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。

2

name

schema セクションで、各 name フィールドは、値のペイロードのフィールドに対するスキーマを指定します。.

mariadb-server-1.inventory.customers.Value は、beforeafter ペイロードのスキーマです。このスキーマは customers テーブルに固有です。

before および after フィールドのスキーマ名は logicalName.tableName.Value の形式で、スキーマ名がデータベースで一意になるようにします。つまり、Avro コンバーター を使用する場合、各論理ソースの各テーブルの Avro スキーマは、それぞれ独自に進化し、独自の履歴を持つことになります。

3

name

io.debezium.connector.mariadb.Source は、ペイロードの source フィールドのスキーマです。このスキーマは MariaDB コネクターに固有です。コネクターは生成するすべてのイベントにこれを使用します。

4

name

mariadb-server-1.inventory.customers.Envelope はペイロードの全体構造のスキーマです。ここで、mariadb-server-1 はコネクター名、inventory はデータベース、customers はテーブルです。

5

payload

値の実際のデータ。これは、変更イベントが提供する情報です。

イベントの JSON 表現はそれが記述する行よりもはるかに大きいように見えることがあります。これは、JSON 表現にはメッセージのスキーマ部分とペイロード部分を含める必要があるためです。しかし、Avro コンバーター を使用すると、コネクターが Kafka トピックにストリーミングするメッセージのサイズを大幅に小さくすることができます。

6

op

コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、c は操作によって行が作成されたことを示しています。有効な値は以下のとおりです。

  • c = create
  • u = update
  • d = delete
  • r = read (読み取り、スナップショットのみに適用)

7

ts_ms, ts_us, ts_ns

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

8

before

イベント発生前の行の状態を指定する任意のフィールド。この例のように、op フィールドが create (作成) の c である場合、この変更イベントは新しい内容に対するものであるため、beforenull になります。

9

after

イベント発生後の行の状態を指定する任意のフィールド。この例では、after フィールドには、新しい行の idfirst_namelast_name、および email 列の値が含まれます。

10

source

イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であるか
  • 新しい行が含まれるデータベースおよびテーブルの名前
  • イベントを作成した MariaDB スレッドの ID (スナップショット以外のみ)
  • MariaDB サーバー ID (利用可能な場合)
  • データベースに変更が加えられた時点のタイムスタンプ

MariaDB データベース設定で binlog_annotate_row_events オプションが有効で、コネクター設定で include.query プロパティーを有効にすると、source フィールドは、変更イベントの起因となった元の SQL ステートメントが含まれる query フィールドも提供します。

更新 イベント

サンプル customers テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers テーブルでの更新に生成されるイベントの変更イベント値の例になります。

{
  "schema": { ... },
  "payload": {
    "before": { 
1

      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { 
2

      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 
3

      "version": "3.0.8.Final",
      "name": "mariadb-server-1",
      "connector": "mariadb",
      "ts_ms": 1465581029100,
      "ts_us": 1465581029100000,
      "ts_ns": 1465581029100000000,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mariadb-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 
4

    "ts_ms": 1465581029523, 
5

    "ts_us": 1465581029523758, 
6

    "ts_ns": 1465581029523758914 
7

  }
}
Copy to Clipboard Toggle word wrap
Expand
表2.36 更新 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の before フィールドには、各テーブル列のフィールドと、データベースのコミット前にその列にあった値が含まれます。この例では、first_name 値は Anne です。

2

after

イベント発生後の行の状態を指定する任意のフィールド。beforeafter の構造を比較すると、この行への更新内容を判断できます。この例では、first_name 値は Anne Marie です。

3

source

イベントのソースメタデータを記述する必須のフィールド。source フィールド構造には create イベントと同じフィールドがありますが、一部の値が異なります。たとえば、更新 イベントは binlog の異なる位置から発生します。ソースメタデータには以下が含まれています。

  • Debezium バージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であるか
  • 更新された行が含まれるデータベースおよびテーブルの名前
  • イベントを作成した MariaDB スレッドの ID (スナップショット以外のみ)
  • MariaDB サーバー ID (利用可能な場合)
  • データベースに変更が加えられた時点のタイムスタンプ

MariaDB データベース設定で binlog_annotate_row_events オプションが有効で、コネクター設定で include.query プロパティーを有効にすると、source フィールドは、変更イベントの起因となった元の SQL ステートメントが含まれる query フィールドも提供します。

4

op

操作の型を記述する必須の文字列。更新 イベントの値では、op フィールドの値は u で、更新によってこの行が変更したことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

6

ts_us

コネクターがイベントを処理した時間をマイクロ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

7

ts_ns

コネクターがイベントを処理した時間をナノ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

注記

行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つ のイベントが Debezium によって出力されます。3 つのイベントとは、DELETE イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。詳細は次のセクションで説明します。

プライマリーキーの更新

行のプライマリーキーフィールドを変更する UPDATE 操作は、プライマリーキーの変更と呼ばれます。プライマリーキーの変更では、UPDATE イベントレコードの代わりにコネクターが古いキーの DELETE イベントレコードと、新しい (更新された) キーの CREATE イベントレコードを出力します。これらのイベントには通常の構造と内容があり、イベントごとにプライマリーキーの変更に関連するメッセージヘッダーがあります。

  • DELETE イベントレコードには、メッセージヘッダーとして __debezium.newkey が含まれます。このヘッダーの値は、更新された行の新しいプライマリーキーです。
  • CREATE イベントレコードには、メッセージヘッダーとして __debezium.oldkey が含まれます。このヘッダーの値は、更新された行にあった以前の (古い) プライマリーキーです。

delete イベント

削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema の部分になります。サンプル customers テーブルの 削除 イベントの payload 部分は以下のようになります。

{
  "schema": { ... },
  "payload": {
    "before": { 
1

      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, 
2

    "source": { 
3

      "version": "3.0.8.Final",
      "connector": "mariadb",
      "name": "mariadb-server-1",
      "ts_ms": 1465581902300,
      "ts_us": 1465581902300000,
      "ts_ns": 1465581902300000000,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mariadb-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", 
4

    "ts_ms": 1465581902461, 
5

    "ts_us": 1465581902461842, 
6

    "ts_ns": 1465581902461842579 
7

  }
}
Copy to Clipboard Toggle word wrap
Expand
表2.37 削除 イベント値フィールドの説明
項目フィールド名説明

1

before

イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の before フィールドには、データベースのコミットで削除される前に行にあった値が含まれます。

2

after

イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の after フィールドは null で、行が存在しないことを示します。

3

source

イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の source フィールド構造は、同じテーブルの 作成 および 更新 イベントと同じになります。多くの source フィールドの値も同じです。削除 イベント値では、ts_ms および pos フィールドの値や、その他の値が変更された可能性があります。ただし、削除 イベント値の source フィールドは、同じメタデータを提供します。

  • Debezium バージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であるか
  • 更新された行が含まれるデータベースおよびテーブルの名前
  • イベントを作成した MariaDB スレッドの ID (スナップショット以外のみ)
  • MariaDB サーバー ID (利用可能な場合)
  • データベースに変更が加えられた時点のタイムスタンプ

MariaDB データベース設定で binlog_annotate_row_events オプションが有効で、コネクター設定で include.query プロパティーを有効にすると、source フィールドは、変更イベントの起因となった元の SQL ステートメントが含まれる query フィールドも提供します。

4

op

操作の型を記述する必須の文字列。op フィールドの値は d で、行が削除されたことを示します。

5

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

6

ts_us

コネクターがイベントを処理した時間をマイクロ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

7

ts_ns

コネクターがイベントを処理した時間をナノ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。

MariaDB コネクターイベントは、Kafka log compaction と連携するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。

tombstone イベント

行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium MariaDB コネクターは delete イベントを発行した後、null 値以外の同じキーを持つ特別な tombstone イベントを発行します。

truncate イベント

truncate 変更イベントは、テーブルが切り捨てられたことを通知します。truncate イベントのメッセージキーが null です。メッセージの値は次の例のようになります。

{
    "schema": { ... },
    "payload": {
        "source": { 
1

            "version": "3.0.8.Final",
            "name": "mariadb-server-1",
            "connector": "mariadb",
            "name": "mariadb-server-1",
            "ts_ms": 1465581029100,
            "ts_us": 1465581029100000,
            "ts_ns": 1465581029100000000,
            "snapshot": false,
            "db": "inventory",
            "table": "customers",
            "server_id": 223344,
            "gtid": null,
            "file": "mariadb-bin.000003",
            "pos": 484,
            "row": 0,
            "thread": 7,
            "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
        },
        "op": "t", 
2

        "ts_ms": 1465581029523, 
3

        "ts_us": 1465581029523468, 
4

        "ts_ns": 1465581029523468471 
5

    }
}
Copy to Clipboard Toggle word wrap
Expand
表2.38 truncate イベント値フィールドの説明
項目フィールド名説明

1

source

イベントのソースメタデータを記述する必須のフィールド。truncate イベント値の source フィールド構造は、同じテーブルの 作成更新、および 削除 イベントと同じで、以下のメタデータを提供します。

  • Debezium バージョン
  • コネクター型および名前
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部であるか
  • データベースおよびテーブルの名前
  • イベントを切り捨てた MariaDB スレッドの ID (スナップショット以外のみ)
  • MariaDB サーバー ID (利用可能な場合)
  • データベースに変更が加えられた時点のタイムスタンプ

2

op

操作の型を記述する必須の文字列。op フィールドの値は t で、このテーブルが切り捨てされたことを示します。

3

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

source オブジェクトで、ts_ms は変更がデータベースに加えられた時間を示します。payload.source.ts_ms の値を payload.ts_ms の値と比較することにより、ソースデータベースの更新と Debezium との間の遅延を判断できます。

4

ts_us

コネクターがイベントを処理した時間をマイクロ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

5

ts_ns

コネクターがイベントを処理した時間をナノ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

1 つの TRUNCATE ステートメントが複数のテーブルに適用される場合、コネクターは切り捨てられたテーブルごとに 1 つの truncate 変更イベントレコードを出力します。

注記

truncate イベントは、テーブル全体に対して加えられ、メッセージキーを持たない変更を表します。その結果、複数のパーティションがあるトピックでは、変更イベント (createupdate など)、またはテーブルに関連する truncate イベントについて、順序は保証されません。たとえば、コンシューマーが複数のパーティションからテーブルのイベントを読み取る場合、別のパーティションからテーブル内のすべてのデータを削除する truncate イベントを受信した後、あるパーティションからテーブルの update イベントを受け取る可能性があります。順序は、単一のパーティションを使用するトピックでのみ保証されます。

コネクターに truncate イベントをキャプチャーさせたくない場合は、skipped.operations オプションを使用して除外します。

2.2.3. Debezium MariaDB コネクターがデータ型をマッピングする方法

Debezium MariaDB コネクターは、行が存在するテーブルのように構造化されたイベントを使用して行の変更を表します。イベントには、各列の値のフィールドが含まれます。その列の MariaDB データ型によって、Debezium がイベント内の値をどのように表現するかが決まります。

文字列を格納する列は、MariaDB では文字セットと照合順序を使用して定義されます。MariaDB コネクターは、binlog イベント内の列値のバイナリー表現を読み取るときに、列の文字セットを使用します。

コネクターは、MariaDB データ型を literalsemantic 型の両方にマップできます。

  • リテラル型: Kafka Connect スキーマタイプを使用して値がどのように表されるか。
  • セマンティック型: Kafka Connect スキーマがどのようにフィールド (スキーマ名) の意味をキャプチャーするか。

デフォルトのデータ型変換が要件に合わない場合は、コネクター用の カスタムコンバーターの作成 が可能です。

詳細は以下を参照してください。

基本型

次の表は、コネクターが基本的な MariaDB データ型をどのようにマッピングするかを示しています。

Expand
表2.39 基本型のマッピングの説明
MariaDB タイプリテラル型セマンティック型

BOOLEAN, BOOL

BOOLEAN

該当なし

BIT(1)

BOOLEAN

該当なし

BIT(>1)

BYTES

io.debezium.data.Bits

length パラメーターには、ビット数を表す整数が含まれます。byte[] にはビットが リトルエンディアン 形式で含まれ、指定数のビットが含まれるようにサイズが指定されます。たとえば、n はビットです。
numBytes = n/8 + (n%8== 0 ?0 : 1)

TINYINT

INT16

該当なし

SMALLINT[(M)]

INT16

該当なし

MEDIUMINT[(M)]

INT32

該当なし

INT, INTEGER[(M)]

INT32

該当なし

BIGINT[(M)]

INT64

該当なし

REAL[(M,D)]

FLOAT32

該当なし

FLOAT[(P)]

FLOAT32 または FLOAT64

精度は、ストレージサイズを決定するためにのみ使用されます。0 から 23 までの精度 P は、4 バイトの単精度 FLOAT32 列になります。24 から 53 までの精度 P は、8 バイトの倍精度 FLOAT64 列になります。

FLOAT(M,D)

FLOAT64

該当なし

DOUBLE[(M,D)]

FLOAT64

該当なし

CHAR(M)]

STRING

該当なし

VARCHAR(M)]

STRING

該当なし

BINARY(M)]

BYTES または STRING

該当なし

binary.handling.mode コネクター設定プロパティーの設定に基づいて、raw バイト (デフォルト)、base64 でエンコードされた文字列、base64-url-safe-encoded 文字列、または 16 進数でエンコードされた文字列のいずれか。

VARBINARY(M)]

BYTES または STRING

該当なし

binary.handling.mode コネクター設定プロパティーの設定に基づいて、raw バイト (デフォルト)、base64 でエンコードされた文字列、base64-url-safe-encoded 文字列、または 16 進数でエンコードされた文字列のいずれか。

TINYBLOB

BYTES または STRING

該当なし

binary.handling.mode コネクター設定プロパティーの設定に基づいて、raw バイト (デフォルト)、base64 でエンコードされた文字列、base64-url-safe-encoded 文字列、または 16 進数でエンコードされた文字列のいずれか。

TINYTEXT

STRING

該当なし

BLOB

BYTES または STRING

該当なし

binary.handling.mode コネクター設定プロパティーの設定に基づいて、raw バイト (デフォルト)、base64 でエンコードされた文字列、base64-url-safe-encoded 文字列、または 16 進数でエンコードされた文字列のいずれか。

サイズが最大 2GB の値のみがサポートされます。Claim Check パターンを使用して、大きな列の値を外部化することが推奨されます。

TEXT

STRING

n/a

2GB までのサイズの値のみがサポートされています。Claim Check パターンを使用して、大きな列の値を外部化することが推奨されます。

MEDIUMBLOB

BYTES または STRING

該当なし

binary.handling.mode コネクター設定プロパティーの設定に基づいて、raw バイト (デフォルト)、base64 でエンコードされた文字列、base64-url-safe-encoded 文字列、または 16 進数でエンコードされた文字列のいずれか。

MEDIUMTEXT

STRING

該当なし

LONGBLOB

BYTES または STRING

該当なし

binary.handling.mode コネクター設定プロパティーの設定に基づいて、raw バイト (デフォルト)、base64 でエンコードされた文字列、base64-url-safe-encoded 文字列、または 16 進数でエンコードされた文字列のいずれか。

サイズが最大 2GB の値のみがサポートされます。Claim Check パターンを使用して、大きな列の値を外部化することが推奨されます。

LONGTEXT

STRING

n/a

2GB までのサイズの値のみがサポートされています。Claim Check パターンを使用して、大きな列の値を外部化することが推奨されます。

JSON

STRING

io.debezium.data.Json

JSON ドキュメント、配列、またはスカラーの文字列表現が含まれます。

ENUM

STRING

io.debezium.data.Enum

allowed スキーマパラメーターには、許可された値のコンマ区切りのリストが含まれます。

SET

STRING

io.debezium.data.EnumSet

allowed スキーマパラメーターには、許可された値のコンマ区切りのリストが含まれます。

YEAR[(2|4)]

INT32

io.debezium.time.Year

TIMESTAMP[(M)]

STRING

io.debezium.time.ZonedTimestamp

マイクロ秒精度の ISO 8601 形式。MariaDB では、M0 - 6 の範囲で指定できます。

時間型

TIMESTAMP データ型を除き、MariaDB の時間型は、time.precision.mode コネクター設定プロパティーの値に依存します。デフォルト値が CURRENT_TIMESTAMP または NOW として指定される TIMESTAMP 列では、Kafka Connect スキーマのデフォルト値として値 1970-01-01 00:00:00 が使用されます。

MariaDB では、ゼロ値が null 値よりも優先される場合があるため、DATEDATETIME、および TIMESTAMP 列にゼロ値を許可します。MariaDB コネクターは、列定義で null 値が許可されている場合はゼロ値を null 値として表し、列で null 値が許可されていない場合はエポック日として表します。

タイムゾーンのない時間型

DATETIME 型は、"2018-01-13 09:48:27" のようにローカルの日時を表します。タイムゾーンの情報は含まれません。このような列は、UTC を使用して列の精度に基づいてエポックミリ秒またはマイクロ秒に変換されます。TIMESTAMP 型は、タイムゾーン情報のないタイムスタンプを表します。MariaDB によって、書き込み時にサーバー (またはセッション) の現在のタイムゾーンから UTC に変換され、値を読み戻すときに UTC からサーバー (またはセッション) の現在のタイムゾーンに変換されます。以下に例を示します。

  • 値が 2018-06-20 06:37:03DATETIME は、1529476623000 になります。
  • 値が 2018-06-20 06:37:03TIMESTAMP2018-06-20T13:37:03Z になります。

このような列は、サーバー (またはセッション) の現在のタイムゾーンに基づいて、UTC の同等の io.debezium.time.ZonedTimestamp に変換されます。タイムゾーンは、デフォルトでサーバーからクエリーされます。

これが失敗した場合は、データベースの timezone MariaDB 設定オプションで明示的に指定する必要があります。たとえば、データベースのタイムゾーン (グローバルに設定されているか、timezone オプションを使用してコネクター用に設定されている) が "America/Los_Angeles" の場合、TIMESTAMP 値 "2018-06-20 06:37:03" は、値 "2018-06-20T13:37:03Z" を持つ ZonedTimestamp で表されます。

Kafka Connect および Debezium を実行する JVM のタイムゾーン設定は、これらの変換には影響しません。

時間値に関連するプロパティーの詳細は、MariaDB コネクター設定プロパティー のドキュメントを参照してください。

time.precision.mode=adaptive_time_microseconds(default)

MariaDB コネクターは、列のデータ型定義に基づいてリテラル型とセマンティック型を決定し、イベントがデータベース内の値を正確に表すようにします。すべての時間フィールドはマイクロ秒単位です。正しくキャプチャーされる TIME フィールドの値は、範囲が 00:00:00.000000 から 23:59:59.999999 までの正の値です。

Expand
表2.40 time.precision.mode=adaptive_time_microseconds の場合のマッピング
MariaDB タイプリテラル型セマンティック型

DATE

INT32

io.debezium.time.Date
エポックからの日数を表します。

TIME[(M)]

INT64

io.debezium.time.MicroTime
時間の値をマイクロ秒単位で表し、タイムゾーン情報は含まれません。MariaDB では、M0 - 6 の範囲で指定できます。

DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)

INT64

io.debezium.time.Timestamp
エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

DATETIME(4), DATETIME(5), DATETIME(6)

INT64

io.debezium.time.MicroTimestamp
エポックからの経過時間をマイクロ秒で表し、タイムゾーン情報は含まれません。

time.precision.mode=connect

MariaDB コネクターは、定義済みの Kafka Connect 論理型を使用します。この方法はデフォルトの方法よりも精度が低く、データベース列に 3 を超える 少数秒の精度 値がある場合は、イベントの精度が低くなる可能性があります。00:00:00.000 から 23:59:59.999 までの値のみを処理できます。テーブルの time.precision.mode=connect の値が、必ずサポートされる範囲内になるようにすることができる場合のみ、TIME を設定します。connect 設定は、今後の Debezium バージョンで削除される予定です。

Expand
表2.41 time.precision.mode=connect の場合のマッピング
MariaDB タイプリテラル型セマンティック型

DATE

INT32

org.apache.kafka.connect.data.Date
エポックからの日数を表します。

TIME[(M)]

INT64

org.apache.kafka.connect.data.Time
午前 0 時以降の時間値をマイクロ秒で表し、タイムゾーン情報は含まれません。

DATETIME[(M)]

INT64

org.apache.kafka.connect.data.Timestamp
エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

10 進数型

Debezium コネクターは、decimal.handling.mode コネクター設定プロパティーの設定に従って小数を処理します。

decimal.handling.mode=precise
Expand
表2.42 decimal.handling.mode=precise の場合のマッピング
MariaDB タイプリテラル型セマンティック型

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal
scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal
scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

decimal.handling.mode=double
Expand
表2.43 decimal.handling.mode=double の場合のマッピング
MariaDB タイプリテラル型セマンティック型

NUMERIC[(M[,D])]

FLOAT64

該当なし

DECIMAL[(M[,D])]

FLOAT64

該当なし

decimal.handling.mode=string
Expand
表2.44 decimal.handling.mode=string の場合のマッピング
MariaDB タイプリテラル型セマンティック型

NUMERIC[(M[,D])]

STRING

該当なし

DECIMAL[(M[,D])]

STRING

該当なし

ブール値

MariaDB は、BOOLEAN 値を内部的に特定の方法で処理します。BOOLEAN 列は、内部で TINYINT(1) データ型にマッピングされます。ストリーミング中にテーブルが作成されると、Debezium は元の DDL を受信するため、適切な BOOLEAN マッピングが使用されます。スナップショットの作成中、Debezium は SHOW CREATE TABLE を実行して、BOOLEANTINYINT(1) の両方の列に TINYINT(1) を返すテーブル定義を取得します。その後、Debezium は元の型のマッピングを取得する方法はないため、TINYINT(1) にマッピングします。

ソース列をブールデータ型に変換できるように、Debezium は TinyIntOneToBooleanConverter カスタムコンバーター を提供しています。これは、以下のいずれかの方法で使用できます。

  • すべての TINYINT(1) または TINYINT(1) UNSIGNED 列を BOOLEAN 型にマップします。
  • 正規表現のコンマ区切りリストを使用して、列のサブセットを列挙します。
    このタイプの変換を使用するには、以下の例のように selector パラメーターを使用して converters 設定プロパティーを設定する必要があります。

    converters=boolean
    boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
    boolean.selector=db1.table1.*, db1.table2.column1
    Copy to Clipboard Toggle word wrap
  • 注意: 場合によっては、スナップショットが SHOW CREATE TABLE を実行したときに、データベースが tinyint unsigned の長さを表示されないため、このコンバーターは機能しません。新しいオプション length.checker はこの問題を解決することができます。デフォルト値は true です。次の例に示すように、length.checker を無効にして、タイプに基づいてすべての列を変換するのではなく、変換が必要な列を selected プロパティーに指定します。

    converters=boolean
    boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
    boolean.length.checker=false
    boolean.selector=db1.table1.*, db1.table2.column1
    Copy to Clipboard Toggle word wrap

空間型

現在、Debezium MariaDB コネクターは次の空間データ型をサポートしています。

Expand
表2.45 空間型マッピングの説明
MariaDB タイプリテラル型セマンティック型

GEOMETRY,
LINESTRING,
POLYGON,
MULTIPOINT,
MULTILINESTRING,
MULTIPOLYGON,
GEOMETRYCOLLECTION

STRUCT

io.debezium.data.geometry.Geometry
: フィールドが 2 つの構造が含まれます。

  • srid (INT32: 構造に保存されたジオメトリーオブジェクトの型を定義する、空間参照システム ID。
  • wkb (BYTES): wkb (Well-Known-Binary) 形式でエンコードされたジオメトリーオブジェクトのバイナリー表現。詳細は、Open Geospatial Consortium を参照してください。

2.2.4. MariaDB データを代替データ型にマッピングするためのカスタムコンバーター

デフォルトでは、Debezium MariaDB コネクターは、MariaDB データ型用の CustomConverter 実装を複数提供します。これらのカスタムコンバーターは、コネクター設定に基づいて特定のデータ型に対する代替マッピングを提供します。コネクターに CustomConverter を追加するには、カスタムコンバーターのドキュメント の指示に従ってください。

TINYINT(1) からブール値

デフォルトでは、コネクターのスナップショット中に、Debezium MariaDB コネクターは JDBC ドライバーから列タイプを取得し、BOOLEAN 列に TINYINT(1) タイプを割り当てます。Debezium はこれらの JDBC 列タイプを使用して、スナップショットイベントのスキーマを定義します。コネクターがスナップショットからストリーミングフェーズに移行した後、デフォルトのマッピングから生じる変更イベントスキーマによって、BOOLEAN 列のマッピングが不整合になる可能性があります。MariaDB が BOOLEAN 列を均一に出力するようにするには、次の設定例に示すように、カスタム TinyIntOneToBooleanConverter を適用できます。

例: TinyIntOneToBooleanConverter の設定

converters=tinyint-one-to-boolean
tinyint-one-to-boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
tinyint-one-to-boolean.selector=.*.MY_TABLE.DATA
tinyint-one-to-boolean.length.checker=false
Copy to Clipboard Toggle word wrap

前の例では、selectorlength.checker プロパティーはオプションです。デフォルトでは、コンバーターは TINYINT データ型の長さが 1 であることをチェックします。length.checkerfalse の場合、コンバーターは TINYINT データ型の長さが 1 であることを明示的に確認しません。selector は、指定された正規表現に基づいて、変換するテーブルまたは列を指定します。selector プロパティーを省略すると、コンバーターはすべての TINYINT 列を論理 BOOL フィールドタイプにマップします。selector オプションを設定せず、TINYINT 列を TINYINT(1) にマップする場合は、length.checker プロパティーを省略するか、その値を true に設定します。

JDBC sink のデータ型

Debezium JDBC sink コネクターを Debezium MariaDB ソースコネクターと統合すると、MariaDB コネクターはスナップショットフェーズとストリーミングフェーズ中に一部の列属性を異なる方法で出力します。JDBC sink コネクターがスナップショットフェーズとストリーミングフェーズの両方からの変更を一貫して使用するには、次の例に示すように、JdbcSinkDataTypesConverter コンバータを MariaDB ソースコネクター設定の一部として含める必要があります。

例: JdbcSinkDataTypesConverter 設定

converters=jdbc-sink
jdbc-sink.type=io.debezium.connector.binlog.converters.JdbcSinkDataTypesConverter
jdbc-sink.selector.boolean=.*.MY_TABLE.BOOL_COL
jdbc-sink.selector.real=.*.MY_TABLE.REAL_COL
jdbc-sink.selector.string=.*.MY_TABLE.STRING_COL
jdbc-sink.treat.real.as.double=true
Copy to Clipboard Toggle word wrap

前の例では、selector.* および treat.real.as.double 設定プロパティーはオプションです。

selector.* プロパティーは、コンバーターが適用されるテーブルと列を指定する正規表現のコンマ区切りリストを指定します。デフォルトでは、コンバーターはすべてのテーブルに含まれるすべてのブール値、実数、および文字列ベースの列データ型に次のルールを適用します。

  • BOOLEAN データ型は常に INT16 論理型として出力され、1true0false を表します。
  • REAL データ型は常に FLOAT64 論理型として出力されます。
  • 文字列ベースの列には、列の文字セットを含む __debezium.source.column.character_set スキーマパラメーターが常に含まれます。

各データ型について、デフォルトのスコープをオーバーライドし、セレクターを特定のテーブルと列にのみ適用するセレクタールールを設定できます。たとえば、ブールコンバーターのスコープを設定するには、前の例のように、converters.jdbc-sink.selector.boolean=.*.MY_TABLE.BOOL_COL のルールをコネクター設定に追加します。

2.2.5. Debezium コネクターを実行するための MariaDB の設定

Debezium コネクターをインストールして実行する前に、MariaDB セットアップタスクが複数必要です。

詳細は以下を参照してください。

2.2.5.1. Debezium コネクター用の MariaDB ユーザーの作成

Debezium MariaDB コネクターには MariaDB ユーザーアカウントが必要です。この MariaDB ユーザーには、Debezium MariaDB コネクターが変更をキャプチャーするデータベースすべてに対して適切な権限が必要です。

前提条件

  • MariaDB サーバー。
  • SQL コマンドの基本知識。

手順

  1. MariaDB ユーザーを作成します。

    mariadb> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
    Copy to Clipboard Toggle word wrap
  2. 必要なパーミッションをユーザーに付与します。

    mariadb> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    Copy to Clipboard Toggle word wrap

    必要なパーミッションの説明は、表2.46「ユーザーパーミッションの説明」 を参照してください。

    重要

    グローバル読み取りロックを許可しない Amazon RDS や Amazon Aurora などのホストオプションを使用している場合、テーブルレベルのロックを使用して 整合性スナップショット を作成します。この場合、作成するユーザーに LOCK TABLES パーミッションも付与する必要があります。詳細は スナップショット を参照してください。

  3. ユーザーのパーミッションの最終処理を行います。

    mariadb> FLUSH PRIVILEGES;
    Copy to Clipboard Toggle word wrap
    Expand
    表2.46 ユーザーパーミッションの説明
    キーワード説明

    SELECT

    コネクターがデータベースのテーブルから行を選択できるようにします。これは、スナップショットを実行する場合にのみ使用されます。

    RELOAD

    内部キャッシュのクリアまたはリロード、テーブルのフラッシュ、またはロックの取得を行う FLUSH ステートメントをコネクターが使用できるようにします。これは、スナップショットを実行する場合にのみ使用されます。

    SHOW DATABASES

    SHOW DATABASE ステートメントを実行して、コネクターがデータベース名を確認できるようにします。これは、スナップショットを実行する場合にのみ使用されます。

    REPLICATION-SLAVE

    コネクターが MariaDB サーバーの binlog に接続して読み取ることができるようになります。

    REPLICATION CLIENT

    コネクターが以下のステートメントを使用できるようにします。

    • SHOW MASTER STATUS
    • SHOW SLAVE STATUS
    • SHOW BINARY LOGS

    これは必ずコネクターに必要です。

    ON

    パーミッションが適用されるデータベースを指定します。

    TO 'user'

    パーミッションを付与するユーザーを指定します。

    IDENTIFIED BY 'password'

    ユーザーの MariaDB パスワードを指定します。

2.2.5.2. Debezium の MariaDB バイナリーログの有効化

MariaDB レプリケーションのバイナリーログを有効にする必要があります。バイナリーログは、レプリカが変更を伝播できるようにトランザクションの更新を記録します。

前提条件

  • MariaDB サーバー。
  • 適切な MariaDB ユーザー権限。

手順

  1. log-bin オプションが有効になっているかどうかを確認します。

    mariadb> SHOW VARIABLES LIKE '%log_bin%';
    Copy to Clipboard Toggle word wrap
  2. binlog が OFF の場合は、次の表のプロパティーを MariaDB サーバーの設定ファイルに追加します。

    server-id         = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id';
    log_bin                     = mariadb-bin
    binlog_format               = ROW
    binlog_row_image            = FULL
    binlog_expire_logs_seconds  = 864000
    Copy to Clipboard Toggle word wrap
  3. 再度 binlog の状態をチェックして、変更を確認します。

    mariadb> SHOW VARIABLES LIKE '%log_bin%';
    Copy to Clipboard Toggle word wrap
  4. Amazon RDS で MariaDB を実行する場合、バイナリーログを実行するには、データベースインスタンスの自動バックアップを有効にする必要があります。データベースインスタンスが自動バックアップを実行するように設定されていないと、前の手順で説明した設定を適用しても binlog は無効になります。

    Expand
    表2.47 MariaDB binlog 設定プロパティーの説明
    プロパティー説明

    server-id

    server-id の値は、MariaDB クラスター内の各サーバーおよびレプリケーションクライアントごとに一意である必要があります。

    log_bin

    log_bin の値は、binlog ファイルのシーケンスのベース名です。

    binlog_format

    binlog-formatROW または row に設定する必要があります。

    binlog_row_image

    binlog_row_imageFULL または full に設定する必要があります。

    binlog_expire_logs_seconds

    binlog_expire_logs_seconds は、非推奨のシステム変数 expire_logs_days に対応します。これは、binlog ファイルを自動的に削除する秒数です。デフォルト値は 2592000 で、つまり、30 日です。実際の環境に見合った値を設定します。詳細は、MariaDB による binlog ファイルの消去 を参照してください。

2.2.5.3. Debezium の MariaDB グローバルトランザクション識別子の有効化

グローバルトランザクション識別子 (GTID) は、クラスター内のサーバーで発生するトランザクションを一意に識別します。Debezium MariaDB コネクターでは必須ではありませんが、GTID を使用するとレプリケーションが簡素化され、プライマリーサーバーとレプリカサーバーの整合性が保たれているかどうかを簡単に確認できるようになります。

MariaDB の場合、GTID はデフォルトで有効になっており、追加の設定は必要ありません。

2.2.5.4. Debezium の MariaDB セッションタイムアウトの設定

大規模なデータベースに対して最初の整合性スナップショットが作成されると、テーブルの読み込み時に、確立された接続がタイムアウトする可能性があります。MariaDB 設定ファイルで interactive_timeoutwait_timeout を設定し、この動作を防ぐことができます。

前提条件

  • MariaDB サーバー。
  • SQL コマンドの基本知識。
  • MariaDB 設定ファイルへのアクセス。

手順

  1. interactive_timeout を設定します。

    mariadb> interactive_timeout=<duration-in-seconds>
    Copy to Clipboard Toggle word wrap
  2. wait_timeout を設定します。

    mariadb> wait_timeout=<duration-in-seconds>
    Copy to Clipboard Toggle word wrap
    Expand
    表2.48 MariaDB セッションタイムアウトオプションの説明
    オプション説明

    interactive_timeout

    サーバーが対話的な接続を閉じる前にアクティビティーの発生を待つ時間 (秒単位)。詳細は、MariaDB のドキュメント を参照してください。

    wait_timeout

    サーバーが非対話型接続を閉じる前にアクティビティーを待機する秒数。詳細は、MariaDB のドキュメント を参照してください。

2.2.5.5. Debezium MariaDB コネクターのクエリーログイベントの有効化

各 binlog イベントの元の SQL ステートメントを確認したい場合があります。MariaDB 設定で binlog_annotate_row_events オプションを有効にすると、これを実行できます。

前提条件

  • MariaDB サーバー。
  • SQL コマンドの基本知識。
  • MariaDB 設定ファイルへのアクセス。

手順

  • MariaDB で binlog_annotate_row_events を有効にします。

    mariadb> binlog_annotate_row_events=ON
    Copy to Clipboard Toggle word wrap

    binlog_annotate_row_events は、binlog エントリーに SQL ステートメントが含まれるようにするためのサポートを有効または無効にする値に設定されます。

    • ON = 有効化
    • OFF = 無効化

2.2.5.6. Debezium MariaDB コネクターの binlog 行値オプションの検証

データベース内の binlog_row_value_options 変数の設定を確認します。コネクターが UPDATE イベントを消費できるようにするには、この変数を PARTIAL_JSON 以外の値に設定する必要があります。

前提条件

  • MariaDB サーバー。
  • SQL コマンドの基本知識。
  • MariaDB 設定ファイルへのアクセス。

手順

  1. 現在の変数値を確認する

    mariadb> show global variables where variable_name = 'binlog_row_value_options';
    Copy to Clipboard Toggle word wrap

    結果

    +--------------------------+-------+
    | Variable_name            | Value |
    +--------------------------+-------+
    | binlog_row_value_options |       |
    +--------------------------+-------+
    Copy to Clipboard Toggle word wrap

  2. 変数の値が PARTIAL_JSON に設定されている場合、次のコマンドを実行して設定を解除します。

    mariadb> set @@global.binlog_row_value_options="" ;
    Copy to Clipboard Toggle word wrap

2.2.6. Debezium MariaDB コネクターのデプロイメント

Debezium MariaDB コネクターをデプロイするには、次のいずれかの方法を使用できます。

2.2.6.1. Streams for Apache Kafka を使用した MariaDB コネクターのデプロイメント

Debezium コネクターのデプロイで推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。

デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。

  • Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある KafkaConnect CR。
  • コネクターがソースデータベースにアクセスするために使用する情報を提供する KafkaConnector CR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnector CR を適用してコネクターを起動します。

Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。

Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnect コンテナーイメージを格納する場所を指定します。コンテナーイメージは、quay.io などのコンテナーレジストリー、または OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。

注記

KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。

関連情報

2.2.6.2. Streams for Apache Kafka を使用した Debezium MariaDB コネクターのデプロイ

以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、まずコネクター用の Kafka Connect イメージをビルドする必要がありました。OpenShift にコネクターをデプロイするための現在の推奨方法は、Streams for Apache Kafka のビルド設定を使用して、使用する Debezium コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドすることです。

ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。

新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector カスタムリソースを作成します。

前提条件

  • クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
  • Streams for Apache Kafka Operator が実行されている。
  • Apache Kafka クラスターが Streams for Apache Kafka on OpenShift のデプロイと管理 に記載されているとおりにデプロイされている。
  • Kafka Connect が Streams for Apache Kafka にデプロイされている。
  • Red Hat build of Debezium のライセンスを所有している。
  • OpenShift oc CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。
  • Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。

    ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
    • レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
    ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
    • ImageStream リソースを新規コンテナーイメージを保存するためにクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform ドキュメント イメージストリームの管理 を参照してください。

手順

  1. OpenShift クラスターにログインします。
  2. コネクターの Debezium KafkaConnect カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotations および spec.build プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。以下の例は、KafkaConnect カスタムリソースを記述する dbz-connect.yaml ファイルからの抜粋を示しています。

    例2.11 Debezium コネクターを含む KafkaConnect カスタムリソースを定義した dbz-connect.yaml ファイル

    次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。

    • Debezium MariaDB コネクターアーカイブ。
    • Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
    • Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 
    1
    
    spec:
      version: 3.9.0
      build: 
    2
    
        output: 
    3
    
          type: imagestream  
    4
    
          image: debezium-streams-connect:latest
        plugins: 
    5
    
          - name: debezium-connector-mariadb
            artifacts:
              - type: zip 
    6
    
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mariadb/3.0.8.Final-redhat-00004/debezium-connector-mariadb-3.0.8.Final-redhat-00004-plugin.zip  
    7
    
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.5.11.redhat-00001/apicurio-registry-distro-connect-converter-2.5.11.redhat-00001.zip  
    8
    
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/3.0.8.Final-redhat-00004/debezium-scripting-3.0.8.Final-redhat-00004.zip 
    9
    
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy/3.0.11/groovy-3.0.11.jar  
    10
    
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/apache/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    Copy to Clipboard Toggle word wrap
    Expand
    表2.49 Kafka Connect 設定の説明
    項目説明

    1

    strimzi.io/use-connector-resources アノテーションを "true" に設定して、クラスター Operator が KafkaConnector リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。

    2

    spec.build 設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。

    3

    build.output は、新しくビルドされたイメージを保存するレジストリーを指定します。

    4

    イメージ出力の名前およびイメージ名を指定します。output.type の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合は docker、内部の OpenShift ImageStream にイメージをプッシュする場合は imagestream です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定で build.output を指定する方法の詳細は、Streams for Apache Kafka API リファレンスの スキーマ参照のビルド を参照してください。

    5

    plugins 設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグイン name と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。

    6

    artifacts.type の値は、artifacts.url で指定するアーティファクトのファイルタイプを指定します。有効なタイプは ziptgz、または jar です。Debezium コネクターアーカイブは、.zip ファイル形式で提供されます。type の値は、url フィールドで参照されるファイルのタイプと一致させる必要があります。

    7

    artifacts.url の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。

    8

    (オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト typeurl を指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。

    9

    (オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト typeurl を指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。

    10

    (オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト typeurl を指定します。これは、Debezium スクリプト SMT で必要です。

    重要

    Streams for Apache Kafka を使用してコネクタープラグインを Kafka Connect イメージに組み込む場合、必要なスクリプト言語コンポーネントごとに、artifacts.url に JAR ファイルのロケーションを指定し、artifacts.type の値も jar に設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。

    スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。

    • groovy
    • groovy-jsr223 (スクリプトエージェント)
    • groovy-json (JSON 文字列を解析するためのモジュール)

    別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。

  3. 以下のコマンドを入力して、KafkaConnect ビルド仕様を OpenShift クラスターに適用します。

    oc create -f dbz-connect.yaml
    Copy to Clipboard Toggle word wrap

    Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
    ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。

  4. KafkaConnector リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
    たとえば、次の KafkaConnector CR を作成し、mariadb-inventory-connector.yaml として保存します。

    例2.12 Debezium コネクターの KafkaConnector カスタムリソースを定義する mariadb-inventory-connector.yaml ファイル

        apiVersion: kafka.strimzi.io/v1beta2
        kind: KafkaConnector
        metadata:
          labels:
            strimzi.io/cluster: debezium-kafka-connect-cluster
          name: inventory-connector-mariadb 
    1
    
        spec:
          class: io.debezium.connector.mariadb.MariaDbConnector 
    2
    
          tasksMax: 1  
    3
    
          config:  
    4
    
            schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
            schema.history.internal.kafka.topic: schema-changes.inventory
            database.hostname: mariadb.debezium-mariadb.svc.cluster.local 
    5
    
            database.port: 3306   
    6
    
            database.user: debezium  
    7
    
            database.password: dbz  
    8
    
            database.server.id: 184054 
    9
    
            topic.prefix: inventory-connector-mariadb 
    10
    
            table.include.list: inventory.*  
    11
    
    
            ...
    Copy to Clipboard Toggle word wrap
    Expand
    表2.50 コネクター設定の説明
    項目説明

    1

    Kafka Connect クラスターに登録するコネクターの名前。

    2

    コネクタークラスの名前。

    3

    同時に動作できるタスクの数。

    4

    コネクターの設定。

    5

    ホストデータベースインスタンスのアドレス。

    6

    データベースインスタンスのポート番号。

    7

    Debezium がデータベースへの接続に使用するアカウントの名前。

    8

    Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。

    9

    コネクターの一意の ID (数値)。

    10

    データベースインスタンスまたはクラスターのトピック接頭辞。
    指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
    トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
    コネクターを Avro コネクター と統合する場合、この namespace は、関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの namespace でも使用されます。

    11

    コネクターが変更イベントをキャプチャーするテーブルのリスト。

  5. 以下のコマンドを実行してコネクターリソースを作成します。

    oc create -n <namespace> -f <kafkaConnector>.yaml
    Copy to Clipboard Toggle word wrap

    以下に例を示します。

    oc create -n debezium -f mariadb-inventory-connector.yaml
    Copy to Clipboard Toggle word wrap

    コネクターは Kafka Connect クラスターに登録され、KafkaConnector CR の spec.config.database.dbname で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。

これで、Debezium MariaDB デプロイメントを検証する 準備が整いました。

Debezium MariaDB コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。

  • Kafka Connect インスタンスを定義する KafkaConnect CR。image は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。
  • Debezium MariaDB コネクターを定義する KafkaConnector CR。この CR を KafkaConnect CR を適用するのと同じ OpenShift インスタンスに適用します。

前提条件

  • MariaDB が実行中であり、Debezium コネクターで動作するように MariaDB を設定する 手順が完了しました。
  • Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、Streams for Apache Kafka on OpenShift のデプロイと管理 を参照してください。
  • Podman または Docker がインストールされている。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (quay.iodocker.io など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。

手順

  1. Kafka Connect 用の Debezium MariaDB コンテナーを作成します。

    1. registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0 をベースイメージとして使用する Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。

      cat <<EOF >debezium-container-for-mariadb.yaml 
      1
      
      FROM registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 
      2
      
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mariadb/3.0.8.Final-redhat-00004/debezium-connector-mariadb-3.0.8.Final-redhat-00004-plugin.zip \
      && unzip debezium-connector-mariadb-3.0.8.Final-redhat-00004-plugin.zip \
      && rm debezium-connector-mariadb-3.0.8.Final-redhat-00004-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF
      Copy to Clipboard Toggle word wrap
      Expand
      項目説明

      1

      任意のファイル名を指定できます。

      2

      Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。

      このコマンドは、現在のディレクトリーに debezium-container-for-mariadb.yaml という名前の Dockerfile を作成します。

    2. 前の手順で作成した debezium-container-for-mariadb.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。

      podman build -t debezium-container-for-mariadb:latest .
      Copy to Clipboard Toggle word wrap
      docker build -t debezium-container-for-mariadb:latest .
      Copy to Clipboard Toggle word wrap

      上記のコマンドは debezium-container-for-mariadb という名前のコンテナーイメージを構築します。

    3. カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。

      podman push <myregistry.io>/debezium-container-for-mariadb:latest
      Copy to Clipboard Toggle word wrap
      docker push <myregistry.io>/debezium-container-for-mariadb:latest
      Copy to Clipboard Toggle word wrap
    4. 新しい Debezium MariaDB KafkaConnect カスタムリソース (CR) を作成します。たとえば、annotations および image プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。以下の例は、KafkaConnect カスタムリソースを記述する dbz-connect.yaml ファイルからの抜粋を示しています。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 
      1
      
      spec:
        #...
        image: debezium-container-for-mariadb  
      2
      
      
        ...
      Copy to Clipboard Toggle word wrap
      Expand
      項目説明

      1

      KafkaConnector リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations は Cluster Operator に示します。

      2

      spec.image は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。

    5. 以下のコマンドを入力して、KafkaConnect CR を OpenShift Kafka Connect 環境に適用します。

      oc create -f dbz-connect.yaml
      Copy to Clipboard Toggle word wrap

      このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。

  2. Debezium MariaDB コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    Debezium MariaDB コネクターは、コネクターの設定プロパティーを指定する .yaml ファイルで設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。

    次の例では、ポート 3306 で MariaDB ホスト 192.168.99.100 に接続し、インベントリー データベースへの変更をキャプチャーする Debezium コネクターを設定します。dbserver1 は、サーバーの論理名です。

    MariaDB inventory-connector.yaml

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-mariadb  
    1
    
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mariadb.MariaDbConnector
        tasksMax: 1  
    2
    
        config:  
    3
    
          database.hostname: mariadb  
    4
    
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054  
    5
    
          topic.prefix: inventory-connector-mariadb 
    6
    
          table.include.list: inventory  
    7
    
          schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092  
    8
    
          schema.history.internal.kafka.topic: schema-changes.inventory  
    9
    Copy to Clipboard Toggle word wrap

    Expand
    表2.51 コネクター設定の説明
    項目説明

    1

    コネクターの名前。

    2

    一度に実行できるタスクは 1 つだけです。MariaDB コネクターは MariaDB サーバーの binlog を読み取るため、単一のコネクタータスクを使用すると適切な順序とイベント処理が保証されます。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。

    3

    コネクターの設定。

    4

    データベースホスト。MariaDB サーバー (mariadb) を実行しているコンテナーの名前です。

    5

    connector の一意 ID。

    6

    MariaDB サーバーまたはクラスターのトピック接頭辞。この名前は、変更イベントレコードを受信するすべての Kafka トピックの接頭辞として使用されます。

    7

    コネクターは インベントリー テーブルからのみ変更をキャプチャーします。

    8

    DDL ステートメントをデータベーススキーマ履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。再起動時に、コネクターが読み取りを開始すべき時点で binlog に存在したデータベースのスキーマを復元します。

    9

    データベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。

  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを inventory-connector.yaml ファイルに保存した場合は、以下のコマンドを実行します。

    oc apply -f inventory-connector.yaml
    Copy to Clipboard Toggle word wrap

    上記のコマンドは inventory-connector を登録し、コネクターは KafkaConnector CR に定義されている inventory データベースに対して実行を開始します。

Debezium MariaDB コネクターに設定できる設定プロパティーの完全なリストについては、MariaDB コネクター設定プロパティー を参照してください。

結果

コネクターが起動すると、コネクターが設定されている MariaDB データベースの 一貫性のあるスナップショットが実行されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。

2.2.6.4. Debezium MariaDB コネクターが動作していることの確認

コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。

コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。

  • コネクターのステータスを確認します。
  • コネクターがトピックを生成していることを確認します。
  • 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。

前提条件

  • Debezium コネクターが Streams for Apache Kafka on OpenShift にデプロイされている。
  • OpenShift oc CLI クライアントがインストールされている。
  • OpenShift Container Platform Web コンソールにアクセスできる。

手順

  1. 以下の方法のいずれかを使用して KafkaConnector リソースのステータスを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaConnector を入力します。
      3. KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-mariadb)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを実行します。

        oc describe KafkaConnector <connector-name> -n <project>
        Copy to Clipboard Toggle word wrap

        以下に例を示します。

        oc describe KafkaConnector inventory-connector-mariadb -n debezium
        Copy to Clipboard Toggle word wrap

        このコマンドは、以下の出力のようなステータス情報を返します。

        例2.13 KafkaConnector リソースのステータス

        Name:         inventory-connector-mariadb
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-mariadb
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-mariadb.inventory
            inventory-connector-mariadb.inventory.addresses
            inventory-connector-mariadb.inventory.customers
            inventory-connector-mariadb.inventory.geom
            inventory-connector-mariadb.inventory.orders
            inventory-connector-mariadb.inventory.products
            inventory-connector-mariadb.inventory.products_on_hand
        Events:  <none>
        Copy to Clipboard Toggle word wrap
  2. コネクターによって Kafka トピックが作成されたことを確認します。

    • OpenShift Container Platform Web コンソールから以下を実行します。

      1. Home Search に移動します。
      2. Search ページで Resources をクリックし、Select Resource ボックスを開き、KafkaTopic を入力します。
      3. KafkaTopics リストから確認するトピックの名前をクリックします (例: inventory-connector-mariadb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
      4. Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
    • ターミナルウィンドウから以下を実行します。

      1. 以下のコマンドを実行します。

        oc get kafkatopics
        Copy to Clipboard Toggle word wrap

        このコマンドは、以下の出力のようなステータス情報を返します。

        例2.14 KafkaTopic リソースのステータス

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-mariadb--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-mariadb.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-mariadb.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-mariadb.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-mariadb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-mariadb.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-mariadb.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
        Copy to Clipboard Toggle word wrap
  3. トピックの内容を確認します。

    • ターミナルウィンドウから、以下のコマンドを入力します。
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>
    Copy to Clipboard Toggle word wrap

    以下に例を示します。

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-mariadb.inventory.products_on_hand
    Copy to Clipboard Toggle word wrap

    トピック名を指定する形式は、手順 1 で返された oc describe コマンドと同じです (例: inventory-connector-mariadb.inventory.addresses)。

    トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。

    例2.15 Debezium 変更イベントの内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mariadb.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mariadb.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mariadb.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mariadb.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mariadb.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"mariadb","name":"inventory-connector-mariadb","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mariadb-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}
    Copy to Clipboard Toggle word wrap

    上記の例では、payload 値は、コネクタースナップショットがテーブル inventory.products_on_hand から読み込み ("op" ="r") イベントを生成したことを示しています。product_id レコードの "before" 状態は null であり、レコードに以前の値が存在しないことを示しています。"after" 状態は、product_id 101 を持つ項目の quantity3 であることを示しています。

2.2.6.5. Debezium MariaDB コネクター設定プロパティーの説明

Debezium MariaDB コネクターには、アプリケーションに適切なコネクター動作を実現するために使用できる多数の設定プロパティーがあります。多くのプロパティーにはデフォルト値があります。

MariaDB コネクター設定プロパティーに関する情報は、以下のように整理されます。

必要な Debezium MariaDB コネクター設定プロパティー

以下の設定プロパティーは、デフォルト値がない場合は 必須 です。

bigint.unsigned.handling.mode


デフォルト値: long
コネクターが変更イベントで BIGINT UNSIGNED 列を表す方法を指定します。以下のオプションのいずれかを設定します。

long
Java の long データ型を使用して、BIGINT UNSIGNED 列の値を表します。long 型はの精度を最適ではありませんが、大半のコンシューマーで簡単に実装できます。環境の多くでは、これが推奨される設定です。
precise
値を表すために java.math.BigDecimal データ型を使用します。コネクターは、Kafka Connect org.apache.kafka.connect.data.Decimal データ型を使用して、エンコードされたバイナリー形式で値を表します。コネクターが通常 2^63 より大きい値で動作する場合は、このオプションを設定します。long データ型ではそのサイズの値を伝達できません。
binary.handling.mode

デフォルト値: バイト
変更イベントでコネクターがバイナリー列 (Blobbinaryvarbinary など) の値を表す方法を指定します。
以下のオプションのいずれかを設定します。

bytes
バイナリーデータをバイト配列として表します。
base64
バイナリーデータを base64 でエンコードされた文字列として表します。
base64-url-safe
バイナリーデータを base64-url-safe-encoded 文字列として表します。
hex
バイナリーデータを 16 進数 (base16) でエンコードされた文字列として表します。
column.exclude.list

デフォルト値: 空の文字列
変更イベントレコードの値から除外する列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。ソースレコード内の他の列は通常どおりキャプチャーされます。列の完全修飾名の形式は databaseName.tableName.columnName です。

列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。このプロパティーを設定に含める場合は、column.include.list プロパティーも設定しないでください。

column.include.list

デフォルト値: 空の文字列
変更イベントレコードの値に含める列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。その他の列はイベントレコードから省略されます。列の完全修飾名の形式は databaseName.tableName.columnName です。

列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、column.exclude.list プロパティーを設定しないでください。

column.mask.hash.v2.hashAlgorithm.with.salt.salt

デフォルト値: デフォルトなし。
文字ベースの列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。列の完全修飾名の形式は <databaseName>.<tableName>.<columnName> です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。
仮名は、指定された hashAlgorithmsalt を適用した結果のハッシュ値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest section に説明されています。

次の例では、CzQMA0cB5K はランダムに選択された salt です。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
Copy to Clipboard Toggle word wrap

必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。

使用される hashAlgorithm、選択された salt、および実際のデータセットによっては、結果のデータセットが完全にマスクされない場合があります。

ハッシュストラテジーバージョン 2 は、異なる場所またはシステムでハッシュされる値が整合性を保てるようにします。

column.mask.with.length.chars

デフォルト値: デフォルトなし。
文字ベースの列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。length を正の整数に設定して、指定された列のデータをプロパティー名の 長さ で指定されたアスタリスク (*) 文字数で置き換えます。指定した列のデータを空の文字列に置き換えるには、長さ0 (ゼロ) に設定します。

列の完全修飾名は、次の形式に従います: databaseName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。

column.propagate.source.type

デフォルト値: デフォルトなし。
コネクターが列のメタデータを表す追加パラメーターを発行する列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

    これらのパラメーターは、それぞれ列の元の型名と長さ (可変幅型の場合) を伝播します。
    コネクターがこの余分なデータを発行できるようにすると、sink データベース内の特定の数値または文字ベースの列のサイズを適切に設定するのに役立ちます。

    列の完全修飾名は、次のいずれかの形式に従います: databaseName.tableName.columnName、または databaseName.schemaName.tableName.columnName.
    列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。
column.truncate.to.length.chars

デフォルト値: デフォルトなし。
文字ベースの列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。length を正の整数値に設定します (例: column.truncate.to.20.chars)

列の完全修飾名は、次の形式に従います: databaseName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。

単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。

connect.timeout.ms
デフォルト値: 30000 (30 秒)。
接続要求がタイムアウトする前にコネクターが MariaDB データベースサーバーへの接続を確立するまで待機する最大時間 (ミリ秒単位) を指定する正の整数値。
connector.class
デフォルト値: デフォルトなし。
コネクターの Java クラスの名前。MariaDB コネクターの場合は常に指定します。
database.exclude.list

デフォルト値: 空の文字列
データベースの名前に一致するオプションのコンマ区切りの正規表現のリスト。ただし、コネクターに変更をキャプチャーさせません。コネクターは、database.exclude.list に名前が指定されていないデータベースの変更をキャプチャーします。

データベースの名前を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、データベースの名前文字列全体に対して照合されます。データベース名に存在する可能性のある部分文字列とは一致しません。
このプロパティーを設定に含める場合は、database.include.list プロパティーも設定しないでください。

database.hostname
デフォルト値: デフォルトなし。
MariaDB データベースサーバーの IP アドレスまたはホスト名。
database.include.list

デフォルト値: 空の文字列
コネクターが変更をキャプチャーし、さらにデータベースの名前に一致する、オプションのコンマ区切りの正規表現のリスト。コネクターは、名前が database.include.list にないデータベースの変更をキャプチャーしません。デフォルトでは、コネクターはすべてのデータベースの変更をキャプチャーします。

データベースの名前を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、データベースの名前文字列全体に対して照合されます。データベース名に存在する可能性のある部分文字列とは一致しません。
このプロパティーを設定に含める場合は、database.exclude.list プロパティーも設定しないでください。

database.password
デフォルト値: デフォルトなし。
コネクターが MariaDB データベースサーバーに接続するために使用する MariaDB ユーザーのパスワード。
database.port
デフォルト値: 3306
MariaDB データベースサーバーの整数ポート番号。
database.server.id
デフォルト値: デフォルトなし。
このデータベースクライアントの数値 ID。指定された ID は、MariaDB クラスター内で現在実行中のすべてのデータベースプロセス全体で一意である必要があります。コネクターは、binlog の読み取りを可能にするために、この一意の ID を使用して、MariaDB データベースクラスターを別のサーバーとして参加させます。
database.user
デフォルト値: デフォルトなし。
コネクターが MariaDB データベースサーバーに接続するために使用する MariaDB ユーザーの名前。
decimal.handling.mode

デフォルト値: precise
コネクターが変更イベントで DECIMAL 列と NUMERIC 列の値を処理する方法を指定します。
以下のオプションのいずれかを設定します。

precise (デフォルト)
値を正確に表すために、バイナリー形式の java.math.BigDecimal 値を使用します。
double
値を表すために double データ型を使用します。このオプションを選択すると精度が低下する可能性がありますが、ほとんどのコンシューマーにとって使いやすいものになります。
string
フォーマットされた文字列としてエンコードされます。このオプションは簡単に使用できますが、実際の型に関するセマンティック情報が失われる可能性があります。
event.deserialization.failure.handling.mode

デフォルト値: fail
binlog イベントのデシリアライズ中に例外が発生した場合にコネクターがどのように反応するかを指定します。このオプションは非推奨です。代わりに event.processing.failure.handling.mode オプションを使用してください。

fail
問題のあるイベントとその binlog オフセットを示す例外を伝播し、コネクターを停止させます。
warn
問題のあるイベントとその binlog オフセットをログに記録し、イベントをスキップします。
ignore
問題のあるイベントを通過し、何もログに記録しません。
field.name.adjustment.mode

デフォルト値: デフォルトなし。
コネクターで使用されるメッセージコンバーターとの互換性を確保するために、フィールド名を調整する方法を指定します。以下のオプションのいずれかを設定します。

none
調整はありません。
avro
Avro 名で有効でない文字をアンダースコア文字に置き換えます。
avro_unicode

アンダースコア文字または Avro 名で使用できない文字は、_uxxxx などの対応する Unicode に置き換えます。

注記
`_` is an escape sequence, similar to a backslash in Java
Copy to Clipboard Toggle word wrap

詳細は、Avro の命名 を参照してください。

gtid.source.excludes
デフォルト値: デフォルトなし。
コネクターが MariaDB サーバー上の binlog の位置を特定するために使用する GTID セット内のソースドメイン ID に一致する正規表現のコンマ区切りリスト。このプロパティーが設定されている場合、コネクターは、指定された exclude パターンのいずれにも一致しないソース UUID が含まれる GTID 範囲のみを使用します。

GTID の値を一致させるために、Debezium は、アンカー 正規表現として指定した正規表現を適用します。つまり、指定された式は GTID のドメイン識別子に対して照合されます。
このプロパティーを設定に含める場合は、gtid.source.includes プロパティーも設定しないでください。
gtid.source.includes
デフォルト値: デフォルトなし。
コネクターが MariaDB サーバー上の binlog の位置を特定するために使用する GTID セット内のソースドメイン ID に一致する正規表現のコンマ区切りリスト。このプロパティーが設定されている場合、コネクターは、指定された include パターンのいずれかに一致するソース UUID が含まれる GTID 範囲のみを使用します。

GTID の値を一致させるために、Debezium は、アンカー 正規表現として指定した正規表現を適用します。つまり、指定された式は GTID のドメイン識別子に対して照合されます。
このプロパティーを設定に含める場合は、gtid.source.excludes プロパティーも設定しないでください。
include.query
デフォルト値: false
変更イベントを生成した元の SQL クエリーをコネクターに含めるかどうかを指定するブール値。

このオプションを true に設定する場合は、MariaDB の binlog_annotate_row_events オプションを ON に設定して指定する必要もあります。include.querytrue の場合、スナップショットプロセスによって生成されるイベントに対するクエリーは存在しません。

include.querytrue に設定すると、変更イベントに元の SQL ステートメントを含めることで明示的に除外またはマスクされたテーブルまたはフィールドが公開される可能性があります。そのため、デフォルト設定は false です。

各ログイベントに対して元の SQL ステートメントを返すようにデータベースを設定する方法の詳細は、クエリーログイベントの有効化 を参照してください。
include.schema.changes
デフォルト値: true
ブール値は、コネクターがデータベーススキーマの変更をトピック接頭辞と同じ名前の Kafka トピックに公開するかどうかを指定します。コネクターは、データベース名が含まれるキーを持つ各スキーマの変更と、スキーマ更新を記述する JSON 構造の値を記録します。スキーマの変更を記録するこのメカニズムは、データベーススキーマ履歴への変更のコネクターの内部記録とは独立しています。
include.schema.comments

デフォルト値: false
コネクターがメタデータオブジェクトのテーブルおよび列のコメントを解析して公開するかどうかを指定するブール値。

注記

このオプションを true に設定すると、コネクターに含まれるスキーマコメントによって、各スキーマオブジェクトに大量の文字列データが追加される可能性があります。論理スキーマオブジェクトの数とサイズを増やすと、コネクターが使用するメモリーの量が増加します。

inconsistent.schema.handling.mode

デフォルト値: fail
内部スキーマ表現に存在しないテーブルを参照する binlog イベントにコネクターが応答する方法を指定します。つまり、内部表現はデータベースと一致しません。
以下のオプションのいずれかを設定します。

fail
コネクターは、問題のあるイベントとその binlog オフセットを報告する例外を出力します。その後、コネクターが停止します。
warn
コネクターは問題のあるイベントとそのバイナリーログオフセットをログに記録し、イベントをスキップします。
skip
コネクターは問題のあるイベントをスキップして、その旨はログに報告されません。
message.key.columns
デフォルト値: デフォルトなし。
指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
テーブルのカスタムメッセージキーを作成するには、テーブルとメッセージキーとして使用する列をリストします。各リストエントリーは、

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

の形式を取ります。複数の列名をベースにテーブルキーを作成するには、列名の間にコンマを挿入します。
完全修飾テーブル名はそれぞれ、次の形式の正規表現です。
<databaseName>.<tableName>

プロパティーには複数のテーブルのエントリーを含めることができます。セミコロンを使用して、リスト内のテーブルエントリーを区切ります。

以下の例は、テーブル inventory.customers および purchase.orders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

のメッセージキーを設定します。テーブル inventory.customer の場合、列 pk1pk2 がメッセージキーとして指定されます。データベースで purchaseorders テーブルは、pk3 および pk4 サーバーのコラムをメッセージキーとして使用します。
カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。
name
デフォルト値: デフォルトなし。
コネクターの一意の名前。同じ名前を使用して別のコネクターを登録しようとすると、登録は失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。
schema.name.adjustment.mode

デフォルト値: デフォルトなし。
コネクターが使用するメッセージコンバーターとの互換性を確保するために、コネクターがスキーマ名を調整する方法を指定します。以下のオプションのいずれかを設定します。

none
調整はありません。
avro
Avro 名で有効でない文字をアンダースコア文字に置き換えます。
avro_unicode
アンダースコア文字または Avro 名で使用できない文字は、_uxxxx などの対応する Unicode に置き換えます。

注記: _ はエスケープシーケンスで、Java のバックスラッシュに似ています。
skip.messages.without.change
デフォルト値: false
含まれる列の変更が検出されない場合に、コネクターがレコードのメッセージを発行するかどうかを指定します。列は、column.include.list にリストされている場合、または column.exclude.list にリストされていない場合は、included とみなされます。含まれる列に変更がない場合にコネクターがレコードをキャプチャーしないようにするには、値を true に設定します。
table.exclude.list

デフォルト値: 空の文字列
テーブルの完全修飾テーブル識別子に一致する、オプションのコンマ区切りの正規表現のリスト。ただし、コネクターに変更をキャプチャーさせません。コネクターは table.exclude.list に含まれていないテーブルの変更をキャプチャーします。各識別子の形式は databaseName.tableName です。

列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、table.include.list プロパティーも設定しないでください。

table.include.list

デフォルト値: 空の文字列
変更をキャプチャーするテーブルの完全修飾テーブル識別子に一致する、オプションのコンマ区切りの正規表現のリスト。コネクターは、table.include.list に含まれていないテーブルの変更をキャプチャしません。各識別子の形式は databaseName.tableName です。デフォルトでは、コネクターは変更をキャプチャーするように設定されている全データベース内の非システムテーブルの変更をすべてキャプチャーします。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、table.exclude.list プロパティーも設定しないでください。

tasks.max
デフォルト値: 1
このコネクターに対して作成するタスクの最大数。MariaDB コネクターは常に単一のタスクを使用するため、デフォルト値を変更しても効果はありません。
time.precision.mode

デフォルト値: adaptive_time_microseconds
コネクターが時間、日付、タイムスタンプの値を表すために使用する精度のタイプを指定します。次のいずれかのオプションを設定します。

adaptive_time_microseconds (デフォルト)
コネクターは、データベース列のタイプに基づいて、ミリ秒、マイクロ秒、またはナノ秒の精度値を使用して、データベースとまったく同じように日付、日付時刻、およびタイムスタンプの値をキャプチャーします。ただし、常にマイクロ秒としてキャプチャーされる TIME タイプフィールドは例外です。
connect
コネクターは常に、データベース列の精度に関係なくミリ秒の精度を使用する、Kafka Connect の組み込みの時間、日付、タイムスタンプの表現を使用して、時間とタイムスタンプの値を表します。
tombstones.on.delete

デフォルト値: true
delete イベントの後に tombstone イベントが続くかどうかを指定します。ソースレコードが削除された後、コネクターはトゥームストーンイベント (デフォルトの動作) を発行して、トピックの ログ圧縮 が有効になっている場合に、削除された行のキーに関連するすべてのイベントを Kafka が完全に削除できるようにします。次のいずれかのオプションを設定します。

true (デフォルト)
コネクターは、delete イベントとそれに続く tombstone イベントを発行することによって削除操作を表します。
false
コネクターは delete イベントのみを出力します。
topic.prefix

デフォルト値: デフォルトなし。
Debezium が変更をキャプチャーしている特定の MariaDB データベースサーバーまたはクラスターの namespace を提供するトピック接頭辞。トピック接頭辞は、このコネクターが発行するイベントを受信するすべての Kafka トピックの名前として使用されるので、トピック接頭辞がすべてのコネクター間で一意であることが重要です。値には、英数字、ハイフン、ドット、アンダースコアのみを使用できます。

警告

このプロパティーを設定した後は、値を変更しないでください。値を変更すると、コネクターの再起動後に、コネクターは元のトピックにイベントを引き続き送信するのではなく、新しい値に基づいた名前のトピックに後続のイベントを送信します。また、コネクターはデータベーススキーマ履歴トピックを復元できません。

高度な Debezium MariaDB コネクター設定プロパティー

次のリストは、MariaDB コネクターの高度な設定プロパティーについて説明します。これらのプロパティーのデフォルト値を変更する必要はほぼありません。そのため、コネクター設定にデフォルト値を指定する必要はありません。

connect.keep.alive
デフォルト値: true
MariaDB サーバーまたはクラスターへの接続を維持するために別のスレッドを使用するかどうかを指定するブール値。
converters

デフォルト値: デフォルトなし。
コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。
たとえば、boolean です。
このプロパティーは、コネクターがカスタムコンバーターを使用できるようにするために必要です。
コネクターに設定するコンバータごとに、コンバータインターフェイスを実装するクラスの完全修飾名を指定する .type プロパティーも追加する必要があります。.type プロパティーでは、以下の形式を使用します。

<converterSymbolicName>.type

以下に例を示します。

boolean.type: io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
Copy to Clipboard Toggle word wrap

設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。これらの追加設定パラメ設定ーターをコンバータに関連付けるには、パラメーター名の前にコンバーターのシンボル名を付けます。

たとえば、boolean コンバーターが処理する列のサブセットを指定する selector パラメーターを定義するには、次のプロパティーを追加します。

boolean.selector=db1.table1.*, db1.table2.column1
Copy to Clipboard Toggle word wrap
custom.metric.tags
デフォルト値: デフォルトなし。
コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します (例:
k1=v1、k2=v2)。

コネクターは、指定されたタグを基本 MBean オブジェクト名に追加します。タグは、メトリクスデータを整理および分類するのに役立ちます。特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを識別するためのタグを定義できます。詳細は、カスタマイズされた MBean 名 を参照してください。
database.initial.statements

デフォルト値: デフォルトなし。
トランザクションログを読み取る接続ではなく、データベースへの JDBC 接続が確立されたときに実行される、セミコロンで区切られた SQL ステートメントのリスト。SQL ステートメントでセミコロンを区切り文字としてではなく、文字として指定する場合は、2 つのセミコロン (;;) を使用します。

コネクターは独自の判断で JDBC 接続を確立する可能性があるため、このプロパティーはセッションパラメーターの設定専用です。DML ステートメントを実行するものではありません。

database.query.timeout.ms
デフォルト値: 600000 (10 分)。
コネクターがクエリーの完了を待機する時間をミリ秒単位で指定します。タイムアウト制限を削除するには、値を 0 (ゼロ) に設定します。
database.ssl.keystore
デフォルト値: デフォルトなし。
キーストアファイルの場所を指定するオプションの設定。キーストアファイルは、クライアントと MariaDB サーバー間の双方向認証に使用できます。
database.ssl.keystore.password
デフォルト値: デフォルトなし。
キーストアファイルのパスワード。database.ssl.keystore が設定されている場合にのみパスワードを指定します。
database.ssl.mode

デフォルト値: preferred
コネクターが暗号化された接続を使用するかどうかを指定します。以下の設定が可能です。

disabled
暗号化されていない接続の使用を指定します。
preferred (デフォルト)
サーバーが安全な接続をサポートしている場合、コネクターは暗号化された接続を確立します。サーバーが安全な接続をサポートしていない場合、コネクターは暗号化されていない接続を使用します。
required
コネクターは暗号化された接続を確立します。暗号化された接続を確立できない場合、コネクターは失敗します。
verify_ca
コネクターは、required のオプションを設定した場合と同じように動作しますが、設定された認証局 (CA) 証明書に対してサーバーの TLS 証明書も検証します。サーバーの TLS 証明書が有効な CA 証明書と一致しない場合、コネクターは失敗します。
verify_identity
コネクターは verify_ca オプションを設定した場合と同じように動作しますが、サーバー証明書がリモート接続のホストと一致するかどうかも検証します。
database.ssl.truststore
デフォルト値: デフォルトなし。
サーバー証明書検証用のトラストストアファイルの場所。
database.ssl.truststore.password
デフォルト値: デフォルトなし。
トラストストアファイルのパスワード。トラストストアの整合性をチェックし、トラストストアのロックを解除するために使用されます。
enable.time.adjuster

デフォルト値: true
コネクターが 2 桁の年指定を 4 桁に変換するかどうかを示すブール値。変換がデータベースに完全に委任される場合は、値を false に設定します。

MariaDB ユーザーは、2 桁または 4 桁の年の値を挿入できます。2 桁の値は、1970 - 2069 の範囲の年にマッピングされます。デフォルトでは、コネクターが変換を実行します。

errors.max.retries

デフォルト値: -1
接続エラーなどの再試行可能なエラーが発生する操作が実行された後にコネクターがどのように応答するかを指定します。
以下のオプションのいずれかを設定します。

-1
制限なしコネクターは常に自動的に再起動し、以前の失敗回数に関係なく、操作を再試行します。
0
Disabledコネクターはすぐに失敗し、操作を再試行することはありません。コネクターを再起動するにはユーザーの介入が必要です。
> 0
指定された最大再試行回数に達するまで、コネクターは自動的に再起動します。次の障害が発生すると、コネクターは停止し、再起動するにはユーザーの介入が必要になります。
event.converting.failure.handling.mode

デフォルト値: warn
列のデータ型と Debezium 内部スキーマで指定された型が一致しないためにテーブルレコードを変換できない場合にコネクターがどのように応答するかを指定します。
以下のオプションのいずれかを設定します。

fail
例外は、フィールドのデータ型がスキーマタイプと一致しなかったために変換が失敗したことを報告し、変換を正常に行うにはコネクターを schema _only_recovery モードで再起動する必要がある可能性があることを示します。
warn
コネクターは、変換に失敗した列のイベントフィールドに null 値を書き込み、警告ログにメッセージを書き込みます。
skip
コネクターは、変換に失敗した列のイベントフィールドに null 値を書き込み、デバッグログにメッセージを書き込みます。
event.processing.failure.handling.mode

デフォルト値: fail
問題のあるイベントに遭遇した場合など、イベントの処理中に発生する障害をコネクターがどのように処理するかを指定します。以下の設定が可能です。

fail
コネクターは、問題のあるイベントとその位置を報告する例外を発生させます。その後、コネクターが停止します。
warn
コネクターにより例外が出力されることはありません。代わりに、問題のあるイベントとその位置をログに記録し、イベントをスキップします。
ignore
コネクターは問題のあるイベントを無視し、ログエントリーは生成されません。
heartbeat.action.query

デフォルト値: デフォルトなし。
コネクターがハートビートメッセージを送信するときに、コネクターがソースデータベースで実行するクエリーを指定します。

たとえば、次のクエリーは、ソースデータベースで実行された GTID セットの状態を定期的にキャプチャーします。

INSERT INTO gtid_history_table (select @gtid_executed)

heartbeat.interval.ms

デフォルト値: 0
コネクターが Kafka トピックにハートビートメッセージを送信する頻度を指定します。デフォルトでは、コネクターによりハートビートメッセージは送信されません。

ハートビートメッセージは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するのに便利です。ハートビートメッセージは、コネクターの再起動時に再送信する必要がある変更イベントの数を減らすのに役立つ可能性があります。ハートビートメッセージを送信するには、このプロパティーを、ハートビートメッセージの間隔をミリ秒単位で示す正の整数に設定します。

incremental.snapshot.allow.schema.changes
デフォルト値: false
コネクターが増分スナップショット中にスキーマの変更を許可するかどうかを指定します。値が true に設定されている場合、コネクターは増分スナップショット中にスキーマの変更を検出し、DDL のロックを回避するために現在のチャンクを再選択します。

プライマリーキーへの変更はサポートされていません。増分スナップショットの作成中にプライマリーを変更すると、誤った結果が生じる可能性があります。さらに他の制限として、スキーマの変更が列のデフォルト値にのみ影響する場合、DDL が binlog ストリームから処理されるまで変更が検出されないことが挙げられます。これはスナップショットイベントの値には影響しませんが、これらのスナップショットイベントのスキーマのデフォルトが古くなっている可能性があります。
incremental.snapshot.chunk.size
デフォルト値: 1024
コネクターが増分スナップショットチャンクを取得するときにフェッチしてメモリーに読み込む行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。
incremental.snapshot.watermarking.strategy

デフォルト値: insert_insert
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントの重複を排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
以下のオプションのいずれかを指定することができます。

insert_insert (デフォルト)
増分スナップショットを開始するシグナルを送信すると、スナップショット中に Debezium が読み取るチャンクごとに、スナップショットウィンドウを開くシグナルを記録するエントリーがシグナリングデータコレクションに書き込まれます。スナップショットが完了すると、Debezium はウィンドウを閉じるシグナルを記録する 2 番目のエントリーを挿入します。
insert_delete
増分スナップショットを開始するシグナルを送信すると、Debezium が読み取るチャンクごとに、スナップショットウィンドウを開くシグナルを記録する 1 つのエントリーがシグナリングデータコレクションに書き込まれます。スナップショットが完了すると、このエントリーは削除されます。スナップショットウィンドウを閉じるシグナルのエントリーは作成されません。シグナリングデータコレクションの急増を防ぐには、このオプションを設定します。
max.batch.size
デフォルト値: 2048
このコネクターの反復処理中に処理される必要があるイベントの各バッチの最大サイズを指定する正の整数値。
max.queue.size
デフォルト値: 8192
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。max.queue.size は常に max.batch.size の値よりも大きい値に設定してください。
max.queue.size.in.bytes
デフォルト値: 0
ブロッキングキューの最大ボリュームをバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。
max.queue.size も設定されている場合、キューのサイズがいずれかのプロパティーで指定された制限に達すると、キューへの書き込みがブロックされます。たとえば、max.queue.size=1000max.queue.size.in.bytes=5000 と設定した場合、キューに 1000 レコードが入った後、あるいはキュー内のレコードの量が 5000 バイトに達した後、キューへの書き込みがブロックされます。
min.row.count.to.stream.results

デフォルト値: 1000
スナップショットの作成中に、コネクターは変更をキャプチャーするように設定されている各テーブルをクエリーします。コネクターは各クエリーの結果を使用して、そのテーブルのすべての行のデータが含まれる読み取りイベントを生成します。このプロパティーは、MariaDB コネクターがテーブルの結果をメモリーに格納するか、ストリーミングを行うかを決定します。メモリーへの格納はすばやく処理できますが、大量のメモリーを必要とします。ストリーミングを行うと、処理は遅くなりますが、非常に大きなテーブルにも対応できます。このプロパティーの設定は、コネクターが結果のストリーミングを行う前にテーブルに含まれる必要がある行の最小数を指定します。

すべてのテーブルサイズチェックを省略し、スナップショットの実行中に常にすべての結果をストリーミングする場合は、このプロパティーを 0 に設定します。

notification.enabled.channels

デフォルト値: デフォルトなし。
コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。

  • sink
  • log
  • jmx
poll.interval.ms
デフォルト値: 500 (0.5 秒)。
コネクターがイベントのバッチ処理を開始する前に、新しい変更イベントが表示されるのを待機する時間をミリ秒単位で指定する正の整数値。
provide.transaction.metadata
デフォルト値: false
コネクターがトランザクション境界を持つイベントを生成し、トランザクションメタデータを使用して変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は true を指定します。詳細は、トランザクションメタデータ を参照してください。
signal.data.collection
デフォルト値: デフォルトなし。
コネクターに シグナル を送信するために使用されるデータコレクションの完全修飾名。

<databaseName>.<tableName> の形式を使用してコレクション名を指定します。
signal.enabled.channels

デフォルト値: デフォルトなし。
コネクターに対して有効になっているシグナリングチャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。

  • source
  • kafka
  • file
  • jmx
skipped.operations

デフォルト値: t
ストリーミング中にコネクターがスキップする操作タイプのコンマ区切りリスト。以下のタイプの操作をスキップするようにコネクターを設定することができます。

  • c (挿入/作成)
  • u (更新)
  • d (削除)
  • t (省略)

コネクターに操作をスキップしてほしくない場合は、値を none に設定します。

snapshot.delay.ms
デフォルト値: デフォルトなし。
コネクターの起動時にスナップショットを実行する前にコネクターが待機する間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。
snapshot.fetch.size
デフォルト値: 未設定。
デフォルトでは、スナップショットの作成中に、コネクターはテーブルの内容を行単位で読み取ります。バッチ内の行の最大数を指定するには、このプロパティーを設定します。
snapshot.include.collection.list
デフォルト値: table.include.list で指定されたすべてのテーブル。
スナップショットに含めるテーブルの完全修飾名 (<databaseName>.<tableName>) と一致する正規表現のコンマ区切りリスト (任意)。指定する項目は、コネクターの table.include.list プロパティーで名前を付ける必要があります。このプロパティーは、コネクターの snapshot.mode プロパティーが never. 以外の値に設定されている場合にのみ有効です。
このプロパティーは増分スナップショットの動作には影響しません。

テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
snapshot.lock.timeout.ms
デフォルト値: 10000
スナップショットを実行するときにテーブルロックを取得するために待機する最大時間 (ミリ秒単位) を指定する正の整数。コネクターがこの期間にテーブルロックを取得できないと、スナップショットは失敗します。詳細は、MariaDB コネクターによるデータベーススナップショットの実行方法 を説明するドキュメントを参照してください。
snapshot.locking.mode

デフォルト値: minimal
コネクターがグローバル MariaDB 読み取りロックを保持するかどうか、および保持する期間を指定します。これにより、コネクターがスナップショットを実行している間、データベースへの更新を加えることができません。以下の設定が可能です。

minimal
コネクターは、データベーススキーマやその他のメタデータを読み取るスナップショットの初期フェーズのみ、グローバル読み取りロックを保持します。スナップショットの次のフェーズでは、コネクターは各テーブルからすべての行を選択するときにロックを解除します。一貫した方法で SELECT 操作を実行するために、コネクターは REPEATABLE READ トランザクションを使用します。グローバル読み取りロックが解除されると、他の MariaDB クライアントがデータベースを更新できるようになりますが、トランザクションの期間中、コネクターは同じデータを読み取り続けるため、REPEATABLE READ 分離を使用すると、スナップショットの一貫性が確保されます。
extended
スナップショットの作成中にすべての書き込み操作をブロックします。クライアントが MariaDB の REPEATABLE READ 分離レベルと互換性のない同時操作を送信する場合は、この設定を使用します。
none
スナップショット中にコネクターがテーブルロックを取得するのを防ぎます。このオプションはすべてのスナップショットモードで許可されますが、スナップショットの作成中にスキーマの変更が発生しない場合に のみ 安全に使用できます。MyISAM エンジンで定義されたテーブルは常にテーブルロックを取得します。その結果、このオプションを設定しても、このようなテーブルはロックされます。この動作は、行レベルのロックを取得する InnoDB エンジンによって定義されたテーブルとは異なります。
snapshot.max.threads

デフォルト値: 1
初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。

重要

並列初期スナップショットは開発者プレビュー機能のみとなっています。開発者プレビューソフトウェアは、Red Hat では一切サポートされておらず、機能的に完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアはいつでも変更または削除される可能性があり、限定的なテストしか行われていません。Red Hat は、関連する SLA なしに、開発者プレビューソフトウェアに対するフィードバックを送信する手段を提供する場合があります。

Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。

snapshot.mode

デフォルト値: initial
コネクターの起動時にスナップショットを実行するための基準を指定します。以下の設定が可能です。

always
コネクターは起動するたびにスナップショットを実行します。スナップショットには、キャプチャーされたテーブルの構造およびデータが含まれます。この値を指定すると、コネクターが起動するたびに、キャプチャーされたテーブルからのデータの完全な表現がトピックに入力されます。
initial (デフォルト)
コネクターは、論理サーバー名のオフセットが記録されていない場合、または以前のスナップショットが完了しなかったことが検出された場合にのみ、スナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。
initial_only
コネクターは、論理サーバー名のオフセットが記録されていない場合にのみスナップショットを実行します。スナップショットが完了すると、コネクターは停止します。binlog からの変更イベントを読み取る際にストリーミングに移行しません。
schema_only
非推奨です。no_data を参照してください。
no_data
コネクターは、テーブルデータではなくスキーマのみをキャプチャーするスナップショットを実行します。トピックにデータの一貫したスナップショットを含める必要はないが、最後のコネクターの再起動後に適用されたスキーマの変更をキャプチャーする場合は、このオプションを設定します。
schema_only_recovery
非推奨です。recovery を参照してください。
recovery

損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。

警告

最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合、このモードを使用してスナップショットを実行しないでください。

never
コネクターが起動すると、スナップショットを実行するのではなく、後続のデータベース変更のイベントレコードのストリーミングがすぐに開始されます。no_data オプションが優先的に使用されるようになり、このオプションは、今後非推奨にするか検討中です。
when_needed

コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。

  • トピックのオフセットを検出できません。
  • 以前に記録されたオフセットは、サーバー上で使用できない binlog の位置または GTID を指定します。
snapshot.query.mode

デフォルト値: select_all
スナップショットを実行するときにコネクターがデータをクエリーする方法を指定します。
以下のオプションのいずれかを設定します。

select_all (デフォルト)
コネクターは、select all クエリーを使用してキャプチャーされたテーブルから行を取得し、必要に応じて、列の include および exclude リストの設定に基づいて選択された列を調整します。

この設定により、snapshot.select.statement.overrides プロパティーを使用する場合と比較して、より柔軟にスナップショットコンテンツを管理できるようになります。

snapshot.select.statement.overrides

デフォルト値: デフォルトなし。
スナップショットに含めるテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
<databaseName>.<tableName> の形式で完全修飾テーブル名のコンマ区切りリストを指定します。以下に例を示します。

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

リスト内の各テーブルに対して、スナップショットを取得するときにコネクターがテーブルで実行する SELECT ステートメントを指定する別の設定プロパティーを追加します。指定した SELECT ステートメントは、スナップショットに追加するテーブル行のサブセットを決定します。この SELECT ステートメントプロパティーの名前を指定するには、次の形式を使用します。

snapshot.select.statement.overrides.<databaseName>.<tableName>。たとえば、snapshot.select.statement.overrides.customers.orders などです。

ソフト削除列 delete_flag を含む customers.orders テーブルから、スナップショットにソフト削除されていないレコードのみを含める場合は、次のプロパティーを追加します。

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
Copy to Clipboard Toggle word wrap

作成されるスナップショットでは、コネクターには delete_flag = 0 のレコードのみが含まれます。

snapshot.tables.order.by.row.count

デフォルト値: disabled
コネクターが初期スナップショットを実行するときにテーブルを処理する順序を指定します。以下のオプションのいずれかを設定します。

descending
コネクターは、行数に基づいて、最上位から最下位の順にテーブルのスナップショットを作成します。
ascending
コネクターは、行数に基づいて、最下位から最上位の順にテーブルのスナップショットを作成します。
disabled
コネクターは、初期スナップショットを実行するときに行数を無視します。
streaming.delay.ms
デフォルト値: 0
コネクターがスナップショットを完了した後、ストリーミングプロセスの開始を遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットが完了した直後で、ストリーミングプロセスの開始前に障害が発生した場合に、コネクターがスナップショットを再開できないようにします。Kafka Connect ワーカーに設定されている offset.flush.interval.ms プロパティーの値よりも高い遅延値を設定します。
table.ignore.builtin
デフォルト値: true
組み込みシステムテーブルを無視するかどうかを指定するブール値。これは、テーブルの include および exclude リストに関係なく適用されます。デフォルトでは、システムテーブルの値に加えられた変更はキャプチャーから除外され、Debezium はシステムテーブルの変更に対してイベントを生成しません。
topic.cache.size
デフォルト値: 10000
制限された同時ハッシュマップ内のメモリーに格納できるトピック名の数を指定します。コネクターはキャッシュを使用して、データコレクションに対応するトピック名を決定します。
topic.delimiter
デフォルト値: .
コネクターがトピック名のコンポーネント間に挿入する区切り文字を指定します。
topic.heartbeat.prefix

デフォルト値: __debezium-heartbeat
コネクターがハートビートメッセージを送信するトピックの名前を指定します。トピック名の形式は次のとおりです。

topic.heartbeat.prefix.topic.prefix

たとえば、トピックの接頭辞が fulfillment の場合、デフォルトのトピック名は __debezium-heartbeat.fulfillment になります。

topic.naming.strategy
デフォルト値: io.debezium.schema.DefaultTopicNamingStrategy
コネクターが使用する TopicNamingStrategy クラスの名前。指定されたストラテジーによって、データ変更、スキーマ変更、トランザクション、ハートビートなどのイベントレコードを格納するトピックにコネクターが名前を付ける方法が決まります。
topic.transaction

デフォルト値: transaction
コネクターがトランザクションメタデータメッセージを送信するトピックの名前を指定します。トピック名のパターンは次のとおりです。

topic.prefix.topic.transaction

たとえば、トピック接頭辞が fulfillment の場合、デフォルトのトピック名は fulfillment.transaction になります。

use.nongraceful.disconnect
デフォルト値: false。
バイナリーログクライアントのキープアライブスレッドが SO_LINGER ソケットオプションを 0 に設定して、古い TCP 接続をすぐに切断するかどうかを指定するブール値。
SSLSocketImpl.close でコネクターのデッドロックが発生する場合は、値を true に設定します。

Debezium コネクターデータベーススキーマ履歴設定プロパティー

Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する schema.history.internal.* プロパティーのセットが含まれています。

以下の表は、Debezium コネクターを設定するための schema.history.internal プロパティーを説明しています。

Expand
表2.52 コネクターデータベーススキーマ履歴の設定プロパティー
プロパティーデフォルト説明

schema.history.internal.kafka.topic

デフォルトなし

コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。

schema.history.internal.kafka.bootstrap.servers

デフォルトなし

Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。

schema.history.internal.kafka.recovery.poll.interval.ms

100

永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。

schema.history.internal.kafka.query.timeout.ms

3000

Kafka 管理クライアントを使用してクラスター情報を取得する際に、コネクターが待機すべき最大ミリ秒数を指定する整数値です。

schema.history.internal.kafka.create.timeout.ms

30000

Kafka 管理クライアントを使用して kafka 履歴トピックを作成する間、コネクターが待機する最大ミリ秒数を指定する整数値。

schema.history.internal.kafka.recovery.attempts

100

エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、recovery.attempts × recovery.poll.interval.ms です。

schema.history.internal.skip.unparseable.ddl

false

コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは false です。スキップは、binlog の処理中にデータの損失や分割を引き起こす可能性があるため、必ず注意して使用する必要があります。

schema.history.internal.store.only.captured.tables.ddl

false

コネクターがスキーマまたはデータベース内のすべてのテーブルからスキーマ構造を記録するか、キャプチャー対象に指定されたテーブルのみからスキーマ構造を記録するかを指定するブール値。
以下のいずれかの値を指定します。

false (デフォルト)
データベースのスナップショット中に、コネクターは、キャプチャー対象として指定されていないテーブルを含む、データベース内のシステム以外のテーブルのスキーマデータをすべて記録します。デフォルト設定を保持することを推奨します。後で、最初にキャプチャー対象として指定しなかったテーブルから変更をキャプチャーすることにした場合、コネクターはそれらのテーブルからのデータのキャプチャーを簡単に開始できます。これは、テーブルのスキーマ構造がすでにスキーマ履歴トピックに格納されているためです。Debezium では、変更イベントが発生した時点で存在していた構造を識別できるように、テーブルのスキーマ履歴が必要です。
true
データベースのスナップショット中に、コネクターは、Debezium が変更イベントをキャプチャーするテーブルのテーブルスキーマのみを記録します。デフォルト値を変更して、後でデータベース内の他のテーブルからデータをキャプチャーするようにコネクターを設定すると、コネクターには、テーブルから変更イベントをキャプチャーするために必要なスキーマ情報がなくなります。

schema.history.internal.store.only.captured.databases.ddl

false

コネクターがデータベースインスタンス内のすべての論理データベースのスキーマ構造を記録するかどうかを指定するブール値。
以下のいずれかの値を指定します。

true
コネクターは、論理データベース内のテーブルのスキーマ構造と、Debezium が変更イベントをキャプチャーするスキーマのみを記録します。
false
コネクターは、すべての論理データベースのスキーマ構造を記録します。

パススルー MariaDB コネクター設定プロパティー

コネクター設定で pass-through プロパティーを設定して、Apache Kafka プロデューサーとコンシューマーの動作をカスタマイズできます。Kafka プロデューサーとコンシューマーの全設定プロパティーの詳細は、Kafka ドキュメント を参照してください。

プロデューサーとコンシューマーのクライアントがスキーマ履歴トピックと対話する方法を設定するための Pass-through プロパティー

Debezium は、データベーススキーマ履歴トピックへのスキーマ変更を記述するために Apache Kafka プロデューサーに依存しています。同様に、コネクターが起動すると、データベーススキーマ履歴トピックから読み取る Kafka コンシューマーに依存します。schema.history.internal.producer.* および schema.history.internal.consumer.* 接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベーススキーマ履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。

schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234

schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234
Copy to Clipboard Toggle word wrap

Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。

Kafka プロデューサー設定プロパティーKafka コンシューマー設定プロパティー の詳細は、Apache Kafka ドキュメントを参照してください。

MariaDB コネクターが Kafka シグナリングトピックと対話する方法を設定するための Pass-through プロパティー

Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。

以下の表は、Kafka signal プロパティーを説明しています。

Expand
表2.53 Kafka のシグナル設定プロパティー
プロパティーデフォルト説明

signal.kafka.topic

<topic.prefix>-signal

コネクターがアドホックシグナルについて監視する Kafka トピックの名前。

注記

トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナリングトピックが必要です。シグナリングトピックには単一のパーティションが必要です。

signal.kafka.groupId

kafka-signal

Kafka コンシューマーによって使用されるグループ ID の名前。

signal.kafka.bootstrap.servers

デフォルトなし

コネクターが Kafka クラスターへの初期接続を確立するために使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。

signal.kafka.poll.timeout.ms

100

コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。

シグナリングチャネルの Kafka コンシューマークライアントを設定するためのパススループロパティー

Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.* で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL などのプロパティーを Kafka コンシューマーに渡します。

Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。

MariaDB コネクター sink notification チャネルを設定するためのパススループロパティー

次の表では、Debezium sink notification チャネルの設定に使用できるプロパティーについて説明します。

Expand
表2.54 Sink notification 設定プロパティー
プロパティーデフォルト説明

notification.sink.topic.name

デフォルトなし

Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして sink を含めるように notification.enabled.channels プロパティーを設定する場合に必要です。

Debezium コネクターのパススルーデータベースドライバー設定プロパティー

Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは接頭辞 driver.* で始まります。たとえば、コネクターは driver.foobar=false などのプロパティーを JDBC URL に渡します。

Debezium は、プロパティーをデータベースドライバーに渡す前に、プロパティーから接頭辞を削除します。

2.2.7. Debezium MariaDB コネクターのパフォーマンスの監視

Debezium MariaDB コネクターは、Zookeeper、Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。

Debezium の監視に関するドキュメント では、JMX を使用してこれらのメトリクスを公開する方法の詳細を説明しています。

Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。これらのメトリクスは各コネクターインスタンスに固有であり、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。

デフォルトでは、正しく設定されたコネクターをデプロイすると、Debezium はさまざまなコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクター設定に依存しており、設定の変更によって MBean 名が変更される場合があります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが切断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開するには、新しい MBean 名を使用するように監視スタックを再設定する必要があります。

MBean 名の変更が原因で監視が中断されないように、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、コネクター設定に custom.metric.tags プロパティーを追加します。このプロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値がそのタグの値を表すキーと値のペアを受け入れます。たとえば、k1=v1,k2=v2 です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。

コネクターの custom.metric.tags プロパティーを設定した後、指定されたタグに関連付けられたメトリクスを取得するように監視スタックを設定できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。

カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。

次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。

例2.16 カスタムタグがコネクター MBean 名を変更する方法

デフォルトでは、MariaDB コネクターはストリーミングメトリクスに次の MBean 名を使用します。

debezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix>
Copy to Clipboard Toggle word wrap

custom.metric.tags の値を database=salesdb-streaming,table=inventory に設定すると、Debezium は次のカスタム MBean 名を生成します。

debezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
Copy to Clipboard Toggle word wrap

2.2.7.2. MariaDB データベースのスナップショット中の Debezium の監視

MBeandebezium.mariadb:type=connector-metrics,context=snapshot,server=<topic.prefix> です。

スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。

次の表に、使用可能なスナップショットメトリクスを示します。

Expand
属性タイプ説明

LastEvent

string

コネクターが読み取りした最後のスナップショットイベント。

MilliSecondsSinceLastEvent

long

コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。

TotalNumberOfEventsSeen

long

前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。

NumberOfEventsFiltered

long

コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。

CapturedTables

string[]

コネクターによって取得されるテーブルのリスト。

QueueTotalCapacity

int

snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。

QueueRemainingCapacity

int

snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。

TotalTableCount

int

スナップショットに含まれているテーブルの合計数。

RemainingTableCount

int

スナップショットによってまだコピーされていないテーブルの数。

SnapshotRunning

boolean

スナップショットが起動されたかどうか。

SnapshotPaused

boolean

スナップショットが一時停止されたかどうか。

SnapshotAborted

boolean

スナップショットが中断されたかどうか。

SnapshotCompleted

boolean

スナップショットが完了したかどうか。

SnapshotDurationInSeconds

long

スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。

SnapshotPausedDurationInSeconds

long

スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。

RowsScanned

Map<String, Long>

スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。

MaxQueueSizeInBytes

long

キューの最大バッファー (バイト単位)。このメトリクスは、max.queue.size.in.bytes が正の長さの値に設定されている場合に利用できます。

CurrentQueueSizeInBytes

long

キュー内のレコードの現在の容量 (バイト単位)。

コネクターは、増分スナップショットの実行時に、以下の追加のスナップショットメトリクスも提供します。

Expand
属性タイプ説明

ChunkId

string

現在のスナップショットチャンクの識別子。

ChunkFrom

string

現在のチャンクを定義するプライマリーキーセットの下限。

ChunkTo

string

現在のチャンクを定義するプライマリーキーセットの上限。

TableFrom

string

現在スナップショットされているテーブルのプライマリーキーセットの下限。

TableTo

string

現在スナップショットされているテーブルのプライマリーキーセットの上限。

2.2.7.3. Debezium MariaDB コネクターレコードストリーミングの監視

Debezium MariaDB コネクターは、Zookeeper、Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。

Debezium の監視に関するドキュメント では、JMX を使用してこれらのメトリクスを公開する方法の詳細を説明しています。

MBeandebezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix> です。

以下の表は、利用可能なストリーミングメトリクスのリストです。

Expand
属性タイプ説明

LastEvent

string

コネクターが読み取られた最後のストリーミングイベント。

MilliSecondsSinceLastEvent

long

コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。

TotalNumberOfEventsSeen

long

コネクターを最後に起動してから、またはメトリクスをリセットしてから、ソースデータベースによって報告されたデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。

TotalNumberOfCreateEventsSeen

long

コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された作成イベントの合計数。

TotalNumberOfUpdateEventsSeen

long

コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された更新イベントの合計数。

TotalNumberOfDeleteEventsSeen

long

コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された削除イベントの合計数。

NumberOfEventsFiltered

long

コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。

CapturedTables

string[]

コネクターによって取得されるテーブルのリスト。

QueueTotalCapacity

int

ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。

QueueRemainingCapacity

int

ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。

Connected

boolean

コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。

MilliSecondsBehindSource

long

最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値には、データベースサーバーとコネクターが実行されているマシンのクロックの差が組み込まれます。

NumberOfCommittedTransactions

long

コミットされた処理済みトランザクションの数。

SourceEventPosition

Map<String, String>

最後に受信したイベントの位置。

LastTransactionId

string

最後に処理されたトランザクションのトランザクション識別子。

MaxQueueSizeInBytes

long

キューの最大バッファー (バイト単位)。このメトリクスは、max.queue.size.in.bytes が正の長さの値に設定されている場合に利用できます。

CurrentQueueSizeInBytes

long

キュー内のレコードの現在の容量 (バイト単位)。

2.2.7.4. Debezium MariaDB コネクタースキーマ履歴の監視

MBeandebezium.mariadb:type=connector-metrics,context=schema-history,server=<topic.prefix> です。

以下の表は、利用可能なスキーマ履歴メトリクスのリストです。

Expand
属性タイプ説明

Status

string

データベーススキーマ履歴の状態を示す STOPPEDRECOVERING (ストレージから履歴を復元)、または RUNNING のいずれか。

RecoveryStartTime

long

リカバリーが開始された時点のエポック秒の時間。

ChangesRecovered

long

リカバリーフェーズ中に読み取られた変更の数。

ChangesApplied

long

リカバリーおよびランタイム中に適用されるスキーマ変更の合計数。

MilliSecondsSinceLast​RecoveredChange

long

最後の変更が履歴ストアから復元された時点からの経過時間 (ミリ秒単位)。

MilliSecondsSinceLast​AppliedChange

long

最後の変更が適用された時点からの経過時間 (ミリ秒単位)。

LastRecoveredChange

string

履歴ストアから復元された最後の変更の文字列表現。

LastAppliedChange

string

最後に適用された変更の文字列表現。

2.2.8. Debezium MariaDB コネクターが障害や問題を処理する方法

Debezium は、複数のアップストリームデータベースのすべての変更をキャプチャーする分散システムであり、イベントの見逃しや損失は発生しません。システムが正常に操作している場合や、慎重に管理されている場合は、Debezium は変更イベントレコードごとに 1 度だけ 配信します。

障害が発生しても、システムからイベントがなくなることはありません。ただし、Debezium が障害から回復している間に、いくつかの変更イベントが繰り返される可能性があります。このような正常でない状態では、Debezium は Kafka と同様に、変更イベントを 少なくとも 1 回 配信します。

詳細は以下を参照してください。

設定および起動エラー

以下の状況では、起動時にコネクターが失敗し、エラーまたは例外がログに記録され、実行が停止されます。

  • コネクターの設定が無効である。
  • 指定された接続パラメーターを使用してコネクターを MariaDB サーバーに正常に接続できない。
  • MariaDB に履歴がない binlog の位置でコネクターが再起動を試行する。

このような場合、エラーメッセージには問題の詳細が含まれ、推奨される回避策も含まれることがあります。設定の修正したり、MariaDB の問題に対処した後、コネクターを再起動します。

MariaDB が利用できなくなる
MariaDB サーバーが利用できなくなった場合、Debezium MariaDB コネクターはエラーで失敗し、コネクターが停止します。サーバーが再び使用できるようになったら、コネクターを再起動します。

ただし、高可用性の MariaDB クラスターに接続している場合は、コネクターをすぐに再起動できます。これはクラスターの別の MariaDB サーバーに接続し、最後のトランザクションを表すサーバーの binlog の場所を特定し、その特定の場所から新しいサーバーの binlog の読み取りを開始します。

Kafka Connect が正常に停止する
Kafka Connect が正常に停止すると、Debezium MariaDB コネクタータスクが停止され、新しい Kafka Connect プロセスで再起動される間に短い遅延が発生します。
Kafka Connect プロセスのクラッシュ
Kafka Connect がクラッシュすると、プロセスが停止し、最後に処理されたオフセットが記録されずに Debezium MariaDB コネクタータスクが終了します。分散モードでは、Kafka Connect は他のプロセスでコネクタータスクを再起動します。ただし、MariaDB コネクターは以前のプロセスで記録された最後のオフセットから再開します。その結果、代わりのタスクによってクラッシュ前に処理された一部のイベントが再生成され、重複したイベントが作成されることがあります。

各変更イベントメッセージには、重複イベントの特定に使用できるソース固有の情報が含まれます。以下に例を示します。

  • イベント元
  • MariaDB サーバーのイベント時間
  • binlog ファイル名と位置
  • GTID
Kafka が使用不可能になる
Kafka Connect フレームワークは、Kafka プロデューサー API を使用して Debezium 変更イベントを記録します。Kafka ブローカーが利用できなくなると、Debezium MariaDB コネクターは接続が再確立されるまで一時停止され、一時停止した位置から再開されます。
MariaDB が binlog ファイルをパージする
Debezium MariaDB コネクターが長時間停止すると、MariaDB サーバーは古い binlog ファイルを消去し、コネクターの最後の位置が失われる可能性があります。コネクターが再起動すると、MariaDB サーバーに開始点がなくなり、コネクターは別の最初のスナップショットを実行します。スナップショットが無効の場合、コネクターはエラーによって失敗します。

MariaDB コネクターが初期スナップショットを実行する方法の詳細は、スナップショット を参照してください。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat