第 11 章 为应用程序配置 Debezium 连接器


当默认 Debezium 连接器行为不适用于应用程序时,您可以使用以下 Debezium 功能来配置您需要的行为。

Kafka Connect 自动主题创建
启用连接在运行时创建主题,并根据名称将配置设置应用到这些主题。
Avro 序列化
支持配置 Debezium PostgreSQL、MongoDB 或 SQL Server 连接器,以使用 Avro 序列化消息键和值,从而更轻松地更改事件记录用户以适应更改记录架构。
CloudEvents converter
启用 Debezium 连接器来发出符合 CloudEvents 规格的更改事件记录。
将信号发送到 Debezium 连接器
提供修改连接器行为或触发操作的方法,如启动临时快照。

11.1. 自定义 Kafka Connect 自动主题创建

Kafka 提供两个自动创建主题的机制。您可以为 Kafka 代理启用自动主题创建,并以 Kafka 2.6.0 开始,也可以启用 Kafka Connect 来创建主题。Kafka 代理使用 auto.create.topics.enable 属性来控制自动主题创建。在 Kafka Connect 中,topic.creation.enable 属性指定是否允许 Kafka Connect 创建主题。在这两种情况下,属性的默认设置都启用自动主题创建。

启用自动主题创建后,如果 Debezium 源连接器为没有目标主题的表发出更改事件记录,则该主题会在运行时创建,因为事件记录最接近到 Kafka 中。

代理和 Kafka Connect 自动主题创建之间的区别

代理创建的主题仅限于共享单个默认配置。代理无法将唯一配置应用到不同的主题或一组主题。相反,Kafka Connect 可以在创建主题时应用任何几个配置,设置复制因素、分区数量和其他特定于主题的设置,如 Debezium 连接器配置中指定的。连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组相关联。

代理配置和 Kafka Connect 配置相互独立。无论您在代理中禁用主题创建,Kafka Connect 都可以创建主题。如果您在代理和 Kafka Connect 中启用自动主题创建,则连接配置具有优先权,代理仅在 Kafka Connect 配置中的任何设置时才创建主题。

如需更多信息,请参阅以下主题:

11.1.1. 为 Kafka 代理禁用自动主题创建

默认情况下,如果主题尚不存在,Kafka 代理配置可让代理在运行时创建主题。代理创建的主题无法使用自定义属性配置。如果您使用早于 2.6.0 的 Kafka 版本,且您希望使用特定配置创建主题,则必须在代理中禁用自动创建主题,然后显式创建主题,或者通过自定义部署过程创建。

流程

  • 在代理配置中,将 auto.create.topics.enable 的值设置为 false

11.1.2. 在 Kafka Connect 中配置自动主题创建

Kafka Connect 中的自动主题创建由 topic.creation.enable 属性控制。属性的默认值为 true,启用自动主题创建,如下例所示:

topic.creation.enable = true

topic.creation.enable 属性的设置应用到 Connect 集群中的所有 worker。

Kafka Connect 自动主题创建需要您定义 Kafka Connect 在创建主题时适用的配置属性。您可以通过定义主题组,然后在 Debezium 连接器配置中指定主题配置属性,然后指定要应用到每个组的属性。连接器配置定义了默认主题创建组,以及可选的一个或多个自定义主题创建组。自定义主题创建组使用主题名称模式列表来指定组设置应用到的主题。

有关 Kafka Connect 与主题创建组匹配的详细信息,请参阅 主题创建组。有关如何将配置属性分配给组的更多信息,请参阅 主题创建组配置属性

默认情况下,Kafka Connect 创建的主题根据模式 server.schema.table 命名,如 dbserver.myschema.inventory

流程

  • 要防止 Kafka Connect 自动创建主题,请在 Kafka Connect 自定义资源中将 topic.creation.enable 的值设置为 false,如下例所示:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster

...

spec:
  config:
    topic.creation.enable: "false"
注意

Kafka Connect 自动主题创建需要至少为 default 主题创建组设置 replication.factorpartitions 属性。对组有效,可从 Kafka 代理的默认值中获取所需属性的值。

