2.5. Apache Kafka のソース


Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Apache Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn)CLI を使用するか、KafkaSource オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc) を使用して適用します。

注記

Apache Kafka の Knative ブローカーのインストール のドキュメントを参照してください。

2.5.1. Web コンソールを使用した Apache Kafka イベントソースの作成

Apache Kafka の Knative ブローカー実装がクラスターにインストールされたら、Web コンソールを使用して Apache Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • Web コンソールにログインしている。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
  2. Event Sources ページで、Type セクションの Kafka Source を選択します。
  3. Kafka Source 設定を設定します。

    1. ブートストラップサーバー のコンマ区切りの一覧を追加します。
    2. トピック のコンマ区切りの一覧を追加します。
    3. コンシューマーグループ を追加します。
    4. 作成したサービスアカウントの Service Account Name を選択します。
    5. イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
    6. Kafka イベントソースの Name を入力します。
  4. Create をクリックします。

検証

Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. Kafka イベントソースおよびシンクを表示します。

    Topology ビューでの Kafka ソースおよびサービスの表示

2.5.2. Knative CLI を使用した Apache Kafka イベントソースの作成

kn source kafka create コマンドを使用し、Knative (kn) CLI を使用して Kafka ソースを作成できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving、および KnativeKafka カスタムリソース (CR) がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • Knative (kn) CLI がインストールされている。
  • オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (oc) をインストールしている。

手順

  1. Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. KafkaSource CR を作成します。

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注記

    このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。

    --servers--topics、および --consumergroup オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup オプションは任意です。

  3. オプション: 作成した KafkaSource CR の詳細を表示します。

    $ kn source kafka describe <kafka_source_name>

    出力例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

検証手順

  1. Kafka インスタンスをトリガーし、メッセージをトピックに送信します。

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。

    • Kafka クラスターが kafka namespace にインストールされている。
    • KafkaSource オブジェクトは、my-topic トピックを使用するように設定されている。
  2. ログを表示して、メッセージが到達していることを確認します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

2.5.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合は、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

2.5.3. YAML を使用した Apache Kafka イベントソースの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ソースを作成するには、KafkaSource オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. KafkaSource オブジェクトを YAML ファイルとして作成します。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    コンシューマーグループは、同じグループ ID を使用し、トピックからデータを消費するコンシューマーのグループです。
    2
    トピックは、データの保存先を提供します。各トピックは、1 つまたは複数のパーティションに分割されます。
    3
    シンクは、イベントがソースから送信される場所を指定します。
    重要

    OpenShift Serverless 上の KafkaSource オブジェクトの API の v1beta1 バージョンのみがサポートされます。非推奨となった v1alpha1 バージョンの API は使用しないでください。

    KafkaSource オブジェクトの例

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. KafkaSource YAML ファイルを適用します。

    $ oc apply -f <filename>

検証

  • 以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。

    $ oc get pods

    出力例

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

2.5.4. Apache Kafka ソースの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効にされている場合、Kafka クラスターの ca.crt 証明書ファイルも必要になります。
  • OpenShift (oc) CLI がインストールされている。

手順

  1. 選択された namespace にシークレットとして証明書ファイルを作成します。

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ 1
      --from-literal=user="my-sasl-user"
    1
    SASL タイプは PLAINSCRAM-SHA-256、または SCRAM-SHA-512 です。
  2. Kafka ソースを作成または変更して、次の spec 設定が含まれるようにします。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: 1
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1
    パブリッククラウドの Kafka サービスを使用している場合は、caCert 仕様は必要ありません。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.