12.2. 将 Debezium 事件记录路由到您指定的主题


每个包含数据更改事件的 Kafka 记录都有一个默认的目的地主题。如果需要,您可以将记录重新路由到您在记录到达 Kafka Connect 转换器前指定的主题。为此,Debebe 提供主题路由单一消息转换(SMT)。在 Debezium 连接器的 Kafka Connect 配置中配置这个转换。配置选项允许您指定以下内容:

  • 用于标识要重新路由的记录的表达式
  • 解析到目的地主题的表达式
  • 如何确保将记录中的唯一键重新路由到目标主题

您确定转换配置提供了您想要的行为。Debezium 不会验证来自您转换配置的结果的行为。

主题路由转换是一个 Kafka Connect SMT

以下主题提供详情:

12.2.1. 将 Debezium 记录路由到您指定的主题的用例

默认行为是 Debezium 连接器将每个更改事件记录发送到名称从数据库名称以及进行更改的表名称的主题。换句话说,主题接收一个物理表的记录。当您想接收多个物理表的记录时,您必须配置 Debezium 连接器,以将记录重新路由到该主题。

逻辑表

逻辑表是将多个物理表的记录路由到一个主题的常见用例。在逻辑表中,有多个物理表,它们都有相同的模式。例如,分片表具有相同的模式。逻辑表可由两个或多个分片表组成: db_shard1.my_tabledb_shard2.my_table。表位于不同的分片中,物理不同,但它们组成一个逻辑表。您可以将任何分片中的表重新路由更改事件记录到同一主题。

分区的 PostgreSQL 表

当 Debezium PostgreSQL 连接器捕获分区表中的更改时,默认行为是更改事件记录将路由到每个分区的不同主题。要将记录从所有分区发送到一个主题,请配置主题路由 SMT。因为分区表中的每个键都保证是唯一的,因此请配置 key.enforce.uniqueness=false,以便 SMT 不添加 key 字段来确保唯一密钥。添加 key 字段是默认行为。

12.2.2. 将多个表的 Debezium 记录路由到一个主题的示例

要将多个物理表的更改事件记录路由到同一主题,请在 Debezium 连接器的 Kafka Connect 配置中配置主题路由转换。配置主题路由 SMT 要求您指定决定的正则表达式:

  • 用于路由记录的表。这些表必须具有相同的模式。
  • 目标主题名称。

以下示例中的连接器配置为主题路由 SMT 设置几个选项:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex

指定转换适用于每个更改事件记录的正则表达式,以确定是否应将其路由到特定的主题。

在示例中,正则表达式 (.*)customers_shard(.*) 与名称包含 customers_shard 字符串的表的记录匹配。这会重新路由具有以下名称的表的记录:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

topic.replacement
指定代表目标主题名称的正则表达式。转换将每个匹配记录路由到此表达式标识的主题。在本例中,上面列出的三个分片表的记录将路由到 myserver.mydb.customers_all_shards 主题。
schema.name.adjustment.mode
指定消息密钥架构名称如何从生成的主题名称进行调整,以便与连接器使用的消息转换器兼容。该值可以是 none (默认)或 avro

自定义配置

要自定义配置,您可以定义 SMT predicate 语句,用于指定您要转换进程的表,或者不处理。如果您将 SMT 配置为与正则表达式匹配的路由表,且您不希望 SMT 重新路由与表达式匹配的特定表,则 predicate 可能很有用。

12.2.3. 确保 Debezium 记录中的唯一键路由到同一主题

Debezium 更改事件键使用表列组成表的主键。要将多个物理表的记录路由到一个主题,事件键必须在所有这些表中都是唯一的。但是,每个物理表可能有一个只在该表中独有的主密钥。例如,myserver.mydb.customers_shard1 表中的行可能与 myserver.mydb.customers_shard2 表中的行相同。

为确保每个事件键在更改事件记录进入同一主题的表中都是唯一的,主题路由转换会将字段插入到更改事件键中。默认情况下,插入的字段的名称是 __dbz__physicalTableIdentifier。insert 字段的值是默认的目的地主题名称。

如果要,您可以配置主题路由转换,将不同的字段插入到密钥中。为此,请指定 key.field.name 选项,并将其设置为使用现有主密钥字段名称冲突的字段名称。例如:

transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id

本例将 shard_id 字段添加到路由记录中的键结构中。

如果要调整键的新字段的值,请配置这两个选项:

key.field.regex
指定转换应用到默认目标主题名称的正则表达式,以捕获一个或多个字符组。
key.field.replacement
指定在捕获的组方面确定插入的 key 字段的值的正则表达式。

例如:

transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

使用这个配置,假设默认目标主题名称为:

myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3

转换使用第二个捕获的组中的值(分片号)作为键的新字段的值。在本例中,插入的 key 字段的值将是 123

如果您的表包含全局唯一密钥,且不需要更改密钥结构,您可以将 key.enforce.uniqueness 选项设置为 false

...
transforms.Reroute.key.enforce.uniqueness=false
...

12.2.4. 用于有选择地应用主题路由转换的选项

除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。

您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

12.2.5. 用于配置 Debezium 主题路由转换的选项

下表描述了主题路由 SMT 配置选项。

表 12.1. 主题路由 SMT 配置选项
选项默认描述

topic.regex

 

指定转换适用于每个更改事件记录的正则表达式,以确定是否应将其路由到特定的主题。

topic.replacement

 

指定代表目标主题名称的正则表达式。转换将每个匹配记录路由到此表达式标识的主题。此表达式可以引用您为 topic.regex 指定的正则表达式捕获的组。要引用组,请指定 $1$2 等。

key.enforce​.uniqueness

true

指明是否向记录的更改事件键添加字段。添加 key 字段可确保每个事件键在更改事件记录进入同一主题的表中是唯一的。这有助于防止对具有相同键但源自不同源表的记录更改事件冲突。

如果您不希望转换添加 key 字段,请指定 false。例如,如果您将分区的 PostgreSQL 表中的记录路由到一个主题,您可以配置 key.enforce.uniqueness=false,因为在分区 PostgreSQL 表中保证唯一密钥。

key.field.name

__dbz__physicalTableIdentifier

要添加到更改事件键的字段名称。此字段的值标识原始表名称。要使 SMT 添加此字段,key.enforce.uniqueness 必须为 true,这是默认设置。

key.field.regex

 

指定转换应用到默认目标主题名称的正则表达式,以捕获一个或多个字符组。要使 SMT 应用此表达式,key.enforce.uniqueness 必须为 true,这是默认设置。

key.field​.replacement

 

指定在为 key.field.regex 指定的表达式捕获的、用于确定插入 key 字段的值的正则表达式。要使 SMT 应用此表达式,key.enforce.uniqueness 必须为 true,这是默认设置。

schema.name.adjustment.mode

none

指定消息密钥架构名称应该如何调整,以便与连接器使用的消息转换器兼容,包括: none 不应用任何调整(默认),aro 将 Avro 类型名称中使用的字符替换为下划线。

logical.table.cache.size

16

用于在 LRUCache 中保存最大条目的大小。缓存将保留用于逻辑表键和值的旧/新模式,还缓存派生的 key 和 topic regex 结果,以改进源记录转换。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.