第 11 章 为应用程序配置 Debezium 连接器
当默认 Debezium 连接器行为不适用于应用程序时,您可以使用以下 Debezium 功能来配置您需要的行为。
- Kafka Connect 自动主题创建
- 启用连接在运行时创建主题,并根据名称将配置设置应用到这些主题。
- Avro 序列化
- 支持配置 Debezium PostgreSQL、MongoDB 或 SQL Server 连接器,以使用 Avro 序列化消息键和值,从而更轻松地更改事件记录用户以适应更改记录架构。
- CloudEvents converter
- 启用 Debezium 连接器来发出符合 CloudEvents 规格的更改事件记录。
- 将信号发送到 Debezium 连接器
- 提供修改连接器行为或触发操作的方法,如启动临时快照。
11.1. 自定义 Kafka Connect 自动主题创建
Kafka 提供两个自动创建主题的机制。您可以为 Kafka 代理启用自动主题创建,并以 Kafka 2.6.0 开始,也可以启用 Kafka Connect 来创建主题。Kafka 代理使用 auto.create.topics.enable
属性来控制自动主题创建。在 Kafka Connect 中,topic.creation.enable
属性指定是否允许 Kafka Connect 创建主题。在这两种情况下,属性的默认设置都启用自动主题创建。
启用自动主题创建后,如果 Debezium 源连接器为没有目标主题的表发出更改事件记录,则该主题会在运行时创建,因为事件记录最接近到 Kafka 中。
代理和 Kafka Connect 自动主题创建之间的区别
代理创建的主题仅限于共享单个默认配置。代理无法将唯一配置应用到不同的主题或一组主题。相反,Kafka Connect 可以在创建主题时应用任何几个配置,设置复制因素、分区数量和其他特定于主题的设置,如 Debezium 连接器配置中指定的。连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组相关联。
代理配置和 Kafka Connect 配置相互独立。无论您在代理中禁用主题创建,Kafka Connect 都可以创建主题。如果您在代理和 Kafka Connect 中启用自动主题创建,则连接配置具有优先权,代理仅在 Kafka Connect 配置中的任何设置时才创建主题。
如需更多信息,请参阅以下主题:
11.1.1. 为 Kafka 代理禁用自动主题创建
默认情况下,如果主题尚不存在,Kafka 代理配置可让代理在运行时创建主题。代理创建的主题无法使用自定义属性配置。如果您使用早于 2.6.0 的 Kafka 版本,且您希望使用特定配置创建主题,则必须在代理中禁用自动创建主题,然后显式创建主题,或者通过自定义部署过程创建。
流程
-
在代理配置中,将
auto.create.topics.enable
的值设置为false
。
11.1.2. 在 Kafka Connect 中配置自动主题创建
Kafka Connect 中的自动主题创建由 topic.creation.enable
属性控制。属性的默认值为 true
,启用自动主题创建,如下例所示:
topic.creation.enable = true
topic.creation.enable
属性的设置应用到 Connect 集群中的所有 worker。
Kafka Connect 自动主题创建需要您定义 Kafka Connect 在创建主题时适用的配置属性。您可以通过定义主题组,然后在 Debezium 连接器配置中指定主题配置属性,然后指定要应用到每个组的属性。连接器配置定义了默认主题创建组,以及可选的一个或多个自定义主题创建组。自定义主题创建组使用主题名称模式列表来指定组设置应用到的主题。
有关 Kafka Connect 与主题创建组匹配的详细信息,请参阅 主题创建组。有关如何将配置属性分配给组的更多信息,请参阅 主题创建组配置属性。
默认情况下,Kafka Connect 创建的主题根据模式 server.schema.table
命名,如 dbserver.myschema.inventory
。
流程
-
要防止 Kafka Connect 自动创建主题,请在 Kafka Connect 自定义资源中将
topic.creation.enable
的值设置为false
,如下例所示:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster ... spec: config: topic.creation.enable: "false"
Kafka Connect 自动主题创建需要至少为 default
主题创建组设置 replication.factor
和 partitions
属性。对组有效,可从 Kafka 代理的默认值中获取所需属性的值。
11.1.3. 配置自动创建的主题
要使 Kafka Connect 自动创建主题,它需要源连接器中有关配置属性的信息,才能在创建主题时应用。您可以在每个 Debezium 连接器的配置中定义控制主题创建的属性。当 Kafka Connect 为连接器发出的事件记录创建主题,生成的主题会从适用的组获取其配置。该配置只适用于仅由该连接器发送的事件记录。
11.1.3.1. 主题创建组
一组主题属性与主题创建组关联。最少,您必须定义 默认
主题创建组并指定其配置属性。此外,您还可以定义一个或多个自定义主题创建组,并为每个组指定唯一的属性。
当您创建自定义主题创建组时,您可以根据主题名称模式为每个组定义成员主题。您可以指定描述每个组中包含或排除的主题的命名模式。include
和 exclude
属性包含用来定义主题名称模式的正则表达式的逗号分隔列表。例如,如果您希望组包含以字符串 dbserver1.inventory
开头的所有主题,请将其 topic.creation.inventory.include
属性的值设置为 dbserver1\\.inventory\\
。
如果您同时为自定义主题组指定 include
和 exclude
属性,则排除规则具有优先权,并覆盖包含的规则。
11.1.3.2. 主题创建组配置属性
默认
主题创建组,每个自定义组都与一组唯一的配置属性关联。您可以将组配置为包含任何 Kafka 主题级配置属性。例如,您可以为旧主题片段、保留时间、主题组指定 主题压缩类型 指定清理策略。您必须至少定义一组最小属性,以描述要创建的主题的配置。
如果没有注册自定义组,或者任何注册的组的 include
模式与要创建的任何主题的名称不匹配,则 Kafka Connect 使用默认
组的配置来创建主题。
有关配置主题的常规信息,请参阅在 OpenShift 上安装 Debezium 中的 Kafka 主题创建建议。
11.1.3.3. 指定 Debezium 默认主题创建组的配置
在使用 Kafka Connect 自动主题创建前,您必须创建一个默认主题创建组并为它定义配置。默认主题创建组的配置应用于名称与自定义主题创建组的 include
列表模式匹配的任何主题。
先决条件
在 Kafka Connect 自定义资源中,
metadata.annotations
中的use-connector-resources
值指定集群 Operator 使用 KafkaConnector 自定义资源在集群中配置连接器。例如:... metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" ...
流程
要为
topic.creation.default
组定义属性,将它们添加到连接器自定义资源的spec.config
中,如下例所示:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: ... config: ... topic.creation.default.replication.factor: 3 1 topic.creation.default.partitions: 10 2 topic.creation.default.cleanup.policy: compact 3 topic.creation.default.compression.type: lz4 4 ...
您可以在默认组配置中包含任何 Kafka 主题级配置属性。
项 | 描述 |
---|---|
1 |
|
2 |
|
3 |
|
4 |
|
自定义组只回退到所需的 replication.factor
和 partitions
属性 的默认
组设置。如果自定义主题组的配置未定义其他属性,则不会应用 默认组中
指定的值。
11.1.3.4. 指定 Debezium 自定义主题创建组的配置
您可以定义多个自定义主题组,每个主题组都有自己的配置。
流程
要定义自定义主题组,在连接器自定义资源中添加一个
topic.creation.<group_name>.include
property tospec.config
,后跟您要应用到自定义组中的主题的配置属性。以下示例显示了定义自定义主题创建组
清单和
applicationlogs
的自定义资源摘录:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... 1 topic.creation.inventory.include: dbserver1\\.inventory\\.* 2 topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 3 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4 topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* 5 topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ... ...
项 | 描述 |
---|---|
1 |
定义 |
2 |
|
3 |
定义 |
4 |
|
5 |
|
11.1.3.5. 注册 Debezium 自定义主题创建组
在为任何自定义主题创建组指定配置后,请注册组。
流程
通过在连接器自定义资源中添加
topic.creation.groups
属性,并指定以逗号分隔的自定义主题创建组列表来注册自定义组。以下来自连接器自定义资源摘录会注册自定义主题创建组
清单
和应用程序日志
:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: topic.creation.groups: inventory,applicationlogs ...
完成的配置
以下示例显示了一个已完成的配置,其中包含 默认
主题组的配置,以及 清单的
配置和应用程序logs
自定义主题创建组:
示例:配置默认主题创建和两个自定义组
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... topic.creation.default.replication.factor: 3, topic.creation.default.partitions: 10, topic.creation.default.cleanup.policy: compact topic.creation.default.compression.type: lz4 topic.creation.groups: inventory,applicationlogs topic.creation.inventory.include: dbserver1\\.inventory\\.* topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ...