12.8. 配置 Debezium MongoDB 连接器以使用 outbox 模式


注意

此 SMT 仅用于 Debezium MongoDB 连接器。有关将 outbox 事件路由器 SMT 用于关系数据库的详情,请参考 Outbox 事件路由器

开箱即用模式是一种在多个(micro)服务间安全、可靠地交换数据的方法。开箱即用模式实施可避免服务内部状态(通常在其数据库中保留)和状态,以及需要相同数据的服务所消耗的事件的状态。

要在 Debezium 应用程序中实施 outbox 模式,请将 Debezium 连接器配置为:

  • 捕获 outbox 集合中的更改
  • 应用 Debezium MongoDB outbox 事件路由器单消息转换(SMT)

配置为应用 MongoDB outbox SMT 的 Debezium 连接器应该只捕获在 outbox 集合中发生的更改。如需更多信息,请参阅有 选择应用转换的选项

只有每个 outbox 集合具有相同的结构时,连接器才能捕获多个 outbox 集合中的更改。

注意

要使用此 SMT,必须作为多文档事务的一部分(从 MongoDB 4.0 开始),对实际业务集合和插入到 outbox 集合中的操作作为多文档事务的一部分完成,以防止业务集合和 outbox 集合之间的潜在的数据不一致。对于将来的更新,要更新现有数据并在 ACID 事务中没有多文档事务的情况下插入现有数据,我们计划支持以现有集合的子文档而不是独立集合的形式存储外部事件的额外配置。

有关 outbox 模式的更多信息,请参阅使用 Outbox 模式的可靠的交换数据交换

以下主题提供详情:

12.8.1. Debezium MongoDB outbox 消息示例

要了解如何配置 Debezium MongoDB outbox 事件路由器 SMT,请考虑以下 Debezium outbox 信息示例:

# Kafka Topic: outbox.event.order
# Kafka Message key: "b2730779e1f596e275826f08"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

被配置为应用 MongoDB outbox 事件路由器 SMT 的 Debezium 连接器通过转换原始 Debezium 更改事件信息来生成前面的消息,如下例所示:

# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" }
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "patch": null,
  "after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}",
  "source": {
    "version": "2.1.4.Final",
    "connector": "mongodb",
    "name": "fulfillment",
    "ts_ms": 1558965508000,
    "snapshot": false,
    "db": "inventory",
    "rs": "rs0",
    "collection": "customers",
    "ord": 31,
    "h": 1546547425148721999
  },
  "op": "c",
  "ts_ms": 1556890294484
}

这个 Debezium outbox 消息示例基于 默认的 outbox 事件路由器配置,它假定一个基于聚合的集合结构和事件路由。要自定义行为,outbox 事件路由器 SMT 提供了大量 配置选项

12.8.2. Debezium mongodb outbox 事件路由器 SMT 期望的 outbox 集合结构

要应用默认的 MongoDB outbox 事件路由器 SMT 配置,假设您的 outbox 集合具有以下字段:

{
  "_id": "objectId",
  "aggregatetype": "string",
  "aggregateid": "objectId",
  "type": "string",
  "payload": "object"
}
表 12.8. 预期的 outbox 集合字段的描述
字段效果

id

包含事件的唯一 ID。在 outbox 消息中,这个值是一个标头。您可以使用此 ID 删除重复的消息。

要从不同的 outbox collection 字段获取事件的唯一 ID,请在连接器配置中设置 collection.field.event.id SMT 选项。

aggregatetype

包含 SMT 附加到主题名称中的值,连接器向这个主题发出一个 outbox 信息。默认行为是,这个值替换了 route.topic.replacement SMT 选项中的默认的 ${routedByValue} 变量。

例如,在默认配置中,route.by.field SMT 选项被设置为 aggregatetyperoute.topic.replacement SMT 选项被设置为 outbox.event.${routedByValue}。假设您的应用程序在 outbox 集合中添加了两个文档。在第一个文档中,aggregatetype 字段的值是 customers。在第二个文档中,aggregatetype 字段中的值是 orders。连接器将第一个文档发送到 outbox.event.customers 主题。连接器将第二个文档发送到 outbox.event.orders 主题。

要从不同的 outbox 集合字段获取这个值,请在连接器配置中设置 route.by.field SMT 选项。

aggregateid

包含 event 键,它为有效负载提供 ID。SMT 使用这个值作为发送的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。

要从不同的 outbox collection 字段获取事件密钥,请在连接器配置中设置 collection.field.event.key SMT 选项。

payload

开箱即用更改事件的表示。默认结构为 JSON。默认情况下,Kafka message 值仅由 有效负载 值组成。但是,如果 outbox 事件被配置为包含其他字段,则 Kafka 消息值包含一个 envelope 封装和附加字段,每个字段都会单独表示。如需更多信息,请参阅使用其他字段协调消息

要从不同的 outbox 集合字段获取事件有效负载,请在连接器配置中设置 collection.field.event.payload SMT 选项。

其他自定义字段

outbox 集合中的任何其他字段都可以 添加到 payload 部分中或作为消息标头的 outbox 事件中。

一个示例可以是字段 eventType,它传递用户定义的值,以帮助对事件进行分类或组织。

12.8.3. 基本 Debezium MongoDB outbox 事件路由器 SMT 配置

要配置 Debezium MongoDB 连接器来支持 outbox 模式,请配置 outbox.MongoEventRouter SMT。要获取 SMT 的默认行为,请将其添加到连接器配置中,而不指定任何选项,如下例所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter

自定义配置

连接器可能会发出多种类型的事件信息(如心跳消息、tombstone 消息或有关事务的元数据消息)。要将转换应用到源自在 outbox 集合中的事件,请定义一个 SMT predicate 语句,用于选择性地将转换应用到 这些事件。

12.8.4. 用于有选择地应用 MongoDB outbox 事件路由器转换的选项

除了 Debezium 连接器在发生数据库更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

12.8.5. 在 Debezium MongoDB outbox 消息中使用 Avro 作为有效负载格式

MongoDB outbox 事件路由器 SMT 支持任意有效负载格式。outbox 集合中的 payload 字段值以透明方式传递。使用 JSON 的替代方案是使用 Avro。这对消息格式监管非常有用,并确保外部事件模式以向后兼容的方式演进。

源应用程序如何为开箱即用消息有效负载生成 Avro 格式的内容不在本文档范围内。一种可能是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka message 值是确切的 Avro 二进制数据,请将以下配置应用到连接器:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter

默认情况下,有效负载 字段值( Avro 数据)是唯一消息值。将 ByteArrayConverter 配置为值转换器会将 payload 字段值按原样传播到 Kafka 消息值。

请注意,这与为其他 SMT 推荐的 BinaryDataConverter 不同。这是因为 MongoDB 在内部存储字节阵列的方法不同。

Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(根据连接器而异)。这些事件不能由 ByteArrayConverter 序列化,因此必须提供额外的配置,以便转换器了解如何序列化这些事件。例如,以下配置演示了使用没有模式的 Apache Kafka JsonConverter

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

delegate Converter 实现由 delegate.converter.type 选项指定。如果转换器需要额外的配置选项,也可以指定它们,如上述模式的禁用使用 schemas.enable=false

12.8.6. 在 Debezium MongoDB outbox 信息中记录其他字段

您的 outbox 集合可能会包含您要添加到发出的消息中的值。例如,假设在 aggregatetype 字段中具有 product -order 值的 outbox 集合,另一个字段 eventType,其可能的值是 order-createdorder-shipped。可以使用语法 字段添加其他字段:placement:alias

放置 允许的值是: - header - envelope - partition

要在 outbox 消息标头中发出 eventType 字段值,请配置 SMT,如下所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:header:type

结果将是 Kafka 消息的标头,其 type 作为其键,eventType 字段的值作为其值。

要在 outbox message envelope 中发出 eventType 字段值,请配置 SMT,如下所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:envelope:type

要控制生成 outbox 信息的分区,请配置 SMT,如下所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=partitionField:partition

请注意,对于 分区 放置,添加别名将无效。

12.8.7. 将转义的 JSON 字符串扩展为 JSON

默认情况下,Debebe outbox 消息 的有效负载 表示为字符串。当字符串的原始源采用 JSON 格式时,生成的 Kafka 消息使用转义序列来表示字符串,如下例所示:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

