第 12 章 为应用程序配置 Debezium 连接器
当默认的 Debezium 连接器行为不适合您的应用程序时,您可以使用以下 Debezium 功能来配置您需要的行为。
- Kafka Connect 自动主题创建
- 启用 Connect 在运行时创建主题,并根据名称将配置设置应用到这些主题。
- avro serialization
- 支持将 Debezium PostgreSQL、MongoDB 或 SQL Server 连接器配置为使用 Avro 来序列化消息键和值,从而使更改事件记录用户更容易适应更改记录模式。
- xref:configuring-notifications-to-report-connector-status
- 提供了一种机制,可以通过一组可配置的频道来公开有关连接器的状态信息。
- CloudEvents converter
- 启用 Debezium 连接器来发出符合 CloudEvents 规格的更改事件记录。
- 向 Debezium 连接器发送信号
- 提供修改连接器行为或触发操作的方法,如启动临时快照。
12.1. 自定义 Kafka Connect 自动主题创建
Kafka 提供了两种创建主题的机制。您可以为 Kafka 代理启用自动主题创建,并以 Kafka 2.6.0 开始,也可以启用 Kafka Connect 来创建主题。Kafka 代理使用 auto.create.topics.enable
属性来控制自动主题创建。在 Kafka Connect 中,topic.creation.enable
属性指定是否允许 Kafka Connect 创建主题。在这两种情况下,属性的默认设置都启用自动主题创建。
当启用自动主题创建时,如果 Debezium 源连接器为没有目标主题的表发出更改事件记录,则会在运行时创建该主题,因为事件记录被嵌套到 Kafka 中。
在代理和 Kafka Connect 中自动创建主题之间的区别
代理创建的主题仅限于共享单个默认配置。代理无法将唯一配置应用到不同的主题或一组主题。相反,Kafka Connect 可以在创建主题时应用任何多个配置,设置复制因素、分区数量和其他特定于主题的设置,如 Debezium 连接器配置中指定的。连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组相关联。
代理配置和 Kafka Connect 配置相互独立。Kafka Connect 都可以创建主题,无论是否在代理中禁用主题创建。如果您在代理和 Kafka Connect 上启用自动主题创建,则 Connect 配置才会具有优先权,且代理仅在 Kafka Connect 配置中没有设置时创建主题。
如需更多信息,请参阅以下内容:
12.1.1. 为 Kafka 代理禁用自动主题创建
默认情况下,如果主题尚不存在,Kafka 代理配置可让代理在运行时创建主题。代理创建的主题无法使用自定义属性配置。如果您使用早于 2.6.0 的 Kafka 版本,且您希望使用特定配置创建主题,则必须在代理中禁用自动创建主题,然后显式创建主题,或者通过自定义部署过程创建。
流程
-
在代理配置中,将
auto.create.topics.enable
的值设置为false
。
12.1.2. 在 Kafka Connect 中配置自动主题创建
Kafka Connect 中的自动主题创建由 topic.creation.enable
属性控制。属性的默认值为 true
,启用自动主题创建,如下例所示:
topic.creation.enable = true
topic.creation.enable
属性的设置应用到 Connect 集群中的所有 worker。
Kafka Connect 自动主题创建需要您定义 Kafka Connect 在创建主题时应用的配置属性。您可以通过定义主题组,然后在 Debezium 连接器配置中指定主题配置属性,然后指定要应用到每个组的属性。连接器配置定义了默认主题创建组,以及一个或多个自定义主题创建组。自定义主题创建组使用主题名称模式列表来指定组设置要应用到的主题。
有关 Kafka Connect 如何匹配主题到主题创建组的详情,请参阅 主题创建组。有关如何将配置属性分配给组的更多信息,请参阅 主题创建组配置属性。
默认情况下,Kafka Connect 创建的主题会根据模式 server.schema.table
命名,例如 dbserver.myschema.inventory
。
流程
-
要防止 Kafka Connect 自动创建主题,在 Kafka Connect 自定义资源中将
topic.creation.enable
的值设置为false
,如下例所示:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster ... spec: config: topic.creation.enable: "false"
Kafka Connect 自动主题创建需要至少为 default
主题创建组设置 replication.factor
和 partitions
属性。组对组有效,可从 Kafka 代理的默认值获取所需属性的值。
12.1.3. 配置自动创建的主题
对于 Kafka Connect 自动创建主题,它需要源连接器中有关配置属性的信息,以便在创建主题时应用。您可以在每个 Debezium 连接器的配置中定义控制主题创建的属性。当 Kafka Connect 为连接器发出的事件记录创建主题时,生成的主题会从适用的组获取其配置。该配置仅适用于该连接器发送的事件记录。
12.1.3.1. 主题创建组
一组主题属性与主题创建组关联。最小,您必须定义一个 默认
主题创建组并指定其配置属性。除此之外,您还可以定义一个或多个自定义主题创建组,并为每个组指定唯一属性。
在创建自定义主题创建组时,您可以根据主题名称模式为每个组定义成员主题。您可以指定描述每个组中包含或排除的主题的命名模式。include
和 exclude
属性包含用于定义主题名称模式的正则表达式列表。例如,如果您想一个组包含以字符串 dbserver1.inventory
开头的所有主题,请将 topic.creation.inventory.include
属性的值设置为 dbserver1\\.inventoryRaft
。
如果您同时为自定义主题组指定 include
和 exclude
属性,则排除规则具有优先权,并覆盖包含的规则。
12.1.3.2. 主题创建组配置属性
默认
主题创建组以及每个自定义组都与一组唯一的配置属性关联。您可以将组配置为包含任何 Kafka 主题级配置属性。例如,您可以为旧主题片段 指定清理策略,保留时间,或主题 组的主题压缩类型。您必须至少定义一组最小属性,来描述要创建的主题的配置。
如果没有注册自定义组,或者任何注册的组的 include
模式与要创建的任何主题的名称不匹配,则 Kafka Connect 使用默认
组的配置来创建主题。
有关配置主题的常规信息,请参阅在 OpenShift 上安装 Debezium 中的 Kafka 主题创建建议。
12.1.3.3. 指定 Debezium 默认主题创建组的配置
在使用 Kafka Connect 自动主题创建前,您必须创建一个默认主题创建组并为它定义配置。默认主题创建组的配置应用于名称与自定义主题创建组的 include
列表模式匹配的任何主题。
先决条件
在 Kafka Connect 自定义资源中,
metadata.annotations
中的use-connector-resources
值指定集群 Operator 使用 KafkaConnector 自定义资源在集群中配置连接器。例如:... metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" ...
流程
要为
topic.creation.default
组定义属性,请将它们添加到连接器自定义资源的spec.config
中,如下例所示:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: ... config: ... topic.creation.default.replication.factor: 3 1 topic.creation.default.partitions: 10 2 topic.creation.default.cleanup.policy: compact 3 topic.creation.default.compression.type: lz4 4 ...
您可以在
default
组配置中包含任何 Kafka 主题级配置属性。
项 | 描述 |
---|---|
1 |
|
2 |
|
3 |
|
4 |
|
自定义组仅回退到所需的 replication.factor
和 partitions
属性 的默认
组设置。如果自定义主题组的配置保留了其他属性未定义,则不会应用在默认组中指定的值。
12.1.3.4. 指定 Debezium 自定义主题创建组的配置
您可以定义多个自定义主题组,每个组都有自己的配置。
流程
要定义自定义主题组,在连接器自定义资源中添加一个
topic.creation.<group_name>.include
property tospec.config
,后跟您要应用到自定义组中的主题的配置属性。以下示例显示了定义自定义主题创建组
inventory
和applicationlogs
的自定义资源摘录:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... 1 topic.creation.inventory.include: dbserver1\\.inventory\\.* 2 topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 3 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4 topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* 5 topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ... ...
项 | 描述 |
---|---|
1 |
定义 |
2 |
|
3 |
定义 |
4 |
|
5 |
|
12.1.3.5. 注册 Debezium 自定义主题创建组
在为任何自定义主题创建组指定配置后,注册组。
流程
通过将
topic.creation.groups
属性添加到连接器自定义资源,并指定以逗号分隔的自定义主题创建组列表来注册自定义组。连接器自定义资源的以下摘录注册自定义主题创建组
inventory
和applicationlogs
:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: topic.creation.groups: inventory,applicationlogs ...
完成的配置
以下示例显示了包含 默认
主题组的配置的已完成配置,以及 清单
的配置和 applicationlogs
自定义主题创建组:
示例:配置默认主题创建组和两个自定义组
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... topic.creation.default.replication.factor: 3, topic.creation.default.partitions: 10, topic.creation.default.cleanup.policy: compact topic.creation.default.compression.type: lz4 topic.creation.groups: inventory,applicationlogs topic.creation.inventory.include: dbserver1\\.inventory\\.* topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ...