第 12 章 为应用程序配置 Debezium 连接器


当默认的 Debezium 连接器行为不适合您的应用程序时,您可以使用以下 Debezium 功能来配置您需要的行为。

Kafka Connect 自动主题创建
启用 Connect 在运行时创建主题,并根据名称将配置设置应用到这些主题。
avro serialization
支持将 Debezium PostgreSQL、MongoDB 或 SQL Server 连接器配置为使用 Avro 来序列化消息键和值,从而使更改事件记录用户更容易适应更改记录模式。
xref:configuring-notifications-to-report-connector-status
提供了一种机制,可以通过一组可配置的频道来公开有关连接器的状态信息。
CloudEvents converter
启用 Debezium 连接器来发出符合 CloudEvents 规格的更改事件记录。
向 Debezium 连接器发送信号
提供修改连接器行为或触发操作的方法,如启动临时快照。

12.1. 自定义 Kafka Connect 自动主题创建

Kafka 提供了两种创建主题的机制。您可以为 Kafka 代理启用自动主题创建,并以 Kafka 2.6.0 开始,也可以启用 Kafka Connect 来创建主题。Kafka 代理使用 auto.create.topics.enable 属性来控制自动主题创建。在 Kafka Connect 中,topic.creation.enable 属性指定是否允许 Kafka Connect 创建主题。在这两种情况下,属性的默认设置都启用自动主题创建。

当启用自动主题创建时,如果 Debezium 源连接器为没有目标主题的表发出更改事件记录,则会在运行时创建该主题,因为事件记录被嵌套到 Kafka 中。

在代理和 Kafka Connect 中自动创建主题之间的区别

代理创建的主题仅限于共享单个默认配置。代理无法将唯一配置应用到不同的主题或一组主题。相反,Kafka Connect 可以在创建主题时应用任何多个配置,设置复制因素、分区数量和其他特定于主题的设置,如 Debezium 连接器配置中指定的。连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组相关联。

代理配置和 Kafka Connect 配置相互独立。Kafka Connect 都可以创建主题,无论是否在代理中禁用主题创建。如果您在代理和 Kafka Connect 上启用自动主题创建,则 Connect 配置才会具有优先权,且代理仅在 Kafka Connect 配置中没有设置时创建主题。

如需更多信息,请参阅以下内容:

12.1.1. 为 Kafka 代理禁用自动主题创建

默认情况下,如果主题尚不存在,Kafka 代理配置可让代理在运行时创建主题。代理创建的主题无法使用自定义属性配置。如果您使用早于 2.6.0 的 Kafka 版本,且您希望使用特定配置创建主题,则必须在代理中禁用自动创建主题,然后显式创建主题,或者通过自定义部署过程创建。

流程

  • 在代理配置中,将 auto.create.topics.enable 的值设置为 false

12.1.2. 在 Kafka Connect 中配置自动主题创建

Kafka Connect 中的自动主题创建由 topic.creation.enable 属性控制。属性的默认值为 true,启用自动主题创建,如下例所示:

topic.creation.enable = true

topic.creation.enable 属性的设置应用到 Connect 集群中的所有 worker。

Kafka Connect 自动主题创建需要您定义 Kafka Connect 在创建主题时应用的配置属性。您可以通过定义主题组,然后在 Debezium 连接器配置中指定主题配置属性,然后指定要应用到每个组的属性。连接器配置定义了默认主题创建组,以及一个或多个自定义主题创建组。自定义主题创建组使用主题名称模式列表来指定组设置要应用到的主题。

有关 Kafka Connect 如何匹配主题到主题创建组的详情,请参阅 主题创建组。有关如何将配置属性分配给组的更多信息,请参阅 主题创建组配置属性

默认情况下,Kafka Connect 创建的主题会根据模式 server.schema.table 命名,例如 dbserver.myschema.inventory

流程

  • 要防止 Kafka Connect 自动创建主题,在 Kafka Connect 自定义资源中将 topic.creation.enable 的值设置为 false,如下例所示:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster

...

spec:
  config:
    topic.creation.enable: "false"
注意

