2.3. Kamelet Binding でのデータソースの Kafka トピックへの接続


データソースを Kafka トピックに接続するには、図 2.2で説明されているように Kamelet Binding を作成します。

Connecting a data source to a Kafka topic 図 2.2 データソースの Kafka トピックへの接続

前提条件

  • イベントの送信先となる Kafka トピックの名前を知っておく必要があります。

    この手順の例では、イベントを受信するために test-topic を使用します。

  • Kafka インスタンスの以下のパラメーターの値を知っている必要があります。

    • bootstrapServers: Kafka Broker URL のコンマ区切りリスト
    • password: Kafka に対して認証を行うためのパスワード。OpenShift Streams では、これは credentials.json ファイル password です。AMQ Streams の認証されていない kafka インスタンスでは、任意の空でない文字列を指定できます。
    • user: Kafka に対して認証するユーザー名。OpenShift Streams では、これは credentials.json ファイル clientID です。AMQ Streams の認証されていない kafka インスタンスでは、任意の空でない文字列を指定できます。

      OpenShift Streams の使用時にこれらの値を取得する方法は、Kafka クレデンシャルの取得 を参照してください。

    • securityProtocol: Kafka ブローカーと通信するためのセキュリティープロトコルを知っている必要があります。OpenShift Streams の Kafka クラスターでは、SASL_SSL (デフォルト) です。AMQ Streams 上の Kafka クラスターでは、PLAINTEXT になります。
  • Camel K インテグレーションに追加する Kamelets と必要なインスタンスパラメーターを知っている必要があります。

    この手順の Kamelets の例は次のとおりです。

    • coffee-source Kamelet: 各イベントを送信する頻度を指定するオプションのパラメーター period があります。ソース Kamelet の例 のコードを、coffee-source.kamelet.yaml ファイルという名前のファイルにコピーしてから、以下のコマンドを実行してこれをリソースとして namespace に追加できます。

      oc apply -f coffee-source.kamelet.yaml

    • Kamelet Catalog で提供される kafka-sink Kamelet。Kafka トピックがこのバインディングでデータ (データコンシューマー) を受信しているため、kafka-sink Kamelet を使用します。

手順

データソースを Kafka トピックに接続するには、Kamelet Binding を作成します。

  1. 任意のエディターで、以下の基本構造で YAML ファイルを作成します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. Kamelet Binding の名前を追加します。この例では、バインディングが coffee-source Kamelet を kafka-sink Kamelet に接続するため、名前は coffees-to-kafka になります。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
      sink:
  3. Kamelet Binding のソースの場合は、データソース Kamelet を指定し (たとえば、coffee-source Kamelet はコーヒーに関するデータが含まれるイベントを生成します)、Kamelet のパラメーターを設定します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
  4. Kamelet Binding のシンクの場合は、kafka-sink Kamelet およびその必要なプロパティーを指定します。

    たとえば、Kafka クラスターが OpenShift Streams 上にある場合:

    • user プロパティーには、clientID 指定します (例: srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094)
    • password プロパティーには、 password 指定します (例: facf3df1-3c8d-4253-aa87-8c95ca5e1225)
    • securityProtocol プロパティーを設定する必要はありません。

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443"
            password: "facf3df1-3c8d-4253-aa87-8c95ca5e1225"
            topic: "test-topic"
            user: "srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094"

      別の例として、Kafka クラスターが AMQ Streams 上にある場合、securityProtocol プロパティーを “PLAINTEXT” に設定します。

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "broker.url:9092"
            password: "testpassword"
            topic: "test-topic"
            user: "testuser"
            securityProtocol: "PLAINTEXT"
  5. YAML ファイルを保存します (例:coffees-to-kafka.yaml)。
  6. OpenShift プロジェクトにログインします。
  7. Kamelet Binding をリソースとして OpenShift namespace に追加します。

    oc apply -f <kamelet binding filename>

    以下に例を示します。

    oc apply -f coffees-to-kafka.yaml

    Camel K Operator は、KameletBinding リソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。

  8. KameletBinding リソースのステータスを表示するには、次のコマンドを実行します。

    oc get kameletbindings

  9. インテグレーションの状態を表示するには、以下を実行します。

    oc get integrations

  10. インテグレーションのログを表示するには、以下を実行します。

    kamel logs <integration> -n <project>

    以下に例を示します。

    kamel logs coffees-to-kafka -n my-camel-k-kafka

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.