第4章 JDBC 用の Debezium コネクター (開発者プレビュー)


Debezium JDBC コネクターは、複数のソーストピックからイベントを消費し、JDBC ドライバーを使用してそれらのイベントをリレーショナルデータベースに書き込むことができる Kafka Connect シンクコネクター実装です。このコネクターは、Db2、MySQL、Oracle、PostgreSQL、SQL Server などのさまざまなデータベース言語をサポートします。

重要

Debezium JDBC コネクターは開発者プレビューソフトウェアのみでの使用となります。開発者プレビューソフトウェアは、Red Hat では一切サポートされておらず、機能的に完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。このソフトウェアはいつでも変更または削除される可能性があり、限定的なテストしか行われていません。

Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。

4.1. Debezium JDBC コネクターの仕組み

Debezium JDBC コネクターは Kafka Connect シンクコネクターであるため、Kafka Connect ランタイムが必要です。コネクターは、サブスクライブしている Kafka トピックを定期的にポーリングし、それらのトピックからのイベントを消費して、設定されたリレーショナルデータベースにイベントを書き込みます。コネクターは、upsert セマンティクスと基本的なスキーマの進化を使用して、べき等書き込み操作をサポートします。

Debezium JDBC コネクターは次の機能を提供します。

4.1.1. Debezium JDBC コネクターが複雑な変更イベントをどのように消費するかの説明

デフォルトでは、Debezium ソースコネクターは複雑で、階層分けされた変更イベントを生成します。Debezium コネクターを他の JDBC シンクコネクター実装とともに使用する場合、変更イベントのペイロードをシンク実装で使用できるように、ExtractNewRecordState シングルメッセージ変換 (SMT) を適用する必要がある場合があります。Debezium JDBC シンクコネクターを実行する場合、Debezium シンクコネクターは変換を使用せずにネイティブの Debezium 変更イベントを直接使用できるため、SMT をデプロイする必要はありません。

JDBC シンクコネクターが Debezium ソースコネクターから複雑な変更イベントを消費するとき、元の insert または update イベントの after セクションから値を抽出します。削除イベントがシンクコネクターによって消費されるとき、イベントのペイロードの一部は参照されません。

重要

Debezium JDBC シンクコネクターは、スキーマ変更トピックから読み取るように設計されていません。ソースコネクターがスキーマ変更をキャプチャーするように設定されている場合は、コネクターがスキーマ変更トピックを消費しないように、JDBC コネクター設定で topics または topic.regex プロパティーを設定します。

4.1.2. Debezium JDBC コネクターの at-least-once 配信の説明

Debezium JDBC シンクコネクターは、Kafka トピックから消費されるイベントが少なくとも 1 回は処理されるようにします。

4.1.3. Debezium JDBC の複数タスクの使用の説明

Debezium JDBC シンクコネクターは、複数の Kafka Connect タスクにわたって実行できます。複数のタスクに対してコネクターを実行するには、tasks.max 設定プロパティーでコネクターで使用するタスクの数を設定します。Kafka Connect ランタイムは、指定された数のタスクを開始し、タスクごとにコネクターの 1 つのインスタンスを実行します。複数のタスクの場合は、複数のソーストピックからの変更を並行して読み取り処理することでパフォーマンスを向上させます。

4.1.4. Debezium JDBC コネクターのデータと列タイプのマッピングの説明

Debezium JDBC シンクコネクターが受信メッセージフィールドのデータ型を送信メッセージフィールドに正しくマップできるようにするには、コネクターはソースイベントに存在する各フィールドのデータ型に関する情報を必要とします。コネクターは、さまざまなデータベース言語にわたる幅広い列タイプのマッピングをサポートします。イベントフィールドの type メタデータから宛先列の型を正しく変換するために、コネクターはソースデータベースに定義されているデータ型マッピングを適用します。ソースコネクター設定で column.propagate.source.type または datatype.propagate.source.type オプションを設定して、コネクターが列のデータ型を解決する方法を強化できます。これらのオプションを有効にすると、Debezium には追加のパラメーターメタデータが含まれ、JDBC シンクコネクターが宛先列のデータ型をより正確に解決できるようになります。

Debezium JDBC シンクコネクターが Kafka トピックからのイベントを処理するには、Kafka トピックメッセージキー (存在する場合) がプリミティブデータ型または Struct である必要があります。さらに、ソースメッセージのペイロードの Struct は、ネストされた Struct タイプのないフラットな構造、または Debezium の複雑な階層構造に準拠したネストされた Struct レイアウトのいずれかを含む必要があります。

Kafka トピック内のイベントの構造がこれらのルールに準拠していない場合は、カスタムの単一メッセージ変換を実装して、ソースイベントの構造を使用可能な形式に変換する必要があります。

4.1.5. Debezium JDBC コネクターがソースイベントのプライマリーキーを処理する方法の説明

デフォルトでは、Debezium JDBC シンクコネクターは、ソースイベントのフィールドをイベントのプライマリーキーに変換しません。ビジネス要件によっては、またはシンクコネクターが upsert セマンティクスを使用する場合は、安定したプライマリーキーがないとイベント処理が複雑になる可能性があります。一貫したプライマリーキーを定義するには、次の表に示すプライマリーキーモードのいずれかを使用するようにコネクターを設定できます。

モード説明

none

テーブルの作成時にプライマリーキーフィールドが指定されていません。

kafka

プライマリーキーは次の 3 つの列で構成されます。

  • __connect_topic
  • __connect_partition
  • __connect_offset

これらの列の値は、Kafka イベントの座標から取得されます。

record_key

プライマリーキーは、Kafka イベントのキーで構成されます。

プライマリーキーがプリミティブ型の場合は、primary.key.fields プロパティーを設定して、使用する列の名前を指定します。プライマリーキーが struct 型の場合、struct のフィールドはプライマリーキーの列としてマップされます。primary.key.fields プロパティーを使用して、プライマリーキーを列のサブセットに制限できます。

record_value

プライマリーキーは、Kafka イベントの値で構成されます。

Kafka イベントの値は常に Struct であるため、デフォルトでは、値に含まれるすべてのフィールドがプライマリーキーの列になります。プライマリーキーのフィールドのサブセットを使用するには、primary.key.fields プロパティーを設定して、プライマリーキー列の導出元となる値のフィールドのコンマ区切りリストを指定します。

重要

一部のデータベース言語では、primary.key.modekafka に設定し、schema.evolutionBasic に設定すると例外が発生する場合があります。この例外は、方言が STRING データ型マッピングを TEXTCLOB などの可変長文字列データ型にマップし、さらにこの方言で、プライマリー列に制限なく長さを指定できない場合に発生します。この問題を回避するには、環境に次の設定を適用します。

  • schema.evolutionBasic に設定しないでください。
  • データベースのテーブルとプライマリーキーのマッピングを事前に作成します。

4.1.6. DELETE イベントまたは tombstone イベントの使用時に行を削除するような Debezium JDBC コネクターの設定

Debezium JDBC シンクコネクターは、DELETE または tombstone イベントが消費されると、宛先データベース内の行を削除できます。デフォルトでは、JDBC シンクコネクターにより削除モードは有効になりません。

コネクターで行を削除する場合は、コネクター設定で明示的に delete.enabled=true を設定する必要があります。このモードを使用するには、primary.key.fields none 以外の値に設定する必要もあります。削除はプライマリーキーマッピングに基づいて実行されるため、前述の設定が必要です。そのため、宛先テーブルにプライマリーキーマッピングがない場合、コネクターは行を削除できません。

4.1.7. コネクターによるべき等書き込みの実行の有効化

Debezium JDBC シンクコネクターはべき等書き込みを実行できるため、同じレコードを繰り返し再生し、データベースの最終状態を変更できません。

コネクターがべき等書き込みを実行できるようにするには、コネクターの insert.mode を明示的に upsert に設定する必要があります。upsert 操作は、指定されたプライマリーキーがすでに存在するかどうかに応じて、update または insert として適用されます。

プライマリーキー値がすでに存在する場合、この操作により行内の値が更新されます。指定されたプライマリーキー値が存在しない場合、insert で新しい行が追加されます。

upsert 操作には SQL 標準がないため、各データベース方言はべき等書き込みを異なる方法で処理します。次の表は、Debezium がサポートするデータベース言語の upsert の DML 構文を示しています。

方言Upsert 構文

Db2

MERGE …​

MySQL

INSERT …​ ON DUPLICATE KEY UPDATE …​

Oracle

MERGE …​

PostgreSQL

INSERT …​ ON CONFLICT …​ DO UPDATE SET …​

SQL Server

MERGE …​

4.1.8. Debezium JDBC コネクターのスキーマ進化モード

Debezium JDBC シンクコネクターでは、次のスキーマ進化モードを使用できます。

モード説明

none

コネクターは、DDL スキーマの進化を実行しません。

基本

コネクターは、イベントペイロード内にあるが宛先テーブルには存在しないフィールドを自動的に検出します。コネクターは宛先テーブルを変更して新しいフィールドを追加します。

schema.evolutionBasic に設定されている場合、コネクターは受信イベントの構造に従って宛先データベーステーブルを自動的に作成または変更します。

トピックから初めてイベントを受信し、宛先テーブルがまだ存在しない場合、Debezium JDBC シンクコネクターはイベントのキー、またはレコードのスキーマ構造を使用してテーブルの列構造を解決します。スキーマの進化が有効な場合、コネクターは、DML イベントを宛先テーブルに適用する前に、CREATE TABLE SQL ステートメントを準備して実行します。

Debezium JDBC コネクターがトピックからイベントを受信するとき、レコードのスキーマ構造が宛先テーブルのスキーマ構造と異なる場合、コネクターはイベントのキーまたはそのスキーマ構造を使用してどの列が新しいかを識別し、データベーステーブルに追加する必要があります。スキーマの進化が有効な場合、コネクターは、宛先テーブルに DML イベントを適用する前に、ALTER TABLE SQL ステートメントを準備して実行します。列のデータ型の変更、列の削除、およびプライマリーキーの調整は危険な操作であると考えられるため、コネクターではこれらの操作の実行が禁止されています。

各フィールドのスキーマによって、列が NULLNOT NULL かが決まります。スキーマは各列のデフォルト値も定義します。コネクターが NULL を指定できるかどうかの設定、または必要のないデフォルト値を含めてテーブルを作成しようとしている場合には、事前に手動でテーブルを作成するか、シンクコネクターがイベントを処理する前に関連のフィールドのスキーマを調節する必要があります。NULL を指定できるかどうかの設定またはデフォルト値を調整するには、パイプライン内の変更を適用するか、ソースデータベースで定義されている列の状態を変更するカスタムの単一メッセージ変換を導入できます。

フィールドのデータ型は、事前定義されたマッピングのセットに基づいて解決されます。詳細は、「Debezium JDBC コネクターがデータ型をマップする方法」 を参照してください。

重要

宛先データベースにすでに存在するテーブルのイベント構造に新しいフィールドを導入する場合は、新しいフィールドをオプションとして定義するか、フィールドのデフォルト値がデータベーススキーマで指定されている必要があります。フィールドを宛先テーブルから削除する場合は、次のいずれかのオプションを使用します。

  • フィールドを手動で削除します。
  • 列をドロップします。
  • フィールドにデフォルト値を割り当てます。
  • フィールドを NULL を指定可能として定義します。

4.1.9. 宛先テーブル名と列名の大文字と小文字を定義するオプションの指定

Debezium JDBC シンクコネクターは、宛先データベースで実行される DDL (スキーマ変更) または DML (データ変更) SQL ステートメントを構築することによって、Kafka メッセージを消費します。デフォルトでは、コネクターはソーストピックの名前とイベントフィールドの名前を、宛先テーブルのテーブル名と列名の基礎として使用します。構築された SQL では、元の文字列の大文字と小文字を保持するために、識別子が引用符で自動的に区切られることはありません。その結果、デフォルトでは、宛先データベース内のテーブル名または列名のテキストの大文字と小文字の区別については、大文字と小文字が指定されていない場合にデータベースが名前文字列を処理する方法に完全に依存します。

たとえば、宛先データベースの言語が Oracle で、イベントのトピックが order である場合、名前が引用符で囲まれていない場合、Oracle ではデフォルトで大文字の名前が使用されるため、宛先テーブルは ORDERS として作成されます。同様に、宛先データベース言語が PostgreSQL で、イベントのトピックが ORDERS である場合、名前が引用符で囲まれていない場合、PostgreSQL はデフォルトで小文字の名前を使用するため、宛先テーブルは order として作成されます。

Kafka イベントに存在するテーブル名とフィールド名の大文字と小文字を明示的に保持するには、コネクター設定で quote.identifiers プロパティーの値を true に設定します。このオプションが設定されている場合、受信イベントが order というトピックのもので、宛先データベース言語が Oracle である場合、コネクターは、構築された SQL でテーブルの名前が "orders" として定義されているため、orders という名前のテーブルを作成します。引用符を有効にすると、コネクターが列名を作成するときと同じ動作になります。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.