第3章 シンクコネクター


Debezium は、Apache Kafka トピックなど、ソースからイベントを消費できるシンクコネクターを提供します。シンクコネクターはデータの形式を標準化し、設定されたシンクリポジトリーにイベントデータを保存します。その後、他のシステム、アプリケーション、またはユーザーはデータシンクからイベントにアクセスできるようになります。

シンクコネクターは、消費するイベントデータに一貫した構造を適用するため、データシンクから読み取るダウンストリームのアプリケーションは、そのデータを簡単に解釈して処理できます。

現在、Debezium は以下のシンクコネクターを提供しています。

3.1. JDBC 用の Debezium コネクター

Debezium JDBC コネクターは、複数のソーストピックからイベントを消費し、JDBC ドライバーを使用してそれらのイベントをリレーショナルデータベースに書き込むことができる Kafka Connect sink コネクター実装です。このコネクターは、Db2、MySQL、Oracle、PostgreSQL、SQL Server などのさまざまなデータベース言語をサポートします。

3.1.1. Debezium JDBC コネクターの仕組み

Debezium JDBC コネクターは Kafka Connect sink コネクターであるため、Kafka Connect ランタイムが必要です。コネクターは、サブスクライブしている Kafka トピックを定期的にポーリングし、それらのトピックからのイベントを消費して、設定されたリレーショナルデータベースにイベントを書き込みます。コネクターは、upsert セマンティクスと基本的なスキーマの進化を使用して、べき等書き込み操作をサポートします。

Debezium JDBC コネクターは次の機能を提供します。

3.1.1.1. Debezium JDBC コネクターが複雑な変更イベントをどのように消費するかの説明

デフォルトでは、Debezium ソースコネクターは複雑で、階層分けされた変更イベントを生成します。Debezium コネクターを他の JDBC sink コネクター実装とともに使用する場合、変更イベントのペイロードを sink 実装で使用できるように、ExtractNewRecordState シングルメッセージ変換 (SMT) を適用する必要がある場合があります。Debezium JDBC sink コネクターを実行する場合、Debezium sink コネクターは変換を使用せずにネイティブの Debezium 変更イベントを直接使用できるため、SMT をデプロイする必要はありません。

JDBC sink コネクターが Debezium ソースコネクターから複雑な変更イベントを消費するとき、元の insert または update イベントの after セクションから値を抽出します。削除イベントがシンクコネクターによって消費されるとき、イベントのペイロードの一部は参照されません。

重要

Debezium JDBC sink コネクターは、スキーマ変更トピックから読み取るように設計されていません。ソースコネクターがスキーマ変更をキャプチャーするように設定されている場合は、コネクターがスキーマ変更トピックを消費しないように、JDBC コネクター設定で topics または topic.regex プロパティーを設定します。

3.1.1.2. Debezium JDBC コネクターの at-least-once 配信の説明

Debezium JDBC sink コネクターは、Kafka トピックから消費されるイベントが少なくとも 1 回は処理されるようにします。

3.1.1.3. Debezium JDBC の複数タスクの使用の説明

Debezium JDBC sink コネクターは、複数の Kafka Connect タスクにわたって実行できます。複数のタスクに対してコネクターを実行するには、tasks.max 設定プロパティーでコネクターで使用するタスクの数を設定します。Kafka Connect ランタイムは、指定された数のタスクを開始し、タスクごとにコネクターの 1 つのインスタンスを実行します。複数のタスクの場合は、複数のソーストピックからの変更を並行して読み取り処理することでパフォーマンスを向上させます。

3.1.1.4. Debezium JDBC コネクターのデータと列タイプのマッピングの説明

Debezium JDBC sink コネクターが受信メッセージフィールドのデータ型を送信メッセージフィールドに正しくマップできるようにするには、コネクターはソースイベントに存在する各フィールドのデータ型に関する情報を必要とします。コネクターは、さまざまなデータベース言語にわたる幅広い列タイプのマッピングをサポートします。イベントフィールドの type メタデータから宛先列の型を正しく変換するために、コネクターはソースデータベースに定義されているデータ型マッピングを適用します。ソースコネクター設定で column.propagate.source.type または datatype.propagate.source.type オプションを設定して、コネクターが列のデータ型を解決する方法を強化できます。これらのオプションを有効にすると、Debezium には追加のパラメーターメタデータが含まれ、JDBC sink コネクターが宛先列のデータ型をより正確に解決できるようになります。

Debezium JDBC sink コネクターが Kafka トピックからのイベントを処理するには、Kafka トピックメッセージキー (存在する場合) がプリミティブデータ型または Struct である必要があります。さらに、ソースメッセージのペイロードの Struct は、ネストされた Struct タイプのないフラットな構造、または Debezium の複雑な階層構造に準拠したネストされた Struct レイアウトのいずれかを含む必要があります。

Kafka トピック内のイベントの構造がこれらのルールに準拠していない場合は、カスタムの単一メッセージ変換を実装して、ソースイベントの構造を使用可能な形式に変換する必要があります。

3.1.1.5. Debezium JDBC コネクターがソースイベントのプライマリーキーを処理する方法の説明

デフォルトでは、Debezium JDBC sink コネクターは、ソースイベントのフィールドをイベントのプライマリーキーに変換しません。ビジネス要件によっては、またはシンクコネクターが upsert セマンティクスを使用する場合は、安定したプライマリーキーがないとイベント処理が複雑になる可能性があります。一貫したプライマリーキーを定義するには、次の表に示すプライマリーキーモードのいずれかを使用するようにコネクターを設定できます。

モード説明

none

テーブルの作成時にプライマリーキーフィールドが指定されていません。

kafka

プライマリーキーは次の 3 つの列で構成されます。

  • __connect_topic
  • __connect_partition
  • __connect_offset

これらの列の値は、Kafka イベントの座標から取得されます。

record_key

プライマリーキーは、Kafka イベントのキーで構成されます。

プライマリーキーがプリミティブ型の場合は、primary.key.fields プロパティーを設定して、使用する列の名前を指定します。プライマリーキーが struct 型の場合、struct のフィールドはプライマリーキーの列としてマップされます。primary.key.fields プロパティーを使用して、プライマリーキーを列のサブセットに制限できます。

record_value

プライマリーキーは、Kafka イベントの値で構成されます。

Kafka イベントの値は常に Struct であるため、デフォルトでは、値に含まれるすべてのフィールドがプライマリーキーの列になります。プライマリーキーのフィールドのサブセットを使用するには、primary.key.fields プロパティーを設定して、プライマリーキー列の導出元となる値のフィールドのコンマ区切りリストを指定します。

record_header

プライマリーキーは、Kafka イベントのヘッダーで構成されます。

Kafka イベントのヘッダーには複数のヘッダーが含まれる可能性があり、各ヘッダーは Struct または基本データ型の可能性があります。コネクターはこれらのヘッダーの Struct を作成します。したがって、この Struct のすべてのフィールドがプライマリーキーの列になります。プライマリーキーのフィールドのサブセットを使用するには、primary.key.fields プロパティーを設定して、プライマリーキー列の導出元となる値のフィールドのコンマ区切りリストを指定します。

重要

一部のデータベース言語では、primary.key.modekafka に設定し、schema.evolutionBasic に設定すると例外が発生する場合があります。この例外は、方言が STRING データ型マッピングを TEXTCLOB などの可変長文字列データ型にマップし、さらにこの方言で、プライマリー列に制限なく長さを指定できない場合に発生します。この問題を回避するには、環境に次の設定を適用します。

  • schema.evolutionBasic に設定しないでください。
  • データベースのテーブルとプライマリーキーのマッピングを事前に作成します。
重要

列がターゲットデータベースのプライマリーキーとして許可されていないデータ型にマップされる場合、primary.key.fields にそのような列を除外する明示的な列リストが必要になります。許可されるデータ型と許可されないデータ型については、特定のデータベースベンダーのドキュメントを参照してください。

3.1.1.6. DELETE イベントまたは tombstone イベントの使用時に行を削除するような Debezium JDBC コネクターの設定

Debezium JDBC sink コネクターは、DELETE または tombstone イベントが消費されると、宛先データベース内の行を削除できます。デフォルトでは、JDBC sink コネクターにより削除モードは有効になりません。

コネクターで行を削除する場合は、コネクター設定で明示的に delete.enabled=true を設定する必要があります。このモードを使用するには、primary.key.fields none 以外の値に設定する必要もあります。削除はプライマリーキーマッピングに基づいて実行されるため、前述の設定が必要です。そのため、宛先テーブルにプライマリーキーマッピングがない場合、コネクターは行を削除できません。

3.1.1.7. コネクターによるべき等書き込みの実行の有効化

Debezium JDBC sink コネクターはべき等書き込みを実行できるため、同じレコードを繰り返し再生し、データベースの最終状態を変更できません。

コネクターがべき等書き込みを実行できるようにするには、コネクターの insert.mode を明示的に upsert に設定する必要があります。upsert 操作は、指定されたプライマリーキーがすでに存在するかどうかに応じて、update または insert として適用されます。

プライマリーキー値がすでに存在する場合、この操作により行内の値が更新されます。指定されたプライマリーキー値が存在しない場合、insert で新しい行が追加されます。

upsert 操作には SQL 標準がないため、各データベース方言はべき等書き込みを異なる方法で処理します。次の表は、Debezium がサポートするデータベース言語の upsert の DML 構文を示しています。

方言Upsert 構文

Db2

MERGE …​

MySQL

INSERT …​ ON DUPLICATE KEY UPDATE …​

Oracle

MERGE …​

PostgreSQL

INSERT …​ ON CONFLICT …​ DO UPDATE SET …​

SQL Server

MERGE …​

3.1.1.8. Debezium JDBC コネクターのスキーマ進化モード

Debezium JDBC sink コネクターでは、次のスキーマ進化モードを使用できます。

モード説明

none

コネクターは、DDL スキーマの進化を実行しません。

基本

コネクターは、イベントペイロード内にあるが宛先テーブルには存在しないフィールドを自動的に検出します。コネクターは宛先テーブルを変更して新しいフィールドを追加します。

schema.evolutionBasic に設定されている場合、コネクターは受信イベントの構造に従って宛先データベーステーブルを自動的に作成または変更します。

トピックから初めてイベントを受信し、宛先テーブルがまだ存在しない場合、Debezium JDBC sink コネクターはイベントのキー、またはレコードのスキーマ構造を使用してテーブルの列構造を解決します。スキーマの進化が有効な場合、コネクターは、DML イベントを宛先テーブルに適用する前に、CREATE TABLE SQL ステートメントを準備して実行します。

Debezium JDBC コネクターがトピックからイベントを受信するとき、レコードのスキーマ構造が宛先テーブルのスキーマ構造と異なる場合、コネクターはイベントのキーまたはそのスキーマ構造を使用してどの列が新しいかを識別し、データベーステーブルに追加する必要があります。スキーマの進化が有効な場合、コネクターは、宛先テーブルに DML イベントを適用する前に、ALTER TABLE SQL ステートメントを準備して実行します。列のデータ型の変更、列の削除、およびプライマリーキーの調整は危険な操作であると考えられるため、コネクターではこれらの操作の実行が禁止されています。

各フィールドのスキーマによって、列が NULLNOT NULL かが決まります。スキーマは各列のデフォルト値も定義します。コネクターが NULL を指定できるかどうかの設定、または必要のないデフォルト値を含めてテーブルを作成しようとしている場合には、事前に手動でテーブルを作成するか、sink コネクターがイベントを処理する前に関連のフィールドのスキーマを調節する必要があります。NULL を指定できるかどうかの設定またはデフォルト値を調整するには、パイプライン内の変更を適用するか、ソースデータベースで定義されている列の状態を変更するカスタムの単一メッセージ変換を導入できます。

フィールドのデータ型は、事前定義されたマッピングのセットに基づいて解決されます。詳細は、「Debezium JDBC コネクターがデータ型をマップする方法」 を参照してください。

重要

宛先データベースにすでに存在するテーブルのイベント構造に新しいフィールドを導入する場合は、新しいフィールドをオプションとして定義するか、フィールドのデフォルト値がデータベーススキーマで指定されている必要があります。フィールドを宛先テーブルから削除する場合は、次のいずれかのオプションを使用します。

  • フィールドを手動で削除します。
  • 列をドロップします。
  • フィールドにデフォルト値を割り当てます。
  • フィールドを NULL を指定可能として定義します。

3.1.1.9. 宛先テーブル名と列名の大文字と小文字を定義するオプションの指定

Debezium JDBC sink コネクターは、宛先データベースで実行される DDL (スキーマ変更) または DML (データ変更) SQL ステートメントを構築することによって、Kafka メッセージを消費します。デフォルトでは、コネクターはソーストピックの名前とイベントフィールドの名前を、宛先テーブルのテーブル名と列名の基礎として使用します。構築された SQL では、元の文字列の大文字と小文字を保持するために、識別子が引用符で自動的に区切られることはありません。その結果、デフォルトでは、宛先データベース内のテーブル名または列名のテキストの大文字と小文字の区別については、大文字と小文字が指定されていない場合にデータベースが名前文字列を処理する方法に完全に依存します。

たとえば、宛先データベースの言語が Oracle で、イベントのトピックが order である場合、名前が引用符で囲まれていない場合、Oracle ではデフォルトで大文字の名前が使用されるため、宛先テーブルは ORDERS として作成されます。同様に、宛先データベース言語が PostgreSQL で、イベントのトピックが ORDERS である場合、名前が引用符で囲まれていない場合、PostgreSQL はデフォルトで小文字の名前を使用するため、宛先テーブルは order として作成されます。

Kafka イベントに存在するテーブル名とフィールド名の大文字と小文字を明示的に保持するには、コネクター設定で quote.identifiers プロパティーの値を true に設定します。このオプションが設定されている場合、受信イベントが order というトピックのもので、宛先データベース言語が Oracle である場合、コネクターは、構築された SQL でテーブルの名前が "orders" として定義されているため、orders という名前のテーブルを作成します。引用符を有効にすると、コネクターが列名を作成するときと同じ動作になります。

接続アイドルタイムアウト

Debezium の JDBC sink コネクターは、接続プールを活用してパフォーマンスを向上させます。接続プールは、初期の接続セットを確立し、指定された数の接続を維持し、必要に応じてアプリケーションに効率的に接続を割り当てるように設計されています。ただし、接続がプール内でアイドル状態のままになると問題が発生し、データベースに設定されたアイドルタイムアウトしきい値を超えて、非アクティブの状態が続くと、タイムアウトがトリガーされる可能性があります。

アイドル接続スレッドがタイムアウトをトリガーする可能性を軽減するために、接続プールは各接続のアクティビティーを定期的に検証するメカニズムを提供します。この検証により、接続がアクティブの状態を確保でき、データベースが接続をアイドル状態としてフラグ付けされないようにします。ネットワークが中断された場合に、Debezium が終了した接続を使用しようとすると、コネクターはプールに対して新しい接続を生成するよう要求します。

デフォルトでは、Debezium JDBC sink コネクターはアイドルタイムアウトテストを実行しません。ただし、hibernate.c3p0.idle_test_period プロパティーを設定することで、コネクターがプールに対して、指定の間隔でタイムアウトテストを実行する要求を行うように設定できます。以下に例を示します。

タイムアウト設定の例

{
  "hibernate.c3p0.idle_test_period": "300"
}

Debezium JDBC sink コネクターは、Hibernate C3P0 接続プールを使用します。hibernate.c3p0.*` 設定名前空間でプロパティーを設定することで、CP30 接続プールをカスタマイズできます。上記の例では、hibernate.c3p0.idle_test_period プロパティーの設定により、接続プールが 300 秒ごとにアイドルタイムアウトテストを実行するように設定されます。設定を適用すると、接続プールは 5 分ごとに未使用の接続の評価を開始します。

3.1.2. Debezium JDBC コネクターがデータ型をマップする方法

Debezium JDBC sink コネクターは、論理またはプリミティブ型マッピングシステムを使用して列のデータ型を解決します。プリミティブ型には、整数、浮動小数点、ブール値、文字列、バイトなどの値が含まれます。通常、これらの型は、特定の Kafka Connect Schema 型コードのみで表されます。論理データ型は、フィールド名とスキーマの固定セットが含まれる Struct ベースの型などの値、またはエポックからの日数など、特定のエンコーディングで表される値を含むなど、複雑な型であることがよくあります。

次の例は、プリミティブデータ型と論理データ型の代表的な構造を示しています。

プリミティブフィールドスキーマ

{
  "schema": {
    "type": "INT64"
  }
}

論理フィールドスキーマ

[
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
]

Kafka Connect だけが、これらの複雑な論理型のソースというわけではありません。実際、Debezium ソースコネクターは、タイムスタンプ、日付、さらには JSON データなど、さまざまなデータ型を表す同様の論理型を含むフィールドを持つ変更イベントを生成します。

Debezium JDBC sink コネクターは、このようなプリミティブおよび論理型を仕様して JDBC SQL コード (列の方を表す) に列の方を解決します。これらの JDBC SQL コードは、基礎となる Hibernate 永続フレームワークによって使用され、列の型を使用中の方言の論理データ型に解決します。次の表は、Kafka Connect と JDBC SQL 型の間、および Debezium と JDBC SQL 型の間の基本マッピングと論理マッピングを示しています。実際の最終的な列のタイプは、データベースのタイプごとに異なります。

表3.1 Kafka Connect プリミティブと列データ型の間のマッピング
プリミティブ型JDBC SQL 型

INT8

Types.TINYINT

INT16

Types.SMALLINT

INT32

Types.INTEGER

INT64

Types.BIGINT

FLOAT32

Types.FLOAT

FLOAT64

Types.DOUBLE

BOOLEAN

Types.BOOLEAN

STRING

Types.CHAR、Types.NCHAR、Types.VARCHAR、Types.NVARCHAR

BYTES

Types.VARBINARY

表3.2 Kafka Connect 論理型と列データ型の間のマッピング
論理型JDBC SQL 型

org.apache.kafka.connect.data.Decimal

Types.DECIMAL

org.apache.kafka.connect.data.Date

Types.DATE

org.apache.kafka.connect.data.Time

Types.TIMESTAMP

org.apache.kafka.connect.data.Timestamp

Types.TIMESTAMP

表3.3 Debezium 論理型と列データ型の間のマッピング
論理型JDBC SQL 型

io.debezium.time.Date

Types.DATE

io.debezium.time.Time

Types.TIMESTAMP

io.debezium.time.MicroTime

Types.TIMESTAMP

io.debezium.time.NanoTime

Types.TIMESTAMP

io.debezium.time.ZonedTime

Types.TIME_WITH_TIMEZONE

io.debezium.time.Timestamp

Types.TIMESTAMP

io.debezium.time.MicroTimestamp

Types.TIMESTAMP

io.debezium.time.NanoTimestamp

Types.TIMESTAMP

io.debezium.time.ZonedTimestamp

Types.TIMESTAMP_WITH_TIMEZONE

io.debezium.data.VariableScaleDecimal

Types.DOUBLE

重要

データベースがタイムゾーンのある時刻またはタイムスタンプをサポートしていない場合、マッピングはタイムゾーンなしの同等のものに解決されます。

表3.4 Debezium 方言固有の論理型と列データ型の間のマッピング
論理型MySQL SQL 型PostgreSQL SQL 型SQL Server SQL 型

io.debezium.data.Bits

bit(n)

bit(n) または bit varying

varbinary(n)

io.debezium.data.Enum

enum

Types.VARCHAR

該当なし

io.debezium.data.Json

json

json

該当なし

io.debezium.data.EnumSet

set

該当なし

該当なし

io.debezium.time.Year

year(n)

該当なし

該当なし

io.debezium.time.MicroDuration

該当なし

interval

該当なし

io.debezium.data.Ltree

該当なし

ltree

該当なし

io.debezium.data.Uuid

該当なし

uuid

該当なし

io.debezium.data.Xml

該当なし

xml

xml

上記のプリミティブおよび論理マッピングに加え、変更イベントのソースが Debezium ソースコネクターの場合は、列またはデータ型の伝播を有効にすることで、列のタイプと長さ、制度、スケールの解決も操作できます。強制的に伝播を行うには、ソースコネクター設定で次のプロパティーのいずれかを設定する必要があります。

  • column.propagate.source.type
  • datatype.propagate.source.type

Debezium JDBC シンクコネクターは、より高い優先順位で値を適用します。

たとえば、次のフィールドスキーマが変更イベントに含まれているとします。

Debezium は列またはデータ型の伝播を有効にしてイベントフィールドスキーマを変更します

{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}

前述の例では、スキーマパラメーターが設定されていない場合、Debezium JDBC シンクコネクターはこのフィールドを Types.SMALLINT の列型にマップします。Types.SMALLINT には、データベース言語に応じて、さまざまな論理データベースタイプを指定できます。MySQL の場合、例の列タイプは、長さが指定されていない TINYINT 列タイプに変換されます。列またはデータ型の伝播がソースコネクターで有効になっている場合、Debezium JDBC シンクコネクターはマッピング情報を使用してデータ型マッピングプロセスを調整し、TINYINT(1) 型の列を作成します。

注記

通常、ソースデータベースとシンクデータベースの両方に同じ種類のデータベースが使用されている場合、列またはデータ型の伝播を使用する効果は非常に大きくなります。

3.1.3. Debezium JDBC コネクターのデプロイメント

Debezium JDBC コネクターをデプロイするには、Debezium JDBC コネクターアーカイブをインストールし、コネクターを設定し、その設定を Kafka Connect に追加してコネクターを起動します。

前提条件

  • Apache ZooKeeperApache Kafka、および Kafka Connect がインストールされている。
  • 宛先データベースがインストールされ、JDBC 接続を受け入れるように設定されている。

手順

  1. Debezium JDBC connector plug-in archive をダウンロードします。
  2. ファイルを Kafka Connect 環境にデプロイメントします。
  3. 必要に応じて、Maven Central から JDBC ドライバーをダウンロードし、ダウンロードしたドライバーファイルを JDBC シンクコネクター JAR ファイルが含まれるディレクトリーに展開します。

    注記

    Oracle および Db2 用のドライバーは、JDBC シンクコネクターには含まれていません。ドライバーをダウンロードして手動でインストールする必要があります。

  4. JDBC シンクコネクターがインストールされているパスにドライバー JAR ファイルを追加します。
  5. JDBC シンクコネクターをインストールするパスが Kafka Connect plugin.path の一部であることを確認してください。
  6. Kafka Connect プロセスを再起動し、新しい JAR ファイルを取得します。

3.1.3.1. Debezium JDBC コネクターの設定

通常、Debezium JDBC コネクターを登録するには、コネクターの設定プロパティーを指定する JSON リクエストを送信します。次の例は、最も一般的な設定を使用して、orders というトピックからのイベントを消費する Debezium JDBC シンクコネクターのインスタンスを登録する JSON リクエストを示しています。

例: Debezium JDBC コネクターの設定

{
    "name": "jdbc-connector",  1
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  2
        "tasks.max": "1",  3
        "connection.url": "jdbc:postgresql://localhost/db",  4
        "connection.username": "pguser",  5
        "connection.password": "pgpassword",  6
        "insert.mode": "upsert",  7
        "delete.enabled": "true",  8
        "primary.key.mode": "record_key",  9
        "schema.evolution": "basic",  10
        "database.time_zone": "UTC",  11
        "topics": "orders" 12
    }
}

JDBC コネクター設定の説明

項目説明

1

Kafka Connect サービスにコネクターを登録するときにコネクターに割り当てられる名前。

2

JDBC シンクコネクタークラスの名前。

3

このコネクターに作成するタスクの最大数。

4

コネクターが書き込み先のシンクデータベースへの接続に使用する JDBC URL。

5

認証に使用されるデータベースユーザーの名前。

6

認証に使用されるデータベースユーザーのパスワード。

7

コネクターが使用する insert.mode

8

データベース内のレコードの削除を有効にします。詳細は、delete.enabled 設定プロパティーを参照してください。

9

プライマリーキー列の解決に使用する方法を指定します。詳細は、primary.key.mode 設定プロパティーを参照してください。

10

コネクターが宛先データベースのスキーマを進化できるようにします。詳細は、schema.evolution 設定プロパティーを参照してください。

11

時間フィールドタイプを書き込むときに使用されるタイムゾーンを指定します。

12

消費するトピックのコンマ区切りのリスト。

Debezium JDBC コネクターに設定できる設定プロパティーの完全なリストは、JDBC コネクターのプロパティー を参照してください。

POST コマンドを使用して、この設定を実行中の Kafka Connect サービスに送信できます。サービスは設定を記録し、次の操作を実行するシンクコネクタータスクを開始します。

  • データベースに接続します。
  • サブスクライブされた Kafka トピックからのイベントを消費します。
  • 設定されたデータベースにイベントを書き込みます。

3.1.4. Debezium JDBC コネクター設定プロパティーの説明

Debezium JDBC シンクコネクターには、ニーズを満たすコネクターの動作を実現するために使用できるいくつかの設定プロパティーがあります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。

表3.5 JDBC コネクターの汎用プロパティー
プロパティーデフォルト説明

name

デフォルトなし

コネクターの一意名。コネクターの登録時にこの名前を再利用しようとすると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。

connector.class

デフォルトなし

コネクターの Java クラスの名前。Debezium JDBC コネクターの場合、io.debezium.connector.jdbc.JdbcSinkConnector の値を指定します。

tasks.max

1

このコネクターに使用するタスクの最大数。

topics

デフォルトなし

消費するトピックのコンマ区切りのリスト。このプロパティーを topic.regex プロパティーと組み合わせて使用しないでください。

topics.regex

デフォルトなし

消費するトピックを指定する正規表現。内部的には、正規表現は java.util.regex.Pattern にコンパイルされます。このプロパティーを topics プロパティーと組み合わせて使用しないでください。

表3.6 JDBC コネクター接続プロパティー
プロパティーデフォルト説明

connection.provider

org.hibernate.c3p0.internal.C3P0ConnectionProvider

使用する接続プロバイダーの実装。

connection.url

デフォルトなし

データベースへの接続に使用される JDBC 接続 URL。

connection.username

デフォルトなし

コネクターがデータベースへの接続に使用するデータベースユーザーアカウントの名前。

connection.password

デフォルトなし

コネクターがデータベースへの接続に使用するパスワード。

connection.pool.min_size

5

プール内の最小接続数を指定します。

connection.pool.max_size

32

プールが維持する同時接続の最大数を指定します。

connection.pool.acquire_increment

32

接続プールが最大サイズを超えた場合にコネクターが取得を試みる接続の数を指定します。

connection.pool.timeout

1800

未使用の接続が破棄されるまで保持する時間 (秒) を指定します。

表3.7 JDBC コネクターのランタイムプロパティー
プロパティーデフォルト説明

database.time_zone

UTC

JDBC 時間値を挿入するときに使用するタイムゾーンを指定します。

delete.enabled

false

コネクターが DELETE イベントまたは tombstone イベントを処理し、対応する行をデータベースから削除するかどうかを指定します。このオプションを使用するには、primary.key.moderecord.key に設定する必要があります。

truncate.enabled

false

コネクターが TRUNCATE イベントを処理し、データベースから対応するテーブルを切り捨てるかどうかを指定します。

注記

TRUNCATE ステートメントのサポートは Db2 バージョン 9.7 以降で利用可能ですが、現在、JDBC コネクターは、Db2 コネクターが出力する標準の TRUNCATE イベントを処理できません。

JDBC コネクターが Db2 から受信した TRUNCATE イベントを処理できるようにするには、標準の TRUNCATE TABLE ステートメントの代替手段を使用して切り捨てを実行します。以下に例を示します。

ALTER TABLE <table_name> ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

上記のクエリーを送信するユーザーアカウントには、切り捨てられるテーブルに対する ALTER 権限が必要です。

insert.mode

insert

イベントのデータベースへの挿入に使用するストラテジーを指定します。以下のオプションが利用できます。

insert
すべてのイベントが INSERT ベースの SQL ステートメントを構築するように指定します。このオプションは、プライマリーキーが使用されていない場合、または既存のプライマリーキー値を持つ行に対して更新が行われない場合にのみ使用してください。
update
すべてのイベントが UPDATE ベースの SQL ステートメントを構築することを指定します。このオプションは、コネクターが既存の行に適用されるイベントのみを受信する場合にのみ使用してください。
upsert
コネクターが upsert セマンティクスを使用してテーブルにイベントを追加することを指定します。つまり、プライマリーキーが存在しない場合、コネクターは INSERT 操作を実行し、キーが存在する場合、コネクターは UPDATE 操作を実行します。べき等書き込みが必要な場合は、このオプションを使用するようにコネクターを設定する必要があります。

primary.key.mode

none

コネクターがイベントからプライマリーキー列を解決する方法を指定します。

none
プライマリーキー列を作成しないことを指定します。
kafka

コネクターがプライマリーキー列として Kafka 座標を使用することを指定します。キー座標はトピック名、パーティション、イベントのオフセットから定義され、次の名前の列にマップされます。

  • __connect_topic
  • __connect_partition
  • __connect_offset
record_key
プライマリーキー列がイベントのレコードキーから取得されることを指定します。レコードキーがプリミティブ型の場合、primary.key.fields プロパティーでプライマリーキー列の名前を指定する必要があります。レコードキーが struct 型の場合、primary.key.fields プロパティーはオプションで、イベントのキーからの列のサブセットをテーブルのプライマリーキーとして指定するために使用できます。
record_value
プライマリーキー列がイベントの値から取得されることを指定します。primary.key.fields プロパティーを設定して、イベントの値のフィールドのサブセットとしてプライマリーキーを定義できます。それ以外の場合は、すべてのフィールドがデフォルトで使用されます。

primary.key.fields

デフォルトなし

プライマリーキー列の名前、またはプライマリーキーの導出元となるフィールドのコンマ区切りリスト。

primary.key.moderecord_key に設定されており、イベントのキーがプリミティブ型である場合、このプロパティーではキーに使用される列名を指定する必要があります。

primary.key.mode が、プリミティブ以外のキーを含む record_key または record_value に設定されている場合、このプロパティーにはキーまたは値のフィールド名のコンマ区切りリストを指定する必要があります。primary.key.mode が、プリミティブキー以外のキーを含む record_key または record_value に設定されており、このプロパティーが指定されていない場合、コネクターは、指定されたモードに応じて、レコードキーまたはレコード値のいずれかの全フィールドからプライマリーキーを導出します。

quote.identifiers

false

生成された SQL ステートメントでテーブル名と列名を区切るために引用符を使用するかどうかを指定します。詳細は 「宛先テーブル名と列名の大文字と小文字を定義するオプションの指定」 のセクションを参照してください。

schema.evolution

none

コネクターが宛先テーブルのスキーマを進化させる方法を指定します。詳細は、「Debezium JDBC コネクターのスキーマ進化モード」 を参照してください。以下のオプションが利用できます。

none
コネクターが宛先スキーマを進化させないことを指定します。
基本
基本進化が発生することを指定します。コネクターは、受信イベントのレコードスキーマをデータベーステーブル構造と比較することにより、欠落している列をテーブルに追加します。

table.name.format

${topic}

コネクターが宛先テーブルの名前を構築するために使用する文字列パターンを指定します。
プロパティーがデフォルト値 ${topic} に設定されている場合、コネクターは Kafka からイベントを読み取った後に、ソーストピックの名前と一致する宛先テーブルにイベントレコードを書き込みます。

このプロパティーを設定して、受信イベントレコード内の特定のフィールドから値を抽出し、それらの値を使用してターゲットテーブルの名前を動的に生成することもできます。メッセージソースの値からテーブル名を生成するこの機能を使用するには、カスタム Kafka Connect 単一メッセージ変換 (SMT) を使用する必要があります。

宛先テーブルの名前を動的に生成するようにプロパティーを設定するには、この値を ${source._field_} などのパターンに設定します。このタイプのパターンを指定すると、コネクターは Debezium 変更イベントの source ブロックから値を抽出し、その値を使用してテーブル名を構築します。たとえば、プロパティーの値を ${source.schema}_${source.table} のパターンに設定できます。このパターンに基づいて、コネクターがソースブロックの schema フィールドに値 user が含まれ、table フィールドに tab の値が含まれるイベントを読み取ると、コネクターはイベントレコードを user_tab という名前のテーブルに書き込みます。

dialect.postgres.postgis.schema

public

PostgreSQL PostGIS エクステンションがインストールされているスキーマ名を指定します。デフォルトは public です。ただし、PostGIS エクステンションが別のスキーマにインストールされている場合は、このプロパティーを使用して代替スキーマ名を指定する必要があります。

dialect.sqlserver.identity.insert

false

SQL Server テーブルの identity 列への INSERT または UPSERT 操作の前にコネクターが IDENTITY_INSERT を自動的に設定し、操作の直後にそれを設定解除するかどうかを指定します。デフォルト設定 (false) が有効な場合、テーブルの IDENTITY 列への INSERT または UPSERT 操作によって SQL 例外が発生します。

batch.size

500

宛先テーブルにまとめてバッチ処理するレコードの数を指定します。

注記

Connect ワーカープロパティーの consumer.max.poll.recordsbatch.size より小さい値に設定すると、バッチ処理は consumer.max.poll.records によって制限され、目的の batch.size に到達しないことに注意してください。コネクター設定で consumer.override.max.poll.records を使用して、コネクターの基盤となるコンシューマーの max.poll.records を設定することもできます。

use.reduction.buffer

false

Debezium JDBC コネクターの削減バッファーを有効にするかどうかを指定します。

次のいずれかの設定を選択します。

false
(デフォルト) コネクターは、Kafka から消費する各変更イベントを個別の論理 SQL 変更として書き込みます。
true
コネクターは、変更イベントをシンクデータベースに書き込む前に、削減バッファーを使用して変更イベントを削減します。つまり、複数のイベントが同じプライマリーキーを参照する場合、コネクターは SQL クエリーを統合し、最新のオフセットレコードで報告された行の状態に基づいて、論理 SQL 変更を 1 つのみ書き込みます。
ターゲットデータベースの SQL 負荷を軽減するには、このオプションを選択します。

削減バッファーが有効になっている場合に PostgreSQL シンクデータベースでのクエリー処理を最適化するには、JDBC 接続 URL に reWriteBatchedInserts パラメーターを追加して、データベースがバッチクエリーを実行できるようにする必要があります。

field.include.list

空の文字列

変更イベント値から追加するフィールドの完全修飾名と一致するフィールド名の、オプションのコンマ区切りリスト。フィールドの完全修飾名の形式は、fieldName または topicName:_fieldName_ です。

このプロパティーを設定に含める場合は、field.exclude.list プロパティーを設定しないでください。

field.exclude.list

空の文字列

変更イベント値から除外するフィールドの完全修飾名と一致するフィールド名の、オプションのコンマ区切りリスト。フィールドの完全修飾名の形式は、fieldName または topicName:_fieldName_ です。

このプロパティーを設定に含める場合は、field.include.list プロパティーも設定しないでください。

表3.8 JDBC コネクターの拡張可能なプロパティー
プロパティーデフォルト説明

column.naming.strategy

i.d.c.j.n.DefaultColumnNamingStrategy

コネクターがイベントフィールド名から列名を解決するために使用する ColumnNamingStrategy 実装の完全修飾クラス名を指定します。

デフォルトでは、コネクターはフィールド名を列名として使用します。

table.naming.strategy

i.d.c.j.n.DefaultTableNamingStrategy

コネクターが受信イベントトピック名からテーブル名を解決するために使用する TableNamingStrategy 実装の完全修飾クラス名を指定します。

デフォルトの動作は次のとおりです。

  • table.name.format 設定プロパティーの ${topic} プレースホルダーをイベントのトピックに置き換えます。
  • ドット (.) は、アンダースコア (_) に置き換えて、テーブル名をサニタイズします。

JDBC コネクター hibernate.* パススループロパティー

Kafka Connect はパススルー設定をサポートしており、コネクター設定から特定のプロパティーを直接渡すことで、基盤となるシステムの動作を変更できます。デフォルトでは、一部の Hibernate プロパティーは、JDBC コネクター 接続プロパティー (connection.urlconnection.usernameconnection.pool.*_size など) およびコネクターの ランタイムプロパティー (database.time_zonequote.identifiers など) を通じて公開されます。

他の Hibernate の動作をカスタマイズする場合は、hibernate.* 名前空間を使用するプロパティーをコネクター設定に追加することで、パススルーのメカニズムを利用できます。たとえば、Hibernate がターゲットデータベースのタイプとバージョンを解決できるようにするには、hibernate.dialect プロパティーを追加し、それをデータベースの完全修飾クラス名 (例: org.hibernate.dialect.MariaDBDialect) に設定します。

3.1.5. JDBC コネクターのよくある質問

ExtractNewRecordState 単一メッセージ変換は必要ですか?
いいえ、実際には、Debezium JDBC コネクターを他の実装と区別する要素の 1 つです。このコネクターは、競合するコネクターと同様にフラット化されたイベントを取り込むことができますが、特定の種類の変換を必要とせずに、Debezium の複雑な変更イベント構造をネイティブに取り込むこともできます。
列の型が変更された場合、または列の名前が変更または削除された場合、これはスキーマの進化によって処理されますか?
いいえ、Debezium JDBC コネクターは既存の列に変更を加えません。コネクターによってサポートされるスキーマの進化は非常に基本的なものです。これは、イベント構造内のフィールドをテーブルの列リストと単純に比較し、まだ列として定義されていないフィールドをテーブルに追加します。列のタイプまたはデフォルト値が変更された場合、コネクターは宛先データベースでの調整は行いません。列の名前が変更された場合、古い列はそのまま残され、コネクターは新しい名前の列をテーブルに追加します。ただし、古い列のデータを含む既存の行は変更されません。このようなタイプのスキーマ変更は手動で処理する必要があります。
列の型が希望する型に解決されない場合、別のデータ型に強制的にマッピングするにはどうすればよいですか?
Debezium JDBC コネクターは、高度な型システムを使用して列のデータ型を解決します。この型システムが特定のフィールドのスキーマ定義を JDBC 型に解決する方法の詳細は、「Debezium JDBC コネクターのデータと列タイプのマッピングの説明」 セクションを参照してください。別のデータ型マッピングを適用する場合は、テーブルを手動で定義して、優先される列の型を明示的に取得します。
Kafka トピック名を変更せずに、テーブル名に接頭辞または接尾辞を指定するにはどうすればよいですか?
宛先テーブル名に接頭辞または接尾辞を追加するには、table.name.format コネクター設定プロパティーを調整して、必要な接頭辞または接尾辞を適用します。たとえば、すべてのテーブル名に jdbc_ という接頭辞を付けるには、table.name.format 設定プロパティーに jdbc_${topic} の値を指定します。コネクターが order というトピックにサブスクライブされている場合、結果のテーブルは jdbc_orders として作成されます。
識別子の引用符が有効になっていないにもかかわらず、一部の列が自動的に引用符で囲まれるのはなぜですか?
状況によっては、quote.identifiers が有効になっていない場合でも、特定の列名またはテーブル名が明示的に引用符で囲まれることがあります。これは、列名またはテーブル名が、不正な構文とみなされる特定の規則で始まるか、またはそのような規則が使用されている場合に引用符が必要になることがよくあります。たとえば、primary.key.modekafka に設定されている場合、一部のデータベースでは、列名が引用符で囲まれている場合にのみ、列名をアンダースコアで始めることができます。引用の動作は方言固有であり、データベースの種類によって異なります。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.