7.2. Debezium Oracle 连接器数据更改事件的描述
每个数据更改事件,而 Oracle 连接器发出的事件都有一个键和值。键和值的结构取决于更改事件源自的表。有关 Debezium 如何构造主题名称的信息,请参阅 主题名称。
Debezium Oracle 连接器确保所有 Kafka Connect 模式名称 是有效的 Avro 模式名称。这意味着逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])以及逻辑服务器名称中的剩余字符以及模式中的所有字符([a-z,A-Z,0-9,\_])开头。连接器会自动将无效字符替换为下划线字符。
当多个逻辑服务器名称、模式名称或表名称之间没有有效字符且这些字符被下划线替换时,这些命名冲突可能会导致。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间推移而改变,因此主题消费者很难处理。为了便于处理 mutable 事件结构,Kafka Connect 中的每个事件都是自包含的。每个消息键和值有两个部分:schema 和 payload。架构描述了有效负载的结构,而有效负载包含实际数据。
SYS
或 SYSTEM
用户帐户执行的更改不会被连接器捕获。
以下主题包含有关数据更改事件的更多详情:
7.2.1. 关于 Debezium Oracle 连接器更改事件中的键
对于每个更改的表,更改事件键是结构化的,以便在创建事件时,主键(或唯一键约束)中的每个列都有一个字段。
例如,在 inventory
数据库 schema 中定义的 customers
表,可能有以下更改事件键:
CREATE TABLE customers ( id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY, first_name VARCHAR2(255) NOT NULL, last_name VARCHAR2(255) NOT NULL, email VARCHAR2(255) NOT NULL UNIQUE );
如果 < topic.prefix&
gt;.transaction
配置属性被设置为 server1
,则数据库表中的 customer 表中发生的每个更改事件的 JSON 表示如下关键结构:
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" } ], "optional": false, "name": "server1.INVENTORY.CUSTOMERS.Key" }, "payload": { "ID": 1004 } }
键的 schema
部分包含一个 Kafka Connect 模式,它描述了 key 部分的内容。在前面的示例中,有效负载
值不是可选,该结构由名为 server1.DEBEZIUM.CUSTOMERS.Key
的 schema 定义,并且有一个名为 int32
的 id
的必要字段。键的 payload
字段的值表示它实际上是一个带有 id
字段的结构(在 JSON 中,是一个对象),其值为 1004
。
因此,您可以将这个键解释为 inventory.customers
表中的行(来自名为 server1
的连接器),其 id
主键列的值为 1004
。
7.2.2. 关于 Debezium Oracle 连接器更改事件中的值
更改事件消息中值的结构反映了消息中更改事件中的 message 键 结构,并包含 schema 部分和 payload 部分。
更改事件值的有效负载
更改事件值的有效数据部分中有一个 envelope 数据结构,它包含以下字段:
op
-
包含用于描述操作类型的字符串值的必需字段。Oracle 连接器更改事件值中的
op
字段包含以下值之一:c
(创建或插入)、u
(更新)、d
(删除)或r
(表示快照)。 before
-
可选字段(如果存在)描述事件 发生前 行的状态。该结构由
server1.INVENTORY.CUSTOMERS.Value
Kafka Connect 模式描述,server1
连接器用于inventory.customers
表中的所有行。
after
-
可选字段(如果存在)在发生 更改后 包含行的状态。该结构由用于
before
字段的同一server1.INVENTORY.CUSTOMERS.Value
Kafka Connect 模式描述。 source
包含描述事件源元数据的结构的必填字段。对于 Oracle 连接器,结构包括以下字段:
- Debezium 版本。
- 连接器名称。
- 事件是持续快照的一部分。
- 事务 ID (不包括快照)。
- 变化的 SCN。
- 指示源数据库中记录何时更改(对于快照,时间戳表示快照何时发生)。
进行更改的用户名
提示commit_scn
字段是可选的,描述了更改事件参与的事务提交的 SCN。
ts_ms
- 可选字段(如果存在),其中包含运行 Kafka Connect 任务的 JVM 中的系统时钟(基于连接器处理该事件)的时间(基于时钟)。
更改事件值的 schema
事件消息的 schema 部分包含一个 schema,用于描述有效负载的信封结构及其其中嵌套字段。
有关更改事件值的更多信息,请参阅以下主题:
创建 事件
以下示例显示了来自 change 事件键示例中描述的 customer 表中 create event 值的值:
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "string", "optional": true, "field": "txId" }, { "type": "string", "optional": true, "field": "scn" }, { "type": "string", "optional": true, "field": "commit_scn" }, { "type": "string", "optional": true, "field": "rs_id" }, { "type": "int64", "optional": true, "field": "ssn" }, { "type": "int32", "optional": true, "field": "redo_thread" }, { "type": "string", "optional": true, "field": "user_name" }, { "type": "boolean", "optional": true, "field": "snapshot" } ], "optional": false, "name": "io.debezium.connector.oracle.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "server1.DEBEZIUM.CUSTOMERS.Envelope" }, "payload": { "before": null, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "source": { "version": "2.3.4.Final", "name": "server1", "ts_ms": 1520085154000, "txId": "6.28.807", "scn": "2122185", "commit_scn": "2122185", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "c", "ts_ms": 1532592105975 } }
在上例中,请注意事件如何定义以下模式:
-
envelope (
server1.DEBEZIUM.CUSTOMERS.Envelope
)。 -
源
结构(io.debezium.connector.oracle.Source
,它特定于 Oracle 连接器并在所有事件间重复使用。 -
before
和after
字段的特定于表的模式。
before
和 after
字段的 schema 的名称的格式为 <logicalName>.<schemaName>.<tableName>.Value
, 因此完全独立与所有其他表的 schema。因此,当您使用 Avro converter 时,每个逻辑源中表的 Avro 模式都有自己的演进和历史记录。
此事件的 value 的 payload
部分提供有关事件的信息。它描述了已创建了行(op=c
),并显示 after
字段值,其中包含插入到 ID
、FIRST_NAME
、LAST_NAME
和 EMAIL
列的值。
默认情况下,事件的 JSON 表示比它们描述的行大得多。较大的大小是由于 JSON 表示,包括消息的 schema 和 payload 部分。您可以使用 Avro Converter 来缩小连接器写入 Kafka 主题的信息大小。
更新 事件
以下示例显示了一个 update 更改事件,连接器从与以前的 create 事件相同的表中捕获。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "source": { "version": "2.3.4.Final", "name": "server1", "ts_ms": 1520085811000, "txId": "6.9.809", "scn": "2125544", "commit_scn": "2125544", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "u", "ts_ms": 1532592713485 } }
有效负载的结构与 create (插入)事件的有效负载相同,但以下值不同:
-
op
字段的值是u
,表示此行因为更新而更改。 -
before
字段显示行的前一个状态,以及更新
数据库提交前存在的值。 -
after
字段显示行的更新状态,EMAIL
值现在设置为anne@example.com
。 -
source
字段的结构包含与之前相同的字段,但这些值不同,因为连接器从 redo 日志中的不同位置捕获事件。 -
ts_ms
字段显示 Debezium 处理事件的时间戳。
payload
部分显示一些其他有用的信息。例如,通过比较 before
和 after
结构,我们可以确定提交后的行如何更改。源
结构提供有关此变化的 Oracle 记录的信息,从而提供可追溯性。它还有助于我们深入了解此事件与本主题中的其他事件以及其他主题相关的情况。它是否在之前、之后或作为与另一个事件相同的提交的一部分?
当更新行的主/唯一键的列时,行的键值会改变。因此,Debezium 会在此类 更新后发出三个 事件:
-
DELETE
事件。 - 一个 tombstone 事件,带有行的旧键。
-
为行提供新密钥的
INSERT
事件。
删除 事件
以下示例显示了上一次 create 和 update 事件示例中显示的表的 delete 事件。delete 事件的 schema
部分与这些事件的 schema
部分相同。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "after": null, "source": { "version": "2.3.4.Final", "name": "server1", "ts_ms": 1520085153000, "txId": "6.28.807", "scn": "2122184", "commit_scn": "2122184", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "d", "ts_ms": 1532592105960 } }
与 create 或 update 事件相比,事件的 payload
部分显示了几个不同之处:
-
op
字段的值是d
,表示行已被删除。 -
before
字段显示与数据库提交中删除的行的前导状态。 -
after
字段的值为null
,表示行不再存在。 -
source
字段的结构中包括了多个在 create 或 update 事件中存在的键, 但ts_ms
,scn
, 和txId
中的值不同。 -
ts_ms
显示指示 Debezium 处理此事件的时间戳。
删除 事件为消费者提供处理此行删除所需的信息。
Oracle 连接器的事件被设计为使用 Kafka 日志压缩,只要保留每个键的最新消息,就可以删除一些旧的信息。这允许 Kafka 回收存储空间,同时确保主题包含完整的数据集,并可用于重新载入基于密钥的状态。
删除行时,上例中显示的 delete 事件值仍可用于日志压缩,因为 Kafka 能够删除使用同一键的所有之前消息。message 值必须设置为 null
,以指示 Kafka 删除共享同一键的所有消息。为了实现此目的,Debezium 的 Oracle 连接器总是遵循一个 delete 事件,它有一个特殊的 tombstone 事件,它具有相同的键但 null
值。您可以通过设置连接器属性 tombstones.on.delete
来改变默认的行为。
Truncate 事件
截断 更改事件信号,表示表已被截断。在这种情况下,message 键为 null
,message 值类似如下:
{ "schema": { ... }, "payload": { "before": null, "after": null, "source": { 1 "version": "2.3.4.Final", "connector": "oracle", "name": "oracle_server", "ts_ms": 1638974535000, "snapshot": "false", "db": "ORCLPDB1", "sequence": null, "schema": "DEBEZIUM", "table": "TEST_TABLE", "txId": "02000a0037030000", "scn": "13234397", "commit_scn": "13271102", "lcr_position": null, "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user" }, "op": "t", 2 "ts_ms": 1638974558961, 3 "transaction": null } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
描述事件源元数据的必需字段。在 truncate 事件值中,
|
2 |
|
描述操作类型的强制字符串。 |
3 |
|
可选字段,显示连接器处理事件的时间。这个时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。
|
因为 truncate 事件代表对整个表所做的更改,且没有消息密钥,所以在带有多个分区的主题中,不能保证消费者接收 truncate 事件和更改事件(创建、更新 等等)以订购表。例如,当消费者从不同的分区读取事件时,它可能会在为同一表接收 truncate 事件后收到表 的更新 事件。只有在主题使用单个分区时,才能保证排序。
如果您不想捕获 truncate 事件,请使用 skipped.operations
选项过滤它们。