6.5. 添加 Kafka Connect 连接器


Kafka Connect 使用连接器来与其他系统集成以流传输数据。连接器是 Kafka Connector 类的实例,可以是以下类型之一:

源连接器
源连接器是一个运行时实体,它从外部系统获取数据并将其作为信息提供给 Kafka。
sink 连接器
sink 连接器是一个运行时实体,它从 Kafka 主题获取信息并将其传送到外部系统。

Kafka Connect 使用插件架构为连接器提供实施工件。插件允许连接到其他系统,并提供额外的配置来操作数据。插件包括连接器和其他组件,如数据转换器和转换。连接器使用特定类型的外部系统运行。每个连接器都定义了其配置架构。您提供到 Kafka Connect 的配置,以在 Kafka Connect 中创建连接器实例。然后,连接器实例定义了一组用于在系统之间移动数据的任务。

使用以下方法之一在 Kafka Connect 中添加连接器插件:

将插件添加到容器镜像后,您可以使用以下方法启动、停止和管理连接器实例:

您还可以使用这些选项创建新的连接器实例。

6.5.1. 自动使用连接器插件构建新容器镜像

配置 Kafka Connect,以便 AMQ Streams 会自动使用其他连接器构建新容器镜像。您可以使用 KafkaConnect 自定义资源的 .spec.build.plugins 属性定义连接器插件。AMQ Streams 将自动下载并将连接器插件添加到新容器镜像中。容器被推送到 .spec.build.output 中指定的容器存储库,并在 Kafka Connect 部署中自动使用。

先决条件

您需要提供自己的容器 registry,从中可以推送镜像、存储和拉取镜像。AMQ Streams 支持私有容器 registry 以及 QuayDocker Hub 等公共 registry。

流程

  1. 通过在 .spec.build.output 中指定容器 registry 来配置 KafkaConnect 自定义资源,并在 .spec.build.plugins 中指定其他连接器:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec: 
    1
    
      #...
      build:
        output: 
    2
    
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 
    3
    
          - name: debezium-postgres-connector
            artifacts:
              - type: tgz
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.1.3.Final/debezium-connector-postgres-2.1.3.Final-plugin.tar.gz
                sha512sum: c4ddc97846de561755dc0b021a62aba656098829c70eb3ade3b817ce06d852ca12ae50c0281cc791a5a131cb7fc21fb15f4b8ee76c6cae5dd07f9c11cb7c6e79
          - name: camel-telegram
            artifacts:
              - type: tgz
                url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz
                sha512sum: d6d9f45e0d1dbfcc9f6d1c7ca2046168c764389c78bc4b867dab32d24f710bb74ccf2a007d7d7a8af2dfca09d9a52ccbc2831fc715c195a3634cca055185bd91
      #...
    Copy to Clipboard Toggle word wrap
    1
    2
    (必需)推送新镜像的容器 registry 的配置。
    3
    (必需)连接器插件及其工件列表,以添加到新容器镜像中。每个插件必须配置至少一个 工件
  2. 创建或更新资源:

    $ oc apply -f <kafka_connect_configuration_file>
    Copy to Clipboard Toggle word wrap
  3. 等待新容器镜像构建,并且部署 Kafka Connect 集群。
  4. 使用 Kafka Connect REST API 或 KafkaConnector 自定义资源使用您添加的连接器插件。

使用 Kafka Connect 基础镜像中的连接器插件创建自定义 Docker 镜像。将自定义镜像添加到 /opt/kafka/plugins 目录中。

您可以使用 Red Hat Ecosystem Catalog 上的 Kafka 容器镜像作为基础镜像,以使用额外的连接器插件创建自己的自定义镜像。

在启动时,Kafka Connect 的 AMQ Streams 版本会加载 /opt/kafka/plugins 目录中包含的任何第三方连接器插件。

