13.11. ペイロードフィールドに基づいたパーティションへのレコードのルーティング
デフォルトでは、Debezium がデータコレクション内の変更を検出すると、Debezium が発行する変更イベントは、単一の Apache Kafka パーティションを使用するトピックに送信されます。Kafka Connect 自動トピック作成のカスタマイズ で説明されているように、プライマリーキーのハッシュに基づいてイベントを複数のパーティションにルーティングするようにデフォルト設定をカスタマイズできます。
ただし、場合によっては、Debezium がイベントを特定のトピックパーティションにルーティングすることも必要になる場合があります。パーティションルーティング SMT を使用すると、1 つ以上の指定されたペイロードフィールドの値に基づいて、イベントを特定の宛先パーティションにルーティングできます。Debezium は、宛先パーティションを計算するために、指定したフィールド値のハッシュを使用します。
13.11.1. 例: Debezium パーティションルーティング SMT の基本設定
Debezium コネクターの Kafka Connect 設定でパーティションルーティング変換を設定します。設定では次のパラメーターを指定します。
partition.payload.field
- SMT が宛先パーティションを計算するために使用するイベントペイロード内のフィールドを指定します。ドット表記を使用して、ネストされたペイロードフィールドを指定できます。
partition.topic.num
- ルーティング先トピックのパーティション数を指定します。
partition.hash.function
- 宛先パーティションの番号を決定するフィールドのハッシュに使用するハッシュ関数を指定します。
デフォルトでは、Debezium は設定されたデータ収集のすべての変更イベントレコードを単一の Apache Kafka トピックにルーティングします。コネクターでは、イベントレコードはトピックの特定のパーティションに転送されません。
イベントを特定のパーティションにルーティングするように Debezium コネクターを設定するには、Debezium コネクターの Kafka Connect 設定で PartitionRouting
SMT を設定します。
たとえば、コネクター設定に以下の設定を追加します。
... topic.creation.default.partitions=2 topic.creation.default.replication.factor=1 ... topic.prefix=fulfillment transforms=PartitionRouting transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.PartitionRouting.partition.payload.fields=change.name transforms.PartitionRouting.partition.topic.num=2 transforms.PartitionRouting.predicate=allTopic predicates=allTopic predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.allTopic.pattern=fulfillment.* ...
前述の設定に基づいて、SMT は、接頭辞が fulfillment
の名前を持つトピックにバインドされたメッセージを受信するたびに、そのメッセージを特定のトピックパーティションにリダイレクトします。
SMT は、メッセージペイロードの name
フィールドの値のハッシュからターゲットパーティションを計算します。allTopic 述語を指定して、設定は SMT を選択的に適用します。change
という接頭辞は、SMT がデータの before
または after
の状態を記述するペイロード内の要素を自動的に参照できるようにする特別なキーワードです。指定されたフィールドがイベントメッセージに存在しない場合、SMT はそれを無視します。メッセージにどのフィールドも存在しない場合、変換ではイベントメッセージが完全に無視され、メッセージの元のバージョンがデフォルトの宛先トピックに配信されます。SMT 設定の topic.num
設定で指定されたパーティションの数は、Kafka Connect 設定で指定されたパーティションの数と一致する必要があります。たとえば、前述の設定例では、Kafka Connect プロパティー topic.creation.default.partitions
で指定された値は、SMT 設定の topic.num
値と一致します。
Products
表を確認してください。
id | 名前 | 説明 | 重量 |
101 | scooter | 小型二輪スクーター | 3.14 |
102 | 車のバッテリー | 12v の車のバッテリー | 8.1 |
103 | ドリルビット 12 本パック | #40 から #3 までのサイズのドリルビット 12 本パック | 0.8 |
104 | ハンマー | 大工のハンマー 12oz | 0.75 |
105 | ハンマー | 大工のハンマー 14oz | 0.875 |
106 | ハンマー | 大工のハンマー 16oz | 1.0 |
107 | 岩 | さまざまな石が入った箱 | 5.3 |
108 | ジャケット | 耐水性の黒のウインドブレーカー | 0.1 |
109 | スペアタイヤ | 24 インチのスペアタイヤ | 22.2 |
設定に基づいて、SMT はフィールド名が Hammer
であるレコードの変更イベントを同じパーティションにルーティングします。つまり、ID
値が 104
、105
、および 106
のアイテムは同じパーティションにルーティングされます。
13.11.2. 例: Debezium パーティションルーティング SMT の高度な設定
イベントを 2 つのデータコレクション (t1, t2) から同じトピック (例: my_topic) にルーティングし、フィールド f1 と f2 を使用してデータコレクション t1 からのイベントと、データコレクションから f2 からのイベントに分けます。
次の設定を適用できます。
transforms=PartitionRouting transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2 transforms.PartitionRouting.partition.topic.num=2 transforms.PartitionRouting.predicate=myTopic predicates=myTopic predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.myTopic.pattern=my_topic
上記の設定では、特定の宛先トピックに送信されるようにイベントを再ルーティングする方法は指定されていません。デフォルトの宛先トピック以外のトピックにイベントを送信する方法は、トピックルーティング SMT と トピックルーティング SMT を参照してください。
13.11.3. Debezium ComputePartition SMT からの移行
Debezium ComputePartition
SMT は、今後のリリースで廃止されます。次のセクションの情報では、ComputePartition
SMT から新しい PartitionRouting
SMT に移行する方法について説明します。
設定ですべてのトピックに対して同じ数のパーティションが設定されていると仮定して、次の ComputePartition 設定を PartitionRouting SMT
に置き換えます。以下の例は、2 つの設定の比較を示しています。
例: 従来の ComputePartition
設定
... topic.creation.default.partitions=2 topic.creation.default.replication.factor=1 ... topic.prefix=fulfillment transforms=ComputePartition transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2 ...
上記の ComputePartition
を以下の PartitionRouting
設定に置き換えます。例: 以前の ComputePartition
設定を置き換える PartitionRouting
設定
... topic.creation.default.partitions=2 topic.creation.default.replication.factor=1 ... topic.prefix=fulfillment transforms=PartitionRouting transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser transforms.PartitionRouting.partition.topic.num=2 transforms.PartitionRouting.predicate=allTopic predicates=allTopic predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.allTopic.pattern=fulfillment.* ...
SMT が同じ数のパーティションを共有しないトピックにイベントを発行する場合は、トピックごとに一意の partition.num.mappings
値を指定する必要があります。たとえば、次の例では、レガシーの products
コレクションのトピックは 3 つのパーティションで設定され、orders
データコレクションのトピックは 2 つのパーティションで設定されています。
例: さまざまなトピックに一意のパーティション値を設定する従来の ComputePartition
設定
... topic.prefix=fulfillment transforms=ComputePartition transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2 ...
前述の ComputePartition
設定は、PartitionRouting
設定 (別のトピックの一意の partition.topic.num
値を設定する.PartitionRouting
の設定) に置き換えます。
... topic.prefix=fulfillment transforms=ProductsPartitionRouting,OrdersPartitionRouting transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.ProductsPartitionRouting.partition.payload.fields=change.name transforms.ProductsPartitionRouting.partition.topic.num=3 transforms.ProductsPartitionRouting.predicate=products transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser transforms.OrdersPartitionRouting.partition.topic.num=2 transforms.OrdersPartitionRouting.predicate=products predicates=products,orders predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.products.pattern=fulfillment.inventory.products predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.orders.pattern=fulfillment.inventory.orders ...
13.11.4. パーティションルーティング変換設定用のオプション
次の表に、パーティションルーティング SMT に設定できる設定オプションを示します。
プロパティー | デフォルト | 説明 |
SMT がターゲットパーティションを計算するために使用するイベントペイロード内のフィールドを指定します。SMT で元のペイロードのフィールドを出力データ構造の特定のレベルに追加する場合は、ドット表記を使用します。データ収集に関連するフィールドにアクセスするには、 | ||
この SMT が動作するトピックのパーティションの数。 | ||
|
宛先パーティションの番号を決定するフィールドのハッシュを計算するときに使用されるハッシュ関数。可能な値は次のとおりです。 |