2.4. 在 Kamelet Binding 中将 Kafka 主题连接到一个数据接收器
要将 Kafka 主题连接到数据接收器,您可以创建一个 Kamelet Binding,如图 2.3 所示。
图 2.3 将 Kafka 主题连接到一个数据接收器
先决条件
-
您知道要从中发送事件的 Kafka 主题的名称。此流程中的示例使用
test-topic
来发送事件。在 Kamelet Binding 中,用于接收从数据源连接到 Kafka 主题的事件时 的信息相同。 您知道 Kafka 实例的以下参数值:
- bootstrapServers - Kafka Broker URL 的逗号分隔列表。
- Password - 向 Kafka 验证的密码。
user - 向 Kafka 进行身份验证的用户名。
如需有关如何使用 OpenShift Streams 获取这些值的信息,请参阅 获取 Kafka 凭证。
-
您知道与 Kafka 代理通信的安全协议。对于 OpenShift Streams 上的 Kafka 集群,它是
SASL_SSL
(默认值)。对于 AMQ 流上的 Kafka 集群,它是PLAINTEXT
。 您知道哪个 Kame 可让您添加到 Camel K 集成和所需的实例参数中。Kamelet Catalog 中提供了这个流程的示例 Kamelets:
kafka-source
Kamelet - 使用kafka-source
Kamelet,因为 Kafka 主题在这个绑定中发送数据(它是数据制作者)。需要参数的示例值有:-
bootstrapServers -
"broker.url:9092"
-
password -
"testpassword"
-
user -
"testuser"
-
topic -
"test-topic"
-
securityProtocol - 对于 OpenShift Streams 上的 Kafka 集群,您不需要设置此参数,因为
SASL_SSL
是默认值。对于 AMQ 流上的 Kafka 集群,这个参数值为"PLAINTEXT
"。
-
bootstrapServers -
-
log-sink
Kamelet - 使用log-sink
来记录它从kafka-source
Kamelet 接收的数据。(可选)指定showStreams
参数来显示数据的消息正文。log-sink
Kamelet 可用于调试目的。
流程
要将 Kafka 主题连接到数据接收器,请创建一个 Kamelet Binding:
在您选择的编辑器中,创建一个具有以下基本结构的 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
为 Kamelet Binding 添加一个名称。在本例中,名称是
kafka-to-log
,因为绑定将kafka-source
Kamelet 连接到log-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: sink:
对于 Kamelet Binding 的源,请指定
kafka-source
Kamelet 并配置其参数。例如,当 Kafka 集群位于 OpenShift Streams 中时(您不需要设置
securityProtocol
参数):apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" sink:
例如,当 Kafka 集群位于 AMQ Streams 中时,您必须将
securityProtocol
参数设置为"PLAINTEXT"
:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" sink:
对于 Kamelet Binding 的 sink,请指定数据使用者 Kamelet(例如,
log-sink
Kamelet)并为 Kamelet 配置任何参数:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
-
保存 YAML 文件(例如,
kafka-to-log.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间:
oc apply -f <kamelet binding filename>
例如:
oc apply -f kafka-to-log.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
资源的状态:oc get kameletbindings
查看其集成的状态:
oc get integrations
查看集成的日志:
kamel logs <integration> -n <project>
例如:
kamel logs kafka-to-log -n my-camel-k-kafka
在输出中,您应该看到 coffee 事件,例如:
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
要停止正在运行的集成,请删除相关的 Kamelet Binding 资源:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/kafka-to-log