7.11. 根据有效负载字段将记录路由到分区


默认情况下,当 Debezium 检测到数据收集中的更改时,它会发出的更改事件发送到使用单个 Apache Kafka 分区的主题。如 自定义 Kafka Connect 自动主题创建 中所述,您可以自定义默认配置,以根据主密钥的哈希值将事件路由到多个分区。

然而,在某些情况下,您可能还希望 Debezium 将事件路由到特定的主题分区。分区路由 SMT 可让您根据一个或多个指定有效负载字段的值将事件路由到特定的目标分区。要计算目标分区,Debebe 使用指定字段值的哈希值。

7.11.1. 示例:Debezium 分区路由 SMT 的基本配置

您可以在 Debezium 连接器的 Kafka Connect 配置中配置分区路由转换。配置指定以下参数:

partition.payload.fields
指定 SMT 用来计算目标分区的事件有效负载中的字段。您可以使用点表示法指定嵌套的有效负载字段。
partition.topic.num
指定目标主题中的分区数量。
partition.hash.function
指定用于决定目标分区数的字段哈希的哈希功能。

默认情况下,Debebe 将配置的数据集合的所有更改事件记录路由到单个 Apache Kafka 主题。连接器不会将事件记录定向到主题中的特定分区。

要将 Debezium 连接器配置为将事件路由到特定分区,请在 Debezium 连接器的 Kafka Connect 配置中配置 PartitionRouting SMT。

例如,您可以在连接器配置中添加以下配置。

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
Copy to Clipboard Toggle word wrap

根据上述配置,每当 SMT 收到一个与一个主题绑定的消息时,其名称以前缀 fulfillment 开头,它会将消息重定向到特定的主题分区。

SMT 从消息有效负载中 name 字段的值的哈希值计算目标分区。通过指定'allTopic' predicate,配置会选择性地应用 SMT。更改 前缀是一个特殊的关键字,可让 SMT 自动引用有效负载中的元素,该元素描述数据的 beforeafter 状态。如果事件消息中没有指定字段,则 SMT 会忽略它。如果消息中没有字段,则转换会完全忽略事件消息,并将原始版本的消息传送到默认的目的地主题。SMT 配置中的 topic.num 设置指定的分区数量必须与 Kafka Connect 配置中指定的分区数量匹配。例如,在前面的配置示例中,Kafka Connect property topic.creation.default.partitions 指定的值与 SMT 配置中的 topic.num 值匹配。

给定 此产品

Expand
表 7.12. products 表

id

名称

description

weight

101

scooter

小的 2-wheel scooter

3.14

102

car battery

12v car battery

8.1

103

12-pack drill 位

12-pack ofth 位,大小范围从 #40 到 #3

0.8

104

hammer

12Oz carpenter 的 hammer

0.75

105

hammer

14Oz carpenter 的 hammer

0.875

106

hammer

16Oz carpenter 的 hammer

1.0

107

rocks

特有的特质学位

5.3

108

jacket

Water resistent black wind breaker

0.1

109

spare tire

24 inch spare tire

22.2

根据配置,SMT 路由将字段名为 hammer 的记录更改事件到同一分区。也就是说,ID 值为 104105106 的项目被路由到同一分区。

7.11.2. 示例:Debezium 分区路由 SMT 的高级配置

假设您想要将事件从两个数据收集(t1、t2)路由到同一主题(如 my_topic),并且您希望使用字段 f1 将来自数据收集的事件分区,并使用字段 f2 从数据收集 t2 分区事件。

您可以应用以下配置:

transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic
Copy to Clipboard Toggle word wrap

前面的配置没有指定如何重新路由事件,以便它们发送到特定的目标主题。有关如何将事件发送到其默认目标主题以外的主题,请参阅 主题 路由 SMT

7.11.3. 从 Debezium ComputePartition SMT 迁移

Debezium ComputePartition SMT 已停止。以下部分中的信息论述了如何从 ComputePartition SMT 迁移到新的 PartitionRouting SMT。

假设配置为所有主题设置相同的分区数量,请将以下 ComputePartition'configuration 替换为"PartitionRouting SMT"。以下示例提供了两个配置的比较。

示例:传统的 ComputePartition 配置

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...
Copy to Clipboard Toggle word wrap

将前面的 ComputePartition 替换为以下 PartitionRouting 配置。示例:替换早期 ComputePartition 配置的 PartitionRouting 配置

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
Copy to Clipboard Toggle word wrap

如果 SMT 向没有共享相同分区数量的主题发出事件,则必须为每个主题指定唯一的 partition.num.mappings 值。例如,在以下示例中,旧 产品 集合的主题配置了 3 个分区,订购 数据收集的主题配置了 2 个分区:

示例:传统的 ComputePartition 配置,为不同的主题设置唯一的分区值

...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...
Copy to Clipboard Toggle word wrap

使用以下 PartitionRouting 配置替换前面的 ComputePartition 配置:.PartitionRouting 配置,为不同的主题设置唯一的 partition.topic.num

...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=products

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...
Copy to Clipboard Toggle word wrap

7.11.4. 配置分区路由转换的选项

下表列出了您可以为分区路由 SMT 设置的配置选项。

Expand
表 7.13. 分区路由 SMT (PartitionRouting)配置选项

属性

默认

描述

partition.payload.fields

 

指定 SMT 用来计算目标分区的事件有效负载中的字段。如果您希望 SMT 将原始有效负载中的字段添加到输出数据结构中的特定级别,请使用点表示法。要访问与数据收集相关的字段,您可以使用:在、之前、或 更改'change' 字段是一个特殊字段,它导致 SMT 会自动填充 'after' 或 'before' 元素中的内容,具体取决于操作类型。如果记录中没有指定字段,则 SMT 会跳过它。例如,after.name,source.table,change.name

partition.topic.num

 

此 SMT 操作的主题的分区数量。使用 TopicNameMatches predicate 根据主题过滤记录。

partition.hash.function

java

在确定目标分区数的字段的计算哈希时使用哈希功能。可能的值有:

java - standard Java Object::hashCode function

murmur - latest of MurmurHash 功能, MurmurHash3

此配置是可选的。如果没有指定或无效值,则使用默认值。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat