5.15. Knative Kafka の使用
Knative Kafka は、OpenShift Serverless でサポートされているバージョンの Apache Kafka メッセージストリーミングプラットフォームを使用する統合オプションを提供します。Kafka は、イベントソース、チャネル、ブローカー、およびイベントシンク機能のオプションを提供します。
Knative Kafka 機能は、クラスター管理者が KnativeKafka
カスタムリソースをインストールしている場合に、OpenShift Serverless インストールで利用できます。
現時点で、Knative Kafka は IBM Z および IBM Power Systems ではサポートされていません。
Knative Kafka は、以下のような追加オプションを提供します。
- Kafka ソース
- Kafka チャネル
- Kafka ブローカー
- Kafka シンク
5.15.1. Kafka イベント配信およびリトライ
イベント駆動型のアーキテクチャーで Kafka コンポーネントを使用すると、最低でも 1 度のイベント配信が提供されます。これは、戻りコード値を受け取るまで操作がリトライされることを意味します。これにより、失われたイベントに対してアプリケーションの回復性が強化されます。ただし、重複するイベントが送信されてしまう可能性があります。
Kafka イベントソースでは、デフォルトでイベント配信のリトライ回数が固定されています。Kafka チャネルの場合、Kafka チャネルの Delivery
仕様に設定されている場合にのみリトライが実行されます。
配信保証に関する詳細は、イベント配信 のドキュメントを参照してください。
5.15.2. Kafka ソース
Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn
)CLI を使用するか、KafkaSource
オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc
) を使用して適用します。
5.15.2.1. Web コンソールを使用した Kafka イベントソースの作成
Knative Kafka をクラスターにインストールした後、Web コンソールを使用して Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - Web コンソールにログインしている。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
- Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
- Event Sources ページで、Type セクションの Kafka Source を選択します。
Kafka Source 設定を設定します。
- ブートストラップサーバー のコンマ区切りの一覧を追加します。
- トピック のコンマ区切りの一覧を追加します。
- コンシューマーグループ を追加します。
- 作成したサービスアカウントの Service Account Name を選択します。
- イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
- Kafka イベントソースの Name を入力します。
- Create をクリックします。
検証
Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。
- Developer パースペクティブで、Topology に移動します。
Kafka イベントソースおよびシンクを表示します。
5.15.2.2. Knative CLI を使用した Kafka イベントソースの作成
kn source kafka create
コマンドを使用し、Knative (kn
) CLI を使用して Kafka ソースを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving、および
KnativeKafka
カスタムリソース (CR) がクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
Knative (
kn
) CLI をインストールしている。 -
オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (
oc
) をインストールします。
手順
Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
KafkaSource
CR を作成します。$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
注記このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。
--servers
、--topics
、および--consumergroup
オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup
オプションは任意です。オプション: 作成した
KafkaSource
CR の詳細を表示します。$ kn source kafka describe <kafka_source_name>
出力例
Name: example-kafka-source Namespace: kafka Age: 1h BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092 Topics: example-topic ConsumerGroup: example-consumer-group Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 1h ++ Deployed 1h ++ SinkProvided 1h
検証手順
Kafka インスタンスをトリガーし、メッセージをトピックに送信します。
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。
-
Kafka クラスターが
kafka
namespace にインストールされている。 -
KafkaSource
オブジェクトは、my-topic
トピックを使用するように設定されている。
-
Kafka クラスターが
ログを表示して、メッセージが到達していることを確認します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic subject: partition:46#0 id: partition:46/offset:0 time: 2021-03-10T11:21:49.4Z Extensions, traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00 Data, Hello!
5.15.2.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合、--sink
フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
5.15.2.3. YAML を使用した Kafka イベントソースの作成
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ソースを作成するには、KafkaSource
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
OpenShift CLI (
oc
) をインストールしている。
手順
KafkaSource
オブジェクトを YAML ファイルとして作成します。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: <source_name> spec: consumerGroup: <group_name> 1 bootstrapServers: - <list_of_bootstrap_servers> topics: - <list_of_topics> 2 sink: - <list_of_sinks> 3
重要OpenShift Serverless 上の
KafkaSource
オブジェクトの API のv1beta1
バージョンのみがサポートされます。非推奨となったv1alpha1
バージョンの API は使用しないでください。KafkaSource
オブジェクトの例apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
KafkaSource
YAML ファイルを適用します。$ oc apply -f <filename>
検証
以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。
$ oc get pods
出力例
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
5.15.3. Kafka ブローカー
実稼働環境に対応した Knative Eventing デプロイメントの場合、Red Hat は Knative Kafka ブローカーの実装を使用することをお勧めします。Kafka ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。
Kafka ブローカーの連邦情報処理標準 (FIPS) モードが無効になっています。
Kafka ブローカーは、イベントの保存とルーティングのために Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Kafka ブローカー実装のその他の利点は次のとおりです。
- 少なくとも 1 回の配信保証
- CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
- コントロールプレーンの高可用性
- 水平方向にスケーラブルなデータプレーン
Knative Kafka ブローカーは、バイナリーコンテンツモードを使用して、受信 CloudEvents を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data
仕様が Kafka レコードの値に対応することを意味します。
Kafka ブローカーの使用については、ブローカーの作成 を参照してください。
5.15.4. YAML を使用した Kafka チャネルの作成
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でチャネルを宣言的に記述することができます。Kafka チャネルを作成することで、Kafka トピックに裏打ちされた Knative Eventing チャネルを作成できます。YAML を使用して Kafka チャネルを作成するには、KafkaChannel
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソースは OpenShift Container Platform クラスターにインストールされます。 -
OpenShift CLI (
oc
) をインストールしている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
KafkaChannel
オブジェクトを YAML ファイルとして作成します。apiVersion: messaging.knative.dev/v1beta1 kind: KafkaChannel metadata: name: example-channel namespace: default spec: numPartitions: 3 replicationFactor: 1
重要OpenShift Serverless 上の
KafkaChannel
オブジェクトの API のv1beta1
バージョンのみがサポートされます。非推奨となったv1alpha1
バージョンの API は使用しないでください。KafkaChannel
YAML ファイルを適用します。$ oc apply -f <filename>
5.15.5. Kafka シンクコ
Kafka シンクは、クラスター管理者がクラスターで Kafka を有効にした場合に使用できるイベントシンクの一種です。Kafka シンクを使用して、イベントソースから Kafka トピックにイベントを直接送信できます。
5.15.5.1. Kafka シンクの使用
Kafka トピックにイベントを送信する Kafka シンクと呼ばれるイベントシンクを作成できます。YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。デフォルトでは、Kafka シンクはバイナリーコンテンツモードを使用します。これは、構造化モードよりも効率的です。YAML を使用して Kafka シンクを作成するには、KafkaSink
オブジェクトを定義する YAML ファイルを作成してから、ocapply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソース (CR) がクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
OpenShift CLI (
oc
) をインストールしている。
手順
KafkaSink
オブジェクト定義を YAML ファイルとして作成します。Kafka シンク YAML
apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink metadata: name: <sink-name> namespace: <namespace> spec: topic: <topic-name> bootstrapServers: - <bootstrap-server>
Kafka シンクを作成するには、
KafkaSink
YAML ファイルを適用します。$ oc apply -f <filename>
シンクが仕様で指定されるようにイベントソースを設定します。
API サーバーソースに接続された Kafka シンクの例
apiVersion: sources.knative.dev/v1alpha2 kind: ApiServerSource metadata: name: <source-name> 1 namespace: <namespace> 2 spec: serviceAccountName: <service-account-name> 3 mode: Resource resources: - apiVersion: v1 kind: Event sink: ref: apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink name: <sink-name> 4