2.3. Kafka Connect 集群配置


使用 KafkaConnect 资源配置 Kafka Connect 部署。Kafka Connect 是一个使用连接器插件在 Kafka 代理和其他系统间流传输数据的集成工具包。Kafka Connect 提供了一个框架,用于将 Kafka 与外部数据源或目标(如数据库)集成,如数据库,用于使用连接器导入或导出数据。连接器是提供所需的连接配置的插件。

第 12.2.60 节 “KafkaConnect 模式参考” 描述 KafkaConnect 资源的完整 schema。

如需有关部署连接器插件的更多信息,请参阅 使用连接器插件扩展 Kafka 连接

2.3.1. 配置 Kafka 连接

使用 Kafka Connect 为 Kafka 集群设置外部数据连接。使用 KafkaConnect 资源的属性来配置 Kafka Connect 部署。

KafkaConnector 配置

KafkaConnector 资源允许您以 OpenShift 原生的方式创建和管理 Kafka Connect 的连接器实例。

在 Kafka Connect 配置中,您可以通过添加 strimzi.io/use-connector-resources 注解来为 Kafka Connect 集群启用 KafkaConnectors。您还可以添加 构建配置,以便 AMQ Streams 使用您数据连接所需的连接器插件自动构建容器镜像。Kafka Connect 连接器的外部配置通过 externalConfiguration 属性指定。

要管理连接器,您可以使用 Kafka Connect REST API,或使用 KafkaConnector 自定义资源。KafkaConnector 资源必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。有关使用这些方法创建、重新配置或删除连接器的更多信息,请参阅创建和管理连接器

连接器配置作为 HTTP 请求的一部分传递给 Kafka Connect,并存储在 Kafka 本身中。ConfigMap 和机密是用于存储配置和机密数据的标准 OpenShift 资源。您可以使用 ConfigMap 和 Secret 来配置连接器的特定元素。然后,您可以引用 HTTP REST 命令中的配置值,以便在需要时保持独立且更安全的配置。此方法尤其适用于机密数据,如用户名、密码或证书。

处理大量消息

您可以调整配置来处理大量消息。更多信息请参阅 第 2.7 节 “处理大量消息”

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator

有关运行的信息 ,请参阅 OpenShift 中的部署和升级 AMQ Streams 指南:

