13.2. 将 Debezium 事件记录路由到您指定的主题


每个包含数据更改事件的 Kafka 记录都有一个默认目的地主题。如果需要,您可以将记录重新路由到您在记录到达 Kafka Connect 转换前指定的主题。要做到这一点,Debebebe 提供主题路由单一消息转换(SMT)。在 Debezium 连接器的 Kafka Connect 配置中配置此转换。配置选项允许您指定以下内容:

  • 用于标识要重新路由的记录的表达式
  • 解析到目标主题的表达式
  • 如何确保将记录中的唯一密钥重新路由到目标主题

您要确保转换配置提供了您想要的行为。Debezium 不会验证您的转换配置的结果行为。

主题路由转换是一个 Kafka Connect SMT

以下主题提供详情:

13.2.1. 将 Debezium 记录路由到您指定的主题用例

默认行为是,Debezium 连接器会将每个更改事件记录发送到一个主题,其名称是从数据库的名称以及进行更改的表的名称。换句话说,主题接收一个物理表的记录。当希望一个主题接收多个物理表的记录时,您必须将 Debezium 连接器配置为将记录重新路由到该主题。

逻辑表

逻辑表是多个物理表路由到一个主题的常见用例。在逻辑表中,有多个物理表都有相同的模式。例如,分片表具有相同的模式。逻辑表可能由两个或多个分片表组成: db_shard1.my_tabledb_shard2.my_table。表在不同的分片中,物理上是不同的,但它们组成一个逻辑表。您可以将任何分片中表的事件记录重新路由到同一主题。

分区的 PostgreSQL 表

当 Debezium PostgreSQL 连接器捕获分区表中的更改时,默认行为是更改事件记录会路由到每个分区的不同主题。要将记录从所有分区发送到一个主题,请配置主题路由 SMT。因为分区表中的每个键都保证是唯一的,所以请配置 key.enforce.uniqueness=false,以便 SMT 不添加 key 字段以确保唯一的密钥。额外的 key 字段是默认行为。

13.2.2. 多个表的路由 Debezium 记录示例

要将多个物理表的事件记录更改为同一主题,请在 Debezium 连接器的 Kafka Connect 配置中配置主题路由转换。配置主题路由 SMT 要求您指定确定的正则表达式:

  • 要路由记录的表。这些表都必须具有相同的模式。
  • 目标主题名称。

以下示例中的连接器配置为主题路由 SMT 设置几个选项:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex

指定转换适用于每个更改事件记录的正则表达式,以确定它是否应该路由到特定主题。

在示例中,正则表达式 (.*)customers_shard(.*) 与名称包含 customers_shard 字符串的表的记录匹配。这将使用以下名称为表重新路由记录:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
指定代表目标主题名称的正则表达式。转换会将每个匹配记录路由到此表达式标识的主题。在本例中,上面列出的三个分片表的记录将路由到 myserver.mydb.customers_all_shards 主题。
schema.name.adjustment.mode
指定来自结果主题名称的消息键模式名称应该如何进行调整,以便与连接器使用的消息转换器兼容。该值可以是 none (默认)或 avro

自定义配置

要自定义配置,您可以定义一个 SMT predicate 语句,用于指定希望转换过程的表,或不处理。如果您将 SMT 配置为与正则表达式匹配的路由表,且您不希望 SMT 重新路由与表达式匹配的特定表,则 predicate 可能会很有用。

13.2.3. 确保路由到同一主题的 Debezium 记录间的唯一键

Debezium 更改事件键使用组成表的主键的表列。要将多个物理表的记录路由到一个主题,事件键必须在所有这些表中唯一。但是,每个物理表都可以有一个主键,该密钥只在该表中唯一。例如,myserver.mydb.customers_shard1 表中的行可能具有与 myserver.mydb.customers_shard2 表中的行相同的键值。

为确保每个事件键在更改事件记录的表之间是唯一的,主题路由转换会将字段插入到更改事件键中。默认情况下,插入字段的名称为 __dbz__physicalTableIdentifier。insert 字段的值是默认的目的地主题名称。

如果要,您可以配置主题路由转换,将不同的字段插入到键中。要做到这一点,指定 key.field.name 选项,并将其设置为没有现有主键字段名称的字段名称。例如:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id

本例将 shard_id 字段添加到路由记录中的键结构中。

如果要调整密钥的新字段的值,请配置这两个选项:

key.field.regex
指定转换应用到默认目的地主题名称的正则表达式,以捕获一个或多个字符组。
key.field.replacement
指定用来决定在捕获的组中插入的 key 字段的值的正则表达式。

例如:

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

使用这个配置,假设默认目标主题名称是:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

转换使用第二个捕获的组中的值,即分片编号,作为 key 的新字段的值。在本例中,插入的 key 字段的值为 123

如果您的表包含全局唯一的密钥,而您不需要更改密钥结构,您可以将 key.enforce.uniqueness 选项设置为 false

...
transforms.Reroute.key.enforce.uniqueness=false
...

13.2.4. 有选择地应用主题路由转换的选项

除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括心跳消息,以及有关 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。

您可以使用以下任一方法配置连接器来有选择地应用 SMT:

13.2.5. 配置 Debezium 主题路由转换的选项

下表描述了主题路由 SMT 配置选项。

表 13.1. 主题路由 SMT 配置选项
选项默认描述

topic.regex

 

指定转换适用于每个更改事件记录的正则表达式,以确定它是否应该路由到特定主题。

topic.replacement

 

指定代表目标主题名称的正则表达式。转换会将每个匹配记录路由到此表达式标识的主题。此表达式可以引用您为 topic.regex 指定的正则表达式捕获的组。要引用组,请指定 $1$2 等等。

key.enforce​.uniqueness

true

指明是否在记录的更改事件键中添加字段。添加 key 字段可确保每个事件键在更改事件记录写入同一主题的表之间是唯一的。这有助于防止更改事件对具有相同密钥但源自不同源表的记录发生。

如果您不希望转换添加 key 字段,请指定 false。例如,如果您要将记录从分区 PostgreSQL 表路由到一个主题,您可以配置 key.enforce.uniqueness=false,因为分区的 PostgreSQL 表中保证了唯一的密钥。

key.field.name

__dbz__physicalTableIdentifier

要添加到更改事件键的字段名称。此字段的值标识原始表名称。对于 SMT 添加此字段,key.enforce.uniqueness 必须为 true,这是默认值。

key.field.regex

 

指定转换应用到默认目的地主题名称的正则表达式,以捕获一个或多个字符组。要使 SMT 应用到这个表达式,key.enforce.uniqueness 必须为 true,这是默认值。

key.field​.replacement

 

指定用于在为 key.field.regex 指定的表达式捕获的组方面确定插入的 key 字段的值的正则表达式。要使 SMT 应用到这个表达式,key.enforce.uniqueness 必须为 true,这是默认值。

schema.name.adjustment.mode

none

指定来自结果主题名称的消息键 schema 名称应该如何进行调整以保持与连接器使用的消息转换器匹配,这包括: none 不应用任何调整(默认),aro 将 Avro 类型名称中使用的字符替换为下划线。

logical.table.cache.size

16

在 LRUCache 中用于保存最大条目的大小。缓存将为逻辑表键和值保留旧的/新模式,也缓存派生的键和值,以改进源记录转换。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.