6.3. 以 CloudEvents 格式发出 Debezium 更改事件记录
CloudEvents 是以通用方式描述事件数据的规格。其目的是在服务、平台和系统间实现互操作性。Debezium 可让您配置 Db2、MongoDB、MySQL、Oracle、PostgreSQL 或 SQL Server 连接器,以发出符合 CloudEvents 规格的更改事件记录。
以 CloudEvents 格式发出更改事件记录是一个技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围。
CloudEvents 规格定义:
- 一组标准化事件属性
- 定义自定义属性的规则
- 将事件格式映射到序列化表示的编码规则,如 JSON 或 Apache Avro
- Apache Kafka、HTTP 或 AMQP 等传输层的协议绑定
要将 Debezium 连接器配置为发出符合 CloudEvents 规格的更改事件记录,Debebe 提供了 io.debezium.converters.CloudEventsConverter,它是一个 Kafka Connect 消息转换器。
目前,只能使用结构化映射模式。CloudEvents 更改事件信封可以是 JSON 或 Avro,您可以使用 JSON 或 Avro 作为每个信封类型的数据格式。有关以 CloudEvents 格式发出更改事件的信息被组织,如下所示:
有关使用 Avro 的详情,请参考:
6.3.1. CloudEvents 格式的 Debezium 更改事件记录示例 复制链接链接已复制到粘贴板!
以下示例显示了 PostgreSQL 连接器发出的 CloudEvents 更改事件记录是什么样子。在本例中,PostgreSQL 连接器被配置为使用 JSON 作为 CloudEvents 格式,并作为 数据格式。
{
"id" : "name:test_server;lsn:29274832;txId:565",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.connector.postgresql.DataChangeEvent",
"time" : "2020-01-13T13:55:39.738Z",
"datacontenttype" : "application/json",
"iodebeziumop" : "r",
"iodebeziumversion" : "2.7.3.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578923739738",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumlsn" : "29274832",
"iodebeziumxmin" : null,
"iodebeziumtxid": "565",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : {
"before" : null,
"after" : {
"pk" : 1,
"name" : "Bob"
}
}
}
| 项 | 描述 |
|---|---|
| 1 | 连接器根据更改事件的内容为更改事件生成的唯一 ID。 |
| 2 |
事件源,这是数据库的逻辑名称,如连接器配置中的 |
| 3 | CloudEvents 规格版本。 |
| 4 |
生成更改事件的连接器类型。此字段的格式是 |
| 5 | 源数据库更改的时间。 |
| 6 |
描述 |
| 7 |
操作标识符。可能的值有 |
| 8 |
来自 Debezium 更改事件的所有 |
| 9 |
在连接器中启用时,Debezium change events 已知的每个 |
| 10 |
实际的数据更改。根据操作和连接器,数据可能会在、或 |
以下示例显示了 PostgreSQL 连接器发出的 CloudEvents 更改事件记录是什么样子。在本例中,PostgreSQL 连接器再次配置为使用 JSON 作为 CloudEvents 格式 envelope,但此时连接器被配置为对数据格式使用 Avro。
{
"id" : "name:test_server;lsn:33227720;txId:578",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.connector.postgresql.DataChangeEvent",
"time" : "2020-01-13T14:04:18.597Z",
"datacontenttype" : "application/avro",
"dataschema" : "http://my-registry/schemas/ids/1",
"iodebeziumop" : "r",
"iodebeziumversion" : "2.7.3.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578924258597",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumtxId" : "578",
"iodebeziumlsn" : "33227720",
"iodebeziumxmin" : null,
"iodebeziumtxid": "578",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : "AAAAAAEAAgICAg=="
}
| 项 | 描述 |
|---|---|
| 1 |
表示 |
| 2 | Avro 数据遵循的 schema 的 URI。 |
| 3 |
|
也可以将 Avro 用于信封以及 data 属性。
6.3.2. 配置 Debezium CloudEvents converter 的示例 复制链接链接已复制到粘贴板!
在 Debezium 连接器配置中配置 io.debezium.converters.CloudEventsConverter。以下示例演示了如何配置 CloudEvents converter,以发出具有以下特征的更改事件记录:
- 使用 JSON 作为信封。
-
使用
http://my-registry/schemas/ids/1中的模式 registry 将data属性序列化为二进制 Avro 数据。
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
| 项 | 描述 |
|---|---|
| 1 |
指定 |
CloudEvents 转换器转换 Kafka 记录值。在同一连接器配置中,如果要对记录密钥进行操作,您可以指定 key.converter。例如,您可以指定 StringConverter、LongConverter、JsonConverter 或 AvroConverter。
6.3.3. 元数据和一些 CloudEvents 字段的配置 复制链接链接已复制到粘贴板!
默认情况下,metadata.source 属性由三个部分组成,如下例所示:
"value,id:generate,type:generate,dataSchemaName:generate"
第一部分指定检索记录的元数据的来源;允许的值是 value 和 标头。下一部分指定转换器如何为以下元数据字段填充值:
-
id -
type -
dataSchemaName(在 Schema Registry 中注册 schema 的名称)
转换程序可以使用以下任一方法填充每个字段:
generate- 转换程序为字段生成值。
header- 转换程序从消息标头获取字段的值。
获取记录元数据
要构建 CloudEvent,转换器需要源、操作和事务元数据。通常,转换器可以从记录的值检索元数据。但在某些情况下,在转换器收到记录之前,可能会以某种方式处理记录,即元数据在其值中不存在,例如,在 Outbox Event Router SMT 处理记录后。要保留所需的元数据,您可以使用以下方法在记录标头中传递元数据。
流程
-
在记录到达转换器前,实现在记录的标头中记录元数据的机制,例如使用
HeaderFromSMT。 -
将转换器的
metadata.source属性的值设置为header。
以下示例显示了使用 Outbox Event Router SMT 和 HeaderFrom SMT 的连接器的配置:
...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
要使用 HeaderFrom 转换,可能需要过滤 tombstone 和 heartbeat 信息。
metadata.source 属性的标头值是一个全局设置。因此,即使您省略了属性值的部分,如 id 和 type 源,转换器也会为忽略的部分生成 标头值。
获取 CloudEvent 元数据
默认情况下,CloudEvents 转换会自动为 CloudEvent 的 id 和 type 字段生成值,并为其 data 字段生成 schema 名称。您可以通过更改默认值并在适当的标头中指定字段的值来自定义转换器填充这些字段的方式。例如:
"value.converter.metadata.source": "value,id:header,type:header,dataSchemaName:header"
在前面的配置生效时,您可以配置上游功能,以添加 id 和 type 标头,以及您要传递给 CloudEvents converter 的值。
如果您只想为 id 标头提供值,请使用:
"value.converter.metadata.source": "value,id:header,type:generate,dataSchemaName:generate"
要将转换器配置为从标头获取 id、type 和 dataSchemaName 元数据,请使用以下短语法:
"value.converter.metadata.source": "header"
要启用转换器从标头字段检索数据架构名称,您必须将 schema.data.name.source.header.enable 设置为 true。
6.3.4. Debezium CloudEvents converter 配置选项 复制链接链接已复制到粘贴板!
当您将 Debezium 连接器配置为使用 CloudEvent converter 时,您可以指定以下选项。
| 选项 | 默认 | 描述 |
|
|
用于 CloudEvents envelope 结构的编码类型。该值可以是 | |
|
|
用于 | |
| N/A |
使用 JSON 时要传递给底层转换器的任何配置选项。 | |
| N/A |
使用 Avro 时要传递给底层转换器的任何配置选项。 | |
| none |
指定应该如何调整模式名称,以便与连接器使用的消息转换器兼容。该值可以是 | |
| none |
指定在 Schema Registry 中注册 schema 的 CloudEvents 模式名称。当 | |
| false |
指定转换器是否可以从标头中检索 CloudEvents | |
|
|
指定转换器在生成云事件时是否包含扩展属性。该值可以是 | |
|
|
以逗号分隔的列表,用于指定转换器从其中检索 CloudEvent
有关配置示例,请参阅 配置元数据源和一些 CloudEvents 字段。 |