12.8. 配置 Debezium MongoDB 连接器以使用 outbox 模式
此 SMT 仅用于 Debezium MongoDB 连接器。有关将 outbox 事件路由器 SMT 用于关系数据库的详情,请参考 Outbox 事件路由器。
开箱即用模式是一种在多个(micro)服务间安全、可靠地交换数据的方法。开箱即用模式实施可避免服务内部状态(通常在其数据库中保留)和状态,以及需要相同数据的服务所消耗的事件的状态。
要在 Debezium 应用程序中实施 outbox 模式,请将 Debezium 连接器配置为:
- 捕获 outbox 集合中的更改
- 应用 Debezium MongoDB outbox 事件路由器单消息转换(SMT)
配置为应用 MongoDB outbox SMT 的 Debezium 连接器应该只捕获在 outbox 集合中发生的更改。如需更多信息,请参阅有 选择应用转换的选项。
只有每个 outbox 集合具有相同的结构时,连接器才能捕获多个 outbox 集合中的更改。
要使用此 SMT,必须作为多文档事务的一部分(从 MongoDB 4.0 开始),对实际业务集合和插入到 outbox 集合中的操作作为多文档事务的一部分完成,以防止业务集合和 outbox 集合之间的潜在的数据不一致。对于将来的更新,要更新现有数据并在 ACID 事务中没有多文档事务的情况下插入现有数据,我们计划支持以现有集合的子文档而不是独立集合的形式存储外部事件的额外配置。
有关 outbox 模式的更多信息,请参阅使用 Outbox 模式的可靠的交换数据交换。
以下主题提供详情:
12.8.1. Debezium MongoDB outbox 消息示例
要了解如何配置 Debezium MongoDB outbox 事件路由器 SMT,请考虑以下 Debezium outbox 信息示例:
# Kafka Topic: outbox.event.order # Kafka Message key: "b2730779e1f596e275826f08" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
被配置为应用 MongoDB outbox 事件路由器 SMT 的 Debezium 连接器通过转换原始 Debezium 更改事件信息来生成前面的消息,如下例所示:
# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" } # Kafka Message Headers: "" # Kafka Message Timestamp: 1556890294484 { "patch": null, "after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}", "source": { "version": "2.1.4.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999 }, "op": "c", "ts_ms": 1556890294484 }
这个 Debezium outbox 消息示例基于 默认的 outbox 事件路由器配置,它假定一个基于聚合的集合结构和事件路由。要自定义行为,outbox 事件路由器 SMT 提供了大量 配置选项。
12.8.2. Debezium mongodb outbox 事件路由器 SMT 期望的 outbox 集合结构
要应用默认的 MongoDB outbox 事件路由器 SMT 配置,假设您的 outbox 集合具有以下字段:
{ "_id": "objectId", "aggregatetype": "string", "aggregateid": "objectId", "type": "string", "payload": "object" }
字段 | 效果 |
---|---|
|
包含事件的唯一 ID。在 outbox 消息中,这个值是一个标头。您可以使用此 ID 删除重复的消息。 |
|
包含 SMT 附加到主题名称中的值,连接器向这个主题发出一个 outbox 信息。默认行为是,这个值替换了 |
|
包含 event 键,它为有效负载提供 ID。SMT 使用这个值作为发送的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。 |
|
开箱即用更改事件的表示。默认结构为 JSON。默认情况下,Kafka message 值仅由 |
其他自定义字段 |
outbox 集合中的任何其他字段都可以 添加到 payload 部分中或作为消息标头的 outbox 事件中。 |
12.8.3. 基本 Debezium MongoDB outbox 事件路由器 SMT 配置
要配置 Debezium MongoDB 连接器来支持 outbox 模式,请配置 outbox.MongoEventRouter
SMT。要获取 SMT 的默认行为,请将其添加到连接器配置中,而不指定任何选项,如下例所示:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
自定义配置
连接器可能会发出多种类型的事件信息(如心跳消息、tombstone 消息或有关事务的元数据消息)。要将转换应用到源自在 outbox 集合中的事件,请定义一个 SMT predicate 语句,用于选择性地将转换应用到 这些事件。
12.8.4. 用于有选择地应用 MongoDB outbox 事件路由器转换的选项
除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:
- 为转换配置 SMT predicate。
-
将
route.topic.regex
配置选项用于 SMT。
12.8.5. 在 Debezium MongoDB outbox 消息中使用 Avro 作为有效负载格式
MongoDB outbox 事件路由器 SMT 支持任意有效负载格式。outbox 集合中的 payload
字段值以透明方式传递。使用 JSON 的替代方案是使用 Avro。这对消息格式监管非常有用,并确保外部事件模式以向后兼容的方式演进。
源应用程序如何为开箱即用消息有效负载生成 Avro 格式的内容不在本文档范围内。一种可能是利用 KafkaAvroSerializer
类来序列化 GenericRecord
实例。要确保 Kafka message 值是确切的 Avro 二进制数据,请将以下配置应用到连接器:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter value.converter=io.debezium.converters.ByteArrayConverter
默认情况下,有效负载
字段值( Avro 数据)是唯一消息值。将 ByteArrayConverter
配置为值转换器会将 payload
字段值按原样传播到 Kafka 消息值。
请注意,这与为其他 SMT 推荐的 BinaryDataConverter
不同。这是因为 MongoDB 在内部存储字节阵列的方法不同。
Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(根据连接器而异)。这些事件不能由 ByteArrayConverter
序列化,因此必须提供额外的配置,以便转换器了解如何序列化这些事件。例如,以下配置演示了使用没有模式的 Apache Kafka JsonConverter
:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter value.converter=io.debezium.converters.ByteArrayConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable=false
delegate Converter
实现由 delegate.converter.type
选项指定。如果转换器需要额外的配置选项,也可以指定它们,如上述模式的禁用使用 schemas.enable=false
。
12.8.6. 在 Debezium MongoDB outbox 信息中记录其他字段
您的 outbox 集合可能会包含您要添加到发出的消息中的值。例如,假设在 aggregatetype
字段中具有 product -order
值的 outbox 集合,另一个字段 eventType
,其可能的值是 order-created
和 order-shipped
。可以使用语法 字段添加其他字段:placement:alias
。
放置
允许的值是: - header
- envelope
- partition
要在 outbox 消息标头中发出 eventType
字段值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息的标头,其 type
作为其键,eventType
字段的值作为其值。
要在 outbox message envelope 中发出 eventType
字段值,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement=eventType:envelope:type
要控制生成 outbox 信息的分区,请配置 SMT,如下所示:
transforms=outbox,... transforms.outbox.type=io.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement=partitionField:partition
请注意,对于 分区
放置,添加别名将无效。
12.8.7. 将转义的 JSON 字符串扩展为 JSON
默认情况下,Debebe outbox 消息 的有效负载
表示为字符串。当字符串的原始源采用 JSON 格式时,生成的 Kafka 消息使用转义序列来表示字符串,如下例所示:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }
您可以配置 outbox 事件路由器来扩展消息内容,将转义的 JSON 转换为其原始未转义的 JSON 格式。在转换的字符串中,companion 模式从原始 JSON 文档中分离。以下示例显示了生成的 Kafka 信息中展开的 JSON:
# Kafka Topic: outbox.event.order # Kafka Message key: "1" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }
要在转换中启用字符串转换,请将 collection.expand.json.payload
的值设置为 true
,并使用 StringConverter
,如下例所示:
transforms=outbox,... transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter transforms.outbox.collection.expand.json.payload=true value.converter=org.apache.kafka.connect.storage.StringConverter
12.8.8. 配置开箱即用事件路由器转换的选项
下表描述了您可以为 outbox 事件路由器 SMT 指定的选项。在表中,Group 列表示 Kafka 的配置选项分类。
选项 | 默认 | 组 | 描述 |
---|---|---|---|
| 集合 | 决定在 outbox 集合上有更新操作时 SMT 的行为。可能的设置有:
开箱即用集合中的所有更改都应该是插入或删除操作。也就是说,一个 outbox 集合的功能是队列;不允许对 outbox 集合中的文档进行更新。SMT 会在开箱即用的集合中自动过滤删除删除操作(用于删除继续的事件)。 | |
| 集合 |
指定包含唯一事件 ID 的 outbox 集合字段。此 ID 将存储在 | |
| 集合 | 指定包含事件键的 outbox 集合字段。当此字段包含一个值时,SMT 将该值用作发出的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。 | |
集合 | 默认情况下,发出的 outbox 消息的时间戳是 Debezium 事件时间戳。要在开箱即用消息中使用不同的时间戳,请将此选项设置为 outbox collection 字段,其中包含您要发出的时间戳信息。 | ||
| 集合 | 指定包含事件有效负载的 outbox 集合字段。 | |
| 集合 |
指定是否应进行 String 有效负载的 JSON 扩展。如果没有找到内容,或者在解析错误时保存内容,则内容将保持不变。 | |
集合,Envelope | 指定您要添加到 outbox 消息标头或 envelopes 的一个或多个 outbox 集合字段。指定以逗号分隔的对列表。在每个对中,指定一个字段的名称,以及是否希望该值位于标头还是 envelope 中。使用冒号分隔对中的值,例如:
要为字段指定一个别名,请使用别名指定一个 trio 作为第三个值,例如:
第二个值是放置,它必须始终是 | ||
集合,Schema | 设置后,这个值将用作 schema 版本,如 Kafka Connect Schema Javadoc 所述。 | ||
| 路由器 | 指定 outbox 集合中字段的名称。默认情况下,此字段指定的值将成为连接器向其发出外部消息的主题名称的一部分。例如,请参阅 预期 outbox 集合的描述。 | |
| 路由器 |
指定 outbox SMT 在 RegexRouter 中应用到 outbox 集合文档中的正则表达式。这个正则表达式是
+ 默认行为是 SMT 将 | |
| 路由器 |
指定连接器向其发送消息的主题名称。默认主题名称为
+ 要更改主题名称,您可以:
| |
| 路由器 |
指明空还是 |