1.3. コネクション内のデータへの操作の適用


Kamelet とイベントチャネルの間に渡すデータで操作を実行する場合は、Kamelet Binding 内の中間ステップとしてアクション Kamelets を使用します。たとえば、アクション Kamelet を使用して、データをシリアライズまたはデシリアライズしたり、データをフィルターしたり、フィールドやメッセージヘッダーを挿入したりできます。

フィールドのフィルタリングや追加などの操作操作は、JSON データ (Content-Type ヘッダーが application/json に設定されている場合) でのみ動作します。イベントデータが JSON 以外の形式 (Avro または Protocol Buffers など) を使用する場合は、操作アクションおよびその後のシリアライズステップ (例: protobuf-deserialize-action または avro-deserialize-action Kamelet を参照する) の前にデシリアライズステップ (例: protobuf-serialize-action または avro-serialize-action Kamelet を参照する) を追加して、データの形式を変換する必要があります。接続のデータ形式を変換する方法は、Data conversion Kamelets を参照してください。

アクション Kamelets には以下が含まれます。

1.3.1. Kamelet Binding への操作の追加

アクション Kamelet を実装するには、Kamelet Binding ファイルの spec セクションで、ソースセクションとシンクセクションの間に steps セクションを追加します。

前提条件

  • Kamelet Binding でのソースおよびシンクコンポーネントの接続 で説明されているように、Kamelet Binding を作成している。
  • Kamelet Binding に追加するアクション Kamelet とアクション Kamelet に必要なパラメーターを知っている必要があります。

    この手順の例では、predicate-filter-action Kamelet のパラメーターは string タイプの式で、コーヒーデータをフィルターして "deep" taste intensity を持つコーヒーだけをログに記録するための JSON パス式を提供します。predicate-filter-action Kamelet では、Kamelet Binding に Builder トレイト設定プロパティーを設定する必要があります。

    この例には、イベントデータフォーマットが JSON であるため、この場合はオプションのデシリアライズおよびシリアライズアクションも含まれます。

手順

  1. エディターで KameletBinding ファイルを開きます。

    たとえば、以下は coffee-to-log.yaml ファイルの内容になります。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
  2. source セクションの上に integration セクションを追加し、(predicate-filter-action Kamelet で必要とされるように) 以下の Builder トレイト設定プロパティーを指定します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log
    spec:
      integration:
             traits:
               builder:
                 configuration:
                   properties:
                     - "quarkus.arc.unremovable- types=com.fasterxml.jackson.databind.ObjectMapper"
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
  3. source セクションと sink セクションの間に steps セクションを追加し、アクション Kamelet を定義します。以下に例を示します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log
      spec:
        integration:
               traits:
                 builder:
                   configuration:
                     properties:
                       - "quarkus.arc.unremovable-types=com.fasterxml.jackson.databind.ObjectMapper"
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: json-deserialize-action
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: predicate-filter-action
        properties:
          expression: "@.intensifier =~ /.*deep/"
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: json-serialize-action
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
  4. 変更を保存します。
  5. 以下のように、oc apply コマンドを使用して KameletBinding リソースを更新します。

    oc apply -f coffee-to-log.yaml

    Camel K Operator は再生成し、更新された KameletBinding リソースに基づいて生成する CamelK インテグレーションを実行します。

  6. Kamelet Binding のステータスを表示するには、以下を実行します。

    oc get kameletbindings

  7. 対応するインテグレーションのステータスを表示するには、次のコマンドを実行します。

    oc get integrations

  8. インテグレーションのログファイルの出力を表示するには、以下を実行します。

    kamel logs <integration-name>

    たとえば、インテグレーション名が coffee-to-log の場合は、以下のコマンドを使用します。

    kamel logs coffee-to-log

  9. インテグレーションを停止するには、Kamelet Binding を削除します。

    oc delete kameletbindings/<kameletbinding-name>

    以下に例を示します。

    oc delete kameletbindings/coffee-to-log

1.3.2. アクション kamelets

1.3.2.1. データフィルタリング Kamelets

たとえば、機密データの漏えいや不要なネットワーク課金の生成を防ぐためやに、ソースとシンクコンポーネント間で渡されるデータをフィルタリングすることができます。

以下の基準に基づいてデータをフィルターできます。

  • Kafka トピック名: Topic Name Matches Filter Action Kamelet (topic-name-matches-filter-action) を設定して、指定の Java 正規表現に一致する名前を持つ Kafka トピックのイベントをフィルターします。詳細は、特定の Kafka トピックのイベントデータの絞り込み を参照してください。
  • ヘッダーキー: Header Filter Action Kamelet (has-header-filter-action) を設定して、特定のメッセージヘッダーを持つイベントをフィルターします。
  • null 値: Tombstone Filter Action Kamelet (is-tombstone-filter-action) を設定して、null ペイロードを持つイベントであるトゥームストーンイベントをフィルターします。
  • 述語: Predicate Filter Action Kamelet (predicate-filter-action) を設定して、指定の JSON パス式に基づいてイベントをフィルターします。predicate-filter-action Kamelet では、Kamelet Binding に以下の Builder トレイト 設定プロパティーを設定する必要があります。

    spec:
      integration:
        traits:
          builder:
            configuration:
              properties:
               - "quarkus.arc.unremovable-types=com.fasterxml.
                    jackson.databind.ObjectMapper"
注記

データフィルタリング Kamelets は、JSON データ (つまり、Content-Type ヘッダーが application/json に設定されている場合) で追加設定なしで機能します。イベントデータが JSON 以外の形式を使用する場合は、操作アクションおよびその後のシリアライズステップ (例: protobuf-deserialize-action または avro-deserialize-action) の前にデシリアライズステップ (例: protobuf-serialize-action または avro-serialize-action) を追加して、データの形式を変換する必要があります。接続のデータ形式を変換する方法は、Data conversion Kamelets を参照してください。

1.3.2.2. データ変換 Kamelets

以下のデータ変換 Kamelets を使用すると、ソースコンポーネントとシンクコンポーネント間で渡すデータの形式をシリアライズおよびデシリアライズできます。データ変換は、イベントデータのペイロード (キーまたはヘッダーではない) に適用されます。

  • Avro: Apache Hadoop 用のデータのシリアライズおよびデータ交換サービスを提供するオープンソースプロジェクト。

    • Avro デシリアライザーアクション Kamelet (avro-deserialize-action)
    • Avro シリアライザーアクション Kamelet (avro-deserialize-action)
  • プロトコルバッファー: 内部で使用する Google が開発した高パフォーマンスのコンパクトなバイナリーワイヤ形式で、内部ネットワークサービスと通信できます。

    • Protobuf Deserialize Action Kamelet (protobuf-deserialize-action)
    • Protobuf Serialize Action Kamelet (protobuf-serialize-action)
  • JSON (JavaScript Object Notation): JavaScript プログラミング言語のサブセットをベースとしたデータ変換形式。JSON は、言語にまったく依存しないテキスト形式です。

    • JSON Deserialize Action Kamelet (json-deserialize-action)
    • JSON Serialize Action Kamelet (json-serialize-action)
注記

Avro および Protobuf のシリアライズ/デシリアライズ Kamelets で、スキーマ (JSON 形式を使用した単一行) を指定する必要があります。JSON のシリアライズ/デシリアライズ Kamelets には、その指定は必要ありません。

1.3.2.3. データ変更 Kamelets

以下のデータ変更 Kamelets を使用すると、ソースコンポーネントとシンクコンポーネント間で渡すデータに対して簡単な操作を実行できます。

  • フィールドの抽出: extract-field-action Kamelet を使用して、データのボディーからフィールドをプルし、データの本文全体を抽出したフィールドに置き換えます。
  • フィールドの抽出: hoist-field-action Kamelet を使用して、データの本文を 1 つのフィールドにラップします。
  • ヘッダーの挿入: insert-header-action Kamelet を使用して、静的データまたはレコードメタデータのいずれかを使用してヘッダーフィールドを追加します。
  • フィールドの挿入: insert-field-action Kamelet を使用して、静的データまたはレコードメタデータのいずれかを使用してフィールド値を追加します。
  • mask フィールドのマスク: mask-field-action Kamelet を使用して、フィールド値をフィールドタイプの有効な null 値 (0、空の文字列など) または特定の置換値 (置換値は空でない文字列または数値である必要があります) に置き換えます。

    たとえば、リレーショナルデータベースからデータをキャプチャーして Kafka に送信し、このデータには保護されている (PCI / PII) 情報が含まれる場合、Kafka クラスターがまだ認定されていないため、保護されている情報をマスクする必要があります。

  • フィールドの置き換え: replace-field-action Kamelet を使用して、フィールドをフィルターまたは名前に変更します。名前を変更するフィールド、無効にするフィールド (exclude)、有効にするフィールド (include) を指定できます。
  • 値/キー:(Kafka の場合) value-to-key-action Kamelet を使用して、レコードキーをペイロードのフィールドのサブセットから作成した新しいキーに置き換えます。イベントキーを、データが Kafka に書き込まれる前に、イベント情報に基づく値に設定できます。たとえば、データベーステーブルからレコードを読み取る場合、顧客 ID に基づいて Kafka のレコードをパーティションできます。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.