2.5. 将操作应用到 Kafka 连接中的数据
如果要在 Kamelet 和 Kafka 主题之间传递的数据执行操作,请在 Kamelet Binding 中使用 action Kamelets 作为中间步骤。
2.5.1. 将事件数据路由到不同的目的地主题
当您配置到 Kafka 实例时,您可以选择将主题信息从事件数据转换,以便事件路由到不同的 Kafka 主题。使用以下转换操作 Kamelets 之一:
-
regex Router - 使用正则表达式和替换字符串修改消息的主题。例如,如果要删除主题前缀,请添加前缀或删除主题名称的一部分。配置 Regex Router Action Kamelet (
regex-router-action
)。 -
timestamp - 根据原始主题和消息的时间戳修改消息的主题。例如,在使用需要写入不同表或基于时间戳的索引的接收器时。例如,当您要将事件从 Kafka 写入 Elasticsearch 时,每个事件都需要根据事件本身中的信息进入不同的索引。配置 Timestamp Router Action Kamelet (
timestamp-router-action
)。 -
message Time Stamp - 修改基于原始主题值的消息主题,以及来自消息值字段的时间戳字段。配置 Message Timestamp Router Action Kamelet (
message-timestamp-router-action
)。 -
predicate - 通过配置 Predicate Filter Action Kamelet (
predicate-filter-action
)来根据给定的 JSON 路径表达式过滤事件。
先决条件
-
您已创建了一个 Kamelet Binding,接收器是一个
kafka-sink
Kamelet,如 将数据源连接到 Kamelet Binding 中的 Kafka 主题 中所述。 - 您知道您要添加到 Kamelet Binding 中的哪些类型的转换。
流程
要转换目标主题,请在 Kamelet Binding 中使用一个转换操作 Kamelets 作为中间步骤。
有关如何在 Kamelet Binding 中添加操作 Kamelet 的详情,请参阅 在 Kamelet Binding 中添加操作。
2.5.2. 过滤特定 Kafka 主题的事件数据
如果您使用源 Kamelet 将记录生成到许多不同的 Kafka 主题,并且您要将记录过滤到一个 Kafka 主题,请在 Kamelet Binding 中添加 topic-name-matches-filter-action
Kamelet 作为中间步骤。
先决条件
- 您已在 YAML 文件中创建了一个 Kamelet Binding。
- 您知道要从中过滤出事件数据的 Kafka 主题的名称。
流程
编辑 Kamelet Binding,将
topic-name-matches-filter-action
Kamelet 作为源和目标 Kamelets 之间的中间步骤包含。通常,您可以使用
kafka-source
Kamelet 作为源 Kamelet,您提供了一个主题作为所需主题
参数的值。在以下 Kamelet Binding 示例中,
kafka-source
Kamelet 指定test-topic、test-topic-2 和 test-topic-3
Kafka 主题和topic-name-matches-filter-action
Kamelet 指定过滤出topic-test
主题的事件数据:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic, test-topic-2, test-topic-3" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
如果要过滤来自
kafka-source
Kamelet 以外的源 Kamelet 的主题,您必须提供 Kafka 主题信息。您可以使用insert-header-action
Kamelet 将 Kafka 主题字段添加为中间步骤,在 Kamelet Binding 中的topic-name-matches-filter-action
步骤前,如下例所示:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: insert-header-action properties: name: "KAFKA.topic" value: "test-topic" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
- 保存 Kamelet Binding YAML 文件。