2.2. MariaDB 用 Debezium コネクター
MariaDB 用の Debezium コネクターは、テクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat では、実稼働環境での使用を推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行い、フィードバックを提供していただくことを目的としています。
Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
MariaDB には、データベースにコミットされた順序ですべての操作を記録するバイナリーログ (binlog) があります。これには、テーブルスキーマの変更やテーブルのデータの変更が含まれます。MariaDB はレプリケーションとリカバリーに binlog を使用します。
Debezium MariaDB コネクターは binlog を読み取り、行レベルの INSERT
、UPDATE
、および DELETE
操作の変更イベントを生成し、変更イベントを Kafka トピックに出力します。クライアントアプリケーションはこれらの Kafka トピックを読み取ります。
MariaDB は通常、指定期間後に binlogs をパージするように設定されているため、MariaDB コネクターは各データベースの最初の 整合性スナップショット を実行します。MariaDB コネクターは、スナップショットが作成された時点から binlog を読み取ります。
このコネクターと互換性のある MariaDB データベースのバージョンについては、Debezium でサポートされる設定ページを参照してください。
Debezium MariaDB コネクターの使用に関する情報と手順は、次のように設定されています。
2.2.1. Debezium MariaDB コネクターの仕組み リンクのコピーリンクがクリップボードにコピーされました!
コネクターがサポートする MariaDB トポロジーの概要は、アプリケーションの計画に役立ちます。Debezium MariaDB コネクターを最適に設定および実行するには、コネクターによるテーブルの構造の追跡方法、スキーマ変更の公開方法、スナップショットの実行方法、および Kafka トピック名の決定方法を理解しておくと便利です。
詳細は以下を参照してください。
2.2.1.1. Debezium コネクターでサポートされる MariaDB トポロジー リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターは、次の MariaDB トポロジーをサポートします。
- Standalone
-
単一の MariaDB サーバーを使用する場合、Debezium MariaDB コネクターがサーバーを監視できるように、サーバーで binlog を有効にする必要があります。
バイナリーログも増分 バックアップ として使用できるため、これは多くの場合で許容されます。
この場合、MariaDB コネクターは常にこのスタンドアロン MariaDB サーバーインスタンスに接続し、それに従います。 - Primary and replica
Debezium MariaDB コネクターは、プライマリーサーバーの 1 つ、またはレプリカの 1 つ (そのレプリカの binlog が有効になっている場合) をフォローできますが、コネクターはそのサーバーに表示されるクラスター内の変更のみを検出します。通常、これはマルチプライマリートポロジー以外では問題ではありません。
コネクターは、サーバーの binlog の位置を記録します。この位置は、クラスターの各サーバーごとに異なります。したがって、コネクターは 1 つの MariaDB サーバーインスタンスのみに従う必要があります。このサーバーに障害が発生した場合、サーバーを再起動またはリカバリーしないと、コネクターは継続できません。
- High available clusters
- MariaDB にはさまざまな 高可用性 ソリューションが存在し、問題や障害の耐性をつけ、即座に回復することが大変容易になります。HA MariaDB クラスターは GTID を使用するため、レプリカはプライマリーサーバーで発生するすべての変更を追跡できます。
- Multi-primary
-
Galera クラスターのレプリケーション は、それぞれが複数のプライマリーサーバーからレプリケートする 1 つ以上の MariaDB レプリカノードを使用します。クラスターレプリケーションは、複数の MariaDB クラスターのレプリケーションを集約する強力な方法を提供します。
Debezium MariaDB コネクターは、これらのマルチプライマリー MariaDB レプリカをソースとして使用し、新しいレプリカが古いレプリカに追いついている限り、異なるマルチプライマリー MariaDB レプリカにフェイルオーバーできます。つまり、新しいレプリカには最初のレプリカで確認されたすべてのトランザクションが含まれます。これは、コネクターがデータベースやテーブルのサブセットのみを使用している場合でも機能します。これは、新しいマルチプライマリー MariaDB レプリカに再接続して binlog 内の正しい位置を見つけようとする場合に、特定の GTID ソースを含めるか除外するようにコネクターを設定できるためです。 - Hosted
Debezium MariaDB コネクターは、Amazon RDS や Amazon Aurora などのホスト型データベースオプションを使用できます。
これらのホストオプションではグローバル読み取りロックの使用が許可されていないため、コネクターは一貫性のあるスナップショットを作成するときにテーブルレベルのロックを使用します。
2.2.1.2. Debezium MariaDB コネクターがデータベーススキーマの変更を処理する方法 リンクのコピーリンクがクリップボードにコピーされました!
データベースクライアントがデータベースのクエリーを行うと、クライアントはデータベースの現在のスキーマを使用します。しかし、データベーススキーマはいつでも変更が可能です。そのため、挿入、更新、または削除の操作が記録されるたびに、コネクターはどのスキーマであるかを特定できる必要があります。また、コネクターは必ずしも現在のスキーマをすべてのイベントに適用できるとは限りません。イベントが比較的古い場合は、現在のスキーマが適用される前に記録された可能性があります。
スキーマ変更後に発生するイベントを正しく処理するために、MariaDB には、データに影響を与える行レベルの変更だけでなく、データベースに適用される DDL ステートメントもトランザクションログに含めます。コネクターは、binlog 内でこれらの DDL ステートメントを検出すると、そのステートメントを解析し、各テーブルのスキーマのインメモリー表現を更新します。コネクターはこのスキーマ表現を使用して、挿入、更新、または削除の操作時にテーブルの構造を特定し、適切な変更イベントを生成します。別のデータベーススキーマ履歴 Kafka トピックでは、コネクターは各 DDL ステートメントがある binlog の場所とともにすべての DDL ステートメントを記録します。
クラッシュするか、正常に停止した後に、コネクターを再起動すると、特定の位置 (特定の時点) から binlog の読み取りを開始します。コネクターは、データベーススキーマ履歴の Kafka トピックを読み取り、コネクターが起動する binlog の時点まですべての DDL ステートメントを解析することで、この時点で存在したテーブル構造を再ビルドします。
このデータベーススキーマ履歴トピックは、内部コネクター専用となっています。オプションで、コネクターは コンシューマーアプリケーション向けの別のトピックにスキーマ変更イベントを送信する こともできます。
MariaDB コネクターが gh-ost
や pt-online-schema-change
などのスキーマ変更ツールが適用されたテーブルの変更をキャプチャーすると、移行プロセス中にヘルパーテーブルが作成されます。これらのヘルパーテーブルで発生する変更をキャプチャーするようにコネクターを設定する必要があります。コネクターがヘルパーテーブル用に生成するレコードをコンシューマーが必要としない場合は、単一メッセージ変換 (SMT) を設定して、コネクターが発行するメッセージからこれらのレコードを削除します。
関連情報
- Debezium イベントレコードを受信する トピックのデフォルト名。
2.2.1.3. Debezium MariaDB コネクターがデータベーススキーマの変更を公開する方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターを設定して、データベース内のテーブルに適用されるスキーマの変更を記述するスキーマ変更イベントを生成することができます。コネクターは、スキーマ変更イベントを <topicPrefix>
という名前の Kafka トピックに書き込みます。ここで、topicPrefix
は topic.prefix
コネクター設定プロパティーで指定された名前空間です。コネクターがスキーマ変更トピックに送信するメッセージには、ペイロードと、任意で変更イベントメッセージのスキーマが含まれます。
スキーマ変更イベントのスキーマには、次の要素があります。
name
- スキーマ変更イベントメッセージの名前。
type
- 変更イベントメッセージのタイプ。
version
- スキーマのバージョン。バージョンは整数で、スキーマが変更されるたびに増加します。
fields
- 変更イベントメッセージに含まれるフィールド。
例: MariaDB コネクタースキーマ変更トピックのスキーマ
次の例は、JSON 形式の一般的なスキーマを示しています。
スキーマ変更イベントメッセージのペイロードには、以下の要素が含まれます。
ddl
-
スキーマの変更につながる SQL
CREATE
、ALTER
、またはDROP
ステートメントを提供します。 databaseName
-
DDL ステートメントが適用されるデータベースの名前。
databaseName
の値は、メッセージキーとして機能します。 pos
- ステートメントが表示される binlog の位置。
tableChanges
-
スキーマの変更後のテーブルスキーマ全体の構造化表現。
tableChanges
フィールドには、テーブルの各列のエントリーなどのアレイが含まれます。構造化された表現は JSON または Avro 形式でデータを表示するため、コンシューマーは DDL パーサーを介して最初にメッセージを処理しなくてもメッセージを簡単に読み取りできます。
キャプチャーモードであるテーブルでは、コネクターはスキーマ変更トピックにスキーマ変更の履歴だけでなく、内部データベーススキーマ履歴トピックにも格納します。内部データベーススキーマ履歴トピックはコネクターのみの使用を対象としており、使用するアプリケーションによる直接使用を目的としていません。スキーマ変更に関する通知が必要なアプリケーションが、スキーマ変更トピックからの情報のみを使用するようにしてください。
データベーススキーマ履歴トピックをパーティションに分割しないでください。データベーススキーマ履歴トピックが正しく機能するには、コネクターが出力するイベントレコードの一貫したグローバル順序を維持する必要があります。
トピックがパーティション間で分割されないようにするには、以下のいずれかの方法を使用してトピックのパーティション数を設定します。
-
データベーススキーマ履歴トピックを手動で作成する場合は、パーティション数を
1
に指定します。 -
Apache Kafka ブローカーを使用してデータベーススキーマ履歴トピックを自動的に作成する場合に、トピックが作成されるので、Kafka
num.partitions
設定オプションの値を1
に設定します。
コネクターがスキーマ変更トピックに出力するメッセージの形式は、初期の状態であり、通知なしに変更される可能性があります。
例: MariaDB コネクタースキーマ変更トピックに送信されるメッセージ
以下の例は、JSON 形式の一般的なスキーマ変更メッセージを示しています。メッセージには、テーブルスキーマの論理表現が含まれます。
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
|
2 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
3 |
|
変更が含まれるデータベースとスキーマを識別します。 |
4 |
|
このフィールドには、スキーマの変更を行う DDL が含まれます。 |
5 |
| DDL コマンドによって生成されるスキーマの変更が含まれる 1 つ以上の項目の配列。 |
6 |
| 変更の種類を説明します。値は以下のいずれかになります。
|
7 |
|
作成、変更、または破棄されたテーブルの完全な識別子。テーブルの名前が変更されると、この識別子は |
8 |
| 適用された変更後のテーブルメタデータを表します。 |
9 |
| テーブルのプライマリーキーを設定する列のリスト。 |
10 |
| 変更されたテーブルの各列のメタデータ。 |
11 |
| 各テーブル変更のカスタム属性メタデータ。 |
詳細は、スキーマ履歴トピック を参照してください。
2.2.1.4. Debezium MariaDB コネクターがデータベーススナップショットを実行する方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターが初めて起動されると、データベースの初期 整合性スナップショット が実行されます。このスナップショットにより、コネクターはデータベースの現在の状態のベースラインを確立できます。
Debezium はスナップショットを実行するときにさまざまなモードを使用できます。スナップショットモードは、snapshot.mode
設定プロパティーによって決まります。プロパティーのデフォルト値は initial
です。snapshot.mode
プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。
スナップショットの詳細は、以下のセクションを参照してください。
コネクターは、スナップショットを実行するときに一連のタスクを完了します。正確な手順は、スナップショットモードと、データベースに対して有効なテーブルロックポリシーによって異なります。Debezium MariaDB コネクターは、グローバル読み取りロック または テーブルレベルロック を使用する初期スナップショットを実行するときに、さまざまな手順を実行します。
2.2.1.4.1. グローバル読み取りロックを使用する初期スナップショット リンクのコピーリンクがクリップボードにコピーされました!
snapshot.mode
プロパティーの値を変更することで、コネクターがスナップショットを作成する方法をカスタマイズできます。別のスナップショットモードを設定する場合、コネクターはこのワークフローの変更バージョンを使用してスナップショットを完了します。グローバル読み取りロックが許可されていない環境でのスナップショットプロセスは、テーブルレベルロックのスナップショットワークフロー を参照してください。
Debezium MariaDB コネクターがグローバル読み取りロックを使用して初期スナップショットを実行するために使用するデフォルトのワークフロー
以下の表は、Debezium がグローバル読み取りロックでスナップショットを作成する際のワークフローの手順を示しています。
手順 | アクション |
---|---|
1 | データベースへの接続を確立します。 |
2 |
キャプチャーするテーブルを決定します。デフォルトでは、コネクターはシステム以外のすべてのテーブルのデータをキャプチャーします。スナップショットが完了した後、コネクターは指定されたテーブルのデータをストリーミングし続けます。コネクターで特定のテーブルからのみデータをキャプチャーする場合は、 |
3 |
キャプチャーするテーブルに対してグローバル読み取りロックを取得し、他のデータベースクライアントによる writes をブロックします。 |
4 |
繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。 注記 これらの分離セマンティクスを使用すると、スナップショットの進行が遅くなる可能性があります。スナップショットの完了に時間がかかりすぎる場合は、別の分離設定の使用を検討するか、最初のスナップショットをスキップして、代わりに 増分スナップショット を実行します。 |
5 | 現在の binlog の位置を読み取ります。 |
6 |
データベース内のすべてのテーブル、またはキャプチャー対象として指定されたすべてのテーブルの構造をキャプチャーします。コネクターは、必要なすべての 注記
デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、データベース内のすべてのテーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。 |
7 | 手順 3 で取得したグローバル読み取りロックを解放します。他のデータベースクライアントがデータベースに書き込みできるようになりました。 |
8 | コネクターが手順 5 で読み取った binlog の位置で、コネクターはキャプチャー用に指定されたテーブルのスキャンを開始します。スキャン中に、コネクターは次のタスクを実行します。
|
9 | トランザクションをコミットします。 |
10 | コネクターオフセットにスナップショットの正常な完了を記録します。 |
作成された初期スナップショットは、キャプチャーされたテーブルの各行の現在の状態をキャプチャーします。このベースライン状態から、コネクターは発生した後続の変更をキャプチャーします。
スナップショットプロセスが開始されたら、コネクターの障害、リバランス、またはその他の理由でプロセスが中断されると、コネクターの再起動後にプロセスが再起動されます。
コネクターによって最初のスナップショットが完了した後、更新に抜けがないように、手順 5 で読み取りした位置からストリーミングを続行します。
何らかの理由でコネクターが再び停止した場合に、コネクターは再起動後に最後に停止した位置から変更のストリーミングを再開します。
コネクターの再起動後、ログがプルーニングされている場合、ログ内のコネクターの位置が使用できなくなる可能性があります。その後、コネクターは失敗し、新しいスナップショットが必要であることを示すエラーを返します。この状況でスナップショットを自動的に開始するようにコネクターを設定するには、snapshot.mode
プロパティーの値を when_needed
に設定します。Debezium MariaDB コネクターのトラブルシューティングに関する詳細は、問題が発生したときの動作 を参照してください。
2.2.1.4.2. テーブルレベルロックを使用する初期スナップショット リンクのコピーリンクがクリップボードにコピーされました!
一部のデータベース環境では、管理者がグローバル読み取りロックを許可していません。Debezium MariaDB コネクターがグローバル読み取りロックが許可されていないことを検出した場合、コネクターはスナップショットを実行するときにテーブルレベルのロックを使用します。コネクターがテーブルレベルロックを使用するスナップショットを実行するには、Debezium コネクターが MariaDB への接続に使用するデータベースアカウントで LOCK TABLES
権限が必要です。
Debezium MariaDB コネクターがテーブルレベルのロックを使用して初期スナップショットを実行するために使用するデフォルトのワークフロー
次の表は、テーブルレベルの読み取りロックを使用してスナップショットを作成するために Debezium が実行するワークフローの手順を示しています。グローバル読み取りロックが許可されていない環境でのスナップショットプロセスについては、グローバル読み取りロックのスナップショットワークフロー を参照してください。
手順 | アクション |
---|---|
1 | データベースへの接続を確立します。 |
2 |
キャプチャーするテーブルを決定します。デフォルトでは、コネクターはすべてのシステム以外のテーブルをキャプチャーします。コネクターにテーブルまたはテーブル要素のサブセットをキャプチャーさせるには、 |
3 | テーブルレベルロックを取得します。 |
4 |
繰り返し可能な読み取りセマンティクス でトランザクションを開始し、トランザクション内の後続の読み取りがすべて 整合性スナップショット に対して実行されるようにします。 |
5 | 現在の binlog の位置を読み取ります。 |
6 |
コネクターが変更をキャプチャーするように設定されたデータベースとテーブルのスキーマを読み取ります。コネクターは、必要なすべての 注記 デフォルトでは、コネクターは、キャプチャー用に設定されていないテーブルも含め、データベース内のすべてのテーブルのスキーマをキャプチャーします。テーブルがキャプチャー用に設定されていない場合、最初のスナップショットはテーブルの構造のみをキャプチャーし、テーブルデータはキャプチャーされません。 初期スナップショットに含まれなかったテーブルのスキーマ情報がスナップショットに保持される理由の詳細は、初期スナップショットがすべてのテーブルのスキーマをキャプチャーする理由 を参照してください。 |
7 | コネクターが手順 5 で読み取った binlog の位置で、コネクターはキャプチャー用に指定されたテーブルのスキャンを開始します。スキャン中に、コネクターは次のタスクを実行します。
|
8 | トランザクションをコミットします。 |
9 | テーブルレベルロックを解除します。他のデータベースクライアントは、以前にロックされていたテーブルに書き込みできるようになります。 |
10 | コネクターオフセットにスナップショットの正常な完了を記録します。 |
設定 | 説明 |
---|---|
| コネクターは起動するたびにスナップショットを実行します。スナップショットには、キャプチャーされたテーブルの構造およびデータが含まれます。この値を指定すると、コネクターが起動するたびに、キャプチャーされたテーブルからのデータの完全な表現がトピックに入力されます。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
| コネクターは 初期スナップショットを作成するためのデフォルトのワークフロー で説明されているように、データベーススナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
| コネクターはデータベーススナップショットを実行します。スナップショットが完了すると、コネクターは停止し、後続のデータベース変更のイベントレコードをストリーミングしなくなります。 |
|
非推奨です。 |
|
コネクターは、初期スナップショットを作成するためのデフォルトのワークフロー で説明されているすべての手順を実行して、関連するすべてのテーブルの構造をキャプチャーします。ただし、コネクターの起動時点のデータセットを表す |
|
コネクターが起動すると、スナップショットを実行するのではなく、後続のデータベース変更のイベントレコードのストリーミングがすぐに開始されます。 |
|
非推奨です。 |
|
損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。 |
| コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。
|
詳細は、コネクター設定プロパティーテーブルの snapshot.mode
を参照してください。
2.2.1.4.3. 初期スナップショットがすべてのテーブルのスキーマ履歴をキャプチャーする理由 リンクのコピーリンクがクリップボードにコピーされました!
コネクターが実行する最初のスナップショットは、2 種類の情報をキャプチャーします。
- テーブルデータ
-
コネクターの
table.include.list
プロパティーにあるテーブルのINSERT
、UPDATE
、およびDELETE
操作に関する情報。 - スキーマデータ
- テーブルに適用される構造の変更を記述する DDL ステートメント。スキーマデータは、内部スキーマ履歴トピックとコネクターのスキーマ変更トピック (設定されている場合) の両方に保持されます。
初期スナップショットを実行すると、キャプチャー対象として指定されていないテーブルのスキーマ情報がスナップショットによってキャプチャーされることが分かります。デフォルトでは、初期スナップショットは、キャプチャー用に指定されたテーブルからだけでなく、データベースに存在するすべてのテーブルのスキーマ情報を取得するように設計されています。コネクターでは、テーブルのスキーマがスキーマ履歴トピックにある状態で、テーブルをキャプチャーする必要があります。初期スナップショットが元のキャプチャーセットの一部ではないテーブルのスキーマデータをキャプチャーできるようにして、後で必要になった場合にこれらのテーブルからイベントデータを簡単にキャプチャーできるように、Debezium はコネクターを準備します。初期スナップショットがテーブルのスキーマをキャプチャーしない場合は、コネクターがテーブルからデータをキャプチャーする前に、履歴トピックにスキーマを追加する必要があります。
場合によっては、最初のスナップショットでのスキーマキャプチャーを制限する場合があります。これは、スナップショットの完了に必要な時間の短縮に便利です。または、Debezium が複数の論理データベースにアクセスできるユーザーアカウントを使用して、データベースインスタンスに接続しているにもかかわらず、コネクターで特定の論理データベース内のテーブルからの変更のみをキャプチャーする場合にも便利です。
関連情報
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし)
- 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更)
-
schema.history.internal.store.only.captured.tables.ddl
プロパティーを設定して、スキーマ情報をキャプチャーするテーブルを指定します。 -
schema.history.internal.store.only.captured.databases.ddl
プロパティーを設定して、スキーマ変更をキャプチャーする論理データベースを指定します。
2.2.1.4.4. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更なし) リンクのコピーリンクがクリップボードにコピーされました!
コネクターを使用して、最初のスナップショットでスキーマがキャプチャーされなかったテーブルからデータをキャプチャーする場合があります。コネクターの設定によっては、最初のスナップショットはデータベース内の特定のテーブルのテーブルスキーマのみをキャプチャーする場合があります。テーブルスキーマが履歴トピックに存在しない場合、コネクターはテーブルのキャプチャーに失敗し、スキーマ欠落エラーを報告します。
テーブルからデータを取得できる場合もありますが、テーブルスキーマを追加するには別の手順を実行する必要があります。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- トランザクションログでは、テーブルのすべてのエントリーが同じスキーマを使用します。構造変更が行われた新しいテーブルからデータをキャプチャーする方法については、初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更) を参照してください。
手順
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 以下の変更をコネクター設定に適用します。
-
snapshot.mode
をrecovery
に設定します。 -
schema.history.internal.store.only.captured.tables.ddl
の値をfalse
に設定します。 -
コネクターがキャプチャーするテーブルを
table.include.list
に追加します。これにより、コネクターは今後すべてのテーブルのスキーマ履歴を再構築できます。
-
- コネクターを再起動します。スナップショットのリカバリープロセスでは、テーブルの現在の構造に基づいてスキーマ履歴が再ビルドされます。
- (オプション) スナップショットが完了したら、増分スナップショット を開始して、コネクターがオフラインだった間に発生した他のテーブルへの変更とともに、新しく追加されたテーブルの既存のデータをキャプチャーします。
-
(オプション)
snapshot.mode
をno_data
にリセットして、今後の再起動後にコネクターが回復を開始しないようにします。
2.2.1.4.5. 初期スナップショットでキャプチャーされなかったテーブルからのデータのキャプチャー (スキーマ変更) リンクのコピーリンクがクリップボードにコピーされました!
スキーマ変更がテーブルに適用される場合、スキーマ変更前にコミットされたレコードの構造は、変更後にコミットされたレコードとは異なります。Debezium はテーブルからデータをキャプチャーするときに、スキーマ履歴を読み取り、各イベントに正しいスキーマが適用されていることを確認します。スキーマがスキーマ履歴トピックに存在しない場合、コネクターはテーブルをキャプチャーできず、エラーが発生します。
最初のスナップショットでキャプチャーされず、テーブルのスキーマが変更されたテーブルからデータをキャプチャーする場合、スキーマがまだ使用可能でない場合は、履歴トピックにスキーマを追加する必要があります。新しいスキーマスナップショットを実行するか、テーブルの初期スナップショットを実行して、スキーマを追加できます。
前提条件
- コネクターにより最初のスナップショット中にキャプチャーされなかったスキーマが含まれるテーブルからデータをキャプチャーしたいと考えている。
- スキーマ変更がテーブルに適用されたため、キャプチャーされるレコードの構造が不均一になっている。
手順
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされている場合 (
store.only.captured.tables.ddl
はfalse
に設定されました)。 -
table.include.list
プロパティーを編集して、キャプチャーするテーブルを指定します。 - コネクターを再起動します。
- 新しく追加したテーブルから既存のデータをキャプチャーする場合は、増分スナップショット を開始します。
-
- 初期スナップショットにすべてのテーブルのスキーマがキャプチャーされていない場合 (
store.only.captured.tables.ddl
がtrue
に設定されています)。 最初のスナップショットでキャプチャーするテーブルのスキーマが保存されなかった場合は、次のいずれかの手順を実行します。
- 手順 1: スキーマスナップショット、その後に増分スナップショット
この手順では、コネクターは最初にスキーマのスナップショットを実行します。その後、増分スナップショットを開始して、コネクターがデータを同期できるようにします。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.mode
プロパティーの値をno_data
に設定します。 -
table.include.list
を編集して、キャプチャーするテーブルを追加します。
-
- コネクターを再起動します。
- Debezium が新規および既存のテーブルのスキーマをキャプチャーするまで待ちます。コネクターが停止した後にテーブルで発生したデータ変更はキャプチャーされません。
- データが損失されないようにするには、増分スナップショット を開始します。
- 手順 2: 初期スナップショットと、それに続くオプションの増分スナップショット
この手順では、コネクターはデータベースの完全な初期スナップショットを実行します。他の初期スナップショットと同様、多数の大きなテーブルが含まれるデータベースでは、初期スナップショットの実行操作には時間がかかる可能性があります。スナップショットの完了後、任意で増分スナップショットをトリガーして、コネクターがオフラインの間に発生した変更をキャプチャーできます。
- コネクターを停止します。
-
schema.history.internal.kafka.topic プロパティー
で指定された内部データベーススキーマ履歴トピックを削除します。 設定された Kafka Connect
offset.storage.topic
内のオフセットをクリアします。オフセットを削除する方法の詳細は、Debezium コミュニティーの FAQ を参照してください。警告オフセットの削除は、内部 Kafka Connect データの操作の経験がある上級ユーザーのみが実行してください。この操作によりシステムが破損する場合があるため、最後の手段としてのみ実行してください。
-
table.include.list
を編集して、キャプチャーするテーブルを追加します。 次の手順の説明に従って、コネクター設定のプロパティーの値を設定します。
-
snapshot.mode
プロパティーの値をinitial
に設定します。 -
(オプション)
schema.history.internal.store.only.captured.tables.ddl
をfalse
に設定します。
-
- コネクターを再起動します。コネクターはデータベース全体のスナップショットを取得します。スナップショットが完了すると、コネクターはストリーミングに移行します。
- (オプション) コネクターがオフラインの間に変更されたデータをキャプチャーするには、増分スナップショット を開始します。
2.2.1.5. アドホックスナップショット リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。テーブルデータを再キャプチャーするメカニズムを提供するため、Debezium にはアドホックスナップショットを実行するオプションがあります。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。
- コネクター設定は、異なるテーブルセットをキャプチャーするように変更されます。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
アドホックと呼ばれるスナップショット を開始することで、以前にスナップショットをキャプチャーしたテーブルのスナップショットを再実行できます。アドホックスナップショットには、シグナルテーブル を使用する必要があります。シグナルリクエストを Debezium シグナルテーブルに送信して、アドホックスナップショットを開始します。
既存のテーブルのアドホックスナップショットを開始すると、コネクターはテーブルにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するテーブルを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のテーブルのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のテーブルの内容のサブセットをキャプチャできます。
execute-snapshot
メッセージをシグナルテーブルに送信してキャプチャーするテーブルを指定します。execute-snapshot
シグナルのタイプを incremental
または blocking
に設定し、スナップショットに含めるテーブルの名前を次の表に示すように指定します。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプを指定します。 |
| 該当なし |
スナップショットに含めるテーブルの完全修飾名に一致する正規表現を含む配列。 |
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
|
| 該当なし | スナップショット処理中にコネクターがテーブルのプライマリーキーとして使用する列名を指定するオプションの文字列。 |
アドホック増分スナップショットのトリガー
アドホック増分スナップショットを開始するには、execute-snapshot
シグナルタイプのエントリーをシグナリングテーブルに追加するか、シグナルメッセージを Kafka シグナリングトピックに送信します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各テーブルの開始ポイントおよびエンドポイントとして使用します。テーブルのエントリー数と設定されたチャンクサイズに基づいて、Debezium はテーブルをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
詳細は、スナップショットの増分 を参照してください。
アドホックブロッキングスナップショットのトリガー
シグナリングテーブルまたはシグナリングトピックに、execute-snapshot
シグナルタイプを持つエントリーを追加することによって、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。
詳細は、ブロッキングスナップショット を参照してください。
2.2.1.6. 増分スナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各テーブルを段階的にキャプチャーします。スナップショットがキャプチャーするテーブルと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。
増分スナップショットが進むと、Debezium はウォーターマークを使用して進捗を追跡し、キャプチャーする各テーブル行のレコードを管理します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセスが再開すると、スナップショットは最初からテーブルをキャプチャーするのではなく、停止した時点から開始します。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクター設定を変更してテーブルを
table.include.list
プロパティーに追加した後にスナップショットを再実行します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各テーブルをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてテーブルをチャンクに分割します。チャンクごとに作業し、テーブルの行ごとにチャンクでキャプチャーします。キャプチャーする行ごとに、スナップショットは READ
イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットの作成が進むにつれ、他のプロセスがデータベースへのアクセスを継続し、テーブルレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT
、UPDATE
、DELETE
操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE
または DELETE
イベントを順番に受信できます。つまり、ストリーミングプロセスは、スナップショットがその行の READ
イベントが含まれるチャンクをキャプチャーする前に、テーブルの行を変更するイベントを生成する可能性があります。スナップショットが最終的に対象の行にあった READ
イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて入ってきた READ
イベントと、同じテーブルの行を変更するストリーミングイベント間の競合の解決を容易にするために、Debezium は スナップショットウィンドウ と呼ばれるものを使用します。スナップショットウィンドウは、増分スナップショットが指定のテーブルチャンクのデータをキャプチャーしている途中に、間隔を決定します。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ
操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE
または DELETE
操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ
イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ
イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ
イベントのみです。Debezium は、これらの残りの READ
イベントをテーブルの Kafka トピックに出力します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
現在、増分スナップショットを開始するには、次のいずれかの方法を使用できます。
2.2.1.6.1. 増分スナップショットのトリガー リンクのコピーリンクがクリップボードにコピーされました!
増分スナップショットを開始するには、ソースデータベースのシグナリングテーブルに アドホックスナップショットシグナル を送信します。スナップショットシグナルは SQL INSERT
クエリーとして送信します。
Debezium がシグナルテーブルの変更を検出すると、シグナルを読み取り、要求されたスナップショット操作を実行します。
送信するクエリーはスナップショットに追加するテーブルを指定し、必要に応じてスナップショット操作の種類を指定します。Debezium は現在、incremental
と blocking
のスナップショットタイプをサポートしています。
スナップショットに追加するテーブルを指定するには、テーブルをリストする data-collections
配列またはテーブルの照合に使用する正規表現の配列を指定します。以下に例を示します。
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
増分スナップショットシグナルの data-collections
アレイにはデフォルト値がありません。data-collections
配列が空の場合、Debezium は空の配列をアクションが必要ないと解釈し、スナップショットは作成しません。
スナップショットに含めるテーブルの名前にドット (.
)、スペース、またはその他の英数字以外の文字が含まれている場合は、テーブル名を二重引用符でエスケープする必要があります。
たとえば、db1
データベースに存在し、My.Table
という名前のテーブルを含めるには、"db1.\"My.Table\""
の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collection
プロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットをトリガーする
SQL クエリーを送信し、アドホック増分スナップショット要求をシグナルテーブルに追加します。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コマンドの
id
、type
、およびdata
パラメーターの値は、シグナルテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.28 シグナルテーブルに増分スナップショットシグナルを送信する SQL コマンドのフィールドの説明 項目 値 説明 1
database.debezium_signal
ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID
文字列をウォーターマークシグナルとして生成します。3
execute-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドの必須コンポーネントで、スナップショットに含めるテーブル名の配列またはテーブル名と一致する正規表現を指定します。
配列には、database.table
形式を使用してテーブルの完全修飾名と一致する正規表現がリストされます。この形式は、コネクターの シグナリングテーブル の名前を指定するために使用する形式と同じです。5
incremental
実行するスナップショット操作のタイプを指定する、シグナルの
data
フィールドのオプションのtype
コンポーネント。
有効な値はincremental
とblocking
です。
値を指定しない場合、コネクターはデフォルトで増分スナップショットを実行します。6
additional-conditions
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、data-collection
プロパティーとfilter
プロパティーを持つオブジェクトです。データの収集単位で異なるフィルターを指定できます。
*data-collection
プロパティーは、フィルターが適用されるデータコレクションの完全修飾名です。additional-conditions
パラメーターの詳細は、additional-conditions
付きでアドホック増分スナップショットを実行する を参照してください。
additional-conditions
付きでアドホック増分スナップショットを実行する
スナップショットに、テーブル内のコンテンツのサブセットのみを含める場合は、スナップショットシグナルに additional-conditions
パラメーターを追加してシグナル要求を変更できます。
一般的なスナップショットの SQL クエリーは、以下の形式を取ります。
SELECT * FROM <tableName> ....
SELECT * FROM <tableName> ....
additional-conditions
パラメーターを追加して、以下の例のように WHERE
条件を SQL クエリーに追加します。
SELECT * FROM <data-collection> WHERE <filter> ....
SELECT * FROM <data-collection> WHERE <filter> ....
以下の例は、シグナルテーブルに追加の条件を含むアドホック増分スナップショット要求を送信する SQL クエリーを示しています。
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
たとえば、以下の列が含まれる products
テーブルがあるとします。
-
id
(プライマリーキー) -
color
-
quantity
products
テーブルの増分スナップショットに color=blue
のデータ項目のみを含める場合は、次の SQL ステートメントを使用してスナップショットをトリガーできます。
INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue"}]}');
INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue"}]}');
additional-conditions
パラメーターを使用すると、列が 2 つ以上となる条件を指定することもできます。たとえば、前述の例の products
テーブルを使用して、color=blue
および quantity>10
だけに一致するアイテムのみのデータが含まれる増分スナップショットをトリガーするクエリーを送信できます。
INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue AND quantity>10"}]}');
INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue AND quantity>10"}]}');
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例2.9 増分スナップショットイベントメッセージ
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
実行するスナップショット操作タイプを指定します。 |
2 |
|
イベントタイプを指定します。 |
2.2.1.6.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type
と data
フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは execute-snapshot
で、data
フィールドには以下のフィールドが必要です。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。 |
例2.10 execute-snapshot
Kafka メッセージ
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
additional-conditions 付きのアドホック増分スナップショット
Debezium は additional-conditions
フィールドを使用してテーブルのコンテンツのサブセットを選択します。
通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。
SELECT * FROM <tableName> ….
スナップショット要求に additional-conditions
プロパティーが含まれている場合、プロパティーの data-collection
および filter
パラメーターが SQL クエリーに追加されます。次に例を示します。
SELECT * FROM <data-collection> WHERE <filter> ….
たとえば、列 id
(プライマリーキー)、color
、および brand
を含む products
テーブルがある場合、スナップショットに color='blue'
のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions
プロパティーを追加することができます。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
また、additional-conditions
プロパティーを使用して、複数の列に基づいて条件を渡すこともできます。たとえば、前の例と同じ products
テーブルを使用して、color='blue'
および brand='MyBrand'
である products
テーブルのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.2.1.6.3. 増分スナップショットの停止 リンクのコピーリンクがクリップボードにコピーされました!
状況によっては、増分スナップショットを停止する必要がある場合があります。たとえば、スナップショットが正しく設定されていない場合や、他のデータベース操作にリソースが使用可能であるこのとの確認が必要な場合があります。ソースデータベースのシグナリングテーブルにシグナルを送信することで、すでに実行中のスナップショットを停止できます。
スナップショット停止信号をシグナリングテーブルに送信するには、SQL INSERT
クエリーで送信します。stop-snapshot シグナルは、スナップショット操作の type
を incremental
として指定し、オプションで、現在実行中のスナップショットから省略するテーブルを指定します。Debezium はシグナルテーブルの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
関連情報
また、JSON メッセージを Kafka シグナリングトピック に送信して、増分スナップショットを停止することもできます。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collection
プロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットを停止する
SQL クエリーを送信して、シグナリングテーブルへのアドホックインクリメンタルスナップショットを停止します。
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
INSERT INTO db1.debezium_signal (id, type, data) values ('ad-hoc-1', 'stop-snapshot', '{"data-collections": ["db1.table1", "db1.table2"], "type":"incremental"}');
INSERT INTO db1.debezium_signal (id, type, data)
1 values ('ad-hoc-1',
2 'stop-snapshot',
3 '{"data-collections": ["db1.table1", "db1.table2"],
4 "type":"incremental"}');
5 Copy to Clipboard Copied! Toggle word wrap Toggle overflow signal コマンドの
id
、type
、およびdata
パラメーターの値は、シグナリングテーブルのフィールド に対応します。
以下の表では、この例のパラメーターを説明しています。Expand 表2.31 シグナリングテーブルに増分スナップショット停止信号を送信するための SQL コマンドのフィールドの説明 項目 値 説明 1
database.debezium_signal
ソースデータベースにあるシグナルテーブルの完全修飾名を指定します。
2
ad-hoc-1
id
パラメーターは、シグナルリクエストのID
識別子として割り当てられる任意の文字列を指定します。
この文字列を使用して、シグナルテーブルのエントリーへのログメッセージを特定します。Debezium はこの文字列を使用しません。3
stop-snapshot
type
パラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collections
シグナルの
data
フィールドのオプションコンポーネントで、スナップショットから削除するテーブル名の配列またはテーブル名とマッチする正規表現を指定します。
配列には、database.table
の形式で完全修飾名でテーブルに一致する正規表現がリストされます。data
フィールドからこのコンポーネントを省略すると、シグナルによって進行中の増分スナップショット全体が停止されます。5
incremental
停止するスナップショット操作のタイプを指定する信号の
data
フィールドの必須コンポーネント。
現在、有効な唯一のオプションはincremental
です。type
の値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
2.2.1.6.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka シグナリングトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type
と data
フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは stop-snapshot
で、data
フィールドには以下のフィールドが必要です。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
テーブルの完全修飾名に一致する、コンマで区切られた正規表現のオプションの配列、スナップショットから削除するテーブル名に一致するテーブル名または正規表現の配列。 |
次の例は、典型的な stop-snapshot
の Kafka メッセージを示しています。
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
2.2.1.7. ブロッキングスナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。ブロッキングスナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。
次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。
- 新しいテーブルを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
- 大きなテーブルを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。
ブロッキングスナップショットのプロセス
ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたテーブルのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。
スナップショットの設定
シグナルの data
コンポーネントでは、次のプロパティーを設定できます。
- data-collections: スナップショットする必要のあるテーブルを指定します。
-
data-collections: スナップショットに含めるテーブルを指定します。
このプロパティーは、完全修飾テーブル名に一致する正規表現のコンマ区切りリストを受け入れます。プロパティーの動作は、ブロッキングスナップショットでキャプチャーするテーブルを指定するtable.include.list
プロパティーの動作と似ています。 additional-conditions: テーブルごとに異なるフィルターを指定できます。
-
data-collection
プロパティーは、フィルターが適用されるテーブルの完全修飾名であり、データベースに応じて大文字と小文字を区別するか、区別しないかを指定できます。 -
filter
プロパティーは、snapshot.select.statement.overrides
で使用される値と同じものが設定されます。これは、大文字小文字を区別して一致させる必要があるテーブルの完全修飾名です。
-
以下に例を示します。
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
重複の可能性
スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。
2.2.1.8. Debezium MariaDB 変更イベントレコードを受信する Kafka トピックのデフォルト名 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、MariaDB コネクターは、テーブルで発生するすべての INSERT
、UPDATE
、および DELETE
操作の変更イベントを、そのテーブルに固有の単一の Apache Kafka トピックに書き込みます。
コネクターは以下の規則を使用して変更イベントトピックに名前を付けます。
topicPrefix.databaseName.tableName
fulfillment
はトピック接頭辞、inventory
はデータベース名で、データベースに orders
、customers
、および products
という名前のテーブルが含まれるとします。Debezium MariaDB コネクターは、データベース内の各テーブルに 1 つずつ、3 つの Kafka トピックにイベントを送信します。
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products
fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products
以下のリストは、デフォルト名のコンポーネントの定義を示しています。
- topicPrefix
-
topic.prefix
コネクター設定プロパティーで指定されたトピック接頭辞。 - schemaName
- 操作が発生したスキーマの名前。
- tableName
- 操作が発生したテーブルの名前。
コネクターは同様の命名規則を適用して、内部データベーススキーマの履歴トピック (スキーマ変更トピック と トランザクションメタデータトピック) にラベルを付けます。
デフォルトのトピック名が要件を満たさない場合は、カスタムトピック名を設定できます。カスタムトピック名を設定するには、論理トピックルーティング SMT に正規表現を指定します。論理トピックルーティング SMT を使用してトピックの命名をカスタマイズする方法は、トピックルーティング を参照してください。
トランザクションメタデータ
Debezium は、トランザクション境界を表し、データ変更イベントメッセージを強化するイベントを生成できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
Debezium は、すべてのトランザクションで BEGIN
および END
区切り文字のトランザクション境界イベントを生成します。トランザクション境界イベントには以下のフィールドが含まれます。
status
-
BEGIN
またはEND
id
- 一意のトランザクション識別子の文字列表現。
ts_ms
-
データソースでのトランザクション境界イベント (
BEGIN
またはEND
イベント) の時間。データソースから Debezium にイベント時間を渡されない場合、フィールドは代わりに Debezium がイベントを処理する時間を表します。 event_count
(END
イベント用)- トランザクションによって出力されるイベントの合計数。
data_collections
(END
イベント用)-
data_collection
とevent_count
要素のペアの配列。これは、コネクターがデータコレクションから発信された変更に対して出力するイベントの数を示します。
例
topic.transaction
オプションで上書きされない限り、コネクターはトランザクションイベントを <topic.prefix>
.transaction
トピックに出力します。
変更データイベントのエンリッチメント
トランザクションメタデータを有効にすると、データメッセージ Envelope
は新しい transaction
フィールドで強化されます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id
- 一意のトランザクション識別子の文字列表現。
total_order
- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order
- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下は、メッセージの例になります。
2.2.2. Debezium MariaDB コネクターデータ変更イベントの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターは、行レベルの INSERT
、UPDATE
、および DELETE
操作ごとにデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたテーブルによって異なります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。
以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema
フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
最初の |
2 |
|
最初の |
3 |
|
2 つ目の |
4 |
|
2 つ目の |
デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のテーブルと同じ名前を持つトピックにストリーミングされます。トピック名 を参照してください。
MariaDB コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名形式 に準拠していることを確認します。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とテーブル名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または _) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。
論理サーバー名、データベース名、またはテーブル名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。
詳細は以下を参照してください。
2.2.2.1. Debezium MariaDB 変更イベントのキーについて リンクのコピーリンクがクリップボードにコピーされました!
変更イベントのキーには、変更されたテーブルのキーのスキーマと、変更された行の実際のキーのスキーマが含まれます。スキーマとそれに対応するペイロードの両方には、コネクターによってイベントが作成された時点において、変更されたテーブルの PRIMARY KEY
(または一意の制約) に存在した各列のフィールドが含まれます。
以下の customers
テーブルについて考えてみましょう。この後に、このテーブルの変更イベントキーの例を示します。
customers
テーブルへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers
テーブルに前述の定義がある限り、customers
テーブルへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
キーのスキーマ部分は、キーの |
2 |
|
キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更されたテーブルのプライマリーキーの構造を記述します。キースキーマ名の形式は connector-name.database-name.table-name.
|
3 |
|
イベントキーの |
4 |
|
各フィールドの名前、型、および必要かどうかなど、 |
5 |
|
この変更イベントが生成された行のキーが含まれます。この例では、キーには値が |
2.2.2.2. Debezium MariaDB 変更イベントの値について リンクのコピーリンクがクリップボードにコピーされました!
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema
セクションと payload
セクションがあります。schema
セクションには、入れ子のフィールドを含む、Envelope
セクションの payload
構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルテーブルについて考えてみましょう。
このテーブルへの変更に対する変更イベントの値部分には以下について記述されています。
create イベント
以下の例は、customers
テーブルにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。
項目 | フィールド名 | 説明 |
---|---|---|
1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のテーブルに生成するすべての変更イベントで同じになります。 |
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
6 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
7 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
8 |
|
イベント発生前の行の状態を指定する任意のフィールド。この例のように、 |
9 |
|
イベント発生後の行の状態を指定する任意のフィールド。この例では、 |
10 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。
MariaDB データベース設定で |
更新 イベント
サンプル customers
テーブルにある更新の変更イベントの値には、そのテーブルの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。以下は、コネクターによって customers
テーブルでの更新に生成されるイベントの変更イベント値の例になります。
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。更新 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。 |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。
MariaDB データベース設定で |
4 |
|
操作の型を記述する必須の文字列。更新 イベントの値では、 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
6 |
| コネクターがイベントを処理した時間をマイクロ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
7 |
| コネクターがイベントを処理した時間をナノ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
行のプライマリーキー/一意キーの列を更新すると、行のキーの値が変更されます。キーが変更されると、3 つ のイベントが Debezium によって出力されます。3 つのイベントとは、DELETE
イベント、行の古いキーを持つ 廃棄 (tombstone)、およびそれに続く行の新しいキーを持つイベントです。詳細は次のセクションで説明します。
プライマリーキーの更新
行のプライマリーキーフィールドを変更する UPDATE
操作は、プライマリーキーの変更と呼ばれます。プライマリーキーの変更では、UPDATE
イベントレコードの代わりにコネクターが古いキーの DELETE
イベントレコードと、新しい (更新された) キーの CREATE
イベントレコードを出力します。これらのイベントには通常の構造と内容があり、イベントごとにプライマリーキーの変更に関連するメッセージヘッダーがあります。
-
DELETE
イベントレコードには、メッセージヘッダーとして__debezium.newkey
が含まれます。このヘッダーの値は、更新された行の新しいプライマリーキーです。 -
CREATE
イベントレコードには、メッセージヘッダーとして__debezium.oldkey
が含まれます。このヘッダーの値は、更新された行にあった以前の (古い) プライマリーキーです。
delete イベント
削除 変更イベントの値は、同じテーブルの 作成 および 更新 イベントと同じ schema
の部分になります。サンプル customers
テーブルの 削除 イベントの payload
部分は以下のようになります。
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベント発生前の行の状態を指定する任意のフィールド。削除 イベント値の |
2 |
|
イベント発生後の行の状態を指定する任意のフィールド。削除 イベント値の |
3 |
|
イベントのソースメタデータを記述する必須のフィールド。削除 イベント値の
MariaDB データベース設定で |
4 |
|
操作の型を記述する必須の文字列。 |
5 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
6 |
| コネクターがイベントを処理した時間をマイクロ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
7 |
| コネクターがイベントを処理した時間をナノ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
削除 変更イベントレコードは、この行の削除を処理するために必要な情報を持つコンシューマーを提供します。コンシューマーによっては、削除を適切に処理するために古い値が必要になることがあるため、古い値が含まれます。
MariaDB コネクターイベントは、Kafka log compaction と連携するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
tombstone イベント
行が削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka が同じキーを持つすべてのメッセージを削除するには、メッセージの値が null
である必要があります。これを可能にするために、Debezium MariaDB コネクターは delete イベントを発行した後、null
値以外の同じキーを持つ特別な tombstone イベントを発行します。
truncate イベント
truncate 変更イベントは、テーブルが切り捨てられたことを通知します。truncate イベントのメッセージキーが null
です。メッセージの値は次の例のようになります。
項目 | フィールド名 | 説明 |
---|---|---|
1 |
|
イベントのソースメタデータを記述する必須のフィールド。truncate イベント値の
|
2 |
|
操作の型を記述する必須の文字列。 |
3 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
4 |
| コネクターがイベントを処理した時間をマイクロ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
5 |
| コネクターがイベントを処理した時間をナノ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
1 つの TRUNCATE
ステートメントが複数のテーブルに適用される場合、コネクターは切り捨てられたテーブルごとに 1 つの truncate 変更イベントレコードを出力します。
truncate イベントは、テーブル全体に対して加えられ、メッセージキーを持たない変更を表します。その結果、複数のパーティションがあるトピックでは、変更イベント (create、update など)、またはテーブルに関連する truncate イベントについて、順序は保証されません。たとえば、コンシューマーが複数のパーティションからテーブルのイベントを読み取る場合、別のパーティションからテーブル内のすべてのデータを削除する truncate イベントを受信した後、あるパーティションからテーブルの update イベントを受け取る可能性があります。順序は、単一のパーティションを使用するトピックでのみ保証されます。
コネクターに truncate イベントをキャプチャーさせたくない場合は、skipped.operations
オプションを使用して除外します。
2.2.3. Debezium MariaDB コネクターがデータ型をマッピングする方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターは、行が存在するテーブルのように構造化されたイベントを使用して行の変更を表します。イベントには、各列の値のフィールドが含まれます。その列の MariaDB データ型によって、Debezium がイベント内の値をどのように表現するかが決まります。
文字列を格納する列は、MariaDB では文字セットと照合順序を使用して定義されます。MariaDB コネクターは、binlog イベント内の列値のバイナリー表現を読み取るときに、列の文字セットを使用します。
コネクターは、MariaDB データ型を literal と semantic 型の両方にマップできます。
- リテラル型: Kafka Connect スキーマタイプを使用して値がどのように表されるか。
- セマンティック型: Kafka Connect スキーマがどのようにフィールド (スキーマ名) の意味をキャプチャーするか。
デフォルトのデータ型変換が要件に合わない場合は、コネクター用の カスタムコンバーターの作成 が可能です。
詳細は以下を参照してください。
基本型
次の表は、コネクターが基本的な MariaDB データ型をどのようにマッピングするかを示しています。
MariaDB タイプ | リテラル型 | セマンティック型 |
---|---|---|
|
| 該当なし |
|
| 該当なし |
|
|
|
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
|
精度は、ストレージサイズを決定するためにのみ使用されます。0 から 23 までの精度 |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
| 該当なし |
|
|
該当なし |
|
|
該当なし |
|
|
該当なし |
|
| 該当なし |
|
|
該当なし |
|
|
n/a |
|
|
該当なし |
|
| 該当なし |
|
|
該当なし |
|
|
n/a |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
時間型
TIMESTAMP
データ型を除き、MariaDB の時間型は、time.precision.mode
コネクター設定プロパティーの値に依存します。デフォルト値が CURRENT_TIMESTAMP
または NOW
として指定される TIMESTAMP
列では、Kafka Connect スキーマのデフォルト値として値 1970-01-01 00:00:00
が使用されます。
MariaDB では、ゼロ値が null 値よりも優先される場合があるため、DATE
、DATETIME
、および TIMESTAMP
列にゼロ値を許可します。MariaDB コネクターは、列定義で null 値が許可されている場合はゼロ値を null 値として表し、列で null 値が許可されていない場合はエポック日として表します。
タイムゾーンのない時間型
DATETIME
型は、"2018-01-13 09:48:27" のようにローカルの日時を表します。タイムゾーンの情報は含まれません。このような列は、UTC を使用して列の精度に基づいてエポックミリ秒またはマイクロ秒に変換されます。TIMESTAMP
型は、タイムゾーン情報のないタイムスタンプを表します。MariaDB によって、書き込み時にサーバー (またはセッション) の現在のタイムゾーンから UTC に変換され、値を読み戻すときに UTC からサーバー (またはセッション) の現在のタイムゾーンに変換されます。以下に例を示します。
-
値が
2018-06-20 06:37:03
のDATETIME
は、1529476623000
になります。 -
値が
2018-06-20 06:37:03
のTIMESTAMP
は2018-06-20T13:37:03Z
になります。
このような列は、サーバー (またはセッション) の現在のタイムゾーンに基づいて、UTC の同等の io.debezium.time.ZonedTimestamp
に変換されます。タイムゾーンは、デフォルトでサーバーからクエリーされます。
これが失敗した場合は、データベースの timezone
MariaDB 設定オプションで明示的に指定する必要があります。たとえば、データベースのタイムゾーン (グローバルに設定されているか、timezone
オプションを使用してコネクター用に設定されている) が "America/Los_Angeles" の場合、TIMESTAMP 値 "2018-06-20 06:37:03" は、値 "2018-06-20T13:37:03Z" を持つ ZonedTimestamp
で表されます。
Kafka Connect および Debezium を実行する JVM のタイムゾーン設定は、これらの変換には影響しません。
時間値に関連するプロパティーの詳細は、MariaDB コネクター設定プロパティー のドキュメントを参照してください。
- time.precision.mode=adaptive_time_microseconds(default)
MariaDB コネクターは、列のデータ型定義に基づいてリテラル型とセマンティック型を決定し、イベントがデータベース内の値を正確に表すようにします。すべての時間フィールドはマイクロ秒単位です。正しくキャプチャーされる
TIME
フィールドの値は、範囲が00:00:00.000000
から23:59:59.999999
までの正の値です。Expand 表2.40 time.precision.mode=adaptive_time_microseconds の場合のマッピング MariaDB タイプ リテラル型 セマンティック型 DATE
INT32
io.debezium.time.Date
エポックからの日数を表します。TIME[(M)]
INT64
io.debezium.time.MicroTime
時間の値をマイクロ秒単位で表し、タイムゾーン情報は含まれません。MariaDB では、M
は0 - 6
の範囲で指定できます。DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)
INT64
io.debezium.time.Timestamp
エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。DATETIME(4), DATETIME(5), DATETIME(6)
INT64
io.debezium.time.MicroTimestamp
エポックからの経過時間をマイクロ秒で表し、タイムゾーン情報は含まれません。- time.precision.mode=connect
MariaDB コネクターは、定義済みの Kafka Connect 論理型を使用します。この方法はデフォルトの方法よりも精度が低く、データベース列に
3
を超える 少数秒の精度 値がある場合は、イベントの精度が低くなる可能性があります。00:00:00.000
から23:59:59.999
までの値のみを処理できます。テーブルのtime.precision.mode=connect
の値が、必ずサポートされる範囲内になるようにすることができる場合のみ、TIME
を設定します。connect
設定は、今後の Debezium バージョンで削除される予定です。Expand 表2.41 time.precision.mode=connect の場合のマッピング MariaDB タイプ リテラル型 セマンティック型 DATE
INT32
org.apache.kafka.connect.data.Date
エポックからの日数を表します。TIME[(M)]
INT64
org.apache.kafka.connect.data.Time
午前 0 時以降の時間値をマイクロ秒で表し、タイムゾーン情報は含まれません。DATETIME[(M)]
INT64
org.apache.kafka.connect.data.Timestamp
エポックからの経過時間をミリ秒で表し、タイムゾーン情報は含まれません。
10 進数型
Debezium コネクターは、decimal.handling.mode
コネクター設定プロパティーの設定に従って小数を処理します。
- decimal.handling.mode=precise
Expand 表2.42 decimal.handling.mode=precise の場合のマッピング MariaDB タイプ リテラル型 セマンティック型 NUMERIC[(M[,D])]
BYTES
org.apache.kafka.connect.data.Decimal
scale
スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。DECIMAL[(M[,D])]
BYTES
org.apache.kafka.connect.data.Decimal
scale
スキーマパラメーターには、小数点を移動した桁数を表す整数が含まれます。- decimal.handling.mode=double
Expand 表2.43 decimal.handling.mode=double の場合のマッピング MariaDB タイプ リテラル型 セマンティック型 NUMERIC[(M[,D])]
FLOAT64
該当なし
DECIMAL[(M[,D])]
FLOAT64
該当なし
- decimal.handling.mode=string
Expand 表2.44 decimal.handling.mode=string の場合のマッピング MariaDB タイプ リテラル型 セマンティック型 NUMERIC[(M[,D])]
STRING
該当なし
DECIMAL[(M[,D])]
STRING
該当なし
ブール値
MariaDB は、BOOLEAN
値を内部的に特定の方法で処理します。BOOLEAN
列は、内部で TINYINT(1)
データ型にマッピングされます。ストリーミング中にテーブルが作成されると、Debezium は元の DDL を受信するため、適切な BOOLEAN
マッピングが使用されます。スナップショットの作成中、Debezium は SHOW CREATE TABLE
を実行して、BOOLEAN
と TINYINT(1)
の両方の列に TINYINT(1)
を返すテーブル定義を取得します。その後、Debezium は元の型のマッピングを取得する方法はないため、TINYINT(1)
にマッピングします。
ソース列をブールデータ型に変換できるように、Debezium は TinyIntOneToBooleanConverter
カスタムコンバーター を提供しています。これは、以下のいずれかの方法で使用できます。
-
すべての
TINYINT(1)
またはTINYINT(1) UNSIGNED
列をBOOLEAN
型にマップします。 正規表現のコンマ区切りリストを使用して、列のサブセットを列挙します。
このタイプの変換を使用するには、以下の例のようにselector
パラメーターを使用してconverters
設定プロパティーを設定する必要があります。converters=boolean boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter boolean.selector=db1.table1.*, db1.table2.column1
converters=boolean boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter boolean.selector=db1.table1.*, db1.table2.column1
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注意: 場合によっては、スナップショットが
SHOW CREATE TABLE
を実行したときに、データベースがtinyint unsigned
の長さを表示されないため、このコンバーターは機能しません。新しいオプションlength.checker
はこの問題を解決することができます。デフォルト値はtrue
です。次の例に示すように、length.checker
を無効にして、タイプに基づいてすべての列を変換するのではなく、変換が必要な列をselected
プロパティーに指定します。converters=boolean boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter boolean.length.checker=false boolean.selector=db1.table1.*, db1.table2.column1
converters=boolean boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter boolean.length.checker=false boolean.selector=db1.table1.*, db1.table2.column1
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
空間型
現在、Debezium MariaDB コネクターは次の空間データ型をサポートしています。
MariaDB タイプ | リテラル型 | セマンティック型 |
---|---|---|
|
|
|
2.2.4. MariaDB データを代替データ型にマッピングするためのカスタムコンバーター リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Debezium MariaDB コネクターは、MariaDB データ型用の CustomConverter
実装を複数提供します。これらのカスタムコンバーターは、コネクター設定に基づいて特定のデータ型に対する代替マッピングを提供します。コネクターに CustomConverter
を追加するには、カスタムコンバーターのドキュメント の指示に従ってください。
TINYINT(1)
からブール値
デフォルトでは、コネクターのスナップショット中に、Debezium MariaDB コネクターは JDBC ドライバーから列タイプを取得し、BOOLEAN
列に TINYINT(1)
タイプを割り当てます。Debezium はこれらの JDBC 列タイプを使用して、スナップショットイベントのスキーマを定義します。コネクターがスナップショットからストリーミングフェーズに移行した後、デフォルトのマッピングから生じる変更イベントスキーマによって、BOOLEAN
列のマッピングが不整合になる可能性があります。MariaDB が BOOLEAN
列を均一に出力するようにするには、次の設定例に示すように、カスタム TinyIntOneToBooleanConverter
を適用できます。
例: TinyIntOneToBooleanConverter
の設定
converters=tinyint-one-to-boolean tinyint-one-to-boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter tinyint-one-to-boolean.selector=.*.MY_TABLE.DATA tinyint-one-to-boolean.length.checker=false
converters=tinyint-one-to-boolean
tinyint-one-to-boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
tinyint-one-to-boolean.selector=.*.MY_TABLE.DATA
tinyint-one-to-boolean.length.checker=false
前の例では、selector
と length.checker
プロパティーはオプションです。デフォルトでは、コンバーターは TINYINT
データ型の長さが 1
であることをチェックします。length.checker
が false
の場合、コンバーターは TINYINT
データ型の長さが 1
であることを明示的に確認しません。selector
は、指定された正規表現に基づいて、変換するテーブルまたは列を指定します。selector
プロパティーを省略すると、コンバーターはすべての TINYINT
列を論理 BOOL
フィールドタイプにマップします。selector
オプションを設定せず、TINYINT
列を TINYINT(1)
にマップする場合は、length.checker
プロパティーを省略するか、その値を true
に設定します。
JDBC sink のデータ型
Debezium JDBC sink コネクターを Debezium MariaDB ソースコネクターと統合すると、MariaDB コネクターはスナップショットフェーズとストリーミングフェーズ中に一部の列属性を異なる方法で出力します。JDBC sink コネクターがスナップショットフェーズとストリーミングフェーズの両方からの変更を一貫して使用するには、次の例に示すように、JdbcSinkDataTypesConverter
コンバータを MariaDB ソースコネクター設定の一部として含める必要があります。
例: JdbcSinkDataTypesConverter
設定
前の例では、selector.*
および treat.real.as.double
設定プロパティーはオプションです。
selector.*
プロパティーは、コンバーターが適用されるテーブルと列を指定する正規表現のコンマ区切りリストを指定します。デフォルトでは、コンバーターはすべてのテーブルに含まれるすべてのブール値、実数、および文字列ベースの列データ型に次のルールを適用します。
-
BOOLEAN
データ型は常にINT16
論理型として出力され、1
はtrue
、0
はfalse
を表します。 -
REAL
データ型は常にFLOAT64
論理型として出力されます。 -
文字列ベースの列には、列の文字セットを含む
__debezium.source.column.character_set
スキーマパラメーターが常に含まれます。
各データ型について、デフォルトのスコープをオーバーライドし、セレクターを特定のテーブルと列にのみ適用するセレクタールールを設定できます。たとえば、ブールコンバーターのスコープを設定するには、前の例のように、converters.jdbc-sink.selector.boolean=.*.MY_TABLE.BOOL_COL
のルールをコネクター設定に追加します。
2.2.5. Debezium コネクターを実行するための MariaDB の設定 リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターをインストールして実行する前に、MariaDB セットアップタスクが複数必要です。
詳細は以下を参照してください。
2.2.5.1. Debezium コネクター用の MariaDB ユーザーの作成 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターには MariaDB ユーザーアカウントが必要です。この MariaDB ユーザーには、Debezium MariaDB コネクターが変更をキャプチャーするデータベースすべてに対して適切な権限が必要です。
前提条件
- MariaDB サーバー。
- SQL コマンドの基本知識。
手順
MariaDB ユーザーを作成します。
mariadb> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mariadb> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 必要なパーミッションをユーザーに付与します。
mariadb> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mariadb> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 必要なパーミッションの説明は、表2.46「ユーザーパーミッションの説明」 を参照してください。
重要グローバル読み取りロックを許可しない Amazon RDS や Amazon Aurora などのホストオプションを使用している場合、テーブルレベルのロックを使用して 整合性スナップショット を作成します。この場合、作成するユーザーに
LOCK TABLES
パーミッションも付与する必要があります。詳細は スナップショット を参照してください。ユーザーのパーミッションの最終処理を行います。
mariadb> FLUSH PRIVILEGES;
mariadb> FLUSH PRIVILEGES;
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.46 ユーザーパーミッションの説明 キーワード 説明 SELECT
コネクターがデータベースのテーブルから行を選択できるようにします。これは、スナップショットを実行する場合にのみ使用されます。
RELOAD
内部キャッシュのクリアまたはリロード、テーブルのフラッシュ、またはロックの取得を行う
FLUSH
ステートメントをコネクターが使用できるようにします。これは、スナップショットを実行する場合にのみ使用されます。SHOW DATABASES
SHOW DATABASE
ステートメントを実行して、コネクターがデータベース名を確認できるようにします。これは、スナップショットを実行する場合にのみ使用されます。REPLICATION-SLAVE
コネクターが MariaDB サーバーの binlog に接続して読み取ることができるようになります。
REPLICATION CLIENT
コネクターが以下のステートメントを使用できるようにします。
-
SHOW MASTER STATUS
-
SHOW SLAVE STATUS
-
SHOW BINARY LOGS
これは必ずコネクターに必要です。
ON
パーミッションが適用されるデータベースを指定します。
TO 'user'
パーミッションを付与するユーザーを指定します。
IDENTIFIED BY 'password'
ユーザーの MariaDB パスワードを指定します。
-
2.2.5.2. Debezium の MariaDB バイナリーログの有効化 リンクのコピーリンクがクリップボードにコピーされました!
MariaDB レプリケーションのバイナリーログを有効にする必要があります。バイナリーログは、レプリカが変更を伝播できるようにトランザクションの更新を記録します。
前提条件
- MariaDB サーバー。
- 適切な MariaDB ユーザー権限。
手順
log-bin
オプションが有効になっているかどうかを確認します。mariadb> SHOW VARIABLES LIKE '%log_bin%';
mariadb> SHOW VARIABLES LIKE '%log_bin%';
Copy to Clipboard Copied! Toggle word wrap Toggle overflow binlog が
OFF
の場合は、次の表のプロパティーを MariaDB サーバーの設定ファイルに追加します。server-id = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id'; log_bin = mariadb-bin binlog_format = ROW binlog_row_image = FULL binlog_expire_logs_seconds = 864000
server-id = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id'; log_bin = mariadb-bin binlog_format = ROW binlog_row_image = FULL binlog_expire_logs_seconds = 864000
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 再度 binlog の状態をチェックして、変更を確認します。
mariadb> SHOW VARIABLES LIKE '%log_bin%';
mariadb> SHOW VARIABLES LIKE '%log_bin%';
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Amazon RDS で MariaDB を実行する場合、バイナリーログを実行するには、データベースインスタンスの自動バックアップを有効にする必要があります。データベースインスタンスが自動バックアップを実行するように設定されていないと、前の手順で説明した設定を適用しても binlog は無効になります。
Expand 表2.47 MariaDB binlog 設定プロパティーの説明 プロパティー 説明 server-id
server-id
の値は、MariaDB クラスター内の各サーバーおよびレプリケーションクライアントごとに一意である必要があります。log_bin
log_bin
の値は、binlog ファイルのシーケンスのベース名です。binlog_format
binlog-format
はROW
またはrow
に設定する必要があります。binlog_row_image
binlog_row_image
はFULL
またはfull
に設定する必要があります。binlog_expire_logs_seconds
binlog_expire_logs_seconds
は、非推奨のシステム変数expire_logs_days
に対応します。これは、binlog ファイルを自動的に削除する秒数です。デフォルト値は2592000
で、つまり、30 日です。実際の環境に見合った値を設定します。詳細は、MariaDB による binlog ファイルの消去 を参照してください。
2.2.5.3. Debezium の MariaDB グローバルトランザクション識別子の有効化 リンクのコピーリンクがクリップボードにコピーされました!
グローバルトランザクション識別子 (GTID) は、クラスター内のサーバーで発生するトランザクションを一意に識別します。Debezium MariaDB コネクターでは必須ではありませんが、GTID を使用するとレプリケーションが簡素化され、プライマリーサーバーとレプリカサーバーの整合性が保たれているかどうかを簡単に確認できるようになります。
MariaDB の場合、GTID はデフォルトで有効になっており、追加の設定は必要ありません。
2.2.5.4. Debezium の MariaDB セッションタイムアウトの設定 リンクのコピーリンクがクリップボードにコピーされました!
大規模なデータベースに対して最初の整合性スナップショットが作成されると、テーブルの読み込み時に、確立された接続がタイムアウトする可能性があります。MariaDB 設定ファイルで interactive_timeout
と wait_timeout
を設定し、この動作を防ぐことができます。
前提条件
- MariaDB サーバー。
- SQL コマンドの基本知識。
- MariaDB 設定ファイルへのアクセス。
手順
interactive_timeout
を設定します。mariadb> interactive_timeout=<duration-in-seconds>
mariadb> interactive_timeout=<duration-in-seconds>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow wait_timeout
を設定します。mariadb> wait_timeout=<duration-in-seconds>
mariadb> wait_timeout=<duration-in-seconds>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.48 MariaDB セッションタイムアウトオプションの説明 オプション 説明 interactive_timeout
サーバーが対話的な接続を閉じる前にアクティビティーの発生を待つ時間 (秒単位)。詳細は、MariaDB のドキュメント を参照してください。
wait_timeout
サーバーが非対話型接続を閉じる前にアクティビティーを待機する秒数。詳細は、MariaDB のドキュメント を参照してください。
2.2.5.5. Debezium MariaDB コネクターのクエリーログイベントの有効化 リンクのコピーリンクがクリップボードにコピーされました!
各 binlog イベントの元の SQL
ステートメントを確認したい場合があります。MariaDB 設定で binlog_annotate_row_events
オプションを有効にすると、これを実行できます。
前提条件
- MariaDB サーバー。
- SQL コマンドの基本知識。
- MariaDB 設定ファイルへのアクセス。
手順
MariaDB で
binlog_annotate_row_events
を有効にします。mariadb> binlog_annotate_row_events=ON
mariadb> binlog_annotate_row_events=ON
Copy to Clipboard Copied! Toggle word wrap Toggle overflow binlog_annotate_row_events
は、binlog エントリーにSQL
ステートメントが含まれるようにするためのサポートを有効または無効にする値に設定されます。-
ON
= 有効化 -
OFF
= 無効化
-
2.2.5.6. Debezium MariaDB コネクターの binlog 行値オプションの検証 リンクのコピーリンクがクリップボードにコピーされました!
データベース内の binlog_row_value_options
変数の設定を確認します。コネクターが UPDATE イベントを消費できるようにするには、この変数を PARTIAL_JSON
以外の値に設定する必要があります。
前提条件
- MariaDB サーバー。
- SQL コマンドの基本知識。
- MariaDB 設定ファイルへのアクセス。
手順
現在の変数値を確認する
mariadb> show global variables where variable_name = 'binlog_row_value_options';
mariadb> show global variables where variable_name = 'binlog_row_value_options';
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 結果
+--------------------------+-------+ | Variable_name | Value | +--------------------------+-------+ | binlog_row_value_options | | +--------------------------+-------+
+--------------------------+-------+ | Variable_name | Value | +--------------------------+-------+ | binlog_row_value_options | | +--------------------------+-------+
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 変数の値が
PARTIAL_JSON
に設定されている場合、次のコマンドを実行して設定を解除します。mariadb> set @@global.binlog_row_value_options="" ;
mariadb> set @@global.binlog_row_value_options="" ;
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.2.6. Debezium MariaDB コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターをデプロイするには、次のいずれかの方法を使用できます。
Streams for Apache Kafka を使用して、コネクタープラグインを含むイメージを自動的に作成します。
以下は推奨される方法です。
-
Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドします。
この Containerfile デプロイメント方法は非推奨となりました。この方法の手順は、ドキュメントの今後のバージョンで削除される予定です。
2.2.6.1. Streams for Apache Kafka を使用した MariaDB コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターのデプロイで推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnect
CR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnector
CR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnector
CR を適用してコネクターを起動します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。
Kafka Connect CR の spec.build.output
パラメーターは、生成される KafkaConnect
コンテナーイメージを格納する場所を指定します。コンテナーイメージは、quay.io などのコンテナーレジストリー、または OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect
リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の Kafka Connect の設定
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の 新しいコンテナーイメージの自動ビルド
2.2.6.2. Streams for Apache Kafka を使用した Debezium MariaDB コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、まずコネクター用の Kafka Connect イメージをビルドする必要がありました。OpenShift にコネクターをデプロイするための現在の推奨方法は、Streams for Apache Kafka のビルド設定を使用して、使用する Debezium コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output
に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector
カスタムリソースを作成します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが Streams for Apache Kafka on OpenShift のデプロイと管理 に記載されているとおりにデプロイされている。
- Kafka Connect が Streams for Apache Kafka にデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
oc
CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- ImageStream リソースを新規コンテナーイメージを保存するためにクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform ドキュメント イメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotations
およびspec.build
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。以下の例は、KafkaConnect
カスタムリソースを記述するdbz-connect.yaml
ファイルからの抜粋を示しています。
例2.11 Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義したdbz-connect.yaml
ファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium MariaDB コネクターアーカイブ。
- Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
- Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.49 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションを"true"
に設定して、クラスター Operator がKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.output
は、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker
、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
を指定する方法の詳細は、Streams for Apache Kafka API リファレンスの スキーマ参照のビルド を参照してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。type
の値は、url
フィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト
type
とurl
を指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
type
とurl
を指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
type
とurl
を指定します。これは、Debezium スクリプト SMT で必要です。重要Streams for Apache Kafka を使用してコネクタープラグインを Kafka Connect イメージに組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.url
に JAR ファイルのロケーションを指定し、artifacts.type
の値もjar
に設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy
-
groovy-jsr223
(スクリプトエージェント) -
groovy-json
(JSON 文字列を解析するためのモジュール)
別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、次のKafkaConnector
CR を作成し、mariadb-inventory-connector.yaml
として保存します。例2.12 Debezium コネクターの
KafkaConnector
カスタムリソースを定義するmariadb-inventory-connector.yaml
ファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.50 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
コネクターの一意の ID (数値)。
10
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この namespace は、関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの namespace でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f mariadb-inventory-connector.yaml
oc create -n debezium -f mariadb-inventory-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium MariaDB デプロイメントを検証する 準備が整いました。
2.2.6.3. Dockerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium MariaDB コネクターをデプロイする手順 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnect
CR。image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。 -
Debezium MariaDB コネクターを定義する
KafkaConnector
CR。この CR をKafkaConnect
CR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- MariaDB が実行中であり、Debezium コネクターで動作するように MariaDB を設定する 手順が完了しました。
- Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、Streams for Apache Kafka on OpenShift のデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.io
やdocker.io
など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect 用の Debezium MariaDB コンテナーを作成します。
registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0
をベースイメージとして使用する Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-mariadb.yaml
という名前の Dockerfile を作成します。前の手順で作成した
debezium-container-for-mariadb.yaml
Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-mariadb:latest .
podman build -t debezium-container-for-mariadb:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-mariadb:latest .
docker build -t debezium-container-for-mariadb:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
debezium-container-for-mariadb
という名前のコンテナーイメージを構築します。カスタムイメージを
quay.io
などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。podman push <myregistry.io>/debezium-container-for-mariadb:latest
podman push <myregistry.io>/debezium-container-for-mariadb:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-mariadb:latest
docker push <myregistry.io>/debezium-container-for-mariadb:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium MariaDB
KafkaConnect
カスタムリソース (CR) を作成します。たとえば、annotations
およびimage
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。以下の例は、KafkaConnect
カスタムリソースを記述するdbz-connect.yaml
ファイルからの抜粋を示しています。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
KafkaConnector
リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations
は Cluster Operator に示します。2
spec.image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
変数がオーバーライドされます。以下のコマンドを入力して、
KafkaConnect
CR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium MariaDB コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。Debezium MariaDB コネクターは、コネクターの設定プロパティーを指定する
.yaml
ファイルで設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。次の例では、ポート
3306
で MariaDB ホスト192.168.99.100
に接続し、インベントリー
データベースへの変更をキャプチャーする Debezium コネクターを設定します。dbserver1
は、サーバーの論理名です。MariaDB
inventory-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.51 コネクター設定の説明 項目 説明 1
コネクターの名前。
2
一度に実行できるタスクは 1 つだけです。MariaDB コネクターは MariaDB サーバーの
binlog
を読み取るため、単一のコネクタータスクを使用すると適切な順序とイベント処理が保証されます。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。3
コネクターの設定。
4
データベースホスト。MariaDB サーバー (
mariadb
) を実行しているコンテナーの名前です。5
connector の一意 ID。
6
MariaDB サーバーまたはクラスターのトピック接頭辞。この名前は、変更イベントレコードを受信するすべての Kafka トピックの接頭辞として使用されます。
7
コネクターは
インベントリー
テーブルからのみ変更をキャプチャーします。8
DDL ステートメントをデータベーススキーマ履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。再起動時に、コネクターが読み取りを開始すべき時点で binlog に存在したデータベースのスキーマを復元します。
9
データベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをinventory-connector.yaml
ファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
inventory-connector
を登録し、コネクターはKafkaConnector
CR に定義されているinventory
データベースに対して実行を開始します。
Debezium MariaDB コネクターに設定できる設定プロパティーの完全なリストについては、MariaDB コネクター設定プロパティー を参照してください。
結果
コネクターが起動すると、コネクターが設定されている MariaDB データベースの 一貫性のあるスナップショットが実行されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
2.2.6.4. Debezium MariaDB コネクターが動作していることの確認 リンクのコピーリンクがクリップボードにコピーされました!
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが Streams for Apache Kafka on OpenShift にデプロイされている。
-
OpenShift
oc
CLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnector
リソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnector
を入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-mariadb)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc describe KafkaConnector inventory-connector-mariadb -n debezium
oc describe KafkaConnector inventory-connector-mariadb -n debezium
Copy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.13
KafkaConnector
リソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopic
を入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-mariadb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc get kafkatopics
oc get kafkatopics
Copy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.14
KafkaTopic
リソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-mariadb.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-mariadb.inventory.products_on_hand
Copy to Clipboard Copied! Toggle word wrap Toggle overflow トピック名を指定する形式は、手順 1 で返された
oc describe
コマンドと同じです (例:inventory-connector-mariadb.inventory.addresses
)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.15 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mariadb.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mariadb.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mariadb.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mariadb.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mariadb.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"mariadb","name":"inventory-connector-mariadb","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mariadb-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mariadb.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mariadb.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mariadb.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mariadb.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mariadb.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"mariadb","name":"inventory-connector-mariadb","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mariadb-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から読み込み ("op" ="r"
) イベントを生成したことを示しています。product_id
レコードの"before"
状態はnull
であり、レコードに以前の値が存在しないことを示しています。"after"
状態は、product_id
101
を持つ項目のquantity
が3
であることを示しています。
2.2.6.5. Debezium MariaDB コネクター設定プロパティーの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターには、アプリケーションに適切なコネクター動作を実現するために使用できる多数の設定プロパティーがあります。多くのプロパティーにはデフォルト値があります。
MariaDB コネクター設定プロパティーに関する情報は、以下のように整理されます。
- 必要なコネクター設定プロパティー
- 高度なコネクター設定プロパティー
- Debezium がデータベース履歴トピックから読み取るイベントを処理する方法を制御する データベース履歴コネクター設定プロパティー
必要な Debezium MariaDB コネクター設定プロパティー
以下の設定プロパティーは、デフォルト値がない場合は 必須 です。
bigint.unsigned.handling.mode
デフォルト値:long
。
コネクターが変更イベントで BIGINT UNSIGNED 列を表す方法を指定します。以下のオプションのいずれかを設定します。long
-
Java の
long
データ型を使用して、BIGINT UNSIGNED 列の値を表します。long
型はの精度を最適ではありませんが、大半のコンシューマーで簡単に実装できます。環境の多くでは、これが推奨される設定です。 precise
-
値を表すために
java.math.BigDecimal
データ型を使用します。コネクターは、Kafka Connectorg.apache.kafka.connect.data.Decimal
データ型を使用して、エンコードされたバイナリー形式で値を表します。コネクターが通常 2^63 より大きい値で動作する場合は、このオプションを設定します。long
データ型ではそのサイズの値を伝達できません。
binary.handling.mode
デフォルト値:
バイト
。
変更イベントでコネクターがバイナリー列 (Blob
、binary
、varbinary
など) の値を表す方法を指定します。
以下のオプションのいずれかを設定します。bytes
- バイナリーデータをバイト配列として表します。
base64
- バイナリーデータを base64 でエンコードされた文字列として表します。
base64-url-safe
- バイナリーデータを base64-url-safe-encoded 文字列として表します。
hex
- バイナリーデータを 16 進数 (base16) でエンコードされた文字列として表します。
column.exclude.list
デフォルト値: 空の文字列。
変更イベントレコードの値から除外する列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。ソースレコード内の他の列は通常どおりキャプチャーされます。列の完全修飾名の形式は databaseName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。このプロパティーを設定に含める場合は、
column.include.list
プロパティーも設定しないでください。
column.include.list
デフォルト値: 空の文字列。
変更イベントレコードの値に含める列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。その他の列はイベントレコードから省略されます。列の完全修飾名の形式は databaseName.tableName.columnName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列名に存在する可能性のある部分文字列とは一致しない、列の名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、column.exclude.list
プロパティーを設定しないでください。
column.mask.hash.v2.hashAlgorithm.with.salt.salt
デフォルト値: デフォルトなし。
文字ベースの列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。列の完全修飾名の形式は<databaseName>.<tableName>.<columnName>
です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。
仮名は、指定された hashAlgorithm と salt を適用した結果のハッシュ値で構成されます。使用されるハッシュ関数に基づいて、参照整合性は保持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest section に説明されています。
次の例では、
CzQMA0cB5K
はランダムに選択された salt です。column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。
使用される hashAlgorithm、選択された salt、および実際のデータセットによっては、結果のデータセットが完全にマスクされない場合があります。
ハッシュストラテジーバージョン 2 は、異なる場所またはシステムでハッシュされる値が整合性を保てるようにします。
column.mask.with.length.chars
デフォルト値: デフォルトなし。
文字ベースの列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。一連の列の値をコネクターでマスクする場合 (たとえば、列に機密データが含まれている場合) は、このプロパティーを設定します。length
を正の整数に設定して、指定された列のデータをプロパティー名の 長さ で指定されたアスタリスク (*
) 文字数で置き換えます。指定した列のデータを空の文字列に置き換えるには、長さ を0
(ゼロ) に設定します。列の完全修飾名は、次の形式に従います: databaseName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。
単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。
column.propagate.source.type
デフォルト値: デフォルトなし。
コネクターが列のメタデータを表す追加パラメーターを発行する列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。このプロパティーが設定されている場合、コネクターは次のフィールドをイベントレコードのスキーマに追加します。-
__debezium.source.column.type
-
__debezium.source.column.length
-
__debezium.source.column.scale
これらのパラメーターは、それぞれ列の元の型名と長さ (可変幅型の場合) を伝播します。
コネクターがこの余分なデータを発行できるようにすると、sink データベース内の特定の数値または文字ベースの列のサイズを適切に設定するのに役立ちます。
列の完全修飾名は、次のいずれかの形式に従います: databaseName.tableName.columnName、または databaseName.schemaName.tableName.columnName.
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。
-
column.truncate.to.length.chars
デフォルト値: デフォルトなし。
文字ベースの列の完全修飾名に一致する、オプションのコンマ区切りの正規表現のリスト。プロパティー名の 長さ で指定された文字数を超えた場合に、一連の列のデータを切り捨てる場合は、このプロパティーを設定します。length
を正の整数値に設定します (例:column.truncate.to.20.chars)
。列の完全修飾名は、次の形式に従います: databaseName.tableName.columnName列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、列の名前文字列全体に対して照合されます。式は、列名に存在する可能性のある部分文字列と一致しません。
単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。
connect.timeout.ms
-
デフォルト値:
30000
(30 秒)。
接続要求がタイムアウトする前にコネクターが MariaDB データベースサーバーへの接続を確立するまで待機する最大時間 (ミリ秒単位) を指定する正の整数値。
connector.class
-
デフォルト値: デフォルトなし。
コネクターの Java クラスの名前。MariaDB コネクターの場合は常に指定します。
database.exclude.list
デフォルト値: 空の文字列。
データベースの名前に一致するオプションのコンマ区切りの正規表現のリスト。ただし、コネクターに変更をキャプチャーさせません。コネクターは、database.exclude.list
に名前が指定されていないデータベースの変更をキャプチャーします。
データベースの名前を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、データベースの名前文字列全体に対して照合されます。データベース名に存在する可能性のある部分文字列とは一致しません。
このプロパティーを設定に含める場合は、database.include.list
プロパティーも設定しないでください。
database.hostname
-
デフォルト値: デフォルトなし。
MariaDB データベースサーバーの IP アドレスまたはホスト名。
database.include.list
デフォルト値: 空の文字列。
コネクターが変更をキャプチャーし、さらにデータベースの名前に一致する、オプションのコンマ区切りの正規表現のリスト。コネクターは、名前がdatabase.include.list
にないデータベースの変更をキャプチャーしません。デフォルトでは、コネクターはすべてのデータベースの変更をキャプチャーします。
データベースの名前を照合するために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、データベースの名前文字列全体に対して照合されます。データベース名に存在する可能性のある部分文字列とは一致しません。
このプロパティーを設定に含める場合は、database.exclude.list
プロパティーも設定しないでください。
database.password
-
デフォルト値: デフォルトなし。
コネクターが MariaDB データベースサーバーに接続するために使用する MariaDB ユーザーのパスワード。
database.port
-
デフォルト値:
3306
。
MariaDB データベースサーバーの整数ポート番号。
database.server.id
-
デフォルト値: デフォルトなし。
このデータベースクライアントの数値 ID。指定された ID は、MariaDB クラスター内で現在実行中のすべてのデータベースプロセス全体で一意である必要があります。コネクターは、binlog の読み取りを可能にするために、この一意の ID を使用して、MariaDB データベースクラスターを別のサーバーとして参加させます。
database.user
-
デフォルト値: デフォルトなし。
コネクターが MariaDB データベースサーバーに接続するために使用する MariaDB ユーザーの名前。
decimal.handling.mode
デフォルト値:
precise
。
コネクターが変更イベントでDECIMAL
列とNUMERIC
列の値を処理する方法を指定します。
以下のオプションのいずれかを設定します。precise
(デフォルト)-
値を正確に表すために、バイナリー形式の
java.math.BigDecimal
値を使用します。 double
-
値を表すために
double
データ型を使用します。このオプションを選択すると精度が低下する可能性がありますが、ほとんどのコンシューマーにとって使いやすいものになります。 string
- フォーマットされた文字列としてエンコードされます。このオプションは簡単に使用できますが、実際の型に関するセマンティック情報が失われる可能性があります。
event.deserialization.failure.handling.mode
デフォルト値:
fail
。
binlog イベントのデシリアライズ中に例外が発生した場合にコネクターがどのように反応するかを指定します。このオプションは非推奨です。代わりにevent.processing.failure.handling.mode
オプションを使用してください。fail
- 問題のあるイベントとその binlog オフセットを示す例外を伝播し、コネクターを停止させます。
warn
- 問題のあるイベントとその binlog オフセットをログに記録し、イベントをスキップします。
ignore
- 問題のあるイベントを通過し、何もログに記録しません。
field.name.adjustment.mode
デフォルト値: デフォルトなし。
コネクターで使用されるメッセージコンバーターとの互換性を確保するために、フィールド名を調整する方法を指定します。以下のオプションのいずれかを設定します。none
- 調整はありません。
avro
- Avro 名で有効でない文字をアンダースコア文字に置き換えます。
avro_unicode
アンダースコア文字または Avro 名で使用できない文字は、
_uxxxx
などの対応する Unicode に置き換えます。
注記`_` is an escape sequence, similar to a backslash in Java
`_` is an escape sequence, similar to a backslash in Java
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 詳細は、Avro の命名 を参照してください。
gtid.source.excludes
-
デフォルト値: デフォルトなし。
コネクターが MariaDB サーバー上の binlog の位置を特定するために使用する GTID セット内のソースドメイン ID に一致する正規表現のコンマ区切りリスト。このプロパティーが設定されている場合、コネクターは、指定されたexclude
パターンのいずれにも一致しないソース UUID が含まれる GTID 範囲のみを使用します。
GTID の値を一致させるために、Debezium は、アンカー 正規表現として指定した正規表現を適用します。つまり、指定された式は GTID のドメイン識別子に対して照合されます。
このプロパティーを設定に含める場合は、gtid.source.includes
プロパティーも設定しないでください。
gtid.source.includes
-
デフォルト値: デフォルトなし。
コネクターが MariaDB サーバー上の binlog の位置を特定するために使用する GTID セット内のソースドメイン ID に一致する正規表現のコンマ区切りリスト。このプロパティーが設定されている場合、コネクターは、指定されたinclude
パターンのいずれかに一致するソース UUID が含まれる GTID 範囲のみを使用します。
GTID の値を一致させるために、Debezium は、アンカー 正規表現として指定した正規表現を適用します。つまり、指定された式は GTID のドメイン識別子に対して照合されます。
このプロパティーを設定に含める場合は、gtid.source.excludes
プロパティーも設定しないでください。
include.query
-
デフォルト値:
false
。
変更イベントを生成した元の SQL クエリーをコネクターに含めるかどうかを指定するブール値。
このオプションをtrue
に設定する場合は、MariaDB のbinlog_annotate_row_events
オプションをON
に設定して指定する必要もあります。include.query
がtrue
の場合、スナップショットプロセスによって生成されるイベントに対するクエリーは存在しません。include.query
をtrue
に設定すると、変更イベントに元の SQL ステートメントを含めることで明示的に除外またはマスクされたテーブルまたはフィールドが公開される可能性があります。そのため、デフォルト設定はfalse
です。
各ログイベントに対して元のSQL
ステートメントを返すようにデータベースを設定する方法の詳細は、クエリーログイベントの有効化 を参照してください。
include.schema.changes
-
デフォルト値:
true
ブール値は、コネクターがデータベーススキーマの変更をトピック接頭辞と同じ名前の Kafka トピックに公開するかどうかを指定します。コネクターは、データベース名が含まれるキーを持つ各スキーマの変更と、スキーマ更新を記述する JSON 構造の値を記録します。スキーマの変更を記録するこのメカニズムは、データベーススキーマ履歴への変更のコネクターの内部記録とは独立しています。
include.schema.comments
デフォルト値:
false
。
コネクターがメタデータオブジェクトのテーブルおよび列のコメントを解析して公開するかどうかを指定するブール値。
注記このオプションを
true
に設定すると、コネクターに含まれるスキーマコメントによって、各スキーマオブジェクトに大量の文字列データが追加される可能性があります。論理スキーマオブジェクトの数とサイズを増やすと、コネクターが使用するメモリーの量が増加します。
inconsistent.schema.handling.mode
デフォルト値:
fail
。
内部スキーマ表現に存在しないテーブルを参照する binlog イベントにコネクターが応答する方法を指定します。つまり、内部表現はデータベースと一致しません。
以下のオプションのいずれかを設定します。fail
- コネクターは、問題のあるイベントとその binlog オフセットを報告する例外を出力します。その後、コネクターが停止します。
warn
- コネクターは問題のあるイベントとそのバイナリーログオフセットをログに記録し、イベントをスキップします。
skip
- コネクターは問題のあるイベントをスキップして、その旨はログに報告されません。
message.key.columns
-
デフォルト値: デフォルトなし。
指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
テーブルのカスタムメッセージキーを作成するには、テーブルとメッセージキーとして使用する列をリストします。各リストエントリーは、<fully-qualified_tableName>:<keyColumn>,<keyColumn>
の形式を取ります。複数の列名をベースにテーブルキーを作成するには、列名の間にコンマを挿入します。
完全修飾テーブル名はそれぞれ、次の形式の正規表現です。<databaseName>.<tableName>
プロパティーには複数のテーブルのエントリーを含めることができます。セミコロンを使用して、リスト内のテーブルエントリーを区切ります。
以下の例は、テーブルinventory.customers
およびpurchase.orders
:inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
のメッセージキーを設定します。テーブルinventory.customer
の場合、列pk1
とpk2
がメッセージキーとして指定されます。データベースでpurchaseorders
テーブルは、pk3
およびpk4
サーバーのコラムをメッセージキーとして使用します。
カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。
name
-
デフォルト値: デフォルトなし。
コネクターの一意の名前。同じ名前を使用して別のコネクターを登録しようとすると、登録は失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。
schema.name.adjustment.mode
デフォルト値: デフォルトなし。
コネクターが使用するメッセージコンバーターとの互換性を確保するために、コネクターがスキーマ名を調整する方法を指定します。以下のオプションのいずれかを設定します。none
- 調整はありません。
avro
- Avro 名で有効でない文字をアンダースコア文字に置き換えます。
avro_unicode
-
アンダースコア文字または Avro 名で使用できない文字は、
_uxxxx
などの対応する Unicode に置き換えます。
注記:_
はエスケープシーケンスで、Java のバックスラッシュに似ています。
skip.messages.without.change
-
デフォルト値:
false
。
含まれる列の変更が検出されない場合に、コネクターがレコードのメッセージを発行するかどうかを指定します。列は、column.include.list
にリストされている場合、またはcolumn.exclude.list
にリストされていない場合は、included とみなされます。含まれる列に変更がない場合にコネクターがレコードをキャプチャーしないようにするには、値をtrue
に設定します。
table.exclude.list
デフォルト値: 空の文字列。
テーブルの完全修飾テーブル識別子に一致する、オプションのコンマ区切りの正規表現のリスト。ただし、コネクターに変更をキャプチャーさせません。コネクターはtable.exclude.list
に含まれていないテーブルの変更をキャプチャーします。各識別子の形式は databaseName.tableName です。
列の名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、table.include.list
プロパティーも設定しないでください。
table.include.list
デフォルト値: 空の文字列。
変更をキャプチャーするテーブルの完全修飾テーブル識別子に一致する、オプションのコンマ区切りの正規表現のリスト。コネクターは、table.include.list
に含まれていないテーブルの変更をキャプチャしません。各識別子の形式は databaseName.tableName です。デフォルトでは、コネクターは変更をキャプチャーするように設定されている全データベース内の非システムテーブルの変更をすべてキャプチャーします。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
このプロパティーを設定に含める場合は、table.exclude.list
プロパティーも設定しないでください。
tasks.max
-
デフォルト値:
1
。
このコネクターに対して作成するタスクの最大数。MariaDB コネクターは常に単一のタスクを使用するため、デフォルト値を変更しても効果はありません。
time.precision.mode
デフォルト値:
adaptive_time_microseconds
。
コネクターが時間、日付、タイムスタンプの値を表すために使用する精度のタイプを指定します。次のいずれかのオプションを設定します。
adaptive_time_microseconds
(デフォルト)-
コネクターは、データベース列のタイプに基づいて、ミリ秒、マイクロ秒、またはナノ秒の精度値を使用して、データベースとまったく同じように日付、日付時刻、およびタイムスタンプの値をキャプチャーします。ただし、常にマイクロ秒としてキャプチャーされる TIME タイプフィールドは例外です。
connect
- コネクターは常に、データベース列の精度に関係なくミリ秒の精度を使用する、Kafka Connect の組み込みの時間、日付、タイムスタンプの表現を使用して、時間とタイムスタンプの値を表します。
tombstones.on.delete
デフォルト値:
true
。
delete イベントの後に tombstone イベントが続くかどうかを指定します。ソースレコードが削除された後、コネクターはトゥームストーンイベント (デフォルトの動作) を発行して、トピックの ログ圧縮 が有効になっている場合に、削除された行のキーに関連するすべてのイベントを Kafka が完全に削除できるようにします。次のいずれかのオプションを設定します。
true
(デフォルト)-
コネクターは、delete イベントとそれに続く tombstone イベントを発行することによって削除操作を表します。
false
-
コネクターは delete イベントのみを出力します。
topic.prefix
デフォルト値: デフォルトなし。
Debezium が変更をキャプチャーしている特定の MariaDB データベースサーバーまたはクラスターの namespace を提供するトピック接頭辞。トピック接頭辞は、このコネクターが発行するイベントを受信するすべての Kafka トピックの名前として使用されるので、トピック接頭辞がすべてのコネクター間で一意であることが重要です。値には、英数字、ハイフン、ドット、アンダースコアのみを使用できます。
警告このプロパティーを設定した後は、値を変更しないでください。値を変更すると、コネクターの再起動後に、コネクターは元のトピックにイベントを引き続き送信するのではなく、新しい値に基づいた名前のトピックに後続のイベントを送信します。また、コネクターはデータベーススキーマ履歴トピックを復元できません。
高度な Debezium MariaDB コネクター設定プロパティー
次のリストは、MariaDB コネクターの高度な設定プロパティーについて説明します。これらのプロパティーのデフォルト値を変更する必要はほぼありません。そのため、コネクター設定にデフォルト値を指定する必要はありません。
connect.keep.alive
-
デフォルト値:
true
。
MariaDB サーバーまたはクラスターへの接続を維持するために別のスレッドを使用するかどうかを指定するブール値。
converters
デフォルト値: デフォルトなし。
コネクターが使用できる カスタムコンバーター インスタンスのシンボリック名のコンマ区切りリストを列挙します。
たとえば、boolean
です。
このプロパティーは、コネクターがカスタムコンバーターを使用できるようにするために必要です。
コネクターに設定するコンバータごとに、コンバータインターフェイスを実装するクラスの完全修飾名を指定する.type
プロパティーも追加する必要があります。.type
プロパティーでは、以下の形式を使用します。
<converterSymbolicName>.type
以下に例を示します。
boolean.type: io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
boolean.type: io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 設定されたコンバータの動作をさらに制御したい場合は、1 つ以上の設定パラメーターを追加して、コンバータに値を渡すことができます。これらの追加設定パラメ設定ーターをコンバータに関連付けるには、パラメーター名の前にコンバーターのシンボル名を付けます。
たとえば、
boolean
コンバーターが処理する列のサブセットを指定するselector
パラメーターを定義するには、次のプロパティーを追加します。
boolean.selector=db1.table1.*, db1.table2.column1
boolean.selector=db1.table1.*, db1.table2.column1
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
custom.metric.tags
-
デフォルト値: デフォルトなし。
コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します (例:
k1=v1、k2=v2
)。
コネクターは、指定されたタグを基本 MBean オブジェクト名に追加します。タグは、メトリクスデータを整理および分類するのに役立ちます。特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを識別するためのタグを定義できます。詳細は、カスタマイズされた MBean 名 を参照してください。
database.initial.statements
デフォルト値: デフォルトなし。
トランザクションログを読み取る接続ではなく、データベースへの JDBC 接続が確立されたときに実行される、セミコロンで区切られた SQL ステートメントのリスト。SQL ステートメントでセミコロンを区切り文字としてではなく、文字として指定する場合は、2 つのセミコロン (;;
) を使用します。
コネクターは独自の判断で JDBC 接続を確立する可能性があるため、このプロパティーはセッションパラメーターの設定専用です。DML ステートメントを実行するものではありません。
database.query.timeout.ms
-
デフォルト値:
600000
(10 分)。
コネクターがクエリーの完了を待機する時間をミリ秒単位で指定します。タイムアウト制限を削除するには、値を0
(ゼロ) に設定します。
database.ssl.keystore
-
デフォルト値: デフォルトなし。
キーストアファイルの場所を指定するオプションの設定。キーストアファイルは、クライアントと MariaDB サーバー間の双方向認証に使用できます。
database.ssl.keystore.password
-
デフォルト値: デフォルトなし。
キーストアファイルのパスワード。database.ssl.keystore
が設定されている場合にのみパスワードを指定します。
database.ssl.mode
デフォルト値:
preferred
。
コネクターが暗号化された接続を使用するかどうかを指定します。以下の設定が可能です。disabled
- 暗号化されていない接続の使用を指定します。
preferred
(デフォルト)- サーバーが安全な接続をサポートしている場合、コネクターは暗号化された接続を確立します。サーバーが安全な接続をサポートしていない場合、コネクターは暗号化されていない接続を使用します。
required
- コネクターは暗号化された接続を確立します。暗号化された接続を確立できない場合、コネクターは失敗します。
verify_ca
-
コネクターは、
required
のオプションを設定した場合と同じように動作しますが、設定された認証局 (CA) 証明書に対してサーバーの TLS 証明書も検証します。サーバーの TLS 証明書が有効な CA 証明書と一致しない場合、コネクターは失敗します。
verify_identity
-
コネクターは
verify_ca
オプションを設定した場合と同じように動作しますが、サーバー証明書がリモート接続のホストと一致するかどうかも検証します。
database.ssl.truststore
-
デフォルト値: デフォルトなし。
サーバー証明書検証用のトラストストアファイルの場所。
database.ssl.truststore.password
-
デフォルト値: デフォルトなし。
トラストストアファイルのパスワード。トラストストアの整合性をチェックし、トラストストアのロックを解除するために使用されます。
enable.time.adjuster
デフォルト値:
true
。
コネクターが 2 桁の年指定を 4 桁に変換するかどうかを示すブール値。変換がデータベースに完全に委任される場合は、値をfalse
に設定します。
MariaDB ユーザーは、2 桁または 4 桁の年の値を挿入できます。2 桁の値は、1970 - 2069 の範囲の年にマッピングされます。デフォルトでは、コネクターが変換を実行します。
errors.max.retries
デフォルト値:
-1
。
接続エラーなどの再試行可能なエラーが発生する操作が実行された後にコネクターがどのように応答するかを指定します。
以下のオプションのいずれかを設定します。-1
- 制限なしコネクターは常に自動的に再起動し、以前の失敗回数に関係なく、操作を再試行します。
0
- Disabledコネクターはすぐに失敗し、操作を再試行することはありません。コネクターを再起動するにはユーザーの介入が必要です。
> 0
- 指定された最大再試行回数に達するまで、コネクターは自動的に再起動します。次の障害が発生すると、コネクターは停止し、再起動するにはユーザーの介入が必要になります。
event.converting.failure.handling.mode
デフォルト値:
warn
。
列のデータ型と Debezium 内部スキーマで指定された型が一致しないためにテーブルレコードを変換できない場合にコネクターがどのように応答するかを指定します。
以下のオプションのいずれかを設定します。fail
-
例外は、フィールドのデータ型がスキーマタイプと一致しなかったために変換が失敗したことを報告し、変換を正常に行うにはコネクターを
schema _only_recovery
モードで再起動する必要がある可能性があることを示します。 warn
-
コネクターは、変換に失敗した列のイベントフィールドに
null
値を書き込み、警告ログにメッセージを書き込みます。
skip
-
コネクターは、変換に失敗した列のイベントフィールドに
null
値を書き込み、デバッグログにメッセージを書き込みます。
event.processing.failure.handling.mode
デフォルト値:
fail
。
問題のあるイベントに遭遇した場合など、イベントの処理中に発生する障害をコネクターがどのように処理するかを指定します。以下の設定が可能です。fail
- コネクターは、問題のあるイベントとその位置を報告する例外を発生させます。その後、コネクターが停止します。
warn
- コネクターにより例外が出力されることはありません。代わりに、問題のあるイベントとその位置をログに記録し、イベントをスキップします。
ignore
- コネクターは問題のあるイベントを無視し、ログエントリーは生成されません。
heartbeat.action.query
デフォルト値: デフォルトなし。
コネクターがハートビートメッセージを送信するときに、コネクターがソースデータベースで実行するクエリーを指定します。
たとえば、次のクエリーは、ソースデータベースで実行された GTID セットの状態を定期的にキャプチャーします。
INSERT INTO gtid_history_table (select @gtid_executed)
heartbeat.interval.ms
デフォルト値:
0
。
コネクターが Kafka トピックにハートビートメッセージを送信する頻度を指定します。デフォルトでは、コネクターによりハートビートメッセージは送信されません。
ハートビートメッセージは、コネクターがデータベースから変更イベントを受信しているかどうかを監視するのに便利です。ハートビートメッセージは、コネクターの再起動時に再送信する必要がある変更イベントの数を減らすのに役立つ可能性があります。ハートビートメッセージを送信するには、このプロパティーを、ハートビートメッセージの間隔をミリ秒単位で示す正の整数に設定します。
incremental.snapshot.allow.schema.changes
-
デフォルト値:
false
。
コネクターが増分スナップショット中にスキーマの変更を許可するかどうかを指定します。値がtrue
に設定されている場合、コネクターは増分スナップショット中にスキーマの変更を検出し、DDL のロックを回避するために現在のチャンクを再選択します。
プライマリーキーへの変更はサポートされていません。増分スナップショットの作成中にプライマリーを変更すると、誤った結果が生じる可能性があります。さらに他の制限として、スキーマの変更が列のデフォルト値にのみ影響する場合、DDL が binlog ストリームから処理されるまで変更が検出されないことが挙げられます。これはスナップショットイベントの値には影響しませんが、これらのスナップショットイベントのスキーマのデフォルトが古くなっている可能性があります。
incremental.snapshot.chunk.size
-
デフォルト値:
1024
。
コネクターが増分スナップショットチャンクを取得するときにフェッチしてメモリーに読み込む行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。
incremental.snapshot.watermarking.strategy
デフォルト値:
insert_insert
。
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントの重複を排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
以下のオプションのいずれかを指定することができます。insert_insert
(デフォルト)- 増分スナップショットを開始するシグナルを送信すると、スナップショット中に Debezium が読み取るチャンクごとに、スナップショットウィンドウを開くシグナルを記録するエントリーがシグナリングデータコレクションに書き込まれます。スナップショットが完了すると、Debezium はウィンドウを閉じるシグナルを記録する 2 番目のエントリーを挿入します。
insert_delete
- 増分スナップショットを開始するシグナルを送信すると、Debezium が読み取るチャンクごとに、スナップショットウィンドウを開くシグナルを記録する 1 つのエントリーがシグナリングデータコレクションに書き込まれます。スナップショットが完了すると、このエントリーは削除されます。スナップショットウィンドウを閉じるシグナルのエントリーは作成されません。シグナリングデータコレクションの急増を防ぐには、このオプションを設定します。
max.batch.size
-
デフォルト値:
2048
。
このコネクターの反復処理中に処理される必要があるイベントの各バッチの最大サイズを指定する正の整数値。
max.queue.size
-
デフォルト値:
8192
。
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。max.queue.size
は常にmax.batch.size
の値よりも大きい値に設定してください。
max.queue.size.in.bytes
-
デフォルト値:
0
。
ブロッキングキューの最大ボリュームをバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。max.queue.size
も設定されている場合、キューのサイズがいずれかのプロパティーで指定された制限に達すると、キューへの書き込みがブロックされます。たとえば、max.queue.size=1000
、max.queue.size.in.bytes=5000
と設定した場合、キューに 1000 レコードが入った後、あるいはキュー内のレコードの量が 5000 バイトに達した後、キューへの書き込みがブロックされます。
min.row.count.to.stream.results
デフォルト値:
1000
。
スナップショットの作成中に、コネクターは変更をキャプチャーするように設定されている各テーブルをクエリーします。コネクターは各クエリーの結果を使用して、そのテーブルのすべての行のデータが含まれる読み取りイベントを生成します。このプロパティーは、MariaDB コネクターがテーブルの結果をメモリーに格納するか、ストリーミングを行うかを決定します。メモリーへの格納はすばやく処理できますが、大量のメモリーを必要とします。ストリーミングを行うと、処理は遅くなりますが、非常に大きなテーブルにも対応できます。このプロパティーの設定は、コネクターが結果のストリーミングを行う前にテーブルに含まれる必要がある行の最小数を指定します。
すべてのテーブルサイズチェックを省略し、スナップショットの実行中に常にすべての結果をストリーミングする場合は、このプロパティーを
0
に設定します。
notification.enabled.channels
デフォルト値: デフォルトなし。
コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。-
sink
-
log
-
jmx
-
poll.interval.ms
-
デフォルト値:
500
(0.5 秒)。
コネクターがイベントのバッチ処理を開始する前に、新しい変更イベントが表示されるのを待機する時間をミリ秒単位で指定する正の整数値。
provide.transaction.metadata
-
デフォルト値:
false
。
コネクターがトランザクション境界を持つイベントを生成し、トランザクションメタデータを使用して変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合はtrue
を指定します。詳細は、トランザクションメタデータ を参照してください。
signal.data.collection
-
デフォルト値: デフォルトなし。
コネクターに シグナル を送信するために使用されるデータコレクションの完全修飾名。<databaseName>.<tableName>
の形式を使用してコレクション名を指定します。
signal.enabled.channels
デフォルト値: デフォルトなし。
コネクターに対して有効になっているシグナリングチャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。-
source
-
kafka
-
file
-
jmx
-
skipped.operations
デフォルト値:
t
ストリーミング中にコネクターがスキップする操作タイプのコンマ区切りリスト。以下のタイプの操作をスキップするようにコネクターを設定することができます。-
c
(挿入/作成) -
u
(更新) -
d
(削除) -
t
(省略)
-
コネクターに操作をスキップしてほしくない場合は、値を none
に設定します。
snapshot.delay.ms
-
デフォルト値: デフォルトなし。
コネクターの起動時にスナップショットを実行する前にコネクターが待機する間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。
snapshot.fetch.size
-
デフォルト値: 未設定。
デフォルトでは、スナップショットの作成中に、コネクターはテーブルの内容を行単位で読み取ります。バッチ内の行の最大数を指定するには、このプロパティーを設定します。
snapshot.include.collection.list
-
デフォルト値:
table.include.list
で指定されたすべてのテーブル。
スナップショットに含めるテーブルの完全修飾名 (<databaseName>.<tableName>
) と一致する正規表現のコンマ区切りリスト (任意)。指定する項目は、コネクターのtable.include.list
プロパティーで名前を付ける必要があります。このプロパティーは、コネクターのsnapshot.mode
プロパティーがnever
. 以外の値に設定されている場合にのみ有効です。
このプロパティーは増分スナップショットの動作には影響しません。
テーブルの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定された式は、テーブル名に存在する可能性のある部分文字列とは一致しない、テーブルの名前文字列全体と照合されます。
snapshot.lock.timeout.ms
-
デフォルト値:
10000
。
スナップショットを実行するときにテーブルロックを取得するために待機する最大時間 (ミリ秒単位) を指定する正の整数。コネクターがこの期間にテーブルロックを取得できないと、スナップショットは失敗します。詳細は、MariaDB コネクターによるデータベーススナップショットの実行方法 を説明するドキュメントを参照してください。
snapshot.locking.mode
デフォルト値:
minimal
。
コネクターがグローバル MariaDB 読み取りロックを保持するかどうか、および保持する期間を指定します。これにより、コネクターがスナップショットを実行している間、データベースへの更新を加えることができません。以下の設定が可能です。minimal
-
コネクターは、データベーススキーマやその他のメタデータを読み取るスナップショットの初期フェーズのみ、グローバル読み取りロックを保持します。スナップショットの次のフェーズでは、コネクターは各テーブルからすべての行を選択するときにロックを解除します。一貫した方法で SELECT 操作を実行するために、コネクターは REPEATABLE READ トランザクションを使用します。グローバル読み取りロックが解除されると、他の MariaDB クライアントがデータベースを更新できるようになりますが、トランザクションの期間中、コネクターは同じデータを読み取り続けるため、REPEATABLE READ 分離を使用すると、スナップショットの一貫性が確保されます。
extended
-
スナップショットの作成中にすべての書き込み操作をブロックします。クライアントが MariaDB の REPEATABLE READ 分離レベルと互換性のない同時操作を送信する場合は、この設定を使用します。
none
- スナップショット中にコネクターがテーブルロックを取得するのを防ぎます。このオプションはすべてのスナップショットモードで許可されますが、スナップショットの作成中にスキーマの変更が発生しない場合に のみ 安全に使用できます。MyISAM エンジンで定義されたテーブルは常にテーブルロックを取得します。その結果、このオプションを設定しても、このようなテーブルはロックされます。この動作は、行レベルのロックを取得する InnoDB エンジンによって定義されたテーブルとは異なります。
snapshot.max.threads
デフォルト値:
1
。
初期スナップショットを実行するときにコネクターが使用するスレッドの数を指定します。並列初期スナップショットを有効にするには、プロパティーを 1 より大きい値に設定します。並列初期スナップショットでは、コネクターは複数のテーブルを同時に処理します。
重要並列初期スナップショットは開発者プレビュー機能のみとなっています。開発者プレビューソフトウェアは、Red Hat では一切サポートされておらず、機能的に完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアはいつでも変更または削除される可能性があり、限定的なテストしか行われていません。Red Hat は、関連する SLA なしに、開発者プレビューソフトウェアに対するフィードバックを送信する手段を提供する場合があります。
Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。
snapshot.mode
デフォルト値:
initial
。
コネクターの起動時にスナップショットを実行するための基準を指定します。以下の設定が可能です。always
- コネクターは起動するたびにスナップショットを実行します。スナップショットには、キャプチャーされたテーブルの構造およびデータが含まれます。この値を指定すると、コネクターが起動するたびに、キャプチャーされたテーブルからのデータの完全な表現がトピックに入力されます。
initial
(デフォルト)- コネクターは、論理サーバー名のオフセットが記録されていない場合、または以前のスナップショットが完了しなかったことが検出された場合にのみ、スナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。
initial_only
- コネクターは、論理サーバー名のオフセットが記録されていない場合にのみスナップショットを実行します。スナップショットが完了すると、コネクターは停止します。binlog からの変更イベントを読み取る際にストリーミングに移行しません。
schema_only
-
非推奨です。
no_data
を参照してください。 no_data
- コネクターは、テーブルデータではなくスキーマのみをキャプチャーするスナップショットを実行します。トピックにデータの一貫したスナップショットを含める必要はないが、最後のコネクターの再起動後に適用されたスキーマの変更をキャプチャーする場合は、このオプションを設定します。
schema_only_recovery
-
非推奨です。
recovery
を参照してください。 recovery
損失または破損したデータベーススキーマの履歴トピックを復元するにはこのオプションを設定します。再起動後、コネクターはソーステーブルからトピックを再構築するスナップショットを実行します。また、このプロパティーを設定して、予期しない増加が発生するデータベーススキーマ履歴トピックを定期的にプルーニングすることもできます。
警告最後のコネクターのシャットダウン後にスキーマの変更がデータベースにコミットされた場合、このモードを使用してスナップショットを実行しないでください。
never
-
コネクターが起動すると、スナップショットを実行するのではなく、後続のデータベース変更のイベントレコードのストリーミングがすぐに開始されます。
no_data
オプションが優先的に使用されるようになり、このオプションは、今後非推奨にするか検討中です。 when_needed
コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。
- トピックのオフセットを検出できません。
- 以前に記録されたオフセットは、サーバー上で使用できない binlog の位置または GTID を指定します。
snapshot.query.mode
デフォルト値:
select_all
。
スナップショットを実行するときにコネクターがデータをクエリーする方法を指定します。
以下のオプションのいずれかを設定します。select_all
(デフォルト)-
コネクターは、
select all
クエリーを使用してキャプチャーされたテーブルから行を取得し、必要に応じて、列のinclude
およびexclude
リストの設定に基づいて選択された列を調整します。
この設定により、
snapshot.select.statement.overrides
プロパティーを使用する場合と比較して、より柔軟にスナップショットコンテンツを管理できるようになります。
snapshot.select.statement.overrides
デフォルト値: デフォルトなし。
スナップショットに含めるテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。<databaseName>.<tableName>
の形式で完全修飾テーブル名のコンマ区切りリストを指定します。以下に例を示します。
"snapshot.select.statement.overrides": "inventory.products,customers.orders"
リスト内の各テーブルに対して、スナップショットを取得するときにコネクターがテーブルで実行する
SELECT
ステートメントを指定する別の設定プロパティーを追加します。指定したSELECT
ステートメントは、スナップショットに追加するテーブル行のサブセットを決定します。このSELECT
ステートメントプロパティーの名前を指定するには、次の形式を使用します。
snapshot.select.statement.overrides.<databaseName>.<tableName>
。たとえば、snapshot.select.statement.overrides.customers.orders
などです。
ソフト削除列delete_flag
を含むcustomers.orders
テーブルから、スナップショットにソフト削除されていないレコードのみを含める場合は、次のプロパティーを追加します。"snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
"snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 作成されるスナップショットでは、コネクターには
delete_flag = 0
のレコードのみが含まれます。
snapshot.tables.order.by.row.count
デフォルト値:
disabled
。
コネクターが初期スナップショットを実行するときにテーブルを処理する順序を指定します。以下のオプションのいずれかを設定します。descending
- コネクターは、行数に基づいて、最上位から最下位の順にテーブルのスナップショットを作成します。
ascending
- コネクターは、行数に基づいて、最下位から最上位の順にテーブルのスナップショットを作成します。
disabled
- コネクターは、初期スナップショットを実行するときに行数を無視します。
streaming.delay.ms
-
デフォルト値:
0
。
コネクターがスナップショットを完了した後、ストリーミングプロセスの開始を遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットが完了した直後で、ストリーミングプロセスの開始前に障害が発生した場合に、コネクターがスナップショットを再開できないようにします。Kafka Connect ワーカーに設定されているoffset.flush.interval.ms
プロパティーの値よりも高い遅延値を設定します。
table.ignore.builtin
-
デフォルト値:
true
。
組み込みシステムテーブルを無視するかどうかを指定するブール値。これは、テーブルの include および exclude リストに関係なく適用されます。デフォルトでは、システムテーブルの値に加えられた変更はキャプチャーから除外され、Debezium はシステムテーブルの変更に対してイベントを生成しません。
topic.cache.size
-
デフォルト値:
10000
。
制限された同時ハッシュマップ内のメモリーに格納できるトピック名の数を指定します。コネクターはキャッシュを使用して、データコレクションに対応するトピック名を決定します。
topic.delimiter
-
デフォルト値:
.
。
コネクターがトピック名のコンポーネント間に挿入する区切り文字を指定します。
topic.heartbeat.prefix
デフォルト値:
__debezium-heartbeat
。
コネクターがハートビートメッセージを送信するトピックの名前を指定します。トピック名の形式は次のとおりです。
topic.heartbeat.prefix.topic.prefix
たとえば、トピックの接頭辞が
fulfillment
の場合、デフォルトのトピック名は__debezium-heartbeat.fulfillment
になります。
topic.naming.strategy
-
デフォルト値:
io.debezium.schema.DefaultTopicNamingStrategy
。
コネクターが使用するTopicNamingStrategy
クラスの名前。指定されたストラテジーによって、データ変更、スキーマ変更、トランザクション、ハートビートなどのイベントレコードを格納するトピックにコネクターが名前を付ける方法が決まります。
topic.transaction
デフォルト値:
transaction
。
コネクターがトランザクションメタデータメッセージを送信するトピックの名前を指定します。トピック名のパターンは次のとおりです。
topic.prefix.topic.transaction
たとえば、トピック接頭辞が
fulfillment
の場合、デフォルトのトピック名はfulfillment.transaction
になります。
use.nongraceful.disconnect
-
デフォルト値: false。
バイナリーログクライアントのキープアライブスレッドがSO_LINGER
ソケットオプションを0
に設定して、古い TCP 接続をすぐに切断するかどうかを指定するブール値。SSLSocketImpl.close
でコネクターのデッドロックが発生する場合は、値をtrue
に設定します。
Debezium コネクターデータベーススキーマ履歴設定プロパティー
Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する schema.history.internal.*
プロパティーのセットが含まれています。
以下の表は、Debezium コネクターを設定するための schema.history.internal
プロパティーを説明しています。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし | コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。 | |
デフォルトなし | Kafka クラスターへの最初の接続を確立するためにコネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。 | |
| 永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。 | |
| Kafka 管理クライアントを使用してクラスター情報を取得する際に、コネクターが待機すべき最大ミリ秒数を指定する整数値です。 | |
| Kafka 管理クライアントを使用して kafka 履歴トピックを作成する間、コネクターが待機する最大ミリ秒数を指定する整数値。 | |
|
エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、 | |
|
コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは | |
|
コネクターがスキーマまたはデータベース内のすべてのテーブルからスキーマ構造を記録するか、キャプチャー対象に指定されたテーブルのみからスキーマ構造を記録するかを指定するブール値。
| |
|
コネクターがデータベースインスタンス内のすべての論理データベースのスキーマ構造を記録するかどうかを指定するブール値。
|
パススルー MariaDB コネクター設定プロパティー
コネクター設定で pass-through プロパティーを設定して、Apache Kafka プロデューサーとコンシューマーの動作をカスタマイズできます。Kafka プロデューサーとコンシューマーの全設定プロパティーの詳細は、Kafka ドキュメント を参照してください。
プロデューサーとコンシューマーのクライアントがスキーマ履歴トピックと対話する方法を設定するための Pass-through プロパティー
Debezium は、データベーススキーマ履歴トピックへのスキーマ変更を記述するために Apache Kafka プロデューサーに依存しています。同様に、コネクターが起動すると、データベーススキーマ履歴トピックから読み取る Kafka コンシューマーに依存します。schema.history.internal.producer.*
および schema.history.internal.consumer.*
接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベーススキーマ履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。
Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。
Kafka プロデューサー設定プロパティー と Kafka コンシューマー設定プロパティー の詳細は、Apache Kafka ドキュメントを参照してください。
MariaDB コネクターが Kafka シグナリングトピックと対話する方法を設定するための Pass-through プロパティー
Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.*
プロパティーのセットを提供します。
以下の表は、Kafka signal
プロパティーを説明しています。
プロパティー | デフォルト | 説明 |
---|---|---|
<topic.prefix>-signal | コネクターがアドホックシグナルについて監視する Kafka トピックの名前。 注記 トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナリングトピックが必要です。シグナリングトピックには単一のパーティションが必要です。 | |
kafka-signal | Kafka コンシューマーによって使用されるグループ ID の名前。 | |
デフォルトなし | コネクターが Kafka クラスターへの初期接続を確立するために使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。 | |
| コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。 |
シグナリングチャネルの Kafka コンシューマークライアントを設定するためのパススループロパティー
Debezium コネクターでは、Kafka コンシューマーのパススルー設定が可能です。パススルーシグナルのプロパティーは、接頭辞 signals.consumer.*
で始まります。たとえば、コネクターは signal.consumer.security.protocol=SSL
などのプロパティーを Kafka コンシューマーに渡します。
Debezium は、プロパティーを Kafka シグナルコンシューマーに渡す前に、プロパティーから接頭辞を削除します。
MariaDB コネクター sink notification チャネルを設定するためのパススループロパティー
次の表では、Debezium sink notification
チャネルの設定に使用できるプロパティーについて説明します。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし |
Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして |
Debezium コネクターのパススルーデータベースドライバー設定プロパティー
Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは接頭辞 driver.*
で始まります。たとえば、コネクターは driver.foobar=false
などのプロパティーを JDBC URL に渡します。
Debezium は、プロパティーをデータベースドライバーに渡す前に、プロパティーから接頭辞を削除します。
2.2.7. Debezium MariaDB コネクターのパフォーマンスの監視 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターは、Zookeeper、Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。
- スナップショットメトリクス は、スナップショットの実行中にコネクター操作に関する情報を提供します。
- ストリーミングメトリック は、コネクターが binlog を読み取る際のコネクター操作に関する情報を提供します。
- スキーマ履歴メトリクス は、コネクターのスキーマ履歴の状態に関する情報を提供します。
Debezium の監視に関するドキュメント では、JMX を使用してこれらのメトリクスを公開する方法の詳細を説明しています。
2.2.7.1. MariaDB コネクタースナップショットとストリーミング MBean オブジェクトのカスタマイズされた名前 リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。これらのメトリクスは各コネクターインスタンスに固有であり、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。
デフォルトでは、正しく設定されたコネクターをデプロイすると、Debezium はさまざまなコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクター設定に依存しており、設定の変更によって MBean 名が変更される場合があります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが切断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開するには、新しい MBean 名を使用するように監視スタックを再設定する必要があります。
MBean 名の変更が原因で監視が中断されないように、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、コネクター設定に custom.metric.tags
プロパティーを追加します。このプロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値がそのタグの値を表すキーと値のペアを受け入れます。たとえば、k1=v1,k2=v2
です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。
コネクターの custom.metric.tags
プロパティーを設定した後、指定されたタグに関連付けられたメトリクスを取得するように監視スタックを設定できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix
が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。
カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc
)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。
次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。
例2.16 カスタムタグがコネクター MBean 名を変更する方法
デフォルトでは、MariaDB コネクターはストリーミングメトリクスに次の MBean 名を使用します。
debezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix>
debezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix>
custom.metric.tags
の値を database=salesdb-streaming,table=inventory
に設定すると、Debezium は次のカスタム MBean 名を生成します。
debezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
debezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
2.2.7.2. MariaDB データベースのスナップショット中の Debezium の監視 リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.mariadb:type=connector-metrics,context=snapshot,server=<topic.prefix>
です。
スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。
次の表に、使用可能なスナップショットメトリクスを示します。
属性 | タイプ | 説明 |
---|---|---|
| コネクターが読み取りした最後のスナップショットイベント。 | |
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
| 前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。 | |
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
| コネクターによって取得されるテーブルのリスト。 | |
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
| スナップショットに含まれているテーブルの合計数。 | |
| スナップショットによってまだコピーされていないテーブルの数。 | |
| スナップショットが起動されたかどうか。 | |
| スナップショットが一時停止されたかどうか。 | |
| スナップショットが中断されたかどうか。 | |
| スナップショットが完了したかどうか。 | |
| スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。 | |
| スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。 | |
| スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。 | |
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
| キュー内のレコードの現在の容量 (バイト単位)。 |
コネクターは、増分スナップショットの実行時に、以下の追加のスナップショットメトリクスも提供します。
2.2.7.3. Debezium MariaDB コネクターレコードストリーミングの監視 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MariaDB コネクターは、Zookeeper、Kafka、および Kafka Connect によって提供される JMX メトリクスの組み込みサポートに加えて、3 種類のメトリクスを提供します。
- スナップショットメトリクス は、スナップショットの実行中にコネクター操作に関する情報を提供します。
- ストリーミングメトリック は、コネクターが binlog を読み取る際のコネクター操作に関する情報を提供します。
- スキーマ履歴メトリクス は、コネクターのスキーマ履歴の状態に関する情報を提供します。
Debezium の監視に関するドキュメント では、JMX を使用してこれらのメトリクスを公開する方法の詳細を説明しています。
MBean は debezium.mariadb:type=connector-metrics,context=streaming,server=<topic.prefix>
です。
以下の表は、利用可能なストリーミングメトリクスのリストです。
属性 | タイプ | 説明 |
---|---|---|
| コネクターが読み取られた最後のストリーミングイベント。 | |
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、ソースデータベースによって報告されたデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された作成イベントの合計数。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された更新イベントの合計数。 | |
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された削除イベントの合計数。 | |
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
| コネクターによって取得されるテーブルのリスト。 | |
| ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
| ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
| コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。 | |
| 最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値には、データベースサーバーとコネクターが実行されているマシンのクロックの差が組み込まれます。 | |
| コミットされた処理済みトランザクションの数。 | |
| 最後に受信したイベントの位置。 | |
| 最後に処理されたトランザクションのトランザクション識別子。 | |
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
| キュー内のレコードの現在の容量 (バイト単位)。 |
2.2.7.4. Debezium MariaDB コネクタースキーマ履歴の監視 リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.mariadb:type=connector-metrics,context=schema-history,server=<topic.prefix>
です。
以下の表は、利用可能なスキーマ履歴メトリクスのリストです。
属性 | タイプ | 説明 |
---|---|---|
|
データベーススキーマ履歴の状態を示す | |
| リカバリーが開始された時点のエポック秒の時間。 | |
| リカバリーフェーズ中に読み取られた変更の数。 | |
| リカバリーおよびランタイム中に適用されるスキーマ変更の合計数。 | |
| 最後の変更が履歴ストアから復元された時点からの経過時間 (ミリ秒単位)。 | |
| 最後の変更が適用された時点からの経過時間 (ミリ秒単位)。 | |
| 履歴ストアから復元された最後の変更の文字列表現。 | |
| 最後に適用された変更の文字列表現。 |
2.2.8. Debezium MariaDB コネクターが障害や問題を処理する方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、複数のアップストリームデータベースのすべての変更をキャプチャーする分散システムであり、イベントの見逃しや損失は発生しません。システムが正常に操作している場合や、慎重に管理されている場合は、Debezium は変更イベントレコードごとに 1 度だけ 配信します。
障害が発生しても、システムからイベントがなくなることはありません。ただし、Debezium が障害から回復している間に、いくつかの変更イベントが繰り返される可能性があります。このような正常でない状態では、Debezium は Kafka と同様に、変更イベントを 少なくとも 1 回 配信します。
詳細は以下を参照してください。
- 設定および起動エラー
以下の状況では、起動時にコネクターが失敗し、エラーまたは例外がログに記録され、実行が停止されます。
- コネクターの設定が無効である。
- 指定された接続パラメーターを使用してコネクターを MariaDB サーバーに正常に接続できない。
- MariaDB に履歴がない binlog の位置でコネクターが再起動を試行する。
このような場合、エラーメッセージには問題の詳細が含まれ、推奨される回避策も含まれることがあります。設定の修正したり、MariaDB の問題に対処した後、コネクターを再起動します。
ただし、高可用性の MariaDB クラスターに接続している場合は、コネクターをすぐに再起動できます。これはクラスターの別の MariaDB サーバーに接続し、最後のトランザクションを表すサーバーの binlog の場所を特定し、その特定の場所から新しいサーバーの binlog の読み取りを開始します。
- Kafka Connect が正常に停止する
- Kafka Connect が正常に停止すると、Debezium MariaDB コネクタータスクが停止され、新しい Kafka Connect プロセスで再起動される間に短い遅延が発生します。
- Kafka Connect プロセスのクラッシュ
- Kafka Connect がクラッシュすると、プロセスが停止し、最後に処理されたオフセットが記録されずに Debezium MariaDB コネクタータスクが終了します。分散モードでは、Kafka Connect は他のプロセスでコネクタータスクを再起動します。ただし、MariaDB コネクターは以前のプロセスで記録された最後のオフセットから再開します。その結果、代わりのタスクによってクラッシュ前に処理された一部のイベントが再生成され、重複したイベントが作成されることがあります。
各変更イベントメッセージには、重複イベントの特定に使用できるソース固有の情報が含まれます。以下に例を示します。
- イベント元
- MariaDB サーバーのイベント時間
- binlog ファイル名と位置
- GTID
- MariaDB が binlog ファイルをパージする
- Debezium MariaDB コネクターが長時間停止すると、MariaDB サーバーは古い binlog ファイルを消去し、コネクターの最後の位置が失われる可能性があります。コネクターが再起動すると、MariaDB サーバーに開始点がなくなり、コネクターは別の最初のスナップショットを実行します。スナップショットが無効の場合、コネクターはエラーによって失敗します。
MariaDB コネクターが初期スナップショットを実行する方法の詳細は、スナップショット を参照してください。