6.4. 部署 Kafka Connect
Kafka Connect 是一个使用连接器插件在 Kafka 代理和其他系统间流传输数据的集成工具包。Kafka Connect 提供了一个框架,用于将 Kafka 与外部数据源或目标(如数据库或消息传递系统)集成,用于使用连接器导入或导出数据。连接器是提供所需的连接配置的插件。
在 AMQ Streams 中,Kafka Connect 以分布式模式部署。Kafka Connect 也可以以独立模式工作,但 AMQ Streams 不支持它。
使用 连接器 的概念,Kafka Connect 提供了将大量数据移至和移出 Kafka 集群的框架,同时保持可扩展性和可靠性。
Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect 集群,以及利用 KafkaConnector 资源创建的连接器。
要使用 Kafka Connect,您需要执行以下操作。
术语连接器(connector)可以代表在 Kafka Connect 中运行的一个连接器实例,也可以代表一个连接器类。在本指南中,当从上下文中清除含义时,会使用 连接器。
6.4.1. 部署 Kafka 连接到 OpenShift 集群 复制链接链接已复制到粘贴板!
此流程演示了如何使用 Cluster Operator 将 Kafka Connect 集群部署到 OpenShift 集群。
Kafka Connect 集群部署使用可配置的节点(也称为 worker)实现,它将连接器的工作负载分发为 任务,以便消息流具有高度可扩展且可靠的。
部署使用 YAML 文件来提供规格来创建 KafkaConnect 资源。
AMQ Streams 提供 示例配置文件。在此过程中,我们使用以下示例文件:
-
examples/connect/kafka-connect.yaml
如果部署 Kafka Connect 集群以并行运行,则每个实例都必须为内部 Kafka Connect 主题使用唯一名称。要做到这一点,配置每个 Kafka Connect 实例来替换默认值。
流程
部署 Kafka 连接到您的 OpenShift 集群。使用
examples/connect/kafka-connect.yaml文件来部署 Kafka Connect。oc apply -f examples/connect/kafka-connect.yaml
oc apply -f examples/connect/kafka-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 检查部署的状态:
oc get pods -n <my_cluster_operator_namespace>
oc get pods -n <my_cluster_operator_namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 输出显示部署名称和就绪
NAME READY STATUS RESTARTS my-connect-cluster-connect-<pod_id> 1/1 Running 0
NAME READY STATUS RESTARTS my-connect-cluster-connect-<pod_id> 1/1 Running 0Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-connect-cluster是 Kafka Connect 集群的名称。标识创建的每个 pod 的 pod ID。
在默认部署中,您可以创建一个单个的 Kafka Connect pod。
READY显示就绪/预期的副本数。当STATUS显示为Running时,部署成功。
6.4.2. Kafka Connect 集群资源列表 复制链接链接已复制到粘贴板!
以下资源由 OpenShift 集群中的 Cluster Operator 创建:
- <connect_cluster_name>-connect
提供给以下 Kafka Connect 资源的名称:
-
创建 Kafka Connect worker 节点 pod 的部署(当
StableConnectIdentities功能门被禁用时)。 -
StrimziPodSet 创建 Kafka Connect worker 节点 pod (当启用了
StableConnectIdentities功能门时)。 -
为 Connect pod 提供稳定 DNS 名称的无头服务(启用了
StableConnectIdentities功能门时)。 - 为 Kafka Connect worker 节点配置的 Pod Disruption Budget。
-
创建 Kafka Connect worker 节点 pod 的部署(当
- <connect_cluster_name>-connect-<pod_id>
-
由 Kafka Connect StrimziPodSet 创建的 Pod (当启用了
StableConnectIdentities功能门时)。 - <connect_cluster_name>-connect-api
- 公开用于管理 Kafka Connect 集群的 REST 接口的服务。
- <connect_cluster_name>-config
- 包含 Kafka Connect 辅助配置的 ConfigMap,并由 Kafka 代理 pod 挂载为卷。