搜索

2.2. Kafka Connect/S2I 集群配置

download PDF

本节论述了如何在 AMQ Streams 集群中配置 Kafka Connect 或 Kafka Connect with Source-to-Image(S2I)部署。

Kafka Connect 是一个集成工具包,用于使用连接器插件在 Kafka 代理和其他系统间流传输数据。Kafka Connect 提供了一个框架,用于将 Kafka 与外部数据源或目标(如数据库)集成,以使用连接器导入或导出数据。连接器是提供所需连接配置的插件。

如果使用 Kafka Connect,您可以配置 KafkaConnect 或 KafkaConnect S2I 资源。如果您使用 Source-to-Image( S2I )框架来部署 Kafka Connect,请使用 KafkaConnectS2I 资源。

重要

随着 KafkaConnect 资源引入 构建配置,AMQ Streams 现在可以使用数据连接所需的连接器插件自动构建容器镜像。因此,使用 Source-to-Image(S2I)进行 Kafka Connect 的支持已弃用,并将在 AMQ Streams 1.8 后被删除。要准备此更改,您可以将 Kafka Connect S2I 实例迁移到 Kafka Connect 实例

2.2.1. 配置 Kafka 连接

使用 Kafka Connect 设置到 Kafka 集群的外部数据连接。

使用 KafkaConnect 或 KafkaConnect S2I 资源的属性来配置 Kafka Connect 部署。此流程中演示的示例适用于 KafkaConnect 资源,但 KafkaConnectS2I 资源的属性相同。

Kafka 连接器配置

KafkaConnector 资源允许您以 OpenShift 原生方式为 Kafka Connect 创建和管理连接器实例。

在 Kafka Connect 配置中,您可以通过添加 strimzi.io/use-connector-resources 注解为 Kafka Connect 集群启用 KafkaConnectors。您还可以添加 构建配置,以便 AMQ Streams 会自动构建带有数据连接所需的连接器插件的容器镜像。Kafka Connect 连接器的外部配置通过 externalConfiguration 属性指定。

要管理连接器,您可以使用 Kafka Connect REST API,或使用 KafkaConnector 自定义资源。KafkaConnector 资源必须部署到与其链接的 Kafka Connect 集群相同的命名空间中。有关使用这些方法创建、重新配置或删除连接器的更多信息,请参阅 OpenShift 上部署和升级 AMQ Streams 中的创建和管理连接器

连接器配置作为 HTTP 请求的一部分传递给 Kafka Connect,并存储在 Kafka 本身中。ConfigMap 和机密是用于存储配置和机密数据的标准 OpenShift 资源。您可以使用 ConfigMap 和 Secret 配置连接器的某些元素。然后,您可以引用 HTTP REST 命令中的配置值,以便在需要时保持配置独立和安全。这个方法尤其适用于机密数据,如用户名、密码或证书。

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

流程

  1. 编辑 KafkaConnect 或 KafkaConnect S2I 资源的 spec 属性。

    您可以配置的属性显示在此示例配置中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect 1
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 2
    spec:
      replicas: 3 3
      authentication: 4
        type: tls
        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
      bootstrapServers: my-cluster-kafka-bootstrap:9092 5
      tls: 6
        trustedCertificates:
          - secretName: my-cluster-cluster-cert
            certificate: ca.crt
          - secretName: my-cluster-cluster-cert
            certificate: ca2.crt
      config: 7
        group.id: my-connect-cluster
        offset.storage.topic: my-connect-cluster-offsets
        config.storage.topic: my-connect-cluster-configs
        status.storage.topic: my-connect-cluster-status
        key.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: true
        value.converter.schemas.enable: true
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
      build: 8
        output: 9
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 10
          - name: debezium-postgres-connector
            artifacts:
              - type: tgz
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz
                sha512sum: 962a12151bdf9a5a30627eebac739955a4fd95a08d373b86bdcea2b4d0c27dd6e1edd5cb548045e115e33a9e69b1b2a352bee24df035a0447cb820077af00c03
          - name: camel-telegram
            artifacts:
              - type: tgz
                url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.7.0/camel-telegram-kafka-connector-0.7.0-package.tar.gz
                sha512sum: a9b1ac63e3284bea7836d7d24d84208c49cdf5600070e6bd1535de654f6920b74ad950d51733e8020bf4187870699819f54ef5859c7846ee4081507f48873479
      externalConfiguration: 11
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
      resources: 12
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 13
        type: inline
        loggers:
          log4j.rootLogger: "INFO"
      readinessProbe: 14
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      metricsConfig: 15
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: my-config-map
            key: my-key
      jvmOptions: 16
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 17
      rack:
        topologyKey: topology.kubernetes.io/zone 18
      template: 19
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 20
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
    1
    根据需要,使用 KafkaConnect 或 KafkaConnectS2I
    2
    为 Kafka Connect 集群启用 KafkaConnectors。
    3
    4
    Kafka Connect 集群的身份验证,使用 TLS 机制 (如下所示)、使用 OAuth bearer 令牌 或基于 SASL 的 SCRAM-SHA-512PLAIN 机制。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
    5
    用于连接到 Kafka Connect 集群的 bootstrap 服务器
    6
    使用密钥名称进行 TLS 加密,在其下,TLS 证书以 X.509 格式存储到集群的 X.509 格式。如果证书存储在同一个 secret 中,则可以多次列出证书。
    7
    Kafka Connect worker 而不是连接器)配置。标准 Apache Kafka 配置可能会提供,仅限于不是由 AMQ Streams 直接管理的属性。
    8
    用于自动使用连接器插件构建容器镜像的 构建配置属性
    9
    (必需)配置推送新镜像的容器注册表。
    10
    (必需)要添加到新容器镜像的连接器插件及其工件列表。每个插件必须至少配置一个 工件
    11
    使用环境变量(如下所示)或卷 为 Kafka 连接器的外部配置。您还可以使用 Kubernetes 配置提供程序 从外部来源加载配置值
    12
    请求保留 支持的资源、当前 cpu 和 memory ,以及限制,以指定可消耗的最大资源。
    13
    指定 Kafka 连接日志记录器和日志级别 直接(内联)或通过ConfigMap 间接(外部)添加。自定义 ConfigMap 必须放在 log4j.properties 或 log4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。
    14
    健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
    15
    Prometheus metrics,通过引用包含本例中 Prometheus JMX 导出器配置的 ConfigMap 来启用它。您可以使用对包含 metricsConfig.valueFrom.configMapKeyRef.key 下的空文件的 ConfigMap 的引用来启用指标,而无需进一步配置。
    16
    运行 Kafka Connect 的虚拟机(VM)性能优化 JVM 配置选项
    17
    ADVANCED OPTION: 容器镜像配置,只在特殊情况下推荐这样做。
    18
    将机架感知 配置为在不同机架之间分布副本。topologykey 必须与集群节点标签匹配。
    19
    模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
    20
  2. 创建或更新资源:

    oc apply -f KAFKA-CONNECT-CONFIG-FILE
  3. 如果为 Kafka Connect 启用了授权,请配置 Kafka Connect 用户以启用对 Kafka Connect consumer 组和主题的访问
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.