5.15.2.2. Knative CLI を使用した Kafka イベントソースの作成
kn source kafka create
コマンドを使用し、Knative (kn
) CLI を使用して Kafka ソースを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving、および
KnativeKafka
カスタムリソース (CR) がクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
Knative (
kn
) CLI をインストールしている。 -
オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (
oc
) をインストールします。
手順
Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
KafkaSource
CR を作成します。$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
注記このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。
--servers
、--topics
、および--consumergroup
オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup
オプションは任意です。オプション: 作成した
KafkaSource
CR の詳細を表示します。$ kn source kafka describe <kafka_source_name>
出力例
Name: example-kafka-source Namespace: kafka Age: 1h BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092 Topics: example-topic ConsumerGroup: example-consumer-group Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 1h ++ Deployed 1h ++ SinkProvided 1h
検証手順
Kafka インスタンスをトリガーし、メッセージをトピックに送信します。
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。
-
Kafka クラスターが
kafka
namespace にインストールされている。 -
KafkaSource
オブジェクトは、my-topic
トピックを使用するように設定されている。
-
Kafka クラスターが
ログを表示して、メッセージが到達していることを確認します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic subject: partition:46#0 id: partition:46/offset:0 time: 2021-03-10T11:21:49.4Z Extensions, traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00 Data, Hello!
5.15.2.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合、--sink
フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。