7.9. 配置 Debezium 连接器以使用 outbox 模式


outbox 模式是一种在多个(micro)服务之间安全且可靠地交换数据的方法。outbox 模式实现可避免服务内部状态(通常是在其数据库中保留)和需要相同数据的服务所消耗的事件之间的不一致。

要在 Debezium 应用程序中实施 outbox 模式,请将 Debezium 连接器配置为:

  • 捕获 outbox 表中的更改
  • 应用 Debezium outbox 事件路由器单一消息转换(SMT)

配置为应用 outbox SMT 的 Debezium 连接器应该只捕获在 outbox 表中发生的更改。如需更多信息,请参阅 用于有选择地应用转换的选项

只有在每个 outbox 表都有相同的结构时,连接器才能捕获多个 outbox 表中的更改。

请参阅 Reliable microservices Data Exchange with Outbox Pattern,以了解为什么 outbox 模式有用及其工作方式。

注意

outbox 事件路由器 SMT 与 MongoDB 连接器不兼容。

MongoDB 用户可以运行 MongoDB outbox 事件路由器 SMT

以下主题提供详情:

7.9.1. Debezium outbox 消息示例

要了解如何配置 Debezium outbox 事件路由器 SMT,请查看以下 Debezium outbox 消息示例:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

配置为应用 outbox 事件路由器 SMT 的 Debezium 连接器通过转换 Debezium 原始消息来生成上述消息,如下所示:

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "2.7.3.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484,
  "ts_us": 1556890294484651,
  "ts_ns": 1556890294484651402
}

这个 Debezium outbox 消息示例基于 默认的 outbox 事件路由器配置,它根据聚合假设 outbox 表结构和事件路由。要自定义行为,outbox 事件路由器 SMT 提供了大量 配置选项

7.9.2. Debezium outbox 事件路由器 SMT 期望的 outbox 表结构

要应用默认的 outbox 事件路由器 SMT 配置,您的 outbox 表被认为有以下列:

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
表 7.8. expected outbox 表列的描述
效果

id

包含事件的唯一 ID。在 outbox 消息中,这个值是一个标头。例如,您可以使用此 ID 删除重复的消息。

要从不同的 outbox 表列获取事件的唯一 ID,请在连接器配置中设置 table.field.event.id SMT 选项。

aggregatetype

包含一个值,它把 SMT 附加到主题名称中,连接器会发出 outbox 信息。默认行为是,这个值替换了 route.topic.replacement SMT 选项中的默认 ${routedByValue} 变量。

例如,在默认配置中,route.by.field SMT 选项被设置为 aggregatetyperoute.topic.replacement SMT 选项被设置为 outbox.event.${routedByValue}。假设您的应用会将两个记录添加到 outbox 表中。在第一个记录中,aggregatetype 列中的值是 customer。在第二条记录中,aggregatetype 列中的值是 orders。连接器将第一个记录发送到 outbox.event.customers 主题。连接器将第二个记录发送到 outbox.event.orders 主题。

要从不同的 outbox 表列获取这个值,请在连接器配置中设置 route.by.field SMT 选项。

aggregateid

包含事件键,它为有效负载提供 ID。SMT 使用这个值作为发出的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。

要从不同的 outbox 表列获取事件密钥,请在连接器配置中设置 table.field.event.key SMT 选项

payload

outbox 更改事件表示。默认结构是 JSON。默认情况下,Kafka 消息值仅由 payload 值组成。但是,如果 outbox 事件配置为包含其他字段,则 Kafka 消息值包含接收两个有效负载和其他字段,每个字段将单独表示。如需更多信息,请参阅 使用其他字段传输消息

要从不同的 outbox 表列获取事件有效负载,请在连接器配置中设置 table.field.event.payload SMT 选项。

其他自定义列

outbox 表中的任何其他列都可以 添加到 payload 部分或作为消息标头中的 outbox 事件中。

一个示例可以是列 eventType,它传递了一个用户定义的值,有助于分类或组织事件。

7.9.3. 基本 Debezium outbox 事件路由器 SMT 配置

要配置 Debezium 连接器来支持 outbox 模式,请配置 outbox.EventRouter SMT。要获取 SMT 的默认行为,请将其添加到连接器配置中,而不指定任何选项,如下例所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

自定义配置

连接器可能会发出许多类型的事件消息(如心跳消息、tombstone 消息或有关事务或模式更改的元数据消息)。要将转换仅应用到源自 outbox 表中的事件,请定义一个 SMT predicate 语句,该语句仅有选择地将转换应用到 这些事件。

7.9.4. 用于有选择地应用 Outbox 事件路由器转换的选项

除了 Debezium 连接器在数据库更改时发出的更改事件消息外,连接器还会发出其他类型的信息,包括 heartbeat 信息,以及有关 schema 更改和事务的元数据消息。由于这些消息的结构与 SMT 旨在处理的更改事件消息的结构不同,因此最好将连接器配置为有选择地应用 SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为有选择地应用 SMT:

7.9.5. 有效负载序列化格式

outbox 事件路由器 SMT 支持任意有效负载格式。SMT 传递 payload 列值,它不会在不修改的情况下从 outbox 表中读取。SMT 将这些列值转换为 Kafka 消息字段的方式取决于您如何配置 SMT。序列化数据的通用有效负载格式为 JSON 和 Avro。

7.9.5.1. 使用 JSON 作为序列化格式

outbox 事件路由器 SMT 的默认序列化格式是 JSON。要使用此格式,源列的数据类型必须是 JSON (例如,PostgreSQL 中的 jsonb )。

7.9.5.1.1. 生成扩展转义的 JSON 字符串作为 JSON

当 Debezium outbox 消息 将有效负载 表示为 JSON String 时,生成的 Kafka 消息会转义字符串,如下例所示:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

outbox 事件路由器允许您将消息内容扩展到 "real" JSON,从 JSON 文档中去除 companion 模式。在以下示例中,生成的 Kafka 信息被格式化如下:

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

要启用使用 outbox 事件路由器转换,请将 table.expand.json.payload 设置为 true,并使用下例中所示的 JsonConverter

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter

7.9.5.2. 在 Debezium outbox 信息中使用 Apache Avro 作为有效负载格式

Apache Avro 是一个用于序列化数据的通用框架。使用 Avro 有助于消息格式管理,并确保 outbox 事件模式以向后兼容的方式进行演变。

源应用程序如何为 outbox 消息有效负载生成 Avro 格式的内容,而不是本文档的范围。一个可能是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka 消息值是确切的 Avro 二进制数据,请将以下配置应用到连接器:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter

默认情况下,payload 列值( Avro data)是唯一的消息值。当数据存储为 Avro 格式时,列格式必须设置为二进制数据类型,如 PostgreSQL 中的 bytea。SMT 的值转换器也必须设置为 BinaryDataConverter,以便它将 payload 列的二进制值传播到 Kafka 消息值中。

Debezium 连接器可以被配置为发出心跳、事务元数据或架构更改事件(支持因连接器而异)。这些事件不能被 BinaryDataConverter 序列化,因此必须提供额外的配置,因此转换器知道如何序列化这些事件。例如,以下配置演示了使用带有任何模式的 Apache Kafka JsonConverter

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

delegate Converter 实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,如使用 schemas.enable=false 所示的模式禁用。

以下示例演示了如何配置 SMT,使用带有 Apicurio Registry 的委派转换器,将数据转换为 Avro 格式:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter
value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.delegate.converter.apicurio.registry.auto-register=true
value.converter.delegate.converter.registry.find-latest=true

最后,以下示例演示了如何配置 SMT,使用带有 Confluent Schema Registry 的委派转换器,将数据转换为 Avro 格式:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter
value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO
value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS}
value.converter.delegate.converter.type.schema.registry.url={URL}
注意

在前面的配置示例,因为 AvroConverter 配置为委派转换器,因此需要第三方库。有关如何在类路径中添加第三方库的信息超出了本文档的范围。

7.9.6. 在 Debezium outbox 信息中发出其他字段

您的 outbox 表可能包含您要添加到发出的 outbox 消息中的值。例如,假设一个 outbox 表,它在 aggregatetype 列中的值为 purchase-order,另一个列 eventType,其可能的值是 order-createorder-shipped。可以使用语法 列添加其他字段:placement:alias

放置 允许的值有: - header - envelope - partition

要在 outbox 消息标头中发出 eventType 列值,请配置 SMT,如下所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type

结果将是 Kafka 消息中的一个标头,类型为 type 作为其键,eventType 列的值作为其值。

要发送 outbox 消息 envelope 中的 eventType 列值,请配置 SMT,如下所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type

要控制生成 outbox 消息的分区,请配置 SMT,如下所示:

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition

请注意,对于 分区 放置,添加一个别名无效。

7.9.7. 配置 outbox 事件路由器转换的选项

下表描述了您可以为 outbox 事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。

表 7.9. outbox 事件路由器 SMT 配置选项的描述
选项默认描述

table.op.invalid.behavior

warn

决定在 outbox 表中有 UPDATE 操作时 SMT 的行为。可能的设置有:

  • warn - SMT 会记录警告并继续下一个 outbox 表记录。
  • error - SMT 记录错误,并继续到下一个 outbox 表记录。
  • fatal - SMT 日志错误,连接器会停止处理。

outbox 表中的所有更改都应该是 INSERT 操作。也就是说,开箱即用的表作为队列运行;不允许对 outbox 表中记录的更新。SMT 会在一个 outbox 表中自动过滤出 DELETE 操作。

table.field.event.id

id

指定包含唯一事件 ID 的 outbox 表列。此 ID 将存储在 id 键下的已发送事件的标头中。

table.field.event.key

aggregateid

指定包含事件键的 outbox 表列。当此列包含值时,SMT 使用该值作为发出的 outbox 消息中的键。这对于在 Kafka 分区中维护正确顺序非常重要。

table.field.event.timestamp

 

默认情况下,发出的 outbox 消息中的时间戳是 Debezium 事件时间戳。要在 outbox 消息中使用不同的时间戳,请将此选项设置为 outbox 表列,其中包含您要发出的 outbox 消息的时间戳。

table.field.event.payload

payload

指定包含事件有效负载的 outbox 表列。

table.expand.json.payload

false

指定是否应执行 String 有效负载的 JSON 扩展。如果没有找到内容或解析错误,则内容将保持"as is"。

如需了解更多详细信息,请参阅 扩展转义的 json 部分。

table.json.payload.null.behavior

ignore

当启用 JSON expansion property table.expand.json.payload 时,请确定 json 有效负载的行为,其中包括 outbox 表中的 null 值。可能的设置有:

  • ignore - Ignore the null 值。
  • optional_bytes - Keep the null 值,并将 null 视为连接的可选字节。

table.fields.additional.placement

 

表, Envelope

指定您要添加到 outbox 消息标头或 envelopes 的一个或多个 outbox 表列。指定以逗号分隔的对列表。在每个对中,指定列的名称以及是否希望该值位于标头中还是信封中。使用冒号将对中的值分开,例如:

id:header,my-field:envelope

要为列指定别名,请将别名的 trio 指定为第三个值,例如:

id:header,my-field:envelope:my-alias

第二个值是放置,它始终必须是 headerenvelope

配置示例包括在 emitting additional fields in Debezium outbox messages 中。

table.fields.additional.error.on.missing

true

表, Envelope

如果 Outbox 有效负载中没有找到由 table.fields.additional.placement 属性指定的字段,则指定此转换是否抛出错误。

table.field.event.schema.version

 

表, 架构

设置后,这个值将用作 schema 版本,如 Kafka Connect Schema Javadoc 所述。

route.by.field

aggregatetype

路由器

指定 outbox 表中列的名称。默认行为是,此列中的值成为连接器将 outbox 消息发送到的主题名称的一部分。示例是在 预期出箱表 的描述 中。

route.topic.regex

(?<routedByValue>.*)

路由器

指定 outbox SMT 在 RegexRouter 中应用到 outbox 表记录的正则表达式。此正则表达式是 route.topic.replacement SMT 选项的设置的一部分。

默认行为是将 route.topic.replacement SMT 选项设置中的 default ${routedByValue} 变量替换为 route.by.field outbox SMT 选项的设置。

route.topic.replacement

outbox.event​.${routedByValue}

路由器

指定连接器发出 outbox 消息的主题名称。默认主题名称为 outbox.event。 后跟 outbox 表记录中的 aggregatetype 列值。例如,如果 aggregatetype 值是 customer,则主题名称为 outbox.event. customers

要更改主题名称,您可以:

route.tombstone.on.empty.payload

false

路由器

指明是否为空或 null 有效负载会导致连接器发出 tombstone 事件。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.