9.7. 配置 Kafka Connect


更新 KafkaConnect 自定义资源的 spec 属性来配置 Kafka Connect 部署。

使用 Kafka Connect 为 Kafka 集群设置外部数据连接。使用 KafkaConnect 资源的属性来配置 Kafka Connect 部署。

您还可以使用 KafkaConnect 资源来指定以下内容:

  • 构建包含插件的容器镜像的连接器插件配置,以建立连接
  • 配置运行连接器的 Kafka Connect worker pod
  • 使用 KafkaConnector 资源管理连接器的注解

Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect 集群,以及利用 KafkaConnector 资源创建的连接器。

要深入了解 Kafka Connect 集群配置选项,请参阅 自定义资源 API 参考

处理大量信息

您可以调整配置以处理大量信息。如需更多信息,请参阅 处理大量信息

KafkaConnect 自定义资源配置示例

# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect 
1

metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true" 
2

# Deployment specifications
spec:
  # Replicas (required)
  replicas: 3 
3

  # Bootstrap servers (required)
  bootstrapServers: my-cluster-kafka-bootstrap:9092 
4

  # Kafka Connect configuration (recommended)
  config: 
5

    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  # Resources requests and limits (recommended)
  resources: 
6

    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # Authentication (optional)
  authentication: 
7

    type: tls
    certificateAndKey:
      certificate: source.crt
      key: source.key
      secretName: my-user-source
  # TLS configuration (optional)
  tls: 
8

    trustedCertificates:
      - secretName: my-cluster-cluster-cert
        pattern: "*.crt"
      - secretName: my-cluster-cluster-cert
        pattern: "*.crt"
  # Build configuration (optional)
  build: 
9

    output: 
10

      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 
11

      - name: connector-1
        artifacts:
          - type: tgz
            url: <url_to_download_connector_1_artifact>
            sha512sum: <SHA-512_checksum_of_connector_1_artifact>
      - name: connector-2
        artifacts:
          - type: jar
            url: <url_to_download_connector_2_artifact>
            sha512sum: <SHA-512_checksum_of_connector_2_artifact>
  # Logging configuration (optional)
  logging: 
12

    type: inline
    loggers:
      log4j.rootLogger: INFO
  # Readiness probe (optional)
  readinessProbe: 
13

    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Liveness probe (optional)
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Metrics configuration (optional)
  metricsConfig: 
14

    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: my-config-map
        key: my-key
  # JVM options (optional)
  jvmOptions: 
15

    "-Xmx": "1g"
    "-Xms": "1g"
  # Custom image (optional)
  image: my-org/my-image:latest 
16

  # Rack awareness (optional)
  rack:
    topologyKey: topology.kubernetes.io/zone 
17

  # Pod and container template (optional)
  template: 
18

    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: 
19

      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: aws-creds
              key: awsAccessKey
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: aws-creds
              key: awsSecretAccessKey
  # Tracing configuration (optional)
  tracing:
    type: opentelemetry 
20
Copy to Clipboard Toggle word wrap

1
使用 KafkaConnect
2
启用 KafkaConnector 资源来启动、停止和管理连接器实例。
3
运行任务的 worker 的副本节点数量。
4
连接到 Kafka 集群的 bootstrap 地址。该地址采用 < cluster_name>-kafka-bootstrap:<port_number> 格式。Kafka 集群不需要由 Streams for Apache Kafka 管理,或部署到 Kubernetes 集群。
5
运行连接器及其任务的 worker 连接(而不是连接器)。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。在本例中,指定了 JSON 转换器。Kafka Connect 使用的内部主题设置了 3 的复制因子(生产环境的最小要求)。在创建主题后更改复制因素无效。
6
为保留支持的资源(当前 cpumemory )的请求,以及指定可消耗的最大资源的限制。
7
Kafka Connect 集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
8
用于加密到 Kafka 集群的 TLS 配置,可信证书存储在指定 secret 中的 X.509 格式。
9
构建用于自动使用连接器插件构建容器镜像的配置属性。
10
(必需)推送新镜像的容器 registry 的配置。
11
(必需)连接器插件及其工件列表,以添加到新容器镜像中。每个插件必须配置至少一个 工件
12
指定 Kafka Connect 日志记录器和日志级别直接(内联)或通过 ConfigMap 间接(外部)。自定义 Log4j 配置必须放在 ConfigMap 中的 log4j.propertieslog4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG,FATAL 或 OFF。
13
检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
14
Prometheus 指标,通过引用包含在此示例中 Prometheus JMX 导出器配置的 ConfigMap 启用。您可以使用对 metricsConfig.valueFrom.configMapKeyRef.key 下包含空文件的 ConfigMap 的引用来启用指标。
15
JVM 配置选项,用于优化运行 Kafka Connect 的虚拟机(VM)的性能。
16
ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
17
SPECIALIZED OPTION:部署的机架感知配置。这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,使用来自最接近的副本的消耗可以提高网络利用率或降低成本。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用 RackAwareReplicaSelector
18
模板自定义。此处的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
19
为分布式追踪设置环境变量,并将凭证传递给连接器。
20
使用 OpenTelemetry 启用分布式追踪。

