9.7. 配置 Kafka Connect
更新 KafkaConnect
自定义资源的 spec
属性来配置 Kafka Connect 部署。
使用 Kafka Connect 为 Kafka 集群设置外部数据连接。使用 KafkaConnect
资源的属性来配置 Kafka Connect 部署。
您还可以使用 KafkaConnect
资源来指定以下内容:
- 构建包含插件的容器镜像的连接器插件配置,以建立连接
- 配置运行连接器的 Kafka Connect worker pod
-
使用
KafkaConnector
资源管理连接器的注解
Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect
集群,以及利用 KafkaConnector
资源创建的连接器。
要深入了解 Kafka Connect 集群配置选项,请参阅 自定义资源 API 参考。
处理大量信息
您可以调整配置以处理大量信息。如需更多信息,请参阅 处理大量信息。
KafkaConnect
自定义资源配置示例
- 1
- 使用
KafkaConnect
。 - 2
- 启用
KafkaConnector
资源来启动、停止和管理连接器实例。 - 3
- 运行任务的 worker 的副本节点数量。
- 4
- 连接到 Kafka 集群的 bootstrap 地址。该地址采用 <
cluster_name>-kafka-bootstrap:<port_number>
格式。Kafka 集群不需要由 Streams for Apache Kafka 管理,或部署到 Kubernetes 集群。 - 5
- 运行连接器及其任务的 worker 连接(而不是连接器)。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。在本例中,指定了 JSON 转换器。Kafka Connect 使用的内部主题设置了 3 的复制因子(生产环境的最小要求)。在创建主题后更改复制因素无效。
- 6
- 为保留支持的资源(当前
cpu
和memory
)的请求,以及指定可消耗的最大资源的限制。 - 7
- Kafka Connect 集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
- 8
- 用于加密到 Kafka 集群的 TLS 配置,可信证书存储在指定 secret 中的 X.509 格式。
- 9
- 构建用于自动使用连接器插件构建容器镜像的配置属性。
- 10
- (必需)推送新镜像的容器 registry 的配置。
- 11
- (必需)连接器插件及其工件列表,以添加到新容器镜像中。每个插件必须配置至少一个
工件
。 - 12
- 指定 Kafka Connect 日志记录器和日志级别直接(
内联
)或通过 ConfigMap 间接(外部
)。自定义 Log4j 配置必须放在 ConfigMap 中的log4j.properties
或log4j2.properties
键下。对于 Kafka Connectlog4j.rootLogger
日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG,FATAL 或 OFF。 - 13
- 检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
- 14
- Prometheus 指标,通过引用包含在此示例中 Prometheus JMX 导出器配置的 ConfigMap 启用。您可以使用对
metricsConfig.valueFrom.configMapKeyRef.key
下包含空文件的 ConfigMap 的引用来启用指标。 - 15
- JVM 配置选项,用于优化运行 Kafka Connect 的虚拟机(VM)的性能。
- 16
- ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
- 17
- SPECIALIZED OPTION:部署的机架感知配置。这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,使用来自最接近的副本的消耗可以提高网络利用率或降低成本。
topologyKey
必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone
标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用RackAwareReplicaSelector
。 - 18
- 模板自定义。此处的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
- 19
- 为分布式追踪设置环境变量,并将凭证传递给连接器。
- 20
- 使用 OpenTelemetry 启用分布式追踪。
9.7.1. 为多个实例配置 Kafka 连接 复制链接链接已复制到粘贴板!
默认情况下,Apache Kafka 的 Streams 配置 Kafka Connect 使用的内部主题的组 ID 和名称。在运行多个 Kafka Connect 实例时,您必须使用以下配置属性更改 这些默认设置
:
对于具有相同 group.id
的所有实例,三个主题的值必须相同。
除非修改了这些默认设置,否则每个连接到同一 Kafka 集群的实例都会使用相同的值部署。在实践中,这意味着所有实例组成一个集群并使用相同的内部主题。
尝试使用同一内部主题的多个实例将导致意外错误,因此您必须更改每个实例的这些属性值。
9.7.2. 配置 Kafka Connect 用户授权 复制链接链接已复制到粘贴板!
在 Kafka 中使用授权时,Kafka Connect 用户需要对集群组和 Kafka Connect 的内部主题进行读/写访问。此流程概述了如何使用 简单
授权和 ACL 授予访问权限。
Kafka Connect 集群组 ID 和内部主题的属性由 Apache Kafka 的 Streams 配置。另外,您可以在 KafkaConnect
资源的 spec
中显式定义它们。这在 为多个实例配置 Kafka Connect 时很有用,因为运行多个 Kafka Connect 实例时组 ID 和主题的值必须有所不同。
简单授权使用由 Kafka AclAuthorizer
和 StandardAuthorizer
插件管理的 ACL 规则来确保适当的访问级别。有关将 KafkaUser
资源配置为使用简单授权的更多信息,请参阅 AclRule
模式参考。
先决条件
- 一个 OpenShift 集群
- 正在运行的 Cluster Operator
流程
编辑
KafkaUser
资源中的authorization
属性,为用户提供访问权限。使用
字面
名称值为 Kafka Connect 主题和集群组配置访问权限。下表显示了为主题和集群组 ID 配置的默认名称。Expand 表 9.2. 访问权限配置的名称 属性 名称 offset.storage.topic
connect-cluster-offsets
status.storage.topic
connect-cluster-status
config.storage.topic
connect-cluster-configs
group
connect-cluster
在本例中,默认名称用于指定访问权限。如果您要为 Kafka Connect 实例使用不同的名称,请在 ACL 配置中使用这些名称。
简单授权配置示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 创建或更新资源。
oc apply -f KAFKA-USER-CONFIG-FILE
oc apply -f KAFKA-USER-CONFIG-FILE
Copy to Clipboard Copied! Toggle word wrap Toggle overflow