搜索

8.5. 在 schema 更改后刷新捕获表

download PDF

为 SQL Server 表启用更改数据捕获时,随着表中的更改,事件记录将保留到服务器上的捕获表中。如果您引入源表更改结构的变化,例如,通过添加新列,则该更改不会动态反映在更改表中。只要捕获表继续使用过时的模式,Debezium 连接器无法正确发出表的数据更改事件。您必须进行干预才能刷新捕获表,以便连接器恢复处理更改事件。

由于 CDC 在 SQL Server 中实施的方式,您无法使用 Debezium 更新捕获表。要刷新捕获表,一个必须是具有升级权限的 SQL Server 数据库 operator。作为 Debezium 用户,您必须使用 SQL Server 数据库 Operator 协调任务,以完成 schema 刷新并恢复到 Kafka 主题。

您可以使用以下方法之一在 schema 更改后更新捕获表:

使用每种的步骤有优缺点。

警告

无论您使用在线更新方法,还是离线更新方法,您必须在同一源表中应用后续模式更新前完成整个模式更新过程。最佳实践是在单一批处理中执行所有 DDL,因此只能运行一次。

注意

在启用了 CDC 的源表中不支持一些架构更改。例如,如果在表中启用了 CDC,如果重命名了其中一个列或更改列类型,则 SQL Server 不允许更改表的 schema。

注意

在将源表中的列从 NULL 改为 NOT NULL 或反之亦然后,SQL Server 连接器无法正确捕获更改的信息,直到创建新捕获实例后。如果您在列设计后没有创建新的捕获表,请更改连接器发出的事件记录无法正确指示列是可选的。也就是说,之前定义为可选(或 NULL)的列继续是,尽管现在被定义为 NOT NULL。同样,已根据需要定义的列(notNULL)保留该设计,尽管它们现在被定义为 NULL

注意

使用 sp_rename 功能重命名表后,它将继续在旧源表名称下发出更改,直到连接器重启为止。在重启连接器时,它将在新源表名称下发出更改。

8.5.1. 在 schema 更改后运行离线更新

离线 schema 更新提供了更新捕获表的安全方法。但是,离线更新可能不适用于需要高可用性的应用程序。

先决条件

  • 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
  • 您是一个具有升级权限的 SQL Server 数据库 operator。

流程

  1. 暂停更新数据库的应用程序。
  2. 等待 Debezium 连接器流传输所有未流更改事件记录。
  3. 停止 Debezium 连接器。
  4. 对源表 schema 应用所有更改。
  5. 使用 sys.sp_cdc_enable_table 过程为 update 源表创建一个新的捕获表,参数 @capture_instance 的唯一值。
  6. 恢复在第 1 步中暂停的应用程序。
  7. 启动 Debezium 连接器。
  8. 在 Debezium 连接器从新的捕获表开始流后,通过运行存储的流程 sys.sp_cdc_disable_table 来丢弃旧的捕获表,并将参数 @capture_instance 设置为旧的捕获实例名称。

8.5.2. 在 schema 更改后运行在线更新

完成在线模式更新的过程要比运行离线模式更新的步骤简单,您可以完成它,而无需应用程序和数据处理中的任何停机时间。但是,随着在线架构更新,在更新源数据库中的架构后,可能会出现潜在的处理差距,但在创建新的捕获实例之前。在这个间隔中,更改事件仍然会被更改表的旧实例捕获,而保存到旧表的更改数据会保留之前模式的结构。例如,如果您向源表中添加新列,更改新捕获表就绪前生成的事件,请不要包含新列的字段。如果您的应用程序不容许这样的转换周期,则最好使用离线模式更新过程。

先决条件

  • 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
  • 您是一个具有升级权限的 SQL Server 数据库 operator。

流程

  1. 对源表 schema 应用所有更改。
  2. 使用 @capture_instance 参数的唯一值,运行 sys.sp_cdc_enable_table,为 update 源表创建一个新的捕获表。
  3. 当 Debezium 从新的捕获表开始流时,您可以通过运行 sys.sp_cdc_disable_table,并将参数 @capture_instance 设置为旧的捕获实例名称来丢弃旧的捕获表。

示例:在数据库架构更改后运行在线模式更新

以下示例演示了如何在将 phone_number 添加到 customers 源表后在更改表中完成在线模式更新。

  1. 运行以下查询来修改 customers 源表的 schema,以添加 phone_number 字段:

    ALTER TABLE customers ADD phone_number VARCHAR(32);
  2. 通过运行 sys.sp_cdc_enable_table 的步骤,创建新的捕获实例。

    EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2';
    GO
  3. 运行以下查询,将新数据插入到 customers 表中:

    INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456');
    GO

    Kafka Connect 日志通过类似以下信息的条目报告配置更新:

    connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
    connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
    ...
    connect_1    | 2019-01-17 10:11:33,719 INFO   ||  Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]

    最后,phone_number 字段添加到 schema,其值会出现在写入 Kafka 主题的消息中。

    ...
         {
            "type": "string",
            "optional": true,
            "field": "phone_number"
         }
    ...
        "after": {
          "id": 1005,
          "first_name": "John",
          "last_name": "Doe",
          "email": "john.doe@example.com",
          "phone_number": "+1-555-123456"
        },
  4. 运行 sys.sp_cdc_disable_table 的步骤丢弃旧的捕获实例。

    EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers';
    GO
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.