9.6. 配置 Kafka Connect
更新 KafkaConnect
自定义资源的 spec
属性来配置 Kafka Connect 部署。
使用 Kafka Connect 为 Kafka 集群设置外部数据连接。使用 KafkaConnect
资源的属性来配置 Kafka Connect 部署。
要深入了解 Kafka Connect 集群配置选项,请参阅 Apache Kafka 自定义资源 API 参考。
KafkaConnector 配置
KafkaConnector
资源允许您以 OpenShift 原生的方式创建和管理 Kafka Connect 的连接器实例。
在 Kafka Connect 配置中,您可以通过添加 strimzi.io/use-connector-resources
注解来为 Kafka Connect 集群启用 KafkaConnectors。您还可以添加 构建配置
,以便 Apache Kafka 的 Streams 会自动使用您数据连接所需的连接器插件构建容器镜像。Kafka Connect 连接器的外部配置通过 externalConfiguration
属性指定。
要管理连接器,您可以使用 KafkaConnector
自定义资源或 Kafka Connect REST API。KafkaConnector
资源必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。有关使用这些方法创建、重新配置或删除连接器的更多信息,请参阅 添加连接器。
连接器配置作为 HTTP 请求的一部分传递给 Kafka Connect,并存储在 Kafka 本身中。ConfigMap 和机密是用于存储配置和机密数据的标准 OpenShift 资源。您可以使用 ConfigMap 和 Secret 来配置连接器的特定元素。然后,您可以在 HTTP REST 命令中引用配置值,这样可保持配置独立且更安全。此方法特别适用于机密数据,如用户名、密码或证书。
处理大量信息
您可以调整配置以处理大量信息。如需更多信息,请参阅 处理大量信息。
KafkaConnect
自定义资源配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect 1 metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 2 spec: replicas: 3 3 authentication: 4 type: tls certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source bootstrapServers: my-cluster-kafka-bootstrap:9092 5 tls: 6 trustedCertificates: - secretName: my-cluster-cluster-cert certificate: ca.crt - secretName: my-cluster-cluster-cert certificate: ca2.crt config: 7 group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: true value.converter.schemas.enable: true config.storage.replication.factor: 3 offset.storage.replication.factor: 3 status.storage.replication.factor: 3 build: 8 output: 9 type: docker image: my-registry.io/my-org/my-connect-cluster:latest pushSecret: my-registry-credentials plugins: 10 - name: connector-1 artifacts: - type: tgz url: <url_to_download_connector_1_artifact> sha512sum: <SHA-512_checksum_of_connector_1_artifact> - name: connector-2 artifacts: - type: jar url: <url_to_download_connector_2_artifact> sha512sum: <SHA-512_checksum_of_connector_2_artifact> externalConfiguration: 11 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey resources: 12 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 13 type: inline loggers: log4j.rootLogger: INFO readinessProbe: 14 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 metricsConfig: 15 type: jmxPrometheusExporter valueFrom: configMapKeyRef: name: my-config-map key: my-key jvmOptions: 16 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 17 rack: topologyKey: topology.kubernetes.io/zone 18 template: 19 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 20 env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_OTLP_ENDPOINT value: "http://otlp-host:4317" tracing: type: opentelemetry 21
- 1
- 使用
KafkaConnect
。 - 2
- 为 Kafka Connect 集群启用 KafkaConnectors。
- 3
- 运行任务的 worker 的副本节点数量。
- 4
- Kafka Connect 集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
- 5
- 用于连接到 Kafka 集群的 bootstrap 服务器。
- 6
- TLS 加密,使用密钥名称,其中 TLS 证书存储为集群的 X.509 格式。如果证书存储在同一 secret 中,则可以多次列出。
- 7
- worker 的 Kafka 连接配置(而不是连接器)。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。
- 8
- 构建用于自动使用连接器插件构建容器镜像的配置属性。
- 9
- (必需)推送新镜像的容器 registry 的配置。
- 10
- (必需)连接器插件及其工件列表,以添加到新容器镜像中。每个插件必须配置至少一个
工件
。 - 11
- 使用环境变量的连接器的外部配置,如此处或卷所示。您还可以使用配置供应商插件从外部来源加载配置值。
- 12
- 为保留支持的资源(当前
cpu
和memory
)的请求,以及指定可消耗的最大资源的限制。 - 13
- 指定 Kafka Connect 日志记录器和日志级别直接(
内联
)或通过 ConfigMap 间接(外部
)。自定义 Log4j 配置必须放在 ConfigMap 中的log4j.properties
或log4j2.properties
键下。对于 Kafka Connectlog4j.rootLogger
日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG,FATAL 或 OFF。 - 14
- 检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
- 15
- Prometheus 指标,通过引用包含在此示例中 Prometheus JMX 导出器配置的 ConfigMap 启用。您可以使用对
metricsConfig.valueFrom.configMapKeyRef.key
下包含空文件的 ConfigMap 的引用来启用指标。 - 16
- JVM 配置选项,用于优化运行 Kafka Connect 的虚拟机(VM)的性能。
- 17
- ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
- 18
- SPECIALIZED OPTION:部署的机架感知配置。这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,使用来自最接近的副本的消耗可以提高网络利用率或降低成本。
topologyKey
必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone
标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用RackAwareReplicaSelector
。 - 19
- 模板自定义。此处的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
- 20
- 为分布式追踪设置环境变量。
- 21
- 使用 OpenTelemetry 启用分布式追踪。
9.6.1. 为多个实例配置 Kafka 连接
默认情况下,Apache Kafka 的 Streams 配置 Kafka Connect 使用的内部主题的组 ID 和名称。在运行多个 Kafka Connect 实例时,您必须使用以下配置属性更改 这些默认设置
:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: config: group.id: my-connect-cluster 1 offset.storage.topic: my-connect-cluster-offsets 2 config.storage.topic: my-connect-cluster-configs 3 status.storage.topic: my-connect-cluster-status 4 # ... # ...
对于具有相同 group.id
的所有实例,三个主题的值必须相同。
除非修改了这些默认设置,否则每个连接到同一 Kafka 集群的实例都会使用相同的值部署。在实践中,这意味着所有实例组成一个集群并使用相同的内部主题。
尝试使用同一内部主题的多个实例将导致意外错误,因此您必须更改每个实例的这些属性值。
9.6.2. 配置 Kafka Connect 用户授权
在 Kafka 中使用授权时,Kafka Connect 用户需要对集群组和 Kafka Connect 的内部主题进行读/写访问。此流程概述了如何使用 简单
授权和 ACL 授予访问权限。
Kafka Connect 集群组 ID 和内部主题的属性由 Apache Kafka 的 Streams 配置。另外,您可以在 KafkaConnect
资源的 spec
中显式定义它们。这在 为多个实例配置 Kafka Connect 时很有用,因为运行多个 Kafka Connect 实例时组 ID 和主题的值必须有所不同。
简单授权使用由 Kafka AclAuthorizer
和 StandardAuthorizer
插件管理的 ACL 规则来确保适当的访问级别。有关将 KafkaUser
资源配置为使用简单授权的更多信息,请参阅 AclRule
模式参考。
先决条件
- 一个 OpenShift 集群
- 正在运行的 Cluster Operator
流程
编辑
KafkaUser
资源中的authorization
属性,为用户提供访问权限。使用
字面
名称值为 Kafka Connect 主题和集群组配置访问权限。下表显示了为主题和集群组 ID 配置的默认名称。表 9.2. 访问权限配置的名称 属性 名称 offset.storage.topic
connect-cluster-offsets
status.storage.topic
connect-cluster-status
config.storage.topic
connect-cluster-configs
group
connect-cluster
在本例中,默认名称用于指定访问权限。如果您要为 Kafka Connect 实例使用不同的名称,请在 ACL 配置中使用这些名称。
简单授权配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: # ... authorization: type: simple acls: # access to offset.storage.topic - resource: type: topic name: connect-cluster-offsets patternType: literal operations: - Create - Describe - Read - Write host: "*" # access to status.storage.topic - resource: type: topic name: connect-cluster-status patternType: literal operations: - Create - Describe - Read - Write host: "*" # access to config.storage.topic - resource: type: topic name: connect-cluster-configs patternType: literal operations: - Create - Describe - Read - Write host: "*" # cluster group - resource: type: group name: connect-cluster patternType: literal operations: - Read host: "*"
创建或更新资源。
oc apply -f KAFKA-USER-CONFIG-FILE
9.6.3. 手动停止或暂停 Kafka Connect 连接器
如果您使用 KafkaConnector
资源配置连接器,请使用 state
配置来停止或暂停连接器。与连接器和任务保持实例化的暂停状态不同,停止连接器只保留配置,且没有活跃进程。从运行停止连接器可能更适合长时间运行,而不是只暂停。虽然暂停的连接器速度更快恢复,但已停止的连接器具有释放内存和资源的优点。
state
配置替换 KafkaConnectorSpec
模式中的(已弃用) pause
配置,它允许在连接器上暂停。如果您之前使用 pause
配置来暂停连接器,我们建议您只使用 state
配置过渡到 以避免冲突。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要暂停或停止连接器的
KafkaConnector
自定义资源的名称:oc get KafkaConnector
编辑
KafkaConnector
资源,以停止或暂停连接器。停止 Kafka Connect 连接器的配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: file: "/opt/kafka/LICENSE" topic: my-topic state: stopped # ...
将
state
配置更改为stopped
或paused
。当此属性没有设置时,连接器的默认状态为running
。对
KafkaConnector
配置应用更改。您可以通过将
state
改为running
,或删除配置来恢复连接器。
另外,您可以 公开 Kafka Connect API,并使用 stop
和 pause
端点停止连接器运行。例如,PUT /connectors/<connector_name>/stop
。然后,您可以使用 resume
端点重启它。
9.6.4. 手动重启 Kafka 连接连接器
如果您使用 KafkaConnector
资源来管理连接器,请使用 strimzi.io/restart
注解来手动触发连接器的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器的
KafkaConnector
自定义资源的名称:oc get KafkaConnector
通过在 OpenShift 中注解
KafkaConnector
资源来重启连接器。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
重启
注解设置为true
。等待下一个协调发生(默认为两分钟)。
只要协调过程检测到注解,Kafka 连接器就会重启。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector
自定义资源中删除。
9.6.5. 手动重启 Kafka Connect 连接器任务
如果您使用 KafkaConnector
资源来管理连接器,请使用 strimzi.io/restart-task
注解来手动触发连接器任务的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka 连接器任务的
KafkaConnector
自定义资源名称:oc get KafkaConnector
查找从
KafkaConnector
自定义资源重启的任务 ID:oc describe KafkaConnector <kafka_connector_name>
任务 ID 是非负整数,从 0 开始。
通过注解 OpenShift 中的
KafkaConnector
资源,使用 ID 重启连接器任务:oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
在本例中,任务
0
被重启。等待下一个协调发生(默认为两分钟)。
Kafka 连接器任务会重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,注解会从
KafkaConnector
自定义资源中删除。