第1章 MySQL の Debezium コネクター


MySQL には、データベースにコミットされた順序ですべての操作を記録するバイナリーログ (binlog) があります。これには、テーブルスキーマとテーブル内のデータの変更が含まれます。MySQL はレプリケーションとリカバリーに binlog を使用します。

MySQL コネクターは binlog を読み取り、行レベルの INSERTUPDATEDELETE 操作の変更イベントを生成し、Kafka トピックに変更イベントを記録します。クライアントアプリケーションはこれらの Kafka トピックを読み取ります。

MySQL は通常、指定された期間後に binlogs をパージするように設定されているため、MySQL コネクターは各データベースの 整合性スナップショット を実行し、初期 整合性スナップショット を実行します。MySQL コネクターは、スナップショットが作成された時点から binlog を読み取ります。

1.1. MySQL コネクターの仕組みの概要

Debezium MySQL コネクターはテーブルの構造を追跡し、スナップショットを実行し、binlog イベントを Debezium 変更イベントに変換し、これらのイベントが Kafka に記録されます。

1.1.1. MySQL コネクターによるデータベーススキーマの使用方法

データベースクライアントがデータベースをクエリーすると、データベースの現在のスキーマが使用されます。データベーススキーマが頻繁に変更されると、Debezium MySQL コネクターINSERTUPDATE、および DELETE 操作ごとにスキーマがどのように表示されるかを認識します。

MySQL には、各テーブルのスキーマのインメモリー表現を解析し、更新する binlog の 行レベルの変更DDL ステートメント の両方が含まれます。これは、正確な変更イベントを生成する各操作時にテーブル構造を理解するために使用されます。

注記

コネクターは、すべての DDL ステートメントと、別のデータベース履歴の位置を記録し、コネクターの再起動時(クラッシュまたは正常なシャットダウンの後)が、その特定の時点からの binlog の読み取りを続行します。

ヒント

トピックの命名規則の詳細 は、MySQL コネクターおよび Kafka トピック を参照してください。

関連情報

  • ここで説明する データベース履歴トピック を使用しない場合は、スキーマ変更トピック を参照してください。

1.1.2. MySQL コネクターによるデータベーススナップショットの実行方法

Debezium MySQL コネクターが最初に起動すると、データベースの最初の 整合性スナップショット が実行されます。以下のフローは、このスナップショットの完了方法を説明します。

注記

これは、snapshot.mode プロパティーの 初期 として設定されるデフォルトの スナップショットモード です。他のスナップショットモードについては、MySQL コネクター設定プロパティー を確認してください。

コネクター
Stepアクション

1

他のデータベースクライアントによる 書き込み をブロックする グローバル読み取りロック を取得します。

注記

スナップショット自体は、コネクターの binlog の位置およびテーブルスキーマの読み取りを干渉する可能性のある DDL の適用を防ぐことはありません。グローバル読み取りロックは、後のステップでリリースする前に binlog の位置が読み取られている間に保持されます。

2

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。

3

現在の binlog の位置を読み取ります。

4

コネクターの設定によって許可されるデータベースおよびテーブルのスキーマを読み取ります。

5

グローバル読み取りロック を解放します。これにより、他のデータベースクライアントがデータベースに書き込みできるようになりました。

6

DDL の変更をスキーマ変更トピックに書き込みます。これには、必要な DROP…​ および CREATE…​ DDL ステートメントがすべて含まれます。

注記

これは、該当する場合に発生します。

7

データベーステーブルをスキャンし、各行に関連するテーブル固有の Kafka トピックで CREATE イベントを生成します。

8

トランザクションをコミットします。

9

コネクターオフセットの完了済みスナップショットを記録します。

1.1.2.1. コネクターが失敗するとどうなりますか ?

最初のスナップ ショットの実行中にコネクターが失敗、停止、またはリバランスされると、コネクターは再起動後に新しいスナップショットを作成します。この意図的な スナップショット が完了すると、Debezium MySQL コネクターは binlog の同じ位置から再起動するため、更新が見逃されることはありません。

注記

コネクターが長時間停止した場合、MySQL が古い binlog ファイルをパージし、コネクターの位置が失われる可能性があります。位置が失われた場合、コネクターは 最初のスナップショット を開始位置に戻します。Debezium MySQL コネクターのトラブルシューティングに関する詳細は、MySQL connector common issues を参照してください。

