2.3. 在 Kamelet Binding 中将数据源连接到 Kafka 主题
要将数据源连接到 Kafka 主题,您可以创建一个 Kamelet Binding,如图 2.2 所示。
图 2.2 将数据源连接到 Kafka 主题
先决条件
您知道要将事件发送到的 Kafka 主题的名称。
此流程中的示例使用
test-topic
接收事件。您知道 Kafka 实例的以下参数值:
- bootstrapServers - Kafka Broker URL 的逗号分隔列表。
-
Password - 向 Kafka 验证的密码。对于 OpenShift Streams,这是
credentials.json
文件中的密码
。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。 user - 向 Kafka 进行身份验证的用户名。对于 OpenShift Streams,这是
credentials.json
文件中的clientID
。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。如需有关如何使用 OpenShift Streams 获取这些值的信息,请参阅 获取 Kafka 凭证。
-
SecurityProtocol - 您知道与 Kafka 代理通信的安全协议。对于 OpenShift Streams 上的 Kafka 集群,它是
SASL_SSL
(默认值)。对于 AMQ 流上的 Kafka 集群,它是PLAINTEXT
。
您知道哪个 Kame 可让您添加到 Camel K 集成和所需的实例参数中。
此流程的 Kamelets 示例包括:
coffee-source
Kamelet - 它有一个可选参数period
,它指定了发送每个事件的频率。您可以将 Example source Kamelet 中的代码复制到名为coffee-source.kamelet.yaml
文件的文件,然后运行以下命令来将其添加为命名空间的资源:oc apply -f coffee-source.kamelet.yaml
-
Kamelet Catalog 中提供的
kafka-sink
Kamelet。您可以使用kafka-sink
Kamelet,因为 Kafka 主题在这个绑定中接收数据(它是数据消费者)。
流程
要将数据源连接到 Kafka 主题,请创建一个 Kamelet Binding:
在您选择的编辑器中,创建一个具有以下基本结构的 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
为 Kamelet Binding 添加一个名称。在本例中,名称是
coffees-to-kafka
,因为绑定将coffee-source
Kamelet 连接到kafka-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: sink:
对于 Kamelet Binding 的源,请指定数据源 Kamelet(例如,coff
ee-source
Kamelet 会生成事件,其中包含有关 coffee 的数据)并为 Kamelet 配置任何参数。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
对于 Kamelet Binding 的 sink,请指定
kafka-sink
Kamelet 及其必要属性。例如,当 Kafka 集群位于 OpenShift Streams 中时:
-
对于
user
属性,指定clientID
,例如:srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094
-
对于
password
属性,指定密码
,例如:facf3df1-3c8d-4253-aa87-8c95ca5e1225
您不需要设置
securityProtocol
属性。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443" password: "facf3df1-3c8d-4253-aa87-8c95ca5e1225" topic: "test-topic" user: "srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094"
另外,当 Kafka 集群位于 AMQ Streams 时,将
securityProtocol
属性设置为"PLAINTEXT"
:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT"
-
对于
-
保存 YAML 文件(例如,coffe
es-to-kafka.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间:
oc apply -f <kamelet binding filename>
例如:
oc apply -f coffees-to-kafka.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
资源的状态:oc get kameletbindings
查看其集成的状态:
oc get integrations
查看集成的日志:
kamel logs <integration> -n <project>
例如:
kamel logs coffees-to-kafka -n my-camel-k-kafka