2.5.2. 为特定 Kafka 主题过滤事件数据
如果您使用一个源 Kamelet,它会将记录扩展到多个不同的 Kafka 主题,并想将记录过滤到一个 Kafka 主题,请添加 topic-name-matches-filter-action
Kamelet 作为 Kamelet 中的 intermediary 步骤。
先决条件
- 您已在 YAML 文件中创建了一个 Kamelet Binding。
- 您知道要从中过滤事件数据的 Kafka 主题的名称。
流程
编辑 Kamelet Binding,将
topic-name-matches-filter-action
Kamelet 作为源与 sink Kamelets 之间的中间步骤包括在内。通常,您可以使用
kafka-source
Kamelet,作为源 Kamelet,您提供一个主题作为所需主题
参数的值。在以下 Kamelet Binding 示例中,
kafka-source
Kamelet 指定test-topic、test-topic-2 和 test-topic-3
Kafka 主题和topic-name-matches-filter-action
Kamelet 指定来过滤来自topic-test
主题的事件数据:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic, test-topic-2, test-topic-3" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
如果要过滤来自
kafka-source
Kamelet 以外的源 Kamelet 的主题,您必须提供 Kafka 主题信息。您可以使用insert-header-action
Kamelet 添加 Kafka topic 字段作为中间步骤,然后再在 Kamelet Binding 中的topic-name-matches-filter-action
步骤之前,如下例所示:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: insert-header-action properties: name: "KAFKA.topic" value: "test-topic" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
- 保存 Kamelet Binding YAML 文件。