2.4. 在 Kamelet Binding 中将 Kafka 主题连接到数据 sink
要将 Kafka 主题连接到数据 sink,您可以创建一个 Kamelet Binding,如 图 2.3 所示。
图 2.3 将 Kafka 主题连接到数据 sink
先决条件
-
您知道要从中发送事件的 Kafka 主题的名称。此流程中的示例使用
test-topic
发送事件。您在 Kamelet Binding 中用于将数据源连接到 Kafka 主题中的 coffee 源接收事件相同。 您知道 Kafka 实例的以下参数值:
- bootstrapServers - 以逗号分隔的 Kafka Broker URL 列表。
- Password - 要向 Kafka 进行身份验证的密码。
user - 要向 Kafka 进行身份验证的用户名。
有关如何使用 OpenShift Streams 时获取这些值的信息,请参阅 获取 Kafka 凭证。
-
您知道与 Kafka 代理通信的安全协议。对于 OpenShift Streams 上的 Kafka 集群,它是
SASL_SSL
(默认)。对于 AMQ 流上的 Kafka 集群,它是PLAINTEXT
。 您知道您要添加到 Camel K 集成和所需的实例参数中的 Kamelets。Kamelet 目录提供了此流程的 Kamelet 示例:
kafka-source
Kamelet - 使用kafka-source
Kamelet,因为 Kafka 主题在这个绑定中发送数据(它是数据制作者)。所需参数的值示例如下:-
bootstrapServers -
"broker.url:9092"
-
password -
"testpassword"
-
user -
"testuser"
-
topic -
"test-topic"
-
securityProtocol - 对于 OpenShift Streams 上的 Kafka 集群,您不需要设置此参数,因为
SASL_SSL
是默认值。对于 AMQ 流上的 Kafka 集群,此参数值为"PLAINTEXT
"。
-
bootstrapServers -
-
log-sink
Kamelet - 使用log-sink
来记录它从kafka-source
Kamelet 接收的数据。(可选)指定showStreams
参数来显示数据的消息正文。log-sink
Kamelet 可用于调试。
流程
要将 Kafka 主题连接到数据 sink,请创建一个 Kamelet Binding:
在您选择的编辑器中,创建具有以下基本结构的 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
为 Kamelet Binding 添加名称。在本例中,名称为
kafka-to-log
,因为绑定将kafka-source
Kamelet 连接到log-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: sink:
对于 Kamelet Binding 的源,指定
kafka-source
Kamelet 并配置其参数。例如,当 Kafka 集群位于 OpenShift Streams 中时(您不需要设置
securityProtocol
参数):apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" sink:
例如,当 Kafka 集群位于 AMQ Streams 上时,必须将
securityProtocol
参数设置为"PLAINTEXT"
:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" sink:
对于 Kamelet Binding 的 sink,请指定 data consumer Kamelet (如
log-sink
Kamelet),并为 Kamelet 配置任何参数,例如:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
-
保存 YAML 文件(如
kafka-to-log.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间中:
oc apply -f <kamelet 绑定 filename>
例如:
oc apply -f kafka-to-log.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
资源的状态:oc get kameletbindings
查看其集成状态:
oc get integrations
查看集成的日志:
kamel logs <integration> -n <project>
例如:
kamel logs kafka-to-log -n my-camel-k-kafka
在输出中,您应该看到 coffee 事件,例如:
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
要停止正在运行的集成,请删除关联的 Kamelet Binding 资源:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/kafka-to-log