11.1.3. 配置自动创建的主题

要使 Kafka Connect 自动创建主题,它需要源连接器中有关配置属性的信息,才能在创建主题时应用。您可以在每个 Debezium 连接器的配置中定义控制主题创建的属性。当 Kafka Connect 为连接器发出的事件记录创建主题,生成的主题会从适用的组获取其配置。该配置只适用于仅由该连接器发送的事件记录。

11.1.3.1. 主题创建组

一组主题属性与主题创建组关联。最少,您必须定义 默认 主题创建组并指定其配置属性。此外,您还可以定义一个或多个自定义主题创建组,并为每个组指定唯一的属性。

当您创建自定义主题创建组时,您可以根据主题名称模式为每个组定义成员主题。您可以指定描述每个组中包含或排除的主题的命名模式。includeexclude 属性包含用来定义主题名称模式的正则表达式的逗号分隔列表。例如,如果您希望组包含以字符串 dbserver1.inventory 开头的所有主题,请将其 topic.creation.inventory.include 属性的值设置为 dbserver1\\.inventory\\

注意

如果您同时为自定义主题组指定 includeexclude 属性,则排除规则具有优先权,并覆盖包含的规则。

11.1.3.2. 主题创建组配置属性

默认 主题创建组,每个自定义组都与一组唯一的配置属性关联。您可以将组配置为包含任何 Kafka 主题级配置属性。例如,您可以为旧主题片段保留时间、主题组指定 主题压缩类型 指定清理策略。您必须至少定义一组最小属性,以描述要创建的主题的配置。

如果没有注册自定义组,或者任何注册的组的 include 模式与要创建的任何主题的名称不匹配,则 Kafka Connect 使用默认组的配置来创建主题。

有关配置主题的常规信息,请参阅在 OpenShift 上安装 Debezium 中的 Kafka 主题创建建议

11.1.3.3. 指定 Debezium 默认主题创建组的配置

在使用 Kafka Connect 自动主题创建前,您必须创建一个默认主题创建组并为它定义配置。默认主题创建组的配置应用于名称与自定义主题创建组的 include 列表模式匹配的任何主题。

先决条件

  • 在 Kafka Connect 自定义资源中,metadata.annotations 中的 use-connector-resources 值指定集群 Operator 使用 KafkaConnector 自定义资源在集群中配置连接器。例如:

     ...
        metadata:
          name: my-connect-cluster
          annotations: strimzi.io/use-connector-resources: "true"
     ...

流程

  • 要为 topic.creation.default 组定义属性,将它们添加到连接器自定义资源的 spec.config 中,如下例所示:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
    ...
    
       config:
    ...
         topic.creation.default.replication.factor: 3  1
         topic.creation.default.partitions: 10  2
         topic.creation.default.cleanup.policy: compact  3
         topic.creation.default.compression.type: lz4  4
    ...

    您可以在默认组配置中包含任何 Kafka 主题级配置属性

表 11.1. 默认 主题创建组的连接器配置
描述

1

topic.creation. default.replication.factor 为默认组创建的主题定义复制因素。
replication.factor 对于默认组来说是强制的,但对于自定义组是可选的。如果没有设置,自定义组将回退到默认组的值。使用 -1 来使用 Kafka 代理的默认值。

2

topic.creation. default.partitions 为默认组创建的主题定义分区数量。
分区 对于默认组来说是强制的,但对于自定义组是可选的。如果没有设置,自定义组将回退到默认组的值。使用 -1 来使用 Kafka 代理的默认值。

3

topic.creation.default.cleanup.policy 映射到 主题级别配置参数cleanup.policy 属性,并且定义日志保留策略。

4

topic.creation.default.compression.type 映射到 主题级别配置参数compression.type 属性,并定义如何在硬盘上压缩消息。

注意

自定义组只回退到所需的 replication.factorpartitions 属性 的默认 组设置。如果自定义主题组的配置未定义其他属性,则不会应用 默认组中 指定的值。