流程

  1. 使用 registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0 作为基础镜像,创建一个新的 Dockerfile

    FROM registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0
    USER root:root
    COPY ./my-plugins/ /opt/kafka/plugins/
    USER 1001
    Copy to Clipboard Toggle word wrap

    插件文件示例

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-<version>.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-core-<version>.jar
    │   ├── README.md
    │   └── # ...
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-<version>.jar
    │   ├── mysql-connector-java-<version>.jar
    │   ├── README.md
    │   └── # ...
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-<version>.jar
        ├── debezium-core-<version>.jar
        ├── LICENSE.txt
        ├── postgresql-<version>.jar
        ├── protobuf-java-<version>.jar
        ├── README.md
        └── # ...
    Copy to Clipboard Toggle word wrap

    COPY 命令指向要复制到容器镜像的插件文件。

    本例为 Debezium 连接器(MongoDB、MySQL 和 PostgreSQL)添加了插件,但并不为所有文件都列出。在 Kafka Connect 中运行 Debezium 会查找与其他 Kafka Connect 任务相同的。

  2. 构建容器镜像。
  3. 将自定义镜像推送到容器 registry。
  4. 指向新容器镜像。

    您可以使用以下方法之一指向镜像:

    • 编辑 KafkaConnect 自定义资源的 KafkaConnect.spec.image 属性。

      如果设置,此属性覆盖 Cluster Operator 中的 STRIMZI_KAFKA_CONNECT_IMAGES 环境变量。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
      spec: 
      1
      
        #...
        image: my-new-container-image 
      2
      
        config: 
      3
      
          #...
      Copy to Clipboard Toggle word wrap
      1
      2
      pod 的 docker 镜像。
      3
      配置 Kafka Connect worker (而不是连接器)。
    • 编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件中的 STRIMZI_KAFKA_CONNECT_IMAGES 环境变量以指向新的容器镜像,然后重新安装 Cluster Operator。

6.5.3. 部署 KafkaConnector 资源

部署 KafkaConnector 资源以管理连接器。KafkaConnector 自定义资源提供了由 Cluster Operator 管理连接器的 OpenShift 原生方法。您不需要发送 HTTP 请求来管理连接器,如 Kafka Connect REST API 一样。您可以通过更新其对应的 KafkaConnector 资源来管理正在运行的连接器实例,然后应用更新。Cluster Operator 更新正在运行的连接器实例的配置。您可以通过删除其对应的 KafkaConnector 来删除连接器。

KafkaConnector 资源必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。

在此显示的配置中,autoRestart 功能被启用 (enabled: true),用于自动重启失败的连接器和任务。您还可以注解 KafkaConnector 资源来 重启连接器 或手动 重启连接器任务

连接器示例

您可以使用自己的连接器,或尝试 AMQ Streams 提供的示例。直到 Apache Kafka 3.1.0,Apache Kafka 中包含示例文件连接器插件。从 Apache Kafka 的 3.1.1 和 3.2.0 版本开始,需要将示例 作为任何其他连接器添加到插件路径中

AMQ Streams 为示例文件连接器插件提供了一个 示例 KafkaConnector 配置文件 (examples/connect/source-connector.yaml) ,它会创建以下连接器实例作为 KafkaConnector 资源:

  • FileStreamSourceConnector 实例从 Kafka 许可证文件(源)读取每行,并将数据作为消息写入单个 Kafka 主题。
  • 一个 FileStreamSinkConnector 实例,从 Kafka 主题读取信息,并将信息写入临时文件(接收器)。

我们使用示例文件在此流程中创建连接器。

注意

示例连接器不应在生产环境中使用。

先决条件

  • Kafka Connect 部署
  • Cluster Operator 正在运行

流程

  1. 使用以下方法之一将 FileStreamSourceConnectorFileStreamSinkConnector 插件添加到 Kafka Connect:

  2. 在 Kafka Connect 配置中将 strimzi.io/use-connector-resources 注解设置为 true

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
        # ...
    Copy to Clipboard Toggle word wrap

    启用 KafkaConnector 资源后,Cluster Operator 会监视它们。

  3. 编辑 examples/connect/source-connector.yaml 文件:

    KafkaConnector 源连接器配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector  
    1
    
      labels:
        strimzi.io/cluster: my-connect-cluster 
    2
    
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 
    3
    
      tasksMax: 2 
    4
    
      autoRestart: 
    5
    
        enabled: true
      config: 
    6
    
        file: "/opt/kafka/LICENSE" 
    7
    
        topic: my-topic 
    8
    
        # ...
    Copy to Clipboard Toggle word wrap

    1
    KafkaConnector 资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。
    2
    在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
    3
    连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
    4
    连接器可创建的最大 Kafka Connect 任务数量。
    5
    启用自动重启失败的连接器和任务。默认情况下,重启数量是无限的,但您可以使用 maxRestarts 属性设置自动重启次数的最大值。
    6
    连接器配置 作为键值对。
    7
    外部数据文件的位置。在本例中,我们将 FileStreamSourceConnector 配置为从 /opt/kafka/LICENSE 文件中读取。
    8
    将源数据发布到的 Kafka 主题。
  4. 在 OpenShift 集群中创建源 KafkaConnector

    oc apply -f examples/connect/source-connector.yaml
    Copy to Clipboard Toggle word wrap
  5. 创建一个 examples/connect/sink-connector.yaml 文件:

    touch examples/connect/sink-connector.yaml
    Copy to Clipboard Toggle word wrap
  6. 将以下 YAML 粘贴到 sink-connector.yaml 文件中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 
    1
    
      tasksMax: 2
      config: 
    2
    
        file: "/tmp/my-file" 
    3
    
        topics: my-topic 
    4
    Copy to Clipboard Toggle word wrap
    1
    连接器类的完整名称或别名。这应该存在于 Kafka Connect 集群使用的镜像中。
    2
    连接器配置 作为键值对。
    3
    将源数据发布到的临时文件。
    4
    从中读取源数据的 Kafka 主题。
  7. 在 OpenShift 集群中创建 sink KafkaConnector

    oc apply -f examples/connect/sink-connector.yaml
    Copy to Clipboard Toggle word wrap
  8. 检查是否已创建连接器资源:

    oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name
    
    my-source-connector
    my-sink-connector
    Copy to Clipboard Toggle word wrap

    将 <my_connect_cluster> 替换为 Kafka Connect 集群的名称。

  9. 在容器中,执行 kafka-console-consumer.sh 以读取源连接器写入该主题的消息:

    oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
    Copy to Clipboard Toggle word wrap

    将 <my_kafka_cluster> 替换为 Kafka 集群的名称。

