1.3. 将操作应用到连接中的数据
如果要在 Kamelet 和 event 频道之间传递数据执行操作,请使用 action Kamelets 作为 Kamelet 中的中间步骤。例如,您可以使用操作 Kamelet 序列化或反序列化数据,过滤数据,或者插入字段或消息标头。
操作操作,如过滤或添加字段,仅适用于 JSON 数据(即,当 Content-Type
标头设置为 application/json
时)。如果事件数据使用 JSON 以外的格式(如 Avro 或 Protocol Buffers),您必须通过添加反序列化步骤(例如,引用 protobuf-deserialize-action
或 avro-deserialize-action
Kamelet)来转换数据格式(例如: 它引用了 protobuf-serialize-action
或 avro-serialize-action
Kamelet)。有关转换连接中数据格式的更多信息,请参阅数据转换 Kamelets。
action Kamelets 包括:
1.3.1. 在 Kamelet Binding 中添加操作
要实施一个操作 Kamelet,在 Kamelet 文件的 spec
部分,在 source 和 sink 部分之间添加一个 steps
部分。
先决条件
- 您已创建了一个 Kamelet Binding,如 Kamelet Binding 中的 Connecting source 和 sink 组件 所述。
您知道要添加到 Kamelet 的哪个操作 Kamelet 以及 action Kamelet 的必要参数。
对于此过程中的示例,
predicate-filter-action
Kamelet 的参数是一个字符串类型
expression,它提供了一个 JSON Path Expression,它只过滤 coffee 数据到具有 "deep" taste intensity 的日志中。请注意,predicate-filter-action
Kamelet 要求您在 Kamelet Binding 中设置 Builder 特征配置属性。这个示例还包括反序列化和序列化操作,在本例中是可选的,因为事件数据格式是 JSON。
流程
在编辑器中打开
KameletBinding
文件。例如,以下是
coffee-to-log.yaml
文件的内容:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
在
source
部分上方添加一个集成
部分,并提供以下 Builder 特征配置属性(根据predicate-filter-action
Kamelet 要求):apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable- types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
在
源和
接收器
部分之间添加steps
部分并定义 action Kamelet。例如:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-deserialize-action - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: predicate-filter-action properties: expression: "@.intensifier =~ /.*deep/" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-serialize-action sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
- 保存您的更改。
使用
oc apply
命令来更新KameletBinding
资源,例如:oc apply -f coffee-to-log.yaml
Camel K operator re-generates 并运行基于更新的
KameletBinding
资源生成的 CamelK 集成。查看 Kamelet Binding 的状态:
oc get kameletbindings
查看其对应的集成状态:
oc get integrations
查看集成的日志文件输出:
kamel logs <integration-name>
例如,如果集成名称为
coffee-to-log
:kamel logs coffee-to-log
要停止集成,删除 Kamelet Binding:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/coffee-to-log