13.11. 根据有效负载字段将记录路由到分区
默认情况下,当 Debezium 检测到数据收集的变化时,它会发出的更改事件发送到使用单个 Apache Kafka 分区的主题。如 自定义 Kafka Connect 自动主题创建 中所述,您可以根据主密钥的哈希值自定义默认配置,将事件路由到多个分区。
然而,在某些情况下,您可能还希望 Debezium 将事件路由到特定的主题分区。分区路由 SMT 可让您根据一个或多个指定有效负载字段的值将事件路由到特定的目标分区。要计算目标分区,Debebe 使用指定字段值的哈希值。
13.11.1. 示例: Debezium 分区路由 SMT 的基本配置
您可以在 Debezium 连接器的 Kafka Connect 配置中配置分区路由转换。配置指定以下参数:
partition.payload.field
- 指定 SMT 用来计算目标分区的事件有效负载中的字段。您可以使用点表示法来指定嵌套有效负载字段。
partition.topic.num
- 指定目标主题中的分区数量。
partition.hash.function
- 指定用来决定目标分区数的字段哈希函数。
默认情况下,Debezium 将配置的数据收集的所有更改事件记录路由到单个 Apache Kafka 主题。连接器不会将事件记录定向到主题中的特定分区。
要将 Debezium 连接器配置为将事件路由到特定分区,请在 Debezium 连接器的 Kafka Connect 配置中配置 PartitionRouting
SMT。
例如,您可以在连接器配置中添加以下配置。
... topic.creation.default.partitions=2 topic.creation.default.replication.factor=1 ... topic.prefix=fulfillment transforms=PartitionRouting transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.PartitionRouting.partition.payload.fields=change.name transforms.PartitionRouting.partition.topic.num=2 transforms.PartitionRouting.predicate=allTopic predicates=allTopic predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.allTopic.pattern=fulfillment.* ...
根据前面的配置,当 SMT 收到一个名称以前缀开头的主题绑定的消息时,它会将消息重定向到特定的主题分区。
SMT 从消息有效负载中的 name
字段的值的哈希值计算目标分区。通过指定'allTopic' predicate,配置有选择地应用 SMT。更改
前缀是一个特殊关键字,它允许 SMT 自动引用有效负载中的元素,该元素描述数据的 before
或 after
状态。如果事件消息中没有指定的字段,则 SMT 会忽略它。如果消息中没有字段,则转换将完全忽略事件消息,并将消息的原始版本传送到默认目的地主题。SMT 配置中的 topic.num
设置指定的分区数量必须与 Kafka Connect 配置指定的分区数量匹配。例如,在前面的配置示例中,由 Kafka Connect 属性 topic.creation.default.partitions
指定的值与 SMT 配置中的 topic.num
值匹配。
根据 这个
产品表
id | name | description | weight |
101 | scooter | small 2-wheel scooter | 3.14 |
102 | car battery | 12v car battery | 8.1 |
103 | 12-pack 深入位 | 12-pack ofdepth位,大小范围从 #40 到 #3 | 0.8 |
104 | hammer | 12Oz carpenter 的 hammer | 0.75 |
105 | hammer | 14Oz carpenter 的 hammer | 0.875 |
106 | hammer | 16Oz carpenter 的 hammer | 1.0 |
107 | rocks | 分类的原则的方框 | 5.3 |
108 | jacket | Water resistent black wind breaker | 0.1 |
109 | spare tire | 24 个空闲备份 | 22.2 |
根据配置,SMT 将字段名称 hammer
的记录更改事件到同一分区。也就是说,id
值 104
、105
和 106
的项目会被路由到同一分区。
13.11.2. 示例: Debezium 分区路由 SMT 的高级配置
假设您想要将来自两个数据收集(t1、t2)的事件路由到相同的主题(例如,my_topic),并且您希望使用字段 f1 从数据收集 t1 分区事件,并使用字段 f2 从数据收集 t2 分区事件。
您可以应用以下配置:
transforms=PartitionRouting transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2 transforms.PartitionRouting.partition.topic.num=2 transforms.PartitionRouting.predicate=myTopic predicates=myTopic predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.myTopic.pattern=my_topic
以上配置没有指定如何重新路由事件,以便它们发送到特定的目标主题。有关如何将事件发送到其默认目标主题以外的主题的详情,请查看 主题路由 SMT。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q4/html-single/debezium_user_guide/index
13.11.3. 从 Debezium ComputePartition SMT 迁移
Debezium ComputePartition
SMT 将在以后的发行版本中停用。下面的部分的信息描述了如何从 ComputePartition
SMT 迁移到新的 PartitionRouting
SMT。
假设配置为所有主题设置相同分区数量,请将以下 ComputePartition'configuration 替换为"PartitionRouting
SMT"。以下示例提供了两个配置的比较。
示例: Legacy ComputePartition
配置
... topic.creation.default.partitions=2 topic.creation.default.replication.factor=1 ... topic.prefix=fulfillment transforms=ComputePartition transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2 ...
将前面的 ComputePartition
替换为以下 分区
配置。示例:替换早期 ComputePartition
配置的分区 Routing
配置
... topic.creation.default.partitions=2 topic.creation.default.replication.factor=1 ... topic.prefix=fulfillment transforms=PartitionRouting transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser transforms.PartitionRouting.partition.topic.num=2 transforms.PartitionRouting.predicate=allTopic predicates=allTopic predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.allTopic.pattern=fulfillment.* ...
如果 SMT 将事件发送到没有共享相同分区的主题,您必须为每个主题指定唯一的 partition.num.mappings
值。例如,在以下示例中,旧 产品
集合的主题配置了 3 个分区,而 orders
数据收集的主题被配置为 2 个分区:
示例:为不同主题设置唯一分区值的 Legacy ComputePartition
配置
... topic.prefix=fulfillment transforms=ComputePartition transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2 ...
将前面的 ComputePartition
配置替换为以下 PartitionRouting
配置:.PartitionRouting
配置,为不同的主题设置唯一的 partition.topic.num
值
... topic.prefix=fulfillment transforms=ProductsPartitionRouting,OrdersPartitionRouting transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.ProductsPartitionRouting.partition.payload.fields=change.name transforms.ProductsPartitionRouting.partition.topic.num=3 transforms.ProductsPartitionRouting.predicate=products transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser transforms.OrdersPartitionRouting.partition.topic.num=2 transforms.OrdersPartitionRouting.predicate=products predicates=products,orders predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.products.pattern=fulfillment.inventory.products predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.orders.pattern=fulfillment.inventory.orders ...
13.11.4. 用于配置分区路由转换的选项
下表列出了您可以为分区路由 SMT 设置的配置选项。
属性 | 默认 | 描述 |
指定 SMT 用来计算目标分区的事件有效负载中的字段。如果您希望 SMT 将原始有效负载中的字段添加到输出数据结构中的特定级别,请使用点表示法。要访问与数据收集相关的字段,您可以使用: | ||
此 SMT 行为的主题分区数量。使用 | ||
|
在计算要确定目标分区数目的字段的计算哈希时,要使用的哈希函数。可能的值有: |