7.2. 将 Debezium 事件记录路由到您指定的主题


每个包含数据更改事件的 Kafka 记录都有一个默认的目标主题。如果需要,您可以将记录重新路由到您在记录到达 Kafka Connect 转换器之前指定的主题。要做到这一点,Debebe 提供主题路由单消息转换(SMT)。在 Debezium 连接器的 Kafka Connect 配置中配置此转换。配置选项允许您指定以下内容:

  • 用于标识重新路由记录的表达式
  • 解析到目标主题的表达式
  • 如何确保将记录中的唯一键重新路由到目标主题

您需要确保转换配置提供了您想要的行为。Debezium 不会验证您的转换配置中结果的行为。

主题路由转换是一个 Kafka Connect SMT

以下主题提供详情:

默认行为是,Debezium 连接器将每个更改事件记录发送到一个主题,其名称由数据库的名称以及进行更改的表的名称发送。换句话说,主题接收一个物理表的记录。当您希望主题接收多个物理表的记录时,您必须配置 Debezium 连接器,将记录重新路由到该主题。

逻辑表

逻辑表是多个物理表的路由表到一个主题的常见用例。在逻辑表中,有许多物理表都具有相同的架构。例如,分片表具有相同的模式。逻辑表可能包含两个或多个分片表: db_shard1.my_tabledb_shard2.my_table。表在不同的分片中,它们物理上不同,但它们组成一个逻辑表。您可以将任何分片中的表重新路由更改事件记录到同一主题。

对 PostgreSQL 表进行分区

当 Debezium PostgreSQL 连接器捕获分区表中的更改时,默认行为是更改事件记录会路由到每个分区的不同主题。要将所有分区中的记录发送到一个主题,请配置主题路由 SMT。因为分区表中的每个键保证是唯一的,因此请配置 key.enforce.uniqueness=false,以便 SMT 不会添加 key 字段来确保唯一键。额外的 key 字段是默认行为。

要将多个物理表的更改事件记录路由到同一主题,请在 Debezium 连接器的 Kafka Connect 配置中配置主题路由转换。配置主题路由 SMT 要求您指定决定以下内容的正则表达式:

  • 用于路由记录的表。这些表必须具有相同的模式。
  • 目标主题名称。

以下示例中的连接器配置为主题路由 SMT 设置几个选项:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
Copy to Clipboard Toggle word wrap
topic.regex

指定转换应用于每个更改事件记录的正则表达式,以确定它是否应路由到特定主题。

在示例中,正则表达式 (.*)customers_shard(.*) 与名称包含 customers_shard 字符串的表的记录匹配。这将使用以下名称的表重新路由记录:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
指定代表目标主题名称的正则表达式。转换将每个匹配记录路由到此表达式标识的主题。在本例中,上面所列的三个分片表的记录将路由到 myserver.mydb.customers_all_shards 主题。
schema.name.adjustment.mode
指定如何调整从生成的主题名称派生的消息密钥模式名称,以便与连接器使用的消息转换器兼容。该值可以是 none (默认)或 avro

自定义配置

要自定义配置,您可以定义一个 SMT predicate 语句,该语句 指定您要转换来处理的表,或者无法处理。如果您将 SMT 配置为与正则表达式匹配的路由表,并且您不希望 SMT 重新路由与表达式匹配的特定表,则 predicate 可能很有用。

Debezium 更改事件键使用组成表的主键的表列。要将多个物理表的记录路由到一个主题,事件键在所有这些表中必须是唯一的。但是,每个物理表可能会有唯一的主键。例如,myserver.mydb.customers_shard1 表中的一行可能与 myserver.mydb.customers_shard2 表中的行相同。

为确保每个事件键在更改事件记录进入同一主题的表中是唯一的,主题路由转换会将字段插入到更改事件键中。默认情况下,插入的字段的名称为 __dbz__physicalTableIdentifier。inserted 字段的值是默认的目标主题名称。

如果要,您可以配置主题路由转换,将不同的字段插入到密钥中。要做到这一点,请指定 key.field.name 选项,并将其设置为不使用现有主键字段名称冲突的字段名称。例如:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id
Copy to Clipboard Toggle word wrap

本例将 shard_id 字段添加到路由记录中的密钥结构中。

如果要调整键的新字段的值,请配置这两个选项:

key.field.regex
指定转换应用于默认目的地主题名称的正则表达式,以捕获一个或多个字符组。
key.field.replacement
指定用来确定在捕获组方面插入的键值的正则表达式。

例如:

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2
Copy to Clipboard Toggle word wrap

使用这个配置,假设默认目的地主题名称是:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

转换使用第二个捕获的组中的值(分片号)作为键的新字段的值。在本例中,插入的 key 字段的值将是 123

如果您的表包含全局唯一的键,且您不需要更改密钥结构,您可以将 key.enforce.uniqueness 选项设置为 false

...
transforms.Reroute.key.enforce.uniqueness=false
...
Copy to Clipboard Toggle word wrap

7.2.4. 用于有选择地应用主题路由转换的选项

除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括 heartbeat 信息,以及有关 schema 更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。

您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

7.2.5. 配置 Debezium 主题路由转换的选项

下表描述了主题路由 SMT 配置选项。

Expand
表 7.1. 主题路由 SMT 配置选项
选项默认描述

topic.regex

 

指定转换应用于每个更改事件记录的正则表达式,以确定它是否应路由到特定主题。

topic.replacement

 

指定代表目标主题名称的正则表达式。转换将每个匹配记录路由到此表达式标识的主题。此表达式可以引用由您为 topic.regex 指定的正则表达式捕获的组。要引用组,指定 $1$2 等等。

key.enforce​.uniqueness

true

指明是否要在记录的 change 事件键中添加字段。添加 key 字段可确保每个事件键在更改事件记录进入同一主题的表中是唯一的。这有助于防止对具有相同键但来自不同源表的记录更改事件冲突。

如果您不希望转换添加 key 字段,请指定 false。例如,如果您是从分区 PostgreSQL 表到一个主题的路由记录,您可以配置 key.enforce.uniqueness=false,因为分区 PostgreSQL 表中保证了唯一的键。

key.field.name

__dbz__physicalTableIdentifier

要添加到更改事件键的字段名称。此字段的值标识原始表名称。要使 SMT 添加此字段,key.enforce.uniqueness 必须为 true,这是默认值。

key.field.regex

 

指定转换应用于默认目的地主题名称的正则表达式,以捕获一个或多个字符组。要使 SMT 应用此表达式,key.enforce.uniqueness 必须为 true,这是默认值。

key.field​.replacement

 

指定正则表达式,以确定插入的键值,以决定由 key.field.regex 指定的表达式所捕获的组的值。要使 SMT 应用此表达式,key.enforce.uniqueness 必须为 true,这是默认值。

schema.name.adjustment.mode

none

指定来自结果主题名称的消息键 schema 名称应该如何进行调整以保持与连接器使用的消息转换器匹配,这包括: none 不应用任何调整(默认),aro 将 Avro 类型名称中使用的字符替换为下划线。

logical.table.cache.size

16

在 LRUCache 中用于保存最大条目的大小。缓存将保留逻辑表键和值的旧/new 模式,同时缓存派生的 key 和 topic regex 结果以提高源记录转换。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat