1.3. コネクション内のデータへの操作の適用
Kamelet とイベントチャネルの間に渡すデータで操作を実行する場合は、Kamelet Binding 内の中間ステップとしてアクション Kamelets を使用します。たとえば、アクション Kamelet を使用して、データをシリアライズまたはデシリアライズしたり、データをフィルターしたり、フィールドやメッセージヘッダーを挿入したりできます。
フィールドのフィルターリングや追加などの操作操作は、JSON データ (Content-Type
ヘッダーが application/json
に設定されている場合) でのみ動作します。イベントデータが JSON 以外の形式 (Avro または Protocol Buffers など) を使用する場合は、操作アクションおよびその後のシリアライズステップ (例: protobuf-deserialize-action
または avro-deserialize-action
Kamelet を参照する) の前にデシリアライズステップ (例: protobuf-serialize-action
または avro-serialize-action
Kamelet を参照する) を追加して、データの形式を変換する必要があります。接続のデータ形式を変換する方法は、データ変換 Kamelets を参照してください。
アクション Kamelets には以下が含まれます。
1.3.1. Kamelet Binding への操作の追加
アクション Kamelet を実装するには、Kamelet Binding ファイルの spec
セクションで、ソースセクションとシンクセクションの間に steps
セクションを追加します。
前提条件
- Kamelet Binding でのソースおよびシンクコンポーネントの接続 で説明されているように、Kamelet Binding を作成している。
Kamelet Binding に追加するアクション Kamelet とアクション Kamelet に必要なパラメーターを知っている必要があります。
この手順の例では、
predicate-filter-action
Kamelet のパラメーターはstring
タイプの式で、コーヒーデータをフィルターして "deep" taste intensity を持つコーヒーだけをログに記録するための JSON パス式を提供します。predicate-filter-action
Kamelet では、Kamelet Binding に Builder トレイト設定プロパティーを設定する必要があります。この例には、イベントデータフォーマットが JSON であるため、この場合はオプションのデシリアライズおよびシリアライズアクションも含まれます。
手順
エディターで
KameletBinding
ファイルを開きます。たとえば、以下は
coffee-to-log.yaml
ファイルの内容になります。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
source
セクションの上にintegration
セクションを追加し、(predicate-filter-action
Kamelet で必要とされるように) 以下の Builder トレイト設定プロパティーを指定します。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable- types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
source
セクションとsink
セクションの間にsteps
セクションを追加し、アクション Kamelet を定義します。以下に例を示します。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-deserialize-action - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: predicate-filter-action properties: expression: "@.intensifier =~ /.*deep/" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-serialize-action sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
- 変更を保存します。
以下のように、
oc apply
コマンドを使用してKameletBinding
リソースを更新します。oc apply -f coffee-to-log.yaml
Camel K Operator は再生成し、更新された
KameletBinding
リソースに基づいて生成する CamelK インテグレーションを実行します。Kamelet Binding のステータスを表示するには、以下を実行します。
oc get kameletbindings
対応するインテグレーションのステータスを表示するには、次のコマンドを実行します。
oc get integrations
インテグレーションのログファイルの出力を表示するには、以下を実行します。
kamel logs <integration-name>
たとえば、インテグレーション名が
coffee-to-log
の場合は、以下のコマンドを使用します。kamel logs coffee-to-log
インテグレーションを停止するには、Kamelet Binding を削除します。
oc delete kameletbindings/<kameletbinding-name>
以下に例を示します。
oc delete kameletbindings/coffee-to-log
1.3.2. アクション kamelets
1.3.2.1. データフィルターリング Kamelets
たとえば、機密データの漏えいや不要なネットワーク課金の生成を防ぐためやに、ソースとシンクコンポーネント間で渡されるデータをフィルターリングすることができます。
以下の基準に基づいてデータをフィルターできます。
-
Kafka トピック名: Topic Name Matches Filter Action Kamelet (
topic-name-matches-filter-action
) を設定して、指定の Java 正規表現に一致する名前を持つ Kafka トピックのイベントをフィルターします。詳細は、特定の Kafka トピックのイベントデータの絞り込み を参照してください。 -
ヘッダーキー: Header Filter Action Kamelet (
has-header-filter-action
) を設定して、特定のメッセージヘッダーを持つイベントをフィルターします。 -
null 値: Tombstone Filter Action Kamelet (
is-tombstone-filter-action
) を設定して、null ペイロードを持つイベントであるトゥームストーンイベントをフィルターします。 述語: Predicate Filter Action Kamelet (
predicate-filter-action
) を設定して、指定の JSON パス式に基づいてイベントをフィルターします。predicate-filter-action
Kamelet では、Kamelet Binding に Builder トレイト 設定プロパティーを設定する必要があります。spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml. jackson.databind.ObjectMapper"
データフィルターリング Kamelets は、JSON データ (つまり、Content-Type ヘッダーが application/json に設定されている場合) で追加設定なしで機能します。イベントデータが JSON 以外の形式を使用する場合は、操作アクションおよびその後のシリアライズステップ (例: protobuf-deserialize-action
または avro-deserialize-action
) の前にデシリアライズステップ (例: protobuf-serialize-action
または avro-serialize-action
) を追加して、データの形式を変換する必要があります。接続のデータ形式を変換する方法は、データ変換 Kamelets を参照してください。
1.3.2.2. データ変換 Kamelets
以下のデータ変換 Kamelets を使用すると、ソースコンポーネントとシンクコンポーネント間で渡すデータの形式をシリアライズおよびデシリアライズできます。データ変換は、イベントデータのペイロード (キーまたはヘッダーではない) に適用されます。
Avro: Apache Hadoop 用のデータのシリアライズおよびデータ交換サービスを提供するオープンソースプロジェクト。
-
Avro デシリアライザーアクション Kamelet (
avro-deserialize-action
) -
Avro シリアライザーアクション Kamelet (
avro-deserialize-action
)
-
Avro デシリアライザーアクション Kamelet (
プロトコルバッファー: 内部で使用する Google が開発した高パフォーマンスのコンパクトなバイナリーワイヤ形式で、内部ネットワークサービスと通信できます。
-
Protobuf Deserialize Action Kamelet (
protobuf-deserialize-action
) -
Protobuf Serialize Action Kamelet (
protobuf-serialize-action
)
-
Protobuf Deserialize Action Kamelet (
JSON (JavaScript Object Notation): JavaScript プログラミング言語のサブセットをベースとしたデータ変換形式。JSON は、言語にまったく依存しないテキスト形式です。
-
JSON Deserialize Action Kamelet (
json-deserialize-action
) -
JSON Serialize Action Kamelet (
json-serialize-action
)
-
JSON Deserialize Action Kamelet (
Avro および Protobuf のシリアライズ/デシリアライズ Kamelets で、スキーマ (JSON 形式を使用した単一行) を指定する必要があります。JSON のシリアライズ/デシリアライズ Kamelets には、その指定は必要ありません。
1.3.2.3. データ変更 Kamelets
以下のデータ変更 Kamelets を使用すると、ソースコンポーネントとシンクコンポーネント間で渡すデータに対して簡単な操作を実行できます。
-
フィールドの抽出:
extract-field-action
Kamelet を使用して、データのボディーからフィールドをプルし、データの本文全体を抽出したフィールドに置き換えます。 -
フィールドの抽出:
hoist-field-action
Kamelet を使用して、データの本文を 1 つのフィールドにラップします。 -
ヘッダーの挿入:
insert-header-action
Kamelet を使用して、静的データまたはレコードメタデータのいずれかを使用してヘッダーフィールドを追加します。 -
フィールドの挿入:
insert-field-action
Kamelet を使用して、静的データまたはレコードメタデータのいずれかを使用してフィールド値を追加します。 mask フィールドのマスク:
mask-field-action
Kamelet を使用して、フィールド値をフィールドタイプの有効な null 値 (0、空の文字列など) または特定の置換値 (置換値は空でない文字列または数値である必要があります) に置き換えます。たとえば、リレーショナルデータベースからデータをキャプチャーして Kafka に送信し、このデータには保護されている (PCI / PII) 情報が含まれる場合、Kafka クラスターがまだ認定されていないため、保護されている情報をマスクする必要があります。
-
フィールドの置き換え:
replace-field-action
Kamelet を使用して、フィールドをフィルターまたは名前に変更します。名前を変更するフィールド、無効にするフィールド (exclude)、有効にするフィールド (include) を指定できます。 -
値/キー:(Kafka の場合)
value-to-key-action
Kamelet を使用して、レコードキーをペイロードのフィールドのサブセットから作成した新しいキーに置き換えます。イベントキーを、データが Kafka に書き込まれる前に、イベント情報に基づく値に設定できます。たとえば、データベーステーブルからレコードを読み取る場合、顧客 ID に基づいて Kafka のレコードをパーティションできます。