5.15. Knative Kafka の使用


Knative Kafka は、OpenShift Serverless でサポートされているバージョンの Apache Kafka メッセージストリーミングプラットフォームを使用する統合オプションを提供します。Kafka は、イベントソース、チャネル、ブローカー、およびイベントシンク機能のオプションを提供します。

Knative Kafka 機能は、クラスター管理者が KnativeKafka カスタムリソースをインストールしている場合に、OpenShift Serverless インストールで利用できます。

注記

現時点で、Knative Kafka は IBM Z および IBM Power Systems ではサポートされていません。

Knative Kafka は、以下のような追加オプションを提供します。

  • Kafka ソース
  • Kafka チャネル
  • Kafka ブローカー
  • Kafka シンク

5.15.1. Kafka イベント配信およびリトライ

イベント駆動型のアーキテクチャーで Kafka コンポーネントを使用すると、最低でも 1 度のイベント配信が提供されます。これは、戻りコード値を受け取るまで操作がリトライされることを意味します。これにより、失われたイベントに対してアプリケーションの回復性が強化されます。ただし、重複するイベントが送信されてしまう可能性があります。

Kafka イベントソースでは、デフォルトでイベント配信のリトライ回数が固定されています。Kafka チャネルの場合、Kafka チャネルの Delivery 仕様に設定されている場合にのみリトライが実行されます。

配信保証に関する詳細は、イベント配信 のドキュメントを参照してください。

5.15.2. Kafka ソース

Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn)CLI を使用するか、KafkaSource オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc) を使用して適用します。

5.15.2.1. Web コンソールを使用した Kafka イベントソースの作成

Knative Kafka をクラスターにインストールした後、Web コンソールを使用して Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • Web コンソールにログインしている。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
  2. Event Sources ページで、Type セクションの Kafka Source を選択します。
  3. Kafka Source 設定を設定します。

    1. ブートストラップサーバー のコンマ区切りの一覧を追加します。
    2. トピック のコンマ区切りの一覧を追加します。
    3. コンシューマーグループ を追加します。
    4. 作成したサービスアカウントの Service Account Name を選択します。
    5. イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
    6. Kafka イベントソースの Name を入力します。
  4. Create をクリックします。

検証

Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. Kafka イベントソースおよびシンクを表示します。

    View the Kafka source and service in the Topology view

5.15.2.2. Knative CLI を使用した Kafka イベントソースの作成

kn source kafka create コマンドを使用し、Knative (kn) CLI を使用して Kafka ソースを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving、および KnativeKafka カスタムリソース (CR) がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • Knative (kn) CLI をインストールしている。
  • オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (oc) をインストールします。

手順

  1. Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. KafkaSource CR を作成します。

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注記

    このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。

    --servers--topics、および --consumergroup オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup オプションは任意です。

  3. オプション: 作成した KafkaSource CR の詳細を表示します。

    $ kn source kafka describe <kafka_source_name>

    出力例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

検証手順

  1. Kafka インスタンスをトリガーし、メッセージをトピックに送信します。

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。

    • Kafka クラスターが kafka namespace にインストールされている。
    • KafkaSource オブジェクトは、my-topic トピックを使用するように設定されている。
  2. ログを表示して、メッセージが到達していることを確認します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

5.15.2.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

5.15.2.3. YAML を使用した Kafka イベントソースの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ソースを作成するには、KafkaSource オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. KafkaSource オブジェクトを YAML ファイルとして作成します。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    コンシューマーグループは、同じグループ ID を使用し、トピックからデータを消費するコンシューマーのグループです。
    2
    トピックは、データの保存先を提供します。各トピックは、1 つまたは複数のパーティションに分割されます。
    3
    シンクは、イベントがソースから送信される場所を指定します。
    重要

    OpenShift Serverless 上の KafkaSource オブジェクトの API の v1beta1 バージョンのみがサポートされます。非推奨となった v1alpha1 バージョンの API は使用しないでください。

    KafkaSource オブジェクトの例

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. KafkaSource YAML ファイルを適用します。

    $ oc apply -f <filename>

検証

  • 以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。

    $ oc get pods

    出力例

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

5.15.3. Kafka ブローカー

実稼働環境に対応した Knative Eventing デプロイメントの場合、Red Hat は Knative Kafka ブローカーの実装を使用することをお勧めします。Kafka ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。

重要

Kafka ブローカーの連邦情報処理標準 (FIPS) モードが無効になっています。

Kafka ブローカーは、イベントの保存とルーティングのために Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Kafka ブローカー実装のその他の利点は次のとおりです。

  • 少なくとも 1 回の配信保証
  • CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
  • コントロールプレーンの高可用性
  • 水平方向にスケーラブルなデータプレーン

Knative Kafka ブローカーは、バイナリーコンテンツモードを使用して、受信 CloudEvents を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data 仕様が Kafka レコードの値に対応することを意味します。

Kafka ブローカーの使用については、ブローカーの作成 を参照してください。

5.15.4. YAML を使用した Kafka チャネルの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でチャネルを宣言的に記述することができます。Kafka チャネルを作成することで、Kafka トピックに裏打ちされた Knative Eventing チャネルを作成できます。YAML を使用して Kafka チャネルを作成するには、KafkaChannel オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソースは OpenShift Container Platform クラスターにインストールされます。
  • OpenShift CLI (oc) をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. KafkaChannel オブジェクトを YAML ファイルとして作成します。

    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    metadata:
      name: example-channel
      namespace: default
    spec:
      numPartitions: 3
      replicationFactor: 1
    重要

    OpenShift Serverless 上の KafkaChannel オブジェクトの API の v1beta1 バージョンのみがサポートされます。非推奨となった v1alpha1 バージョンの API は使用しないでください。

  2. KafkaChannel YAML ファイルを適用します。

    $ oc apply -f <filename>

5.15.5. Kafka シンクコ

Kafka シンクは、クラスター管理者がクラスターで Kafka を有効にした場合に使用できるイベントシンクの一種です。Kafka シンクを使用して、イベントソースから Kafka トピックにイベントを直接送信できます。

5.15.5.1. Kafka シンクの使用

Kafka トピックにイベントを送信する Kafka シンクと呼ばれるイベントシンクを作成できます。YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。デフォルトでは、Kafka シンクはバイナリーコンテンツモードを使用します。これは、構造化モードよりも効率的です。YAML を使用して Kafka シンクを作成するには、KafkaSink オブジェクトを定義する YAML ファイルを作成してから、ocapply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソース (CR) がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. KafkaSink オブジェクト定義を YAML ファイルとして作成します。

    Kafka シンク YAML

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
       - <bootstrap-server>

  2. Kafka シンクを作成するには、KafkaSink YAML ファイルを適用します。

    $ oc apply -f <filename>
  3. シンクが仕様で指定されるようにイベントソースを設定します。

    API サーバーソースに接続された Kafka シンクの例

    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> 1
      namespace: <namespace> 2
    spec:
      serviceAccountName: <service-account-name> 3
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> 4

    1
    イベントソースの名前。
    2
    イベントソースの namespace。
    3
    イベントソースのサービスアカウント。
    4
    Kafka シンクの名前。

5.15.6. 関連情報

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.