13.2. 指定したトピックへの Debezium イベントレコードのルーティング


データ変更イベントが含まれるそれぞれの Kafka レコードは、デフォルトのルーティング先トピックを持ちます。必要に応じて、レコードが Kafka Connect コンバーターに到達する前に、指定したトピックにレコードを再ルーティングすることができます。そのために、Debezium ではトピックルーティング単一メッセージ変換 (SMT) を利用することができます。Debezium コネクターの Kafka Connect 設定でこの変換を設定します。設定オプションにより、以下の項目を指定することができます。

  • 再ルーティングするレコードを識別するための式。
  • ルーティング先トピックに解決する式。
  • 宛先トピックに再ルーティングされるレコード間でキーの一意性を確保する方法。

変換の設定により必要な動作が得られるようにするのは、ユーザー側の範疇です。Debezium は、変換の設定により得られる動作を検証しません。

トピックルーティング変換は Kafka Connect SMT です。

詳細は以下のセクションを参照してください。

13.2.1. 指定したトピックに Debezium レコードをルーティングするユースケース

Debezium コネクターのデフォルト動作では、それぞれの変更イベントレコードは、名前がデータベースおよび変更が加えられたテーブルの名前から作られるトピックに送信されます。つまり、トピックは 1 つの物理テーブルのレコードを受け取ります。トピックが複数の物理テーブルのレコードを受け取るようにするには、Debezium コネクターを設定してレコードをそのトピックに再ルーティングする必要があります。

論理テーブル

論理テーブルは、複数の物理テーブルのレコードを 1 つのトピックにルーティングする場合の一般的なユースケースです。論理テーブル内には、すべて同じスキーマを持つ複数の物理テーブルがあります。たとえば、シャーディングされたテーブルのスキーマは同一です。論理テーブルは、db_shard1.my_table および db_shard2.my_tableという 2 つ以上のシャード化されたテーブルで構成されている場合があります。テーブルは異なるシャードにあり物理的に別個のものですが、1 つにまとまり論理テーブルを形成します。任意のシャード内のテーブルの変更イベントレコードを、同じトピックに再ルーティングすることができます。

パーティションで分割された PostgreSQL テーブル

Debezium PostgreSQL コネクターがパーティションで分割されたテーブルの変更をキャプチャーする場合、デフォルトの動作では、変更イベントレコードはパーティションごとに異なるトピックにルーティングされます。すべてのパーティションからのレコードを 1 つのトピックに出力するには、トピックルーティング SMT を設定します。パーティションで分割されたテーブルの各キーは必ず一意であるため、キーの一意性を確保するために SMT がキーフィールドを追加しないように key.enforce.uniqueness=false を設定します。デフォルトの動作では、キーフィールドが追加されます。

13.2.2. 複数テーブルの Debezium レコードを 1 つのトピックにルーティングする例

複数の物理テーブルの変更イベントレコードを同じトピックにルーティングするには、Debezium コネクターの Kafka Connect 設定でトピックルーティング変換を設定します。トピックルーティング SMT を設定するには、以下の項目を決定する正規表現を指定する必要があります。

  • レコードをルーティングするテーブル。これらのテーブルのスキーマは、すべて同一でなければなりません。
  • ルーティング先トピックの名前。

以下の例のコネクター設定では、トピックルーティング SMT の複数のオプションを設定します。

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex

変更イベントレコードを特定のトピックにルーティングする必要があるかどうかを決定するために、変換がそれぞれのレコードに適用する正規表現を指定します。

この例では、正規表現 (.*)customers_shard(.*) は、名前に customers_shard 文字列が含まれるテーブルに対する変更のレコードがマッチします。この場合、以下の名前のテーブルのレコードが再ルーティングされます。

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
ルーティング先トピックの名前を表す正規表現を指定します。変換により、マッチする各レコードがこの式で識別されるトピックにルーティングされます。この例では、上記 3 つのシャーディングされたテーブルのレコードが myserver.mydb.customers_all_shards トピックにルーティングされます。
schema.name.adjustment.mode
コネクターで使用されるメッセージ変換器との互換性のために、結果のトピック名から派生するメッセージキースキーマ名を調整する方法を指定します。値は none (デフォルト) または avro です。

設定のカスタマイズ

変換で処理する、または処理しないテーブルを指定する SMT 述語ステートメント を定義して、設定をカスタマイズできます。述語は、正規表現に一致するテーブルをルーティングするように SMT を設定し、正規表現に一致する特定のテーブルを SMT に再ルーティングさせたくない場合に役立ちます。

13.2.3. 同一トピックにルーティングされる Debezium レコード間でのキーの一意性確保

