8.5. 在 schema 更改后刷新捕获表
为 SQL Server 表启用更改数据捕获时,随着表中的更改,事件记录将保留到服务器上的捕获表中。如果您引入源表更改结构的变化,例如,通过添加新列,则该更改不会动态反映在更改表中。只要捕获表继续使用过时的模式,Debezium 连接器无法正确发出表的数据更改事件。您必须进行干预才能刷新捕获表,以便连接器恢复处理更改事件。
由于 CDC 在 SQL Server 中实施的方式,您无法使用 Debezium 更新捕获表。要刷新捕获表,一个必须是具有升级权限的 SQL Server 数据库 operator。作为 Debezium 用户,您必须使用 SQL Server 数据库 Operator 协调任务,以完成 schema 刷新并恢复到 Kafka 主题。
您可以使用以下方法之一在 schema 更改后更新捕获表:
- 离线 schema 更新 要求您在更新捕获表前停止 Debezium 连接器。
- 在线 schema 更新 可以在 Debezium 连接器运行时更新捕获表。
使用每种的步骤有优缺点。
无论您使用在线更新方法,还是离线更新方法,您必须在同一源表中应用后续模式更新前完成整个模式更新过程。最佳实践是在单一批处理中执行所有 DDL,因此只能运行一次。
在启用了 CDC 的源表中不支持一些架构更改。例如,如果在表中启用了 CDC,如果重命名了其中一个列或更改列类型,则 SQL Server 不允许更改表的 schema。
在将源表中的列从 NULL
改为 NOT NULL
或反之亦然后,SQL Server 连接器无法正确捕获更改的信息,直到创建新捕获实例后。如果您在列设计后没有创建新的捕获表,请更改连接器发出的事件记录无法正确指示列是可选的。也就是说,之前定义为可选(或 NULL
)的列继续是,尽管现在被定义为 NOT NULL
。同样,已根据需要定义的列(notNULL
)保留该设计,尽管它们现在被定义为 NULL
。
使用 sp_rename
功能重命名表后,它将继续在旧源表名称下发出更改,直到连接器重启为止。在重启连接器时,它将在新源表名称下发出更改。
8.5.1. 在 schema 更改后运行离线更新
离线 schema 更新提供了更新捕获表的安全方法。但是,离线更新可能不适用于需要高可用性的应用程序。
先决条件
- 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
- 您是一个具有升级权限的 SQL Server 数据库 operator。
流程
- 暂停更新数据库的应用程序。
- 等待 Debezium 连接器流传输所有未流更改事件记录。
- 停止 Debezium 连接器。
- 对源表 schema 应用所有更改。
-
使用
sys.sp_cdc_enable_table
过程为 update 源表创建一个新的捕获表,参数@capture_instance
的唯一值。 - 恢复在第 1 步中暂停的应用程序。
- 启动 Debezium 连接器。
-
在 Debezium 连接器从新的捕获表开始流后,通过运行存储的流程
sys.sp_cdc_disable_table
来丢弃旧的捕获表,并将参数@capture_instance
设置为旧的捕获实例名称。
8.5.2. 在 schema 更改后运行在线更新
完成在线模式更新的过程要比运行离线模式更新的步骤简单,您可以完成它,而无需应用程序和数据处理中的任何停机时间。但是,随着在线架构更新,在更新源数据库中的架构后,可能会出现潜在的处理差距,但在创建新的捕获实例之前。在这个间隔中,更改事件仍然会被更改表的旧实例捕获,而保存到旧表的更改数据会保留之前模式的结构。例如,如果您向源表中添加新列,更改新捕获表就绪前生成的事件,请不要包含新列的字段。如果您的应用程序不容许这样的转换周期,则最好使用离线模式更新过程。
先决条件
- 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
- 您是一个具有升级权限的 SQL Server 数据库 operator。
流程
- 对源表 schema 应用所有更改。
-
使用
@capture_instance
参数的唯一值,运行sys.sp_cdc_enable_table
,为 update 源表创建一个新的捕获表。 -
当 Debezium 从新的捕获表开始流时,您可以通过运行
sys.sp_cdc_disable_table
,并将参数@capture_instance
设置为旧的捕获实例名称来丢弃旧的捕获表。
示例:在数据库架构更改后运行在线模式更新
以下示例演示了如何在将 phone_number
添加到 customers
源表后在更改表中完成在线模式更新。
运行以下查询来修改
customers
源表的 schema,以添加phone_number
字段:ALTER TABLE customers ADD phone_number VARCHAR(32);
通过运行
sys.sp_cdc_enable_table
的步骤,创建新的捕获实例。EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2'; GO
运行以下查询,将新数据插入到
customers
表中:INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456'); GO
Kafka Connect 日志通过类似以下信息的条目报告配置更新:
connect_1 | 2019-01-17 10:11:14,924 INFO || Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] connect_1 | 2019-01-17 10:11:14,924 INFO || Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] ... connect_1 | 2019-01-17 10:11:33,719 INFO || Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
最后,
phone_number
字段添加到 schema,其值会出现在写入 Kafka 主题的消息中。... { "type": "string", "optional": true, "field": "phone_number" } ... "after": { "id": 1005, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com", "phone_number": "+1-555-123456" },
运行
sys.sp_cdc_disable_table
的步骤丢弃旧的捕获实例。EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers'; GO