源和接收器连接器配置选项

连接器配置在 KafkaConnector 资源的 spec.config 属性中定义。

FileStreamSourceConnectorFileStreamSinkConnector 类支持与 Kafka Connect REST API 相同的配置选项。其他连接器支持不同的配置选项。

Expand
表 6.1. FileStreamSource 连接器类的配置选项
名称类型默认值Description

file

字符串

null

将消息写入的源文件。如果没有指定,则使用标准输入。

topic

list

null

将数据发布到的 Kafka 主题。

Expand
表 6.2. FileStreamSinkConnector 类的配置选项
名称类型默认值Description

file

字符串

null

将消息写入的目标文件。如果没有指定,则使用标准输出。

topics

list

null

从一个或多个 Kafka 主题读取数据。

topics.regex

字符串

null

与一个或多个 Kafka 主题匹配的正则表达式,用于读取数据。

6.5.4. 公开 Kafka Connect API

使用 Kafka Connect REST API 作为使用 KafkaConnector 资源管理连接器的替代选择。Kafka Connect REST API 作为一个运行在 <connect_cluster_name>-connect-api:8083 的服务其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。服务在创建 Kafka Connect 实例时创建。

Kafka Connect REST API 支持的操作信息包括在 Apache Kafka Connect API 文档

注意

strimzi.io/use-connector-resources 注解启用 KafkaConnectors。如果您将注解应用到 KafkaConnect 资源配置,则需要将其删除以使用 Kafka Connect API。否则,Cluster Operator 会恢复使用 Kafka Connect REST API 进行的手动更改。

您可以将连接器配置添加为 JSON 对象。

添加连接器配置的 curl 请求示例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'
Copy to Clipboard Toggle word wrap

API 只能在 OpenShift 集群中访问。如果要使 Kafka Connect API 可以被 OpenShift 集群外部运行的应用程序访问,您可以通过创建以下功能之一来手动公开它:

  • LoadBalancerNodePort 类型服务
  • Ingress 资源(仅限 Kubernetes)
  • OpenShift 路由(仅限 OpenShift)
注意

连接是不安全的,因此建议从外部访问。

如果您决定创建服务,请使用来自 <connect_cluster_name>-connect-api 服务的 selector 配置服务将流量路由到的 pod:

服务的选择器配置

# ...
selector:
  strimzi.io/cluster: my-connect-cluster 
1

  strimzi.io/kind: KafkaConnect
  strimzi.io/name: my-connect-cluster-connect 
2

#...
Copy to Clipboard Toggle word wrap

1
OpenShift 集群中的 Kafka Connect 自定义资源的名称。
2
Cluster Operator 创建的 Kafka Connect 部署的名称。

您还必须创建一个允许来自外部客户端的 HTTP 请求的 NetworkPolicy

允许对 Kafka Connect API 的请求的 NetworkPolicy 示例

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: my-custom-connect-network-policy
spec:
  ingress:
  - from:
    - podSelector: 
1

        matchLabels:
          app: my-connector-manager
    ports:
    - port: 8083
      protocol: TCP
  podSelector:
    matchLabels:
      strimzi.io/cluster: my-connect-cluster
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: my-connect-cluster-connect
  policyTypes:
  - Ingress
