Red Hat Camel K is deprecated
Red Hat Camel K is deprecated and the End of Life date for this product is June 30, 2025. For help migrating to the current go-to solution, Red Hat build of Apache Camel, see the Migration Guide.1.3. 将操作应用到连接中的数据
如果要对 Kamelet 和事件频道之间的数据执行操作,请使用 action Kamelets 作为 Kamelet Binding 中的中间步骤。例如,您可以使用 action Kamelet 来序列化或反序列化数据,过滤数据,或者插入字段或消息标头。
操作(如过滤或添加字段)只适用于 JSON 数据(即,当 Content-Type
标头设置为 application/json
时)。如果事件数据使用 JSON 以外的格式(如 Avro 或 Protocol Buffers),则在处理操作前需要一个额外的反序列化步骤来进行格式转换(例如,protobuf-deserialize-action
或 avro-deserialize-action
Kamelet),并在处理操作后需要另外一个额外的序列化操作(例如 protobuf-serialize-action
or avro-serialize-action
Kamelet)。有关在连接中转换数据格式的更多信息,请参阅数据转换 Kamelets。
action Kamelets 包括:
1.3.1. 在 Kamelet Binding 中添加操作
要实现一个操作 Kamelet,在 Kamelet Binding 文件的 spec
部分中,在 source 和 sink 部分之间添加 一个步骤
部分。
先决条件
- 您已创建了 Kamelet Binding,如 Connecting source 和 sink 组件在 Kamelet Binding 中 所述。
您知道您要添加到 Kamelet Binding 以及 action Kamelet 所需的参数中的 action Kamelet。
对于此流程中的示例,
predicate-filter-action
Kamelet 的参数是一个字符串
类型 expression,它提供 JSON 路径表达式,它只过滤 coffee 数据来仅记录具有 "deep" 波动性的数据。请注意,predicate-filter-action
Kamelet 要求您在 Kamelet Binding 中设置 Builder 特征配置属性。这个示例还包括反序列化和序列化操作,它们在此情形中是可选的,因为事件数据格式为 JSON。
流程
在编辑器中打开
KameletBinding
文件。例如,以下是
coffee-to-log.yaml
文件的内容:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
在
source
部分添加一个integration
部分,并提供以下 Builder 特征配置属性(根据predicate-filter-action
Kamelet 要求):apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable- types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
在
source
和sink
部分添加一个steps
部分,并定义 action Kamelet。例如:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-deserialize-action - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: predicate-filter-action properties: expression: "@.intensifier =~ /.*deep/" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-serialize-action sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
- 保存您的更改。
使用
oc apply
命令更新KameletBinding
资源,例如:oc apply -f coffee-to-log.yaml
Camel K operator 会重新生成并运行它根据更新的
KameletBinding
资源生成的 CamelK 集成。查看 Kamelet Binding 的状态:
oc get kameletbindings
查看其对应集成的状态:
oc get integrations
查看集成的日志文件输出:
kamel logs <integration-name>
例如,如果集成名称为
coffee-to-log
:kamel logs coffee-to-log
要停止集成,请删除 Kamelet Binding:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/coffee-to-log
- 对于命名空间 operator,kamelets 与 Camel K operator 一起安装到当前命名空间中。
- 对于全局 Operator 安装,kamelets 被安装到全局 operator 命名空间中(默认为 openshift-operators),并可在任意命名空间中使用 KameletBinding。
- KameletBinding 可以引用在当前或全局 operator 命名空间中安装的 kamelets。
1.3.2. action kamelets
1.3.2.1. 数据收集 Kamelets
您可以过滤在源和接收器组件之间传递的数据,例如防止泄漏敏感数据,或避免产生不必要的网络费用。
您可以根据以下条件过滤数据:
-
Kafka 主题 name - Filter 事件带有一个与给定 Java 正则表达式匹配的名称,方法是配置 Topic Name Matches Filter Action Kamelet (
topic-name-matches-filter-action
)。如需更多信息,请参阅 过滤特定 Kafka 主题的事件数据。 -
通过配置 Header Filter Action Kamelet (
has-header-filter-action
)来标头带有给定消息标头的标头。 -
null value - Filters tombstone 事件(带有 null 有效负载的事件)通过配置 Tombstone Filter Action Kamelet (
is-tombstone-filter-action
)。 predicate- Filter 事件基于给定的 JSON 路径表达式,方法是配置 Predicate Filter Action Kamelet (
predicate-filter-action
)。predicate-filter-action
Kamelet 要求您在 Kamelet Binding 中设置以下 Builder 特征 配置属性:spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml. jackson.databind.ObjectMapper"
数据过滤 Kamelets 可以正常工作,带有 JSON 数据(即,当 Content-Type 标头设置为 application/json 时)。如果事件数据使用 JSON 以外的格式,则在处理操作前需要一个额外的反序列化步骤来进行格式转换(例如,protobuf-deserialize-action
或 avro-deserialize-action
),并在处理操作后需要另外一个额外的序列化操作(例如 protobuf-serialize-action
or avro-serialize-action
)。有关在连接中转换数据格式的更多信息,请参阅数据转换 Kamelets。
1.3.2.2. 数据转换 Kamelets
使用以下数据转换 Kamelets,您可以序列化和反序列化在源和接收器组件之间传递的数据格式。数据转换适用于事件数据的有效负载(而不是键或标头)。
avro - 为 Apache Hadoop 提供数据序列化和数据交换服务的开源项目。
-
avro Deserialize Action Kamelet (
avro-deserialize-action
) -
avro Serialize Action Kamelet (
avro-serialize-action
)
-
avro Deserialize Action Kamelet (
协议缓冲 - Google 发明的紧凑二进制连接格式,以便他们可以在内部使用它,以便他们能够与其内部网络服务通信。
-
protobuf Deserialize Action Kamelet (
protobuf-deserialize-action
) -
protobuf Serialize Action Kamelet (
protobuf-serialize-action
)
-
protobuf Deserialize Action Kamelet (
JSON (JavaScript 对象表示法)- 基于 JavaScript 编程语言的子集的数据交互格式。JSON 是完全独立于语言的文本格式。
-
JSON Deserialize Action Kamelet (
json-deserialize-action
) -
JSON Serialize Action Kamelet (
json-serialize-action
)
-
JSON Deserialize Action Kamelet (
您必须在 Avro 和 Protobuf serialize/deserialize Kamelets 中指定 schema (作为单行,使用 JSON 格式)。您不需要对 JSON 序列化/反序列化 Kamelets 这样做。
1.3.2.3. 数据转换 Kamelets
使用以下数据转换 Kamelets,您可以对在源和接收器组件之间传递的数据执行简单的操作:
-
提取 Field - 使用
extract-field-action
Kamelet 从数据正文中拉取字段,并将整个数据正文替换为提取的字段。 -
Hoist Field - 使用
hoist-field-action
Kamelet 将数据正文嵌套到一个字段中。 -
insert Header - 使用
insert-header-action
Kamelet 使用静态数据或记录元数据添加标头字段。 -
插入 Field - 使用
insert-field-action
Kamelet 来通过使用静态数据或记录元数据添加字段值。 Mask Field - 使用
mask-field-action
Kamelet 为字段类型使用一个有效的 null 值(如 0 或空字符串)或一个给定的值(需要是一个非空的字符串或一个数字值)替换字段的值。例如,如果要从关系数据库捕获数据以发送到 Kafka,并且数据包含受保护的(PCI/PII)信息,如果您的 Kafka 集群尚未认证,则必须屏蔽受保护的信息。
-
替换 Field - 使用
replace-field-action
Kamelet 来过滤或重命名字段。您可以指定用于重命名、禁用(排除)或启用(包括)的字段。 -
To Key -(用于 Kafka)使用
value-to-key-action
Kamelet 将记录键替换为有效负载中字段子集的新键。您可以将 event 键设置为基于事件信息的值,然后再将数据写入 Kafka。例如,当从数据库表中读取记录时,您可以根据客户 ID 对 Kafka 中的记录进行分区。