1.1.2.2. グローバル読み取りロックが許可されていない場合はどうすればよいですか ?

一部の環境では、グローバル読み取りロック が許可されません。Debezium MySQL コネクターがグローバル読み取りロックが許可されないことを検出すると、代わりにテーブルレベルロックを使用して、この方法でスナップショットを実行します。

重要

ユーザーが LOCK_TABLES 権限を持っている必要があります。

コネクター
Stepアクション

1

繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。

2

データベースとテーブルの名前を読み取り、選別します。

3

現在の binlog の位置を読み取ります。

4

コネクターの設定によって許可されるデータベースおよびテーブルのスキーマを読み取ります。

5

DDL の変更をスキーマ変更トピックに書き込みます。これには、必要な DROP…​ および CREATE…​ DDL ステートメントがすべて含まれます。

注記

これは、該当する場合に発生します。

6

データベーステーブルをスキャンし、各行に関連するテーブル固有の Kafka トピックで CREATE イベントを生成します。

7

トランザクションをコミットします。

8

テーブルレベルロックを解除します。

9

コネクターオフセットの完了済みスナップショットを記録します。

1.1.3. MySQL コネクターがスキーマ変更トピックを処理する方法

Debezium MySQL コネクターを設定すると、MySQL サーバーのデータベースに適用されるすべての DDL ステートメントが含まれるスキーマ変更イベントを生成できます。コネクターは、これらのイベントをすべて < serverName > という名前の Kafka トピックに書き込みます。serverNamedatabase.server.name 設定プロパティーに指定されたコネクターの名前になります。

重要

スキーマ変更イベント を使用する場合は、スキーマ変更トピックを使用し、データベース履歴トピックを使用し ません

注記

スキーマの変更が正しい順序で保持されるように、Kafka の num.partitions 設定が 1 に設定されていることを確認してください。

1.1.3.1. スキーマ変更トピック構造

スキーマ変更トピックに書き込まれる各メッセージには、DDL ステートメントの適用時に使用される接続されたデータベースの名前が含まれるメッセージキーが含まれます。

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeKey",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "databaseName": "inventory"
  }
}

スキーマ変更イベントメッセージの値には、DDL ステートメント、ステートメントが適用されるデータベース、およびステートメントが現れる binlog の位置が含まれる構造が含まれます。

{
  "schema": {
    "type": "struct",
    "name": "io.debezium.connector.mysql.SchemaChangeValue",
    "optional": false,
    "fields": [
      {
        "field": "databaseName",
        "type": "string",
        "optional": false
      },
      {
        "field": "ddl",
        "type": "string",
        "optional": false
      },
      {
        "field": "source",
        "type": "struct",
        "name": "io.debezium.connector.mysql.Source",
        "optional": false,
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ]
      }
    ]
  },
  "payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : {
      "version": "0.10.0.Beta4",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}
1.1.3.1.1. スキーマ変更トピックに関する重要なヒント

ddl フィールドには複数の DDL ステートメントが含まれる場合があります。すべてのステートメントは databaseName フィールドのデータベースに適用され、データベースに適用されるのと同じ順序で表示されます。source フィールドは、テーブル固有のトピックに書き込まれた標準のデータ変更イベントとして設定されます。このフィールドは、異なるトピックでイベントを関連付けるのに役立ちます。

....
    "payload": {
        "databaseName": "inventory",
        "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,...
        "source" : {
            ....
        }
    }
....
クライアントが DDL ステートメントを 複数のデータベース に送信する場合
  • MySQL がこれらをアトミックに適用する場合、コネクターは DDL ステートメントを順番に取得し、データベース別にグループ化して、各グループにスキーマ変更イベントを作成します。
  • MySQL がこれらを個別に適用すると、コネクターは各ステートメントに対して個別のスキーマ変更イベントを作成します。

関連情報

1.1.4. MySQL コネクターイベント

Debezium MySQL コネクターによって生成されたすべてのデータ変更イベントには、キーと値が含まれます。変更イベントキーと変更イベント値には、それぞれ スキーマペイロードが含まれ、スキーマはペイロード の構造を記述し、ペイロードにはデータが含まれます。

警告

MySQL コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。これは、文字やアンダースコアではない文字がアンダースコアに置き換えられます。これにより、論理サーバー名、データベース名、およびテーブル名コンテナー(これらのアンダースコアに置き換えられたその他の文字)時にスキーマ名で予期しない競合が発生する可能性があります。

1.1.4.1. 変更イベントキー

指定のテーブルでは、変更イベントのキーには、イベントの作成時に プライマリーRY KEY (または一意の制約)の各列のフィールドが含まれる構造があります。サンプルのテーブルと、そのテーブルのスキーマとペイロードがどのように表示されるかを見てみましょう。

テーブルの例

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

変更イベントキーの例

{
 "schema": { 1
    "type": "struct",
 "name": "mysql-server-1.inventory.customers.Key", 2
 "optional": false, 3
 "fields": [ 4
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { 5
    "id": 1001
  }
}

1
スキーマ は、ペイロード の内容を記述します。
2
mysql-server- 1.inventory.customers.Key は、mysql-server- 1 がコネクター名、inventory はデータベース、customers がテーブルである構造を定義するスキーマの名前です。
3
ペイロード はオプションではないことを示します。
4
ペイロード で想定されるフィールドのタイプを指定します。
5
ペイロード自体。この場合、単一の id フィールドのみが含まれます。

このキーは、id プライマリーキー列の値が 1001 である mysql-server-1 のコネクターから外れる inventory.customers テーブルの行を記述します。

1.1.4.2. 変更イベント値

変更イベント値には、schema および payload セクションが含まれます。エンベロープ構造を持つ変更イベント値には 3 つのタイプがあります。この構造のフィールドについては以下に説明され、各変更イベント値の例でマークが付けられます。

項目フィールド名説明

1

name

mysql-server- 1.inventory.customers.Key は、mysql-server- 1 がコネクター名、inventory はデータベース、customers がテーブルである構造を定義するスキーマの名前です。

2

op

操作のタイプを記述する 必須 文字列。

  • c = create
  • u = update
  • d = delete
  • r = read (最初のスナップショット のみ)

3

before

イベント発生前の行の状態を指定する任意のフィールド。

4

after

イベント発生後の行の状態を指定する任意のフィールド。

5

source

以下を含むイベントのソースメタデータを記述する 必須 フィールド。

  • Debezium のバージョン
  • コネクター名
  • イベントが記録された binlog 名
  • binlog の位置
  • イベント内の行
  • イベントがスナップショットの一部である場合
  • 影響を受けるデータベースおよびテーブルの名前
  • イベントを作成する MySQL スレッドの ID (スナップショット以外)
  • MySQL サーバー ID (利用可能な場合)
  • timestamp
注記

binlog_rows_query_log_events オプションが有効で、コネクターの include.query オプションが有効になっている場合、イベントを生成した元の SQL ステートメントが含まれる クエリー フィールドが表示されます。

6

ts_ms

コネクターがイベントを処理した時間を表示する任意のフィールド。

注記

この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。

サンプルのテーブルと、そのテーブルのスキーマとペイロードがどのように表示されるかを見てみましょう。

テーブルの例

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

1.1.4.2.1. 変更イベント値の作成

以下の例は、customers テーブルの 作成 イベントを示しています。

{
  "schema": { 1
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.product.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope"
  },
  "payload": { 2
    "op": "c",
    "ts_ms": 1465491411815,
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.0.3.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}
1
このイベントの schema 部分は、エンベロープのスキーマ、ソース構造のスキーマ(MySQL コネクターに固有ですべてのイベントで再利用)、before および after フィールドのテーブル固有のスキーマを表示します。
2
このイベントの payload 部分には、イベント内の情報、行が作成されたことを記述している( op=cであるため)、after フィールドの値には新しい挿入された行の idfirst_namelast_name、および email 列の値が含まれていることを表しています。
1.1.4.2.2. 変更イベント値の更新

customers テーブルの 更新 変更イベントの値には、作成 イベントと同じスキーマがあります。ペイロードは同じように設定されますが、異なる値を保持します。以下に例を示します(書式を調整して読みやすくしてあります)。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { 2
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 3
      "version": "1.0.3.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 4
    "ts_ms": 1465581029523 5
  }
}

これを 挿入 イベントの値と比較すると、payload セクションにいくつかの違いがあります。

1
before フィールドは、データベースのコミット前の行と値の状態を表しています。
2
after フィールドは、更新された行の状態を表し、first_name の値は Anne Marie になっています。before と after の構造を比較 する と、コミットが原因で、この行で実際に何が変更されたかを判断できます。
3
source フィールド構造には以前と同じフィールドがありますが、値は異なります(このイベントは binlog の異なる位置にあります)。source 構造には、この変更の MySQL レコードに関する情報が表示されます(トレーサビリティーを提供)。また、このトピックや他のトピックの他のイベントと比較し、他のイベントと同じ MySQL コミットの前、後、または一部でこのイベントが発生したかどうかを確認するための情報もあります。
4
op フィールドの値は u になっており、更新によってこの行が変更されたことを示しています。
5
ts_ms フィールドは、Debezium がこのイベントを処理したときのタイムスタンプを表示します。
注記

行のプライマリーキーまたは一意の鍵の列を更新すると、行のキーの値が変更され、Debezium は 3 つのイベントを出力します。つまり、行の古いキーを持つ DELETE イベントと tombstone イベント、行の新しいキーを持つ INSERT イベントです。

1.1.4.2.3. 変更イベント値の削除

customers テーブルの 削除 変更イベントの値には、作成 および 更新 イベントと同じスキーマがあります。ペイロードは同じように設定されますが、異なる値を保持します。以下に例を示します(書式を調整して読みやすくしてあります)。

{
  "schema": { ... },
  "payload": {
    "before": { 1
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, 2
    "source": { 3
      "version": "1.0.3.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", 4
    "ts_ms": 1465581902461 5
  }
}

作成更新 イベントの ペイロード 部分をペイロードと比較すると、いくつかの違いがあります。

1
before フィールドは、データベースのコミットで削除した行の状態を表しています。
2
after フィールドは null で、行が存在しないことを示します。
3
source フィールド構造には以前と同じ値が多数ありますが、ts_sec および pos フィールドは変更されました(他のシナリオでは ファイルが変更された可能性があります)。
4
op フィールドの値は d になっており、この行が削除されたことを示しています。
5
ts_ms は、Debezium がこのイベントを処理したときのタイムスタンプを示します。

このイベントは、この行の削除を処理するのに必要な情報をコンシューマーに提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。

MySQL コネクターのイベントは、Kafka ログコンパクション と動作するように設計されています。これにより、すべてのキーの最新のメッセージが保持される限り、古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようになりました。

行が 削除 された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、上記の 削除 イベント値はログコンパクションで動作します。メッセージの値が null の場合、Kafka は同じキーを持つすべてのメッセージを削除できることを認識します。これを可能にするために、Debezium の MySQL コネクターは、null 値以外で同じキーを持つ特別な廃棄イベントを持つ 削除 イベントに従います。

1.1.5. MySQL コネクターによるデータ型のマッピング方法

Debezium MySQL コネクターは、行が存在するテーブルのように構造化されたイベントで行への変更を表します。イベントには、各列の値のフィールドが含まれます。その列の MySQL データ型によって、その値がイベントでどのように表されるかが決まります。

文字列を格納する列は、文字セットと照合順序を使用して MySQL に定義されます。MySQL コネクターは、binlog イベントの列値のバイナリー表現を読み取るときに、列の文字セットを使用します。以下の表は、MySQL データ型を リテラル 型と セマンティック型 の両方にマップする方法を示しています。

  • リテラル型 : Kafka Connect スキーマ型を使用して値がどのように表されるか。
  • セマンティック型 : Kafka Connect スキーマがどのようにフィールド(スキーマ名)の意味をキャプチャーするか。
MySQL 型リテラル型セマンティック型

BOOLEAN, BOOL

BOOLEAN

該当なし

BIT(1)

BOOLEAN

該当なし

BIT(>1)

BYTES

io.debezium.data.Bits

注記

length スキーマパラメーターには、ビット数を表す整数が含まれます。byte[] にはビットが リトルエンディアン 形式で含まれ、指定数のビットが含まれるようにサイズが指定されます。

例(n がビット)

numBytes = n/8 + (n%8== 0 ? 0 : 1)

TINYINT

INT16

該当なし

SMALLINT[(M)]

INT16

該当なし

MEDIUMINT[(M)]

INT32

該当なし

INT, INTEGER[(M)]

INT32

該当なし

BIGINT[(M)]

INT64

該当なし

REAL[(M,D)]

FLOAT32

該当なし

FLOAT[(M,D)]

FLOAT64

該当なし

DOUBLE[(M,D)]

FLOAT64

該当なし

CHAR(M)]

STRING

該当なし

VARCHAR(M)]

STRING

該当なし

BINARY(M)]

BYTES

該当なし

VARBINARY(M)]

BYTES

該当なし

TINYBLOB

BYTES

該当なし

TINYTEXT

STRING

該当なし

BLOB

BYTES

該当なし

TEXT

STRING

該当なし

MEDIUMBLOB

BYTES

該当なし

MEDIUMTEXT

STRING

該当なし

LONGBLOB

BYTES

該当なし

LONGTEXT

STRING

該当なし

JSON

STRING

io.debezium.data.Json

注記

JSON ドキュメント、配列、またはスケーラーの文字列表現が含まれます。

ENUM

STRING

io.debezium.data.Enum

注記

許可されるスキーマパラメーターには、許可さ れる値のコンマ区切りリストが含まれます。

SET

STRING

io.debezium.data.EnumSet

注記

許可されるスキーマパラメーターには、許可さ れる値のコンマ区切りリストが含まれます。

YEAR[(2|4)]

INT32

io.debezium.time.Year

TIMESTAMP[(M)]

STRING

io.debezium.time.ZonedTimestamp

注記

マイクロ秒の精度を持つ ISO 8601 形式。MySQL では、M0-6 の範囲にすることができます。

1.1.5.1. 時間値

TIMESTAMP データ型を除き、MySQL の時間型は time.precision.mode 設定プロパティーの値によって異なります。

ヒント

詳細は MySQL コネクター設定プロパティー を参照してください。

