第4章 JDBC 用の Debezium コネクター (開発者プレビュー)
Debezium JDBC コネクターは、複数のソーストピックからイベントを消費し、JDBC ドライバーを使用してそれらのイベントをリレーショナルデータベースに書き込むことができる Kafka Connect シンクコネクター実装です。このコネクターは、Db2、MySQL、Oracle、PostgreSQL、SQL Server などのさまざまなデータベース言語をサポートします。
Debezium JDBC コネクターは開発者プレビューソフトウェアのみでの使用となります。開発者プレビューソフトウェアは、Red Hat では一切サポートされておらず、機能的に完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアはいつでも変更または削除される可能性があり、限定的なテストしか行われていません。
Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。
4.1. Debezium JDBC コネクターの仕組み
Debezium JDBC コネクターは Kafka Connect シンクコネクターであるため、Kafka Connect ランタイムが必要です。コネクターは、サブスクライブしている Kafka トピックを定期的にポーリングし、それらのトピックからのイベントを消費して、設定されたリレーショナルデータベースにイベントを書き込みます。コネクターは、upsert セマンティクスと基本的なスキーマの進化を使用して、べき等書き込み操作をサポートします。
Debezium JDBC コネクターは次の機能を提供します。
- 「Debezium JDBC コネクターが複雑な変更イベントをどのように消費するかの説明」
- 「Debezium JDBC コネクターの at-least-once 配信の説明」
- 「Debezium JDBC の複数タスクの使用の説明」
- 「Debezium JDBC コネクターのデータと列タイプのマッピングの説明」
- 「Debezium JDBC コネクターがソースイベントのプライマリーキーを処理する方法の説明」
-
「
DELETE
イベントまたは tombstone イベントの使用時に行を削除するような Debezium JDBC コネクターの設定」 - 「コネクターによるべき等書き込みの実行の有効化」
- 「Debezium JDBC コネクターのスキーマ進化モード」
- 「宛先テーブル名と列名の大文字と小文字を定義するオプションの指定」
4.1.1. Debezium JDBC コネクターが複雑な変更イベントをどのように消費するかの説明
デフォルトでは、Debezium ソースコネクターは複雑で、階層分けされた変更イベントを生成します。Debezium コネクターを他の JDBC シンクコネクター実装とともに使用する場合、変更イベントのペイロードをシンク実装で使用できるように、ExtractNewRecordState
シングルメッセージ変換 (SMT) を適用する必要がある場合があります。Debezium JDBC シンクコネクターを実行する場合、Debezium シンクコネクターは変換を使用せずにネイティブの Debezium 変更イベントを直接使用できるため、SMT をデプロイする必要はありません。
JDBC シンクコネクターが Debezium ソースコネクターから複雑な変更イベントを消費するとき、元の insert
または update
イベントの after
セクションから値を抽出します。削除イベントがシンクコネクターによって消費されるとき、イベントのペイロードの一部は参照されません。
Debezium JDBC シンクコネクターは、スキーマ変更トピックから読み取るように設計されていません。ソースコネクターがスキーマ変更をキャプチャーするように設定されている場合は、コネクターがスキーマ変更トピックを消費しないように、JDBC コネクター設定で topics
または topic.regex
プロパティーを設定します。
4.1.2. Debezium JDBC コネクターの at-least-once 配信の説明
Debezium JDBC シンクコネクターは、Kafka トピックから消費されるイベントが少なくとも 1 回は処理されるようにします。
4.1.3. Debezium JDBC の複数タスクの使用の説明
Debezium JDBC シンクコネクターは、複数の Kafka Connect タスクにわたって実行できます。複数のタスクに対してコネクターを実行するには、tasks.max
設定プロパティーでコネクターで使用するタスクの数を設定します。Kafka Connect ランタイムは、指定された数のタスクを開始し、タスクごとにコネクターの 1 つのインスタンスを実行します。複数のタスクの場合は、複数のソーストピックからの変更を並行して読み取り処理することでパフォーマンスを向上させます。
4.1.4. Debezium JDBC コネクターのデータと列タイプのマッピングの説明
Debezium JDBC シンクコネクターが受信メッセージフィールドのデータ型を送信メッセージフィールドに正しくマップできるようにするには、コネクターはソースイベントに存在する各フィールドのデータ型に関する情報を必要とします。コネクターは、さまざまなデータベース言語にわたる幅広い列タイプのマッピングをサポートします。イベントフィールドの type
メタデータから宛先列の型を正しく変換するために、コネクターはソースデータベースに定義されているデータ型マッピングを適用します。ソースコネクター設定で column.propagate.source.type
または datatype.propagate.source.type
オプションを設定して、コネクターが列のデータ型を解決する方法を強化できます。これらのオプションを有効にすると、Debezium には追加のパラメーターメタデータが含まれ、JDBC シンクコネクターが宛先列のデータ型をより正確に解決できるようになります。
Debezium JDBC シンクコネクターが Kafka トピックからのイベントを処理するには、Kafka トピックメッセージキー (存在する場合) がプリミティブデータ型または Struct
である必要があります。さらに、ソースメッセージのペイロードの Struct
は、ネストされた Struct
タイプのないフラットな構造、または Debezium の複雑な階層構造に準拠したネストされた Struct
レイアウトのいずれかを含む必要があります。
Kafka トピック内のイベントの構造がこれらのルールに準拠していない場合は、カスタムの単一メッセージ変換を実装して、ソースイベントの構造を使用可能な形式に変換する必要があります。
4.1.5. Debezium JDBC コネクターがソースイベントのプライマリーキーを処理する方法の説明
デフォルトでは、Debezium JDBC シンクコネクターは、ソースイベントのフィールドをイベントのプライマリーキーに変換しません。ビジネス要件によっては、またはシンクコネクターが upsert セマンティクスを使用する場合は、安定したプライマリーキーがないとイベント処理が複雑になる可能性があります。一貫したプライマリーキーを定義するには、次の表に示すプライマリーキーモードのいずれかを使用するようにコネクターを設定できます。
モード | 説明 |
---|---|
| テーブルの作成時にプライマリーキーフィールドが指定されていません。 |
| プライマリーキーは次の 3 つの列で構成されます。
これらの列の値は、Kafka イベントの座標から取得されます。 |
|
プライマリーキーは、Kafka イベントのキーで構成されます。 |
|
プライマリーキーは、Kafka イベントの値で構成されます。 |
一部のデータベース言語では、primary.key.mode
を kafka
に設定し、schema.evolution
を Basic
に設定すると例外が発生する場合があります。この例外は、方言が STRING
データ型マッピングを TEXT
や CLOB
などの可変長文字列データ型にマップし、さらにこの方言で、プライマリー列に制限なく長さを指定できない場合に発生します。この問題を回避するには、環境に次の設定を適用します。
-
schema.evolution
をBasic
に設定しないでください。 - データベースのテーブルとプライマリーキーのマッピングを事前に作成します。
4.1.6. DELETE
イベントまたは tombstone イベントの使用時に行を削除するような Debezium JDBC コネクターの設定
Debezium JDBC シンクコネクターは、DELETE
または tombstone イベントが消費されると、宛先データベース内の行を削除できます。デフォルトでは、JDBC シンクコネクターにより削除モードは有効になりません。
コネクターで行を削除する場合は、コネクター設定で明示的に delete.enabled=true
を設定する必要があります。このモードを使用するには、primary.key.fields
を none
以外の値に設定する必要もあります。削除はプライマリーキーマッピングに基づいて実行されるため、前述の設定が必要です。そのため、宛先テーブルにプライマリーキーマッピングがない場合、コネクターは行を削除できません。
4.1.7. コネクターによるべき等書き込みの実行の有効化
Debezium JDBC シンクコネクターはべき等書き込みを実行できるため、同じレコードを繰り返し再生し、データベースの最終状態を変更できません。
コネクターがべき等書き込みを実行できるようにするには、コネクターの insert.mode
を明示的に upsert
に設定する必要があります。upsert
操作は、指定されたプライマリーキーがすでに存在するかどうかに応じて、update
または insert
として適用されます。
プライマリーキー値がすでに存在する場合、この操作により行内の値が更新されます。指定されたプライマリーキー値が存在しない場合、insert
で新しい行が追加されます。
upsert 操作には SQL 標準がないため、各データベース方言はべき等書き込みを異なる方法で処理します。次の表は、Debezium がサポートするデータベース言語の upsert
の DML 構文を示しています。
方言 | Upsert 構文 |
---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
4.1.8. Debezium JDBC コネクターのスキーマ進化モード
Debezium JDBC シンクコネクターでは、次のスキーマ進化モードを使用できます。
モード | 説明 |
---|---|
| コネクターは、DDL スキーマの進化を実行しません。 |
| コネクターは、イベントペイロード内にあるが宛先テーブルには存在しないフィールドを自動的に検出します。コネクターは宛先テーブルを変更して新しいフィールドを追加します。 |
schema.evolution
が Basic
に設定されている場合、コネクターは受信イベントの構造に従って宛先データベーステーブルを自動的に作成または変更します。
トピックから初めてイベントを受信し、宛先テーブルがまだ存在しない場合、Debezium JDBC シンクコネクターはイベントのキー、またはレコードのスキーマ構造を使用してテーブルの列構造を解決します。スキーマの進化が有効な場合、コネクターは、DML イベントを宛先テーブルに適用する前に、CREATE TABLE
SQL ステートメントを準備して実行します。
Debezium JDBC コネクターがトピックからイベントを受信するとき、レコードのスキーマ構造が宛先テーブルのスキーマ構造と異なる場合、コネクターはイベントのキーまたはそのスキーマ構造を使用してどの列が新しいかを識別し、データベーステーブルに追加する必要があります。スキーマの進化が有効な場合、コネクターは、宛先テーブルに DML イベントを適用する前に、ALTER TABLE
SQL ステートメントを準備して実行します。列のデータ型の変更、列の削除、およびプライマリーキーの調整は危険な操作であると考えられるため、コネクターではこれらの操作の実行が禁止されています。
各フィールドのスキーマによって、列が NULL
か NOT NULL
かが決まります。スキーマは各列のデフォルト値も定義します。コネクターが NULL を指定できるかどうかの設定、または必要のないデフォルト値を含めてテーブルを作成しようとしている場合には、事前に手動でテーブルを作成するか、シンクコネクターがイベントを処理する前に関連のフィールドのスキーマを調節する必要があります。NULL を指定できるかどうかの設定またはデフォルト値を調整するには、パイプライン内の変更を適用するか、ソースデータベースで定義されている列の状態を変更するカスタムの単一メッセージ変換を導入できます。
フィールドのデータ型は、事前定義されたマッピングのセットに基づいて解決されます。詳細は、「Debezium JDBC コネクターがデータ型をマップする方法」 を参照してください。
宛先データベースにすでに存在するテーブルのイベント構造に新しいフィールドを導入する場合は、新しいフィールドをオプションとして定義するか、フィールドのデフォルト値がデータベーススキーマで指定されている必要があります。フィールドを宛先テーブルから削除する場合は、次のいずれかのオプションを使用します。
- フィールドを手動で削除します。
- 列をドロップします。
- フィールドにデフォルト値を割り当てます。
- フィールドを NULL を指定可能として定義します。
4.1.9. 宛先テーブル名と列名の大文字と小文字を定義するオプションの指定
Debezium JDBC シンクコネクターは、宛先データベースで実行される DDL (スキーマ変更) または DML (データ変更) SQL ステートメントを構築することによって、Kafka メッセージを消費します。デフォルトでは、コネクターはソーストピックの名前とイベントフィールドの名前を、宛先テーブルのテーブル名と列名の基礎として使用します。構築された SQL では、元の文字列の大文字と小文字を保持するために、識別子が引用符で自動的に区切られることはありません。その結果、デフォルトでは、宛先データベース内のテーブル名または列名のテキストの大文字と小文字の区別については、大文字と小文字が指定されていない場合にデータベースが名前文字列を処理する方法に完全に依存します。
たとえば、宛先データベースの言語が Oracle で、イベントのトピックが order
である場合、名前が引用符で囲まれていない場合、Oracle ではデフォルトで大文字の名前が使用されるため、宛先テーブルは ORDERS
として作成されます。同様に、宛先データベース言語が PostgreSQL で、イベントのトピックが ORDERS
である場合、名前が引用符で囲まれていない場合、PostgreSQL はデフォルトで小文字の名前を使用するため、宛先テーブルは order
として作成されます。
Kafka イベントに存在するテーブル名とフィールド名の大文字と小文字を明示的に保持するには、コネクター設定で quote.identifiers
プロパティーの値を true
に設定します。このオプションが設定されている場合、受信イベントが order
というトピックのもので、宛先データベース言語が Oracle である場合、コネクターは、構築された SQL でテーブルの名前が "orders"
として定義されているため、orders
という名前のテーブルを作成します。引用符を有効にすると、コネクターが列名を作成するときと同じ動作になります。