第3章 シンクコネクター
Debezium は、Apache Kafka トピックなど、ソースからイベントを消費できるシンクコネクターを提供します。シンクコネクターはデータの形式を標準化し、設定されたシンクリポジトリーにイベントデータを保存します。その後、他のシステム、アプリケーション、またはユーザーはデータシンクからイベントにアクセスできるようになります。
シンクコネクターは、消費するイベントデータに一貫した構造を適用するため、データシンクから読み取るダウンストリームのアプリケーションは、そのデータを簡単に解釈して処理できます。
現在、Debezium は以下のシンクコネクターを提供しています。
3.1. JDBC 用の Debezium コネクター リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC コネクターは、複数のソーストピックからイベントを消費し、JDBC ドライバーを使用してそれらのイベントをリレーショナルデータベースに書き込むことができる Kafka Connect sink コネクター実装です。このコネクターは、Db2、MySQL、Oracle、PostgreSQL、SQL Server などのさまざまなデータベース言語をサポートします。
3.1.1. Debezium JDBC コネクターの仕組み リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC コネクターは Kafka Connect sink コネクターであるため、Kafka Connect ランタイムが必要です。コネクターは、サブスクライブしている Kafka トピックを定期的にポーリングし、それらのトピックからのイベントを消費して、設定されたリレーショナルデータベースにイベントを書き込みます。コネクターは、upsert セマンティクスと基本的なスキーマの進化を使用して、べき等書き込み操作をサポートします。
Debezium JDBC コネクターは次の機能を提供します。
3.1.1.1. Debezium JDBC コネクターが複雑な変更イベントをどのように消費するかの説明 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Debezium ソースコネクターは複雑で、階層分けされた変更イベントを生成します。Debezium コネクターを他の JDBC sink コネクター実装とともに使用する場合、変更イベントのペイロードを sink 実装で使用できるように、ExtractNewRecordState
シングルメッセージ変換 (SMT) を適用する必要がある場合があります。Debezium JDBC sink コネクターを実行する場合、Debezium sink コネクターは変換を使用せずにネイティブの Debezium 変更イベントを直接使用できるため、SMT をデプロイする必要はありません。
JDBC sink コネクターが Debezium ソースコネクターから複雑な変更イベントを消費するとき、元の insert
または update
イベントの after
セクションから値を抽出します。削除イベントがシンクコネクターによって消費されるとき、イベントのペイロードの一部は参照されません。
Debezium JDBC sink コネクターは、スキーマ変更トピックから読み取るように設計されていません。ソースコネクターがスキーマ変更をキャプチャーするように設定されている場合は、コネクターがスキーマ変更トピックを消費しないように、JDBC コネクター設定で topics
または topic.regex
プロパティーを設定します。
3.1.1.2. Debezium JDBC コネクターの at-least-once 配信の説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターは、Kafka トピックから消費されるイベントが少なくとも 1 回は処理されるようにします。
3.1.1.3. Debezium JDBC の複数タスクの使用の説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターは、複数の Kafka Connect タスクにわたって実行できます。複数のタスクに対してコネクターを実行するには、tasks.max
設定プロパティーでコネクターで使用するタスクの数を設定します。Kafka Connect ランタイムは、指定された数のタスクを開始し、タスクごとにコネクターの 1 つのインスタンスを実行します。複数のタスクの場合は、複数のソーストピックからの変更を並行して読み取り処理することでパフォーマンスを向上させます。
3.1.1.4. Debezium JDBC コネクターのデータと列タイプのマッピングの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターが受信メッセージフィールドのデータ型を送信メッセージフィールドに正しくマップできるようにするには、コネクターはソースイベントに存在する各フィールドのデータ型に関する情報を必要とします。コネクターは、さまざまなデータベース言語にわたる幅広い列タイプのマッピングをサポートします。イベントフィールドの type
メタデータから宛先列の型を正しく変換するために、コネクターはソースデータベースに定義されているデータ型マッピングを適用します。ソースコネクター設定で column.propagate.source.type
または datatype.propagate.source.type
オプションを設定して、コネクターが列のデータ型を解決する方法を強化できます。これらのオプションを有効にすると、Debezium には追加のパラメーターメタデータが含まれ、JDBC sink コネクターが宛先列のデータ型をより正確に解決できるようになります。
Debezium JDBC sink コネクターが Kafka トピックからのイベントを処理するには、Kafka トピックメッセージキー (存在する場合) がプリミティブデータ型または Struct
である必要があります。さらに、ソースメッセージのペイロードの Struct
は、ネストされた Struct
タイプのないフラットな構造、または Debezium の複雑な階層構造に準拠したネストされた Struct
レイアウトのいずれかを含む必要があります。
Kafka トピック内のイベントの構造がこれらのルールに準拠していない場合は、カスタムの単一メッセージ変換を実装して、ソースイベントの構造を使用可能な形式に変換する必要があります。
3.1.1.5. Debezium JDBC コネクターがソースイベントのプライマリーキーを処理する方法の説明 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Debezium JDBC sink コネクターは、ソースイベントのフィールドをイベントのプライマリーキーに変換しません。ビジネス要件によっては、またはシンクコネクターが upsert セマンティクスを使用する場合は、安定したプライマリーキーがないとイベント処理が複雑になる可能性があります。一貫したプライマリーキーを定義するには、次の表に示すプライマリーキーモードのいずれかを使用するようにコネクターを設定できます。
モード | 説明 |
---|---|
| テーブルの作成時にプライマリーキーフィールドが指定されていません。 |
| プライマリーキーは次の 3 つの列で構成されます。
これらの列の値は、Kafka イベントの座標から取得されます。 |
|
プライマリーキーは、Kafka イベントのキーで構成されます。 |
|
プライマリーキーは、Kafka イベントの値で構成されます。 |
|
プライマリーキーは、Kafka イベントのヘッダーで構成されます。 |
一部のデータベース言語では、primary.key.mode
を kafka
に設定し、schema.evolution
を Basic
に設定すると例外が発生する場合があります。この例外は、方言が STRING
データ型マッピングを TEXT
や CLOB
などの可変長文字列データ型にマップし、さらにこの方言で、プライマリー列に制限なく長さを指定できない場合に発生します。この問題を回避するには、環境に次の設定を適用します。
-
schema.evolution
をBasic
に設定しないでください。 - データベースのテーブルとプライマリーキーのマッピングを事前に作成します。
列がターゲットデータベースのプライマリーキーとして許可されていないデータ型にマップされる場合、primary.key.fields
にそのような列を除外する明示的な列リストが必要になります。許可されるデータ型と許可されないデータ型については、特定のデータベースベンダーのドキュメントを参照してください。
3.1.1.6. DELETE イベントまたは tombstone イベントの使用時に行を削除するような Debezium JDBC コネクターの設定 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターは、DELETE
または tombstone イベントが消費されると、宛先データベース内の行を削除できます。デフォルトでは、JDBC sink コネクターにより削除モードは有効になりません。
コネクターで行を削除する場合は、コネクター設定で明示的に delete.enabled=true
を設定する必要があります。このモードを使用するには、primary.key.fields
を none
以外の値に設定する必要もあります。削除はプライマリーキーマッピングに基づいて実行されるため、前述の設定が必要です。そのため、宛先テーブルにプライマリーキーマッピングがない場合、コネクターは行を削除できません。
3.1.1.7. コネクターによるべき等書き込みの実行の有効化 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターはべき等書き込みを実行できるため、同じレコードを繰り返し再生し、データベースの最終状態を変更できません。
コネクターがべき等書き込みを実行できるようにするには、コネクターの insert.mode
を明示的に upsert
に設定する必要があります。upsert
操作は、指定されたプライマリーキーがすでに存在するかどうかに応じて、update
または insert
として適用されます。
プライマリーキー値がすでに存在する場合、この操作により行内の値が更新されます。指定されたプライマリーキー値が存在しない場合、insert
で新しい行が追加されます。
upsert 操作には SQL 標準がないため、各データベース方言はべき等書き込みを異なる方法で処理します。次の表は、Debezium がサポートするデータベース言語の upsert
の DML 構文を示しています。
方言 | Upsert 構文 |
---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
3.1.1.8. Debezium JDBC コネクターのスキーマ進化モード リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターでは、次のスキーマ進化モードを使用できます。
モード | 説明 |
---|---|
| コネクターは、DDL スキーマの進化を実行しません。 |
| コネクターは、イベントペイロード内にあるが宛先テーブルには存在しないフィールドを自動的に検出します。コネクターは宛先テーブルを変更して新しいフィールドを追加します。 |
schema.evolution
が Basic
に設定されている場合、コネクターは受信イベントの構造に従って宛先データベーステーブルを自動的に作成または変更します。
トピックから初めてイベントを受信し、宛先テーブルがまだ存在しない場合、Debezium JDBC sink コネクターはイベントのキー、またはレコードのスキーマ構造を使用してテーブルの列構造を解決します。スキーマの進化が有効な場合、コネクターは、DML イベントを宛先テーブルに適用する前に、CREATE TABLE
SQL ステートメントを準備して実行します。
Debezium JDBC コネクターがトピックからイベントを受信するとき、レコードのスキーマ構造が宛先テーブルのスキーマ構造と異なる場合、コネクターはイベントのキーまたはそのスキーマ構造を使用してどの列が新しいかを識別し、データベーステーブルに追加する必要があります。スキーマの進化が有効な場合、コネクターは、宛先テーブルに DML イベントを適用する前に、ALTER TABLE
SQL ステートメントを準備して実行します。列のデータ型の変更、列の削除、およびプライマリーキーの調整は危険な操作であると考えられるため、コネクターではこれらの操作の実行が禁止されています。
各フィールドのスキーマによって、列が NULL
か NOT NULL
かが決まります。スキーマは各列のデフォルト値も定義します。コネクターが NULL を指定できるかどうかの設定、または必要のないデフォルト値を含めてテーブルを作成しようとしている場合には、事前に手動でテーブルを作成するか、sink コネクターがイベントを処理する前に関連のフィールドのスキーマを調節する必要があります。NULL を指定できるかどうかの設定またはデフォルト値を調整するには、パイプライン内の変更を適用するか、ソースデータベースで定義されている列の状態を変更するカスタムの単一メッセージ変換を導入できます。
フィールドのデータ型は、事前定義されたマッピングのセットに基づいて解決されます。詳細は、JDBC フィールドタイプ を参照してください。
宛先データベースにすでに存在するテーブルのイベント構造に新しいフィールドを導入する場合は、新しいフィールドをオプションとして定義するか、フィールドのデフォルト値がデータベーススキーマで指定されている必要があります。フィールドを宛先テーブルから削除する場合は、次のいずれかのオプションを使用します。
- フィールドを手動で削除します。
- 列をドロップします。
- フィールドにデフォルト値を割り当てます。
- フィールドを、NULL を指定できるフィールドとして定義します。
3.1.1.9. 宛先テーブル名と列名の大文字と小文字を定義するオプションの指定 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターは、宛先データベースで実行される DDL (スキーマ変更) または DML (データ変更) SQL ステートメントを構築することによって、Kafka メッセージを消費します。デフォルトでは、コネクターはソーストピックの名前とイベントフィールドの名前を、宛先テーブルのテーブル名と列名の基礎として使用します。構築された SQL では、元の文字列の大文字と小文字を保持するために、識別子が引用符で自動的に区切られることはありません。その結果、デフォルトでは、宛先データベース内のテーブル名または列名のテキストの大文字と小文字の区別については、大文字と小文字が指定されていない場合にデータベースが名前文字列を処理する方法に完全に依存します。
たとえば、宛先データベースの言語が Oracle で、イベントのトピックが order
である場合、名前が引用符で囲まれていない場合、Oracle ではデフォルトで大文字の名前が使用されるため、宛先テーブルは ORDERS
として作成されます。同様に、宛先データベース言語が PostgreSQL で、イベントのトピックが ORDERS
である場合、名前が引用符で囲まれていない場合、PostgreSQL はデフォルトで小文字の名前を使用するため、宛先テーブルは order
として作成されます。
Kafka イベントに存在するテーブル名とフィールド名の大文字と小文字を明示的に保持するには、コネクター設定で quote.identifiers
プロパティーの値を true
に設定します。このオプションが設定されている場合、受信イベントが order
というトピックのもので、宛先データベース言語が Oracle である場合、コネクターは、構築された SQL でテーブルの名前が "orders"
として定義されているため、orders
という名前のテーブルを作成します。引用符を有効にすると、コネクターが列名を作成するときと同じ動作になります。
接続アイドルタイムアウト
Debezium の JDBC sink コネクターは、接続プールを活用してパフォーマンスを向上させます。接続プールは、初期の接続セットを確立し、指定された数の接続を維持し、必要に応じてアプリケーションに効率的に接続を割り当てるように設計されています。ただし、接続がプール内でアイドル状態のままになると問題が発生し、データベースに設定されたアイドルタイムアウトしきい値を超えて、非アクティブの状態が続くと、タイムアウトがトリガーされる可能性があります。
アイドル接続スレッドがタイムアウトをトリガーする可能性を軽減するために、接続プールは各接続のアクティビティーを定期的に検証するメカニズムを提供します。この検証により、接続がアクティブの状態を確保でき、データベースが接続をアイドル状態としてフラグ付けされないようにします。ネットワークが中断された場合に、Debezium が終了した接続を使用しようとすると、コネクターはプールに対して新しい接続を生成するよう要求します。
デフォルトでは、Debezium JDBC sink コネクターはアイドルタイムアウトテストを実行しません。ただし、hibernate.c3p0.idle_test_period
プロパティーを設定することで、コネクターがプールに対して、指定の間隔でタイムアウトテストを実行する要求を行うように設定できます。以下に例を示します。
タイムアウト設定の例
{ "hibernate.c3p0.idle_test_period": "300" }
{
"hibernate.c3p0.idle_test_period": "300"
}
Debezium JDBC sink コネクターは、Hibernate C3P0 接続プールを使用します。hibernate.c3p0.*` 設定名前空間でプロパティーを設定することで、CP30 接続プールをカスタマイズできます。上記の例では、hibernate.c3p0.idle_test_period プロパティーの設定により、接続プールが 300 秒ごとにアイドルタイムアウトテストを実行するように設定されます。設定を適用すると、接続プールは 5 分ごとに未使用の接続の評価を開始します。
3.1.2. Debezium JDBC コネクターがデータ型をマップする方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC sink コネクターは、論理またはプリミティブ型マッピングシステムを使用して列のデータ型を解決します。プリミティブ型には、整数、浮動小数点、ブール値、文字列、バイトなどの値が含まれます。通常、これらの型は、特定の Kafka Connect Schema
型コードのみで表されます。論理データ型は、フィールド名とスキーマの固定セットが含まれる Struct
ベースの型などの値、またはエポックからの日数など、特定のエンコーディングで表される値を含むなど、複雑な型であることがよくあります。
次の例は、プリミティブデータ型と論理データ型の代表的な構造を示しています。
プリミティブフィールドスキーマ
{ "schema": { "type": "INT64" } }
{
"schema": {
"type": "INT64"
}
}
論理フィールドスキーマ
Kafka Connect だけが、これらの複雑な論理型のソースというわけではありません。実際、Debezium ソースコネクターは、タイムスタンプ、日付、さらには JSON データなど、さまざまなデータ型を表す同様の論理型を含むフィールドを持つ変更イベントを生成します。
Debezium JDBC sink コネクターは、このようなプリミティブおよび論理型を仕様して JDBC SQL コード (列の方を表す) に列の方を解決します。これらの JDBC SQL コードは、基礎となる Hibernate 永続フレームワークによって使用され、列の型を使用中の方言の論理データ型に解決します。次の表は、Kafka Connect と JDBC SQL 型の間、および Debezium と JDBC SQL 型の間の基本マッピングと論理マッピングを示しています。実際の最終的な列のタイプは、データベースのタイプごとに異なります。
プリミティブ型 | JDBC SQL 型 |
---|---|
INT8 | Types.TINYINT |
INT16 | Types.SMALLINT |
INT32 | Types.INTEGER |
INT64 | Types.BIGINT |
FLOAT32 | Types.FLOAT |
FLOAT64 | Types.DOUBLE |
BOOLEAN | Types.BOOLEAN |
STRING | Types.CHAR、Types.NCHAR、Types.VARCHAR、Types.NVARCHAR |
BYTES | Types.VARBINARY |
論理型 | JDBC SQL 型 |
---|---|
org.apache.kafka.connect.data.Decimal | Types.DECIMAL |
org.apache.kafka.connect.data.Date | Types.DATE |
org.apache.kafka.connect.data.Time | Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp | Types.TIMESTAMP |
論理型 | JDBC SQL 型 |
---|---|
io.debezium.time.Date | Types.DATE |
io.debezium.time.Time | Types.TIMESTAMP |
io.debezium.time.MicroTime | Types.TIMESTAMP |
io.debezium.time.NanoTime | Types.TIMESTAMP |
io.debezium.time.ZonedTime | Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp | Types.TIMESTAMP |
io.debezium.time.MicroTimestamp | Types.TIMESTAMP |
io.debezium.time.NanoTimestamp | Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp | Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal | Types.DOUBLE |
データベースがタイムゾーンのある時刻またはタイムスタンプをサポートしていない場合、マッピングはタイムゾーンなしの同等のものに解決されます。
論理型 | MySQL SQL 型 | PostgreSQL SQL 型 | SQL Server SQL 型 |
---|---|---|---|
io.debezium.data.Bits |
|
|
|
io.debezium.data.Enum |
| Types.VARCHAR | 該当なし |
io.debezium.data.Json |
|
| 該当なし |
io.debezium.data.EnumSet |
| 該当なし | 該当なし |
io.debezium.time.Year |
| 該当なし | 該当なし |
io.debezium.time.MicroDuration | 該当なし |
| 該当なし |
io.debezium.data.Ltree | 該当なし |
| 該当なし |
io.debezium.data.Uuid | 該当なし |
| 該当なし |
io.debezium.data.Xml | 該当なし |
|
|
上記のプリミティブおよび論理マッピングに加え、変更イベントのソースが Debezium ソースコネクターの場合は、列またはデータ型の伝播を有効にすることで、列のタイプと長さ、制度、スケールの解決も操作できます。強制的に伝播を行うには、ソースコネクター設定で次のプロパティーのいずれかを設定する必要があります。
-
column.propagate.source.type
-
datatype.propagate.source.type
Debezium JDBC シンクコネクターは、より高い優先順位で値を適用します。
たとえば、次のフィールドスキーマが変更イベントに含まれているとします。
Debezium は列またはデータ型の伝播を有効にしてイベントフィールドスキーマを変更します
前述の例では、スキーマパラメーターが設定されていない場合、Debezium JDBC シンクコネクターはこのフィールドを Types.SMALLINT
の列型にマップします。Types.SMALLINT
には、データベース言語に応じて、さまざまな論理データベースタイプを指定できます。MySQL の場合、例の列タイプは、長さが指定されていない TINYINT
列タイプに変換されます。列またはデータ型の伝播がソースコネクターで有効になっている場合、Debezium JDBC シンクコネクターはマッピング情報を使用してデータ型マッピングプロセスを調整し、TINYINT(1)
型の列を作成します。
通常、ソースデータベースとシンクデータベースの両方に同じ種類のデータベースが使用されている場合、列またはデータ型の伝播を使用する効果は非常に大きくなります。
3.1.3. Debezium JDBC コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
以下の方法のいずれかを使用して Debezium JDBC コネクターをデプロイできます。
Streams for Apache Kafka を使用して、コネクタープラグインを含むイメージを自動的に作成します。
以下は推奨される方法です。
-
Containerfile からカスタム Kafka Connect コンテナーイメージを構築します。
この Containerfile デプロイメント方法は非推奨となりました。この方法の手順は、ドキュメントの今後のバージョンで削除される予定です。
ライセンス要件により、Debezium JDBC コネクターアーカイブには、Debezium が Db2 および Oracle データベースに接続するために必要なドライバーは含まれていません。コネクターがこれらのデータベースにアクセスできるようにするには、コネクター環境にドライバーを追加する必要があります。コネクターに付属していないドライバーを入手する方法については、コネクターアーカイブに含まれていないドライバーの入手 を参照してください。
3.1.3.1. コネクターアーカイブに含まれていない JDBC ドライバーの取得 リンクのコピーリンクがクリップボードにコピーされました!
ライセンス要件により、Debezium が Db2 データベースおよび Oracle データベースに接続するために必要な JDBC ドライバーファイルは、Debezium JDBC コネクターアーカイブに含まれていません。これらのドライバーは、Maven Central からダウンロードできます。使用するデプロイメント方法に応じて、次のいずれかの方法を使用してドライバーを取得できます。
- Streams for Apache Kafka を使用して、Kafka Connect イメージにコネクターを追加します。
-
「Streams for Apache Kafka を使用した Debezium JDBC コネクターのデプロイ」 で示すように、
KafkaConnect
カスタムリソースのbuilds.plugins.artifact.url
にドライバーの Maven Central の場所を追加します。 - Containerfile を使用してコネクターのコンテナーイメージを構築します。
-
Containerfile に、Maven Central からドライバーファイルをダウンロードするための URL を指定する
curl
コマンドを挿入します。詳細は、「Containerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium JDBC コネクターをデプロイする」 を参照してください。
3.1.3.2. Streams for Apache Kafka を使用した JDBC コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターのデプロイで推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnect
CR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnector
CR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnector
CR を適用してコネクターを起動します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。
Kafka Connect CR の spec.build.output
パラメーターは、生成される KafkaConnect
コンテナーイメージを格納する場所を指定します。コンテナーイメージは、quay.io などのコンテナーレジストリー、または OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect
リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の Kafka Connect の設定
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の 新しいコンテナーイメージの自動ビルド
3.1.3.3. Streams for Apache Kafka を使用した Debezium JDBC コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Streams for Apache Kafka のビルド設定を使用して、Kafka Connect コンテナーイメージを OpenShift に自動的にビルドできます。ビルドイメージには、指定する Debezium コネクタープラグインが含まれます。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから、必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output
に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector
カスタムリソースを作成します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが Streams for Apache Kafka on OpenShift のデプロイと管理 に記載されているとおりにデプロイされている。
- Kafka Connect が Streams for Apache Kafka にデプロイされている。
- コネクターが変更イベントレコードを読み取ることができる Kafka トピックがある。
- 宛先データベースがインストールされ、JDBC 接続を受け入れるように設定されている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
oc
CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- 新規コンテナーイメージを保存するために、ImageStream リソースをクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform でのイメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。
たとえば、次の抜粋に示すように、annotations
とimage
プロパティーを指定するdbz-jdbc-connect.yaml
という名前のKafkaConnect
CR を作成します。次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。- Debezium JDBC コネクターアーカイブ。
Oracle または Db2 シンクデータベースへの接続に必要な JDBC ドライバー。他のシンク宛先ではこのエントリーを省略できます。
Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義するdbz-jdbc-connect.yaml
ファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表3.5 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションを"true"
に設定して、クラスター Operator がKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.output
は、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Quay などのコンテナーレジストリーにプッシュするdocker
または、イメージを内部 OpenShift ImageStream にプッシュするimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
を指定する方法の詳細は、{NameConfiguringStreamsOpenShift} の Streams for Apache Kafka ビルドスキーマリファレンス を参照してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。JDBC ドライバーファイルは.jar
形式です。type
の値は、url
フィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(Db2 または Oracle シンクのみの場合) Maven Central 内の JDBC JDBC ドライバーの場所を指定します。Debezium がこれらのデータベースに接続するために必要なドライバーは、Debezium コネクターアーカイブに含まれていません。
この例では、Oracle Database JDBC ドライバーの Maven URL を示しています。Db2 JDBC ドライバーは、次の Maven の場所で入手できます:
https://repo1.maven.org/maven2/com/ibm/db2/jcc/11.5.9.0/jcc-11.5.9.0.jar
以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-jdbc-connect.yaml
oc create -f dbz-jdbc-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、次のKafkaConnector
CR を作成し、orders-to-postgresql-jdbc-connector.yaml
として保存します。Debezium コネクターの
KafkaConnector
カスタムリソースを定義するorders-to-postgresql-jdbc-connector.yaml
ファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表3.6 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
シンクデータベースの JDBC 接続 URL。URL は、ポート番号と、データベースへの接続に必要な認証プロパティーを指定します。たとえば、
jdbc:oracle:thin:@myhost.example.com:1521/myservice
などです。6
Debezium がデータベースへの接続に使用するアカウントの名前。
7
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
8
コネクターが読み取る Kafka トピックのコンマ区切りリストを指定します。各トピックからのイベントは、シンクデータベースで同じ名前のテーブルにストリーミングされます。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f jdbc-inventory-connector.yaml
oc create -n debezium -f jdbc-inventory-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
3.1.3.4. Containerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium JDBC コネクターをデプロイする リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC コネクターは、Debezium コネクターのアーカイブを含んだカスタムの Kafka Connect コンテナーイメージを構築し、そのコンテナーイメージをコンテナーレジストリーにプッシュすることでデプロイできます。その後、次のカスタムリソース (CR) を作成して、コネクター設定を定義します。
-
Kafka Connect インスタンスを定義する
KafkaConnect
CR。image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。 -
Debezium JDBC コネクターを定義する
KafkaConnector
CR。この CR をKafkaConnect
CR を適用したのと同じ OpenShift インスタンスに適用します。
このセクションで説明されているデプロイメント方法は非推奨となり、ドキュメントの今後のバージョンでは削除される予定です。
前提条件
- 宛先データベースがインストールされ、JDBC 接続を受け入れるように設定されている。
- Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、Streams for Apache Kafka on OpenShift のデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
- コネクターが変更イベントレコードを読み取ることができる Kafka トピックがある。
- 宛先データベースがインストールされ、JDBC 接続を受け入れるように設定されている。
- JDBC コネクターを使用して Db2 や Oracle データベースにデータを送信する必要がある場合、Kafka Connect サーバーは Maven Central にアクセスして、それらのデータベース用の JDBC ドライバーをダウンロードします。また、ドライバーのローカルコピー、またはローカルの Maven リポジトリーや他の HTTP サーバーから利用可能なものを使用することもできます。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.io
やdocker.io
など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect に Debezium JDBC コネクターコンテナーを作成します。
-
registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0
をベースイメージとして使用する Containerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。
-
+ .Descriptions of Containerfile settings for building a custom Kafka Connect container image
+
.Descriptions of Containerfile settings for building a custom Kafka Connect container image
項目 | 説明 |
---|---|
1 | 任意のファイル名を指定できます。 |
2 | Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。 |
+ このコマンドは、カレントディレクトリーに debezium-jdbc-connector-container.yaml
という名前の Containerfile を作成します。
前の手順で作成した
debezium-jdbc-connector-container.yaml
Containerfile からコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-jdbc-connector-container:latest .
podman build -t debezium-jdbc-connector-container:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-jdbc-connector-container:latest .
docker build -t debezium-jdbc-connector-container:latest .
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
debezium-jdbc-connector-container
という名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-jdbc-connector-container:latest
podman push <myregistry.io>/debezium-jdbc-connector-container:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-jdbc-connector-container:latest
docker push <myregistry.io>/debezium-jdbc-connector-container:latest
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium Oracle KafkaConnect カスタムリソース (CR) を作成します。たとえば、
annotations
およびimage
プロパティーを指定するdbz-connect.yaml
という名前のKafkaConnect
CR を作成します。以下の例は、KafkaConnect
カスタムリソースを記述するdbz-connect.yaml
ファイルからの抜粋を示しています。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
項目 | 説明 |
---|---|
1 |
Cluster Operator が |
2 |
|
以下のコマンドを入力して、
KafkaConnect
CR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium JDBC コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。Debezium JDBC コネクターは、コネクターの設定プロパティーを指定する
.yaml
ファイルで設定します。次の例は、
KafkaConnect
カスタムリソースの主要なプロパティーのいくつかを設定するdbz-connect.yaml
ファイルからの抜粋を示しています。
コネクターは、ポート5432
上の PostgreSQL サーバーシンクへの JDBC 接続を確立します。
使用可能なコネクタープロパティーの全範囲については、Debezium JDBC コネクター設定プロパティーの説明 を参照してください。例3.1
jdbc-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
項目 | 説明 |
---|---|
1 | Kafka Connect サービスに登録されているコネクター名。 |
2 | Streams for Apache Kafka クラスターの名前。 |
3 | Debezium JDBC コネクタークラスの名前。 |
4 | シンクデータベースの JDBC アドレス。 |
5 | Debezium がデータベースへの接続に使用するアカウントの名前。 |
6 | Debezium がデータベースユーザーアカウントの認証に使用するパスワード。 |
7 | コネクターが読み取る Kafka トピックのコンマ区切りリストを指定します。各トピックからのイベントは、シンクデータベースで同じ名前のテーブルにストリーミングされます。 |
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをjdbc-connector.yaml
ファイルに保存した場合は、次のコマンドを実行します。oc apply -f jdbc-connector.yaml
oc apply -f jdbc-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
orders-topic-to-postgresql-via-jdbc-sink-connector
を登録します。コネクターは起動し、KafkaConnector
CR で指定されたとおりに、orders
トピックからの読み取りを開始します。
3.1.4. Debezium JDBC コネクター設定プロパティーの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium JDBC シンクコネクターには、ニーズを満たすコネクターの動作を実現するために使用できるいくつかの設定プロパティーがあります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
3.1.4.1. JDBC コネクター Kafka コンシューマープロパティー リンクのコピーリンクがクリップボードにコピーされました!
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし | コネクターの一意名。コネクターの登録時にこの名前を再利用しようとすると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。 | |
デフォルトなし |
コネクターの Java クラスの名前。Debezium JDBC コネクターの場合、 | |
1 | このコネクターに使用するタスクの最大数。 | |
デフォルトなし |
消費するトピックのコンマ区切りのリスト。このプロパティーを | |
デフォルトなし |
消費するトピックを指定する正規表現。内部的には、正規表現は |
3.1.4.2. JDBC コネクター接続プロパティー リンクのコピーリンクがクリップボードにコピーされました!
プロパティー | デフォルト | 説明 |
---|---|---|
| 使用する接続プロバイダーの実装。 | |
デフォルトなし | データベースへの接続に使用される JDBC 接続 URL。 | |
デフォルトなし | コネクターがデータベースへの接続に使用するデータベースユーザーアカウントの名前。 | |
デフォルトなし | コネクターがデータベースへの接続に使用するパスワード。 | |
| プール内の最小接続数を指定します。 | |
| プールが維持する同時接続の最大数を指定します。 | |
| 接続プールが最大サイズを超えた場合にコネクターが取得を試みる接続の数を指定します。 | |
| 未使用の接続が破棄されるまで保持する時間 (秒) を指定します。 |
3.1.4.3. JDBC コネクターのランタイムプロパティー リンクのコピーリンクがクリップボードにコピーされました!
プロパティー | デフォルト | 説明 |
---|---|---|
| JDBC 時間値を挿入するときに使用するタイムゾーンを指定します。 | |
|
コネクターが | |
|
コネクターが 注記
JDBC コネクターが Db2 から受信した
上記のクエリーを送信するユーザーアカウントには、切り捨てられるテーブルに対する | |
| イベントのデータベースへの挿入に使用するストラテジーを指定します。以下のオプションが利用できます。
| |
| コネクターがイベントからプライマリーキー列を解決する方法を指定します。
| |
デフォルトなし |
プライマリーキー列の名前、またはプライマリーキーの導出元となるフィールドのコンマ区切りリスト。 | |
| 生成された SQL ステートメントでテーブル名と列名を区切るために引用符を使用するかどうかを指定します。詳細は、JDBC における引用符使用時の大文字小文字の区別 セクションを参照してください。 | |
| コネクターが宛先テーブルのスキーマを進化させる方法を指定します。詳細は、「Debezium JDBC コネクターのスキーマ進化モード」 を参照してください。以下のオプションが利用できます。
| |
|
コネクターが宛先テーブルの名前を構築するために使用する文字列パターンを指定します。 | |
|
PostgreSQL PostGIS エクステンションがインストールされているスキーマ名を指定します。デフォルトは | |
|
SQL Server テーブルの identity 列への | |
| 宛先テーブルにまとめてバッチ処理するレコードの数を指定します。 注記
Connect ワーカープロパティーの | |
| Debezium JDBC コネクターの削減バッファーを有効にするかどうかを指定します。 次のいずれかの設定を選択します。
削減バッファーが有効になっている場合に PostgreSQL シンクデータベースでのクエリー処理を最適化するには、JDBC 接続 URL に | |
空の文字列 |
変更イベント値から追加するフィールドの完全修飾名と一致するフィールド名の、オプションのコンマ区切りリスト。フィールドの完全修飾名の形式は、 | |
空の文字列 |
変更イベント値から除外するフィールドの完全修飾名と一致するフィールド名の、オプションのコンマ区切りリスト。フィールドの完全修飾名の形式は、 | |
5 | ターゲットデータベースへの変更をフラッシュしようとして特定のデータベースエラーが発生した後に、コネクターが実行する再試行の最大回数を指定します。再試行回数が再試行値を超えると、シンクコネクターは FAILED 状態になります。 | |
1000 | 失敗したフラッシュ操作を再試行するためにコネクターが待機するミリ秒数を指定します。 注記
|
3.1.4.4. JDBC コネクターの拡張可能なプロパティー リンクのコピーリンクがクリップボードにコピーされました!
プロパティー | デフォルト | 説明 |
---|---|---|
|
コネクターがイベントフィールド名から列名を解決するために使用する | |
|
コネクターが受信イベントトピック名からテーブル名を解決するために使用する
|
3.1.4.5. JDBC コネクター hibernate.* パススループロパティー リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect はパススルー設定をサポートしており、コネクター設定から特定のプロパティーを直接渡すことで、基盤となるシステムの動作を変更できます。デフォルトでは、一部の Hibernate プロパティーは、JDBC コネクター 接続プロパティー (例: connection.url
、connection.username
、connection.pool.*_size
)、およびコネクターの ランタイムプロパティー (例: use.time.zone
、quote.identifiers
) を通じて公開されます。
他の Hibernate の動作をカスタマイズする場合は、hibernate.* 名前空間を使用するプロパティーをコネクター設定に追加することで、パススルーのメカニズムを利用できます。
たとえば、Hibernate がターゲットデータベースのタイプとバージョンを解決できるようにするには、hibernate.dialect
プロパティーを追加し、それをデータベースの完全修飾クラス名 (例: org.hibernate.dialect.MariaDBDialect
) に設定します。
3.1.5. JDBC コネクターのよくある質問 リンクのコピーリンクがクリップボードにコピーされました!
ExtractNewRecordState
単一メッセージ変換は必要ですか?- いいえ、実際には、Debezium JDBC コネクターを他の実装と区別する要素の 1 つです。このコネクターは、競合するコネクターと同様にフラット化されたイベントを取り込むことができますが、特定の種類の変換を必要とせずに、Debezium の複雑な変更イベント構造をネイティブに取り込むこともできます。
- 列の型が変更された場合、または列の名前が変更または削除された場合、これはスキーマの進化によって処理されますか?
- いいえ、Debezium JDBC コネクターは既存の列に変更を加えません。コネクターによってサポートされるスキーマの進化は非常に基本的なものです。これは、イベント構造内のフィールドをテーブルの列リストと単純に比較し、まだ列として定義されていないフィールドをテーブルに追加します。列のタイプまたはデフォルト値が変更された場合、コネクターは宛先データベースでの調整は行いません。列の名前が変更された場合、古い列はそのまま残され、コネクターは新しい名前の列をテーブルに追加します。ただし、古い列のデータを含む既存の行は変更されません。このようなタイプのスキーマ変更は手動で処理する必要があります。
- 列の型が希望する型に解決されない場合、別のデータ型に強制的にマッピングするにはどうすればよいですか?
- Debezium JDBC コネクターは、高度な型システムを使用して列のデータ型を解決します。この型システムが特定のフィールドのスキーマ定義を JDBC 型に解決する方法の詳細は、「Debezium JDBC コネクターのデータと列タイプのマッピングの説明」 セクションを参照してください。別のデータ型マッピングを適用する場合は、テーブルを手動で定義して、優先される列の型を明示的に取得します。
- Kafka トピック名を変更せずに、テーブル名に接頭辞または接尾辞を指定するにはどうすればよいですか?
-
宛先テーブル名に接頭辞または接尾辞を追加するには、collection.name.format コネクター設定プロパティーを調整して、必要な接頭辞または接尾辞を適用します。たとえば、すべてのテーブル名に
jdbc_
という接頭辞を付けるには、collection.name.format
設定プロパティーにjdbc_${topic}
という値を指定します。コネクターがorder
というトピックにサブスクライブされている場合、結果のテーブルはjdbc_orders
として作成されます。 - 識別子の引用符が有効になっていないにもかかわらず、一部の列が自動的に引用符で囲まれるのはなぜですか?
-
状況によっては、
quote.identifiers
が有効になっていない場合でも、特定の列名またはテーブル名が明示的に引用符で囲まれることがあります。これは、列名またはテーブル名が、不正な構文とみなされる特定の規則で始まるか、またはそのような規則が使用されている場合に引用符が必要になることがよくあります。たとえば、primary.key.mode がkafka
に設定されている場合、一部のデータベースでは、列名が引用符で囲まれている場合にのみ、列名をアンダースコアで始めることができます。引用の動作は方言固有であり、データベースの種類によって異なります。