12.7. 配置 Debezium 连接器以使用 outbox 模式
开箱即用模式是一种在多个(micro)服务间安全、可靠地交换数据的方法。开箱即用模式实施可避免服务内部状态(通常在其数据库中保留)和状态,以及需要相同数据的服务所消耗的事件的状态。
要在 Debezium 应用程序中实施 outbox 模式,请将 Debezium 连接器配置为:
- 捕获开箱即用表中的更改
- 应用 Debezium outbox 事件路由器单消息转换(SMT)
配置为应用 outbox SMT 的 Debezium 连接器应该只捕获在 outbox 表中发生的更改。如需更多信息,请参阅有 选择应用转换的选项。
只有每个 outbox 表具有相同的结构时,连接器才能捕获多个 outbox 表中的更改。
请参阅 可靠的与 Outbox Pattern 的数据交换,以了解为什么开箱即用模式很有用以及如何工作。
outbox 事件路由器 SMT 与 MongoDB 连接器不兼容。
MongoDB 用户可以运行 MongoDB outbox 事件路由器 SMT。
以下主题提供详情:
- 第 12.7.1 节 “Debezium outbox 消息示例”
- 第 12.7.2 节 “Debezium outbox 事件路由器 SMT 期望的 outbox 表结构”
- 第 12.7.3 节 “基本 Debezium outbox 事件路由器 SMT 配置”
- 第 12.7.4 节 “用于有选择地应用 Outbox 事件路由器转换的选项”
- 第 12.7.5 节 “在 Debezium outbox 消息中使用 Avro 作为有效负载格式”
- 第 12.7.6 节 “在 Debezium outbox 信息中记录其他字段”
- 第 12.7.7 节 “将转义的 JSON 字符串扩展为 JSON”
- 第 12.7.8 节 “配置开箱即用事件路由器转换的选项”
12.7.1. Debezium outbox 消息示例
要了解 Debezium outbox 事件路由器 SMT 的配置方式,请查看以下 Debezium outbox 信息示例:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
被配置为应用 outbox 事件路由器 SMT 的 Debezium 连接器通过转换 Debezium raw 信息来生成上述消息,如下所示:
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f" # Kafka Message Headers: "" # Kafka Message Timestamp: 1556890294484 { "before": null, "after": { "id": "406c07f3-26f0-4eea-a50c-109940064b8f", "aggregateid": "1", "aggregatetype": "Order", "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}", "timestamp": 1556890294344, "type": "OrderCreated" }, "source": { "version": "2.1.4.Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null }, "op": "c", "ts_ms": 1556890294484 }
这个 Debezium outbox 消息示例基于 默认的 outbox 事件路由器配置,它假定一个基于聚合的表结构和事件路由。要自定义行为,outbox 事件路由器 SMT 提供了大量 配置选项。
12.7.2. Debezium outbox 事件路由器 SMT 期望的 outbox 表结构
要应用默认的 outbox 事件路由器 SMT 配置,假设您的 outbox 表具有以下列:
Column | Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null type | character varying(255) | not null payload | jsonb |
列 | 效果 |
---|---|
|
包含事件的唯一 ID。在 outbox 消息中,这个值是一个标头。您可以使用此 ID 删除重复的消息。 |
|
包含 SMT 附加到主题名称中的值,连接器向这个主题发出一个 outbox 信息。默认行为是,这个值替换了 |
|
包含 event 键,它为有效负载提供 ID。SMT 使用这个值作为发送的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。 |
|
开箱即用更改事件的表示。默认结构为 JSON。默认情况下,Kafka message 值仅由 |
其他自定义列 |
outbox 表中的任何其他列可以在 payload 部分中或作为消息标头 添加到 outbox 事件中。 |
12.7.3. 基本 Debezium outbox 事件路由器 SMT 配置
要配置 Debezium 连接器来支持 outbox 模式,请配置 outbox.EventRouter
SMT。要获取 SMT 的默认行为,请将其添加到连接器配置中,而不指定任何选项,如下例所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
自定义配置
连接器可能会发出多种类型的事件信息(例如,心跳消息、tombstone 消息或有关事务或架构更改的元数据消息)。要将转换应用到来自 outbox 表中的事件,请定义一个 SMT predicate 语句,选择性地将转换应用到 这些事件。
12.7.4. 用于有选择地应用 Outbox 事件路由器转换的选项
除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:
- 为转换配置 SMT predicate。
-
将
route.topic.regex
配置选项用于 SMT。
12.7.5. 在 Debezium outbox 消息中使用 Avro 作为有效负载格式
outbox 事件路由器 SMT 支持任意有效负载格式。outbox 表中的 payload
列值以透明方式传递。使用 JSON 的替代方案是使用 Avro。这对消息格式监管非常有用,并确保外部事件模式以向后兼容的方式演进。
源应用程序如何为开箱即用消息有效负载生成 Avro 格式的内容不在本文档范围内。一种可能是利用 KafkaAvroSerializer
类来序列化 GenericRecord
实例。要确保 Kafka message 值是确切的 Avro 二进制数据,请将以下配置应用到连接器:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter
默认情况下,有效负载
列值( Avro 数据)是唯一消息值。将 BinaryDataConverter
配置为值转换器会将 payload
列值按原样传播到 Kafka 消息值。
Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(根据连接器而异)。这些事件不能由 BinaryDataConverter
序列化,因此必须提供额外的配置,以便转换器了解如何序列化这些事件。例如,以下配置演示了使用没有模式的 Apache Kafka JsonConverter
:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable=false
delegate Converter
实现由 delegate.converter.type
选项指定。如果转换器需要额外的配置选项,也可以指定它们,如上述模式的禁用使用 schemas.enable=false
。
自 Debezium 版本 1.9 起,转换器 io.debezium.converters.ByteBufferConverter
已被弃用,并在 2.0 中删除。另外,在使用 Kafka Connect 时,必须在升级到 Debezium 2.x 前更新连接器的配置
12.7.6. 在 Debezium outbox 信息中记录其他字段
您的 outbox 表可能会包含您要添加到发送的消息中的列。例如,假设一个 outbox 表在 aggregatetype
列中具有 purchase-order
值,另一个列 eventType
,其可能的值是 order-created
和 order-shipped
。可以使用 syntax 列添加其他字段 :placement:alias
。
放置
允许的值是: - header
- envelope
- partition
要在 outbox 消息标头中发出 eventType
列值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息的标头,其 type
作为其键,eventType
列的值作为其值。
要在 outbox message envelope 中发出 eventType
列值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=eventType:envelope:type
要控制生成 outbox 信息的分区,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement=partitionColumn:partition
请注意,对于 分区
放置,添加别名将无效。
12.7.7. 将转义的 JSON 字符串扩展为 JSON
您可能已注意到 Debezium outbox 消息包含以 String 表示 的有效负载
。因此,当此字符串实际上为 JSON 时,它会在结果 Kafka 信息中显示为转义,如下所示:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
借助 outbox 事件路由器,您可以将此消息内容扩展到"real"JSON,并将 companion 模式从 JSON 文档本身分离。这样,在 Kafka 信息中的结果类似如下:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }
要启用此转换,您必须将 table.expand.json.payload
设置为 true,并使用类似如下的 JsonConverter
:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.table.expand.json.payload=true value.converter=org.apache.kafka.connect.json.JsonConverter
12.7.8. 配置开箱即用事件路由器转换的选项
下表描述了您可以为 outbox 事件路由器 SMT 指定的选项。在表中,Group 列表示 Kafka 的配置选项分类。
选项 | 默认 | 组 | 描述 |
---|---|---|---|
| 表 |
决定在 outbox 表中存在
开箱即用表中的所有更改都应该是 | |
| 表 |
指定包含唯一事件 ID 的 outbox 表列。此 ID 将存储在 | |
| 表 | 指定包含事件键的 outbox 表列。当此列包含一个值时,SMT 将该值用作发出的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。 | |
表 | 默认情况下,发出的 outbox 消息的时间戳是 Debezium 事件时间戳。要在开箱即用消息中使用不同的时间戳,请将此选项设置为 outbox 表列,其中包含您要发出的时间戳。 | ||
| 表 | 指定包含事件有效负载的 outbox 表列。 | |
| 表 |
指定是否应进行 String 有效负载的 JSON 扩展。如果没有找到内容,或者在解析错误时保存内容,则内容将保持不变。 | |
| 表 |
启用 JSON 扩展属性
| |
表,Envelope | 指定您要添加到 outbox 消息标头或 envelopes 的一个或多个 outbox 表列。指定以逗号分隔的对列表。在每个对中,指定列的名称以及是否希望该值位于标头还是 envelope 中。使用冒号分隔对中的值,例如:
要为列指定一个别名,请使用别名指定为第三个值,例如:
第二个值是放置,它必须始终是 | ||
表,Schema | 设置后,这个值将用作 schema 版本,如 Kafka Connect Schema Javadoc 所述。 | ||
| 路由器 | 指定 outbox 表中列的名称。默认行为是此列中的值成为连接器向其发出外部消息的主题名称的一部分。示例位于 预期 outbox 表的描述 中。 | |
| 路由器 |
指定 outbox SMT 在 RegexRouter 中应用到 outbox 表记录的正则表达式。这个正则表达式是 | |
| 路由器 |
指定连接器向其发送消息的主题名称。默认主题名称为
| |
| 路由器 |
指明空还是 |