9.8. 配置 Kafka 连接连接器
KafkaConnector
资源提供了一种 Kubernetes 原生的方法来管理 Cluster Operator 连接器。要创建、删除或重新配置带有 KafkaConnector
资源的连接器,您必须在 KafkaConnect
自定义资源中将 use-connector-resources
注解设置为 true
。
启用 KafkaConnectors 的注解
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" # ...
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
# ...
当在 KafkaConnect
配置中启用了 use-connector-resources
注解时,您必须使用 KafkaConnector
资源定义和管理连接器。
另外,您可以使用 Kafka Connect REST API 而不是 KafkaConnector
资源管理连接器。要使用 API,您必须删除 strimzi.io/use-connector-resources
注解,以便在 KafkaConnect
中使用 KafkaConnector
资源。
KafkaConnector
资源提供了在 Kafka Connect 集群中创建连接器所需的配置,它根据 KafkaConnect
配置中指定的 Kafka 集群交互。Kafka 集群不需要由 Apache Kafka 的 Streams 管理,或部署到 Kubernetes 集群。
同一 Kubernetes 集群中包含的 Kafka 组件
配置还指定连接器实例如何与外部数据系统交互,包括任何所需的身份验证方法。另外,您必须定义要监视的数据。例如,在从数据库读取数据的源连接器中,配置可能包含数据库名称。您还可以通过指定目标主题名称来定义此数据应该在 Kafka 中放置的位置。
使用 tasksMax
属性指定最大任务数。例如,带有 tasksMax: 2
的源连接器可能会将源数据导入两个任务。
源连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 autoRestart: enabled: true config: file: "/opt/kafka/LICENSE" topic: my-topic # ...
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-source-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.kafka.connect.file.FileStreamSourceConnector
tasksMax: 2
autoRestart:
enabled: true
config:
file: "/opt/kafka/LICENSE"
topic: my-topic
# ...
- 1
KafkaConnector
资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。- 2
- 在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
- 3
- 连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
- 4
- 连接器可创建的最大 Kafka Connect 任务数量。
- 5
- 启用自动重启失败的连接器和任务。默认情况下,重启数量是无限的,但您可以使用
maxRestarts
属性设置自动重启次数的最大值。 - 6
- 连接器配置 作为键值对。
- 7
- 外部数据文件的位置。在本例中,我们将
FileStreamSourceConnector
配置为从/opt/kafka/LICENSE
文件中读取。 - 8
- 将源数据发布到的 Kafka 主题。
要包括外部连接器配置,如存储在 secret 中的用户访问凭证,请使用 KafkaConnect
资源的 template
属性。您还可以使用 配置提供程序 来加载值。
9.8.1. 手动停止或暂停 Kafka Connect 连接器
如果您使用 KafkaConnector
资源配置连接器,请使用 state
配置来停止或暂停连接器。与连接器和任务保持实例化的暂停状态不同,停止连接器只保留配置,且没有活跃进程。从运行停止连接器可能更适合长时间运行,而不是只暂停。虽然暂停的连接器速度更快恢复,但已停止的连接器具有释放内存和资源的优点。
state
配置替换 KafkaConnectorSpec
模式中的(已弃用) pause
配置,它允许在连接器上暂停。如果您之前使用 pause
配置来暂停连接器,我们建议您只使用 state
配置过渡到 以避免冲突。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要暂停或停止连接器的
KafkaConnector
自定义资源的名称:oc get KafkaConnector
oc get KafkaConnector
Copy to Clipboard Copied! 编辑
KafkaConnector
资源,以停止或暂停连接器。停止 Kafka Connect 连接器的配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: file: "/opt/kafka/LICENSE" topic: my-topic state: stopped # ...
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: file: "/opt/kafka/LICENSE" topic: my-topic state: stopped # ...
Copy to Clipboard Copied! 将
state
配置更改为stopped
或paused
。当此属性没有设置时,连接器的默认状态为running
。对
KafkaConnector
配置应用更改。您可以通过将
state
改为running
,或删除配置来恢复连接器。
另外,您可以 公开 Kafka Connect API,并使用 stop
和 pause
端点停止连接器运行。例如,PUT /connectors/<connector_name>/stop
。然后,您可以使用 resume
端点重启它。
9.8.2. 手动重启 Kafka 连接连接器
如果您使用 KafkaConnector
资源来管理连接器,请使用 strimzi.io/restart
注解来手动触发连接器的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器的
KafkaConnector
自定义资源的名称:oc get KafkaConnector
oc get KafkaConnector
Copy to Clipboard Copied! 通过在 OpenShift 中注解
KafkaConnector
资源来重启连接器。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
Copy to Clipboard Copied! 重启
注解设置为true
。等待下一个协调发生(默认为两分钟)。
只要协调过程检测到注解,Kafka 连接器就会重启。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector
自定义资源中删除。
9.8.3. 手动重启 Kafka Connect 连接器任务
如果您使用 KafkaConnector
资源来管理连接器,请使用 strimzi.io/restart-task
注解来手动触发连接器任务的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器任务的
KafkaConnector
自定义资源名称:oc get KafkaConnector
oc get KafkaConnector
Copy to Clipboard Copied! 查找从
KafkaConnector
自定义资源重启的任务 ID:oc describe KafkaConnector <kafka_connector_name>
oc describe KafkaConnector <kafka_connector_name>
Copy to Clipboard Copied! 任务 ID 是非负整数,从 0 开始。
通过注解 OpenShift 中的
KafkaConnector
资源,使用 ID 重启连接器任务:oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
Copy to Clipboard Copied! 在本例中,任务
0
被重启。等待下一个协调发生(默认为两分钟)。
Kafka 连接器任务会重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector
自定义资源中删除。