11.1.3.4. 指定 Debezium 自定义主题创建组的配置

您可以定义多个自定义主题组,每个主题组都有自己的配置。

流程

  • 要定义自定义主题组,在连接器自定义资源中添加一个 topic.creation.<group_name>.include property to spec.config,后跟您要应用到自定义组中的主题的配置属性。

    以下示例显示了定义自定义主题创建组 清单和 applicationlogs 的自定义资源摘录:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
    ... 1
        topic.creation.inventory.include: dbserver1\\.inventory\\.*  2
        topic.creation.inventory.partitions: 20
        topic.creation.inventory.cleanup.policy: compact
        topic.creation.inventory.delete.retention.ms: 7776000000
    
        3
        topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4
        topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*  5
        topic.creation.applicationlogs.replication.factor: 1
        topic.creation.applicationlogs.partitions: 20
        topic.creation.applicationlogs.cleanup.policy: delete
        topic.creation.applicationlogs.retention.ms: 7776000000
        topic.creation.applicationlogs.compression.type: lz4
    ...
    ...
表 11.2. 自定义清单 和应用程序日志 主题创建组的连接器配置
描述

1

定义 清单组 的配置。
对于自定义组,copy .factorpartitions 属性是可选的。如果没有设置值,自定义组会返回为默认组设置的值。将值设为 -1 以使用为 Kafka 代理设置的值。

2

topic.creation.inventory.include 定义一个正则表达式,以匹配以 dbserver1.inventory. 开头的所有主题。为 清单组 定义的配置仅应用于名称与指定正则表达式匹配的主题。

3

定义 applicationlogs 组的配置。
对于自定义组,copy .factorpartitions 属性是可选的。如果没有设置值,自定义组会返回为默认组设置的值。将值设为 -1 以使用为 Kafka 代理设置的值。

4

topic.creation.applicationlogs.include 定义一个正则表达式,以匹配以 dbserver1.logs.applog- 开头的所有主题。为 applicationlogs 组定义的配置仅应用于名称与指定正则表达式匹配的主题。由于也会为这个组定义了 exclude 属性,因此与 include 正则表达式匹配的主题可能会被 排除 属性进一步限制。

5

topic.creation.applicationlogs.exclude 定义了一个正则表达式,以匹配以 dbserver1.logs.applog-old- 开头的所有主题。为 applicationlogs 组定义的配置仅应用于名称与给定的正则表达式 不匹配的 主题。因为已为这个组定义了一个 include 属性,applicationlogs 组的配置仅应用于名称与指定的 include 正则表达式的名称匹配的,与指定的 exclude 正则表达式匹配的主题。

11.1.3.5. 注册 Debezium 自定义主题创建组

在为任何自定义主题创建组指定配置后,请注册组。

流程

  • 通过在连接器自定义资源中添加 topic.creation.groups 属性,并指定以逗号分隔的自定义主题创建组列表来注册自定义组。

    以下来自连接器自定义资源摘录会注册自定义主题创建组 清单 和应用程序日志

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
         topic.creation.groups: inventory,applicationlogs
    
    ...

完成的配置

以下示例显示了一个已完成的配置,其中包含 默认 主题组的配置,以及 清单的 配置和应用程序logs 自定义主题创建组:

示例:配置默认主题创建和两个自定义组

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
...
spec:
...

   config:
...
    topic.creation.default.replication.factor: 3,
    topic.creation.default.partitions: 10,
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: lz4
    topic.creation.groups: inventory,applicationlogs
    topic.creation.inventory.include: dbserver1\\.inventory\\.*
    topic.creation.inventory.partitions: 20
    topic.creation.inventory.cleanup.policy: compact
    topic.creation.inventory.delete.retention.ms: 7776000000
    topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.*
    topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*
    topic.creation.applicationlogs.replication.factor: 1
    topic.creation.applicationlogs.partitions: 20
    topic.creation.applicationlogs.cleanup.policy: delete
    topic.creation.applicationlogs.retention.ms: 7776000000
    topic.creation.applicationlogs.compression.type: lz4
...

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.