9.7.1. 为多个实例配置 Kafka 连接

默认情况下,Apache Kafka 的 Streams 配置 Kafka Connect 使用的内部主题的组 ID 和名称。在运行多个 Kafka Connect 实例时,您必须使用以下配置属性更改 这些默认设置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  config:
    group.id: my-connect-cluster 
1

    offset.storage.topic: my-connect-cluster-offsets 
2

    config.storage.topic: my-connect-cluster-configs 
3

    status.storage.topic: my-connect-cluster-status 
4

    # ...
  # ...
Copy to Clipboard Toggle word wrap
1
Kafka 中的 Kafka Connect 集群组 ID。
2
存储连接器偏移的 Kafka 主题。
3
存储连接器和任务状态配置的 Kafka 主题。
4
存储连接器和任务状态更新的 Kafka 主题。
注意

对于具有相同 group.id 的所有实例,三个主题的值必须相同。

除非修改了这些默认设置,否则每个连接到同一 Kafka 集群的实例都会使用相同的值部署。在实践中,这意味着所有实例组成一个集群并使用相同的内部主题。

尝试使用同一内部主题的多个实例将导致意外错误,因此您必须更改每个实例的这些属性值。

9.7.2. 配置 Kafka Connect 用户授权

在 Kafka 中使用授权时,Kafka Connect 用户需要对集群组和 Kafka Connect 的内部主题进行读/写访问。此流程概述了如何使用 简单 授权和 ACL 授予访问权限。

Kafka Connect 集群组 ID 和内部主题的属性由 Apache Kafka 的 Streams 配置。另外,您可以在 KafkaConnect 资源的 spec 中显式定义它们。这在 为多个实例配置 Kafka Connect 时很有用,因为运行多个 Kafka Connect 实例时组 ID 和主题的值必须有所不同。

简单授权使用由 Kafka AclAuthorizerStandardAuthorizer 插件管理的 ACL 规则来确保适当的访问级别。有关将 KafkaUser 资源配置为使用简单授权的更多信息,请参阅 AclRule 模式参考

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator

流程

  1. 编辑 KafkaUser 资源中的 authorization 属性,为用户提供访问权限。

    使用 字面 名称值为 Kafka Connect 主题和集群组配置访问权限。下表显示了为主题和集群组 ID 配置的默认名称。

    Expand
    表 9.2. 访问权限配置的名称
    属性名称

    offset.storage.topic

    connect-cluster-offsets

    status.storage.topic

    connect-cluster-status

    config.storage.topic

    connect-cluster-configs

    group

    connect-cluster

    在本例中,默认名称用于指定访问权限。如果您要为 Kafka Connect 实例使用不同的名称,请在 ACL 配置中使用这些名称。

    简单授权配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      # ...
      authorization:
        type: simple
        acls:
          # access to offset.storage.topic
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to status.storage.topic
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to config.storage.topic
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # cluster group
          - resource:
              type: group
              name: connect-cluster
              patternType: literal
            operations:
              - Read
            host: "*"
    Copy to Clipboard Toggle word wrap

  2. 创建或更新资源。

    oc apply -f KAFKA-USER-CONFIG-FILE
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat