第3章 シンクコネクター
Debezium は、Apache Kafka トピックなど、ソースからイベントを消費できるシンクコネクターを提供します。シンクコネクターはデータの形式を標準化し、設定されたシンクリポジトリーにイベントデータを保存します。その後、他のシステム、アプリケーション、またはユーザーはデータシンクからイベントにアクセスできるようになります。
シンクコネクターは、消費するイベントデータに一貫した構造を適用するため、データシンクから読み取るダウンストリームのアプリケーションは、そのデータを簡単に解釈して処理できます。
現在、Debezium は以下のシンクコネクターを提供しています。
3.1. JDBC 用の Debezium コネクター
Debezium JDBC コネクターは、複数のソーストピックからイベントを消費し、JDBC ドライバーを使用してそれらのイベントをリレーショナルデータベースに書き込むことができる Kafka Connect sink コネクター実装です。このコネクターは、Db2、MySQL、Oracle、PostgreSQL、SQL Server などのさまざまなデータベース言語をサポートします。
3.1.1. Debezium JDBC コネクターの仕組み
Debezium JDBC コネクターは Kafka Connect sink コネクターであるため、Kafka Connect ランタイムが必要です。コネクターは、サブスクライブしている Kafka トピックを定期的にポーリングし、それらのトピックからのイベントを消費して、設定されたリレーショナルデータベースにイベントを書き込みます。コネクターは、upsert セマンティクスと基本的なスキーマの進化を使用して、べき等書き込み操作をサポートします。
Debezium JDBC コネクターは次の機能を提供します。
- 「Debezium JDBC コネクターが複雑な変更イベントをどのように消費するかの説明」
- 「Debezium JDBC コネクターの at-least-once 配信の説明」
- 「Debezium JDBC の複数タスクの使用の説明」
- 「Debezium JDBC コネクターのデータと列タイプのマッピングの説明」
- 「Debezium JDBC コネクターがソースイベントのプライマリーキーを処理する方法の説明」
-
「
DELETE
イベントまたは tombstone イベントの使用時に行を削除するような Debezium JDBC コネクターの設定」 - 「コネクターによるべき等書き込みの実行の有効化」
- 「Debezium JDBC コネクターのスキーマ進化モード」
- 「宛先テーブル名と列名の大文字と小文字を定義するオプションの指定」
- 接続アイドルタイムアウト
3.1.1.1. Debezium JDBC コネクターが複雑な変更イベントをどのように消費するかの説明
デフォルトでは、Debezium ソースコネクターは複雑で、階層分けされた変更イベントを生成します。Debezium コネクターを他の JDBC sink コネクター実装とともに使用する場合、変更イベントのペイロードを sink 実装で使用できるように、ExtractNewRecordState
シングルメッセージ変換 (SMT) を適用する必要がある場合があります。Debezium JDBC sink コネクターを実行する場合、Debezium sink コネクターは変換を使用せずにネイティブの Debezium 変更イベントを直接使用できるため、SMT をデプロイする必要はありません。
JDBC sink コネクターが Debezium ソースコネクターから複雑な変更イベントを消費するとき、元の insert
または update
イベントの after
セクションから値を抽出します。削除イベントがシンクコネクターによって消費されるとき、イベントのペイロードの一部は参照されません。
Debezium JDBC sink コネクターは、スキーマ変更トピックから読み取るように設計されていません。ソースコネクターがスキーマ変更をキャプチャーするように設定されている場合は、コネクターがスキーマ変更トピックを消費しないように、JDBC コネクター設定で topics
または topic.regex
プロパティーを設定します。
3.1.1.2. Debezium JDBC コネクターの at-least-once 配信の説明
Debezium JDBC sink コネクターは、Kafka トピックから消費されるイベントが少なくとも 1 回は処理されるようにします。
3.1.1.3. Debezium JDBC の複数タスクの使用の説明
Debezium JDBC sink コネクターは、複数の Kafka Connect タスクにわたって実行できます。複数のタスクに対してコネクターを実行するには、tasks.max
設定プロパティーでコネクターで使用するタスクの数を設定します。Kafka Connect ランタイムは、指定された数のタスクを開始し、タスクごとにコネクターの 1 つのインスタンスを実行します。複数のタスクの場合は、複数のソーストピックからの変更を並行して読み取り処理することでパフォーマンスを向上させます。
3.1.1.4. Debezium JDBC コネクターのデータと列タイプのマッピングの説明
Debezium JDBC sink コネクターが受信メッセージフィールドのデータ型を送信メッセージフィールドに正しくマップできるようにするには、コネクターはソースイベントに存在する各フィールドのデータ型に関する情報を必要とします。コネクターは、さまざまなデータベース言語にわたる幅広い列タイプのマッピングをサポートします。イベントフィールドの type
メタデータから宛先列の型を正しく変換するために、コネクターはソースデータベースに定義されているデータ型マッピングを適用します。ソースコネクター設定で column.propagate.source.type
または datatype.propagate.source.type
オプションを設定して、コネクターが列のデータ型を解決する方法を強化できます。これらのオプションを有効にすると、Debezium には追加のパラメーターメタデータが含まれ、JDBC sink コネクターが宛先列のデータ型をより正確に解決できるようになります。
Debezium JDBC sink コネクターが Kafka トピックからのイベントを処理するには、Kafka トピックメッセージキー (存在する場合) がプリミティブデータ型または Struct
である必要があります。さらに、ソースメッセージのペイロードの Struct
は、ネストされた Struct
タイプのないフラットな構造、または Debezium の複雑な階層構造に準拠したネストされた Struct
レイアウトのいずれかを含む必要があります。
Kafka トピック内のイベントの構造がこれらのルールに準拠していない場合は、カスタムの単一メッセージ変換を実装して、ソースイベントの構造を使用可能な形式に変換する必要があります。
3.1.1.5. Debezium JDBC コネクターがソースイベントのプライマリーキーを処理する方法の説明
デフォルトでは、Debezium JDBC sink コネクターは、ソースイベントのフィールドをイベントのプライマリーキーに変換しません。ビジネス要件によっては、またはシンクコネクターが upsert セマンティクスを使用する場合は、安定したプライマリーキーがないとイベント処理が複雑になる可能性があります。一貫したプライマリーキーを定義するには、次の表に示すプライマリーキーモードのいずれかを使用するようにコネクターを設定できます。
モード | 説明 |
---|---|
| テーブルの作成時にプライマリーキーフィールドが指定されていません。 |
| プライマリーキーは次の 3 つの列で構成されます。
これらの列の値は、Kafka イベントの座標から取得されます。 |
|
プライマリーキーは、Kafka イベントのキーで構成されます。 |
|
プライマリーキーは、Kafka イベントの値で構成されます。 |
|
プライマリーキーは、Kafka イベントのヘッダーで構成されます。 |
一部のデータベース言語では、primary.key.mode
を kafka
に設定し、schema.evolution
を Basic
に設定すると例外が発生する場合があります。この例外は、方言が STRING
データ型マッピングを TEXT
や CLOB
などの可変長文字列データ型にマップし、さらにこの方言で、プライマリー列に制限なく長さを指定できない場合に発生します。この問題を回避するには、環境に次の設定を適用します。
-
schema.evolution
をBasic
に設定しないでください。 - データベースのテーブルとプライマリーキーのマッピングを事前に作成します。
列がターゲットデータベースのプライマリーキーとして許可されていないデータ型にマップされる場合、primary.key.fields
にそのような列を除外する明示的な列リストが必要になります。許可されるデータ型と許可されないデータ型については、特定のデータベースベンダーのドキュメントを参照してください。
3.1.1.6. DELETE
イベントまたは tombstone イベントの使用時に行を削除するような Debezium JDBC コネクターの設定
Debezium JDBC sink コネクターは、DELETE
または tombstone イベントが消費されると、宛先データベース内の行を削除できます。デフォルトでは、JDBC sink コネクターにより削除モードは有効になりません。
コネクターで行を削除する場合は、コネクター設定で明示的に delete.enabled=true
を設定する必要があります。このモードを使用するには、primary.key.fields
を none
以外の値に設定する必要もあります。削除はプライマリーキーマッピングに基づいて実行されるため、前述の設定が必要です。そのため、宛先テーブルにプライマリーキーマッピングがない場合、コネクターは行を削除できません。
3.1.1.7. コネクターによるべき等書き込みの実行の有効化
Debezium JDBC sink コネクターはべき等書き込みを実行できるため、同じレコードを繰り返し再生し、データベースの最終状態を変更できません。
コネクターがべき等書き込みを実行できるようにするには、コネクターの insert.mode
を明示的に upsert
に設定する必要があります。upsert
操作は、指定されたプライマリーキーがすでに存在するかどうかに応じて、update
または insert
として適用されます。
プライマリーキー値がすでに存在する場合、この操作により行内の値が更新されます。指定されたプライマリーキー値が存在しない場合、insert
で新しい行が追加されます。
upsert 操作には SQL 標準がないため、各データベース方言はべき等書き込みを異なる方法で処理します。次の表は、Debezium がサポートするデータベース言語の upsert
の DML 構文を示しています。
方言 | Upsert 構文 |
---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
3.1.1.8. Debezium JDBC コネクターのスキーマ進化モード
Debezium JDBC sink コネクターでは、次のスキーマ進化モードを使用できます。
モード | 説明 |
---|---|
| コネクターは、DDL スキーマの進化を実行しません。 |
| コネクターは、イベントペイロード内にあるが宛先テーブルには存在しないフィールドを自動的に検出します。コネクターは宛先テーブルを変更して新しいフィールドを追加します。 |
schema.evolution
が Basic
に設定されている場合、コネクターは受信イベントの構造に従って宛先データベーステーブルを自動的に作成または変更します。
トピックから初めてイベントを受信し、宛先テーブルがまだ存在しない場合、Debezium JDBC sink コネクターはイベントのキー、またはレコードのスキーマ構造を使用してテーブルの列構造を解決します。スキーマの進化が有効な場合、コネクターは、DML イベントを宛先テーブルに適用する前に、CREATE TABLE
SQL ステートメントを準備して実行します。
Debezium JDBC コネクターがトピックからイベントを受信するとき、レコードのスキーマ構造が宛先テーブルのスキーマ構造と異なる場合、コネクターはイベントのキーまたはそのスキーマ構造を使用してどの列が新しいかを識別し、データベーステーブルに追加する必要があります。スキーマの進化が有効な場合、コネクターは、宛先テーブルに DML イベントを適用する前に、ALTER TABLE
SQL ステートメントを準備して実行します。列のデータ型の変更、列の削除、およびプライマリーキーの調整は危険な操作であると考えられるため、コネクターではこれらの操作の実行が禁止されています。
各フィールドのスキーマによって、列が NULL
か NOT NULL
かが決まります。スキーマは各列のデフォルト値も定義します。コネクターが NULL を指定できるかどうかの設定、または必要のないデフォルト値を含めてテーブルを作成しようとしている場合には、事前に手動でテーブルを作成するか、sink コネクターがイベントを処理する前に関連のフィールドのスキーマを調節する必要があります。NULL を指定できるかどうかの設定またはデフォルト値を調整するには、パイプライン内の変更を適用するか、ソースデータベースで定義されている列の状態を変更するカスタムの単一メッセージ変換を導入できます。
フィールドのデータ型は、事前定義されたマッピングのセットに基づいて解決されます。詳細は、「Debezium JDBC コネクターがデータ型をマップする方法」 を参照してください。
宛先データベースにすでに存在するテーブルのイベント構造に新しいフィールドを導入する場合は、新しいフィールドをオプションとして定義するか、フィールドのデフォルト値がデータベーススキーマで指定されている必要があります。フィールドを宛先テーブルから削除する場合は、次のいずれかのオプションを使用します。
- フィールドを手動で削除します。
- 列をドロップします。
- フィールドにデフォルト値を割り当てます。
- フィールドを NULL を指定可能として定義します。
3.1.1.9. 宛先テーブル名と列名の大文字と小文字を定義するオプションの指定
Debezium JDBC sink コネクターは、宛先データベースで実行される DDL (スキーマ変更) または DML (データ変更) SQL ステートメントを構築することによって、Kafka メッセージを消費します。デフォルトでは、コネクターはソーストピックの名前とイベントフィールドの名前を、宛先テーブルのテーブル名と列名の基礎として使用します。構築された SQL では、元の文字列の大文字と小文字を保持するために、識別子が引用符で自動的に区切られることはありません。その結果、デフォルトでは、宛先データベース内のテーブル名または列名のテキストの大文字と小文字の区別については、大文字と小文字が指定されていない場合にデータベースが名前文字列を処理する方法に完全に依存します。
たとえば、宛先データベースの言語が Oracle で、イベントのトピックが order
である場合、名前が引用符で囲まれていない場合、Oracle ではデフォルトで大文字の名前が使用されるため、宛先テーブルは ORDERS
として作成されます。同様に、宛先データベース言語が PostgreSQL で、イベントのトピックが ORDERS
である場合、名前が引用符で囲まれていない場合、PostgreSQL はデフォルトで小文字の名前を使用するため、宛先テーブルは order
として作成されます。
Kafka イベントに存在するテーブル名とフィールド名の大文字と小文字を明示的に保持するには、コネクター設定で quote.identifiers
プロパティーの値を true
に設定します。このオプションが設定されている場合、受信イベントが order
というトピックのもので、宛先データベース言語が Oracle である場合、コネクターは、構築された SQL でテーブルの名前が "orders"
として定義されているため、orders
という名前のテーブルを作成します。引用符を有効にすると、コネクターが列名を作成するときと同じ動作になります。
接続アイドルタイムアウト
Debezium の JDBC sink コネクターは、接続プールを活用してパフォーマンスを向上させます。接続プールは、初期の接続セットを確立し、指定された数の接続を維持し、必要に応じてアプリケーションに効率的に接続を割り当てるように設計されています。ただし、接続がプール内でアイドル状態のままになると問題が発生し、データベースに設定されたアイドルタイムアウトしきい値を超えて、非アクティブの状態が続くと、タイムアウトがトリガーされる可能性があります。
アイドル接続スレッドがタイムアウトをトリガーする可能性を軽減するために、接続プールは各接続のアクティビティーを定期的に検証するメカニズムを提供します。この検証により、接続がアクティブの状態を確保でき、データベースが接続をアイドル状態としてフラグ付けされないようにします。ネットワークが中断された場合に、Debezium が終了した接続を使用しようとすると、コネクターはプールに対して新しい接続を生成するよう要求します。
デフォルトでは、Debezium JDBC sink コネクターはアイドルタイムアウトテストを実行しません。ただし、hibernate.c3p0.idle_test_period
プロパティーを設定することで、コネクターがプールに対して、指定の間隔でタイムアウトテストを実行する要求を行うように設定できます。以下に例を示します。
タイムアウト設定の例
{ "hibernate.c3p0.idle_test_period": "300" }
Debezium JDBC sink コネクターは、Hibernate C3P0 接続プールを使用します。hibernate.c3p0.*` 設定名前空間でプロパティーを設定することで、CP30 接続プールをカスタマイズできます。上記の例では、hibernate.c3p0.idle_test_period プロパティーの設定により、接続プールが 300 秒ごとにアイドルタイムアウトテストを実行するように設定されます。設定を適用すると、接続プールは 5 分ごとに未使用の接続の評価を開始します。
3.1.2. Debezium JDBC コネクターがデータ型をマップする方法
Debezium JDBC sink コネクターは、論理またはプリミティブ型マッピングシステムを使用して列のデータ型を解決します。プリミティブ型には、整数、浮動小数点、ブール値、文字列、バイトなどの値が含まれます。通常、これらの型は、特定の Kafka Connect Schema
型コードのみで表されます。論理データ型は、フィールド名とスキーマの固定セットが含まれる Struct
ベースの型などの値、またはエポックからの日数など、特定のエンコーディングで表される値を含むなど、複雑な型であることがよくあります。
次の例は、プリミティブデータ型と論理データ型の代表的な構造を示しています。
プリミティブフィールドスキーマ
{ "schema": { "type": "INT64" } }
論理フィールドスキーマ
[ "schema": { "type": "INT64", "name": "org.apache.kafka.connect.data.Date" } ]
Kafka Connect だけが、これらの複雑な論理型のソースというわけではありません。実際、Debezium ソースコネクターは、タイムスタンプ、日付、さらには JSON データなど、さまざまなデータ型を表す同様の論理型を含むフィールドを持つ変更イベントを生成します。
Debezium JDBC sink コネクターは、このようなプリミティブおよび論理型を仕様して JDBC SQL コード (列の方を表す) に列の方を解決します。これらの JDBC SQL コードは、基礎となる Hibernate 永続フレームワークによって使用され、列の型を使用中の方言の論理データ型に解決します。次の表は、Kafka Connect と JDBC SQL 型の間、および Debezium と JDBC SQL 型の間の基本マッピングと論理マッピングを示しています。実際の最終的な列のタイプは、データベースのタイプごとに異なります。
プリミティブ型 | JDBC SQL 型 |
---|---|
INT8 | Types.TINYINT |
INT16 | Types.SMALLINT |
INT32 | Types.INTEGER |
INT64 | Types.BIGINT |
FLOAT32 | Types.FLOAT |
FLOAT64 | Types.DOUBLE |
BOOLEAN | Types.BOOLEAN |
STRING | Types.CHAR、Types.NCHAR、Types.VARCHAR、Types.NVARCHAR |
BYTES | Types.VARBINARY |
論理型 | JDBC SQL 型 |
---|---|
org.apache.kafka.connect.data.Decimal | Types.DECIMAL |
org.apache.kafka.connect.data.Date | Types.DATE |
org.apache.kafka.connect.data.Time | Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp | Types.TIMESTAMP |
論理型 | JDBC SQL 型 |
---|---|
io.debezium.time.Date | Types.DATE |
io.debezium.time.Time | Types.TIMESTAMP |
io.debezium.time.MicroTime | Types.TIMESTAMP |
io.debezium.time.NanoTime | Types.TIMESTAMP |
io.debezium.time.ZonedTime | Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp | Types.TIMESTAMP |
io.debezium.time.MicroTimestamp | Types.TIMESTAMP |
io.debezium.time.NanoTimestamp | Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp | Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal | Types.DOUBLE |
データベースがタイムゾーンのある時刻またはタイムスタンプをサポートしていない場合、マッピングはタイムゾーンなしの同等のものに解決されます。
論理型 | MySQL SQL 型 | PostgreSQL SQL 型 | SQL Server SQL 型 |
---|---|---|---|
io.debezium.data.Bits |
|
|
|
io.debezium.data.Enum |
| Types.VARCHAR | 該当なし |
io.debezium.data.Json |
|
| 該当なし |
io.debezium.data.EnumSet |
| 該当なし | 該当なし |
io.debezium.time.Year |
| 該当なし | 該当なし |
io.debezium.time.MicroDuration | 該当なし |
| 該当なし |
io.debezium.data.Ltree | 該当なし |
| 該当なし |
io.debezium.data.Uuid | 該当なし |
| 該当なし |
io.debezium.data.Xml | 該当なし |
|
|
上記のプリミティブおよび論理マッピングに加え、変更イベントのソースが Debezium ソースコネクターの場合は、列またはデータ型の伝播を有効にすることで、列のタイプと長さ、制度、スケールの解決も操作できます。強制的に伝播を行うには、ソースコネクター設定で次のプロパティーのいずれかを設定する必要があります。
-
column.propagate.source.type
-
datatype.propagate.source.type
Debezium JDBC シンクコネクターは、より高い優先順位で値を適用します。
たとえば、次のフィールドスキーマが変更イベントに含まれているとします。
Debezium は列またはデータ型の伝播を有効にしてイベントフィールドスキーマを変更します
{ "schema": { "type": "INT8", "parameters": { "__debezium.source.column.type": "TINYINT", "__debezium.source.column.length": "1" } } }
前述の例では、スキーマパラメーターが設定されていない場合、Debezium JDBC シンクコネクターはこのフィールドを Types.SMALLINT
の列型にマップします。Types.SMALLINT
には、データベース言語に応じて、さまざまな論理データベースタイプを指定できます。MySQL の場合、例の列タイプは、長さが指定されていない TINYINT
列タイプに変換されます。列またはデータ型の伝播がソースコネクターで有効になっている場合、Debezium JDBC シンクコネクターはマッピング情報を使用してデータ型マッピングプロセスを調整し、TINYINT(1)
型の列を作成します。
通常、ソースデータベースとシンクデータベースの両方に同じ種類のデータベースが使用されている場合、列またはデータ型の伝播を使用する効果は非常に大きくなります。
3.1.3. Debezium JDBC コネクターのデプロイメント
Debezium JDBC コネクターをデプロイするには、Debezium JDBC コネクターアーカイブをインストールし、コネクターを設定し、その設定を Kafka Connect に追加してコネクターを起動します。
前提条件
- Apache ZooKeeper、Apache Kafka、および Kafka Connect がインストールされている。
- 宛先データベースがインストールされ、JDBC 接続を受け入れるように設定されている。
手順
- Debezium JDBC connector plug-in archive をダウンロードします。
- ファイルを Kafka Connect 環境にデプロイメントします。
必要に応じて、Maven Central から JDBC ドライバーをダウンロードし、ダウンロードしたドライバーファイルを JDBC シンクコネクター JAR ファイルが含まれるディレクトリーに展開します。
注記Oracle および Db2 用のドライバーは、JDBC シンクコネクターには含まれていません。ドライバーをダウンロードして手動でインストールする必要があります。
- JDBC シンクコネクターがインストールされているパスにドライバー JAR ファイルを追加します。
-
JDBC シンクコネクターをインストールするパスが Kafka Connect
plugin.path
の一部であることを確認してください。 - Kafka Connect プロセスを再起動し、新しい JAR ファイルを取得します。
3.1.3.1. Debezium JDBC コネクターの設定
通常、Debezium JDBC コネクターを登録するには、コネクターの設定プロパティーを指定する JSON リクエストを送信します。次の例は、最も一般的な設定を使用して、orders
というトピックからのイベントを消費する Debezium JDBC シンクコネクターのインスタンスを登録する JSON リクエストを示しています。
例: Debezium JDBC コネクターの設定
{ "name": "jdbc-connector", 1 "config": { "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", 2 "tasks.max": "1", 3 "connection.url": "jdbc:postgresql://localhost/db", 4 "connection.username": "pguser", 5 "connection.password": "pgpassword", 6 "insert.mode": "upsert", 7 "delete.enabled": "true", 8 "primary.key.mode": "record_key", 9 "schema.evolution": "basic", 10 "database.time_zone": "UTC", 11 "topics": "orders" 12 } }
JDBC コネクター設定の説明
項目 | 説明 |
---|---|
1 | Kafka Connect サービスにコネクターを登録するときにコネクターに割り当てられる名前。 |
2 | JDBC シンクコネクタークラスの名前。 |
3 | このコネクターに作成するタスクの最大数。 |
4 | コネクターが書き込み先のシンクデータベースへの接続に使用する JDBC URL。 |
5 | 認証に使用されるデータベースユーザーの名前。 |
6 | 認証に使用されるデータベースユーザーのパスワード。 |
7 | コネクターが使用する insert.mode。 |
8 | データベース内のレコードの削除を有効にします。詳細は、delete.enabled 設定プロパティーを参照してください。 |
9 | プライマリーキー列の解決に使用する方法を指定します。詳細は、primary.key.mode 設定プロパティーを参照してください。 |
10 | コネクターが宛先データベースのスキーマを進化できるようにします。詳細は、schema.evolution 設定プロパティーを参照してください。 |
11 | 時間フィールドタイプを書き込むときに使用されるタイムゾーンを指定します。 |
12 | 消費するトピックのコンマ区切りのリスト。 |
Debezium JDBC コネクターに設定できる設定プロパティーの完全なリストは、JDBC コネクターのプロパティー を参照してください。
POST
コマンドを使用して、この設定を実行中の Kafka Connect サービスに送信できます。サービスは設定を記録し、次の操作を実行するシンクコネクタータスクを開始します。
- データベースに接続します。
- サブスクライブされた Kafka トピックからのイベントを消費します。
- 設定されたデータベースにイベントを書き込みます。
3.1.4. Debezium JDBC コネクター設定プロパティーの説明
Debezium JDBC シンクコネクターには、ニーズを満たすコネクターの動作を実現するために使用できるいくつかの設定プロパティーがあります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし | コネクターの一意名。コネクターの登録時にこの名前を再利用しようとすると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。 | |
デフォルトなし |
コネクターの Java クラスの名前。Debezium JDBC コネクターの場合、 | |
1 | このコネクターに使用するタスクの最大数。 | |
デフォルトなし |
消費するトピックのコンマ区切りのリスト。このプロパティーを | |
デフォルトなし |
消費するトピックを指定する正規表現。内部的には、正規表現は |
プロパティー | デフォルト | 説明 |
---|---|---|
| 使用する接続プロバイダーの実装。 | |
デフォルトなし | データベースへの接続に使用される JDBC 接続 URL。 | |
デフォルトなし | コネクターがデータベースへの接続に使用するデータベースユーザーアカウントの名前。 | |
デフォルトなし | コネクターがデータベースへの接続に使用するパスワード。 | |
| プール内の最小接続数を指定します。 | |
| プールが維持する同時接続の最大数を指定します。 | |
| 接続プールが最大サイズを超えた場合にコネクターが取得を試みる接続の数を指定します。 | |
| 未使用の接続が破棄されるまで保持する時間 (秒) を指定します。 |
プロパティー | デフォルト | 説明 |
---|---|---|
| JDBC 時間値を挿入するときに使用するタイムゾーンを指定します。 | |
|
コネクターが | |
|
コネクターが 注記
JDBC コネクターが Db2 から受信した
上記のクエリーを送信するユーザーアカウントには、切り捨てられるテーブルに対する | |
| イベントのデータベースへの挿入に使用するストラテジーを指定します。以下のオプションが利用できます。
| |
| コネクターがイベントからプライマリーキー列を解決する方法を指定します。
| |
デフォルトなし |
プライマリーキー列の名前、またはプライマリーキーの導出元となるフィールドのコンマ区切りリスト。 | |
| 生成された SQL ステートメントでテーブル名と列名を区切るために引用符を使用するかどうかを指定します。詳細は 「宛先テーブル名と列名の大文字と小文字を定義するオプションの指定」 のセクションを参照してください。 | |
| コネクターが宛先テーブルのスキーマを進化させる方法を指定します。詳細は、「Debezium JDBC コネクターのスキーマ進化モード」 を参照してください。以下のオプションが利用できます。
| |
|
コネクターが宛先テーブルの名前を構築するために使用する文字列パターンを指定します。 | |
|
PostgreSQL PostGIS エクステンションがインストールされているスキーマ名を指定します。デフォルトは | |
|
SQL Server テーブルの identity 列への | |
| 宛先テーブルにまとめてバッチ処理するレコードの数を指定します。 注記
Connect ワーカープロパティーの | |
| Debezium JDBC コネクターの削減バッファーを有効にするかどうかを指定します。 次のいずれかの設定を選択します。
削減バッファーが有効になっている場合に PostgreSQL シンクデータベースでのクエリー処理を最適化するには、JDBC 接続 URL に | |
空の文字列 |
変更イベント値から追加するフィールドの完全修飾名と一致するフィールド名の、オプションのコンマ区切りリスト。フィールドの完全修飾名の形式は、 | |
空の文字列 |
変更イベント値から除外するフィールドの完全修飾名と一致するフィールド名の、オプションのコンマ区切りリスト。フィールドの完全修飾名の形式は、 |
プロパティー | デフォルト | 説明 |
---|---|---|
|
コネクターがイベントフィールド名から列名を解決するために使用する | |
|
コネクターが受信イベントトピック名からテーブル名を解決するために使用する
|
JDBC コネクター hibernate.*
パススループロパティー
Kafka Connect はパススルー設定をサポートしており、コネクター設定から特定のプロパティーを直接渡すことで、基盤となるシステムの動作を変更できます。デフォルトでは、一部の Hibernate プロパティーは、JDBC コネクター 接続プロパティー (connection.url
、connection.username
、connection.pool.*_size など
) およびコネクターの ランタイムプロパティー (database.time_zone
、quote.identifiers など
) を通じて公開されます。
他の Hibernate の動作をカスタマイズする場合は、hibernate.* 名前空間を使用するプロパティーをコネクター設定に追加することで、パススルーのメカニズムを利用できます。
たとえば、Hibernate がターゲットデータベースのタイプとバージョンを解決できるようにするには、hibernate.dialect
プロパティーを追加し、それをデータベースの完全修飾クラス名 (例: org.hibernate.dialect.MariaDBDialect
) に設定します。
3.1.5. JDBC コネクターのよくある質問
ExtractNewRecordState
単一メッセージ変換は必要ですか?- いいえ、実際には、Debezium JDBC コネクターを他の実装と区別する要素の 1 つです。このコネクターは、競合するコネクターと同様にフラット化されたイベントを取り込むことができますが、特定の種類の変換を必要とせずに、Debezium の複雑な変更イベント構造をネイティブに取り込むこともできます。
- 列の型が変更された場合、または列の名前が変更または削除された場合、これはスキーマの進化によって処理されますか?
- いいえ、Debezium JDBC コネクターは既存の列に変更を加えません。コネクターによってサポートされるスキーマの進化は非常に基本的なものです。これは、イベント構造内のフィールドをテーブルの列リストと単純に比較し、まだ列として定義されていないフィールドをテーブルに追加します。列のタイプまたはデフォルト値が変更された場合、コネクターは宛先データベースでの調整は行いません。列の名前が変更された場合、古い列はそのまま残され、コネクターは新しい名前の列をテーブルに追加します。ただし、古い列のデータを含む既存の行は変更されません。このようなタイプのスキーマ変更は手動で処理する必要があります。
- 列の型が希望する型に解決されない場合、別のデータ型に強制的にマッピングするにはどうすればよいですか?
- Debezium JDBC コネクターは、高度な型システムを使用して列のデータ型を解決します。この型システムが特定のフィールドのスキーマ定義を JDBC 型に解決する方法の詳細は、「Debezium JDBC コネクターのデータと列タイプのマッピングの説明」 セクションを参照してください。別のデータ型マッピングを適用する場合は、テーブルを手動で定義して、優先される列の型を明示的に取得します。
- Kafka トピック名を変更せずに、テーブル名に接頭辞または接尾辞を指定するにはどうすればよいですか?
-
宛先テーブル名に接頭辞または接尾辞を追加するには、table.name.format コネクター設定プロパティーを調整して、必要な接頭辞または接尾辞を適用します。たとえば、すべてのテーブル名に
jdbc_
という接頭辞を付けるには、table.name.format
設定プロパティーにjdbc_${topic}
の値を指定します。コネクターがorder
というトピックにサブスクライブされている場合、結果のテーブルはjdbc_orders
として作成されます。 - 識別子の引用符が有効になっていないにもかかわらず、一部の列が自動的に引用符で囲まれるのはなぜですか?
-
状況によっては、
quote.identifiers
が有効になっていない場合でも、特定の列名またはテーブル名が明示的に引用符で囲まれることがあります。これは、列名またはテーブル名が、不正な構文とみなされる特定の規則で始まるか、またはそのような規則が使用されている場合に引用符が必要になることがよくあります。たとえば、primary.key.mode がkafka
に設定されている場合、一部のデータベースでは、列名が引用符で囲まれている場合にのみ、列名をアンダースコアで始めることができます。引用の動作は方言固有であり、データベースの種類によって異なります。