13.11. ペイロードフィールドに基づいたパーティションへのレコードのルーティング


デフォルトでは、Debezium がデータコレクション内の変更を検出すると、Debezium が発行する変更イベントは、単一の Apache Kafka パーティションを使用するトピックに送信されます。Kafka Connect 自動トピック作成のカスタマイズ で説明されているように、プライマリーキーのハッシュに基づいてイベントを複数のパーティションにルーティングするようにデフォルト設定をカスタマイズできます。

ただし、場合によっては、Debezium がイベントを特定のトピックパーティションにルーティングすることも必要になる場合があります。パーティションルーティング SMT を使用すると、1 つ以上の指定されたペイロードフィールドの値に基づいて、イベントを特定の宛先パーティションにルーティングできます。Debezium は、宛先パーティションを計算するために、指定したフィールド値のハッシュを使用します。

13.11.1. 例: Debezium パーティションルーティング SMT の基本設定

Debezium コネクターの Kafka Connect 設定でパーティションルーティング変換を設定します。設定では次のパラメーターを指定します。

partition.payload.field
SMT が宛先パーティションを計算するために使用するイベントペイロード内のフィールドを指定します。ドット表記を使用して、ネストされたペイロードフィールドを指定できます。
partition.topic.num
ルーティング先トピックのパーティション数を指定します。
partition.hash.function
宛先パーティションの番号を決定するフィールドのハッシュに使用するハッシュ関数を指定します。

デフォルトでは、Debezium は設定されたデータ収集のすべての変更イベントレコードを単一の Apache Kafka トピックにルーティングします。コネクターでは、イベントレコードはトピックの特定のパーティションに転送されません。

イベントを特定のパーティションにルーティングするように Debezium コネクターを設定するには、Debezium コネクターの Kafka Connect 設定で PartitionRouting SMT を設定します。

たとえば、コネクター設定に以下の設定を追加します。

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

前述の設定に基づいて、SMT は、接頭辞が fulfillment の名前を持つトピックにバインドされたメッセージを受信するたびに、そのメッセージを特定のトピックパーティションにリダイレクトします。

SMT は、メッセージペイロードの name フィールドの値のハッシュからターゲットパーティションを計算します。allTopic 述語を指定して、設定は SMT を選択的に適用します。change という接頭辞は、SMT がデータの before または after の状態を記述するペイロード内の要素を自動的に参照できるようにする特別なキーワードです。指定されたフィールドがイベントメッセージに存在しない場合、SMT はそれを無視します。メッセージにどのフィールドも存在しない場合、変換ではイベントメッセージが完全に無視され、メッセージの元のバージョンがデフォルトの宛先トピックに配信されます。SMT 設定の topic.num 設定で指定されたパーティションの数は、Kafka Connect 設定で指定されたパーティションの数と一致する必要があります。たとえば、前述の設定例では、Kafka Connect プロパティー topic.creation.default.partitions で指定された値は、SMT 設定の topic.num 値と一致します。

Products 表を確認してください。

表13.12 Product の表

id

名前

説明

重量

101

scooter

小型二輪スクーター

3.14

102

車のバッテリー

12v の車のバッテリー

8.1

103

ドリルビット 12 本パック

#40 から #3 までのサイズのドリルビット 12 本パック

0.8

104

ハンマー

大工のハンマー 12oz

0.75

105

ハンマー

大工のハンマー 14oz

0.875

106

ハンマー

大工のハンマー 16oz

1.0

107

さまざまな石が入った箱

5.3

108

ジャケット

耐水性の黒のウインドブレーカー

0.1

109

スペアタイヤ

24 インチのスペアタイヤ

22.2

設定に基づいて、SMT はフィールド名が Hammer であるレコードの変更イベントを同じパーティションにルーティングします。つまり、ID 値が 104105、および 106 のアイテムは同じパーティションにルーティングされます。

13.11.2. 例: Debezium パーティションルーティング SMT の高度な設定

イベントを 2 つのデータコレクション (t1, t2) から同じトピック (例: my_topic) にルーティングし、フィールド f1 と f2 を使用してデータコレクション t1 からのイベントと、データコレクションから f2 からのイベントに分けます。

次の設定を適用できます。

transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic

上記の設定では、特定の宛先トピックに送信されるようにイベントを再ルーティングする方法は指定されていません。デフォルトの宛先トピック以外のトピックにイベントを送信する方法は、トピックルーティング SMTトピックルーティング SMT を参照してください。

13.11.3. Debezium ComputePartition SMT からの移行

Debezium ComputePartition SMT は、今後のリリースで廃止されます。次のセクションの情報では、ComputePartition SMT から新しい PartitionRouting SMT に移行する方法について説明します。

設定ですべてのトピックに対して同じ数のパーティションが設定されていると仮定して、次の ComputePartition 設定を PartitionRouting SMT に置き換えます。以下の例は、2 つの設定の比較を示しています。

例: 従来の ComputePartition 設定

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...

上記の ComputePartition を以下の PartitionRouting 設定に置き換えます。例: 以前の ComputePartition 設定を置き換える PartitionRouting 設定

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

SMT が同じ数のパーティションを共有しないトピックにイベントを発行する場合は、トピックごとに一意の partition.num.mappings 値を指定する必要があります。たとえば、次の例では、レガシーの products コレクションのトピックは 3 つのパーティションで設定され、orders データコレクションのトピックは 2 つのパーティションで設定されています。

例: さまざまなトピックに一意のパーティション値を設定する従来の ComputePartition 設定

...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...

前述の ComputePartition 設定は、PartitionRouting 設定 (別のトピックの一意の partition.topic.num 値を設定する.PartitionRouting の設定) に置き換えます。

...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=products

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...

13.11.4. パーティションルーティング変換設定用のオプション

次の表に、パーティションルーティング SMT に設定できる設定オプションを示します。

表13.13 パーティションルーティング SMT (PartitionRouting) 設定オプション

プロパティー

デフォルト

説明

partition.payload.fields

 

SMT がターゲットパーティションを計算するために使用するイベントペイロード内のフィールドを指定します。SMT で元のペイロードのフィールドを出力データ構造の特定のレベルに追加する場合は、ドット表記を使用します。データ収集に関連するフィールドにアクセスするには、afterbefore、または change を使用できます。'change' フィールドは、SMT が操作のタイプに応じて、'after' または 'before' 要素にコンテンツを自動的に入力する特別なフィールドです。指定されたフィールドがレコード内に存在しない場合、SMT はそのフィールドをスキップします。たとえば、after.name,source.table,change.name などです。

partition.topic.num

 

この SMT が動作するトピックのパーティションの数。TopicNameMatches 述語を使用して、トピックごとにレコードをフィルターします。

partition.hash.function

java

宛先パーティションの番号を決定するフィールドのハッシュを計算するときに使用されるハッシュ関数。可能な値は次のとおりです。

java - 標準の Java Object::hashCode 関数

murmur - MurmurHash 関数の最新バージョン、MurmurHash3

この設定はオプションです。指定がない場合または無効な値が使用された場合は、デフォルト値が使用されます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.