3.3. Apache Kafka のシンク
Apache Kafka シンクは、クラスター管理者がクラスターで Apache Kafka を有効にした場合に使用できる イベントシンク の一種です。Kafka シンクを使用して、イベントソースから Kafka トピックにイベントを直接送信できます。
3.3.1. YAML を使用した Apache Kafka シンクの作成
イベントを Kafka トピックに送信する Kafka シンクを作成できます。デフォルトでは、Kafka シンクはバイナリーコンテンツモードを使用します。これは、構造化モードよりも効率的です。YAML を使用して Kafka シンクを作成するには、KafkaSink
オブジェクトを定義する YAML ファイルを作成してから、ocapply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソース (CR) がクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
手順
KafkaSink
オブジェクト定義を YAML ファイルとして作成します。Kafka シンク YAML
apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink metadata: name: <sink-name> namespace: <namespace> spec: topic: <topic-name> bootstrapServers: - <bootstrap-server>
Kafka シンクを作成するには、
KafkaSink
YAML ファイルを適用します。$ oc apply -f <filename>
シンクが仕様で指定されるようにイベントソースを設定します。
API サーバーソースに接続された Kafka シンクの例
apiVersion: sources.knative.dev/v1alpha2 kind: ApiServerSource metadata: name: <source-name> 1 namespace: <namespace> 2 spec: serviceAccountName: <service-account-name> 3 mode: Resource resources: - apiVersion: v1 kind: Event sink: ref: apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink name: <sink-name> 4
3.3.2. OpenShift Container Platform Web コンソールを使用した Apache Kafka のイベントシンクの作成
OpenShift Container Platform Web コンソールの Developer パースペクティブを使用して、イベントを Kafka トピックに送信する Kafka シンクを作成できます。デフォルトでは、Kafka シンクはバイナリーコンテンツモードを使用します。これは、構造化モードよりも効率的です。
開発者は、イベントシンクを作成して、特定のソースからイベントを受信し、それらを Kafka トピックに送信できます。
前提条件
- OperatorHub から、Knative Serving、Knative Eventing、および Apache Kafka API 用の Knative ブローカーを使用して OpenShift Serverless Operator をインストールしている。
- Kafka 環境で Kafka トピックを作成しました。
手順
- Developer パースペクティブで、+Add ビューに移動します。
- Eventing カタログ で Event Sink をクリックします。
-
カタログ項目で
KafkaSink
を検索してクリックします。 - イベントシンクの作成 をクリックします。
フォームビューで、ホスト名とポートの組み合わせであるブートストラップサーバーの URL を入力します。
- イベントデータを送信するトピックの名前を入力します。
- イベントシンクの名前を入力します。
- Create をクリックします。
検証
- Developer パースペクティブで、Topology ビューに移動します。
- 作成したイベントシンクをクリックして、右側のパネルに詳細を表示します。
3.3.3. Apache Kafka シンクのセキュリティーの設定
Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Apache Kafka の Knative ブローカー実装でサポートされている唯一のトラフィック暗号化方式です。
Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
カスタムリソース (CR) は OpenShift Container Platform クラスターにインストールされます。 -
Kafka シンクは
KnativeKafka
CR で有効になっています。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
.pem
ファイルとして Kafka クラスター CA 証明書が保存されている。 -
Kafka クラスタークライアント証明書とキーが
.pem
ファイルとして保存されている。 -
OpenShift (
oc
) CLI がインストールされている。 -
使用する SASL メカニズムを選択している (例:
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
)。
手順
KafkaSink
オブジェクトと同じ namespace に証明書ファイルをシークレットとして作成します。重要証明書とキーは PEM 形式である必要があります。
暗号化なしで SASL を使用した認証の場合:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SASL_PLAINTEXT \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-literal=user=<username> \ --from-literal=password=<password>
SASL を使用した認証と TLS を使用した暗号化の場合:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SASL_SSL \ --from-literal=sasl.mechanism=<sasl_mechanism> \ --from-file=ca.crt=<my_caroot.pem_file_path> \ 1 --from-literal=user=<username> \ --from-literal=password=<password>
- 1
- パブリッククラウドで管理される Kafka サービスを使用している場合は、
ca.crt
を省略してシステムのルート CA セットを使用できます。
TLS を使用した認証と暗号化の場合:
$ oc create secret -n <namespace> generic <secret_name> \ --from-literal=protocol=SSL \ --from-file=ca.crt=<my_caroot.pem_file_path> \ 1 --from-file=user.crt=<my_cert.pem_file_path> \ --from-file=user.key=<my_key.pem_file_path>
- 1
- パブリッククラウドで管理される Kafka サービスを使用している場合は、
ca.crt
を省略してシステムのルート CA セットを使用できます。
KafkaSink
オブジェクトを作成または変更し、auth
仕様にシークレットへの参照を追加します。apiVersion: eventing.knative.dev/v1alpha1 kind: KafkaSink metadata: name: <sink_name> namespace: <namespace> spec: ... auth: secret: ref: name: <secret_name> ...
KafkaSink
オブジェクトを適用します。$ oc apply -f <filename>