7.2. Debezium Oracle 连接器数据更改事件的描述


每个数据更改事件,而 Oracle 连接器发出的事件都有一个键和值。键和值的结构取决于更改事件源自的表。有关 Debezium 如何构造主题名称的信息,请参阅 主题名称

警告

Debezium Oracle 连接器确保所有 Kafka Connect 模式名称 是有效的 Avro 模式名称。这意味着逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])以及逻辑服务器名称中的剩余字符以及模式中的所有字符([a-z,A-Z,0-9,\_])开头。连接器会自动将无效字符替换为下划线字符。

当多个逻辑服务器名称、模式名称或表名称之间没有有效字符且这些字符被下划线替换时,这些命名冲突可能会导致。

Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间推移而改变,因此主题消费者很难处理。为了便于处理 mutable 事件结构,Kafka Connect 中的每个事件都是自包含的。每个消息键和值有两个部分:schemapayload。架构描述了有效负载的结构,而有效负载包含实际数据。

警告

SYSSYSTEM 用户帐户执行的更改不会被连接器捕获。

以下主题包含有关数据更改事件的更多详情:

7.2.1. 关于 Debezium Oracle 连接器更改事件中的键

对于每个更改的表,更改事件键是结构化的,以便在创建事件时,主键(或唯一键约束)中的每个列都有一个字段。

例如,在 inventory 数据库 schema 中定义的 customers 表,可能有以下更改事件键:

CREATE TABLE customers (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);

如果 < topic.prefix&gt;.transaction 配置属性被设置为 server1,则数据库表中的 customer 表中发生的每个更改事件的 JSON 表示如下关键结构:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,
        "name": "server1.INVENTORY.CUSTOMERS.Key"
    },
    "payload": {
        "ID": 1004
    }
}

键的 schema 部分包含一个 Kafka Connect 模式,它描述了 key 部分的内容。在前面的示例中,有效负载 值不是可选,该结构由名为 server1.DEBEZIUM.CUSTOMERS.Key 的 schema 定义,并且有一个名为 int32id 的必要字段。键的 payload 字段的值表示它实际上是一个带有 id 字段的结构(在 JSON 中,是一个对象),其值为 1004

因此,您可以将这个键解释为 inventory.customers 表中的行(来自名为 server1 的连接器),其 id 主键列的值为 1004

7.2.2. 关于 Debezium Oracle 连接器更改事件中的值

更改事件消息中值的结构反映了消息中更改事件中的 message 键 结构,并包含 schema 部分和 payload 部分。

更改事件值的有效负载

更改事件值的有效数据部分中有一个 envelope 数据结构,它包含以下字段:

op
包含用于描述操作类型的字符串值的必需字段。Oracle 连接器更改事件值中的 op 字段包含以下值之一: c (创建或插入)、u (更新)、d (删除)或 r (表示快照)。
before
可选字段(如果存在)描述事件 发生前 行的状态。该结构由 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述,server1 连接器用于 inventory.customers 表中的所有行。
after
可选字段(如果存在)在发生 更改后 包含行的状态。该结构由用于 before 字段的同一 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述。
source

包含描述事件源元数据的结构的必填字段。对于 Oracle 连接器,结构包括以下字段:

  • Debezium 版本。
  • 连接器名称。
  • 事件是持续快照的一部分。
  • 事务 ID (不包括快照)。
  • 变化的 SCN。
  • 指示源数据库中记录何时更改(对于快照,时间戳表示快照何时发生)。
  • 进行更改的用户名

    提示

    commit_scn 字段是可选的,描述了更改事件参与的事务提交的 SCN。

ts_ms
可选字段(如果存在),其中包含运行 Kafka Connect 任务的 JVM 中的系统时钟(基于连接器处理该事件)的时间(基于时钟)。

更改事件值的 schema

事件消息的 schema 部分包含一个 schema,用于描述有效负载的信封结构及其其中嵌套字段。

有关更改事件值的更多信息,请参阅以下主题:

创建 事件

以下示例显示了来自 change 事件键示例中描述的 customer 表中 create event 值的值:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "rs_id"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ssn"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "redo_thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "user_name"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "snapshot"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.oracle.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "source": {
            "version": "2.3.4.Final",
            "name": "server1",
            "ts_ms": 1520085154000,
            "txId": "6.28.807",
            "scn": "2122185",
            "commit_scn": "2122185",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "c",
        "ts_ms": 1532592105975
    }
}

在上例中,请注意事件如何定义以下模式:

  • envelope (server1.DEBEZIUM.CUSTOMERS.Envelope)。
  • 结构(io.debezium.connector.oracle.Source,它特定于 Oracle 连接器并在所有事件间重复使用。
  • beforeafter 字段的特定于表的模式。
提示

beforeafter 字段的 schema 的名称的格式为 <logicalName>.<schemaName>.<tableName>.Value, 因此完全独立与所有其他表的 schema。因此,当您使用 Avro converter 时,每个逻辑源中表的 Avro 模式都有自己的演进和历史记录。

此事件的 valuepayload 部分提供有关事件的信息。它描述了已创建了行(op=c),并显示 after 字段值,其中包含插入到 IDFIRST_NAMELAST_NAMEEMAIL 列的值。

提示

默认情况下,事件的 JSON 表示比它们描述的行大得多。较大的大小是由于 JSON 表示,包括消息的 schema 和 payload 部分。您可以使用 Avro Converter 来缩小连接器写入 Kafka 主题的信息大小。

更新 事件

以下示例显示了一个 update 更改事件,连接器从与以前的 create 事件相同的表中捕获。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "source": {
            "version": "2.3.4.Final",
            "name": "server1",
            "ts_ms": 1520085811000,
            "txId": "6.9.809",
            "scn": "2125544",
            "commit_scn": "2125544",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "u",
        "ts_ms": 1532592713485
    }
}