Debezium の変更イベントキーは、テーブルのプライマリーキーを設定するテーブル列を使用します。複数の物理テーブルのレコードを 1 つのトピックにルーティングするには、それらの全テーブルに渡ってイベントキーが一意でなければなりません。ただし、それぞれの物理テーブルは、そのテーブル内でのみ一意なプライマリーキーを持つことができます。たとえば、myserver.mydb.customers_shard1 テーブルの行は、myserver.mydb.customers_shard2 テーブルの行と同じキー値を持つ場合があります。

変更イベントレコードが同じトピックにルーティングされる全テーブルに渡ってそれぞれのイベントキーが必ず一意になるように、トピックルーティング変換は変更イベントキーにフィールドを挿入します。デフォルトでは、挿入されるフィールドの名前は __dbz__physicalTableIdentifier です。挿入されるフィールドの値は、デフォルトのルーティング先トピックの名前です。

必要に応じて、別のフィールドをキーに挿入するようにトピックルーティング変換を設定することができます。そのためには、key.field.name オプションを指定し、それを既存のプライマリーキーフィールド名と競合しないフィールド名に設定します。以下に例を示します。

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id

この例では、ルーティングされるレコードのキー構造に shard_id フィールドが追加されます。

キーの新しいフィールドの値を調整する場合は、以下の両方のオプションを設定します。

key.field.regex
1 つまたは複数の文字グループをキャプチャーするために、変換がデフォルトのルーティング先トピックの名前に適用する正規表現を指定します。
key.field.replacement
キャプチャーされるこれらのグループに関して、挿入されるキーフィールドの値を決定するための正規表現を指定します。

以下に例を示します。

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

この設定では、デフォルトのルーティング先トピックの名前を以下のように仮定します。

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

変換では、2 番目にキャプチャーされたグループの値であるシャード番号が、キーの新しいフィールドの値として使用されます。この例では、挿入されるキーフィールドの値は 12、または 3 です。

テーブルにグローバルに一意なキーが含まれ、キー構造を変更する必要がない場合は、key.enforce.uniqueness プロパティーを false に設定することができます。

...
transforms.Reroute.key.enforce.uniqueness=false
...

13.2.4. トピックルーティング変換を一部適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。

以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。

13.2.5. Debezium トピックルーティング変換設定用のオプション

以下の表で、トピックルーティングの SMT 設定オプションを紹介します。

表13.1 トピックルーティングの SMT 設定オプション
オプションデフォルト説明

topic.regex

 

変更イベントレコードを特定のトピックにルーティングする必要があるかどうかを決定するために、変換がそれぞれのレコードに適用する正規表現を指定します。

topic.replacement

 

ルーティング先トピックの名前を表す正規表現を指定します。変換により、マッチする各レコードがこの式で識別されるトピックにルーティングされます。この式により、topic.regex に指定する正規表現によってキャプチャーされるグループを参照することができます。グループを参照するには、$1$2 などと指定します。

key.enforce​.uniqueness

true

レコードの変更イベントキーにフィールドを追加するかどうかを定義します。キーフィールドを追加することで、変更イベントレコードが同じトピックにルーティングされる全テーブルに渡って、それぞれのイベントキーの一意性が確保されます。この設定は、同じキーを持つが異なるソーステーブルに由来するレコードの変更イベントの競合を防ぐのに役立ちます。

変換でキーフィールドを追加する必要がない場合は、false を指定します。たとえば、パーティションで分割された PostgreSQL テーブルのキーは必ず一意であるため、レコードをパーティションで分割された PostgreSQL テーブルから 1 つのトピックにルーティングする場合は、key.enforce.uniqueness=false を設定することができます。

key.field.name

__dbz__physicalTableIdentifier

変更イベントキーに追加されるフィールドの名前。このフィールドの値により、元のテーブル名が識別されます。SMT がこのフィールドを追加するには key.enforce.uniquenesstrue (デフォルト) である必要があります。

key.field.regex

 

1 つまたは複数の文字グループをキャプチャーするために、変換がデフォルトのルーティング先トピックの名前に適用する正規表現を指定します。SMT がこの正規表現を適用するには、key.enforce.uniqueness をデフォルトの true に設定する必要があります。

key.field​.replacement

 

key.field.regex で指定する正規表現によりキャプチャーされるグループに関して、挿入されるキーフィールドの値を決定するための正規表現を指定します。SMT がこの正規表現を適用するには、key.enforce.uniqueness をデフォルトの true に設定する必要があります。

schema.name.adjustment.mode

none

結果のトピック名から派生したメッセージキースキーマ名を、コネクターが使用するメッセージコンバーターとの互換性に合わせて調整する方法を指定します。none は調整 (デフォルト) を適用しません。avro は Avro タイプ名で使用できない文字をアンダースコアに置き換えます。

logical.table.cache.size

16

LRUCache で最大エントリーを保持するために使用されるサイズ。キャッシュは、論理テーブルのキーと値の古い/新しいスキーマを保持し、派生キーとトピックの正規表現の結果もキャッシュして、ソースレコードの変換を改善します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.