Copy to Clipboard Toggle word wrap

1
允许连接到 API 的 pod 标签。

要在集群外添加连接器配置,请使用 curl 命令中公开 API 的资源 URL。

6.5.5. 限制对 Kafka Connect API 的访问

仅将对 Kafka Connect API 的访问限制为可信用户,以防止未经授权的操作和潜在的安全问题。Kafka Connect API 提供了大量更改连接器配置的功能,这有助于采取安全措施。有权访问 Kafka Connect API 的人员可能会获得管理员可能假设的敏感信息是安全的。

Kafka Connect REST API 可以被经过身份验证访问 OpenShift 集群的任何人访问,并知道端点 URL,其中包括主机名/IP 地址和端口号。

例如,假设机构使用 Kafka Connect 集群和连接器将客户数据库的敏感数据流传输到中央数据库。管理员使用配置供应商插件存储与连接到客户数据库和中央数据库相关的敏感信息,如数据库连接详情和身份验证凭据。配置提供程序保护此敏感信息无法向未授权用户公开。但是,有权访问 Kafka Connect API 的用户仍然可以获得对客户数据库的访问权限,而无需经过管理员的批准。他们可以通过设置一个假的数据库并将连接器配置为连接它。然后,他们可以修改连接器配置以指向客户数据库,但不是将数据发送到中央数据库,而是将其发送到假的数据库。通过将连接器配置为连接到假的数据库,可以截获连接到客户端数据库的登录详情和凭证,即使它们被安全地存储在配置提供程序中。

如果您使用 KafkaConnector 自定义资源,默认情况下,OpenShift RBAC 规则只允许 OpenShift 集群管理员更改连接器。您还可以指定非集群管理员来管理 AMQ Streams 资源。在 Kafka Connect 配置中启用 KafkaConnector 资源后,Cluster Operator 会恢复使用 Kafka Connect REST API 所做的更改。如果您不使用 KafkaConnector 资源,默认的 RBAC 规则不会限制对 Kafka Connect API 的访问。如果要使用 OpenShift RBAC 限制对 Kafka Connect REST API 的直接访问,则需要启用和使用 KafkaConnector 资源。

为提高安全性,我们建议为 Kafka Connect API 配置以下属性:

org.apache.kafka.disallowed.login.modules

(Kafka 3.4 或更高版本)设置 org.apache.kafka.disallowed.login.modules Java 系统属性,以防止使用不安全的登录模块。例如,指定 com.sun.security.auth.module.JndiLoginModule 会阻止使用 Kafka JndiLoginModule

禁止登录模块配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  jvmOptions:
    javaSystemProperties:
      - name: org.apache.kafka.disallowed.login.modules
        value: com.sun.security.auth.module.JndiLoginModule, org.apache.kafka.common.security.kerberos.KerberosLoginModule
# ...
Copy to Clipboard Toggle word wrap

只允许可信登录模块,并按照您使用的版本的 Kafka 的最新建议进行操作。作为最佳实践,您应该使用 org.apache.kafka.disallowed.login.modules 系统属性在 Kafka Connect 配置中明确禁止不安全的登录模块。

connector.client.config.override.policy

connector.client.config.override.policy 属性设置为 None,以防止连接器配置覆盖 Kafka Connect 配置及其使用的使用者和制作者。

指定连接器覆盖策略的配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  config:
    connector.client.config.override.policy: None
# ...
Copy to Clipboard Toggle word wrap

您可以从使用 Kafka Connect API 切换到使用 KafkaConnector 自定义资源来管理连接器。要进行交换机,请按照以下顺序执行以下操作:

  1. 使用配置部署 KafkaConnector 资源,以创建您的连接器实例。
  2. 通过将 strimzi.io/use-connector-resources 注解设置为 true 来启用 Kafka Connect 配置中的 KafkaConnector 资源。
警告

如果在创建它们前启用 KafkaConnector 资源,请删除所有连接器。

要从使用 KafkaConnector 资源切换到使用 Kafka Connect API,首先从 Kafka Connect 配置中删除启用 KafkaConnector 资源的注解。否则,Cluster Operator 会恢复使用 Kafka Connect REST API 进行的手动更改。

在进行交换机时 ,检查 KafkaConnect 资源的状态metadata.generation (部署的当前版本)的值必须与 status.observedGeneration (资源的最新协调)匹配。当 Kafka Connect 集群为 Ready 时,您可以删除 KafkaConnector 资源。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat