第 6 章 MySQL 的 Debezium 连接器
MySQL 有一个二进制日志(binlog),它按照它们提交到数据库的顺序记录所有操作。这包括对表模式的更改,以及对表中的数据的更改。MySQL 使用 binlog 进行复制和恢复。
Debezium MySQL 连接器读取 binlog,为行级 INSERT
、UPDATE
和 DELETE
操作生成更改事件,并将更改事件发送到 Kafka 主题。客户端应用程序读取这些 Kafka 主题。
因为 MySQL 通常会在指定时间段内清除 binlogs,因此 MySQL 连接器会针对每个数据库执行初始 一致的快照。MySQL 连接器从创建快照的时间点读取 binlog。
有关与此连接器兼容的 MySQL 数据库版本的详情,请查看 Debezium 支持的配置页面。
使用 Debezium MySQL 连接器的信息和步骤进行组织,如下所示:
6.1. Debezium MySQL 连接器的工作方式
连接器支持的 MySQL 拓扑概述可用于规划应用程序。为了优化配置和运行 Debezium MySQL 连接器,了解连接器如何跟踪表结构、公开模式更改、执行快照以及确定 Kafka 主题名称。
详情包括在以下主题中:
6.1.1. Debezium 连接器支持的 MySQL 拓扑
Debezium MySQL 连接器支持以下 MySQL 拓扑:
- Standalone
- 当使用单个 MySQL 服务器时,服务器必须启用 binlog (并选择性地启用 GTIDs),以便 Debezium MySQL 连接器可以监控服务器。这通常可以接受,因为二进制日志也可以用作 增量备份。在这种情况下,MySQL 连接器总是连接到并遵循这个独立 MySQL 服务器实例。
- 主和副本
Debezium MySQL 连接器可以遵循其中一个主服务器或其中一个副本(如果该副本启用了 binlog),但连接器只会看到对该服务器可见的集群的更改。通常,除了多主拓扑外,这不是问题。
连接器在服务器的 binlog 中记录其位置,这在集群中的每个服务器都有所不同。因此,连接器必须只遵循一个 MySQL 服务器实例。如果该服务器失败,必须在连接器继续之前重启或恢复该服务器。
- 高可用性集群
- MySQL 存在各种 高可用性解决方案,它们可以更容易容许,并且几乎立即从问题和故障中恢复。大多数 HA MySQL 集群使用 GTID,以便副本可以在任何主服务器上跟踪所有更改。
- Multi-primary
网络数据库(NDB)集群复制 使用一个或多个 MySQL 副本节点,它们各自从多个主服务器复制。这是聚合多个 MySQL 集群复制的强大方法。这个拓扑需要使用 GTID。
Debezium MySQL 连接器可以使用这些多主 MySQL 副本作为源,只要新副本被发现到旧副本,就可以切换到不同的多主 MySQL 副本。也就是说,新副本具有在第一个副本中看到的所有事务。即使连接器只使用数据库和/或表的子集,当尝试重新连接到新的多主 MySQL 副本时,也可以将连接器配置为包含或排除特定的 GTID 源,并在 binlog 中找到正确的位置。
- 托管
支持 Debezium MySQL 连接器以使用托管选项,如 Amazon RDS 和 Amazon Aurora。
由于这些托管选项不允许全局读取锁定,因此表级锁定用于创建 一致的快照。
6.1.2. Debezium MySQL 连接器如何处理数据库架构更改
当数据库客户端查询数据库时,客户端将使用数据库的当前架构。但是,数据库模式可以随时更改,这意味着连接器必须能够识别每个插入、更新或删除操作被记录的时间。另外,连接器不一定将当前的模式应用到每个事件。如果事件相对旧,则应用当前模式之前可能会记录该事件。
为确保在架构更改后正确处理事件,MySQL 仅包含在事务日志中,不仅影响数据的行级更改,还应用于数据库的 DDL 语句。当连接器在 binlog 中遇到这些 DDL 语句时,它会解析它们并更新每个表模式的内存表示。连接器使用此模式表示来识别每个插入、更新或删除操作时表的结构,并生成适当的更改事件。在单独的数据库架构历史记录 Kafka 主题中,连接器记录所有 DDL 语句,以及 binlog 中出现每个 DDL 语句的位置。
当连接器在崩溃或安全停止后重启时,它从特定位置(即时间点)开始读取 binlog。连接器通过读取数据库模式历史记录 Kafka 主题并将所有 DDL 语句解析为连接器启动的 binlog 中,以此重建此时存在的表结构。
此数据库架构历史记录主题仅用于内部连接器。另外,连接器也可以将 模式更改事件发送到用于消费者应用程序的不同主题。
当 MySQL 连接器捕获表中的更改时,会应用 gh-ost
或 pt-online-schema-change
等模式更改,在迁移过程中会创建帮助程序表。您必须配置连接器来捕获这些帮助程序表中的更改。如果消费者不需要为帮助程序表生成的记录,请配置 单个消息转换(SMT) 从连接器发出的消息中删除这些记录。
其他资源
- 接收 Debezium 事件记录 的主题的默认名称。
6.1.3. Debezium MySQL 连接器如何公开数据库架构更改
您可以配置 Debezium MySQL 连接器来生成模式更改事件,该事件描述了应用到数据库中表的架构更改。连接器将模式更改事件写入名为 < topicPrefix>
的 Kafka 主题,其中 topicPrefix
是 topic.prefix
连接器配置属性中指定的命名空间。连接器发送到 schema 更改主题的消息包含一个有效负载,以及可选的包含更改事件消息的 schema。
模式更改事件消息的有效负载包括以下元素:
ddl
-
提供会导致架构更改的 SQL
CREATE
、ALTER
或DROP
语句。 databaseName
-
将 DDL 语句应用到的数据库的名称。
databaseName
的值充当 message 键。 pos
- 语句出现在 binlog 中的位置。
tableChanges
-
架构更改后整个表模式的结构化表示。
tableChanges
字段包含一个数组,其中包含表的每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此用户可轻松读取消息,而不必先通过 DDL 解析器处理它们。
对于处于捕获模式的表,连接器不仅将模式更改的历史记录存储在 schema 更改主题中,也存储在内部数据库架构历史记录主题中。内部数据库架构历史记录主题仅用于连接器,它不适用于消耗应用程序直接使用。确保需要通知架构更改的应用程序只消耗来自 schema 更改主题的信息。
切勿对数据库架构历史记录主题进行分区。要使数据库架构历史记录主题正常工作,它必须维护连接器发出的事件记录的全局顺序。
要确保主题没有在分区间分割,请使用以下方法之一为主题设置分区计数:
-
如果您手动创建数据库架构历史记录主题,请指定分区计数
1
。 -
如果您使用 Apache Kafka 代理自动创建数据库 schema 历史记录主题,则会创建该主题,将 Kafka
num.partitions
配置选项 的值设置为1
。
连接器发出到其 schema 更改主题的消息格式处于 incubating 状态,并可能在没有通知的情况下改变。
示例:消息发送到 MySQL 连接器模式更改主题
以下示例显示了 JSON 格式的典型的模式更改消息。该消息包含表模式的逻辑表示。
{ "schema": { }, "payload": { "source": { 1 "version": "2.3.4.Final", "connector": "mysql", "name": "mysql", "ts_ms": 1651535750218, 2 "snapshot": "false", "db": "inventory", "sequence": null, "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 570, "row": 0, "thread": null, "query": null }, "databaseName": "inventory", 3 "schemaName": null, "ddl": "ALTER TABLE customers ADD middle_name varchar(255) AFTER first_name", 4 "tableChanges": [ 5 { "type": "ALTER", 6 "id": "\"inventory\".\"customers\"", 7 "table": { 8 "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ 9 "id" ], "columns": [ 10 { "name": "id", "jdbcType": 4, "nativeType": null, "typeName": "INT", "typeExpression": "INT", "charsetName": null, "length": null, "scale": null, "position": 1, "optional": false, "autoIncremented": true, "generated": true }, { "name": "first_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false }, { "name": "middle_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 3, "optional": true, "autoIncremented": false, "generated": false }, { "name": "last_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false }, { "name": "email", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 5, "optional": false, "autoIncremented": false, "generated": false } ], "attributes": [ 11 { "customAttribute": "attributeValue" } ] } } ] } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
|
2 |
| 可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在源对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。 |
3 |
|
标识包含更改的数据库和架构。 |
4 |
|
此字段包含负责架构更改的 DDL。 |
5 |
| 包含 DDL 命令生成的模式更改的一个或多个项目的数组。 |
6 |
| 描述更改的类型。该值如下之一:
|
7 |
|
创建、更改或丢弃的表的完整标识符。如果是表重命名,这个标识符是 < |
8 |
| 代表应用更改后的表元数据。 |
9 |
| 组成表主密钥的列的列表。 |
10 |
| 更改表中每个列的元数据。 |
11 |
| 每个表更改的自定义属性元数据。 |
如需更多信息,请参阅 schema 历史记录主题。
6.1.4. Debezium MySQL 连接器如何执行数据库快照
当 Debezium MySQL 连接器首次启动时,它会执行数据库的初始 一致快照。这个快照可让连接器为数据库的当前状态建立基准。
Debezium 可以在运行快照时使用不同的模式。快照模式由 snapshot.mode
配置属性决定。属性的默认值为 初始
。您可以通过更改 snapshot.mode
属性的值来自定义连接器创建快照的方式。
您可以在以下部分找到有关快照的更多信息:
连接器在执行快照时完成一系列任务。快照模式以及对数据库有效的表锁定策略的具体步骤会有所不同。当 Debezium MySQL 连接器执行 使用全局读取锁 或 表级 锁定的初始快照时,Debezium MySQL 连接器可以完成不同的步骤。
6.1.4.1. 使用全局读锁的初始快照
您可以通过更改 snapshot.mode
属性的值来自定义连接器创建快照的方式。如果您配置不同的快照模式,连接器使用这个工作流的修改版本完成快照。有关不允许全局读取锁定的环境中快照进程的详情,请查看 表级锁定的快照工作流。
Debezium MySQL 连接器用来执行带有全局读锁的初始快照的默认工作流
下表显示了 Debezium 遵循的工作流中的步骤,以使用全局读取锁定创建快照。
步骤 | 操作 |
---|---|
1 | 建立与数据库的连接。 |
2 |
确定要捕获的表。默认情况下,连接器捕获所有非系统表的数据。快照完成后,连接器将继续流传输指定表的数据。如果您希望连接器只从特定表捕获数据,您可以通过设置 |
3 |
获取表上的全局读取锁定,以捕获给其他数据库客户端阻止 写入。 快照本身不会阻止其他客户端应用 DDL,这可能会影响连接器的尝试读取 binlog 位置和表模式。连接器在读取 binlog 位置时保留全局读取锁定,并在以后的步骤中释放锁定。 |
4 |
使用 可重复的读取语义 启动事务,以确保事务中的所有后续读取都针对 一致的快照 完成。 注意 使用这些隔离语义可能会减慢快照的进度。如果快照完成用时过长,请考虑使用不同的隔离配置,或者跳过初始快照并运行 增量快照。 |
5 | 读取当前的 binlog 位置。 |
6 |
捕获数据库中所有表的结构,或者为捕获指定的所有表。连接器在其内部数据库模式历史记录主题中保留模式信息,包括所有必要的 注意 默认情况下,连接器捕获数据库中每个表的 schema,包括没有配置为捕获的表。如果没有为捕获配置表,则初始快照只捕获其结构;它不会捕获任何表数据。 有关为什么没有包括在初始快照中的表的快照保留模式信息,请参阅 了解为什么初始快照捕获所有表的 schema。 |
7 | 释放在第 3 步中获得的全局读取锁定。其他数据库客户端现在可以写入数据库。 |
8 | 在连接器在 Step 5 中读取的 binlog 位置,连接器开始扫描为捕获的表。在扫描过程中,连接器完成以下任务:
|
9 | 提交事务。 |
10 | 在连接器偏移中记录快照成功完成。 |
生成的初始快照捕获捕获捕获的表中每行的当前状态。在这个基准状态中,连接器会捕获后续更改。
在快照进程开始后,如果进程因为连接器失败、重新平衡或其他原因而中断,则进程会在连接器重启后重启。
连接器完成初始快照后,它会继续从在第 5 步中读取的位置进行流,使其不会错过任何更新。
如果连接器因为任何原因而再次停止,它会在重启后从之前关闭的位置恢复流更改。
连接器重启后,如果删除了日志,则日志中连接器的位置可能不再可用。然后,连接器会失败,并返回一个错误,表示需要新的快照。要将连接器配置为在这种情况下自动启动快照,请将 snapshot.mode
属性的值设置为 when_needed
。有关 Debezium MySQL 连接器故障排除的更多信息,请参阅 当出现问题时的行为。
6.1.4.2. 使用表级锁定的初始快照
在某些数据库环境中,管理员不允许全局读取锁定。如果 Debezium MySQL 连接器检测到不允许全局读取锁定,连接器会在执行快照时使用表级锁定。要使连接器执行使用表级锁的快照,Debezium 连接器用来连接到 MySQL 的数据库帐户必须具有 LOCK TABLES
权限。
Debezium MySQL 连接器用来执行带有表级别锁定的初始快照的默认工作流
以下工作流列出了 Debezium 使用表级读取锁定创建快照所采取的步骤。有关不允许全局读取锁定的环境中快照进程的详情,请查看 全局读取锁定的快照工作流。
步骤 | 操作 |
---|---|
1 | 建立与数据库的连接。 |
2 |
确定要捕获的表。默认情况下,连接器捕获所有非系统表。要让连接器捕获表或表元素的子集,您可以设置多个 |
3 | 获取表级锁定。 |
4 | 使用 可重复的读取语义 启动事务,以确保事务中的所有后续读取都针对 一致的快照 完成。 |
5 | 读取当前的 binlog 位置。 |
6 |
读取连接器配置为捕获更改的数据库和表的 schema。连接器在其内部数据库模式历史记录主题中保留模式信息,包括所有必要的 注意 默认情况下,连接器捕获数据库中每个表的 schema,包括没有配置为捕获的表。如果没有为捕获配置表,则初始快照只捕获其结构;它不会捕获任何表数据。 有关为什么没有包括在初始快照中的表的快照保留模式信息,请参阅 了解为什么初始快照捕获所有表的 schema。 |
7 | 在连接器在 Step 5 中读取的 binlog 位置,连接器开始扫描为捕获的表。在扫描过程中,连接器完成以下任务:
|
8 | 提交事务。 |
9 | 释放表级锁定。其他数据库客户端现在可以写入任何之前锁定的表。 |
10 | 在连接器偏移中记录快照成功完成。 |
6.1.4.3. 初始快照捕获所有表的 schema 历史记录的描述
连接器运行的初始快照捕获两种类型的信息:
- 表数据
-
在连接器的
table.include.list
属性中命名的表中的INSERT
、UPDATE
和DELETE
操作的信息。 - 模式数据
- 描述应用到表的结构更改的 DDL 语句。模式数据会保留给内部模式历史记录主题,以及连接器的 schema 更改主题(如果配置了)。
运行初始快照后,您可能会注意到快照捕获没有指定用于捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的每个表的模式信息,而不仅仅是从指定为捕获的表的表。连接器要求表的模式存在于架构历史记录主题中,然后才能捕获表。通过启用初始快照来捕获不是原始捕获集一部分的表的 schema 数据,Debebe 准备好连接器,以便稍后需要捕获这些表中的事件数据。如果初始快照没有捕获表的 schema,您必须将模式添加到历史记录主题,然后才能从表中捕获数据。
在某些情况下,您可能想要限制初始快照中的模式捕获。当您要减少完成快照所需的时间时,这非常有用。或者,当 Debezium 通过可访问多个逻辑数据库的用户帐户连接到数据库实例时,但您希望连接器只从特定逻辑数据库中的表捕获更改。
附加信息
- 从不是由初始快照捕获的表捕获数据(没有模式更改)
- 从不是由初始快照捕获的表捕获数据(应用程序更改)
-
设置
schema.history.internal.store.only.captured.tables.ddl
属性,以指定从中捕获模式信息的表。 -
设置
schema.history.internal.store.only.captured.databases.ddl
属性,以指定从中捕获模式更改的逻辑数据库。
6.1.4.4. 从不是由初始快照捕获的表捕获数据(没有模式更改)
在某些情况下,您可能希望连接器从其模式未被初始快照捕获的表中捕获数据。根据连接器配置,初始快照只能捕获数据库中特定表的表模式。如果历史记录主题中没有表模式,连接器将无法捕获表,并报告缺少的 schema 错误。
您可能仍然能够从表中捕获数据,但您必须执行额外的步骤来添加表模式。
前提条件
- 您希望从带有连接器在初始快照期间没有捕获的 schema 捕获数据。
- 在事务日志中,表的所有条目都使用相同的模式。有关从具有存结构更改的新表中捕获数据的详情,请参考从 未由初始快照(schema 更改)捕获的表中的捕获数据。
流程
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 对连接器配置应用以下更改:
-
将
snapshot.mode
设置为schema_only_recovery
。 -
将
schema.history.internal.store.only.captured.tables.ddl
的值设置为false
。 -
添加您希望连接器捕获至
table.include.list
的表。这样可保证将来,连接器可以重建所有表的 schema 历史记录。
-
将
- 重启连接器。快照恢复过程根据表的当前结构重建模式历史记录。
- (可选)在快照完成后,启动一个 增量快照 来捕获新添加的表的现有数据,以及该连接器关闭时发生的其他表的更改。
-
(可选)将
snapshot.mode
重置为schema_only
,以防止连接器在以后的重启后启动恢复。
6.1.4.5. 从不是由初始快照捕获的表捕获数据(应用程序更改)
如果架构更改应用到表,则在架构更改前提交的记录与更改后提交的不同结构不同。当 Debezium 从表中捕获数据时,它会读取 schema 历史记录,以确保它为每个事件应用正确的模式。如果 schema 历史记录主题中没有 schema,则连接器无法捕获表,并出现错误结果。
如果要从初始快照捕获的表中捕获数据,并且修改了表的 schema,则必须将模式添加到历史记录主题中(如果它还没有可用)。您可以通过运行新的模式快照或运行表的初始快照来添加模式。
前提条件
- 您希望从带有连接器在初始快照期间没有捕获的 schema 捕获数据。
- 架构更改应用于表,以便捕获的记录没有统一结构。
流程
- 初始快照捕获了所有表的模式(
storage.only.captured.tables.ddl
设置为false
) -
编辑
table.include.list
属性,以指定您要捕获的表。 - 重启连接器。
- 如果要从新添加的表中捕获现有数据,则启动 增量快照。
-
编辑
- 初始快照没有捕获所有表的模式(storage
.only.captured.tables.ddl
设置为true
) 如果初始快照没有保存您要捕获的表的模式,请完成以下步骤之一:
- 流程 1:架构快照,后跟增量快照
在此过程中,连接器首先执行 schema 快照。然后,您可以启动增量快照,使连接器能够同步数据。
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 清除配置的 Kafka Connect
offset.storage.topic
中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答。警告删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,应仅作为最后的手段来执行。
为连接器配置中的属性设置值,如以下步骤所述:
-
将
snapshot.mode
属性的值设置为schema_only
。 -
编辑
table.include.list
以添加您要捕获的表。
-
将
- 重启连接器。
- 等待 Debezium 捕获新表和现有表的模式。在连接器停止后发生任何表的数据更改不会被捕获。
- 为确保没有丢失数据,请启动 增量快照。
- 步骤 2:初始快照,后跟可选的增量快照
在此过程中,连接器执行数据库的完整初始快照。与任何初始快照一样,在具有多个大型表的数据库中,运行初始快照可能会非常耗时。快照完成后,您可以选择触发增量快照来捕获连接器离线时发生的任何更改。
- 停止连接器。
-
删除由 schema.history.internal.
kafka.topic 属性指定的内部数据库架构历史记录
主题。 清除配置的 Kafka Connect
offset.storage.topic
中的偏移量。有关如何删除偏移的更多信息,请参阅 Debezium 社区常见问题解答。警告删除偏移应仅由具有操作内部 Kafka Connect 数据经验的高级用户执行。此操作可能具有破坏性,应仅作为最后的手段来执行。
-
编辑
table.include.list
以添加您要捕获的表。 为连接器配置中的属性设置值,如以下步骤所述:
-
将
snapshot.mode
属性的值设置为initial
。 -
(可选)将
schema.history.internal.store.only.captured.tables.ddl
设置为false
。
-
将
- 重启连接器。连接器获取完整的数据库快照。快照完成后,连接器会过渡到 streaming。
- (可选)要捕获连接器离线时更改的任何数据,请启动 增量快照。
6.1.5. 临时快照
默认情况下,连接器仅在首次启动后运行初始快照操作。在正常情况下,在这个初始快照后,连接器不会重复快照过程。连接器捕获的任何更改事件数据都只通过流处理。
然而,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供总结表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:
- 连接器配置会被修改为捕获不同的表集合。
- Kafka 主题已删除,必须重建。
- 由于配置错误或某些其他问题导致数据损坏。
您可以通过启动所谓的 临时快照来为之前捕获的表重新运行快照。临时快照需要使用 信号表。您可以通过向 Debezium 信号表发送信号请求来发起临时快照。
当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了之前存在的主题,如果启用了 自动主题创建,Debezium 可以自动创建主题。
临时快照信号指定要包含在快照中的表。快照可以捕获整个数据库的内容,或者仅捕获数据库中表的子集。另外,快照也可以捕获数据库中表的内容子集。
您可以通过将 execute-snapshot
消息发送到信号表来指定要捕获的表。将 execute-snapshot
信号类型设置为 增量
,并提供快照中包含的表名称,如下表所述:
字段 | 默认 | 值 |
---|---|---|
|
|
指定您要运行的快照类型。 |
| N/A |
包含与要快照的表的完全限定域名匹配的正则表达式的数组。 |
| N/A | 可选字符串,根据表的列指定条件,用于捕获表的内容的子集。 |
| N/A | 可选字符串,指定连接器在快照过程中用作表的主键的列名称。 |
触发临时快照
您可以通过向信号表中添加 execute-snapshot
信号类型的条目来发起临时快照。连接器处理消息后,它会开始快照操作。快照进程读取第一个和最后一个主密钥值,并使用这些值作为每个表的开头和结束点。根据表中的条目数量以及配置的块大小,Debezium 会将表划分为块,并一次性执行每个块的快照。
6.1.6. 增量快照
为了提供管理快照的灵活性,Debezium 包含附加快照机制,称为 增量快照。增量快照依赖于 Debezium 机制 向 Debezium 连接器发送信号。
在增量快照中,除了一次捕获数据库的完整状态,就像初始快照一样,Debebe 会在一系列可配置的块中捕获每个表。您可以指定您希望快照捕获的表 以及每个块的大小。块大小决定了快照在数据库的每个获取操作期间收集的行数。增量快照的默认块大小为 1024 行。
当增量快照进行时,Debebe 使用 watermarks 跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,捕获数据的阶段方法具有以下优点:
- 您可以使用流化数据捕获并行运行增量快照,而不是在快照完成前进行后流。连接器会在快照过程中从更改日志中捕获接近实时事件,且操作都不会阻止其他操作。
- 如果增量快照的进度中断,您可以在不丢失任何数据的情况下恢复它。在进程恢复后,快照从停止的点开始,而不是从开始计算表。
-
您可以随时根据需要运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置后重新运行快照,以将表添加到其
table.include.list
属性中。
增量快照过程
当您运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据 配置的块大小 将表分成块。然后,按块的工作块会捕获块中的每个表行。对于它捕获的每行,快照会发出 READ
事件。该事件代表块的快照开始时的行值。
当快照继续进行时,其他进程可能会继续访问数据库,可能会修改表记录。为了反映此类更改,INSERT
、UPDATE
或 DELETE
操作会按照常常提交到事务日志。同样,持续 Debezium 流进程将继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。
Debezium 如何使用相同的主密钥在记录间解决冲突
在某些情况下,streaming 进程发出的 UPDATE
或 DELETE
事件会停止序列。也就是说,流流过程可能会发出一个修改表行的事件,该事件捕获包含该行的 READ
事件的块。当快照最终为行发出对应的 READ
事件时,其值已被替换。为确保以正确的逻辑顺序处理到达序列的增量快照事件,Debebe 使用缓冲方案来解析冲突。仅在快照事件和流化事件之间发生冲突后,De Debezium 会将事件记录发送到 Kafka。
快照窗口
为了帮助解决修改同一表行的后期事件和流化事件之间的冲突,Debebe 会使用一个所谓的 快照窗口。快照窗口分解了增量快照捕获指定表块数据的间隔。在块的快照窗口打开前,Debebe 会使用其常见行为,并将事件从事务日志直接下游发送到目标 Kafka 主题。但从特定块的快照打开后,直到关闭为止,De-duplication 步骤会在具有相同主密钥的事件之间解决冲突。
对于每个数据收集,Debezium 会发出两种类型的事件,并将其存储在单个目标 Kafka 主题中。从表直接捕获的快照记录作为 READ
操作发送。同时,当用户继续更新数据收集中的记录,并且会更新事务日志来反映每个提交,Debezium 会为每个更改发出 UPDATE
或 DELETE
操作。
当快照窗口打开时,Debezium 开始处理快照块,它会向内存缓冲区提供快照记录。在快照窗口期间,缓冲区中 READ
事件的主密钥与传入流事件的主键进行比较。如果没有找到匹配项,则流化事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ
事件,并将流化记录写入目标主题,因为流的事件逻辑地取代静态快照事件。在块关闭的快照窗口后,缓冲区仅包含 READ
事件,这些事件不存在相关的事务日志事件。Debezium 将这些剩余的 READ
事件发送到表的 Kafka 主题。
连接器为每个快照块重复这个过程。
6.1.6.1. 触发增量快照
目前,启动增量快照的唯一方法是向源数据库上的 信号表发送临时快照 信号。
作为 SQL INSERT
查询,您将向信号提交信号。
在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。
您提交的查询指定要包含在快照中的表,并可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值 incremental
。
要指定快照中包含的表,请提供列出表或用于匹配表的正则表达式数组的 数据集合
,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的 data-collections
数组没有默认值。如果 data-collections
数组为空,Debezium 会检测到不需要任何操作,且不会执行快照。
如果要包含在快照中的表的名称在数据库、模式或表的名称中包含句点(.
),以将表添加到 data-collections
数组中,您必须使用双引号转义名称的每个部分。
例如,要包含一个存在于 公共
模式的表,其名称为 My.Table
,请使用以下格式 :"public"."My.Table
"。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道来触发增量快照
发送 SQL 查询,将临时增量快照请求添加到信号表中:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'execute-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}, 5 "additional-condition":"color=blue"}'); 6
命令中的
id
、type
和data
参数的值对应于 信号表 的字段。下表描述了示例中的参数:
表 6.3. SQL 命令中字段的描述,用于将增量快照信号发送到信号表 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定一个任意字符串,它被分配为信号请求的id
标识符。
使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。相反,Debebe 会在快照期间生成自己的id
字符串作为水位线信号。3
execute-snapshot
type
参数指定信号旨在触发的操作。
4
data-collections
信号的
data
字段所需的组件,用于指定表名称或正则表达式数组,以匹配快照中包含的表名称。
数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号表的名称相同。5
incremental
信号的
data
字段的可选类型
组件,用于指定要运行的快照操作类型。
目前,唯一有效的选项是默认值incremental
。
如果没有指定值,连接器将运行增量快照。6
additional-condition
可选字符串,根据表的列指定条件,用于捕获表的内容的子集。有关
additional-condition
参数的更多信息,请参阅带有额外条件
的临时增量快照。
带有额外条件
的临时增量快照
如果您希望快照只包含表中的内容子集,您可以通过向快照信号附加 additional-condition
参数来修改信号请求。
典型的快照的 SQL 查询采用以下格式:
SELECT * FROM <tableName> ....
通过添加 additional-condition
参数,您可以将 WHERE
条件附加到 SQL 查询中,如下例所示:
SELECT * FROM <tableName> WHERE <additional-condition> ....
以下示例显示了向信号表发送带有额外条件的临时增量快照请求的 SQL 查询:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<tableName>","<tableName>"],"type":"<snapshotType>","additional-condition":"<additional-condition>"}');
例如,假设您有一个包含以下列的 products
表:
-
ID
(主键) -
color
-
quantity
如果您需要 product
表的增量快照,其中只包含 color=blue
的数据项,您可以使用以下 SQL 语句来触发快照:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue"}');
additional-condition
参数还允许您传递基于多个列的条件。例如,使用上例中的 product
表,您可以提交查询来触发增量快照,该快照仅包含 color=blue
和 quantity>10
的项数据:
INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例:增加快照事件消息
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" 1 }, "op":"r", 2 "ts_ms":"1620393591654", "transaction":null }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作类型。 |
2 |
|
指定事件类型。 |
6.1.6.2. 使用 Kafka 信号频道来触发增量快照
您可以向 配置的 Kafka 主题 发送消息,以请求连接器来运行临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 execute-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
以逗号分隔的正则表达式数组,与快照中包含的表的完全限定域名匹配。 |
| N/A | 可选字符串,指定连接器评估为指定要包含在快照中的列子集的条件。 |
execute-snapshot Kafka 消息示例:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
带有额外条件的临时增量快照
Debezium 使用 additional-condition
字段来选择表内容的子集。
通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:
SELECT * FROM <tableName> ….
当快照请求包含 additional-condition
时,extra-condition
会附加到 SQL 查询中,例如:
SELECT * FROM <tableName> WHERE <additional-condition> ….
例如,如果一个 product
table with the column id
(主键)、color
和 brand
,如果您希望快照只包含 color='blue'
的内容,当您请求快照时,您可以附加一个 additional-condition
语句来过滤内容:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue'"}}`
您可以使用 additional-condition
语句根据多个列传递条件。例如,如果您希望快照只包含 color='blue'
的
表中,以及 products
brand='MyBrand'
,则您可以发送以下请求:
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-condition":"color='blue' AND brand='MyBrand'"}}`
6.1.6.3. 停止增量快照
您还可以通过向源数据库上的表发送信号来停止增量快照。您可以通过发送 SQL INSERT
查询向表提交停止快照信号。
在 Debezium 检测到信号表中的更改后,它会读取信号,并在正在进行时停止增量快照操作。
您提交的查询指定 增量
的快照操作,以及要删除的当前运行快照的表。
先决条件
- 源数据库中存在信号数据收集。
-
信号数据收集在
signal.data.collection
属性中指定。
使用源信号频道停止增量快照
发送 SQL 查询以停止临时增量快照到信号表:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<tableName>","<tableName>"],"type":"incremental"}');
例如,
INSERT INTO myschema.debezium_signal (id, type, data) 1 values ('ad-hoc-1', 2 'stop-snapshot', 3 '{"data-collections": ["schema1.table1", "schema2.table2"], 4 "type":"incremental"}'); 5
signal 命令中的
id
、type
和data
参数的值对应于 信号表 的字段。下表描述了示例中的参数:
表 6.5. SQL 命令中字段的描述,用于将停止增量快照信号发送到信号表 项 值 描述 1
myschema.debezium_signal
指定源数据库上信号表的完全限定名称。
2
ad-hoc-1
id
参数指定一个任意字符串,它被分配为信号请求的id
标识符。
使用此字符串识别信号表中的条目的日志记录消息。Debezium 不使用此字符串。3
stop-snapshot
指定
type
参数指定信号要触发的操作。
4
data-collections
信号的
data
字段的可选组件,用于指定表名称或正则表达式数组,以匹配要从快照中删除的表名称。
数组列出了按照完全限定名称匹配表的正则表达式,其格式与您在signal.data.collection
配置属性中指定连接器信号表的名称相同。如果省略了data
字段的这一组件,信号将停止正在进行的整个增量快照。5
incremental
信号的
data
字段所需的组件,用于指定要停止的快照操作类型。
目前,唯一有效的选项是增量的
。
如果没有指定类型
值,信号将无法停止增量快照。
6.1.6.4. 使用 Kafka 信号频道停止增量快照
您可以将信号消息发送到 配置的 Kafka 信号主题,以停止临时增量快照。
Kafka 消息的键必须与 topic.prefix
连接器配置选项的值匹配。
message 的值是带有 type
和 data
字段的 JSON 对象。
信号类型是 stop-snapshot
,data
字段必须具有以下字段:
字段 | 默认 | 值 |
---|---|---|
|
|
要执行的快照的类型。目前,Debeium 仅支持 |
| N/A |
可选数组,以逗号分隔的正则表达式,与表的完全限定域名匹配,以包含在快照中。 |
以下示例显示了典型的 stop-snapshot
Kafka 信息:
Key = `test_connector` Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
6.1.7. 接收 Debezium MySQL 更改事件记录的默认 Kafka 主题名称
默认情况下,MySQL 连接器会将表中的所有 INSERT
、UPDATE
和 DELETE
操作的更改事件写入特定于该表的单一 Apache Kafka 主题。
连接器使用以下惯例来命名更改事件主题:
topicPrefix.databaseName.tableName
假设 fulfillment
是主题前缀,inventory
是数据库名称,数据库包含名为 orders
, customers
, 和 products
的表。Debezium MySQL 连接器将事件发送到三个 Kafka 主题,每个表对应一个数据库:
fulfillment.inventory.orders fulfillment.inventory.customers fulfillment.inventory.products
以下列表为默认名称的组件提供定义:
- topicPrefix
-
由
topic.prefix
连接器配置属性指定的主题前缀。 - schemaName
- 操作所在的模式的名称。
- tableName
- 操作所在的表的名称。
连接器应用类似的命名约定,以标记其内部数据库架构历史记录主题、架构更改主题 和事务元数据主题。
如果默认主题名称不满足您的要求,您可以配置自定义主题名称。要配置自定义主题名称,您可以在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 来自定义主题命名的更多信息,请参阅 主题路由。
事务元数据
Debezium 可以生成代表事务边界的事件,以及丰富的数据更改事件消息。
Debezium 注册并只针对部署连接器后发生的事务接收元数据。部署连接器前发生的事务元数据不可用。
Debezium 为每个事务中的 BEGIN
和 END
分隔符生成事务边界事件。事务边界事件包含以下字段:
status
-
BEGIN
或END
. id
- 唯一事务标识符的字符串。
ts_ms
-
数据源的事务边界事件(
BEGIN
或END
事件)的时间。如果数据源没有向事件时间提供 Debezium,则该字段代表 Debezium 处理事件的时间。 event_count
(用于END
事件)- 事务发出的事件总数。
data_collections
(用于END
事件)-
data_collection
和event_count
元素的数组,用于指示连接器发出来自数据收集的更改的事件数量。
示例
{ "status": "BEGIN", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "ts_ms": 1486500577125, "event_count": null, "data_collections": null } { "status": "END", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [ { "data_collection": "s1.a", "event_count": 1 }, { "data_collection": "s2.a", "event_count": 1 } ] }
除非通过 topic.transaction
选项覆盖,否则连接器会将事务事件发送到 < topic.prefix>
.transaction
主题。
更改数据事件增强
启用事务元数据后,数据消息 Envelope
通过新的 transaction
字段进行了增强。此字段以字段复合的形式提供有关每个事件的信息:
id
- 唯一事务标识符的字符串。
total_order
- 事件在事务生成的所有事件中绝对位置。
data_collection_order
- 在事务发出的所有事件间,按数据收集位置。
以下是消息的示例:
{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "total_order": "1", "data_collection_order": "1" } }
对于没有启用 GTID 的系统,事务标识符是使用 binlog filename 和 binlog 位置的组合构建的。例如,如果与事务 BEGIN 事件对应的 binlog 文件名和位置分别是 mysql-bin.000002 和 1913,则 Debezium 构建的事务标识符为 file=mysql-bin.000002,pos=1913
。