2.5. Apache Kafka のソース
Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Apache Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn
)CLI を使用するか、KafkaSource
オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc
) を使用して適用します。
Apache Kafka の Knative ブローカーのインストール のドキュメントを参照してください。
2.5.1. Web コンソールを使用した Apache Kafka イベントソースの作成 リンクのコピーリンクがクリップボードにコピーされました!
Apache Kafka の Knative ブローカー実装がクラスターにインストールされたら、Web コンソールを使用して Apache Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - Web コンソールにログインしている。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
- Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
- Event Sources ページで、Type セクションの Kafka Source を選択します。
Kafka Source 設定を設定します。
- ブートストラップサーバー のコンマ区切りの一覧を追加します。
- トピック のコンマ区切りの一覧を追加します。
- コンシューマーグループ を追加します。
- 作成したサービスアカウントの Service Account Name を選択します。
- イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
- Kafka イベントソースの Name を入力します。
- Create をクリックします。
検証
Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。
- Developer パースペクティブで、Topology に移動します。
Kafka イベントソースおよびシンクを表示します。
2.5.2. Knative CLI を使用した Apache Kafka イベントソースの作成 リンクのコピーリンクがクリップボードにコピーされました!
kn source kafka create
コマンドを使用し、Knative (kn
) CLI を使用して Kafka ソースを作成できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving、および
KnativeKafka
カスタムリソース (CR) がクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
Knative (
kn
) CLI がインストールされている。 -
オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (
oc
) をインストールしている。
手順
Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。
kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
Copy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaSource
CR を作成します。kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 注記このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。
--servers
、--topics
、および--consumergroup
オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup
オプションは任意です。オプション: 作成した
KafkaSource
CR の詳細を表示します。kn source kafka describe <kafka_source_name>
$ kn source kafka describe <kafka_source_name>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 出力例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
検証手順
Kafka インスタンスをトリガーし、メッセージをトピックに送信します。
oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
Copy to Clipboard Copied! Toggle word wrap Toggle overflow プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。
-
Kafka クラスターが
kafka
namespace にインストールされている。 -
KafkaSource
オブジェクトは、my-topic
トピックを使用するように設定されている。
-
Kafka クラスターが
ログを表示して、メッセージが到達していることを確認します。
oc logs $(oc get pod -o name | grep event-display) -c user-container
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 出力例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.5.2.1. Knative CLI シンクフラグ リンクのコピーリンクがクリップボードにコピーされました!
Knative (kn
) CLI を使用してイベントソースを作成する場合は、--sink
フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
kn source binding create bind-heartbeat \ --namespace sinkbinding-example \ --subject "Job:batch/v1:app=heartbeat-cron" \ --sink http://event-display.svc.cluster.local \ --ce-override "sink=bound"
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
2.5.3. YAML を使用した Apache Kafka イベントソースの作成 リンクのコピーリンクがクリップボードにコピーされました!
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ソースを作成するには、KafkaSource
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
KafkaSource
オブジェクトを YAML ファイルとして作成します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow 重要OpenShift Serverless 上の
KafkaSource
オブジェクトの API のv1beta1
バージョンのみがサポートされます。非推奨となったv1alpha1
バージョンの API は使用しないでください。KafkaSource
オブジェクトの例Copy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaSource
YAML ファイルを適用します。oc apply -f <filename>
$ oc apply -f <filename>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
検証
以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。
oc get pods
$ oc get pods
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 出力例
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
2.5.4. Apache Kafka ソースの SASL 認証の設定 リンクのコピーリンクがクリップボードにコピーされました!
Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
CR は、OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- Kafka クラスターのユーザー名およびパスワードがある。
-
使用する SASL メカニズムを選択している (例:
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
)。 -
TLS が有効にされている場合、Kafka クラスターの
ca.crt
証明書ファイルも必要になります。 -
OpenShift (
oc
) CLI がインストールされている。
手順
選択された namespace にシークレットとして証明書ファイルを作成します。
oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="my-sasl-user"
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \
1 --from-literal=user="my-sasl-user"
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- SASL タイプは
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
です。
Kafka ソースを作成または変更して、次の
spec
設定が含まれるようにします。Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- パブリッククラウドの Kafka サービスを使用している場合は、
caCert
仕様は必要ありません。