13.2. 指定したトピックへの Debezium イベントレコードのルーティング
データ変更イベントが含まれるそれぞれの Kafka レコードは、デフォルトのルーティング先トピックを持ちます。必要に応じて、レコードが Kafka Connect コンバーターに到達する前に、指定したトピックにレコードを再ルーティングすることができます。そのために、Debezium ではトピックルーティング単一メッセージ変換 (SMT) を利用することができます。Debezium コネクターの Kafka Connect 設定でこの変換を設定します。設定オプションにより、以下の項目を指定することができます。
- 再ルーティングするレコードを識別するための式。
- ルーティング先トピックに解決する式。
- 宛先トピックに再ルーティングされるレコード間でキーの一意性を確保する方法。
変換の設定により必要な動作が得られるようにするのは、ユーザー側の範疇です。Debezium は、変換の設定により得られる動作を検証しません。
トピックルーティング変換は Kafka Connect SMT です。
詳細は以下のセクションを参照してください。
13.2.1. 指定したトピックに Debezium レコードをルーティングするユースケース
Debezium コネクターのデフォルト動作では、それぞれの変更イベントレコードは、名前がデータベースおよび変更が加えられたテーブルの名前から作られるトピックに送信されます。つまり、トピックは 1 つの物理テーブルのレコードを受け取ります。トピックが複数の物理テーブルのレコードを受け取るようにするには、Debezium コネクターを設定してレコードをそのトピックに再ルーティングする必要があります。
論理テーブル
論理テーブルは、複数の物理テーブルのレコードを 1 つのトピックにルーティングする場合の一般的なユースケースです。論理テーブル内には、すべて同じスキーマを持つ複数の物理テーブルがあります。たとえば、シャーディングされたテーブルのスキーマは同一です。論理テーブルは、db_shard1.my_table
および db_shard2.my_table
という 2 つ以上のシャード化されたテーブルで構成されている場合があります。テーブルは異なるシャードにあり物理的に別個のものですが、1 つにまとまり論理テーブルを形成します。任意のシャード内のテーブルの変更イベントレコードを、同じトピックに再ルーティングすることができます。
パーティションで分割された PostgreSQL テーブル
Debezium PostgreSQL コネクターがパーティションで分割されたテーブルの変更をキャプチャーする場合、デフォルトの動作では、変更イベントレコードはパーティションごとに異なるトピックにルーティングされます。すべてのパーティションからのレコードを 1 つのトピックに出力するには、トピックルーティング SMT を設定します。パーティションで分割されたテーブルの各キーは必ず一意であるため、キーの一意性を確保するために SMT がキーフィールドを追加しないように key.enforce.uniqueness=false
を設定します。デフォルトの動作では、キーフィールドが追加されます。
13.2.2. 複数テーブルの Debezium レコードを 1 つのトピックにルーティングする例
複数の物理テーブルの変更イベントレコードを同じトピックにルーティングするには、Debezium コネクターの Kafka Connect 設定でトピックルーティング変換を設定します。トピックルーティング SMT を設定するには、以下の項目を決定する正規表現を指定する必要があります。
- レコードをルーティングするテーブル。これらのテーブルのスキーマは、すべて同一でなければなりません。
- ルーティング先トピックの名前。
以下の例のコネクター設定では、トピックルーティング SMT の複数のオプションを設定します。
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex
変更イベントレコードを特定のトピックにルーティングする必要があるかどうかを決定するために、変換がそれぞれのレコードに適用する正規表現を指定します。
この例では、正規表現
(.*)customers_shard(.*)
は、名前にcustomers_shard
文字列が含まれるテーブルに対する変更のレコードがマッチします。この場合、以下の名前のテーブルのレコードが再ルーティングされます。myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
topic.replacement
-
ルーティング先トピックの名前を表す正規表現を指定します。変換により、マッチする各レコードがこの式で識別されるトピックにルーティングされます。この例では、上記 3 つのシャーディングされたテーブルのレコードが
myserver.mydb.customers_all_shards
トピックにルーティングされます。 schema.name.adjustment.mode
-
コネクターで使用されるメッセージ変換器との互換性のために、結果のトピック名から派生するメッセージキースキーマ名を調整する方法を指定します。値は
none
(デフォルト) またはavro
です。
設定のカスタマイズ
変換で処理する、または処理しないテーブルを指定する SMT 述語ステートメント を定義して、設定をカスタマイズできます。述語は、正規表現に一致するテーブルをルーティングするように SMT を設定し、正規表現に一致する特定のテーブルを SMT に再ルーティングさせたくない場合に役立ちます。
13.2.3. 同一トピックにルーティングされる Debezium レコード間でのキーの一意性確保
Debezium の変更イベントキーは、テーブルのプライマリーキーを設定するテーブル列を使用します。複数の物理テーブルのレコードを 1 つのトピックにルーティングするには、それらの全テーブルに渡ってイベントキーが一意でなければなりません。ただし、それぞれの物理テーブルは、そのテーブル内でのみ一意なプライマリーキーを持つことができます。たとえば、myserver.mydb.customers_shard1
テーブルの行は、myserver.mydb.customers_shard2
テーブルの行と同じキー値を持つ場合があります。
変更イベントレコードが同じトピックにルーティングされる全テーブルに渡ってそれぞれのイベントキーが必ず一意になるように、トピックルーティング変換は変更イベントキーにフィールドを挿入します。デフォルトでは、挿入されるフィールドの名前は __dbz__physicalTableIdentifier
です。挿入されるフィールドの値は、デフォルトのルーティング先トピックの名前です。
必要に応じて、別のフィールドをキーに挿入するようにトピックルーティング変換を設定することができます。そのためには、key.field.name
オプションを指定し、それを既存のプライマリーキーフィールド名と競合しないフィールド名に設定します。以下に例を示します。
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards transforms.Reroute.key.field.name=shard_id
この例では、ルーティングされるレコードのキー構造に shard_id
フィールドが追加されます。
キーの新しいフィールドの値を調整する場合は、以下の両方のオプションを設定します。
key.field.regex
- 1 つまたは複数の文字グループをキャプチャーするために、変換がデフォルトのルーティング先トピックの名前に適用する正規表現を指定します。
key.field.replacement
- キャプチャーされるこれらのグループに関して、挿入されるキーフィールドの値を決定するための正規表現を指定します。
以下に例を示します。
transforms.Reroute.key.field.regex=(.*)customers_shard(.*) transforms.Reroute.key.field.replacement=$2
この設定では、デフォルトのルーティング先トピックの名前を以下のように仮定します。
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
変換では、2 番目にキャプチャーされたグループの値であるシャード番号が、キーの新しいフィールドの値として使用されます。この例では、挿入されるキーフィールドの値は 1
、2
、または 3
です。
テーブルにグローバルに一意なキーが含まれ、キー構造を変更する必要がない場合は、key.enforce.uniqueness
プロパティーを false
に設定することができます。
... transforms.Reroute.key.enforce.uniqueness=false ...
13.2.4. トピックルーティング変換を一部適用するオプション
データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。
以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。
- 変換用の SMT 述語を設定する。
- SMT に topic.regex 設定オプションを使用する。
13.2.5. Debezium トピックルーティング変換設定用のオプション
以下の表で、トピックルーティングの SMT 設定オプションを紹介します。
オプション | デフォルト | 説明 |
---|---|---|
変更イベントレコードを特定のトピックにルーティングする必要があるかどうかを決定するために、変換がそれぞれのレコードに適用する正規表現を指定します。 | ||
ルーティング先トピックの名前を表す正規表現を指定します。変換により、マッチする各レコードがこの式で識別されるトピックにルーティングされます。この式により、 | ||
|
レコードの変更イベントキーにフィールドを追加するかどうかを定義します。キーフィールドを追加することで、変更イベントレコードが同じトピックにルーティングされる全テーブルに渡って、それぞれのイベントキーの一意性が確保されます。この設定は、同じキーを持つが異なるソーステーブルに由来するレコードの変更イベントの競合を防ぐのに役立ちます。 | |
|
変更イベントキーに追加されるフィールドの名前。このフィールドの値により、元のテーブル名が識別されます。SMT がこのフィールドを追加するには | |
1 つまたは複数の文字グループをキャプチャーするために、変換がデフォルトのルーティング先トピックの名前に適用する正規表現を指定します。SMT がこの正規表現を適用するには、 | ||
| ||
none |
結果のトピック名から派生したメッセージキースキーマ名を、コネクターが使用するメッセージコンバーターとの互換性に合わせて調整する方法を指定します。 | |
| LRUCache で最大エントリーを保持するために使用されるサイズ。キャッシュは、論理テーブルのキーと値の古い/新しいスキーマを保持し、派生キーとトピックの正規表現の結果もキャッシュして、ソースレコードの変換を改善します。 |