Kafka Connect 自动主题创建需要至少为 default 主题创建组设置 replication.factorpartitions 属性。组对组有效,可从 Kafka 代理的默认值获取所需属性的值。

12.1.3. 配置自动创建的主题

对于 Kafka Connect 自动创建主题,它需要源连接器中有关配置属性的信息,以便在创建主题时应用。您可以在每个 Debezium 连接器的配置中定义控制主题创建的属性。当 Kafka Connect 为连接器发出的事件记录创建主题时,生成的主题会从适用的组获取其配置。该配置仅适用于该连接器发送的事件记录。

12.1.3.1. 主题创建组

一组主题属性与主题创建组关联。最小,您必须定义一个 默认 主题创建组并指定其配置属性。除此之外,您还可以定义一个或多个自定义主题创建组,并为每个组指定唯一属性。

在创建自定义主题创建组时,您可以根据主题名称模式为每个组定义成员主题。您可以指定描述每个组中包含或排除的主题的命名模式。includeexclude 属性包含用于定义主题名称模式的正则表达式列表。例如,如果您想一个组包含以字符串 dbserver1.inventory 开头的所有主题,请将 topic.creation.inventory.include 属性的值设置为 dbserver1\\.inventoryRaft

注意

如果您同时为自定义主题组指定 includeexclude 属性,则排除规则具有优先权,并覆盖包含的规则。

12.1.3.2. 主题创建组配置属性

默认 主题创建组以及每个自定义组都与一组唯一的配置属性关联。您可以将组配置为包含任何 Kafka 主题级配置属性。例如,您可以为旧主题片段 指定清理策略,保留时间或主题 组的主题压缩类型。您必须至少定义一组最小属性,来描述要创建的主题的配置。

如果没有注册自定义组,或者任何注册的组的 include 模式与要创建的任何主题的名称不匹配,则 Kafka Connect 使用默认组的配置来创建主题。

有关配置主题的常规信息,请参阅在 OpenShift 上安装 Debezium 中的 Kafka 主题创建建议

12.1.3.3. 指定 Debezium 默认主题创建组的配置

在使用 Kafka Connect 自动主题创建前,您必须创建一个默认主题创建组并为它定义配置。默认主题创建组的配置应用于名称与自定义主题创建组的 include 列表模式匹配的任何主题。

先决条件

  • 在 Kafka Connect 自定义资源中,metadata.annotations 中的 use-connector-resources 值指定集群 Operator 使用 KafkaConnector 自定义资源在集群中配置连接器。例如:

     ...
        metadata:
          name: my-connect-cluster
          annotations: strimzi.io/use-connector-resources: "true"
     ...

流程

  • 要为 topic.creation.default 组定义属性,请将它们添加到连接器自定义资源的 spec.config 中,如下例所示:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
    ...
    
       config:
    ...
         topic.creation.default.replication.factor: 3  1
         topic.creation.default.partitions: 10  2
         topic.creation.default.cleanup.policy: compact  3
         topic.creation.default.compression.type: lz4  4
    ...

    您可以在 default 组配置中包含任何 Kafka 主题级配置属性

表 12.1. 默认 主题创建组的连接器配置
描述

1

topic.creation.default.replication.factor 定义了由默认组创建的主题的复制因素。
replication.factor 对于 default 组是必需的,但对于自定义组是可选的。如果没有设置,自定义组将回退到默认组的值。使用 -1 使用 Kafka 代理的默认值。

2

topic.creation.default.partitions 为默认组创建的主题定义分区数量。
partitions 对于默认组来说是强制的,但对于自定义组是可选的。如果没有设置,自定义组将回退到默认组的值。使用 -1 使用 Kafka 代理的默认值。

3

topic.creation.default.cleanup.policy 映射到 主题级别配置参数cleanup.policy 属性,并定义日志保留策略。

4

topic.creation.default.compression.type 映射到 主题级别配置参数compression.type 属性,并定义如何在硬盘上压缩消息。

注意

自定义组仅回退到所需的 replication.factorpartitions 属性 的默认 组设置。如果自定义主题组的配置保留了其他属性未定义,则不会应用在默认组中指定的值。

12.1.3.4. 指定 Debezium 自定义主题创建组的配置

您可以定义多个自定义主题组,每个组都有自己的配置。

流程

  • 要定义自定义主题组,在连接器自定义资源中添加一个 topic.creation.<group_name>.include property to spec.config,后跟您要应用到自定义组中的主题的配置属性。

    以下示例显示了定义自定义主题创建组 inventoryapplicationlogs 的自定义资源摘录:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
    ... 1
        topic.creation.inventory.include: dbserver1\\.inventory\\.*  2
        topic.creation.inventory.partitions: 20
        topic.creation.inventory.cleanup.policy: compact
        topic.creation.inventory.delete.retention.ms: 7776000000
    
        3
        topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4
        topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*  5
        topic.creation.applicationlogs.replication.factor: 1
        topic.creation.applicationlogs.partitions: 20
        topic.creation.applicationlogs.cleanup.policy: delete
        topic.creation.applicationlogs.retention.ms: 7776000000
        topic.creation.applicationlogs.compression.type: lz4
    ...
    ...
表 12.2. 自定义 清单和 applicationlogs 主题创建组的连接器配置
描述

1

定义 清单组 的配置。
对于 自定义组,copy.factorpartitions 属性是可选的。如果没有设置值,自定义组会返回到 default 组设置的值。将值设为 -1 以使用为 Kafka 代理设置的值。

2

topic.creation.inventory.include 定义一个正则表达式,以匹配以 dbserver1.inventory. 开头的所有主题。为 清单组 定义的配置仅应用于名称与指定正则表达式匹配的主题。

3

定义 applicationlogs 组的配置。
对于 自定义组,copy.factorpartitions 属性是可选的。如果没有设置值,自定义组会返回到 default 组设置的值。将值设为 -1 以使用为 Kafka 代理设置的值。

4

topic.creation.applicationlogs.include 定义了一个正则表达式,以匹配以 dbserver1.logs.applog- 开头的所有主题。为 applicationlogs 组定义的配置仅应用于名称与指定正则表达式匹配的主题。由于此组也定义了 exclude 属性,所以与该 exclude 属性限制与 include 正则表达式匹配的主题可能会进一步限制。

5

topic.creation.applicationlogs.exclude 定义了一个正则表达式,以匹配以 dbserver1.logs.applog-old- 开头的所有主题。为 applicationlogs 组定义的配置仅应用于名称与给定正则表达式 不匹配的 主题。因为已为这个组定义了一个 include 属性,applicationlogs 组的配置仅应用于名称与指定的 include 正则表达式的名称匹配的,与指定的 exclude 正则表达式匹配的主题。

12.1.3.5. 注册 Debezium 自定义主题创建组

在为任何自定义主题创建组指定配置后,注册组。

流程

  • 通过将 topic.creation.groups 属性添加到连接器自定义资源,并指定以逗号分隔的自定义主题创建组列表来注册自定义组。

    连接器自定义资源的以下摘录注册自定义主题创建组 inventoryapplicationlogs

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
         topic.creation.groups: inventory,applicationlogs
    
    ...

完成的配置

以下示例显示了包含 默认 主题组的配置的已完成配置,以及 清单 的配置和 applicationlogs 自定义主题创建组:

示例:配置默认主题创建组和两个自定义组

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
...
spec:
...

   config:
...
    topic.creation.default.replication.factor: 3,
    topic.creation.default.partitions: 10,
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: lz4
    topic.creation.groups: inventory,applicationlogs
    topic.creation.inventory.include: dbserver1\\.inventory\\.*
    topic.creation.inventory.partitions: 20
    topic.creation.inventory.cleanup.policy: compact
    topic.creation.inventory.delete.retention.ms: 7776000000
    topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.*
    topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*
    topic.creation.applicationlogs.replication.factor: 1
    topic.creation.applicationlogs.partitions: 20
    topic.creation.applicationlogs.cleanup.policy: delete
    topic.creation.applicationlogs.retention.ms: 7776000000
    topic.creation.applicationlogs.compression.type: lz4
...

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.