12.2. 将 Debezium 事件记录路由到您指定的主题
每个包含数据更改事件的 Kafka 记录都有一个默认的目的地主题。如果需要,您可以将记录重新路由到您在记录到达 Kafka Connect 转换器前指定的主题。为此,Debebe 提供主题路由单一消息转换(SMT)。在 Debezium 连接器的 Kafka Connect 配置中配置这个转换。配置选项允许您指定以下内容:
- 用于标识要重新路由的记录的表达式
- 解析到目的地主题的表达式
- 如何确保将记录中的唯一键重新路由到目标主题
您确定转换配置提供了您想要的行为。Debezium 不会验证来自您转换配置的结果的行为。
主题路由转换是一个 Kafka Connect SMT。
以下主题提供详情:
12.2.1. 将 Debezium 记录路由到您指定的主题的用例
默认行为是 Debezium 连接器将每个更改事件记录发送到名称从数据库名称以及进行更改的表名称的主题。换句话说,主题接收一个物理表的记录。当您想接收多个物理表的记录时,您必须配置 Debezium 连接器,以将记录重新路由到该主题。
逻辑表
逻辑表是将多个物理表的记录路由到一个主题的常见用例。在逻辑表中,有多个物理表,它们都有相同的模式。例如,分片表具有相同的模式。逻辑表可由两个或多个分片表组成: db_shard1.my_table
和 db_shard2.my_table
。表位于不同的分片中,物理不同,但它们组成一个逻辑表。您可以将任何分片中的表重新路由更改事件记录到同一主题。
分区的 PostgreSQL 表
当 Debezium PostgreSQL 连接器捕获分区表中的更改时,默认行为是更改事件记录将路由到每个分区的不同主题。要将记录从所有分区发送到一个主题,请配置主题路由 SMT。因为分区表中的每个键都保证是唯一的,因此请配置 key.enforce.uniqueness=false
,以便 SMT 不添加 key 字段来确保唯一密钥。添加 key 字段是默认行为。
12.2.2. 将多个表的 Debezium 记录路由到一个主题的示例
要将多个物理表的更改事件记录路由到同一主题,请在 Debezium 连接器的 Kafka Connect 配置中配置主题路由转换。配置主题路由 SMT 要求您指定决定的正则表达式:
- 用于路由记录的表。这些表必须具有相同的模式。
- 目标主题名称。
以下示例中的连接器配置为主题路由 SMT 设置几个选项:
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex
指定转换适用于每个更改事件记录的正则表达式,以确定是否应将其路由到特定的主题。
在示例中,正则表达式
(.*)customers_shard(.*)
与名称包含customers_shard
字符串的表的记录匹配。这会重新路由具有以下名称的表的记录:myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
topic.replacement
-
指定代表目标主题名称的正则表达式。转换将每个匹配记录路由到此表达式标识的主题。在本例中,上面列出的三个分片表的记录将路由到
myserver.mydb.customers_all_shards
主题。 schema.name.adjustment.mode
-
指定消息密钥架构名称如何从生成的主题名称进行调整,以便与连接器使用的消息转换器兼容。该值可以是
none
(默认)或avro
。
自定义配置
要自定义配置,您可以定义 SMT predicate 语句,用于指定您要转换进程的表,或者不处理。如果您将 SMT 配置为与正则表达式匹配的路由表,且您不希望 SMT 重新路由与表达式匹配的特定表,则 predicate 可能很有用。
12.2.3. 确保 Debezium 记录中的唯一键路由到同一主题
Debezium 更改事件键使用表列组成表的主键。要将多个物理表的记录路由到一个主题,事件键必须在所有这些表中都是唯一的。但是,每个物理表可能有一个只在该表中独有的主密钥。例如,myserver.mydb.customers_shard1
表中的行可能与 myserver.mydb.customers_shard2
表中的行相同。
为确保每个事件键在更改事件记录进入同一主题的表中都是唯一的,主题路由转换会将字段插入到更改事件键中。默认情况下,插入的字段的名称是 __dbz__physicalTableIdentifier
。insert 字段的值是默认的目的地主题名称。
如果要,您可以配置主题路由转换,将不同的字段插入到密钥中。为此,请指定 key.field.name
选项,并将其设置为使用现有主密钥字段名称冲突的字段名称。例如:
transforms=Reroute transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter transforms.Reroute.topic.regex=(.*)customers_shard(.*) transforms.Reroute.topic.replacement=$1customers_all_shards transforms.Reroute.key.field.name=shard_id
本例将 shard_id
字段添加到路由记录中的键结构中。
如果要调整键的新字段的值,请配置这两个选项:
key.field.regex
- 指定转换应用到默认目标主题名称的正则表达式,以捕获一个或多个字符组。
key.field.replacement
- 指定在捕获的组方面确定插入的 key 字段的值的正则表达式。
例如:
transforms.Reroute.key.field.regex=(.*)customers_shard(.*) transforms.Reroute.key.field.replacement=$2
使用这个配置,假设默认目标主题名称为:
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
转换使用第二个捕获的组中的值(分片号)作为键的新字段的值。在本例中,插入的 key 字段的值将是 1
、2
或 3
。
如果您的表包含全局唯一密钥,且不需要更改密钥结构,您可以将 key.enforce.uniqueness
选项设置为 false
:
... transforms.Reroute.key.enforce.uniqueness=false ...
12.2.4. 用于有选择地应用主题路由转换的选项
除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。
您可以使用以下方法之一将连接器配置为有选择地应用 SMT:
- 为转换配置 SMT predicate。
- 对 SMT 使用 topic.regex 配置选项。
12.2.5. 用于配置 Debezium 主题路由转换的选项
下表描述了主题路由 SMT 配置选项。
选项 | 默认 | 描述 |
---|---|---|
指定转换适用于每个更改事件记录的正则表达式,以确定是否应将其路由到特定的主题。 | ||
指定代表目标主题名称的正则表达式。转换将每个匹配记录路由到此表达式标识的主题。此表达式可以引用您为 | ||
|
指明是否向记录的更改事件键添加字段。添加 key 字段可确保每个事件键在更改事件记录进入同一主题的表中是唯一的。这有助于防止对具有相同键但源自不同源表的记录更改事件冲突。 | |
|
要添加到更改事件键的字段名称。此字段的值标识原始表名称。要使 SMT 添加此字段, | |
指定转换应用到默认目标主题名称的正则表达式,以捕获一个或多个字符组。要使 SMT 应用此表达式, | ||
指定在为 | ||
none |
指定消息密钥架构名称应该如何调整,以便与连接器使用的消息转换器兼容,包括: | |
| 用于在 LRUCache 中保存最大条目的大小。缓存将保留用于逻辑表键和值的旧/新模式,还缓存派生的 key 和 topic regex 结果,以改进源记录转换。 |