第2章 MySQL 用 Debezium コネクター
MySQL にはバイナリーログ(binlog)があり、このログは、データベースにコミットされる順序ですべての操作を記録します。これには、テーブルスキーマの変更やテーブル内のデータが含まれます。MySQL はレプリケーションおよびリカバリーに binlog を使用します。
MySQL コネクターは binlog を読み取り、行レベル INSERT
、、および DELETE
操作の変更イベントを生成し UPDATE
、Kafka トピックに変更イベントを記録します。クライアントアプリケーションは、これらの Kafka トピックを読み取ります。
MySQL は通常、指定した時間が経過した後に binlog をパージするように設定されるため、MySQL コネクターは各データベースの初回の スナップショット を実行します。MySQL コネクターは、スナップショットの作成元から binlog を読み取ります。
2.1. MySQL コネクターの仕組みの概要
Debezium MySQL コネクターはテーブルの構造を追跡し、スナップショットを実行し、binlog イベントを Debezium 変更イベント、Kafka に記録されるレコードに変換します。
2.1.1. MySQL コネクターがデータベーススキーマを使用する方法
データベースクライアントがデータベースをクエリーする場合、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更できます。つまり、コネクターは、各挿入、更新、または削除の操作が記録されたときにスキーマを特定できる必要があります。また、コネクターは比較的古いイベントを処理し、テーブルのスキーマの変更前に記録された可能性があるため、現在のスキーマのみを使用することができません。
これを処理するため、MySQL は binlog にデータへの行レベルの変更と、データベースに適用される DDL ステートメントが含まれます。コネクターが binlog を読み取り、これらの DDL ステートメントを渡すと、それらを解析して各テーブルのスキーマのメモリー内表現を更新します。コネクターはこのスキーマ表現を使用して、それぞれの挿入、更新、または削除時にテーブルの構造を特定し、適切な変更イベントを生成します。コネクターは、個別のデータベース履歴 Kafka トピックで、各 DDL ステートメントが表示される binlog の位置とともにすべての DDL ステートメントを記録します。
コネクターがクラッシュするか、または正常に停止された後にコネクターを再起動すると、コネクターは特定の時点から特定の位置から binlog の読み取りを開始します。コネクターは、データベース履歴 Kafka トピックを読み取り、すべての DDL ステートメントをコネクターが開始する binlog のポイントに解析することで、この時点で存在していたテーブル構造を再ビルドします。
このデータベース履歴トピックは、コネクターが使用するためにのみ使用されます。コネクターは任意で、コンシューマーアプリケーション向けの異なるトピックでスキーマ変更イベントを生成できます。これは、MySQL コネクターがスキーマ変更トピックを処理する方法 に説明されています。
MySQL コネクターが、gh-ost
やなどのスキーマ変更ツールの変更をキャプチャーする際に、移行プロセス中に作成 pt-online-schema-change
されたヘルパーテーブルをホワイトリストのテーブルに含める必要があります。
ダウンストリームシステムでは、一時テーブルによって生成されたメッセージが必要ない場合は、単純なメッセージ変換を書き込んで適用して、それらをフィルターすることができます。
トピックの命名規則の詳細は、「 MySQL コネクターおよび Kafka トピック 」を参照してください。
2.1.2. MySQL コネクターによるデータベーススナップショットの実行方法
Debezium MySQL コネクターが最初に起動すると、データベースの初期 一貫性のあるスナップショット を実行します。以下のフローは、このスナップショットがどのように完了するかを説明します。
これは、snapshot.mode
プロパティー initial
でとして設定されるデフォルトのスナップショットモードです。他のスナップショットモードについては、MySQL コネクター設定プロパティー をご確認ください。
- コネクター
手順 | アクション |
---|---|
| 他のデータベースクライアントによる 書き込み をブロックする グローバル読み取りロック を取得します。 注記 このスナップショット自体は、binlog の位置とテーブルスキーマの読み取りの試行に干渉する可能性がある他のクライアントが DDL を適用できないようにする訳ではありません。グローバル読み取りロックは、後でリリースされる前に binlog の位置が読み込まれる間に保持されます。 |
| 繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内のすべての後続の読み取りが 一貫したスナップショット に対して実行されるようにします。 |
| 現在の binlog 位置を読み取ります。 |
| コネクターの設定によって許可されるデータベースおよびテーブルのスキーマを読み取ります。 |
| グローバル読み取りロック を解放します。これにより、他のデータベースクライアントがデータベースに書き込むことができるようになります。 |
|
必要なすべての 注記 これは該当する場合に発生します。 |
|
データベーステーブルをスキャンし、各行に関連するテーブル固有の Kafka トピックで |
| トランザクションをコミットします。 |
| コネクターオフセットで完了したスナップショットを記録します。 |
2.1.2.1. コネクターが失敗した場合、どうなりますか?
初期のスナップショット 作成中にコネクターが失敗するか、停止するか、またはリバランスされると、コネクターは再起動後に新しいスナップショットを作成します。意図しない スナップショット が完了すると、Debezium MySQL コネクターは binlog 内の同じ位置から再起動するため、更新が失われないようにします。
コネクターが長い期間停止すると、MySQL は古い binlog ファイルをパージする可能性があり、コネクターの位置は失われます。位置が失われた場合、コネクターは開始位置のために 最初のスナップショット に戻ります。Debezium MySQL コネクターのトラブルシューティングに関する詳細は、「 MySQL connector common issues 」を参照してください。
2.1.2.2. グローバル読み取りロックが許可されていない場合は?
一部の環境では、グローバル読み取りロック が許可されない場合があります。Debezium MySQL コネクターがグローバル読み取りロックが許可されないことを検出すると、コネクターは代わりにテーブルレベルのロックを使用して、この方法でスナップショットを実行します。
ユーザーには LOCK_TABLES
権限が必要です。
- コネクター
手順 | アクション |
---|---|
| 繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内のすべての後続の読み取りが 一貫したスナップショット に対して実行されるようにします。 |
| データベースおよびテーブルの名前を読み取り、フィルタリングします。 |
| 現在の binlog 位置を読み取ります。 |
| コネクターの設定によって許可されるデータベースおよびテーブルのスキーマを読み取ります。 |
|
必要なすべての 注記 これは該当する場合に発生します。 |
|
データベーステーブルをスキャンし、各行に関連するテーブル固有の Kafka トピックで |
| トランザクションをコミットします。 |
| テーブルレベルのロックを解放します。 |
| コネクターオフセットで完了したスナップショットを記録します。 |
2.1.3. MySQL コネクターによるスキーマ変更のトピックの処理方法
Debezium MySQL コネクターを設定して、MySQL サーバーのデータベースに適用されるすべての DDL ステートメントが含まれるスキーマ変更イベントを生成することができます。コネクターはこれらのすべてのイベントをという名前の Kafka トピックに書き込みます <serverName>
serverName
。は、database.server.name
設定プロパティーに指定されたコネクターの名前になります。
スキーマ変更イベント の使用を選択した場合は、スキーマ変更トピックを使用し、データベースの履歴トピックは使用し ない でください。
データベーススキーマ履歴には、イベントのグローバルな順序が存在することが重要です。そのため、データベース履歴トピックはパーティションを設定することはできません。これは、このトピックの作成時に 1 のパーティション数を指定する必要があることを意味します。自動トピックの作成に依存する場合は、Kafka の num.partitions
設定オプション(デフォルトのパーティション数)がに設定されていることを確認してください 1
。
2.1.3.1. スキーマ変更トピック構造
スキーマ変更トピックに書き込まれる各メッセージには、DDL ステートメントの適用時に使用される接続されたデータベースの名前が含まれるメッセージキーが含まれています。
{ "schema": { "type": "struct", "name": "io.debezium.connector.mysql.SchemaChangeKey", "optional": false, "fields": [ { "field": "databaseName", "type": "string", "optional": false } ] }, "payload": { "databaseName": "inventory" } }
スキーマ変更イベントメッセージの値には、DDL ステートメント、ステートメントが適用されるデータベース、ステートメントが表示された binlog の位置が含まれる構造が含まれます。
{ "schema": { "type": "struct", "name": "io.debezium.connector.mysql.SchemaChangeValue", "optional": false, "fields": [ { "field": "databaseName", "type": "string", "optional": false }, { "field": "ddl", "type": "string", "optional": false }, { "field": "source", "type": "struct", "name": "io.debezium.connector.mysql.Source", "optional": false, "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "string", "optional": true, "field": "query" } ] } ] }, "payload": { "databaseName": "inventory", "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;", "source" : { "version": "0.10.0.Beta4", "name": "mysql-server-1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": null, "table": null, "query": null } } }
2.1.3.1.1. スキーマ変更のトピックに関する重要なヒント
この ddl
フィールドには複数の DDL ステートメントを含めることができます。すべてのステートメントは databaseName
フィールドのデータベースに適用され、データベースで適用された順序と同じ順序で表示されます。source
フィールドは、テーブル固有のトピックに書き込まれた標準データ変更イベントとして構成されます。このフィールドは、異なるトピックでイベントを関連付けるのに役立ちます。
.... "payload": { "databaseName": "inventory", "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,... "source" : { .... } } ....
- クライアントが 複数のデータベースに DDL ステートメントを送信する場合はどうすればよいです か?
- MySQL がアトミックに適用される場合、コネクターは DDL ステートメントを順番に取得し、データベースを基にグループ化し、各グループのスキーマ変更イベントを作成します。
- MySQL が個別に適用される場合、コネクターはステートメントごとに個別のスキーマ変更イベントを作成します。
関連情報
- ここで説明する スキーマ変更トピック を使用しない場合は、データベース履歴トピックを確認してください。
2.1.4. MySQL コネクターイベント
Debezium MySQL コネクターによって生成されたすべてのデータ変更イベントには、キーと値が含まれます。変更イベントキーおよび変更イベント値には、スキーマ とペイロードの構造がスキーマと ペイロード が含まれ、ペイロードにはデータが含まれます。
MySQL コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。これは、ラピン文字やアンダースコアではない文字がアンダースコアに置き換えられているため、論理サーバー名、データベース名、およびテーブル名のコンテナー名がこれらのアンダースコアに置き換えられると予期せぬ競合が生じる可能性があります。
2.1.4.1. Change イベントキー
変更イベントのキーには、イベントの作成時にの PRIMARY KEY
(または一意の制約)内の各列のフィールドが含まれる構造があります。例のテーブルとテーブルのスキーマとペイロードがどのように表示されるかを見てみましょう。
テーブルの例
CREATE TABLE customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY ) AUTO_INCREMENT=1001;
変更イベントキーの例
{ "schema": { 1 "type": "struct", "name": "mysql-server-1.inventory.customers.Key", 2 "optional": false, 3 "fields": [ 4 { "field": "id", "type": "int32", "optional": false } ] }, "payload": { 5 "id": 1001 } }
このキー inventory.customers
は、プライマリーキーコラムに値がであるコネクターの外部に mysql-server-1
id
ある行を記述し 1001
ます。
2.1.4.2. イベント値の変更
変更イベントの値には、スキーマとペイロードセクションが含まれます。変更イベント値には、3 つのタイプのイベント構造があります。この構造のフィールドは以下で説明され、各変更イベント値の例でマークされます。
確認項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
|
2 |
| 操作のタイプを記述する 必須 の文字列。 値
|
3 |
| イベント発生前の行の状態を指定するオプションのフィールド。 |
4 |
| イベント発生後の行の状態を指定するオプションのフィールド。 |
5 |
| 以下を含む、イベントのソースメタデータを記述する 必須 フィールドです。
注記
binlog _rows_query_log_events オプションが有効で、コネクターで |
6 |
| コネクターがイベントを処理した時間を表示するオプションのフィールド。 注記 この時間は、Kafka Connect タスクを実行している JVM のシステムクロックに基づいています。 |
例のテーブルとテーブルのスキーマとペイロードがどのように表示されるかを見てみましょう。
テーブルの例
CREATE TABLE customers ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY ) AUTO_INCREMENT=1001;
2.1.4.2.1. イベント値の変更の作成
以下の例は、customers
テーブルの 作成 イベントを示しています。
{ "schema": { 1 "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "mysql-server-1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "mysql-server-1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.product.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "mysql-server-1.inventory.customers.Envelope" }, "payload": { 2 "op": "c", "ts_ms": 1465491411815, "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "1.1.2.Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 0, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "thread": 7, "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')" } } }
2.1.4.2.2. Update change event value
customers
テーブルの 更新 変更イベントの値には、作成 イベントと同じスキーマがあります。ペイロードは同じですが、異なる値を保持します。以下に例を示します(読みやすさを考慮したフォーマット)。
{ "schema": { ... }, "payload": { "before": { 1 "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { 2 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { 3 "version": "1.1.2.Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004" }, "op": "u", 4 "ts_ms": 1465581029523 5 } }
これと 挿入 イベントの値を比較すると、payload
セクションにいくつかの違いを確認できます。
- 1
- この
before
フィールドには、データベースのコミットの前に値が含まれる行の状態が含まれるようになりました。 - 2
after
フィールドは行の更新された状態となり、first_name
値はになりましAnne Marie
た。before
とafter
構造を比較して、コミットが原因でこの行で実際に何が変更されているかを判断できます。- 3
source
フィールド構造は以前と同じフィールドを持ちますが、値は異なります(このイベントは binlog の別の位置にあります)。source
構造は、この変更の MySQL の記録に関する情報を表示します(トレース可能性の向上)。また、このイベントや他のトピックの他のイベントと比較し、このイベントが他のイベントとして同じ MySQL コミットの前、後、またはの一部として実行されたかどうかを確認するためにも使用できます。- 4
- これで
op
フィールド値はになりu
、更新によりこの行が変更されていることを確認できます。 - 5
- この
ts_ms
フィールドは、Debezium がこのイベントを処理したときにタイムスタンプを表示します。
行のプライマリーキーまたは一意の鍵の列が更新され、Debezium は行のキーが変更され、Debezium は DELETE イベントと行の古いキーを持つ tombstone イベントを出力し、その後の行の新しいキーを含む INSERT イベントを出力します。
2.1.4.2.3. Delete change event value(変更イベント値の削除)
customers
テーブルの delete 変更イベントの値は、作成 および 更新 イベントと全く同じスキーマを持ちます。ペイロードは同じですが、異なる値を保持します。以下に例を示します(読みやすさを考慮したフォーマット)。
{ "schema": { ... }, "payload": { "before": { 1 "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": null, 2 "source": { 3 "version": "1.1.2.Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 805, "row": 0, "thread": 7, "query": "DELETE FROM customers WHERE id=1004" }, "op": "d", 4 "ts_ms": 1465581902461 5 } }
作成 および 更新 イベントのペイロードと payload
部分を比較すると、いくつかの違いを確認できます。
このイベントは、この行の削除を処理するために必要な情報をコンシューマーに提供します。一部のコンシューマーでは削除を適切に処理するために必要なため、古い値が含まれます。
MySQL コネクターのイベントは、Kafka ログの圧縮 と連携するように設計されています。これにより、すべてのキーの少なくとも最新のメッセージが保持されていれば、古いメッセージを削除することができます。これにより、Kafka はストレージ領域を回収でき、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できます。
行が削除されても、上記の delete イベント値はログの圧縮で機能します。これは、Kafka はその鍵を使用して以前のメッセージをすべて削除できるためです。message 値がの場合 null
、Kafka は同じキーを持つすべてのメッセージを削除できることを認識します。これを可能にするため、Debezium の MySQL コネクターは、常に同じキーではなく null
値を持つ特別な tombstone イベントを持つ 削除 イベントにしたがいます。
2.1.5. MySQL コネクターがデータタイプをマップする方法
Debezium MySQL コネクターは、行が存在するテーブルのように構造化されたイベントを持つ行への変更を表します。イベントには、各列値のフィールドが含まれます。その列の MySQL データタイプは、値をイベントでどのように表すかを決定します。
文字列を保存する列は、文字セットと照合で MySQL で定義されます。MySQL コネクターは、binlog イベント内の列値のバイナリー表現を読み取るときに、列の文字セットを使用します。以下の表は、コネクターが MySQL データタイプを リテラル タイプとセマンティクスタイプの両方にマップする方法を示し て います。
- literal タイプ : Kafka Connect スキーマタイプを使用して値がどのように表されるか
- semantic type : Kafka Connect スキーマがフィールドの意味をキャプチャーする方法(スキーマ名)
MySQL タイプ | リテラルタイプ | セマンティクスタイプ |
---|---|---|
|
| 該当なし |
|
| 該当なし |
|
|
注記
例(n がビット)
numBytes = n/8 + (n%8== 0 ? 0 : 1) |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
|
注記
|
|
|
注記
|
|
|
注記
|
|
|
|
|
|
注記
ISO 8601 形式(マイクロ秒の精度)MySQL では |
2.1.5.1. 一時的な値
TIMESTAMP
データタイプを除外する MySQL の一時タイプは、time.precision.mode
設定プロパティーの値によって異なります。デフォルト値は CURRENT_TIMESTAMP
or として指定された TIMESTAMP
列の場合 NOW
、値 1970-01-01 00:00:00
は Kafka Connect スキーマのデフォルト値として使用されます。
MySQL は、ゼロ値が null 値よりも優先されるため DATE, `DATETIME
、、および TIMESTAMP
列の値にゼロ/値を許可します。MySQL コネクターは、列定義が null 値を許可する場合は null 値、またはコラムが null 値を許可しない場合はエポック日としてゼロ値を表します。
タイムゾーンのない一時的な値
DATETIME
タイプは、「2018-01-13 09:48:27」などのローカル日時を表します。ご覧のとおり、タイムゾーンの情報はありません。このような列は、UTC を使用して列の精度に基づいてエポックミリ秒またはマイクロ秒に変換されます。TIMESTAMP
タイプはタイムゾーン情報がないタイムスタンプを表し、値を書き込む際にサーバー(またはセッションの)タイムゾーンから UTC へ MySQL により変換されます。以下に例を示します。
-
DATETIME
の値が2018-06-20 06:37:03
になり1529476623000
ます。 -
TIMESTAMP
の値が2018-06-20 06:37:03
になり2018-06-20T13:37:03Z
ます。
このような列は、サーバー(またはセッション)の現在のタイムゾーンを基に UTC io.debezium.time.ZonedTimestamp
で同等の形式に変換されます。デフォルトでは、タイムゾーンはサーバーからクエリーされます。これに失敗する場合、database.serverTimezone
コネクター設定プロパティーによって明示的に指定する必要があります。たとえば、データベースのタイムゾーン(グローバルで、またはでコネクターに設定されている場合 database.serverTimezone property
)が「America/Los_Angeles」である場合、TIMESTAMP の値「2018-06-20 06:37:03」は「2018-06-20T13:37:03Z」の ZonedTimestamp
値で表されます。
Kafka Connect および Debezium を実行している JVM のタイムゾーンはこれらの変換には影響しないことに注意してください。
用語値に関連するプロパティーの詳細は、MySQL コネクター設定プロパティー のドキュメントを参照してください。
- time.precision.mode=adaptive_time_microseconds(default)
MySQL コネクターは、リテラルタイプとセマンティクスタイプを列のデータタイプ定義に基づいて決定し、イベントがデータベースの正確な値を表すようにします。すべての時間フィールドはマイクロ秒単位です。の範囲
TIME
にある正の値のみ00:00:00.000000
を正しくキャプチャー23:59:59.999999
できます。MySQL タイプ リテラルタイプ セマンティクスタイプ DATE
INT32
io.debezium.time.Date
注記エポックからの日数を表します。
TIME[(M)]
INT64
io.debezium.time.MicroTime
注記タイムゾーン情報が含まれ、時間の値をマイクロ秒単位で表します。MySQL では
M
、の範囲を使用でき0-6
ます。DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)
INT64
io.debezium.time.Timestamp
注記エポックからの経過時間(ミリ秒単位)を表し、タイムゾーン情報が含まれません。
DATETIME(4), DATETIME(5), DATETIME(6)
INT64
io.debezium.time.MicroTimestamp
注記過去のエポックに対するマイクロ秒数を表し、タイムゾーン情報が含まれません。
- time.precision.mode=connect
MySQL コネクターは事前定義された Kafka Connect の論理タイプを使用します。このアプローチはデフォルトのアプローチと比べると、データベース列に分 数秒の精度の値が大きい場合にイベントの精度 が低下する可能性があり
3
ます。を処理23:59:59.999
できるのは00:00:00.000
の値だけです。テーブルのTIME
値がサポートされる範囲を超えない場合にtime.precision.mode=connect
のみ設定します。このconnect
設定は、今後の Debezium で削除される予定です。MySQL タイプ リテラルタイプ セマンティクスタイプ DATE
INT32
org.apache.kafka.connect.data.Date
注記エポックからの日数を表します。
TIME[(M)]
INT64
org.apache.kafka.connect.data.Time
注記中間からの時間(マイクロ秒単位)を表し、タイムゾーン情報が含まれません。
DATETIME[(M)]
INT64
org.apache.kafka.connect.data.Timestamp
注記エポックからの経過時間(ミリ秒単位)を表し、タイムゾーン情報が含まれません。
== 10 進数値
10 進数はプロパティーを使用して処理され decimal.handling.mode
ます。
詳細は、「 MySQL コネクター設定プロパティー 」を参照してください。
- decimal.handling.mode=precise
MySQL タイプ リテラルタイプ セマンティクスタイプ NUMERIC[(M[,D])]
BYTES
org.apache.kafka.connect.data.Decimal
注記scale
schema パラメーターには、押す 10 点の数字を表す整数が含まれます。DECIMAL[(M[,D])]
BYTES
org.apache.kafka.connect.data.Decimal
注記scale
schema パラメーターには、押す 10 点の数字を表す整数が含まれます。- decimal.handling.mode=double
MySQL タイプ リテラルタイプ セマンティクスタイプ NUMERIC[(M[,D])]
FLOAT64
該当なし
DECIMAL[(M[,D])]
FLOAT64
該当なし
- decimal.handling.mode=string
MySQL タイプ リテラルタイプ セマンティクスタイプ NUMERIC[(M[,D])]
STRING
該当なし
DECIMAL[(M[,D])]
STRING
該当なし
2.1.5.2. ブール値
MySQL は特定の方法で BOOLEAN
値を内部で処理します。BOOLEAN
列は内部的に TINYINT(1)
datatype にマッピングされます。テーブルがストリーミング中に作成されると、Debezium が元の DDL を受け取るため、適切な BOOLEAN
マッピングを使用します。スナップショットの Debezium が SHOW CREATE TABLE
を実行してテーブル定義を取得し、そのテーブル定義が BOOLEAN
および TINYINT(1)
列 TINYINT(1)
の両方に対して返されます。
その後、Debezium には元のタイプマッピングの取得方法がなく、にマッピングされ TINYINT(1)
ます。
設定例を以下に示します。
converters=boolean boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter boolean.selector=db1.table1.*, db1.table2.column1
2.1.5.3. 重要データタイプ
現在、Debezium MySQL コネクターは以下のデータタイプをサポートします。
MySQL タイプ | リテラルタイプ | セマンティクスタイプ |
---|---|---|
|
|
注記 2 つのフィールドが含まれる構造が含まれています。
|
2.1.6. MySQL コネクターおよび Kafka トピック
Debezium MySQL コネクターは INSERT
UPDATE
、イベントを 1 つの DELETE
テーブルから 1 つの Kafka トピックに書き込みます。Kafka トピックの命名規則は以下のとおりです。
format
serverName.databaseName.tableName
例2.1 example
サーバー名で、fulfillment
は、、およびの 3 つ のテーブル inventory
が含まれるデータベース orders
と customers
し products
ます。Debezium MySQL コネクターはイベントを 3 つの Kafka トピックに生成し、データベースの各テーブルに対して 1 つずつイベントを生成します。
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products
2.1.7. MySQL がサポートするトポロジー
Debezium MySQL コネクターは、以下の MySQL トポロジーをサポートします。
- Standalone
- 単一の MySQL サーバーを使用する場合は、Debezium MySQL コネクターがサーバーをモニターできるように、binlog を有効にする(およびオプションで GTIDs が有効になっている)必要があります。バイナリーログは増分 バックアップ としても使用できるため、これは受け入れ可能です。この場合、MySQL コネクターは常に接続し、このスタンドアロン MySQL サーバーインスタンスに従います。
- マスターおよびスレーブ
Debezium MySQL コネクターは、マスターまたはスレーブの 1 つ(そのスレーブで binlog が有効になっている場合)に従うことができますが、コネクターはそのサーバーに表示されるクラスターの変更のみを確認します。一般的に、これはマルチマスタートポロジー以外の問題ではありません。
コネクターは、クラスターの各サーバーで異なるサーバーの binlog の位置を記録します。そのため、コネクターは MySQL サーバーインスタンス 1 つのみに従う必要があります。サーバーが失敗した場合は、コネクターを続行する前に再起動または復元する必要があります。
- 高可用性クラスター
- MySQL には多くの 高可用性 ソリューションが存在します。そのため、問題や障害からほぼすぐに復元できます。ほとんどの HA MySQL クラスターは GTID を使用して、スレーブがマスター上のすべての変更を追跡できるようにします。
- マルチマスター
マルチマスター MySQL トポロジー は、それぞれが複数のマスターから複製する 1 つ以上の MySQL スレーブを使用します。これは、複数の MySQL クラスターのレプリケーションを集約する強力な方法です。GTID の使用が必要になります。
Debezium MySQL コネクターは、これらのマルチマスター MySQL スレーブをソースとして使用し、w 新規スレーブが古いスレーブにキャッチされていれば、さまざまなマルチマスター MySQL スレーブにフェイルオーバーできます(例: 新しいスレーブは最初のスレーブで最後に確認されたトランザクションをすべて保持します)。コネクターは、新しいマルチマスター MySQL スレーブに再接続して binlog で正しい位置を見つける際に、特定の GTID ソースを include または exclude するように設定できるため、コネクターがデータベースやテーブルのサブセットのみを使用しても機能します。
- Hosted
Amazon RDS や Amazon Aurora などのホストオプションを使用するように Debezium MySQL コネクターがサポートされます。
重要このようなホスト型のオプションでは グローバル読み取りロック が許可されないため、一貫したスナップショット を作成するためにテーブルレベルのロックが使用されます。