2.3. Connecting a data source to a Kafka topic in a kamelet binding


データソースを Kafka トピックに接続するには、図 2.2 にあるように kamelet バインディングを作成します。

Connecting a data source to a Kafka topic 「Kafka トピックへのデータソースの接続」

前提条件

  • イベントの送信先となる Kafka トピックの名前を知っておく必要があります。

    この手順の例では、イベントを受信するために test-topic を使用しています。

  • Kafka インスタンスの以下のパラメーターの値を知っている必要があります。

    • bootstrapServers: Kafka Broker URL のコンマ区切りリスト
    • password: Kafka に対して認証を行うためのパスワード
    • user: Kafka に対して認証するユーザー名

      OpenShift Streams の使用時にこれらの値を取得する方法は、「Kafka クレデンシャルの取得」を参照してください。

      AMQストリームでの Kafka 認証に関する詳細は、「Managing secure access to Kafka」を参照してください。

  • Kafka ブローカーとの通信のセキュリティープロトコルを把握している。OpenShift Streams の Kafka クラスターでは、SASL_SSL (デフォルト) です。AMQ ストリームの Kafka クラスターの場合は、SASL/Plain になります。
  • Camel K インテグレーションに追加する kamelets と必要なインスタンスパラメーターを把握している。この手順の kamelets の例は、以下のとおりです。

    • coffee-source kamelet: 各イベントを送信する頻度を指定する任意のパラメーター period があります。Example source kamelet のコードを、coffee-source.kamelet.yaml ファイルという名前のファイルにコピーしてから、以下のコマンドを実行してこれをリソースとして namespace に追加できます。

      oc apply -f coffee-source.kamelet.yaml

    • kafka-sink kamelet: Kamelet Catalog で提供されます。このバインディングでは Kafka トピックがデータを受信するため (データコンシューマー)、kafka-sink kamelet を使用します。必須パラメーターの値の例は次のとおりです。

      • bootstrapServers: "broker.url:9092"
      • password - "testpassword"
      • user: "testuser"
      • topic: "test-topic"
      • securityProtocol: OpenShift Streams の Kafka クラスターでは、SASL_SSL がデフォルト値であるため、このパラメーターを設定する必要はありません。AMQ ストリームの Kafka クラスターの場合は、このパラメーターの値は “PLAINTEXT” です。

手順

データソースを Kafka トピックに接続するには、kamelet バインディングを作成します。

  1. 任意のエディターで、以下の基本構造で YAML ファイルを作成します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. kamelet binding の名前を追加します。この例では、バインディングが coffee-source kamelet を kafka-sink kamelet に接続するため、名前は coffees-to-kafka になります。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
      sink:
  3. kamelet バインディングのソースの場合は、データソース kamelet を指定し (たとえば、coffee-source kamelet はコーヒーに関するデータが含まれるイベントを生成します)、kamelet のパラメーターを設定します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
  4. kamelet binding のシンクの場合は、kafka-sink kamelets およびその必要なパラメーターを指定します。

    たとえば、Kafka クラスターが OpenShift Streams にある場合 (securityProtocol パラメーターを設定する必要はありません)。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-sink
        properties:
          bootstrapServers: "broker.url:9092""
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"

    たとえば、Kafka クラスターが AMQ Streams にある場合は、securityProtocol パラメーターを “PLAINTEXT” に設定する必要があります。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-sink
        properties:
          bootstrapServers: "broker.url:9092""
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
          securityProtocol: "PLAINTEXT"
  5. YAML ファイルを保存します (例: coffees-to-kafka.yaml)。
  6. OpenShift プロジェクトにログインします。
  7. KameletBinding をリソースとして OpenShift namespace に追加します。

    oc apply -f <kamelet binding filename>

    以下に例を示します。

    oc apply -f coffees-to-kafka.yaml

    Camel K Operator は、KameletBinding リソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。

  8. kamelet バインディングリソースのステータスを表示するには、次のコマンドを実行します。

    oc get kameletbindings

  9. インテグレーションの状態を表示するには、以下を実行します。

    oc get integrations

  10. インテグレーションのログを表示するには、以下を実行します。

    kamel logs <integration> -n <project>

    以下に例を示します。

    kamel logs coffees-to-kafka -n my-camel-k-kafka

    結果は以下の出力に似ています。

    ...
    [1] 2021-06-21 07:48:12,696 INFO  [io.quarkus] (main) camel-k-integration 1.4.0 on JVM (powered by Quarkus 1.13.0.Final) started in 2.790s.
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.