2.5. 将操作应用到 Kafka 连接中的数据


如果要对 Kamelet 和 Kafka 主题之间的数据执行操作,请使用 action Kamelets 作为 Kamelet Binding 中的中间步骤。

2.5.1. 将事件数据路由到不同的目标主题

当您配置到 Kafka 实例时,您可以选择从事件数据转换主题信息,以便事件路由到不同的 Kafka 主题。使用以下转换操作 Kamelets 之一:

  • 正则表达式路由器 - 使用正则表达式和替换字符串修改消息的主题。例如,如果要删除主题前缀、添加前缀或删除主题名称的一部分。配置 Regex Router Action Kamelet (regex-router-action)。
  • timestamp - 根据原始主题和消息的时间戳修改消息的主题。例如,在使用需要写入不同表或基于时间戳的索引的接收器时。例如,当您要将事件从 Kafka 写入 Elasticsearch 时,但每个事件都需要根据事件本身中的信息进入不同的索引。配置 Timestamp Router Action Kamelet (timestamp-router-action)。
  • Message TimeStamp - 修改消息的主题,它基于原始主题值和来自消息值字段的 timestamp 字段。配置 Message Timestamp Router Action Kamelet (message-timestamp-router-action)。
  • predicate- Filter 事件基于给定的 JSON 路径表达式,方法是配置 Predicate Filter Action Kamelet (predicate-filter-action)。

先决条件

流程

要转换目的地主题,请使用 transformation action Kamelets 作为 Kamelet Binding 中的中间步骤。

有关如何将 action Kamelet 添加到 Kamelet Binding 的详细信息,请参阅 Adding a operation to a Kamelet Binding

2.5.2. 过滤特定 Kafka 主题的事件数据

如果您使用 source Kamelet 将记录生成到许多不同的 Kafka 主题,并且您要将记录过滤成一个 Kafka 主题,请将 topic-name-matches-filter-action Kamelet 作为 Kamelet Binding 中的中间步骤添加 topic-name-matches-filter-action Kamelet。

先决条件

  • 您已在 YAML 文件中创建了一个 Kamelet Binding。
  • 您知道要从中过滤出事件数据的 Kafka 主题的名称。

流程

  1. 编辑 Kamelet Binding,使其包含 topic-name-matches-filter-action Kamelet 作为源和接收器 Kamelets 之间的中间步骤。

    通常,您可以使用 kafka-source Kamelet 作为源 Kamelet,并提供一个主题作为所需 主题参数的值

    在以下 Kamelet Binding 示例中,kafka-source Kamelet 指定 test-topic、test-topic-2 和 test-topic-3 Kafka 主题和 topic-name-matches-filter-action Kamelet 指定从 topic-test 主题过滤事件数据:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic, test-topic-2, test-topic-3"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
            showStreams: true

    如果要过滤 kafka-source Kamelet 以外的源 Kamelet 主题,您必须提供 Kafka 主题信息。您可以使用 insert-header-action Kamelet 将 Kafka topic 字段添加为中间步骤,在 Kamelet Binding 中的 topic-name-matches-filter-action 步骤前,如下例所示:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: insert-header-action
        properties:
          name:  "KAFKA.topic"
          value:  "test-topic"
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  2. 保存 Kamelet Binding YAML 文件。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.