タイムゾーンのない時間値は、UTC からミリ秒またはマイクロ秒(DATETIME)または設定されたデータベースタイムゾーン(TIMESTAMP)に変換されます。

  • 値が 2018-06-20 06:37:03DATETIME は、1529476623000 になります。
  • 値が 2018-06-20 06:37:03TIMESTAMP2018-06-20T13:37:03Z になります。
注記

MySQL では、DATE DATE、DATETIME、および TIMESTAMP 列にゼロの値を使用できます。これは、null 値よりも優先されることがあります。ただし、MySQL コネクターは、列定義が null を許可する場合、または列が null を許可しない場合はエポック日として、それらを null 値として表します。

time.precision.mode=adaptive_time_microseconds(default)

MySQL コネクターは、イベントがデータベースの値を正確に表すように、列のデータ型定義に基づいてリテラル型とセマンティック型を決定します。すべての時間フィールドはマイクロ秒です。

MySQL 型リテラル型セマンティック型

DATE

INT32

io.debezium.time.Date

注記

エポックからの日数を表します。

TIME[(M)]

INT64

io.debezium.time.MicroTime

注記

時間の値をマイクロ秒単位で表し、タイムゾーン情報は含まれません。MySQL では、M0-6 の範囲にすることができます。

DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)

INT64

io.debezium.time.Timestamp

注記

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

DATETIME(4), DATETIME(5), DATETIME(6)

INT64

io.debezium.time.MicroTimestamp

注記

エポックからの経過時間をマイクロ秒で表し、タイムゾーン情報は含まれません。

time.precision.mode=connect

MySQL コネクターは事前定義された Kafka Connect の論理型を使用します。この方法はデフォルトの方法よりも精度が低く、データベース列に 3 を超える 少数秒の精度値がある場合は、イベントの精度が低くなる可能性があります。

MySQL 型リテラル型セマンティック型

DATE

INT32

org.apache.kafka.connect.data.Date

注記

エポックからの日数を表します。

TIME[(M)]

INT64

org.apache.kafka.connect.data.Time

注記

午前 0 時以降の時間値をマイクロ秒で表し、タイムゾーン情報は含まれません。

DATETIME[(M)]

INT64

org.apache.kafka.connect.data.Timestamp

注記

エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。

1.1.5.2. 10 進数値

10 進数は decimal.handling.mode プロパティーで処理されます。

ヒント

詳細は MySQL コネクター設定プロパティー を参照してください。

decimal.handling.mode=precise
MySQL 型リテラル型セマンティック型

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

注記

scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

注記

scale スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。

decimal.handling.mode=double
MySQL 型リテラル型セマンティック型

NUMERIC[(M[,D])]

FLOAT64

該当なし

DECIMAL[(M[,D])]

FLOAT64

該当なし

decimal.handling.mode=string
MySQL 型リテラル型セマンティック型

NUMERIC[(M[,D])]

STRING

該当なし

DECIMAL[(M[,D])]

STRING

該当なし

1.1.5.3. 空間データ型

現在、Debezium MySQL コネクターは以下の空間データ型をサポートしています。

MySQL 型リテラル型セマンティック型

GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION

STRUCT

io.debezium.data.geometry.Geometry

注記

2 つのフィールドを持つ構造が含まれます。

  • srid (INT32: 構造に保存されたジオメトリーオブジェクトの型を定義する、空間参照システム ID)。
  • wkb (BYTES): Well-Known-Binary (wkb)形式でエンコードされたジオメトリーオブジェクトのバイナリー表現。詳細は、Open Geospatial Consortium を参照してください。

1.1.6. MySQL コネクターおよび Kafka トピック

Debezium MySQL コネクターは、すべての INSERTUPDATEDELETE 操作のイベントを 1 つのテーブルから単一の Kafka トピックに書き込みます。Kafka トピックの命名規則は次のとおりです。

format

serverName.databaseName.tableName

例1.1 例を示します。

fulfillment がサーバー名、inventory には、顧客、および 製品3 つ のテーブルが含まれるデータベースがあるとします。Debezium MySQL コネクターは、データベースのテーブルごとに 1 つずつ、3 つの Kafka トピックでイベントを生成します。

fulfillment.inventory.orders

fulfillment.inventory.customers

fulfillment.inventory.products

1.1.7. MySQL がサポートするトポロジー

Debezium MySQL コネクターは以下の MySQL トポロジーをサポートします。

スタンドアロン
単一の MySQL サーバーを使用する場合は、Debezium MySQL コネクターがサーバーを監視できるように、binlog を有効 (および任意で GTID を有効) にする必要があります。バイナリーログも増分 バックアップ として使用できるため、これは多くの場合で許容されます。この場合、MySQL コネクターは常にこのスタンドアロン MySQL サーバーインスタンスに接続し、それに従います。
マスターおよびスレーブ

Debezium MySQL コネクターはマスターまたはスレーブの 1 つ(スレーブのbinlog が有効になっている場合)に従うことができますが、コネクターはそのサーバーに表示されるクラスターの変更のみを確認します。通常、これはマルチマスタートポロジー以外の問題ではありません。

コネクターは、サーバーの binlog の位置を記録します。この位置は、クラスターの各サーバーごとに異なります。そのため、コネクターは 1 つの MySQL サーバーインスタンスのみに従う必要があります。そのサーバーに障害が発生した場合は、コネクターを続行する前に再起動または復元する必要があります。

高可用性クラスター
MySQL にはさまざまな 高可用性 ソリューションがあり、問題や障害からほぼ簡単に回復できます。ほとんどの HA MySQL クラスターは GTID を使用するため、スレーブはいずれかのマスター上のすべての変更を追跡できます。
multi-master

マルチマスター MySQL トポロジー は、複数のマスターからそれぞれを複製する 1 つ以上の MySQL スレーブ を使用します。これは、複数の MySQL クラスターのレプリケーションを集約する強力な方法であり、GTID を使用する必要があります。

Debezium MySQL コネクターはこれらのマルチマスター MySQL スレーブをソースとして使用し、新しいスレーブが古いスレーブにキャッチされている限り、異なるマルチマスター MySQL スレーブにフェイルオーバーできます(たとえば、新しいスレーブには最初のスレーブで最後に確認されたすべてのトランザクションがあります)。これは、新しいマルチマスター MySQL スレーブへの再接続を試み、binlog で正しい場所を見つけようとする際に、特定の GTID ソースを追加または除外するようにコネクターを設定することができるため、コネクターがデータベースやテーブルのサブセットのみを使用している場合でも機能します。

ホステッド

Debezium MySQL コネクターが Amazon RDS や Amazon Aurora などのホステッドオプションを使用するためのサポートがあります。

重要

これらのホストオプションは グローバル読み取り ロックを許可しないため、テーブルレベルのロックを使用して 整合性スナップショット を作成します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.