9.8. 配置 Kafka 连接连接器
KafkaConnector 资源提供了一种 Kubernetes 原生的方法来管理 Cluster Operator 连接器。要创建、删除或重新配置带有 KafkaConnector 资源的连接器,您必须在 KafkaConnect 自定义资源中将 use-connector-resources 注解设置为 true。
启用 KafkaConnectors 的注解
当在 KafkaConnect 配置中启用了 use-connector-resources 注解时,您必须使用 KafkaConnector 资源定义和管理连接器。
另外,您可以使用 Kafka Connect REST API 而不是 KafkaConnector 资源管理连接器。要使用 API,您必须删除 strimzi.io/use-connector-resources 注解,以便在 KafkaConnect 中使用 KafkaConnector 资源。
KafkaConnector 资源提供了在 Kafka Connect 集群中创建连接器所需的配置,它根据 KafkaConnect 配置中指定的 Kafka 集群交互。Kafka 集群不需要由 Apache Kafka 的 Streams 管理,或部署到 Kubernetes 集群。
同一 Kubernetes 集群中包含的 Kafka 组件
配置还指定连接器实例如何与外部数据系统交互,包括任何所需的身份验证方法。另外,您必须定义要监视的数据。例如,在从数据库读取数据的源连接器中,配置可能包含数据库名称。您还可以通过指定目标主题名称来定义此数据应该在 Kafka 中放置的位置。
使用 tasksMax 属性指定最大任务数。例如,带有 tasksMax: 2 的源连接器可能会将源数据导入两个任务。
源连接器配置示例
- 1
KafkaConnector资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。- 2
- 在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
- 3
- 连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
- 4
- 连接器可创建的最大 Kafka Connect 任务数量。
- 5
- 启用自动重启失败的连接器和任务。默认情况下,重启数量是无限的,但您可以使用
maxRestarts属性设置自动重启次数的最大值。 - 6
- 连接器配置 作为键值对。
- 7
- 外部数据文件的位置。在本例中,我们将
FileStreamSourceConnector配置为从/opt/kafka/LICENSE文件中读取。 - 8
- 将源数据发布到的 Kafka 主题。
要包括外部连接器配置,如存储在 secret 中的用户访问凭证,请使用 KafkaConnect 资源的 template 属性。您还可以使用 配置提供程序 来加载值。
9.8.1. 手动停止或暂停 Kafka Connect 连接器 复制链接链接已复制到粘贴板!
如果您使用 KafkaConnector 资源配置连接器,请使用 state 配置来停止或暂停连接器。与连接器和任务保持实例化的暂停状态不同,停止连接器只保留配置,且没有活跃进程。从运行停止连接器可能更适合长时间运行,而不是只暂停。虽然暂停的连接器速度更快恢复,但已停止的连接器具有释放内存和资源的优点。
state 配置替换 KafkaConnectorSpec 模式中的(已弃用) pause 配置,它允许在连接器上暂停。如果您之前使用 pause 配置来暂停连接器,我们建议您只使用 state 配置过渡到 以避免冲突。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要暂停或停止连接器的
KafkaConnector自定义资源的名称:oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow 编辑
KafkaConnector资源,以停止或暂停连接器。停止 Kafka Connect 连接器的配置示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 将
state配置更改为stopped或paused。当此属性没有设置时,连接器的默认状态为running。对
KafkaConnector配置应用更改。您可以通过将
state改为running,或删除配置来恢复连接器。
另外,您可以 公开 Kafka Connect API,并使用 stop 和 pause 端点停止连接器运行。例如,PUT /connectors/<connector_name>/stop。然后,您可以使用 resume 端点重启它。
9.8.2. 手动重启 Kafka 连接连接器 复制链接链接已复制到粘贴板!
如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart 注解来手动触发连接器的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器的
KafkaConnector自定义资源的名称:oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow 通过在 OpenShift 中注解
KafkaConnector资源来重启连接器。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"Copy to Clipboard Copied! Toggle word wrap Toggle overflow 重启注解设置为true。等待下一个协调发生(默认为两分钟)。
只要协调过程检测到注解,Kafka 连接器就会重启。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector自定义资源中删除。
9.8.3. 手动重启 Kafka Connect 连接器任务 复制链接链接已复制到粘贴板!
如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart-task 注解来手动触发连接器任务的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器任务的
KafkaConnector自定义资源名称:oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow 查找从
KafkaConnector自定义资源重启的任务 ID:oc describe KafkaConnector <kafka_connector_name>
oc describe KafkaConnector <kafka_connector_name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 任务 ID 是非负整数,从 0 开始。
通过注解 OpenShift 中的
KafkaConnector资源,使用 ID 重启连接器任务:oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在本例中,任务
0被重启。等待下一个协调发生(默认为两分钟)。
Kafka 连接器任务会重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector自定义资源中删除。