2.4. 将 Kafka 主题连接到 Kamelet Binding 中的数据 sink
要将 Kafka 主题连接到数据接收器,您可以创建一个 Kamelet Binding,如图 2.3 所示。
图 2.3 将 Kafka 主题连接到数据 sink
前提条件
-
您知道您要发送事件的 Kafka 主题的名称。此流程中的示例使用
test-topic
来发送事件。这与您在 Kamelet Binding 中连接数据源中的 coffee 源中接收事件的主题与 Kafka 主题 相同。 您知道 Kafka 实例的以下参数值:
- BootstrapServers - 以逗号分隔的 Kafka Broker URL 列表。
- Password - 在 Kafka 中进行身份验证的密码。
User - Kafka 身份验证的用户名。
有关使用 OpenShift Streams 时如何获取这些值的详情,请参考 Obtaining Kafka 凭证。
-
您知道与 Kafka 代理通信的安全协议。对于 OpenShift Streams 上的 Kafka 集群,它是
SASL_SSL
(默认值)。对于 AMQ 流上的 Kafka 集群,它是PLAINTEXT
。 您知道哪个 Kame 可让您添加到 Camel K 集成以及所需的实例参数。Kamelet 在 Kamelet Catalog 中提供:
kafka-source
Kamelet - 使用kafka-source
Kamelet,因为 Kafka 主题是发送数据(它是数据制作者)。所需参数的示例值有:-
bootstrapServers -
"broker.url:9092"
-
password -
"testpassword"
-
user -
"testuser"
-
topic -
"test-topic"
-
securityProtocol - 对于 OpenShift Streams 上的 Kafka 集群,您不需要设置此参数,因为
SASL_SSL
是默认值。对于 AMQ 流上的 Kafka 集群,此参数值为"PLAINTEXT
"。
-
bootstrapServers -
-
log-sink
Kamelet - 使用log-sink
来记录它从kafka-source
Kamelet 接收的数据。(可选)指定showStreams
参数以显示数据的消息正文。log-sink
Kamelet 可用于调试目的。
流程
要将 Kafka 主题连接到数据接收器,请创建一个 Kamelet Binding:
在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
为 Kamelet Binding 添加名称。在本例中,名称是
kafka-to-log
,因为该绑定将kafka-source
Kamelet 连接到log-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: sink:
对于 Kamelet Binding 的源,指定
kafka-source
Kamelet 并配置其参数。例如,当 Kafka 集群位于 OpenShift Streams 时(您不需要设置
securityProtocol
参数):apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" sink:
例如,当 Kafka 集群位于 AMQ Streams 时,您必须将
securityProtocol
参数设置为"PLAINTEXT
"。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" sink:
对于 Kamelet Binding 的 sink,指定数据使用者 Kamelet (如
log-sink
Kamelet),并为 Kamelet 配置任何参数,例如:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
-
保存 YAML 文件(例如,
kafka-to-log.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间:
oc apply -f <kamelet 绑定 filename>
例如:
oc apply -f kafka-to-log.yaml
Camel K 运算符通过使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
资源的状态:oc get kameletbindings
查看其集成的状态:
oc get integrations
查看集成的日志:
kamel logs <integration> -n <project>
例如:
kamel logs kafka-to-log -n my-camel-k-kafka
在输出中,您应看到 coffee 事件,例如:
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
要停止正在运行的集成,请删除关联的 Kamelet Binding 资源:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/kafka-to-log