8.5. 在模式更改后刷新捕获表


当为 SQL Server 表启用数据捕获时,如表中的更改一样,事件记录会保留到服务器上的捕获表中。如果您对源表更改的结构进行了更改,例如通过添加新列,则这个更改不会动态反映在更改表中。只要捕获表继续使用过时的模式,Debezium 连接器无法正确为表发出数据更改事件。您必须刷新捕获表才能使连接器恢复处理更改事件。

由于 CDC 在 SQL Server 中实施的方式,您无法使用 Debezium 更新捕获表。要刷新捕获表,必须是具有升级权限的 SQL Server 数据库 operator。作为 Debezium 用户,您必须使用 SQL Server 数据库 operator 协调任务,以完成架构刷新并恢复到 Kafka 主题。

您可以使用以下方法之一在 schema 更改后更新捕获表:

使用每种流程的优点和缺点。

警告

无论您是使用在线还是离线更新方法,必须在在同一源表中应用后续的模式更新前完成整个模式更新过程。最佳实践是在单个批处理中执行所有 DDL,这样流程只能运行一次。

注意

启用 CDC 的源表不支持一些模式更改。例如,如果在表中启用了 CDC,如果您重命名了其中一个列或更改列类型,则 SQL Server 不允许更改表的 schema。

注意

将源表中的列从 NULL 改为 NOT NULL 或反之后,SQL Server 连接器无法正确捕获更改的信息,直到您创建新的捕获实例为止。如果您在更改列设计后没有创建新的捕获表,则更改连接器发出的事件记录无法正确指示列是否是可选的。也就是说,之前定义为可选(或 NULL)的列仍然如此,尽管现在被定义为 NOT NULL。同样,已根据需要定义的列(notNULL),保留该设计,虽然它们现在定义为 NULL

注意

使用 sp_rename 功能重命名表后,它将继续在旧源表名称下发出更改,直到连接器重启为止。重启连接器后,它将在新源表名称下发出更改。

8.5.1. 模式更改后运行离线更新

离线模式更新提供了更新捕获表的安全方法。但是,离线更新可能不适用于需要高可用性的应用程序。

先决条件

  • 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
  • 您是一个具有升级权限的 SQL Server 数据库 operator。

流程

  1. 暂停更新数据库的应用。
  2. 等待 Debezium 连接器流传输所有未流更改事件记录。
  3. 停止 Debezium 连接器。
  4. 将所有更改应用到源表模式。
  5. 使用 sys.sp_cdc_enable_table 进程为更新源表创建一个新的捕获表,以及参数 @capture_instance 的唯一值。
  6. 恢复在第 1 步中暂停的应用程序。
  7. 启动 Debezium 连接器。
  8. 在 Debezium 连接器从新的捕获表中启动流后,通过运行存储的步骤 sys.sp_cdc_disable_table 来丢弃旧的捕获表,并将参数 @capture_instance 设置为旧捕获实例名称。

8.5.2. 架构更改后运行在线更新

完成在线架构更新的过程比运行离线架构更新的步骤要简单,您可以在应用程序和数据处理中无需停机才能完成。但是,有了在线架构更新,在更新源数据库中的模式后,可能会出现潜在的处理差距,但在创建新捕获实例前可能会发生。在该间隔中,更改事件仍然被更改表的旧实例捕获,而保存到旧表中的更改数据会保留之前架构的结构。例如,如果您在源表中添加新列,请在新捕获表就绪前更改生成的事件,请不要包含新列的字段。如果您的应用程序没有容许这样的转换周期,最好使用离线架构更新过程。

先决条件

  • 更新被提交到启用了 CDC 的 SQL Server 表的 schema。
  • 您是一个具有升级权限的 SQL Server 数据库 operator。

流程

  1. 将所有更改应用到源表模式。
  2. 通过运行带有参数 @capture_instance 的唯一值的 sys.sp_cdc_enable_table 存储的流程,为更新源表创建一个新的捕获表。
  3. 当 Debezium 从新捕获表中启动流时,您可以通过运行 sys.sp_cdc_disable_table 存储流程并将参数 @capture_instance 设置为旧捕获实例名称来丢弃旧的捕获表。

示例:在数据库 schema 更改后运行在线 schema 更新

以下示例演示了如何在将列 phone_number 添加到 customers 源表中后在更改表中完成在线模式更新。

  1. 运行以下查询来添加 phone_number 字段来修改 customers 源表的 schema:

    ALTER TABLE customers ADD phone_number VARCHAR(32);
  2. 通过运行 sys.sp_cdc_enable_table 存储的步骤来创建新的捕获实例。

    EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2';
    GO
  3. 运行以下查询在 customers 表中插入新数据:

    INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456');
    GO

    Kafka Connect 日志通过类似以下消息的条目报告配置更新:

    connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
    connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
    ...
    connect_1    | 2019-01-17 10:11:33,719 INFO   ||  Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]

    最后,将 phone_number 字段添加到 schema 中,其值会出现在写入 Kafka 主题的消息中。

    ...
         {
            "type": "string",
            "optional": true,
            "field": "phone_number"
         }
    ...
        "after": {
          "id": 1005,
          "first_name": "John",
          "last_name": "Doe",
          "email": "john.doe@example.com",
          "phone_number": "+1-555-123456"
        },
  4. 运行 sys.sp_cdc_disable_table 存储的步骤来丢弃旧的捕获实例。

    EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers';
    GO
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.