7.9. 配置 Debezium 连接器以使用 outbox 模式
outbox 模式是一种在多个(micro)服务之间安全且可靠地交换数据的方法。outbox 模式实现可避免服务内部状态(通常是在其数据库中保留)和需要相同数据的服务所消耗的事件之间的不一致。
要在 Debezium 应用程序中实施 outbox 模式,请将 Debezium 连接器配置为:
- 捕获 outbox 表中的更改
- 应用 Debezium outbox 事件路由器单一消息转换(SMT)
配置为应用 outbox SMT 的 Debezium 连接器应该只捕获在 outbox 表中发生的更改。如需更多信息,请参阅 用于有选择地应用转换的选项。
只有在每个 outbox 表都有相同的结构时,连接器才能捕获多个 outbox 表中的更改。
请参阅 Reliable microservices Data Exchange with Outbox Pattern,以了解为什么 outbox 模式有用及其工作方式。
outbox 事件路由器 SMT 与 MongoDB 连接器不兼容。
MongoDB 用户可以运行 MongoDB outbox 事件路由器 SMT。
以下主题提供详情:
- 第 7.9.1 节 “Debezium outbox 消息示例”
- 第 7.9.2 节 “Debezium outbox 事件路由器 SMT 期望的 outbox 表结构”
- 第 7.9.3 节 “基本 Debezium outbox 事件路由器 SMT 配置”
- 第 7.9.4 节 “用于有选择地应用 Outbox 事件路由器转换的选项”
- 第 7.9.5.2 节 “在 Debezium outbox 信息中使用 Apache Avro 作为有效负载格式”
- 第 7.9.6 节 “在 Debezium outbox 信息中发出其他字段”
- 第 7.9.5.1.1 节 “生成扩展转义的 JSON 字符串作为 JSON”
- 第 7.9.7 节 “配置 outbox 事件路由器转换的选项”
7.9.1. Debezium outbox 消息示例
要了解如何配置 Debezium outbox 事件路由器 SMT,请查看以下 Debezium outbox 消息示例:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
配置为应用 outbox 事件路由器 SMT 的 Debezium 连接器通过转换 Debezium 原始消息来生成上述消息,如下所示:
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f" # Kafka Message Headers: "" # Kafka Message Timestamp: 1556890294484 { "before": null, "after": { "id": "406c07f3-26f0-4eea-a50c-109940064b8f", "aggregateid": "1", "aggregatetype": "Order", "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}", "timestamp": 1556890294344, "type": "OrderCreated" }, "source": { "version": "2.7.3.Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null }, "op": "c", "ts_ms": 1556890294484, "ts_us": 1556890294484651, "ts_ns": 1556890294484651402 }
这个 Debezium outbox 消息示例基于 默认的 outbox 事件路由器配置,它根据聚合假设 outbox 表结构和事件路由。要自定义行为,outbox 事件路由器 SMT 提供了大量 配置选项。
7.9.2. Debezium outbox 事件路由器 SMT 期望的 outbox 表结构
要应用默认的 outbox 事件路由器 SMT 配置,您的 outbox 表被认为有以下列:
Column | Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null type | character varying(255) | not null payload | jsonb |
列 | 效果 |
---|---|
|
包含事件的唯一 ID。在 outbox 消息中,这个值是一个标头。例如,您可以使用此 ID 删除重复的消息。 |
|
包含一个值,它把 SMT 附加到主题名称中,连接器会发出 outbox 信息。默认行为是,这个值替换了 |
|
包含事件键,它为有效负载提供 ID。SMT 使用这个值作为发出的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。 |
|
outbox 更改事件表示。默认结构是 JSON。默认情况下,Kafka 消息值仅由 |
其他自定义列 |
outbox 表中的任何其他列都可以 添加到 payload 部分或作为消息标头中的 outbox 事件中。 |
7.9.3. 基本 Debezium outbox 事件路由器 SMT 配置
要配置 Debezium 连接器来支持 outbox 模式,请配置 outbox.EventRouter
SMT。要获取 SMT 的默认行为,请将其添加到连接器配置中,而不指定任何选项,如下例所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
自定义配置
连接器可能会发出许多类型的事件消息(如心跳消息、tombstone 消息或有关事务或模式更改的元数据消息)。要将转换仅应用到源自 outbox 表中的事件,请定义一个 SMT predicate 语句,该语句仅有选择地将转换应用到 这些事件。
7.9.4. 用于有选择地应用 Outbox 事件路由器转换的选项
除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括 heartbeat 信息,以及有关 schema 更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:
- 为转换配置 SMT predicate。
-
为 SMT 使用
route.topic.regex
配置选项。
7.9.5. 有效负载序列化格式
outbox 事件路由器 SMT 支持任意有效负载格式。SMT 传递 payload
列值,它不会在不修改的情况下从 outbox 表中读取。SMT 将这些列值转换为 Kafka 消息字段的方式取决于您如何配置 SMT。序列化数据的通用有效负载格式为 JSON 和 Avro。
7.9.5.1. 使用 JSON 作为序列化格式
outbox 事件路由器 SMT 的默认序列化格式是 JSON。要使用此格式,源列的数据类型必须是 JSON (例如,PostgreSQL 中的 jsonb
)。
7.9.5.1.1. 生成扩展转义的 JSON 字符串作为 JSON
当 Debezium outbox 消息 将有效负载
表示为 JSON String 时,生成的 Kafka 消息会转义字符串,如下例所示:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
outbox 事件路由器允许您将消息内容扩展到 "real" JSON,从 JSON 文档中去除 companion 模式。在以下示例中,生成的 Kafka 信息被格式化如下:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }
要启用使用 outbox 事件路由器转换,请将 table.expand.json.payload
设置为 true,并使用下例中所示的 JsonConverter
:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.expand.json.payload=true value.converter=org.apache.kafka.connect.json.JsonConverter
7.9.5.2. 在 Debezium outbox 信息中使用 Apache Avro 作为有效负载格式
Apache Avro 是一个用于序列化数据的通用框架。使用 Avro 有助于消息格式管理,并确保 outbox 事件模式以向后兼容的方式进行演变。
源应用程序如何为 outbox 消息有效负载生成 Avro 格式的内容,而不是本文档的范围。一个可能是利用 KafkaAvroSerializer
类来序列化 GenericRecord
实例。要确保 Kafka 消息值是确切的 Avro 二进制数据,请将以下配置应用到连接器:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter
默认情况下,payload
列值( Avro data)是唯一的消息值。当数据存储为 Avro 格式时,列格式必须设置为二进制数据类型,如 PostgreSQL 中的 bytea
。SMT 的值转换器也必须设置为 BinaryDataConverter
,以便它将 payload
列的二进制值传播到 Kafka 消息值中。
Debezium 连接器可以被配置为发出心跳、事务元数据或架构更改事件(支持因连接器而异)。这些事件不能被 BinaryDataConverter
序列化,因此必须提供额外的配置,因此转换器知道如何序列化这些事件。例如,以下配置演示了使用带有任何模式的 Apache Kafka JsonConverter
:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable=false
delegate Converter
实现由 delegate.converter.type
选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,如使用 schemas.enable=false
所示的模式禁用。
以下示例演示了如何配置 SMT,使用带有 Apicurio Registry 的委派转换器,将数据转换为 Avro 格式:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 value.converter.delegate.converter.apicurio.registry.auto-register=true value.converter.delegate.converter.registry.find-latest=true
最后,以下示例演示了如何配置 SMT,使用带有 Confluent Schema Registry 的委派转换器,将数据转换为 Avro 格式:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS} value.converter.delegate.converter.type.schema.registry.url={URL}
在前面的配置示例,因为 AvroConverter
配置为委派转换器,因此需要第三方库。有关如何在类路径中添加第三方库的信息超出了本文档的范围。
7.9.6. 在 Debezium outbox 信息中发出其他字段
您的 outbox 表可能包含您要添加到发出的 outbox 消息中的值。例如,假设一个 outbox 表,它在 aggregatetype
列中的值为 purchase-order
,另一个列 eventType
,其可能的值是 order-create
和 order-shipped
。可以使用语法 列添加其他字段:placement:alias
。
放置
允许的值有: - header
- envelope
- partition
要在 outbox 消息标头中发出 eventType
列值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息中的一个标头,类型为 type
作为其键,eventType
列的值作为其值。
要发送 outbox 消息 envelope 中的 eventType
列值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:envelope:type
要控制生成 outbox 消息的分区,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=partitionColumn:partition
请注意,对于 分区
放置,添加一个别名无效。
7.9.7. 配置 outbox 事件路由器转换的选项
下表描述了您可以为 outbox 事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。
选项 | 默认 | 组 | 描述 |
---|---|---|---|
| 表 |
决定在 outbox 表中有
outbox 表中的所有更改都应该是 | |
| 表 |
指定包含唯一事件 ID 的 outbox 表列。此 ID 将存储在 | |
| 表 | 指定包含事件键的 outbox 表列。当此列包含值时,SMT 使用该值作为发出的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。 | |
表 | 默认情况下,发出的 outbox 消息中的时间戳是 Debezium 事件时间戳。要在 outbox 消息中使用不同的时间戳,请将此选项设置为 outbox 表列,其中包含您要发出的 outbox 消息的时间戳。 | ||
| 表 | 指定包含事件有效负载的 outbox 表列。 | |
| 表 |
指定是否应执行 String 有效负载的 JSON 扩展。如果没有找到内容或解析错误,则内容将保持"as is"。 | |
| 表 |
当启用 JSON expansion property
| |
表, Envelope | 指定您要添加到 outbox 消息标头或 envelopes 的一个或多个 outbox 表列。指定以逗号分隔的对列表。在每个对中,指定列的名称以及是否希望该值位于标头中还是信封中。使用冒号将对中的值分开,例如:
要为列指定别名,请将别名的 trio 指定为第三个值,例如:
第二个值是放置,它始终必须是 配置示例包括在 emitting additional fields in Debezium outbox messages 中。 | ||
| 表, Envelope |
如果 Outbox 有效负载中没有找到由 | |
表, 架构 | 设置后,这个值将用作 schema 版本,如 Kafka Connect Schema Javadoc 所述。 | ||
| 路由器 | 指定 outbox 表中列的名称。默认行为是,此列中的值成为连接器将 outbox 消息发送到的主题名称的一部分。示例是在 预期出箱表 的描述 中。 | |
| 路由器 |
指定 outbox SMT 在 RegexRouter 中应用到 outbox 表记录的正则表达式。此正则表达式是 | |
| 路由器 |
指定连接器发出 outbox 消息的主题名称。默认主题名称为
| |
| 路由器 |
指明是否为空或 |