2.5. Apache Kafka のソース
Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Apache Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn
) CLI を使用するか、KafkaSource
オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc
) を使用して適用します。
Apache Kafka の Knative ブローカーのインストール を参照してください。
2.5.1. Web コンソールを使用した Apache Kafka イベントソースの作成
Apache Kafka の Knative ブローカー実装がクラスターにインストールされたら、Web コンソールを使用して Apache Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - Web コンソールにログインしている。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
- Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
- Event Sources ページで、Type セクションの Kafka Source を選択します。
Kafka Source 設定を設定します。
- ブートストラップサーバー のコンマ区切りの一覧を追加します。
- トピック のコンマ区切りの一覧を追加します。
- コンシューマーグループ を追加します。
- 作成したサービスアカウントの Service Account Name を選択します。
- イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
- Kafka イベントソースの Name を入力します。
- Create をクリックします。
検証
Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。
- Developer パースペクティブで、Topology に移動します。
Kafka イベントソースおよびシンクを表示します。
2.5.2. Knative CLI を使用した Apache Kafka イベントソースの作成
kn source kafka create
コマンドを使用し、Knative (kn
) CLI を使用して Kafka ソースを作成できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving、および
KnativeKafka
カスタムリソース (CR) がクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
Knative (
kn
) CLI がインストールされている。 -
オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (
oc
) をインストールしている。
手順
Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
KafkaSource
CR を作成します。$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
注記このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。
--servers
、--topics
、および--consumergroup
オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup
オプションは任意です。オプション: 作成した
KafkaSource
CR の詳細を表示します。$ kn source kafka describe <kafka_source_name>
出力例
Name: example-kafka-source Namespace: kafka Age: 1h BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092 Topics: example-topic ConsumerGroup: example-consumer-group Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 1h ++ Deployed 1h ++ SinkProvided 1h
検証手順
Kafka インスタンスをトリガーし、メッセージをトピックに送信します。
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。
-
Kafka クラスターが
kafka
namespace にインストールされている。 -
KafkaSource
オブジェクトがmy-topic
トピックを使用するように設定されている。
-
Kafka クラスターが
ログを表示して、メッセージが到達していることを確認します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic subject: partition:46#0 id: partition:46/offset:0 time: 2021-03-10T11:21:49.4Z Extensions, traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00 Data, Hello!
2.5.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合は、--sink
フラグを使用して、そのリソースからイベントが送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
2.5.3. YAML を使用した Apache Kafka イベントソースの作成
YAML ファイルを使用して Knative リソースを作成する場合は、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述できます。YAML を使用して Kafka ソースを作成するには、KafkaSource
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
KafkaSource
オブジェクトを YAML ファイルとして作成します。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: <source_name> spec: consumerGroup: <group_name> 1 bootstrapServers: - <list_of_bootstrap_servers> topics: - <list_of_topics> 2 sink: - <list_of_sinks> 3
重要OpenShift Serverless 上の
KafkaSource
オブジェクトの API のv1beta1
バージョンのみがサポートされます。非推奨となったv1alpha1
バージョンの API は使用しないでください。KafkaSource
オブジェクトの例apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
KafkaSource
YAML ファイルを適用します。$ oc apply -f <filename>
検証
以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。
$ oc get pods
出力例
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
2.5.4. Apache Kafka ソースの SASL 認証の設定
Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
CR は、OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- Kafka クラスターのユーザー名およびパスワードがある。
-
使用する SASL メカニズムを選択している (例:
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
)。 -
TLS が有効になっている場合は、Kafka クラスターの
ca.crt
証明書ファイルがある。 -
OpenShift (
oc
) CLI がインストールされている。
手順
選択された namespace にシークレットとして証明書ファイルを作成します。
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ 1 --from-literal=user="my-sasl-user"
- 1
- SASL タイプは
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
です。
Kafka ソースを作成または変更して、次の
spec
設定が含まれるようにします。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <kafka_auth_secret> key: user password: secretKeyRef: name: <kafka_auth_secret> key: password type: secretKeyRef: name: <kafka_auth_secret> key: saslType tls: enable: true caCert: 1 secretKeyRef: name: <kafka_auth_secret> key: ca.crt ...
- 1
- パブリッククラウドの Kafka サービスを使用している場合は、
caCert
仕様は必要ありません。