13.10. 配置 Debezium MongoDB 连接器以使用 outbox 模式
此 SMT 仅用于 Debezium MongoDB 连接器。有关将 outbox 事件路由器 SMT 用于关系数据库的详情,请参考 Outbox 事件路由器。
outbox 模式是在多个(micro)服务之间安全可靠地交换数据的方法。开箱即用模式实施可避免服务内部状态(通常在数据库中保留)和需要相同数据的服务所消耗的事件之间的不一致。
要在 Debezium 应用程序中实施 outbox 模式,请将 Debezium 连接器配置为:
- 捕获 outbox 集合中的更改
- 应用 Debezium MongoDB outbox 事件路由器单个消息转换(SMT)
配置为应用 MongoDB outbox SMT 的 Debezium 连接器应该只捕获 outbox 集合中发生的更改。如需更多信息,请参阅 有选择地应用转换的选项。
只有在每个 outbox 集合具有相同的结构时,连接器才可以捕获多个 outbox 集合中的更改。
要使用这个 SMT,在实际业务集合和插入 outbox 集合中的操作必须是多文档事务的一部分,自 MongoDB 4.0 开始被支持,以防止商业集合和 outbox 集合之间潜在的数据不一致。对于将来的更新,要在没有多文档事务的情况下在 ACID 事务中启用更新现有数据并插入 outbox 事件,我们计划支持以现有集合的子文档的形式存储 outbox 事件的额外配置,而不是独立的 outbox 集合。
有关 outbox 模式的更多信息,请参阅使用 Outbox Pattern 的 Reliable microservicess Data Exchange。
以下主题提供详情:
- 第 13.10.1 节 “Debezium MongoDB outbox 消息示例”
- 第 13.10.2 节 “Debezium mongodb outbox 事件路由器 SMT 期望的 outbox 集合结构”
- 第 13.10.3 节 “基本 Debezium MongoDB outbox 事件路由器 SMT 配置”
- 第 13.10.5 节 “在 Debezium MongoDB outbox 消息中使用 Avro 作为有效负载格式”
- 第 13.10.6 节 “在 Debezium MongoDB outbox 信息中发出其他字段”
- 第 13.10.8 节 “用于配置 outbox 事件路由器转换的选项”
13.10.1. Debezium MongoDB outbox 消息示例
要了解如何配置 Debezium MongoDB outbox 事件路由器 SMT,请考虑以下 Debezium outbox 消息示例:
# Kafka Topic: outbox.event.order # Kafka Message key: "b2730779e1f596e275826f08" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
配置为应用 MongoDB outbox 事件路由器 SMT 的 Debezium 连接器通过转换原始 Debezium 更改事件信息来生成前面的消息,如下例所示:
# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" } # Kafka Message Headers: "" # Kafka Message Timestamp: 1556890294484 { "patch": null, "after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}", "source": { "version": "2.3.4.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999 }, "op": "c", "ts_ms": 1556890294484 }
此 Debezium outbox 消息示例基于 默认的 outbox 事件路由器配置,它假定基于聚合的 outbox 集合结构和事件路由。要自定义行为,outbox 事件路由器 SMT 提供了大量 配置选项。
13.10.2. Debezium mongodb outbox 事件路由器 SMT 期望的 outbox 集合结构
要应用默认的 MongoDB outbox 事件路由器 SMT 配置,您的 outbox 集合被假定为具有以下字段:
{ "_id": "objectId", "aggregatetype": "string", "aggregateid": "objectId", "type": "string", "payload": "object" }
字段 | 效果 |
---|---|
|
包含事件的唯一 ID。在 outbox 消息中,这个值是一个标头。例如,您可以使用此 ID 删除重复的消息。 |
|
包含 SMT 附加到连接器发出 outbox 消息的主题名称中的值。默认行为是,这个值替换了 |
|
包含事件密钥,它为有效负载提供 ID。SMT 使用这个值作为 emitted outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。 |
|
outbox 更改事件的表示。默认结构是 JSON。默认情况下,Kafka message 值只由 |
其他自定义字段 |
outbox 集合中的任何其他字段都可以 添加到 outbox 事件,可以在 payload 部分或作为消息标头中添加。 |
13.10.3. 基本 Debezium MongoDB outbox 事件路由器 SMT 配置
要将 Debezium MongoDB 连接器配置为支持 outbox 模式,请配置 outbox.MongoEventRouter
SMT。要获取 SMT 的默认行为,请在没有指定任何选项的情况下将其添加到连接器配置中,如下例所示:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
自定义配置
连接器可能会发出许多类型的事件信息(如 heartbeat 消息、tombstone 消息或有关事务的元数据信息)。要将转换应用到源自 outbox 集合中的事件,请定义一个 有选择地将转换应用到这些事件的 SMT predicate 语句。
13.10.4. 用于有选择地应用 MongoDB Outbox 事件路由器转换的选项
除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括心跳消息,以及有关 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下任一方法配置连接器来有选择地应用 SMT:
- 为转换配置 SMT predicate。
-
对 SMT 使用
route.topic.regex
配置选项。
13.10.5. 在 Debezium MongoDB outbox 消息中使用 Avro 作为有效负载格式
MongoDB outbox 事件路由器 SMT 支持任意有效负载格式。outbox 集合中 有效负载
字段值以透明的方式传递。使用 JSON 的替代方法是使用 Avro。这对消息格式监管很有用,并确保 outbox 事件模式以向后兼容的方式发展。
源应用程序如何为 outbox 消息有效负载生成 Avro 格式内容超出了本文档的范围。可以利用 KafkaAvroSerializer
类序列化 GenericRecord
实例。要确保 Kafka message 值是准确的 Avro 二进制数据,请将以下配置应用到连接器:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter value.converter=io.debezium.converters.ByteArrayConverter
默认情况下,payload
字段值( Avro 数据)是唯一消息值。配置 ByteArrayConverter
,因为值转换器 将有效负载
字段值按原样传播到 Kafka 消息值。
请注意,这与其他 SMT 建议的 BinaryDataConverter
不同。这是因为 MongoDB 在内部存储字节阵列的不同方法。
Debezium 连接器可以配置为发送心跳、事务元数据或模式更改事件(支持因连接器而异)。这些事件无法通过 ByteArrayConverter
序列化,因此必须提供额外的配置,因此转换器知道如何序列化这些事件。例如,以下配置演示了使用没有模式的 Apache Kafka JsonConverter
:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter value.converter=io.debezium.converters.ByteArrayConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable=false
delegate Converter
实现通过 delegate.converter.type
选项指定。如果转换器需要任何其他配置选项,也可以指定它们,如使用 schemas.enable=false
显示的模式禁用。
13.10.6. 在 Debezium MongoDB outbox 信息中发出其他字段
outbox 集合可能会包含您要添加到发出出出出出消息的值的字段。例如,假设 outbox 集合的 aggregatetype
字段中的值为 purchase-order
,另一个字段 eventType
,其可能的值是 order-created
和 order-shipped
。可以使用语法 field:placement:alias
来添加其他字段。
放置
允许的值有: - header
- envelope
- partition
要在 outbox 消息标头中发出 eventType
字段值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息的标头,其 type
作为其键,eventType
字段的值为其值。
要在 outbox 消息 envelope 中发出 eventType
字段值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement=eventType:envelope:type
要控制在其上生成 outbox 消息的分区,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement=partitionField:partition
请注意,对于 分区
放置,添加别名将无效。
13.10.7. 扩展转义的 JSON 字符串作为 JSON
默认情况下,Debezium outbox 消息 的有效负载
以字符串表示。当字符串的原始源采用 JSON 格式时,生成的 Kafka 消息使用转义序列来代表字符串,如下例所示:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
您可以配置 outbox 事件路由器以扩展消息内容,将转义的 JSON 转换为其原始的未转义 JSON 格式。在转换的字符串中,companion 模式会从原始 JSON 文档中分离。以下示例显示生成的 Kafka 信息中展开的 JSON:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }
要在转换中启用字符串转换,将 collection.expand.json.payload
的值设置为 true
,并使用 StringConverter
,如下例所示:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter transforms.outbox.collection.expand.json.payload=true value.converter=org.apache.kafka.connect.storage.StringConverter
13.10.8. 用于配置 outbox 事件路由器转换的选项
下表描述了您可以为 outbox 事件路由器 SMT 指定的选项。在表格中,Group 列指示 Kafka 的配置选项分类。
选项 | 默认 | 组 | 描述 |
---|---|---|---|
| 集合 | 决定在 outbox 集合上有更新操作时 SMT 的行为。可能的设置有:
outbox 集合中的所有更改都应该是插入或删除操作。也就是说,outbox 集合作为队列的功能;不允许对 outbox 集合中的文档进行更新。SMT 自动过滤掉的删除操作(用于删除开箱即用的事件)。 | |
| 集合 |
指定包含唯一事件 ID 的 outbox 集合字段。此 ID 将存储在 | |
| 集合 | 指定包含 event 键的 outbox 集合字段。当此字段包含值时,SMT 会使用该值作为 emitted outbox 信息中的键。这对于在 Kafka 分区中维护正确的顺序非常重要。 | |
集合 | 默认情况下,emitted outbox 消息中的时间戳是 Debezium 事件时间戳。要在 outbox 消息中使用不同的时间戳,请将这个选项设置为包含您要发出出出消息的时间戳的 outbox 集合字段。 | ||
| 集合 | 指定包含事件有效负载的 outbox 集合字段。 | |
| 集合 |
指定是否应进行 String 有效负载的 JSON 扩展。如果没有找到内容,或者在解析错误时保留内容,则会保留"as is"。 | |
collection, Envelope | 指定您要添加到 outbox 消息标头或信封的一个或多个 outbox 集合字段。指定以逗号分隔的对列表。在每个对中,指定字段的名称,以及是否希望该值在标头中还是 envelope。使用冒号分隔对中的值,例如:
要为字段指定别名,请将别名指定为 trio,例如:
第二个值是放置,它必须始终是 配置示例包括在 emitting additional fields in Debezium outbox messages 中。 | ||
collection, Schema | 设置后,这个值将用作 schema 版本,如 Kafka Connect Schema Javadoc 所述。 | ||
| 路由器 | 指定 outbox 集合中字段的名称。默认情况下,此字段中指定的值成为连接器发出 outbox 消息的主题名称的一部分。有关示例,请查看预期 outbox 集合的描述。 | |
| 路由器 |
指定 outbox SMT 在 RegexRouter 中应用到 outbox 集合文档的正则表达式。这个正则表达式是设置
+ 默认的行为是 SMT 将 | |
| 路由器 |
指定连接器发出 outbox 消息的主题名称。默认主题名称为
+ 要更改主题名称,您可以:
| |
| 路由器 |
指明一个空或 |