2.4. Kamelet Binding での Kafka トピックのデータシンクへの接続
Kafka トピックをデータシンクに接続するには、図 2.3 にあるように Kamelet Binding を作成します。
図 2.3 Kafka トピックのデータシンクへの接続
前提条件
-
イベントの送信元となる Kafka トピックの名前を知っておく必要があります。この手順の例では、イベントを送信するために
test-topicを使用します。これは、Kamelet Binding でのデータソースの Kafka トピックへの接続 でコーヒーソースからイベントを受信するのに使用したトピックと同じです。 Kafka インスタンスの以下のパラメーターの値を知っている必要があります。
- bootstrapServers: Kafka Broker URL のコンマ区切りリスト
- password: Kafka に対して認証を行うためのパスワード。
user: Kafka に対して認証するユーザー名。
OpenShift Streams の使用時にこれらの値を取得する方法は、Kafka クレデンシャルの取得 を参照してください。
-
Kafka ブローカーとの通信のセキュリティープロトコルを把握している。OpenShift Streams の Kafka クラスターでは、
SASL_SSL(デフォルト) です。AMQ Streams 上の Kafka クラスターでは、PLAINTEXTになります。 Camel K インテグレーションに追加する Kamelets と必要なインスタンスパラメーターを知っている必要があります。この手順の Kamelets の例は、Katmelet Catalog に記載されています。
kafka-sourceKamelet: このバインディングでは Kafka トピックがデータを送信するため (データプロデューサー)、kafka-sourceKamelet を使用します。必須パラメーターの値の例は次のとおりです。-
bootstrapServers -
"broker.url:9092" -
password -
"testpassword" -
user -
"testuser" -
topic -
"test-topic" -
securityProtocol: OpenShift Streams の Kafka クラスターの場合には、
SASL_SSLがデフォルト値であるため、このパラメーターを設定する必要はありません。AMQ ストリームの Kafka クラスターの場合は、このパラメーターの値は"PLAINTEXT"です。
-
bootstrapServers -
-
log-sinkKamelet:log-sinkを使用して、kafka-sourceKamelet から受信するデータをログに記録します。必要に応じて、showStreamsパラメーターを指定して、データのメッセージボディーを表示します。log-sinkKamelet は、デバッグに役立ちます。
手順
Kafka トピックをデータシンクに接続するには、Kamelet Binding を作成します。
任意のエディターで、以下の基本構造で YAML ファイルを作成します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kamelet Binding の名前を追加します。この例では、バインディングが
kafka-sourceKamelet をlog-sinkKamelet に接続するため、名前はkafka-to-logになります。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kamelet Binding のソースの場合は、
kafka-sourceKamelet を指定し、そのパラメーターを設定します。たとえば、Kafka クラスターが OpenShift Streams にある場合 (
securityProtocolパラメーターを設定する必要はありません)。Copy to Clipboard Copied! Toggle word wrap Toggle overflow たとえば、Kafka クラスターが AMQ Streams にある場合は、
securityProtocolパラメーターを"PLAINTEXT"に設定する必要があります。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kamelet Binding のシンクでは、データコンシューマー Kamelet (例:
log-sinkKamelet) を指定し、Kamelet のパラメーターを設定します。以下に例を示します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
YAML ファイルを保存します (例:
kafka-to-log.yaml)。 - OpenShift プロジェクトにログインします。
Kamelet Binding をリソースとして OpenShift namespace に追加します。
oc apply -f <kamelet binding filename>以下に例を示します。
oc apply -f kafka-to-log.yamlCamel K Operator は、
KameletBindingリソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。KameletBindingリソースのステータスを表示するには、次のコマンドを実行します。oc get kameletbindingsインテグレーションの状態を表示するには、以下を実行します。
oc get integrationsインテグレーションのログを表示するには、以下を実行します。
kamel logs <integration> -n <project>以下に例を示します。
kamel logs kafka-to-log -n my-camel-k-kafkaこの出力では、以下の例のようにコーヒーイベントが表示されるはずです。
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 実行中のインテグレーションを停止するには、関連付けられた Kamelet Binding リソースを削除します。
oc delete kameletbindings/<kameletbinding-name>以下に例を示します。
oc delete kameletbindings/kafka-to-log