您可以配置 outbox 事件路由器来扩展消息内容,将转义的 JSON 转换为其原始未转义的 JSON 格式。在转换的字符串中,companion 模式从原始 JSON 文档中分离。以下示例显示了生成的 Kafka 信息中展开的 JSON:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

要在转换中启用字符串转换,请将 collection.expand.json.payload 的值设置为 true,并使用 StringConverter,如下例所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter

12.8.8. 配置开箱即用事件路由器转换的选项

下表描述了您可以为 outbox 事件路由器 SMT 指定的选项。在表中,Group 列表示 Kafka 的配置选项分类。

表 12.9. outbox 事件路由器 SMT 配置选项的描述
选项默认描述

collection.op.invalid.behavior

warn

集合

决定在 outbox 集合上有更新操作时 SMT 的行为。可能的设置有:

  • warn - SMT 会记录警告并继续下一个开箱即用的集合文档。
  • Error - SMT 日志错误,并继续到下一个开箱即用的集合文档。
  • fatal - SMT 日志错误,连接器会停止处理。

开箱即用集合中的所有更改都应该是插入或删除操作。也就是说,一个 outbox 集合的功能是队列;不允许对 outbox 集合中的文档进行更新。SMT 会在开箱即用的集合中自动过滤删除删除操作(用于删除继续的事件)。

collection.field.event.id

_id

集合

指定包含唯一事件 ID 的 outbox 集合字段。此 ID 将存储在 id 键下的发送事件的标头中。

collection.field.event.key

aggregateid

集合

指定包含事件键的 outbox 集合字段。当此字段包含一个值时,SMT 将该值用作发出的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。

collection.field.event.timestamp

 

集合

默认情况下,发出的 outbox 消息的时间戳是 Debezium 事件时间戳。要在开箱即用消息中使用不同的时间戳,请将此选项设置为 outbox collection 字段,其中包含您要发出的时间戳信息。

collection.field.event.payload

payload

集合

指定包含事件有效负载的 outbox 集合字段。

collection.expand.json.payload

false

集合

指定是否应进行 String 有效负载的 JSON 扩展。如果没有找到内容,或者在解析错误时保存内容,则内容将保持不变。

如需了解更多详细信息,请参阅 扩展转义的 json 部分。

collection.fields.additional.placement

 

集合,Envelope

指定您要添加到 outbox 消息标头或 envelopes 的一个或多个 outbox 集合字段。指定以逗号分隔的对列表。在每个对中,指定一个字段的名称,以及是否希望该值位于标头还是 envelope 中。使用冒号分隔对中的值,例如:

id:header,my-field:envelope

要为字段指定一个别名,请使用别名指定一个 trio 作为第三个值,例如:

id:header,my-field:envelope:my-alias

第二个值是放置,它必须始终是 headerenvelope

配置示例 在 Debezium outbox 消息中发出其他字段

collection.field.event.schema.version

 

集合,Schema

设置后,这个值将用作 schema 版本,如 Kafka Connect Schema Javadoc 所述。

route.by.field

aggregatetype

路由器

指定 outbox 集合中字段的名称。默认情况下,此字段指定的值将成为连接器向其发出外部消息的主题名称的一部分。例如,请参阅 预期 outbox 集合的描述

route.topic.regex

(?<routedByValue>.*)

路由器

指定 outbox SMT 在 RegexRouter 中应用到 outbox 集合文档中的正则表达式。这个正则表达式是 route.topic.replacement SMT 选项设置的一部分。

+ 默认行为是 SMT 将 route.topic.replacement SMT 选项设置中的默认 ${routedByValue} 变量替换为 route.by.field outbox SMT 选项的设置。

route.topic.replacement

outbox.event​.${routedByValue}

路由器

指定连接器向其发送消息的主题名称。默认主题名称为 outbox.event。 后跟 outbox 集合文档中的 aggregatetype 字段值。例如,如果 aggregatetype 值为 customers,则主题名称为 outbox.event.customers

+ 要更改主题名称,您可以:

route.tombstone.on.empty.payload

false

路由器

指明空还是 null 有效负载是否会导致连接器发出 tombstone 事件。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.