9.6. 配置 Kafka Connect
更新 KafkaConnect 自定义资源的 spec 属性来配置 Kafka Connect 部署。
使用 Kafka Connect 为 Kafka 集群设置外部数据连接。使用 KafkaConnect 资源的属性来配置 Kafka Connect 部署。
要深入了解 Kafka Connect 集群配置选项,请参阅 Apache Kafka 自定义资源 API 参考。
KafkaConnector 配置
KafkaConnector 资源允许您以 OpenShift 原生的方式创建和管理 Kafka Connect 的连接器实例。
在 Kafka Connect 配置中,您可以通过添加 strimzi.io/use-connector-resources 注解来为 Kafka Connect 集群启用 KafkaConnectors。您还可以添加 构建配置,以便 Apache Kafka 的 Streams 会自动使用您数据连接所需的连接器插件构建容器镜像。Kafka Connect 连接器的外部配置通过 externalConfiguration 属性指定。
要管理连接器,您可以使用 KafkaConnector 自定义资源或 Kafka Connect REST API。KafkaConnector 资源必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。有关使用这些方法创建、重新配置或删除连接器的更多信息,请参阅 添加连接器。
连接器配置作为 HTTP 请求的一部分传递给 Kafka Connect,并存储在 Kafka 本身中。ConfigMap 和机密是用于存储配置和机密数据的标准 OpenShift 资源。您可以使用 ConfigMap 和 Secret 来配置连接器的特定元素。然后,您可以在 HTTP REST 命令中引用配置值,这样可保持配置独立且更安全。此方法特别适用于机密数据,如用户名、密码或证书。
处理大量信息
您可以调整配置以处理大量信息。如需更多信息,请参阅 处理大量信息。
KafkaConnect 自定义资源配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
replicas: 3
authentication:
type: tls
certificateAndKey:
certificate: source.crt
key: source.key
secretName: my-user-source
bootstrapServers: my-cluster-kafka-bootstrap:9092
tls:
trustedCertificates:
- secretName: my-cluster-cluster-cert
certificate: ca.crt
- secretName: my-cluster-cluster-cert
certificate: ca2.crt
config:
group.id: my-connect-cluster
offset.storage.topic: my-connect-cluster-offsets
config.storage.topic: my-connect-cluster-configs
status.storage.topic: my-connect-cluster-status
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
build:
output:
type: docker
image: my-registry.io/my-org/my-connect-cluster:latest
pushSecret: my-registry-credentials
plugins:
- name: connector-1
artifacts:
- type: tgz
url: <url_to_download_connector_1_artifact>
sha512sum: <SHA-512_checksum_of_connector_1_artifact>
- name: connector-2
artifacts:
- type: jar
url: <url_to_download_connector_2_artifact>
sha512sum: <SHA-512_checksum_of_connector_2_artifact>
externalConfiguration:
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-creds
key: awsAccessKey
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-creds
key: awsSecretAccessKey
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 2Gi
logging:
type: inline
loggers:
log4j.rootLogger: INFO
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: my-config-map
key: my-key
jvmOptions:
"-Xmx": "1g"
"-Xms": "1g"
image: my-org/my-image:latest
rack:
topologyKey: topology.kubernetes.io/zone
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: application
operator: In
values:
- postgresql
- mongodb
topologyKey: "kubernetes.io/hostname"
connectContainer:
env:
- name: OTEL_SERVICE_NAME
value: my-otel-service
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://otlp-host:4317"
tracing:
type: opentelemetry
- 1
- 使用
KafkaConnect。 - 2
- 为 Kafka Connect 集群启用 KafkaConnectors。
- 3
- 运行任务的 worker 的副本节点数量。
- 4
- Kafka Connect 集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
- 5
- 用于连接到 Kafka 集群的 bootstrap 服务器。
- 6
- TLS 加密,使用密钥名称,其中 TLS 证书存储为集群的 X.509 格式。如果证书存储在同一 secret 中,则可以多次列出。
- 7
- worker 的 Kafka 连接配置(而不是连接器)。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。
- 8
- 构建用于自动使用连接器插件构建容器镜像的配置属性。
- 9
- (必需)推送新镜像的容器 registry 的配置。
- 10
- (必需)连接器插件及其工件列表,以添加到新容器镜像中。每个插件必须配置至少一个
工件。 - 11
- 使用环境变量的连接器的外部配置,如此处或卷所示。您还可以使用配置供应商插件从外部来源加载配置值。
- 12
- 为保留支持的资源(当前
cpu和memory)的请求,以及指定可消耗的最大资源的限制。 - 13
- 指定 Kafka Connect 日志记录器和日志级别直接(
内联)或通过 ConfigMap 间接(外部)。自定义 Log4j 配置必须放在 ConfigMap 中的log4j.properties或log4j2.properties键下。对于 Kafka Connectlog4j.rootLogger日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG,FATAL 或 OFF。 - 14
- 检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
- 15
- Prometheus 指标,通过引用包含在此示例中 Prometheus JMX 导出器配置的 ConfigMap 启用。您可以使用对
metricsConfig.valueFrom.configMapKeyRef.key下包含空文件的 ConfigMap 的引用来启用指标。 - 16
- JVM 配置选项,用于优化运行 Kafka Connect 的虚拟机(VM)的性能。
- 17
- ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
- 18
- SPECIALIZED OPTION:部署的机架感知配置。这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,使用来自最接近的副本的消耗可以提高网络利用率或降低成本。
topologyKey必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用RackAwareReplicaSelector。 - 19
- 模板自定义。此处的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
- 20
- 为分布式追踪设置环境变量。
- 21
- 使用 OpenTelemetry 启用分布式追踪。
9.6.1. 为多个实例配置 Kafka 连接 复制链接链接已复制到粘贴板!
默认情况下,Apache Kafka 的 Streams 配置 Kafka Connect 使用的内部主题的组 ID 和名称。在运行多个 Kafka Connect 实例时,您必须使用以下配置属性更改 这些默认设置 :
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect
spec:
config:
group.id: my-connect-cluster
offset.storage.topic: my-connect-cluster-offsets
config.storage.topic: my-connect-cluster-configs
status.storage.topic: my-connect-cluster-status
# ...
# ...
对于具有相同 group.id 的所有实例,三个主题的值必须相同。
除非修改了这些默认设置,否则每个连接到同一 Kafka 集群的实例都会使用相同的值部署。在实践中,这意味着所有实例组成一个集群并使用相同的内部主题。
尝试使用同一内部主题的多个实例将导致意外错误,因此您必须更改每个实例的这些属性值。
9.6.2. 配置 Kafka Connect 用户授权 复制链接链接已复制到粘贴板!
在 Kafka 中使用授权时,Kafka Connect 用户需要对集群组和 Kafka Connect 的内部主题进行读/写访问。此流程概述了如何使用 简单 授权和 ACL 授予访问权限。
Kafka Connect 集群组 ID 和内部主题的属性由 Apache Kafka 的 Streams 配置。另外,您可以在 KafkaConnect 资源的 spec 中显式定义它们。这在 为多个实例配置 Kafka Connect 时很有用,因为运行多个 Kafka Connect 实例时组 ID 和主题的值必须有所不同。
简单授权使用由 Kafka AclAuthorizer 和 StandardAuthorizer 插件管理的 ACL 规则来确保适当的访问级别。有关将 KafkaUser 资源配置为使用简单授权的更多信息,请参阅 AclRule 模式参考。
先决条件
- 一个 OpenShift 集群
- 正在运行的 Cluster Operator
流程
编辑
KafkaUser资源中的authorization属性,为用户提供访问权限。使用
字面名称值为 Kafka Connect 主题和集群组配置访问权限。下表显示了为主题和集群组 ID 配置的默认名称。Expand 表 9.2. 访问权限配置的名称 属性 名称 offset.storage.topicconnect-cluster-offsetsstatus.storage.topicconnect-cluster-statusconfig.storage.topicconnect-cluster-configsgroupconnect-cluster在本例中,默认名称用于指定访问权限。如果您要为 Kafka Connect 实例使用不同的名称,请在 ACL 配置中使用这些名称。
简单授权配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: # ... authorization: type: simple acls: # access to offset.storage.topic - resource: type: topic name: connect-cluster-offsets patternType: literal operations: - Create - Describe - Read - Write host: "*" # access to status.storage.topic - resource: type: topic name: connect-cluster-status patternType: literal operations: - Create - Describe - Read - Write host: "*" # access to config.storage.topic - resource: type: topic name: connect-cluster-configs patternType: literal operations: - Create - Describe - Read - Write host: "*" # cluster group - resource: type: group name: connect-cluster patternType: literal operations: - Read host: "*"创建或更新资源。
oc apply -f KAFKA-USER-CONFIG-FILE
9.6.3. 手动停止或暂停 Kafka Connect 连接器 复制链接链接已复制到粘贴板!
如果您使用 KafkaConnector 资源配置连接器,请使用 state 配置来停止或暂停连接器。与连接器和任务保持实例化的暂停状态不同,停止连接器只保留配置,且没有活跃进程。从运行停止连接器可能更适合长时间运行,而不是只暂停。虽然暂停的连接器速度更快恢复,但已停止的连接器具有释放内存和资源的优点。
state 配置替换 KafkaConnectorSpec 模式中的(已弃用) pause 配置,它允许在连接器上暂停。如果您之前使用 pause 配置来暂停连接器,我们建议您只使用 state 配置过渡到 以避免冲突。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要暂停或停止连接器的
KafkaConnector自定义资源的名称:oc get KafkaConnector编辑
KafkaConnector资源,以停止或暂停连接器。停止 Kafka Connect 连接器的配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: file: "/opt/kafka/LICENSE" topic: my-topic state: stopped # ...将
state配置更改为stopped或paused。当此属性没有设置时,连接器的默认状态为running。对
KafkaConnector配置应用更改。您可以通过将
state改为running,或删除配置来恢复连接器。
另外,您可以 公开 Kafka Connect API,并使用 stop 和 pause 端点停止连接器运行。例如,PUT /connectors/<connector_name>/stop。然后,您可以使用 resume 端点重启它。
9.6.4. 手动重启 Kafka 连接连接器 复制链接链接已复制到粘贴板!
如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart 注解来手动触发连接器的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器的
KafkaConnector自定义资源的名称:oc get KafkaConnector通过在 OpenShift 中注解
KafkaConnector资源来重启连接器。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"重启注解设置为true。等待下一个协调发生(默认为两分钟)。
只要协调过程检测到注解,Kafka 连接器就会重启。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector自定义资源中删除。
9.6.5. 手动重启 Kafka Connect 连接器任务 复制链接链接已复制到粘贴板!
如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart-task 注解来手动触发连接器任务的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器任务的
KafkaConnector自定义资源名称:oc get KafkaConnector查找从
KafkaConnector自定义资源重启的任务 ID:oc describe KafkaConnector <kafka_connector_name>任务 ID 是非负整数,从 0 开始。
通过注解 OpenShift 中的
KafkaConnector资源,使用 ID 重启连接器任务:oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"在本例中,任务
0被重启。等待下一个协调发生(默认为两分钟)。
Kafka 连接器任务会重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector自定义资源中删除。