搜索

9.6. 配置 Kafka Connect

download PDF

更新 KafkaConnect 自定义资源的 spec 属性来配置 Kafka Connect 部署。

使用 Kafka Connect 为 Kafka 集群设置外部数据连接。使用 KafkaConnect 资源的属性来配置 Kafka Connect 部署。

要深入了解 Kafka Connect 集群配置选项,请参阅 Apache Kafka 自定义资源 API 参考

KafkaConnector 配置

KafkaConnector 资源允许您以 OpenShift 原生的方式创建和管理 Kafka Connect 的连接器实例。

在 Kafka Connect 配置中,您可以通过添加 strimzi.io/use-connector-resources 注解来为 Kafka Connect 集群启用 KafkaConnectors。您还可以添加 构建配置,以便 Apache Kafka 的 Streams 会自动使用您数据连接所需的连接器插件构建容器镜像。Kafka Connect 连接器的外部配置通过 externalConfiguration 属性指定。

要管理连接器,您可以使用 KafkaConnector 自定义资源或 Kafka Connect REST API。KafkaConnector 资源必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。有关使用这些方法创建、重新配置或删除连接器的更多信息,请参阅 添加连接器

连接器配置作为 HTTP 请求的一部分传递给 Kafka Connect,并存储在 Kafka 本身中。ConfigMap 和机密是用于存储配置和机密数据的标准 OpenShift 资源。您可以使用 ConfigMap 和 Secret 来配置连接器的特定元素。然后,您可以在 HTTP REST 命令中引用配置值,这样可保持配置独立且更安全。此方法特别适用于机密数据,如用户名、密码或证书。

处理大量信息

您可以调整配置以处理大量信息。如需更多信息,请参阅 处理大量信息

KafkaConnect 自定义资源配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect 1
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true" 2
spec:
  replicas: 3 3
  authentication: 4
    type: tls
    certificateAndKey:
      certificate: source.crt
      key: source.key
      secretName: my-user-source
  bootstrapServers: my-cluster-kafka-bootstrap:9092 5
  tls: 6
    trustedCertificates:
      - secretName: my-cluster-cluster-cert
        certificate: ca.crt
      - secretName: my-cluster-cluster-cert
        certificate: ca2.crt
  config: 7
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  build: 8
    output: 9
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 10
      - name: connector-1
        artifacts:
          - type: tgz
            url: <url_to_download_connector_1_artifact>
            sha512sum: <SHA-512_checksum_of_connector_1_artifact>
      - name: connector-2
        artifacts:
          - type: jar
            url: <url_to_download_connector_2_artifact>
            sha512sum: <SHA-512_checksum_of_connector_2_artifact>
  externalConfiguration: 11
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsAccessKey
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsSecretAccessKey
  resources: 12
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  logging: 13
    type: inline
    loggers:
      log4j.rootLogger: INFO
  readinessProbe: 14
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  metricsConfig: 15
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: my-config-map
        key: my-key
  jvmOptions: 16
    "-Xmx": "1g"
    "-Xms": "1g"
  image: my-org/my-image:latest 17
  rack:
    topologyKey: topology.kubernetes.io/zone 18
  template: 19
    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: 20
      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  tracing:
    type: opentelemetry 21

1
使用 KafkaConnect
2
为 Kafka Connect 集群启用 KafkaConnectors。
3
运行任务的 worker 的副本节点数量。
4
Kafka Connect 集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
5
用于连接到 Kafka 集群的 bootstrap 服务器。
6
TLS 加密,使用密钥名称,其中 TLS 证书存储为集群的 X.509 格式。如果证书存储在同一 secret 中,则可以多次列出。
7
worker 的 Kafka 连接配置(而不是连接器)。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。
8
构建用于自动使用连接器插件构建容器镜像的配置属性。
9
(必需)推送新镜像的容器 registry 的配置。
10
(必需)连接器插件及其工件列表,以添加到新容器镜像中。每个插件必须配置至少一个 工件
11
使用环境变量的连接器的外部配置,如此处或卷所示。您还可以使用配置供应商插件从外部来源加载配置值。
12
为保留支持的资源(当前 cpumemory )的请求,以及指定可消耗的最大资源的限制。
13
指定 Kafka Connect 日志记录器和日志级别直接(内联)或通过 ConfigMap 间接(外部)。自定义 Log4j 配置必须放在 ConfigMap 中的 log4j.propertieslog4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG,FATAL 或 OFF。
14
检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
15
Prometheus 指标,通过引用包含在此示例中 Prometheus JMX 导出器配置的 ConfigMap 启用。您可以使用对 metricsConfig.valueFrom.configMapKeyRef.key 下包含空文件的 ConfigMap 的引用来启用指标。
16
JVM 配置选项,用于优化运行 Kafka Connect 的虚拟机(VM)的性能。
17
ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
18
SPECIALIZED OPTION:部署的机架感知配置。这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,使用来自最接近的副本的消耗可以提高网络利用率或降低成本。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用 RackAwareReplicaSelector
19
模板自定义。此处的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
20
为分布式追踪设置环境变量。
21
使用 OpenTelemetry 启用分布式追踪。

9.6.1. 为多个实例配置 Kafka 连接

默认情况下,Apache Kafka 的 Streams 配置 Kafka Connect 使用的内部主题的组 ID 和名称。在运行多个 Kafka Connect 实例时,您必须使用以下配置属性更改 这些默认设置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  config:
    group.id: my-connect-cluster 1
    offset.storage.topic: my-connect-cluster-offsets 2
    config.storage.topic: my-connect-cluster-configs 3
    status.storage.topic: my-connect-cluster-status 4
    # ...
  # ...
1
Kafka 中的 Kafka Connect 集群组 ID。
2
存储连接器偏移的 Kafka 主题。
3
存储连接器和任务状态配置的 Kafka 主题。
4
存储连接器和任务状态更新的 Kafka 主题。
注意

对于具有相同 group.id 的所有实例,三个主题的值必须相同。

除非修改了这些默认设置,否则每个连接到同一 Kafka 集群的实例都会使用相同的值部署。在实践中,这意味着所有实例组成一个集群并使用相同的内部主题。

尝试使用同一内部主题的多个实例将导致意外错误,因此您必须更改每个实例的这些属性值。

9.6.2. 配置 Kafka Connect 用户授权

在 Kafka 中使用授权时,Kafka Connect 用户需要对集群组和 Kafka Connect 的内部主题进行读/写访问。此流程概述了如何使用 简单 授权和 ACL 授予访问权限。

Kafka Connect 集群组 ID 和内部主题的属性由 Apache Kafka 的 Streams 配置。另外,您可以在 KafkaConnect 资源的 spec 中显式定义它们。这在 为多个实例配置 Kafka Connect 时很有用,因为运行多个 Kafka Connect 实例时组 ID 和主题的值必须有所不同。

简单授权使用由 Kafka AclAuthorizerStandardAuthorizer 插件管理的 ACL 规则来确保适当的访问级别。有关将 KafkaUser 资源配置为使用简单授权的更多信息,请参阅 AclRule 模式参考

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator

流程

  1. 编辑 KafkaUser 资源中的 authorization 属性,为用户提供访问权限。

    使用 字面 名称值为 Kafka Connect 主题和集群组配置访问权限。下表显示了为主题和集群组 ID 配置的默认名称。

    表 9.2. 访问权限配置的名称
    属性名称

    offset.storage.topic

    connect-cluster-offsets

    status.storage.topic

    connect-cluster-status

    config.storage.topic

    connect-cluster-configs

    group

    connect-cluster

    在本例中,默认名称用于指定访问权限。如果您要为 Kafka Connect 实例使用不同的名称,请在 ACL 配置中使用这些名称。

    简单授权配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      # ...
      authorization:
        type: simple
        acls:
          # access to offset.storage.topic
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to status.storage.topic
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to config.storage.topic
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # cluster group
          - resource:
              type: group
              name: connect-cluster
              patternType: literal
            operations:
              - Read
            host: "*"

  2. 创建或更新资源。

    oc apply -f KAFKA-USER-CONFIG-FILE

9.6.3. 手动停止或暂停 Kafka Connect 连接器

如果您使用 KafkaConnector 资源配置连接器,请使用 state 配置来停止或暂停连接器。与连接器和任务保持实例化的暂停状态不同,停止连接器只保留配置,且没有活跃进程。从运行停止连接器可能更适合长时间运行,而不是只暂停。虽然暂停的连接器速度更快恢复,但已停止的连接器具有释放内存和资源的优点。

注意

state 配置替换 KafkaConnectorSpec 模式中的(已弃用) pause 配置,它允许在连接器上暂停。如果您之前使用 pause 配置来暂停连接器,我们建议您只使用 state 配置过渡到 以避免冲突。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要暂停或停止连接器的 KafkaConnector 自定义资源的名称:

    oc get KafkaConnector
  2. 编辑 KafkaConnector 资源,以停止或暂停连接器。

    停止 Kafka Connect 连接器的配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector
      tasksMax: 2
      config:
        file: "/opt/kafka/LICENSE"
        topic: my-topic
      state: stopped
      # ...

    state 配置更改为 stoppedpaused。当此属性没有设置时,连接器的默认状态为 running

  3. KafkaConnector 配置应用更改。

    您可以通过将 state 改为 running,或删除配置来恢复连接器。

注意

另外,您可以 公开 Kafka Connect API,并使用 stoppause 端点停止连接器运行。例如,PUT /connectors/<connector_name>/stop。然后,您可以使用 resume 端点重启它。

9.6.4. 手动重启 Kafka 连接连接器

如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart 注解来手动触发连接器的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka 连接器的 KafkaConnector 自定义资源的名称:

    oc get KafkaConnector
  2. 通过在 OpenShift 中注解 KafkaConnector 资源来重启连接器。

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"

    重启 注解设置为 true

  3. 等待下一个协调发生(默认为两分钟)。

    只要协调过程检测到注解,Kafka 连接器就会重启。当 Kafka Connect 接受重启请求时,注解会从 KafkaConnector 自定义资源中删除。

9.6.5. 手动重启 Kafka Connect 连接器任务

如果您使用 KafkaConnector 资源来管理连接器,请使用 strimzi.io/restart-task 注解来手动触发连接器任务的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka 连接器任务的 KafkaConnector 自定义资源名称:

    oc get KafkaConnector
  2. 查找从 KafkaConnector 自定义资源重启的任务 ID:

    oc describe KafkaConnector <kafka_connector_name>

    任务 ID 是非负整数,从 0 开始。

  3. 通过注解 OpenShift 中的 KafkaConnector 资源,使用 ID 重启连接器任务:

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"

    在本例中,任务 0 被重启。

  4. 等待下一个协调发生(默认为两分钟)。

    Kafka 连接器任务会重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,注解会从 KafkaConnector 自定义资源中删除。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.