流程

  1. 编辑 KafkaConnect 资源的 spec 属性。

    您可以在以下示例配置中显示您可以配置的属性:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect 
    1
    
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 
    2
    
    spec:
      replicas: 3 
    3
    
      authentication: 
    4
    
        type: tls
        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
      bootstrapServers: my-cluster-kafka-bootstrap:9092 
    5
    
      tls: 
    6
    
        trustedCertificates:
          - secretName: my-cluster-cluster-cert
            certificate: ca.crt
          - secretName: my-cluster-cluster-cert
            certificate: ca2.crt
      config: 
    7
    
        group.id: my-connect-cluster
        offset.storage.topic: my-connect-cluster-offsets
        config.storage.topic: my-connect-cluster-configs
        status.storage.topic: my-connect-cluster-status
        key.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: true
        value.converter.schemas.enable: true
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
      build: 
    8
    
        output: 
    9
    
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 
    10
    
          - name: debezium-postgres-connector
            artifacts:
              - type: tgz
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz
                sha512sum: 962a12151bdf9a5a30627eebac739955a4fd95a08d373b86bdcea2b4d0c27dd6e1edd5cb548045e115e33a9e69b1b2a352bee24df035a0447cb820077af00c03
          - name: camel-telegram
            artifacts:
              - type: tgz
                url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.7.0/camel-telegram-kafka-connector-0.7.0-package.tar.gz
                sha512sum: a9b1ac63e3284bea7836d7d24d84208c49cdf5600070e6bd1535de654f6920b74ad950d51733e8020bf4187870699819f54ef5859c7846ee4081507f48873479
      externalConfiguration: 
    11
    
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
      resources: 
    12
    
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 
    13
    
        type: inline
        loggers:
          log4j.rootLogger: "INFO"
      readinessProbe: 
    14
    
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      metricsConfig: 
    15
    
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: my-config-map
            key: my-key
      jvmOptions: 
    16
    
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 
    17
    
      rack:
        topologyKey: topology.kubernetes.io/zone 
    18
    
      template: 
    19
    
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 
    20
    
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
    Copy to Clipboard Toggle word wrap
    1
    使用 KafkaConnect
    2
    为 Kafka Connect 集群启用 KafkaConnectors。
    3
    用于运行服务的 worker 的副本节点数量
    4
    Kafka Connect 集群的身份验证,它被指定为 mTLS、基于令牌的OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512PLAIN。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
    5
    连接到 Kafka Connect 集群的 bootstrap 服务器
    6
    TLS 加密,使用密钥名称,其中 TLS 证书存储为集群的 X.509 格式。如果证书存储在同一 secret 中,可以多次列出它。
    7
    worker 的 Kafka 连接配置 (而非连接器)。可以提供标准 Apache Kafka 配置,仅限于不直接由 AMQ Streams 管理的属性。
    8
    构建配置属性,以使用连接器插件构建容器镜像。
    9
    (必需)推送新镜像的容器注册表的配置。
    10
    (必需)连接器插件列表及其要添加到新容器镜像的工件。每个插件必须至少配置一个 工件
    11
    Kafka 连接器的外部配置,如此处所示或卷。您还可以使用 配置供应商插件 从外部源 加载配置值
    12
    用于保留 支持的资源、当前 cpu 和内存 的请求,以及指定可消耗的最大资源数量。
    13
    指定的 Kafka Connect 日志记录器和日志级别 直接(内联)或通过 ConfigMap 间接添加(外部)。自定义 ConfigMap 必须放在 log4j.propertieslog4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、WARN、TRACE、DEBUG、FATAL 或 OFF。
    14
    状况检查,了解何时重启容器(持续)以及容器是否可以接受流量(就绪状态)。
    15
    Prometheus metrics,通过引用本例中的 Prometheus JMX JMX exporter 配置的 ConfigMap 来启用。您可以启用指标,而无需进一步配置,使用对 metricsConfig.valueFrom.configMapKeyRef.key 下包含空文件的 ConfigMap 的引用。
    16
    JVM 配置选项,用于优化运行 Kafka Connect 的虚拟机(VM)的性能。
    17
    ADVANCED OPTION: 容器镜像配置,这只在特殊情况下建议。
    18
    SPECIALIZED OPTION :对部署的意识 配置.这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,消耗自最接近的副本可以提高网络利用率或降低成本。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用 RackAwareReplicaSelector
    19
    模板自定义。这里调度了带有反关联性的 pod,因此不会将 pod 调度到具有相同主机名的节点。
    20
    为分布式追踪设置环境变量。
  2. 创建或更新资源:

    oc apply -f KAFKA-CONNECT-CONFIG-FILE
    Copy to Clipboard Toggle word wrap
  3. 如果为 Kafka Connect 启用了授权,请配置 Kafka Connect Connect 用户以启用对 Kafka Connect consumer 组和主题的访问

2.3.2. 多个实例的 Kafka 连接配置

如果您运行多个 Kafka Connect 实例,您必须更改以下配置属性 的默认配置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: connect-cluster 
1

    offset.storage.topic: connect-cluster-offsets 
2

    config.storage.topic: connect-cluster-configs 
3

    status.storage.topic: connect-cluster-status  
4

    # ...
# ...
Copy to Clipboard Toggle word wrap
1
Kafka 中的 Kafka Connect 集群 ID。
2
存储连接器偏移的 Kafka 主题。
3
存储连接器和任务状态配置的 Kafka 主题。
4
存储连接器和任务状态更新的 Kafka 主题。
注意

这三个主题的值对于具有相同 组.id 的所有 Kafka 连接实例必须相同。

除非更改默认设置,否则每个 Kafka 连接实例都使用相同的值部署连接到同一 Kafka 集群。这是因为所有实例都联合在集群中运行,并使用相同的主题。

如果多个 Kafka Connect 集群尝试使用相同主题,则 Kafka Connect 将无法按预期工作,并生成错误。

如果要运行多个 Kafka Connect 实例,请更改每个实例的这些属性值。

2.3.3. 配置 Kafka Connect 用户授权

这个步骤描述了如何授权用户对 Kafka 连接的访问。

当在 Kafka 中使用任何类型的授权时,Kafka Connect 用户需要对使用者组和 Kafka Connect 的内部主题进行读/写访问权限。

consumer 组和内部主题的属性由 AMQ Streams 自动配置,也可以在 KafkaConnect 资源的 spec 中明确指定。

KafkaConnect 资源中的配置属性示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: my-connect-cluster 
1

    offset.storage.topic: my-connect-cluster-offsets 
2

    config.storage.topic: my-connect-cluster-configs 
3

    status.storage.topic: my-connect-cluster-status 
4

    # ...
  # ...
Copy to Clipboard Toggle word wrap

1
Kafka 中的 Kafka Connect 集群 ID。
2
存储连接器偏移的 Kafka 主题。
3
存储连接器和任务状态配置的 Kafka 主题。
4
存储连接器和任务状态更新的 Kafka 主题。

此流程演示了如何在 使用简单 授权时提供访问权限。

简单授权使用由 Kafka AclAuthorizer 插件处理的 ACL 规则来提供正确的访问权限级别。有关配置 KafkaUser 资源以使用简单授权的更多信息,请参阅 AclRule 模式参考

注意

先决条件

  • 一个 OpenShift 集群
  • 正在运行的 Cluster Operator

流程

  1. 编辑 KafkaUser 资源中的 authorization 属性,以为用户提供访问权限。

    在以下示例中,使用 字面 名称值为 Kafka Connect 主题和消费者组配置访问权限:

    Expand
    属性名称

    offset.storage.topic

    connect-cluster-offsets

    status.storage.topic

    connect-cluster-status

    config.storage.topic

    connect-cluster-configs

    group

    connect-cluster

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      # ...
      authorization:
        type: simple
        acls:
          # access to offset.storage.topic
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to status.storage.topic
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # access to config.storage.topic
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operations:
              - Create
              - Describe
              - Read
              - Write
            host: "*"
          # consumer group
          - resource:
              type: group
              name: connect-cluster
              patternType: literal
            operations:
              - Read
            host: "*"
    Copy to Clipboard Toggle word wrap
  2. 创建或更新资源。

    oc apply -f KAFKA-USER-CONFIG-FILE
    Copy to Clipboard Toggle word wrap

2.3.4. Kafka Connect 集群资源列表

以下资源由 OpenShift 集群中的 Cluster Operator 创建:

connect-cluster-name-connect
用于创建 Kafka Connect worker 节点 pod 的部署。
connect-cluster-name-connect-api
此服务公开了管理 Kafka Connect 集群的 REST 接口。
connect-cluster-name-config
包含 Kafka Connect ancillary 配置的 ConfigMap,并由 Kafka 代理 pod 挂载为卷。
connect-cluster-name-connect
为 Kafka Connect worker 节点配置的 Pod Disruption Budget。

红帽构建的 Debezium 是一个分布式更改数据捕获平台。它捕获数据库中的行级更改,创建更改事件记录,并将记录流传输到 Kafka 主题。Debezium 基于 Apache Kafka 构建。您可以将红帽构建的 Debezium 与 AMQ Streams 一起部署并集成。部署 AMQ Streams 后,您可以通过 Kafka Connect 将 Debezium 部署为连接器配置。Debezium 将更改事件记录传递到 OpenShift 上的 AMQ Streams。应用程序可以读取 这些更改事件流,并按发生更改事件的顺序访问更改事件。

Debezium 具有多个用途,包括:

  • 数据复制
  • 更新缓存和搜索索引
  • 简化单体式应用程序
  • 数据集成
  • 启用流查询

要捕获数据库更改,请使用 Debezium 数据库连接器部署 Kafka 连接。您可以配置 KafkaConnector 资源来定义连接器实例。

有关将红帽构建的 Debezium 与 AMQ Streams 一起部署的更多信息,请参阅产品文档。文档包括 Debezium 入门指南,指导您完成设置数据库更新事件记录所需的服务和连接器。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat