2.4. Kamelet Binding での Kafka トピックのデータシンクへの接続
Kafka トピックをデータシンクに接続するには、図 2.3 にあるように Kamelet Binding を作成します。
図 2.3 Kafka トピックのデータシンクへの接続
前提条件
-
イベントの送信元となる Kafka トピックの名前を知っておく必要があります。この手順の例では、イベントを送信するために
test-topic
を使用します。これは、Kamelet Binding でのデータソースの Kafka トピックへの接続 でコーヒーソースからイベントを受信するのに使用したトピックと同じです。 Kafka インスタンスの以下のパラメーターの値を知っている必要があります。
- bootstrapServers: Kafka Broker URL のコンマ区切りリスト
- password: Kafka に対して認証を行うためのパスワード。
user: Kafka に対して認証するユーザー名。
OpenShift Streams の使用時にこれらの値を取得する方法は、Kafka クレデンシャルの取得 を参照してください。
-
Kafka ブローカーとの通信のセキュリティープロトコルを把握している。OpenShift Streams の Kafka クラスターでは、
SASL_SSL
(デフォルト) です。AMQ Streams 上の Kafka クラスターでは、PLAINTEXT
になります。 Camel K インテグレーションに追加する Kamelets と必要なインスタンスパラメーターを知っている必要があります。この手順の Kamelets の例は、Katmelet Catalog に記載されています。
kafka-source
Kamelet: このバインディングでは Kafka トピックがデータを送信するため (データプロデューサー)、kafka-source
Kamelet を使用します。必須パラメーターの値の例は次のとおりです。-
bootstrapServers -
"broker.url:9092"
-
password -
"testpassword"
-
user -
"testuser"
-
topic -
"test-topic"
-
securityProtocol: OpenShift Streams の Kafka クラスターの場合には、
SASL_SSL
がデフォルト値であるため、このパラメーターを設定する必要はありません。AMQ ストリームの Kafka クラスターの場合は、このパラメーターの値は"PLAINTEXT"
です。
-
bootstrapServers -
-
log-sink
Kamelet:log-sink
を使用して、kafka-source
Kamelet から受信するデータをログに記録します。必要に応じて、showStreams
パラメーターを指定して、データのメッセージボディーを表示します。log-sink
Kamelet は、デバッグに役立ちます。
手順
Kafka トピックをデータシンクに接続するには、Kamelet Binding を作成します。
任意のエディターで、以下の基本構造で YAML ファイルを作成します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kamelet Binding の名前を追加します。この例では、バインディングが
kafka-source
Kamelet をlog-sink
Kamelet に接続するため、名前はkafka-to-log
になります。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kamelet Binding のソースの場合は、
kafka-source
Kamelet を指定し、そのパラメーターを設定します。たとえば、Kafka クラスターが OpenShift Streams にある場合 (
securityProtocol
パラメーターを設定する必要はありません)。Copy to Clipboard Copied! Toggle word wrap Toggle overflow たとえば、Kafka クラスターが AMQ Streams にある場合は、
securityProtocol
パラメーターを"PLAINTEXT"
に設定する必要があります。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kamelet Binding のシンクでは、データコンシューマー Kamelet (例:
log-sink
Kamelet) を指定し、Kamelet のパラメーターを設定します。以下に例を示します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
YAML ファイルを保存します (例:
kafka-to-log.yaml
)。 - OpenShift プロジェクトにログインします。
Kamelet Binding をリソースとして OpenShift namespace に追加します。
oc apply -f <kamelet binding filename>
以下に例を示します。
oc apply -f kafka-to-log.yaml
Camel K Operator は、
KameletBinding
リソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。KameletBinding
リソースのステータスを表示するには、次のコマンドを実行します。oc get kameletbindings
インテグレーションの状態を表示するには、以下を実行します。
oc get integrations
インテグレーションのログを表示するには、以下を実行します。
kamel logs <integration> -n <project>
以下に例を示します。
kamel logs kafka-to-log -n my-camel-k-kafka
この出力では、以下の例のようにコーヒーイベントが表示されるはずです。
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 実行中のインテグレーションを停止するには、関連付けられた Kamelet Binding リソースを削除します。
oc delete kameletbindings/<kameletbinding-name>
以下に例を示します。
oc delete kameletbindings/kafka-to-log