有效负载的结构与 create (插入)事件的有效负载相同,但以下值不同:

  • op 字段的值是 u,表示此行因为更新而更改。
  • before 字段显示行的前一个状态,以及更新 数据库提交前存在的值。
  • after 字段显示行的更新状态,EMAIL 值现在设置为 anne@example.com
  • source 字段的结构包含与之前相同的字段,但这些值不同,因为连接器从 redo 日志中的不同位置捕获事件。
  • ts_ms 字段显示 Debezium 处理事件的时间戳。

payload 部分显示一些其他有用的信息。例如,通过比较 beforeafter 结构,我们可以确定提交后的行如何更改。 结构提供有关此变化的 Oracle 记录的信息,从而提供可追溯性。它还有助于我们深入了解此事件与本主题中的其他事件以及其他主题相关的情况。它是否在之前、之后或作为与另一个事件相同的提交的一部分?

注意

当更新行的主/唯一键的列时,行的键值会改变。因此,Debezium 会在此类 更新后发出三个 事件:

  • DELETE 事件。
  • 一个 tombstone 事件,带有行的旧键。
  • 为行提供新密钥的 INSERT 事件。

删除 事件

以下示例显示了上一次 createupdate 事件示例中显示的表的 delete 事件。delete 事件的 schema 部分与这些事件的 schema 部分相同。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "after": null,
        "source": {
            "version": "2.3.4.Final",
            "name": "server1",
            "ts_ms": 1520085153000,
            "txId": "6.28.807",
            "scn": "2122184",
            "commit_scn": "2122184",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false
        },
        "op": "d",
        "ts_ms": 1532592105960
    }
}

createupdate 事件相比,事件的 payload 部分显示了几个不同之处:

  • op 字段的值是 d,表示行已被删除。
  • before 字段显示与数据库提交中删除的行的前导状态。
  • after 字段的值为 null,表示行不再存在。
  • source 字段的结构中包括了多个在 createupdate 事件中存在的键, 但 ts_ms, scn, 和 txId 中的值不同。
  • ts_ms 显示指示 Debezium 处理此事件的时间戳。

删除 事件为消费者提供处理此行删除所需的信息。

Oracle 连接器的事件被设计为使用 Kafka 日志压缩,只要保留每个键的最新消息,就可以删除一些旧的信息。这允许 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。

删除行时,上例中显示的 delete 事件值仍可用于日志压缩,因为 Kafka 能够删除使用同一键的所有之前消息。message 值必须设置为 null,以指示 Kafka 删除共享同一键的所有消息。为了实现此目的,Debezium 的 Oracle 连接器总是遵循一个 delete 事件,它有一个特殊的 tombstone 事件,它具有相同的键但 null 值。您可以通过设置连接器属性 tombstones.on.delete 来改变默认的行为。

Truncate 事件

截断 更改事件信号,表示表已被截断。在这种情况下,message 键为 null,message 值类似如下:

{
    "schema": { ... },
    "payload": {
        "before": null,
        "after": null,
        "source": { 1
            "version": "2.3.4.Final",
            "connector": "oracle",
            "name": "oracle_server",
            "ts_ms": 1638974535000,
            "snapshot": "false",
            "db": "ORCLPDB1",
            "sequence": null,
            "schema": "DEBEZIUM",
            "table": "TEST_TABLE",
            "txId": "02000a0037030000",
            "scn": "13234397",
            "commit_scn": "13271102",
            "lcr_position": null,
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user"
        },
        "op": "t", 2
        "ts_ms": 1638974558961, 3
        "transaction": null
    }
}
表 7.8. truncate 事件值字段的描述
字段名称描述

1

source

描述事件源元数据的必需字段。在 truncate 事件值中,source 字段结构与为同一表的 create, update, 和 delete 事件相同,提供此元数据:

  • Debezium 版本
  • 连接器类型和名称
  • 包含新行的数据库和表
  • 模式名称
  • 如果事件是快照的一部分(对于 截断 事件,始终为 false
  • 执行操作的事务的 ID
  • 操作的 SCN
  • 在数据库中进行更改时的时间戳
  • 执行更改的用户名

2

op

描述操作类型的强制字符串。op 字段值为 t,表示此表已被截断。

3

ts_ms

可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在源 对象中,ts_ms 表示数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的滞后。

因为 truncate 事件代表对整个表所做的更改,且没有消息密钥,所以在带有多个分区的主题中,不能保证消费者接收 truncate 事件和更改事件(创建更新 等等)以订购表。例如,当消费者从不同的分区读取事件时,它可能会在为同一表接收 truncate 事件后收到表 的更新 事件。只有在主题使用单个分区时,才能保证排序。

如果您不想捕获 truncate 事件,请使用 skipped.operations 选项过滤它们。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.