5.2.5. 部署示例 KafkaConnector 资源


AMQ Streams 在 example /connect/source-connector.yaml 中包含一个 KafkaConnect or 示例。这会创建一个基本的 FileStreamSourceConnector 实例,它将 Kafka 许可证文件的每一行(一个文件源示例)发送到单个 Kafka 主题。

这个步骤描述了如何创建:

  • 从 Kafka 许可证文件(源)读取数据的 FileStreamSourceConnector,并将数据作为消息写入 Kafka 主题。
  • 从 Kafka 主题读取信息并将消息写入临时文件( sink)的 FileStreamSinkConnector
注意

在生产环境中,您要准备包含所需 Kafka Connect 连接器的容器镜像,如 第 5.2.3 节 “使用连接器插件扩展 Kafka Connect” 所述。

提供 FileStreamSourceConnectorFileStreamSinkConnector 作为示例。如此处所述,在容器中运行这些连接器不太可能适合生产用例。

先决条件

流程

  1. 编辑 example /connect/source-connector.yaml 文件:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector 
    1
    
      labels:
        strimzi.io/cluster: my-connect-cluster 
    2
    
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 
    3
    
      tasksMax: 2 
    4
    
      config: 
    5
    
        file: "/opt/kafka/LICENSE" 
    6
    
        topic: my-topic 
    7
    
        # ...
    Copy to Clipboard Toggle word wrap
    1
    KafkaConnector 资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。
    2
    用于在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到与其链接的 Kafka Connect 集群相同的命名空间。
    3
    连接器类的全名或别名.这应该存在于由 Kafka Connect 集群使用的镜像中。
    4
    连接器可以创建的 Kafka Connect 任务 的最大数量。
    5
    连接器配置,作为键值对.
    6
    这个示例源连接器配置从 /opt/kafka/LICENSE 文件中读取数据。
    7
    将源数据发布到的 Kafka 主题。
  2. 在 OpenShift 集群中创建源 KafkaConnector

    oc apply -f examples/connect/source-connector.yaml
    Copy to Clipboard Toggle word wrap
  3. 创建一个 示例/connect/sink-connector.yaml 文件:

    touch examples/connect/sink-connector.yaml
    Copy to Clipboard Toggle word wrap
  4. 将以下 YAML 粘贴到 sink-connector.yaml 文件中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 
    1
    
      tasksMax: 2
      config: 
    2
    
        file: "/tmp/my-file" 
    3
    
        topics: my-topic 
    4
    Copy to Clipboard Toggle word wrap
    1
    连接器类的全名或别名.这应该存在于由 Kafka Connect 集群使用的镜像中。
    2
    连接器配置,作为键值对.
    3
    发布源数据的临时文件。
    4
    从中读取源数据的 Kafka 主题。
  5. 在 OpenShift 集群中创建 sink KafkaConnector

    oc apply -f examples/connect/sink-connector.yaml
    Copy to Clipboard Toggle word wrap
  6. 检查是否创建了连接器资源:

    oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name
    
    my-source-connector
    my-sink-connector
    Copy to Clipboard Toggle word wrap

    MY-CONNECT-CLUSTER 替换为您的 Kafka 连接集群。

  7. 在容器中,执行 kafka-console-consumer.sh 读取由源连接器写入主题的消息:

    oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
    Copy to Clipboard Toggle word wrap
源和接收器连接器配置选项

连接器配置在 KafkaConnector 资源的 spec.config 属性中定义。

FileStreamSourceConnectorFileStreamSinkConnector 类支持与 Kafka Connect REST API 相同的配置选项。其他连接器支持不同的配置选项。

Expand
表 5.2. FileStreamSource 连接器类的配置选项
名称类型默认值描述

file

字符串

null

可将消息写入到的源文件。如果没有指定,则使用标准输入。

主题

list

null

发布数据的 Kafka 主题。

Expand
表 5.3. FileStreamSinkConnector 类的配置选项
名称类型默认值描述

file

字符串

null

要写入消息的目标文件。如果没有指定,则使用标准输出。

主题

list

null

一个或多个 Kafka 主题,从中读取数据。

topics.regex

字符串

null

与一个或多个 Kafka 主题匹配以从中读取数据的正则表达式。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat