2.3. Kamelet Binding でのデータソースの Kafka トピックへの接続
データソースを Kafka トピックに接続するには、図 2.2で説明されているように Kamelet Binding を作成します。
図 2.2 データソースの Kafka トピックへの接続
前提条件
イベントの送信先となる Kafka トピックの名前を知っておく必要があります。
この手順の例では、イベントを受信するために
test-topic
を使用します。Kafka インスタンスの以下のパラメーターの値を知っている必要があります。
- bootstrapServers: Kafka Broker URL のコンマ区切りリスト
-
password: Kafka に対して認証を行うためのパスワード。OpenShift Streams では、これは
credentials.json
ファイルpassword
です。AMQ Streams の認証されていない kafka インスタンスでは、任意の空でない文字列を指定できます。 user: Kafka に対して認証するユーザー名。OpenShift Streams では、これは
credentials.json
ファイルclientID
です。AMQ Streams の認証されていない kafka インスタンスでは、任意の空でない文字列を指定できます。OpenShift Streams の使用時にこれらの値を取得する方法は、Kafka クレデンシャルの取得 を参照してください。
-
securityProtocol: Kafka ブローカーと通信するためのセキュリティープロトコルを知っている必要があります。OpenShift Streams の Kafka クラスターでは、
SASL_SSL
(デフォルト) です。AMQ Streams 上の Kafka クラスターでは、PLAINTEXT
になります。
Camel K インテグレーションに追加する Kamelets と必要なインスタンスパラメーターを知っている必要があります。
この手順の Kamelets の例は次のとおりです。
coffee-source
Kamelet: 各イベントを送信する頻度を指定するオプションのパラメーターperiod
があります。ソース Kamelet の例 のコードを、coffee-source.kamelet.yaml
ファイルという名前のファイルにコピーしてから、以下のコマンドを実行してこれをリソースとして namespace に追加できます。oc apply -f coffee-source.kamelet.yaml
-
Kamelet Catalog で提供される
kafka-sink
Kamelet。Kafka トピックがこのバインディングでデータ (データコンシューマー) を受信しているため、kafka-sink
Kamelet を使用します。
手順
データソースを Kafka トピックに接続するには、Kamelet Binding を作成します。
任意のエディターで、以下の基本構造で YAML ファイルを作成します。
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
Kamelet Binding の名前を追加します。この例では、バインディングが
coffee-source
Kamelet をkafka-sink
Kamelet に接続するため、名前はcoffees-to-kafka
になります。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: sink:
Kamelet Binding のソースの場合は、データソース Kamelet を指定し (たとえば、
coffee-source
Kamelet はコーヒーに関するデータが含まれるイベントを生成します)、Kamelet のパラメーターを設定します。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
Kamelet Binding のシンクの場合は、
kafka-sink
Kamelet およびその必要なプロパティーを指定します。たとえば、Kafka クラスターが OpenShift Streams 上にある場合:
-
user
プロパティーには、clientID
指定します (例:srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094
) -
password
プロパティーには、password
指定します (例:facf3df1-3c8d-4253-aa87-8c95ca5e1225
) securityProtocol
プロパティーを設定する必要はありません。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443" password: "facf3df1-3c8d-4253-aa87-8c95ca5e1225" topic: "test-topic" user: "srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094"
別の例として、Kafka クラスターが AMQ Streams 上にある場合、
securityProtocol
プロパティーを“PLAINTEXT”
に設定します。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT"
-
-
YAML ファイルを保存します (例:
coffees-to-kafka.yaml
)。 - OpenShift プロジェクトにログインします。
Kamelet Binding をリソースとして OpenShift namespace に追加します。
oc apply -f <kamelet binding filename>
以下に例を示します。
oc apply -f coffees-to-kafka.yaml
Camel K Operator は、
KameletBinding
リソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。KameletBinding
リソースのステータスを表示するには、次のコマンドを実行します。oc get kameletbindings
インテグレーションの状態を表示するには、以下を実行します。
oc get integrations
インテグレーションのログを表示するには、以下を実行します。
kamel logs <integration> -n <project>
以下に例を示します。
kamel logs coffees-to-kafka -n my-camel-k-kafka