2.3. Connecting a data source to a Kafka topic in a kamelet binding
データソースを Kafka トピックに接続するには、図 2.2 にあるように kamelet バインディングを作成します。
「Kafka トピックへのデータソースの接続」
前提条件
イベントの送信先となる Kafka トピックの名前を知っておく必要があります。
この手順の例では、イベントを受信するために
test-topic
を使用しています。Kafka インスタンスの以下のパラメーターの値を知っている必要があります。
- bootstrapServers: Kafka Broker URL のコンマ区切りリスト
- password: Kafka に対して認証を行うためのパスワード
user: Kafka に対して認証するユーザー名
OpenShift Streams の使用時にこれらの値を取得する方法は、「Kafka クレデンシャルの取得」を参照してください。
AMQストリームでの Kafka 認証に関する詳細は、「Managing secure access to Kafka」を参照してください。
-
Kafka ブローカーとの通信のセキュリティープロトコルを把握している。OpenShift Streams の Kafka クラスターでは、
SASL_SSL
(デフォルト) です。AMQ ストリームの Kafka クラスターの場合は、SASL/Plain
になります。 Camel K インテグレーションに追加する kamelets と必要なインスタンスパラメーターを把握している。この手順の kamelets の例は、以下のとおりです。
coffee-source
kamelet: 各イベントを送信する頻度を指定する任意のパラメーターperiod
があります。Example source kamelet のコードを、coffee-source.kamelet.yaml
ファイルという名前のファイルにコピーしてから、以下のコマンドを実行してこれをリソースとして namespace に追加できます。oc apply -f coffee-source.kamelet.yaml
kafka-sink
kamelet: Kamelet Catalog で提供されます。このバインディングでは Kafka トピックがデータを受信するため (データコンシューマー)、kafka-sink
kamelet を使用します。必須パラメーターの値の例は次のとおりです。-
bootstrapServers:
"broker.url:9092"
-
password -
"testpassword"
-
user:
"testuser"
-
topic:
"test-topic"
-
securityProtocol: OpenShift Streams の Kafka クラスターでは、
SASL_SSL
がデフォルト値であるため、このパラメーターを設定する必要はありません。AMQ ストリームの Kafka クラスターの場合は、このパラメーターの値は“PLAINTEXT”
です。
-
bootstrapServers:
手順
データソースを Kafka トピックに接続するには、kamelet バインディングを作成します。
任意のエディターで、以下の基本構造で YAML ファイルを作成します。
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
kamelet binding の名前を追加します。この例では、バインディングが
coffee-source
kamelet をkafka-sink
kamelet に接続するため、名前はcoffees-to-kafka
になります。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: sink:
kamelet バインディングのソースの場合は、データソース kamelet を指定し (たとえば、
coffee-source
kamelet はコーヒーに関するデータが含まれるイベントを生成します)、kamelet のパラメーターを設定します。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
kamelet binding のシンクの場合は、
kafka-sink
kamelets およびその必要なパラメーターを指定します。たとえば、Kafka クラスターが OpenShift Streams にある場合 (
securityProtocol
パラメーターを設定する必要はありません)。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "broker.url:9092"" password: "testpassword" topic: "test-topic" user: "testuser"
たとえば、Kafka クラスターが AMQ Streams にある場合は、
securityProtocol
パラメーターを“PLAINTEXT”
に設定する必要があります。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "broker.url:9092"" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT"
-
YAML ファイルを保存します (例:
coffees-to-kafka.yaml
)。 - OpenShift プロジェクトにログインします。
KameletBinding をリソースとして OpenShift namespace に追加します。
oc apply -f <kamelet binding filename>
以下に例を示します。
oc apply -f coffees-to-kafka.yaml
Camel K Operator は、KameletBinding リソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。
kamelet バインディングリソースのステータスを表示するには、次のコマンドを実行します。
oc get kameletbindings
インテグレーションの状態を表示するには、以下を実行します。
oc get integrations
インテグレーションのログを表示するには、以下を実行します。
kamel logs <integration> -n <project>
以下に例を示します。
kamel logs coffees-to-kafka -n my-camel-k-kafka
結果は以下の出力に似ています。
... [1] 2021-06-21 07:48:12,696 INFO [io.quarkus] (main) camel-k-integration 1.4.0 on JVM (powered by Quarkus 1.13.0.Final) started in 2.790s.