2.3. 将数据源连接到 Kamelet Binding 中的 Kafka 主题
要将数据源连接到 Kafka 主题,您可以创建一个 Kamelet Binding,如图 2.2 所示。
图 2.2 将数据源连接到 Kafka 主题
前提条件
您知道要发送事件的 Kafka 主题的名称。
此流程中的示例使用
test-topic
接收事件。您知道 Kafka 实例的以下参数值:
- BootstrapServers - 以逗号分隔的 Kafka Broker URL 列表。
-
Password - 在 Kafka 中进行身份验证的密码。对于 OpenShift Streams,这是
credentials.json
文件中的密码
。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。 User - Kafka 身份验证的用户名。对于 OpenShift Streams,这是
credentials.json
文件中的clientID
。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。有关使用 OpenShift Streams 时如何获取这些值的详情,请参考 Obtaining Kafka 凭证。
-
securityProtocol - 您知道与 Kafka 代理通信的安全协议。对于 OpenShift Streams 上的 Kafka 集群,它是
SASL_SSL
(默认值)。对于 AMQ 流上的 Kafka 集群,它是PLAINTEXT
。
您知道哪个 Kame 可让您添加到 Camel K 集成以及所需的实例参数。
该流程的 Kamelets 示例如下:
coffee-source
Kamelet - 它有一个可选参数,它指定每个事件的发送频率。您可以将 示例源 Kamelet 中的代码 复制到名为
coffee-source.kamelet.yaml
文件,然后运行以下命令将其添加为您的命名空间:oc apply -f coffee-source.kamelet.yaml
-
Kamelet Catalog 中提供的
kafka-sink
Kamelet。您可以使用kafka-sink
Kamelet,因为 Kafka 主题在这个绑定中收到数据(它是数据消费者)。
流程
要将数据源连接到 Kafka 主题,请创建一个 Kamelet Binding:
在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
为 Kamelet Binding 添加名称。在本例中,名称为
coffees-to-kafka
,因为绑定将coffee-source
Kamelet 连接到kafka-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: sink:
对于 Kamelet Binding 的源,指定数据源 Kamelet (例如,coff
ee-source
Kamelet 会生成包含与 coffee 的数据相关的事件),并为 Kamelet 配置任何参数。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
对于 Kamelet Binding 的 sink,请指定
kafka-sink
Kamelet 及其必要属性。例如,当 Kafka 集群位于 OpenShift Streams 时:
-
对于
user
属性,指定clientID
,例如:srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094
-
对于
password
属性,指定密码
,例如:facf3df1-3c8d-4253-aa87-8c95ca5e1225
您不需要设置
securityProtocol
属性。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443" password: "facf3df1-3c8d-4253-aa87-8c95ca5e1225" topic: "test-topic" user: "srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094"
对于另一个示例,当 Kafka 集群位于 AMQ Streams 时,将
securityProtocol
属性设置为"PLAINTEXT
"。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT"
-
对于
-
保存 YAML 文件(例如,coffe
es-to-kafka.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间:
oc apply -f <kamelet 绑定 filename>
例如:
oc apply -f coffees-to-kafka.yaml
Camel K 运算符通过使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
资源的状态:oc get kameletbindings
查看其集成的状态:
oc get integrations
查看集成的日志:
kamel logs <integration> -n <project>
例如:
kamel logs coffees-to-kafka -n my-camel-k-kafka