11.3. 以 CloudEvents 格式发出 Debezium 更改事件记录


CloudEvents 是以常见方式描述事件数据的规格。其目的是跨服务、平台和系统提供互操作性。Debezium 允许您配置 MongoDB、MySQL、PostgreSQL 或 SQL Server 连接器来发出符合 CloudEvents 规格的更改事件记录。

重要

以 CloudEvents 格式发出更改事件记录是一项技术预览功能。技术预览功能不被红帽产品服务级别协议(SLA)支持,且可能无法完成。因此,红帽不推荐在生产环境中实施任何技术预览功能。此技术预览功能为您提供对即将推出的产品创新的早期访问,允许您在开发过程中测试并提供反馈。如需有关支持范围的更多信息,请参阅 技术预览功能支持范围

CloudEvents 规格定义:

  • 一组标准化事件属性
  • 定义自定义属性的规则
  • 将事件格式映射到序列化表示的编码规则,如 JSON 或 Avro
  • 用于传输层的协议绑定,如 Apache Kafka、HTTP 或 AMQP

要将 Debezium 连接器配置为发出符合 CloudEvents 规格的更改事件记录,Debebe 提供了 io.debezium.converters.CloudEventsConverter,它是一个 Kafka Connect 消息转换器。

目前,只支持结构化映射模式。CloudEvents 更改事件 envelope 可以是 JSON 或 Avro,每个 envelope 类型都支持 JSON 或 Avro 作为 数据格式。预计未来的 Debezium 发行版本将支持二进制映射模式。

有关以 CloudEvents 格式发出更改事件的信息如下:

有关使用 Avro 的详情,请参考:

以下示例显示了 PostgreSQL 连接器发送的 CloudEvents 更改事件记录类似如下。在本例中,PostgreSQL 连接器配置为使用 JSON 作为 CloudEvents 格式 envelope,也可以用作 数据格式

{
  "id" : "name:test_server;lsn:29274832;txId:565",   
1

  "source" : "/debezium/postgresql/test_server",     
2

  "specversion" : "1.0",                             
3

  "type" : "io.debezium.postgresql.datachangeevent", 
4

  "time" : "2020-01-13T13:55:39.738Z",               
5

  "datacontenttype" : "application/json",            
6

  "iodebeziumop" : "r",                              
7

  "iodebeziumversion" : "2.3.4.Final",        
8

  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",                           
9

  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {                                         
10

    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
Copy to Clipboard Toggle word wrap
1 1 1
连接器根据更改事件的内容为更改事件生成的唯一 ID。
2 2 2
事件源,这是由连接器配置中 topic.prefix 属性指定的数据库的逻辑名称。
3 3 3
CloudEvents 规格版本。
4 4 4
生成更改事件的连接器类型。此字段的格式是 io.debezium.CONNECTOR_TYPE.datachangeeventCONNECTOR_TYPE 的值是 mongodbmysqlpostgresqlsqlserver
5 5
源数据库中更改的时间。
6
描述 data 属性的内容类型,本例中为 JSON。唯一的替代方法是 Avro。
7
操作标识符。可能的值有 r 用于读取、c 表示 create、u 表示 update,或 d 用于 delete。
8
来自 Debezium 更改事件已知的所有 属性都会通过使用属性名称的 iodebezium 前缀映射到 CloudEvents 扩展属性。
9
在连接器中启用时,从 Debezium 更改事件已知的 事务 属性都会通过使用属性名称的 iodebeziumtx 前缀映射到 CloudEvents 扩展属性。
10
实际数据更改本身。根据操作和连接器,数据可能包含、after 和/或 patch 字段。

以下示例还显示了 PostgreSQL 连接器发出的 CloudEvents 更改事件记录是什么样子。在本例中,PostgreSQL 连接器再次配置为使用 JSON 作为 CloudEvents 格式 envelope,但这一次连接器被配置为使用 Avro 作为 数据格式

{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",            
1

  "dataschema" : "http://my-registry/schemas/ids/1", 
2

  "iodebeziumop" : "r",
  "iodebeziumversion" : "2.3.4.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="                        
3

}
Copy to Clipboard Toggle word wrap
1
表示 data 属性包含 Avro 二进制数据。
2
Avro 数据遵循的 schema 的 URI。
3
data 属性包含 base64 编码的 Avro 二进制文件数据。

也可以将 Avro 用于 envelope 和 data 属性。

11.3.2. 配置 Debezium CloudEvents converter 示例

在 Debezium 连接器配置中配置 io.debezium.converters.CloudEventsConverter。以下示例演示了如何配置 CloudEvents converter 来发出具有以下特征的更改事件记录:

  • 使用 JSON 作为信封。
  • 使用 http://my-registry/schemas/ids/1 中的 schema registry 将 data 属性序列化为二进制 Avro 数据。
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",          
1

"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
Copy to Clipboard Toggle word wrap
1
指定 serializer.type 是可选的,因为 json 是默认值。

CloudEvents 转换器转换 Kafka 记录值。在同一个连接器配置中,如果要对记录键进行操作,您可以指定 key.converter。例如,您可以指定 StringConverterLongConverterJsonConverterAvroConverter

11.3.3. Debezium CloudEvents converter 配置选项

当您将 Debezium 连接器配置为使用 CloudEvent converter 时,您可以指定以下选项。

Expand
表 11.3. CloudEvents converter 配置选项的描述

选项

默认

描述

serializer.type

json

用于 CloudEvents envelope 结构的编码类型。该值可以是 jsonavro

data.serializer.type

json

用于 data 属性的编码类型。该值可以是 jsonavro

json. ...

N/A

使用 JSON 时要传递给底层转换器的任何配置选项。json. 前缀已被删除。

avro. ...

N/A

使用 Avro 时要传递给底层转换器的任何配置选项。avro. 前缀已被删除。例如,对于 Avro 数据,您可以指定 avro.schema.registry.url 选项。

schema.name.adjustment.mode

none

指定应如何调整模式名称以与连接器使用的消息转换器兼容。该值可以是 noneavro

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat