13.11. 根据有效负载字段将记录路由到分区


默认情况下,当 Debezium 检测到数据收集的变化时,它会发出的更改事件发送到使用单个 Apache Kafka 分区的主题。如 自定义 Kafka Connect 自动主题创建 中所述,您可以根据主密钥的哈希值自定义默认配置,将事件路由到多个分区。

然而,在某些情况下,您可能还希望 Debezium 将事件路由到特定的主题分区。分区路由 SMT 可让您根据一个或多个指定有效负载字段的值将事件路由到特定的目标分区。要计算目标分区,Debebe 使用指定字段值的哈希值。

13.11.1. 示例: Debezium 分区路由 SMT 的基本配置

您可以在 Debezium 连接器的 Kafka Connect 配置中配置分区路由转换。配置指定以下参数:

partition.payload.field
指定 SMT 用来计算目标分区的事件有效负载中的字段。您可以使用点表示法来指定嵌套有效负载字段。
partition.topic.num
指定目标主题中的分区数量。
partition.hash.function
指定用来决定目标分区数的字段哈希函数。

默认情况下,Debezium 将配置的数据收集的所有更改事件记录路由到单个 Apache Kafka 主题。连接器不会将事件记录定向到主题中的特定分区。

要将 Debezium 连接器配置为将事件路由到特定分区,请在 Debezium 连接器的 Kafka Connect 配置中配置 PartitionRouting SMT。

例如,您可以在连接器配置中添加以下配置。

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

根据前面的配置,当 SMT 收到一个名称以前缀开头的主题绑定的消息时,它会将消息重定向到特定的主题分区。

SMT 从消息有效负载中的 name 字段的值的哈希值计算目标分区。通过指定'allTopic' predicate,配置有选择地应用 SMT。更改 前缀是一个特殊关键字,它允许 SMT 自动引用有效负载中的元素,该元素描述数据的 beforeafter 状态。如果事件消息中没有指定的字段,则 SMT 会忽略它。如果消息中没有字段,则转换将完全忽略事件消息,并将消息的原始版本传送到默认目的地主题。SMT 配置中的 topic.num 设置指定的分区数量必须与 Kafka Connect 配置指定的分区数量匹配。例如,在前面的配置示例中,由 Kafka Connect 属性 topic.creation.default.partitions 指定的值与 SMT 配置中的 topic.num 值匹配。

根据 这个 产品表

表 13.12. 产品表

id

name

description

weight

101

scooter

small 2-wheel scooter

3.14

102

car battery

12v car battery

8.1

103

12-pack 深入位

12-pack ofdepth位,大小范围从 #40 到 #3

0.8

104

hammer

12Oz carpenter 的 hammer

0.75

105

hammer

14Oz carpenter 的 hammer

0.875

106

hammer

16Oz carpenter 的 hammer

1.0

107

rocks

分类的原则的方框

5.3

108

jacket

Water resistent black wind breaker

0.1

109

spare tire

24 个空闲备份

22.2

根据配置,SMT 将字段名称 hammer 的记录更改事件到同一分区。也就是说,id104105106 的项目会被路由到同一分区。

13.11.2. 示例: Debezium 分区路由 SMT 的高级配置

假设您想要将来自两个数据收集(t1、t2)的事件路由到相同的主题(例如,my_topic),并且您希望使用字段 f1 从数据收集 t1 分区事件,并使用字段 f2 从数据收集 t2 分区事件。

您可以应用以下配置:

transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic

以上配置没有指定如何重新路由事件,以便它们发送到特定的目标主题。有关如何将事件发送到其默认目标主题以外的主题的详情,请查看 主题路由 SMThttps://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q4/html-single/debezium_user_guide/index

13.11.3. 从 Debezium ComputePartition SMT 迁移

Debezium ComputePartition SMT 将在以后的发行版本中停用。下面的部分的信息描述了如何从 ComputePartition SMT 迁移到新的 PartitionRouting SMT。

假设配置为所有主题设置相同分区数量,请将以下 ComputePartition'configuration 替换为"PartitionRouting SMT"。以下示例提供了两个配置的比较。

示例: Legacy ComputePartition 配置

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...

将前面的 ComputePartition 替换为以下 分区 配置。示例:替换早期 ComputePartition 配置的分区 Routing 配置

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

如果 SMT 将事件发送到没有共享相同分区的主题,您必须为每个主题指定唯一的 partition.num.mappings 值。例如,在以下示例中,旧 产品 集合的主题配置了 3 个分区,而 orders 数据收集的主题被配置为 2 个分区:

示例:为不同主题设置唯一分区值的 Legacy ComputePartition 配置

...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...

将前面的 ComputePartition 配置替换为以下 PartitionRouting 配置:.PartitionRouting 配置,为不同的主题设置唯一的 partition.topic.num

...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=products

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...

13.11.4. 用于配置分区路由转换的选项

下表列出了您可以为分区路由 SMT 设置的配置选项。

表 13.13. 分区路由 SMT(分区)配置选项

属性

默认

描述

partition.payload.fields

 

指定 SMT 用来计算目标分区的事件有效负载中的字段。如果您希望 SMT 将原始有效负载中的字段添加到输出数据结构中的特定级别,请使用点表示法。要访问与数据收集相关的字段,您可以使用: 之前 或更改。'change' 字段是一个特殊字段,它会根据操作类型在 'after' 或 'before' 元素中自动填充内容。如果记录中没有指定字段,则 SMT 会跳过它。例如, after.name,source.table,change.name

partition.topic.num

 

此 SMT 行为的主题分区数量。使用 TopicNameMatches predicate 按主题过滤记录。

partition.hash.function

java

在计算要确定目标分区数目的字段的计算哈希时,要使用的哈希函数。可能的值有:

java - 标准 Java Object::hashCode function

murmur - latest version of MurmurHash 功能, MurmurHash3

this configuration is optional.如果没有指定或无效值,则使用默认值。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.