2.5. Kafka 接続内でのデータへの操作の適用
Kamelet と Kafka トピック間で渡されるデータで操作を実行する場合は、Kamelet Binding 内の中間ステップとして、アクション Kamelets を使用します。
2.5.1. 異なる宛先トピックへのイベントデータのルーティング
Kafka インスタンスへのコネクションを設定する場合、イベントが異なる Kafka トピックにルーティングされるように、オプションでイベントデータからのトピック情報を変換できます。以下の変換アクション Kamelets のいずれかを使用します。
-
Regex Router: 正規表現と代替文字列を使用してメッセージのトピックを変更します。たとえば、トピック接頭辞を削除する場合は、接頭辞を追加するか、トピック名の一部を削除します。Regex Router Action Kamelet (
regex-router-action
) を設定します。 -
TimeStamp: 元のトピックとメッセージのタイムスタンプに基づいてメッセージのトピックを変更します。たとえば、タイムスタンプに基づいて異なるテーブルまたはインデックスに書き込む必要があるシンクを使用する場合などです。たとえば、Kafka から Elasticsearch にイベントを書きみ、各イベントはイベント自体の情報に基づいて異なるインデックスに移動する必要がある場合。Timestamp Router Action Kamelet(
timestamp-router-action
) を設定します。 -
Message TimeStamp: 元のトピック値とメッセージ値フィールドからのタイムスタンプフィールドに基づいてメッセージのトピックを変更します。Message Timestamp Router Action Kamelet(
message-timestamp-router-action
) を設定します。 -
述語: Predicate Filter Action Kamelet (
predicate-filter-action
) を設定して、指定の JSON パス式に基づいてイベントをフィルターします。
前提条件
-
Kamelet Binding でのデータソースの Kafka トピックへの接続 で説明されているように、シンクが
kafka-sink
Kamelet である Kamelet Binding を作成している。 - Kamelet Binding に追加する変換のタイプを知っている必要があります。
手順
宛先トピックを変更するには、Kamelet Binding 内の中間ステップとして変更アクション Kamelets の 1 つを使用します。
アクション Kamelet を Kamelet Binding に追加する方法は、Kamelet Binding への操作の追加 を参照してください。
2.5.2. 特定の Kafka トピックのイベントデータの絞り込み
多くの異なる Kafka トピックにレコードを生成するソース Kamelet を使用し、レコードを 1 つの Kafka トピックに絞り込む場合は、Kamelet Binding の中間ステップとして topic-name-matches-filter-action
Kamelet を追加します。
前提条件
- YAML ファイルに Kamelet Binding を作成している。
- イベンデータを絞り込む Kafka トピックの名前を知っておく必要があります。
手順
Kamelet Binding を編集して、ソースとシンク Kamelets の間の中間ステップとして
topic-name-matches-filter-action
Kamelet を追加します。通常、ソース Kamelet として
kafka-source
Kamelet を使用し、トピックを必要なtopic
パラメーターの値として指定します。以下の Kamelet Binding の例では、
kafka-source
Kamelet はtest-topic、test-topic-2、および test-topic-3
Kafka トピックを指定し、topic-name-matches-filter-action
Kamelet は、topic-test
トピックからイベントデータをフィルターするように指定します。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic, test-topic-2, test-topic-3" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
kafka-source
Kamelet 以外のソース Kamelet からのトピックをフィルタリングする場合は、Kafka トピック情報を指定する必要があります。以下の例のように、insert-header-action
Kamelet を使用して、Kamelet Binding のtopic-name-matches-filter-action
ステップの前に Kafka トピックフィールドを中間ステップとして追加できます。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: insert-header-action properties: name: "KAFKA.topic" value: "test-topic" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
- Kamelet Binding YAML ファイルを保存します。