2.3. Kafka Connect 集群配置
使用 KafkaConnect 资源配置 Kafka Connect 部署。Kafka Connect 是一个在 Kafka 代理和其他使用连接器插件的系统间流传输数据的集成工具包。Kafka Connect 提供了一个框架,用于将 Kafka 与外部数据源或目标(如数据库)集成,如数据库,用于使用连接器导入或导出数据。连接器是提供所需的连接配置的插件。
第 6.2.61 节 “KafkaConnect 模式参考” 描述 KafkaConnect 资源的完整模式。
有关部署连接器插件的更多信息,请参阅使用连接器插件扩展 Kafka 连接。
2.3.1. 配置 Kafka 连接 复制链接链接已复制到粘贴板!
使用 Kafka Connect 为 Kafka 集群设置外部数据连接。使用 KafkaConnect 资源的属性来配置 Kafka Connect 部署。
KafkaConnector 配置
KafkaConnector 资源允许您以 OpenShift 原生的方式创建和管理 Kafka Connect 的连接器实例。
在 Kafka Connect 配置中,您可以通过添加 strimzi.io/use-connector-resources 注解来为 Kafka Connect 集群启用 KafkaConnectors。您还可以添加 构建配置,以便 AMQ Streams 使用数据连接所需的连接器插件自动构建容器镜像。Kafka Connect 连接器的外部配置通过 externalConfiguration 属性指定。
要管理连接器,您可以使用 KafkaConnector 自定义资源或 Kafka Connect REST API。KafkaConnector 资源必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。有关使用这些方法创建、重新配置或删除连接器的更多信息,请参阅 添加连接器。
连接器配置作为 HTTP 请求的一部分传递并存储在 Kafka 本身中。ConfigMap 和机密是用于存储配置和机密数据的标准 OpenShift 资源。您可以使用 ConfigMap 和 Secret 来配置连接器的特定元素。然后,您可以在 HTTP REST 命令中引用配置值,并在需要时使配置保持独立且更安全。这个方法特别适用于机密数据,如用户名、密码或证书。
处理大量信息
您可以调整配置来处理大量信息。如需更多信息,请参阅处理大量信息。
先决条件
- 一个 OpenShift 集群
- 正在运行的 Cluster Operator
有关运行的步骤,请参阅在 OpenShift 中部署和升级 AMQ Streams 指南:
流程
编辑
KafkaConnect资源的spec属性。您可以配置的属性显示在以下示例配置中:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 使用
KafkaConnect。 - 2
- 为 Kafka Connect 集群启用 KafkaConnectors。
- 3
- 用于运行服务的 worker 的副本节点数量。
- 4
- Kafka Connect 集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
- 5
- 用于连接到 Kafka Connect 集群的 Bootstrap 服务器。
- 6
- 使用密钥名称进行 TLS 加密,其中 TLS 证书以 X.509 格式存储。如果证书存储在同一 secret 中,它可以多次列出。
- 7
- worker 的 Kafka 连接配置 (而不是连接器)。可以提供标准 Apache Kafka 配置,仅限于不受 AMQ Streams 直接管理的属性。
- 8
- 9
- (必需)在推送新镜像的容器 registry 的配置。
- 10
- (必需)连接器插件及其工件列表,以添加到新容器镜像中。每个插件必须配置至少一个
工件。 - 11
- 12
- 13
- 指定 Kafka 连接日志记录器和日志级别 直接(
内联)或通过 ConfigMap 间接(外部)添加。自定义 ConfigMap 必须放在log4j.properties或log4j2.properties键下。对于 Kafka Connectlog4j.rootLogger日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 或 OFF。 - 14
- 使用 HealthCheck 可以知道何时重启一个容器(存活度)以及何时一个容器可以开始接受流量(就绪度)。
- 15
- Prometheus metrics,通过引用本例中的 Prometheus JMX exporter 配置的 ConfigMap 来启用。您可以使用对在
metricsConfig.valueFrom.configMapKeyRef.key下包含空文件的 ConfigMap 的引用来启用指标。 - 16
- JVM 配置选项 优化运行 Kafka Connect 的虚拟机(VM)的性能。
- 17
- ADVANCED OPTION: 容器镜像配置,这只在特殊情况下建议使用。
- 18
- SPECIALIZED OPTION: Rack awareness 配置用于部署。这是用于在同一位置(而不是跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用这个选项。在某些情况下,从最接近的副本消耗可以提高网络利用率或降低成本。
topologyKey必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用RackAwareReplicaSelector。 - 19
- 模板自定义.这里的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
- 20
- 为分布式追踪设置环境变量。
创建或更新资源:
oc apply -f KAFKA-CONNECT-CONFIG-FILE
oc apply -f KAFKA-CONNECT-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow - 如果为 Kafka Connect 启用授权,请将 Kafka Connect 用户配置为启用对 Kafka Connect 消费者组和主题的访问。
2.3.2. 为多个实例配置 Kafka 连接 复制链接链接已复制到粘贴板!
如果您正在运行多个 Kafka Connect 实例,您必须更改以下配置属性 的默认配置 :
对于具有相同 group.id 的所有 Kafka Connect 实例,这三个主题的值必须相同。
除非更改默认设置,否则每个 Kafka Connect 实例都使用相同的值部署到同一 Kafka 集群。实际上,所有实例都与集群中运行的实例合并,使用相同的主题。
如果多个 Kafka Connect 集群试图使用相同的主题,Kafka Connect 将无法正常工作,并生成错误。
如果要运行多个 Kafka Connect 实例,请更改每个实例的这些属性的值。
2.3.3. 配置 Kafka 连接用户授权 复制链接链接已复制到粘贴板!
这个步骤描述了如何授权用户对 Kafka Connect 的访问权限。
当在 Kafka 中使用任何类型的授权时,Kafka Connect 用户需要读/写权限到消费者组和 Kafka Connect 的内部主题。
消费者组和内部主题的属性由 AMQ Streams 自动配置,也可以在 KafkaConnect 资源的 spec 中明确指定它们。
KafkaConnect 资源中的配置属性示例
此流程演示了如何在使用 简单 授权时提供访问权限。
简单授权使用 Kafka AclAuthorizer 插件处理的 ACL 规则来提供正确访问级别。有关将 KafkaUser 资源配置为使用简单授权的更多信息,请参阅 AclRule schema reference。
先决条件
- 一个 OpenShift 集群
- 正在运行的 Cluster Operator
流程
编辑
KafkaUser资源中的授权属性,为用户提供访问权限。在以下示例中,使用
字面名称值为 Kafka Connect 主题和消费者组配置访问权限:Expand 属性 名称 offset.storage.topicconnect-cluster-offsetsstatus.storage.topicconnect-cluster-statusconfig.storage.topicconnect-cluster-configsgroupconnect-clusterCopy to Clipboard Copied! Toggle word wrap Toggle overflow 创建或更新资源。
oc apply -f KAFKA-USER-CONFIG-FILE
oc apply -f KAFKA-USER-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow
2.3.4. Kafka Connect 集群资源列表 复制链接链接已复制到粘贴板!
以下资源由 OpenShift 集群中的 Cluster Operator 创建:
- connect-cluster-name-connect
提供给以下 Kafka Connect 资源的名称:
-
创建 Kafka Connect worker 节点 pod 的部署(当禁用
StableConnectIdentities功能门时)。 -
创建 Kafka Connect worker 节点 pod 的 StrimziPodSet (当启用了
StableConnectIdentities功能门时)。 -
为连接 pod 提供稳定的 DNS 名称的无头服务(当启用了
StableConnectIdentities功能门时)。 - 为 Kafka Connect worker 节点配置的 Pod Disruption Budget。
-
创建 Kafka Connect worker 节点 pod 的部署(当禁用
- connect-cluster-name-connect-idx
-
Kafka Connect StrimziPodSet 创建的 Pod (当启用了
StableConnectIdentities功能门时)。 - connect-cluster-name-connect-api
- 公开用于管理 Kafka Connect 集群的 REST 接口的服务。
- connect-cluster-name-config
- 包含 Kafka Connect 辅助配置的 ConfigMap,并由 Kafka 代理 pod 作为卷挂载。
2.3.5. 与红帽构建的 Debezium 集成以更改数据捕获 复制链接链接已复制到粘贴板!
红帽构建的 Debezium 是一个分布式更改数据捕获平台。它捕获数据库中的行级更改,创建更改事件记录,并将记录流传输到 Kafka 主题。Debezium 基于 Apache Kafka 构建。您可以将红帽构建的 Debezium 与 AMQ Streams 一起部署并集成。部署 AMQ Streams 后,您可以通过 Kafka Connect 将 Debezium 部署为连接器配置。Debezium 将更改事件记录传递到 OpenShift 上的 AMQ Streams。应用程序可以读取 这些更改事件流,并按发生更改事件的顺序访问更改事件。
Debezium 具有多个用途,包括:
- 数据复制
- 更新缓存和搜索索引
- 简化单体式应用程序
- 数据集成
- 启用流查询
要捕获数据库更改,请使用 Debezium 数据库连接器部署 Kafka 连接。您可以配置 KafkaConnector 资源来定义连接器实例。
有关将红帽构建的 Debezium 与 AMQ Streams 一起部署的更多信息,请参阅产品文档。文档包括 Debezium 入门指南,指导您完成设置数据库更新事件记录所需的服务和连接器。