第8章 アプリケーションの Debezium コネクターの設定
デフォルトの Debezium コネクター動作がアプリケーションに適したではない場合、以下の Debezium 機能を使用して必要な動作を設定できます。
-
ByLogicalTableRouter
SMT は、データ変更イベントレコードを指定したトピックに再度ルーティングします。 -
ExtractNewRecordState
SMT は、一部の Kafka コンシューマーで必要とされる可能性のある単純な形式で、データ変更イベントレコードの複雑な構造をフラット化します。 - PostgreSQL、MongoDB、または SQL Server コネクターの Avro シリアライズ を設定すると、イベントレコードコンシューマーの変更がレコードスキーマに対応できるようになります。
-
CloudEventsConverter
では、Debezium コネクターは CloudEvents 仕様に準拠する変更イベントレコードを出力できます。
8.1. 指定したトピックへのルーティング変更イベントレコード
データ変更イベントが含まれる各 Kafka レコードには、デフォルトの宛先トピックがあります。必要な場合は、レコードが Kafka Connect コンバーターに到達する前に指定したトピックにレコードを再ルート指定できます。そのため、Debezium は SMT(Single ByLogicalTableRouter
Message transformation)を提供します。Debezium コネクターの Kafka Connect 設定でこの変換を設定します。設定オプションを使用すると、以下を指定できます。
- 再ルートを作成するレコードを識別する式
- 宛先トピックに解決する式
- 宛先トピックに再ルーティングされるレコード間の一意の鍵を確保する方法
変換設定が必要な動作を提供するのはユーザー次第です。Debezium は、変換の設定から生じる動作を検証しません。
ByLogicalTableRouter
変換は Kafka Connect SMT です。
詳細は以下のトピックを参照してください。
8.1.1. 指定したトピックへのルーティングレコードのユースケース
デフォルトの動作では、Debezium コネクターは、名前がデータベースの名前から形成されているトピックと、変更が行われたテーブルの名前が Debezium コネクターによって各変更イベントレコードを送信することです。つまり、トピックは 1 つの物理テーブルのレコードを受け取ります。トピックで複数の物理テーブルのレコードを受信する場合、Debezium コネクターがそのトピックにレコードを再ルーティングするように設定する必要があります。
論理テーブル
論理表は、複数の物理テーブルのルーティングレコードの一般的なユースケースです。論理表には、スキーマがすべて同じスキーマを持つ複数の物理テーブルがあります。たとえば、シャード化されたテーブルには同じスキーマがあります。論理テーブルは、db_shard1.my_table
およびという 2 つ以上のシャードテーブルで構成される場合があります db_shard2.my_table
。テーブルはシャードごとに異なり、物理的には異なりますが、それらが論理テーブルを形成します。シャードのテーブルのテーブルの変更イベントレコードを同じトピックに再ルーティングできます。
パーティション化された PostgreSQL テーブル
Debezium PostgreSQL コネクターがパーティション設定されたテーブルの変更を取得する場合、イベントレコードの変更が各パーティションに対して異なるトピックにルーティングされることがデフォルトの動作になります。すべてのパーティションから 1 つのトピックにレコードを出力するには、ByLogicalTableRouter
SMT を設定します。パーティションテーブルの各キーは一意であることが保証されるので、SMT がキーを固有のキー key.enforce.uniqueness=false
を確保するようにキーフィールドを追加しないようにを設定します。キーフィールドを追加するのはデフォルトの動作です。
8.1.2. 複数テーブルのルーティングレコードの例
複数の物理テーブルのイベントレコードを同じトピックにルーティングするには、Debezium コネクターの Kafka Connect 設定で ByLogicalTableRouter
変換を設定します。ByLogicalTableRouter
SMT の設定では、以下を判断する正規表現を指定する必要があります。
- レコードをルーティングするテーブル。これらのテーブルはすべて同じスキーマを持つ必要があります。
- 宛先トピック名。
たとえば、.properties
ファイルの設定は以下のようになります。
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex
変換が各変更イベントレコードに適用され、特定のトピックにルーティングするかどうかを判断する正規表現を指定します。
この例では、正規表現は名前に
customers_shard
文字列が含まれるテーブルへの変更の記録(.)customers_shard(.)
と一致します。これにより、以下の名前を持つテーブルのレコードを再ルート化します。myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
topic.replacement
-
宛先トピック名を表す正規表現を指定します。変換は、各一致するレコードをこの式で識別したトピックにルーティングします。この例では、上記の 3 つのシャードテーブルのレコードは
myserver.mydb.customers_all_shards
トピックにルーティングされます。
8.1.3. 同じトピックにルーティングされるレコード全体で一意の鍵を確保
Debezium Change イベントキーは、テーブルのプライマリーキーを構成するテーブル列を使用します。複数の物理テーブルのレコードを 1 つのトピックにルーティングするには、イベントキーはそれらのすべてのテーブル全体で一意である必要があります。ただし、各物理テーブルには、そのテーブル内でのみ一意のプライマリーキーを設定できます。たとえば、テーブルの行には、myserver.mydb.customers_shard1
テーブルの行と同じキーの値がある場合があり myserver.mydb.customers_shard2
ます。
変更イベントレコードが同じトピックにあるテーブル全体で各イベントキーが一意になるように、ByLogicalTableRouter
変換によってフィールドが変更されイベントキーに挿入されます。デフォルトでは、挿入されたフィールドの名前はです __dbz__physicalTableIdentifier
。挿入されたフィールドの値は、デフォルトの宛先トピック名です。
必要に応じて、別のフィールドをキーに挿入するように ByLogicalTableRouter
変換を設定できます。これを実行するには、key.field.name
オプションを指定し、既存のプライマリーキーフィールド名と競合しないフィールド名に設定します。以下に例を示します。
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards transforms.Reroute.key.field.name=shard_id
この例では、shard_id
フィールドをルーティングされたレコードのキー構造に追加します。
キーの新しいフィールドの値を調整するには、以下のいずれかのオプションを設定します。
key.field.regex
- 変換がデフォルトの宛先トピック名に適用され、1 つ以上の文字グループをキャプチャーする正規表現を指定します。
key.field.replacement
- 取得したグループの観点から、挿入されたキーフィールドの値を決定する正規表現を指定します。
以下に例を示します。
transforms.Reroute.key.field.regex=(.*)customers_shard(.*) transforms.Reroute.key.field.replacement=$2
この設定では、宛先のデフォルトのトピック名が以下のようになります。
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
変換では、2 番目のキャプチャーグループ(シャード番号)の値をキーの新しいフィールドの値として使用します。この例では、挿入された key フィールドの値は、1
、2
、またはです 3
。
テーブルにグローバルに一意の鍵が含まれ、キー構造を変更する必要がない場合は、key.enforce.uniqueness
プロパティーをに設定し false
ます。
... transforms.Reroute.key.enforce.uniqueness=false ...
8.1.4. ByLogicalTableRouter
変換を設定するオプション
プロパティー | デフォルト | 説明 |
変換が各変更イベントレコードに適用され、特定のトピックにルーティングするかどうかを判断する正規表現を指定します。 | ||
宛先トピック名を表す正規表現を指定します。変換は、各一致するレコードをこの式で識別したトピックにルーティングします。この式は、に指定する正規表現によってキャプチャーされるグループを参照でき | ||
|
レコードの変更イベントキーにフィールドを追加するかどうかを示します。キーフィールドを追加すると、変更イベントレコードが同じトピックになるテーブル全体で各イベントキーが一意になります。これは、同じキーを持つものの、異なるソーステーブルから生成されたレコードの変更イベントの競合を防ぐのに役立ちます。 | |
|
変更イベントキーに追加するフィールドの名前。このフィールドの値は、元のテーブル名を識別します。SMT でこのフィールドを追加するには、がデフォルトであるで | |
変換がデフォルトの宛先トピック名に適用され、1 つ以上の文字グループをキャプチャーする正規表現を指定します。SMT がこの式を適用するには、がデフォルトであるで | ||
に指定された式がキャプチャーしたグループの形式で、挿入されたキーフィールドの値を決定する正規表現を指定し |