8.5. 在模式更改后刷新捕获表
当为 SQL Server 表启用数据捕获时,如表中的更改一样,事件记录会保留到服务器上的捕获表中。如果您对源表更改的结构进行了更改,例如通过添加新列,则这个更改不会动态反映在更改表中。只要捕获表继续使用过时的模式,Debezium 连接器无法正确为表发出数据更改事件。您必须刷新捕获表才能使连接器恢复处理更改事件。
由于 CDC 在 SQL Server 中实施的方式,您无法使用 Debezium 更新捕获表。要刷新捕获表,必须是具有升级权限的 SQL Server 数据库 operator。作为 Debezium 用户,您必须使用 SQL Server 数据库 operator 协调任务,以完成架构刷新并恢复到 Kafka 主题。
您可以使用以下方法之一在 schema 更改后更新捕获表:
使用每种流程的优点和缺点。
无论您是使用在线还是离线更新方法,必须在在同一源表中应用后续的模式更新前完成整个模式更新过程。最佳实践是在单个批处理中执行所有 DDL,这样流程只能运行一次。
启用 CDC 的源表不支持一些模式更改。例如,如果在表中启用了 CDC,如果您重命名了其中一个列或更改列类型,则 SQL Server 不允许更改表的 schema。
将源表中的列从 NULL
改为 NOT NULL
或反之后,SQL Server 连接器无法正确捕获更改的信息,直到您创建新的捕获实例为止。如果您在更改列设计后没有创建新的捕获表,则更改连接器发出的事件记录无法正确指示列是否是可选的。也就是说,之前定义为可选(或 NULL
)的列仍然如此,尽管现在被定义为 NOT NULL
。同样,已根据需要定义的列(notNULL
),保留该设计,虽然它们现在定义为 NULL
。
使用 sp_rename
功能重命名表后,它将继续在旧源表名称下发出更改,直到连接器重启为止。重启连接器后,它将在新源表名称下发出更改。
8.5.1. 模式更改后运行离线更新
离线模式更新提供了更新捕获表的安全方法。但是,离线更新可能不适用于需要高可用性的应用程序。
先决条件
- 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
- 您是一个具有升级权限的 SQL Server 数据库 operator。
流程
- 暂停更新数据库的应用。
- 等待 Debezium 连接器流传输所有未流更改事件记录。
- 停止 Debezium 连接器。
- 将所有更改应用到源表模式。
-
使用
sys.sp_cdc_enable_table
进程为更新源表创建一个新的捕获表,以及参数@capture_instance
的唯一值。 - 恢复在第 1 步中暂停的应用程序。
- 启动 Debezium 连接器。
-
在 Debezium 连接器从新的捕获表中启动流后,通过运行存储的步骤
sys.sp_cdc_disable_table
来丢弃旧的捕获表,并将参数@capture_instance
设置为旧捕获实例名称。
8.5.2. 架构更改后运行在线更新
完成在线架构更新的过程比运行离线架构更新的步骤要简单,您可以在应用程序和数据处理中无需停机才能完成。但是,有了在线架构更新,在更新源数据库中的模式后,可能会出现潜在的处理差距,但在创建新捕获实例前可能会发生。在该间隔中,更改事件仍然被更改表的旧实例捕获,而保存到旧表中的更改数据会保留之前架构的结构。例如,如果您在源表中添加新列,请在新捕获表就绪前更改生成的事件,请不要包含新列的字段。如果您的应用程序没有容许这样的转换周期,最好使用离线架构更新过程。
先决条件
- 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
- 您是一个具有升级权限的 SQL Server 数据库 operator。
流程
- 将所有更改应用到源表模式。
-
通过运行带有参数
@capture_instance
的唯一值的sys.sp_cdc_enable_table
存储的流程,为更新源表创建一个新的捕获表。 -
当 Debezium 从新捕获表中启动流时,您可以通过运行
sys.sp_cdc_disable_table
存储流程并将参数@capture_instance
设置为旧捕获实例名称来丢弃旧的捕获表。
示例:在数据库 schema 更改后运行在线 schema 更新
以下示例演示了如何在将列 phone_number
添加到 customers
源表中后在更改表中完成在线模式更新。
运行以下查询来添加
phone_number
字段来修改customers
源表的 schema:ALTER TABLE customers ADD phone_number VARCHAR(32);
通过运行
sys.sp_cdc_enable_table
存储的步骤来创建新的捕获实例。EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2'; GO
运行以下查询在
customers
表中插入新数据:INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456'); GO
Kafka Connect 日志通过类似以下消息的条目报告配置更新:
connect_1 | 2019-01-17 10:11:14,924 INFO || Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] connect_1 | 2019-01-17 10:11:14,924 INFO || Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] ... connect_1 | 2019-01-17 10:11:33,719 INFO || Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
最后,将
phone_number
字段添加到 schema 中,其值会出现在写入 Kafka 主题的消息中。... { "type": "string", "optional": true, "field": "phone_number" } ... "after": { "id": 1005, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com", "phone_number": "+1-555-123456" },
运行
sys.sp_cdc_disable_table
存储的步骤来丢弃旧的捕获实例。EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers'; GO