6.2. Debezium Oracle 连接器数据更改事件的描述
Oracle 连接器发出的每个数据更改事件都有一个键和值。键和值的结构取决于更改事件源自的表。有关 Debezium 如何构建主题名称的详情,请参考 主题名称。
Debezium Oracle 连接器确保所有 Kafka Connect 模式名称都 是有效的 Avro 模式名称。这意味着,逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])以及逻辑服务器名称中的其余字符以及架构和表名称中的所有字符都必须是字母数字字符或下划线([a-z,A-Z,0-9,\_])。连接器自动将无效字符替换为下划线字符。
当多个逻辑服务器名称、模式名称或表名称之间的字符无效字符无效字符时,意外命名冲突可能会导致字符区分。
Debezium 和 Kafka Connect 围绕 事件消息的持续流 设计。但是,这些事件的结构可能会随时间而变化,这很难以处理主题。为便于处理可变事件结构,Kafka Connect 中的每个事件都是自我包含的。每个消息键和值有两个部分:schema 和 payload。schema 描述了有效负载的结构,而有效负载包含实际数据。
由 SYS
或 SYSTEM
用户帐户执行的更改不会被连接器捕获。
以下主题包含有关数据更改事件的更多详情:
6.2.1. 关于 Debezium Oracle 连接器更改事件中的键
对于每个更改的表,更改事件键是结构化的,以便在事件创建事件时为表的主键(或唯一键约束)中有一个字段。
例如,在 inventory
数据库架构中定义的 客户
表可能有以下更改事件键:
CREATE TABLE customers ( id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY, first_name VARCHAR2(255) NOT NULL, last_name VARCHAR2(255) NOT NULL, email VARCHAR2(255) NOT NULL UNIQUE );
如果 < topic.prefix&
gt;.transaction
配置属性的值设置为 server1
,则数据库表中的 customers
表中发生的每个更改事件的 JSON 表示具有以下键结构:
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" } ], "optional": false, "name": "server1.INVENTORY.CUSTOMERS.Key" }, "payload": { "ID": 1004 } }
键的 schema
部分包含一个 Kafka Connect 模式,用于描述密钥部分的内容。在上例中,有效负载
值不是可选,其结构由名为 server1.DEBEZIUM.CUSTOMERS.Key
的 schema 定义,一个名为 id
的 name type int32
。键的 payload
字段的值表示它实际上是一个带有 id
字段的结构(在 JSON 中,是一个对象),其值为 1004
。
因此,您可以将这个键解释为 inventory.customers
表中的行(来自名为 server1
的连接器),其 id
主键列的值为 1004
。
6.2.2. 关于 Debezium Oracle 连接器更改事件中的值
更改事件消息中的值结构反映了消息中 更改事件中的消息键 的结构,并且包含 schema 部分和 payload 部分。
更改事件值的有效负载
更改事件值的 payload 部分中的 envelope 结构包含以下字段:
op
-
包含描述操作类型的字符串值的必填字段。Oracle 连接器更改事件值有效负载中的
op
字段包含以下值之一:c
(创建或插入)、u
(update)、d
(delete)或r
(读,代表快照)。 之前
-
可选字段(如果存在)描述了事件 发生前 行的状态。该结构由
server1.INVENTORY.CUSTOMERS.Value
Kafka Connect 模式描述,server1
连接器用于inventory.customers
表中的所有行。
after
-
一个可选字段,如果存在,包含 更改后 行的状态。该结构由用于
before
字段的同一server1.INVENTORY.CUSTOMERS.Value
Kafka Connect 模式描述。 source
包含描述事件源元数据的结构的必填字段。如果是 Oracle 连接器,结构包括以下字段:
- Debezium 版本。
- 连接器名称。
- 事件是持续快照的一部分。
- 事务 ID (不包括快照)。
- 更改的 SCN。
- 指明源数据库中记录何时更改的时间戳(用于快照,时间戳指示快照何时发生)。
进行更改的用户名
提示commit_scn
字段是可选的,并描述了更改事件参与的事务提交的 SCN。
ts_ms
- 可选字段(如果存在)包含连接器处理事件的 JVM 中系统时钟的时间(基于系统时钟)。
更改事件值的 schema
事件消息值的 schema 部分包含一个 schema,用于描述有效负载的 envelope 结构及其中的嵌套字段。
有关更改事件值的更多信息,请参阅以下主题:
创建 事件
以下示例显示了 customers
表中的 create 事件值的值,如 更改事件键示例中所述 :
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "ID" }, { "type": "string", "optional": false, "field": "FIRST_NAME" }, { "type": "string", "optional": false, "field": "LAST_NAME" }, { "type": "string", "optional": false, "field": "EMAIL" } ], "optional": true, "name": "server1.DEBEZIUM.CUSTOMERS.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "string", "optional": true, "field": "txId" }, { "type": "string", "optional": true, "field": "scn" }, { "type": "string", "optional": true, "field": "commit_scn" }, { "type": "string", "optional": true, "field": "rs_id" }, { "type": "int32", "optional": true, "field": "ssn" }, { "type": "int32", "optional": true, "field": "redo_thread" }, { "type": "string", "optional": true, "field": "user_name" }, { "type": "boolean", "optional": true, "field": "snapshot" } ], "optional": false, "name": "io.debezium.connector.oracle.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "server1.DEBEZIUM.CUSTOMERS.Envelope" }, "payload": { "before": null, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "source": { "version": "2.1.4.Final", "name": "server1", "ts_ms": 1520085154000, "txId": "6.28.807", "scn": "2122185", "commit_scn": "2122185", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "c", "ts_ms": 1532592105975 } }
在上例中,注意事件如何定义以下模式:
-
envelope (
server1.DEBEZIUM.CUSTOMERS.Envelope
). -
源
结构(io.debezium.connector.oracle.Source
,它特定于 Oracle 连接器并在所有事件间重复使用。 -
用于
before
和after
字段的特定于表的模式。
before
和 after
字段的 schema 的名称的格式为 <logicalName>.<schemaName>.<tableName>.Value
, 因此完全独立与所有其他表的 schema。因此,当您使用 Avro converter 时,每个逻辑源中的表的 Avro 模式都有自己的演进和历史记录。
此事件值的 payload
部分提供有关事件的信息。它描述了创建了行(op=c
),并显示 after
字段值包含插入到行 ID
、FIRST_NAME
、LAST_NAME
和 EMAIL
列中的值。
默认情况下,事件的 JSON 表示大于它们描述的行。较大的大小是由 JSON 表示的,包括消息的 schema 和 payload 部分。您可以使用 Avro Converter 减少连接器写入 Kafka 主题的消息大小。
更新 事件
以下示例显示了连接器从上一次创建事件相同的表中的 更新 更改事件。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "annek@noanswer.org" }, "after": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "source": { "version": "2.1.4.Final", "name": "server1", "ts_ms": 1520085811000, "txId": "6.9.809", "scn": "2125544", "commit_scn": "2125544", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "u", "ts_ms": 1532592713485 } }
有效负载的结构与 创建 (插入)事件的有效负载相同,但以下值不同:
-
op
字段的值是u
,表示此行因为更新而改变。 -
before
字段显示行的前者状态,以及更新
数据库提交前存在的值。 -
after
字段显示行的更新状态,EMAIL
值现在设置为anne@example.com
。 -
source
字段的结构包含与之前相同的字段,但它们的值不同,因为连接器从红色日志中的不同位置捕获事件。 -
ts_ms
字段显示 Debezium 处理事件的时间戳。
payload
部分显示一些其他有用的信息。例如,通过比较 before
和 after
结构,我们可以确定在提交后如何更改行。源
结构提供有关 Oracle 在此变化记录相关的信息,从而提供了可追溯性。另外,当此事件与本主题和其它事件相关时,还会深入了解。它是否在与另一个事件相同的提交之前、之后或作为另一个事件的一部分发生?
更新行主/唯一键的列时,行键的值会改变。因此,Debezium 会在此类 更新后发出三个 事件:
-
DELETE
事件。 - 一个 tombstone 事件,带有行的旧键。
-
为行提供新密钥的
INSERT
事件。
删除 事件
以下示例显示了上一次 create 和 update 事件示例中显示的表的 delete 事件。delete 事件的 schema
部分与这些事件的 schema
部分相同。
{ "schema": { ... }, "payload": { "before": { "ID": 1004, "FIRST_NAME": "Anne", "LAST_NAME": "Kretchmar", "EMAIL": "anne@example.com" }, "after": null, "source": { "version": "2.1.4.Final", "name": "server1", "ts_ms": 1520085153000, "txId": "6.28.807", "scn": "2122184", "commit_scn": "2122184", "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user", "snapshot": false }, "op": "d", "ts_ms": 1532592105960 } }
与 create 或 update 事件相比,事件的 payload
部分显示了几个不同之处:
-
op
字段的值为d
,表示已删除该行。 -
before
字段显示数据库提交删除的行的前一个状态。 -
after
字段的值为null
,表示行不再存在。 -
source
字段的结构中包括了多个在 create 或 update 事件中存在的键, 但ts_ms
,scn
, 和txId
中的值不同。 -
ts_ms
显示指示 Debezium 处理此事件的时间戳。
删除 事件为消费者提供处理此行所需的信息。
Oracle 连接器的事件被设计为用于 Kafka 日志压缩,这样可允许删除一些旧的信息,只要每个键至少保留最新的消息。这允许 Kafka 在确保主题包含完整的数据集并可用于重新载入基于密钥的状态时回收存储空间。
删除行时,上例中显示的 delete 事件值仍可用于日志压缩,因为 Kafka 能够删除所有使用相同键的消息。message 值必须设置为 null
,以指示 Kafka 删除共享同一键的所有消息。为了实现此目的,默认情况下 Debezium 的 Oracle 连接器始终遵循一个 delete 事件,它有一个特殊的 tombstone 事件,其键相同但 null
值。您可以通过设置连接器属性 tombstones.on.delete
来改变默认的行为。
截断 事件
一个 截断 的更改事件信号,表示表已被截断。本例中 message 键为 null
,消息值类似如下:
{ "schema": { ... }, "payload": { "before": null, "after": null, "source": { 1 "version": "2.1.4.Final", "connector": "oracle", "name": "oracle_server", "ts_ms": 1638974535000, "snapshot": "false", "db": "ORCLPDB1", "sequence": null, "schema": "DEBEZIUM", "table": "TEST_TABLE", "txId": "02000a0037030000", "scn": "13234397", "commit_scn": "13271102", "lcr_position": null, "rs_id": "001234.00012345.0124", "ssn": 1, "redo_thread": 1, "user_name": "user" }, "op": "t", 2 "ts_ms": 1638974558961, 3 "transaction": null } }
项 | 字段名称 | 描述 |
---|---|---|
1 |
|
描述事件源元数据的必需字段。在 truncate 事件值中,
|
2 |
|
描述操作类型的强制字符串。 |
3 |
|
显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。
|
由于 truncate 事件代表对整个表所做的更改,且没有消息键,且没有具有多个分区的主题,因此不能保证消费者收到 截断 事件和更改事件(创建、update 等等)用于表。例如,当消费者从不同分区读取事件时,可能会在收到同一表的 截断 事件后收到表 的更新 事件。只有在主题使用单个分区时,才可以保证排序。
如果您不想捕获 截断 的事件,请使用 skipped.operations
选项过滤它们。