红帽构建的 Apache Camel for Quarkus 的 kamelets 指南
红帽构建的 Apache Camel for Quarkus 的 kamelets 指南
摘要
前言
向红帽构建的 Apache Camel 文档提供反馈
要报告错误或改进文档,请登录您的红帽 JIRA 帐户并提交问题。如果您没有红帽 JIRA 帐户,系统会提示您创建一个帐户。
流程
- 点击以下链接 创建 ticket
- 在 Summary 中输入问题的简短描述。
- 提供有关描述中问题或功能增强的详细描述。包括一个 URL,以在文档中发生问题。
- 点 Submit 创建问题并将其路由到适当的文档团队。
kamelets 是可重复使用的路由组件,隐藏创建连接到外部系统的数据管道的复杂性。
第 1 章 Kamelets 概述
kamelets 是高级别连接器,可在事件驱动的构架解决方案中充当构建块。它们是您可以在 OpenShift 集群上安装的自定义资源,并在 Camel K 集成中使用。kamelets 可加速您的开发工作。它们简化了您连接数据源(发出事件)和数据接收器(消耗事件)的方式。因为您配置的 Kamelet 参数而不是编写代码,所以您不需要熟悉 Camel DSL 来使用 Kamelets。
您可以使用 Kamelets 将应用程序和服务直接连接到彼此或:
- Kafka 主题,如 使用 Kamelets 连接到 Kafka 中所述。
- Knative 目的地(频道或代理),如 使用 Kamelet 连接到 Knative 中所述。
- 特定的 Camel URI,如 连接到显式 Camel URI 中所述。
1.1. 关于 Kamelets
kamelets 是路由组件(封装代码),在 Camel 集成中作为连接器工作。您可以将 Kamelets 视为定义使用来自源(源)的数据和向(接收器)发送数据的模板 - 允许您编译数据管道。kamelets 也可以过滤、掩码,并对数据执行简单的计算逻辑。
Kamelets 有三种不同类型的:
- 源 - 生成数据的路由。您可以使用 source Kamelet 从组件检索数据。
- sink - 使用数据的路由。您可以使用 sink Kamelet 将数据发送到组件。
- action - 对数据执行操作的路由。当从一个源 Kamelet 传递给接收器 Kamelet 时,您可以使用一个 action Kamelet 来操作数据。
1.1.1. 为什么使用 Kamelets?
在 微服务和 事件驱动的构架解决方案中,Kamelets 可以充当发布事件和 sink 的源的构建块。
kamelets 提供抽象(它们隐藏了连接到外部系统的复杂性)和可重用性(它们是重复使用代码并将其应用到不同用例的简单方法)。
以下是一些用例示例:
- 您希望应用程序消耗来自 Telegram 的事件,您可以使用 Kamelets 将 Telegram 源绑定到事件频道。之后,您可以将应用程序连接到该频道,以便对这些事件做出反应。
- 您希望您的应用程序直接连接到 Slack。
kamelets 允许您和您的集成开发团队更高效。您可以重复使用 Kamelets,并将它们与可以配置实例的团队成员共享。底层 Camel K 操作器确实工作:它编译、构建、软件包并部署由 Kamelet 定义的集成。
1.1.2. 谁使用 Kamelets?
因为 Kamelets 允许您减少 Camel 集成中所需的编码量,因此它们非常适合不熟悉 Camel DSL 的开发人员。kamelets 可以帮助非 Camel 开发者更容易地学习相关的知识。无需您学习另一个框架或语言来获取 Camel 运行。
kamelets 对于希望将复杂 Camel 集成逻辑封装到可重复使用的 Kamelet 中的经验丰富的 Camel 开发人员也很有用,然后将其与其他用户共享。
1.1.3. 使用 Kamelets 的先决条件是什么?
要使用 Kamelets,您需要以下环境设置:
- 您可以使用正确的访问级别访问 OpenShift 4.6 (或更新版本)集群、创建项目和安装 Operator 的功能,以及在本地系统上安装 OpenShift 和 Camel K CLI 工具的功能。
-
已安装 OpenShift 命令行(
oc
)接口工具。
1.1.4. 如何使用 Kamelets?
使用 Kamelet 通常涉及两个组件: Kamelet 本身,它定义了一个可重复使用的路由片断,以及一个 Kamelet Binding,在其中引用并绑定到一个或多个 Kamelets。Kamelet Binding 是一个 OpenShift 资源(KameletBinding
)。
在 Kamelet Binding 资源中,您可以:
- 将 sink 或源 Kamelet 连接到事件频道: Kafka 主题或 Knative 目的地(频道或代理)。
- 将接收器 Kamelet 直接连接到 Camel 统一资源标识符(URI)。您还可以将源 Kamelet 连接到 Camel URI,但连接 URI 和接收器 Kamelet 是最常见的用例。
- 将接收器和源 Kamelet 直接连接到彼此,而无需将事件频道用作中间层。
- 在同一 Kamelet Binding 中多次引用同一 Kamelet。
- 添加 action Kamelets,以便在从源 Kamelet 传递给接收器 Kamelet 时操作数据。
- 定义错误处理策略,以指定在发送或接收事件数据时 Camel K 应做什么。
在运行时,Camel K operator 使用 Kamelet Binding 生成并运行 Camel K 集成。
注: 虽然 Camel DSL 开发人员可以直接在 Camel K 集成中使用 Kamelets,但实施 Kamelets 更简单的方式是指定 Kamelets 来构建高级别事件流。
1.2. 连接源和接收器
当您要连接两个或多个组件(外部应用程序或服务)时,请使用 Kamelets。每个 Kamelet 基本上是一个带有配置属性的路由模板。您需要知道您要从(一个源)获取数据的组件以及您要将数据发送到哪个组件(接收器)。您可以通过在 Kamelet Binding 中添加 Kamelets 来连接源和接收器组件,如图 1.1 所示。

图 1.1: Kamelet Binding source to sink
以下是在 Kamelet Binding 中使用 Kamelets 的步骤概述:
- 安装 Camel K operator。它包括一个 Kamelets 目录,作为 OpenShift 项目中的资源。
- 创建一个 Kamelet Binding。决定要在 Kamelet Binding 中连接的服务或应用程序。
- 查看 Kamelet Catalog,以查找您要使用的源和接收器组件的 Kamelets。
- 对于您要包含在 Kamelet Binding 中的每个 Kamelet,请确定您需要设置的配置属性。
- 在 Kamelet Binding 代码中,添加对每个 Kamelet 的引用并配置所需属性。
- 将 Kamelet Binding 作为资源应用到 OpenShift 项目中。
Camel K operator 使用 Kamelet Binding 来生成和运行集成。
1.2.1. 查看 Kamelet 目录
安装 Camel K 操作器时,它包含一个可在 Camel K 集成中使用的 Kamelets 目录。
前提条件
您在工作命名空间或集群范围内安装了 Camel K operator,如 安装 Camel K 所述。
流程
查看安装 Camel K operator 的 Kamelets 列表:
- 在终端窗口中,登录到您的 OpenShift 集群。
查看可用的 Kamelets 列表取决于如何安装 Camel K operator (在特定命名空间或 cluster-mode 中):
如果 Camel K Operator 在 cluster-mode 中安装,请使用这个命令查看可用的 Kamelets:
oc get kamelet -n openshift-operators
如果 Camel K Operator 安装在特定命名空间中:
打开安装 Camel K operator 的项目。
oc project <camelk-project>
例如,如果 Camel K 操作器安装在
my-camel-k-project
项目中:oc project my-camel-k-project
运行以下命令:
oc get kamelets
有关红帽支持的 Kamelets 列表,请参阅 Red Hat Integration 发行注记 。
1.2.1.1. 在 Kamelet Catalog 中添加自定义 Kamelet
如果您没有在符合您的要求的目录中看到 Kamelet,则 Camel DSL 开发人员可创建自定义 Kamelet,如 Apache Camel Kamelets Developers 指南 (社区文档)中所述。Kamelet 以 YAML
格式编码,根据惯例,它有一个 .kamelet.yaml
文件扩展名。
先决条件
- Camel DSL 开发人员为您提供了一个自定义 Kamelet 文件。
- Kamelet 名称对于安装 Camel K operator 的 OpenShift 命名空间必须是唯一的。
流程
使自定义 Kamelet 作为 OpenShift 命名空间中的资源提供:
-
将 Kamelet
YAML
文件(如custom-sink.kamelet.yaml
)下载到本地文件夹。 - 登录到您的 OpenShift 集群。
在终端窗口中,打开安装 Camel K operator 的项目,如
my-camel-k-project
:oc project my-camel-k-project
运行
oc apply
命令将自定义 Kamelet 作为资源添加到命名空间中:oc apply -f <custom-kamelet-filename>
例如,使用以下命令添加位于当前目录中的
custom-sink.kamelet.yaml
文件:oc apply -f custom-sink.kamelet.yaml
要验证 Kamelet 作为一个资源可用,请使用以下命令查看当前命名空间中的所有 Kamelets 的字母顺序列表,然后查找您的自定义 Kamelet:
oc get kamelets
1.2.1.2. 确定 Kamelet 的配置参数
在 Kamelet Binding 中,当您添加对 Kamelet 的引用时,您可以指定 Kamelet 的名称并配置 Kamelet 的参数。
前提条件
- 您已在工作命名空间或集群范围内安装了 Camel K operator。
流程
要确定 Kamelet 的名称和参数:
- 在终端窗口中,登录到您的 OpenShift 集群。
打开 Kamelet 的 YAML 文件:
oc describe kamelets/<kamelet-name>
例如,要查看
ftp-source
Kamelet 的代码,如果当前命名空间中安装了 Camel K operator,请使用以下命令:oc describe kamelets/ftp-source
如果 Camel K Operator 在 cluster-mode 中安装,使用以下命令:
oc describe -n openshift-operators kamelets/ftp-source
在 YAML 文件中,向下滚动到
spec.definition
部分(使用 JSON-schema 格式编写),以查看 Kamelet 的属性列表。在本节的末尾,必填字段列出了在引用 Kamelet 时必须配置的属性。例如,以下代码是
ftp-source
Kamelet 的spec.definition
部分的摘录。本节详细介绍了所有 Kamelet 的配置属性。此 Kamelet 的必要属性是connectionHost
,connectionPort
,username
,password
, 和directoryName
:spec: definition: title: "FTP Source" description: |- Receive data from an FTP Server. required: - connectionHost - connectionPort - username - password - directoryName type: object properties: connectionHost: title: Connection Host description: Hostname of the FTP server type: string connectionPort: title: Connection Port description: Port of the FTP server type: string default: 21 username: title: Username description: The username to access the FTP server type: string password: title: Password description: The password to access the FTP server type: string format: password x-descriptors: - urn:alm:descriptor:com.tectonic.ui:password directoryName: title: Directory Name description: The starting directory type: string passiveMode: title: Passive Mode description: Sets passive mode connection type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' recursive: title: Recursive description: If a directory, will look for files in all the sub-directories as well. type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' idempotent: title: Idempotency description: Skip already processed files. type: boolean default: true x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
spec: definition: title: "FTP Source" description: |- Receive data from an FTP Server. required: - connectionHost - connectionPort - username - password - directoryName type: object properties: connectionHost: title: Connection Host description: Hostname of the FTP server type: string connectionPort: title: Connection Port description: Port of the FTP server type: string default: 21 username: title: Username description: The username to access the FTP server type: string password: title: Password description: The password to access the FTP server type: string format: password x-descriptors: - urn:alm:descriptor:com.tectonic.ui:password directoryName: title: Directory Name description: The starting directory type: string passiveMode: title: Passive Mode description: Sets passive mode connection type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' recursive: title: Recursive description: If a directory, will look for files in all the sub-directories as well. type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' idempotent: title: Idempotency description: Skip already processed files. type: boolean default: true x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
Copy to Clipboard Copied!
另请参阅
1.2.2. 在 Kamelet Binding 中连接源和接收器组件
在 Kamelet Binding 中,您连接源和接收器组件。
此流程中的示例使用以下 Kamelets,如图 1.2 所示:
-
示例源 Kamelet 名为
coffee-source
。这个简单的 Kamelet 从网站目录中检索有关 coffee 类型的随机生成数据。它有一个参数(句点
-整数值
),它决定了检索 coffee 数据的频率(以秒为单位)。不需要该参数,因为有一个默认值(1000 秒)。 -
sink Kamelet 示例名为
log-sink
。它检索数据并将其输出到日志文件。log-sink
Kamelet 在 Kamelet Catalog 中提供。

图 1.2: Kamelet Binding 示例
先决条件
- 您知道如何创建和编辑 Camel K 集成。
- Red Hat Integration - Camel K operator 安装在 OpenShift 命名空间或集群中,您已下载 Red Hat Integration Camel K CLI 工具,如 安装 Camel K 所述。
- 您知道您要添加到 Camel K 集成中的 Kamelets 及其所需实例参数。
要使用的 Kamelets 包括在 Kamelet 目录中。
在本例中,
log-sink
Kamelet 在 Kamelet Catalog 中提供。如果要在本示例中使用 source Kamelet,将coffee-source
代码复制并 保存到名为coffee-source.kamelet.yaml
的本地文件中,然后运行以下命令将其添加到 Kamelet Catalog 中:oc apply -f coffee-source.kamelet.yaml
流程
- 登录到您的 OpenShift 集群。
打开安装 Camel K operator 的工作项目。如果您在 cluster-mode 中安装 Camel K operator,则它可供集群中的任何项目使用。
例如,要打开名为
my-camel-k-project
的现有项目:oc project my-camel-k-project
使用以下选项之一创建一个新的 Kamelet Binding:
-
使用
kamel bind
命令创建并运行 Kamelet Binding (此选项对于命令行定义是 conducive 的简单 Kamelet Bindings 非常有用) 创建一个 YAML 文件来定义 Kamelet Binding,然后使用
oc apply
命令运行它(此选项在 Kamelet Binding 配置更为复杂时很有用)。使用 kamel bind 命令创建一个新的 Kamelet Binding
使用以下
kamel 绑定
语法指定 source 和 sink Kamelets 以及任何配置参数:kamel bind <kamelet-source> -p “<property>=<property-value>” <kamelet-sink> -p “<property>=<property-value>”
kamel bind <kamelet-source> -p “<property>=<property-value>” <kamelet-sink> -p “<property>=<property-value>”
Copy to Clipboard Copied! 例如:
kamel bind coffee-source -p “source.period=5000” log-sink -p "sink.showStreams=true"
kamel bind coffee-source -p “source.period=5000” log-sink -p "sink.showStreams=true"
Copy to Clipboard Copied! Camel K operator 生成一个
KameletBinding
资源,并运行对应的 Camel K 集成。使用 YAML 文件创建一个新的 Kamelet Binding
在您选择的编辑器中,创建一个具有以下结构的 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
Copy to Clipboard Copied! 为 Kamelet Binding 添加一个名称。
在本例中,名称是
coffee-to-log
,因为绑定将coffee-source
Kamelet 连接到log-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: sink:
Copy to Clipboard Copied! 指定源 Kamelet (如
coffee-source
)并为 Kamelet 配置任何参数。注: 在本例中,参数在 Kamelet Binding 的 YAML 文件中定义。另外,您可以在属性文件、ConfigMap 或 Secret 中配置 Kamelet 的参数,如 配置 Kamelet 实例参数 中所述。
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
Copy to Clipboard Copied! 指定 sink Kamelet (如
log-sink
)并为 Kamelet 配置任何参数。使用log-sink
Kamelet 的可选showStreams
参数来显示消息正文。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
Copy to Clipboard Copied! -
保存 YAML 文件(例如,
coffee-to-log.yaml
)。 将
KameletBinding
作为资源添加到 OpenShift 命名空间中:oc apply -f <kamelet-binding>.yaml
例如:
oc apply -f coffee-to-log.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。
-
使用
查看 Kamelet Binding 的状态:
oc get kameletbindings
-
要查看相应集成的状态:
oc get integrations
查看输出:
要从命令行查看日志,请打开终端窗口,然后输入以下命令:
kamel log <integration-name>
例如,如果集成名称为
coffee-to-log
,请使用以下命令:kamel log coffee-to-log
查看 OpenShift Web 控制台的日志:
- 选择 Workloads > Pods。
单击 Camel K integration 的 pod 的名称,然后单击 Logs。
您应该看到类似以下示例的 coffee 事件列表:
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
Copy to Clipboard Copied!
要停止集成,请删除 Kamelet Binding:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/coffee-to-log
后续步骤
(可选):
- 添加 action Kamelets 作为中间步骤,如 向 Kamelet Binding 添加操作 中所述。
- 向 Kamelet Binding 添加错误处理,如 将错误处理程序策略添加到 Kamelet Binding 中所述。
1.2.3. 配置 Kamelet 实例参数
引用 Kamelet 时,您可以选择定义 Kamelet 的实例参数:
直接在指定 Kamelet URI 的 Kamelet Binding 中。在以下示例中,由 Telegram BotFather 提供的 bot 授权令牌是
123456
:from("kamelet:telegram-source?authorizationToken=123456")
全局配置 Kamelet 属性(因此您不必使用以下格式在 URI 中提供值):
"camel.kamelet.<kamelet-name>.<property-name>=<value>”
如使用 Camel K 开发和管理集成 中的 配置 Camel K 集成 章节中所述,您可以通过以下方式配置 Kamelet 参数:
- 将它们定义为属性
- 在属性文件中定义它们
- 在 OpenShift ConfigMap 或 Secret 中定义它们
另请参阅
1.2.4. 连接到事件频道
Kamelets 的最常见用例是使用 Kamelet Binding 来将它们连接到事件频道: Kafka 主题或 Knative 目的地(频道或代理)。这样做的好处在于,数据源和接收器相互独立,并"不感知"。这种分离允许单独开发和管理您的业务场景中的组件。如果您有多个数据接收器和源作为业务场景的一部分,则分离各种组件变得更为重要。例如,如果需要关闭事件 sink,事件源不受影响。另外,如果其他 sink 使用相同的源,则不会受到影响。
图 1.3 演示了将源和接收器 Kamelets 连接到事件频道的流。

图 1.3:将源和接收器 Kamelets 连接到事件频道
如果您使用 Apache Kafka stream-processing 框架,有关如何连接到 Kafka 主题的详情,请参阅 连接到 Kafka with Kamelets。
如果使用 Knative 无服务器框架,了解如何连接到 Knative 目标(频道或代理),请参阅 连接到 Knative with Kamelets。
1.2.5. 连接到显式 Camel URI
您可以创建一个 Kamelet Binding,其中 Kamelet 将事件发送到或从显式 Camel URI 接收事件。通常,您可以将源 Kamelet 绑定到可接收事件的 URI (即,您可以在 Kamelet Binding 中将 URI 指定为 sink)。接收事件的 Camel URI 示例是 HTTP 或 HTTPS 端点。
也可以将 URI 指定为 Kamelet Binding 中的源,但并不常见。发送事件的 Camel URI 示例是计时器、邮件或 FTP 端点。
要将 Kamelet 连接到 Camel URI,请按照 Kamelet Binding 和 sink.uri
字段中的 Connecting source 和 sink 组件 中的步骤进行操作,而不是 Kamelet,指定一个显式 Camel URI。
在以下示例中,sink 的 URI 是一个虚构 URI (https://mycompany.com/event-service):
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-event-service spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: uri: https://mycompany.com/event-service
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: coffee-to-event-service
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: coffee-source
properties:
period: 5000
sink:
uri: https://mycompany.com/event-service
1.3. 将操作应用到连接中的数据
如果要对 Kamelet 和事件频道之间的数据执行操作,请使用 action Kamelets 作为 Kamelet Binding 中的中间步骤。例如,您可以使用 action Kamelet 来序列化或反序列化数据,过滤数据,或者插入字段或消息标头。
操作(如过滤或添加字段)只适用于 JSON 数据(即,当 Content-Type
标头设置为 application/json
时)。如果事件数据使用 JSON 以外的格式(如 Avro 或 Protocol Buffers),则在处理操作前需要一个额外的反序列化步骤来进行格式转换(例如,protobuf-deserialize-action
或 avro-deserialize-action
Kamelet),并在处理操作后需要另外一个额外的序列化操作(例如 protobuf-serialize-action
or avro-serialize-action
Kamelet)。有关在连接中转换数据格式的更多信息,请参阅数据转换 Kamelets。
action Kamelets 包括:
1.3.1. 在 Kamelet Binding 中添加操作
要实现一个操作 Kamelet,在 Kamelet Binding 文件的 spec
部分中,在 source 和 sink 部分之间添加 一个步骤
部分。
先决条件
- 您已创建了 Kamelet Binding,如 Connecting source 和 sink 组件在 Kamelet Binding 中 所述。
您知道您要添加到 Kamelet Binding 以及 action Kamelet 所需的参数中的 action Kamelet。
对于此流程中的示例,
predicate-filter-action
Kamelet 的参数是一个字符串
类型 expression,它提供 JSON 路径表达式,它只过滤 coffee 数据来仅记录具有 "deep" 波动性的数据。请注意,predicate-filter-action
Kamelet 要求您在 Kamelet Binding 中设置 Builder 特征配置属性。这个示例还包括反序列化和序列化操作,它们在此情形中是可选的,因为事件数据格式为 JSON。
流程
在编辑器中打开
KameletBinding
文件。例如,以下是
coffee-to-log.yaml
文件的内容:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
Copy to Clipboard Copied! 在
source
部分添加一个integration
部分,并提供以下 Builder 特征配置属性(根据predicate-filter-action
Kamelet 要求):apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable- types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable- types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
Copy to Clipboard Copied! 在
source
和sink
部分添加一个steps
部分,并定义 action Kamelet。例如:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-deserialize-action - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: predicate-filter-action properties: expression: "@.intensifier =~ /.*deep/" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-serialize-action sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml.jackson.databind.ObjectMapper" source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-deserialize-action - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: predicate-filter-action properties: expression: "@.intensifier =~ /.*deep/" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: json-serialize-action sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink
Copy to Clipboard Copied! - 保存您的更改。
使用
oc apply
命令更新KameletBinding
资源,例如:oc apply -f coffee-to-log.yaml
Camel K operator 会重新生成并运行它根据更新的
KameletBinding
资源生成的 CamelK 集成。查看 Kamelet Binding 的状态:
oc get kameletbindings
查看其对应集成的状态:
oc get integrations
查看集成的日志文件输出:
kamel logs <integration-name>
例如,如果集成名称为
coffee-to-log
:kamel logs coffee-to-log
要停止集成,请删除 Kamelet Binding:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/coffee-to-log
- 对于命名空间 operator,kamelets 与 Camel K operator 一起安装到当前命名空间中。
- 对于全局 Operator 安装,kamelets 被安装到全局 operator 命名空间中(默认为 openshift-operators),并可在任意命名空间中使用 KameletBinding。
- KameletBinding 可以引用在当前或全局 operator 命名空间中安装的 kamelets。
1.3.2. action kamelets
1.3.2.1. 数据收集 Kamelets
您可以过滤在源和接收器组件之间传递的数据,例如防止泄漏敏感数据,或避免产生不必要的网络费用。
您可以根据以下条件过滤数据:
-
Kafka 主题 name - Filter 事件带有一个与给定 Java 正则表达式匹配的名称,方法是配置 Topic Name Matches Filter Action Kamelet (
topic-name-matches-filter-action
)。如需更多信息,请参阅 过滤特定 Kafka 主题的事件数据。 -
通过配置 Header Filter Action Kamelet (
has-header-filter-action
)来标头带有给定消息标头的标头。 -
null value - Filters tombstone 事件(带有 null 有效负载的事件)通过配置 Tombstone Filter Action Kamelet (
is-tombstone-filter-action
)。 predicate- Filter 事件基于给定的 JSON 路径表达式,方法是配置 Predicate Filter Action Kamelet (
predicate-filter-action
)。predicate-filter-action
Kamelet 要求您在 Kamelet Binding 中设置以下 Builder 特征 配置属性:spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml. jackson.databind.ObjectMapper"
spec: integration: traits: builder: configuration: properties: - "quarkus.arc.unremovable-types=com.fasterxml. jackson.databind.ObjectMapper"
Copy to Clipboard Copied!
数据过滤 Kamelets 可以正常工作,带有 JSON 数据(即,当 Content-Type 标头设置为 application/json 时)。如果事件数据使用 JSON 以外的格式,则在处理操作前需要一个额外的反序列化步骤来进行格式转换(例如,protobuf-deserialize-action
或 avro-deserialize-action
),并在处理操作后需要另外一个额外的序列化操作(例如 protobuf-serialize-action
or avro-serialize-action
)。有关在连接中转换数据格式的更多信息,请参阅数据转换 Kamelets。
1.3.2.2. 数据转换 Kamelets
使用以下数据转换 Kamelets,您可以序列化和反序列化在源和接收器组件之间传递的数据格式。数据转换适用于事件数据的有效负载(而不是键或标头)。
avro - 为 Apache Hadoop 提供数据序列化和数据交换服务的开源项目。
-
avro Deserialize Action Kamelet (
avro-deserialize-action
) -
avro Serialize Action Kamelet (
avro-serialize-action
)
-
avro Deserialize Action Kamelet (
协议缓冲 - Google 发明的紧凑二进制连接格式,以便他们可以在内部使用它,以便他们能够与其内部网络服务通信。
-
protobuf Deserialize Action Kamelet (
protobuf-deserialize-action
) -
protobuf Serialize Action Kamelet (
protobuf-serialize-action
)
-
protobuf Deserialize Action Kamelet (
JSON (JavaScript 对象表示法)- 基于 JavaScript 编程语言的子集的数据交互格式。JSON 是完全独立于语言的文本格式。
-
JSON Deserialize Action Kamelet (
json-deserialize-action
) -
JSON Serialize Action Kamelet (
json-serialize-action
)
-
JSON Deserialize Action Kamelet (
您必须在 Avro 和 Protobuf serialize/deserialize Kamelets 中指定 schema (作为单行,使用 JSON 格式)。您不需要对 JSON 序列化/反序列化 Kamelets 这样做。
1.3.2.3. 数据转换 Kamelets
使用以下数据转换 Kamelets,您可以对在源和接收器组件之间传递的数据执行简单的操作:
-
提取 Field - 使用
extract-field-action
Kamelet 从数据正文中拉取字段,并将整个数据正文替换为提取的字段。 -
Hoist Field - 使用
hoist-field-action
Kamelet 将数据正文嵌套到一个字段中。 -
insert Header - 使用
insert-header-action
Kamelet 使用静态数据或记录元数据添加标头字段。 -
插入 Field - 使用
insert-field-action
Kamelet 来通过使用静态数据或记录元数据添加字段值。 Mask Field - 使用
mask-field-action
Kamelet 为字段类型使用一个有效的 null 值(如 0 或空字符串)或一个给定的值(需要是一个非空的字符串或一个数字值)替换字段的值。例如,如果要从关系数据库捕获数据以发送到 Kafka,并且数据包含受保护的(PCI/PII)信息,如果您的 Kafka 集群尚未认证,则必须屏蔽受保护的信息。
-
替换 Field - 使用
replace-field-action
Kamelet 来过滤或重命名字段。您可以指定用于重命名、禁用(排除)或启用(包括)的字段。 -
To Key -(用于 Kafka)使用
value-to-key-action
Kamelet 将记录键替换为有效负载中字段子集的新键。您可以将 event 键设置为基于事件信息的值,然后再将数据写入 Kafka。例如,当从数据库表中读取记录时,您可以根据客户 ID 对 Kafka 中的记录进行分区。
1.4. 处理连接中的错误
要指定在发送或接收事件数据时正在运行的集成遇到故障时应该做什么,您可以选择将以下错误处理策略之一添加到 Kamelet Binding 中:
- 没有错误处理程序 - 忽略集成中发生的所有故障。
- 日志错误处理程序 - 向标准输出发送日志消息。
- 死信频道错误处理程序 - 将失败事件重定向到另一个组件,如第三方 URI、队列或其他 Kamelet,它们可以通过失败事件执行某些逻辑。还支持在将消息发送到死信端点之前尝试恢复消息交换次数。
- bean 错误 handler - specifys,以使用自定义 bean 处理错误。
- ref error handler - specifys 使用 bean 进行处理错误。Bean 在运行时必须在 Camel registry 中提供。
1.4.1. 将错误处理程序策略添加到 Kamelet Binding
要在源和接收器连接间发送或接收事件数据时处理错误,请在 Kamelet Binding 中添加错误处理程序策略。
先决条件
- 您知道您要使用的错误处理程序策略类型。
-
您有一个现有的
KameletBinding
YAML 文件。
流程
在 Kamelet Binding 中实施错误处理:
-
在编辑器中打开
KameletBinding
YAML 文件。 在
sink
定义后,在spec
部分添加 error handler 部分:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: example-kamelet-binding spec: source: ... sink: ... errorHandler: ...
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: example-kamelet-binding spec: source: ... sink: ... errorHandler: ...
Copy to Clipboard Copied! 例如,在
coffee-to-log
Kamelet Binding 中,通过添加日志错误处理器来指定错误发送到日志文件的次数:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink errorHandler: log: parameters: maximumRedeliveries: 3
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink errorHandler: log: parameters: maximumRedeliveries: 3
Copy to Clipboard Copied! - 保存您的文件。
1.4.2. 错误处理程序
1.4.2.1. 没有错误处理程序
如果要忽略集成中发生的任何故障,则无法在 Kamelet Binding 中包含 errorHandler
部分,或者将其设置为 none,如下例所示:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: my-kamelet-binding spec: source: ... sink: ... errorHandler: none:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: my-kamelet-binding
spec:
source:
...
sink:
...
errorHandler:
none:
1.4.2.2. 日志错误处理程序
处理任何失败的默认行为是将日志消息发送到标准输出。另外,您可以使用日志错误处理程序来指定其他行为,如重新传送或延迟策略,如下例所示:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: my-kamelet-binding spec: source: ... sink: ... errorHandler: log: parameters: maximumRedeliveries: 3 redeliveryDelay: 2000
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: my-kamelet-binding
spec:
source:
...
sink:
...
errorHandler:
log:
parameters:
maximumRedeliveries: 3
redeliveryDelay: 2000
1.4.2.3. 死信频道错误处理程序
死信频道允许您将任何失败事件重定向到任何其他组件(如第三方 URI、队列或其他 Kamelet),用于定义如何处理失败事件,如下例所示:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: my-kamelet-binding spec: source: ... sink: ... errorHandler: dead-letter-channel: endpoint: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: error-handler properties: message: "ERROR!" ... parameters: maximumRedeliveries: 1
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: my-kamelet-binding
spec:
source:
...
sink:
...
errorHandler:
dead-letter-channel:
endpoint:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: error-handler
properties:
message: "ERROR!"
...
parameters:
maximumRedeliveries: 1
-
对于 端点,您可以使用
ref
或uri
。根据kind
,apiVersion
和name
的值的 Camel K operator interpretsref
。您可以使用任何 Kamelet、Kafka Topic 频道或 Knative 目的地。 -
属于端点的属性(本例中为一个名为
error-handler
的 Kamelet)。 - 属于 dead-letter-channel 错误处理程序类型 的参数。
1.4.2.4. Bean 错误处理程序
使用 Bean 错误处理程序,您可以通过提供处理错误的自定义 Bean 来扩展 Error Handler 的功能。对于 类型
,指定 ErrorHandlerBuilder
的完全限定名称。对于 属性
,请配置您在 type
中指定的 ErrorHandlerBuilder
所期望的属性。
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: my-kamelet-binding spec: source: ... sink: ... errorHandler: bean: type: "org.apache.camel.builder.DeadLetterChannelBuilder" properties: deadLetterUri: log:error
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: my-kamelet-binding
spec:
source:
...
sink:
...
errorHandler:
bean:
type: "org.apache.camel.builder.DeadLetterChannelBuilder"
properties:
deadLetterUri: log:error
1.4.2.5. ref 错误处理程序
使用 Ref 错误处理程序,您可以使用在运行时在 Camel registry 中期望可用的任何 bean。在以下示例中,my-custom-builder
是要在运行时查看的 bean 的名称。
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: my-kamelet-binding spec: source: ... sink: ... errorHandler: ref: my-custom-builder
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: my-kamelet-binding
spec:
source:
...
sink:
...
errorHandler:
ref: my-custom-builder
另请参阅:
第 2 章 使用 Kamelets 连接到 Kafka
Apache Kafka 是一个开源、分布式、发布-订阅消息传递系统,用于创建容错、实时数据源。Kafka 为大量消费者(外部连接)快速存储和复制数据。
Kafka 可帮助您构建处理流事件的解决方案。分布式、事件驱动的构架需要捕获、沟通和帮助处理事件的"后退"。Kafka 可以充当将数据源和事件连接到应用程序的通信主干。
您可以使用 Kamelets 配置 Kafka 和外部资源之间的通信。kamelets 允许您配置数据如何从一个端点移到 Kafka 流处理框架中的另一个端点,而无需编写代码。kamelets 是通过指定参数值来配置的路由模板。
例如,Kafka 以二进制形式存储数据。您可以使用 Kamelets 序列化和反序列化数据以发送到和接收外部连接。使用 Kamelets,您可以验证 schema 并对数据进行更改,如添加到其中、过滤它或屏蔽它。kamelets 也可以处理和处理错误。
2.1. 使用 Kamelets 连接到 Kafka 概述
如果使用 Apache Kafka 流处理框架,您可以使用 Kamelets 将服务和应用程序连接到 Kafka 主题。Kamelet Catalog 提供以下 Kamelets,专门用于连接到 Kafka 主题:
-
kafka-sink
- 将事件从数据制作者移到 Kafka 主题。在 Kamelet Binding 中,将kafka-sink
Kamelet 指定为接收器。 -
kafka-source
- 将事件从 Kafka 主题移到数据消费者。在 Kamelet Binding 中,将kafka-source
Kamelet 指定为源。
图 2.1 演示了将源和接收器 Kamelets 连接到 Kafka 主题的流。

图 2.1:带有 Kamelets 和 Kafka 主题的数据流
以下是使用 Kamelets 和 Kamelet Bindings 将应用程序和服务连接到 Kafka 主题的基本步骤概述:
设置 Kafka:
安装所需的 OpenShift operator。
- 对于 AMQ 流,安装 Camel K 和 AMQ 流运算符和 Camel K CLI。
- 创建 Kafka 实例。Kafka 实例作为消息代理运行。代理包含主题,并编配存储和传递信息。
- 创建 Kafka 主题。主题提供数据存储的目的地。
- 获取 Kafka 身份验证凭据。
- 确定您要连接到 Kafka 主题的服务或应用程序。
- 查看 Kamelet Catalog,以查找您要添加到集成的源和接收器组件的 Kamelets。另外,确定您要使用的每个 Kamelet 所需的配置参数。
创建 Kamelet Bindings:
-
创建一个 Kamelet Binding,它将数据源(生成数据)连接到 Kafka 主题(使用
kafka-sink
Kamelet)。 -
创建一个 Kamelet Binding,它将 kafka 主题(使用
kafka-source
Kamelet)连接到数据接收器(消耗数据的组件)。
-
创建一个 Kamelet Binding,它将数据源(生成数据)连接到 Kafka 主题(使用
- (可选)通过在 Kamelet Binding 中添加一个或多个 action Kamelets 作为中间步骤,操作 Kafka 主题和数据源或接收器之间的数据。
- (可选)定义如何在 Kamelet Binding 中处理错误。
将 Kamelet Bindings 作为资源应用到项目。
Camel K operator 为每个 Kamelet Binding 生成单独的 Camel K 集成。
2.2. 将数据源连接到 Kamelet Binding 中的 Kafka 主题
要将数据源连接到 Kafka 主题,您可以创建一个 Kamelet Binding,如图 2.2 所示。
图 2.2 将数据源连接到 Kafka 主题
先决条件
您知道要将事件发送到的 Kafka 主题的名称。
此流程中的示例使用
test-topic
接收事件。您知道 Kafka 实例的以下参数值:
- bootstrapServers - 以逗号分隔的 Kafka Broker URL 列表。
- password - Kafka 验证的密码。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。
- user - 向 Kafka 进行身份验证的用户名。对于 AMQ Streams 上的未经身份验证的 kafka 实例,您可以指定任何非空字符串。
-
securityProtocol - 您知道与 Kafka 代理通信的安全协议。对于 AMQ 流上的 Kafka 集群,它是
PLAINTEXT
。
您知道您要添加到 Camel K 集成和所需的实例参数中的 Kamelets。
此流程的 Kamelets 示例是:
coffee-source
Kamelet - 它有一个可选参数(句点
),用于指定发送每个事件的频率。您可以将代码从 Example 源 Kamelet 复制到名为coffee-source.kamelet.yaml
文件的文件,然后运行以下命令将其作为资源添加到命名空间中:oc apply -f coffee-source.kamelet.yaml
-
Kamelet Catalog 中提供的
kafka-sink
Kamelet。您可以使用kafka-sink
Kamelet,因为 Kafka 主题在这个绑定中接收数据(这是数据消费者)。
流程
要将数据源连接到 Kafka 主题,请创建一个 Kamelet Binding:
在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
Copy to Clipboard Copied! 为 Kamelet Binding 添加一个名称。在本例中,名称是
coffees-to-kafka
,因为绑定将coffee-source
Kamelet 连接到kafka-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: sink:
Copy to Clipboard Copied! 对于 Kamelet Binding 的源,指定一个数据源 Kamelet (例如,
coffee-source
Kamelet 生成包含 coffee)的事件,并为 Kamelet 配置任何参数。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
Copy to Clipboard Copied! 对于 Kamelet Binding 的接收器,指定
kafka-sink
Kamelet 及其必要属性。例如,当 Kafka 集群位于 AMQ Streams 中时,将
securityProtocol
属性设置为"PLAINTEXT"
:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT"
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-kafka spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-sink properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT"
Copy to Clipboard Copied! -
保存 YAML 文件(例如,
coffees-to-kafka.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间中:
oc apply -f <kamelet binding filename>
例如:
oc apply -f coffees-to-kafka.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
资源的状态:oc get kameletbindings
查看其集成的状态:
oc get integrations
查看集成日志:
kamel logs <integration> -n <project>
例如:
kamel logs coffees-to-kafka -n my-camel-k-kafka
2.3. 将 Kafka 主题连接到 Kamelet Binding 中的数据接收器
要将 Kafka 主题连接到数据接收器,您可以创建一个 Kamelet Binding,如图 2.3 所示。
图 2.3 将 Kafka 主题连接到数据接收器
先决条件
-
您知道您要从中发送事件的 Kafka 主题的名称。此流程中的示例使用
test-topic
来发送事件。您用来从将数据源 连接到 Kamelet Binding 中的 Kafka 主题中的 coffee 源中接收事件的主题 相同。 您知道 Kafka 实例的以下参数值:
- bootstrapServers - 以逗号分隔的 Kafka Broker URL 列表。
- password - Kafka 验证的密码。
- user - 向 Kafka 进行身份验证的用户名。
-
您知道与 Kafka 代理通信的安全协议。对于 AMQ 流上的 Kafka 集群,它是
PLAINTEXT
。 您知道您要添加到 Camel K 集成和所需的实例参数中的 Kamelets。此流程的示例 Kamelets 在 Kamelet Catalog 中提供:
kafka-source
Kamelet - 使用kafka-source
Kamelet,因为 Kafka 主题在此绑定中发送数据(这是数据制作者)。所需的参数的值示例有:-
bootstrapServers -
"broker.url:9092"
-
password -
"testpassword"
-
user -
"testuser"
-
topic -
"test-topic"
-
securityProtocol - 对于 AMQ 流上的 Kafka 集群,这个参数值为
"PLAINTEXT
"。
-
bootstrapServers -
-
log-sink
Kamelet - 使用log-sink
来记录它从kafka-source
Kamelet 接收的数据。(可选)指定showStreams
参数,以显示数据的消息正文。log-sink
Kamelet 可用于调试目的。
流程
要将 Kafka 主题连接到数据接收器,请创建一个 Kamelet Binding:
在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
Copy to Clipboard Copied! 为 Kamelet Binding 添加一个名称。在本例中,名称是
kafka-to-log
,因为绑定将kafka-source
Kamelet 连接到log-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: sink:
Copy to Clipboard Copied! -
对于 Kamelet Binding 的源,指定
kafka-source
Kamelet 并配置其参数。
例如,当 Kafka 集群位于 AMQ Streams 中时,必须将 securityProtocol
参数设置为 "PLAINTEXT"
:
+
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" sink:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: kafka-to-log
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: kafka-source
properties:
bootstrapServers: "broker.url:9092"
password: "testpassword"
topic: "test-topic"
user: "testuser"
securityProtocol: "PLAINTEXT"
sink:
对于 Kamelet Binding 的接收器,指定数据消费者 Kamelet (例如,
log-sink
Kamelet)并为 Kamelet 配置任何参数,例如:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
Copy to Clipboard Copied! -
保存 YAML 文件(如
kafka-to-log.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间中:
oc apply -f <kamelet binding filename>
例如:
oc apply -f kafka-to-log.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
资源的状态:oc get kameletbindings
查看其集成的状态:
oc get integrations
查看集成日志:
kamel logs <integration> -n <project>
例如:
kamel logs kafka-to-log -n my-camel-k-kafka
在输出中,您应该看到 coffee 事件,例如:
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
Copy to Clipboard Copied! 要停止正在运行的集成,请删除关联的 Kamelet Binding 资源:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/kafka-to-log
2.4. 将操作应用到 Kafka 连接中的数据
如果要对 Kamelet 和 Kafka 主题之间的数据执行操作,请使用 action Kamelets 作为 Kamelet Binding 中的中间步骤。
2.4.1. 将事件数据路由到不同的目标主题
当您配置到 Kafka 实例时,您可以选择从事件数据转换主题信息,以便事件路由到不同的 Kafka 主题。使用以下转换操作 Kamelets 之一:
-
正则表达式路由器 - 使用正则表达式和替换字符串修改消息的主题。例如,如果要删除主题前缀、添加前缀或删除主题名称的一部分。配置 Regex Router Action Kamelet (
regex-router-action
)。 -
timestamp - 根据原始主题和消息的时间戳修改消息的主题。例如,在使用需要写入不同表或基于时间戳的索引的接收器时。例如,当您要将事件从 Kafka 写入 Elasticsearch 时,但每个事件都需要根据事件本身中的信息进入不同的索引。配置 Timestamp Router Action Kamelet (
timestamp-router-action
)。 -
Message TimeStamp - 修改消息的主题,它基于原始主题值和来自消息值字段的 timestamp 字段。配置 Message Timestamp Router Action Kamelet (
message-timestamp-router-action
)。 -
predicate- Filter 事件基于给定的 JSON 路径表达式,方法是配置 Predicate Filter Action Kamelet (
predicate-filter-action
)。
先决条件
-
您已创建了一个 Kamelet Binding,它是一个
kafka-sink
Kamelet,如将数据源 连接到 Kamelet Binding 中的 Kafka 主题 中所述。 - 您知道您要添加到 Kamelet Binding 中的转换类型。
流程
要转换目的地主题,请使用 transformation action Kamelets 作为 Kamelet Binding 中的中间步骤。
有关如何将 action Kamelet 添加到 Kamelet Binding 的详细信息,请参阅 Adding a operation to a Kamelet Binding。
2.4.2. 过滤特定 Kafka 主题的事件数据
如果您使用 source Kamelet 将记录生成到许多不同的 Kafka 主题,并且您要将记录过滤成一个 Kafka 主题,请将 topic-name-matches-filter-action Kamelet 作为 Kamelet Binding 中的中间步骤添加 topic-name-matches-filter-action
Kamelet。
先决条件
- 您已在 YAML 文件中创建了一个 Kamelet Binding。
- 您知道要从中过滤出事件数据的 Kafka 主题的名称。
流程
编辑 Kamelet Binding,使其包含
topic-name-matches-filter-action
Kamelet 作为源和接收器 Kamelets 之间的中间步骤。通常,您可以使用
kafka-source
Kamelet 作为源 Kamelet,并提供一个主题作为所需主题参数的值
。在以下 Kamelet Binding 示例中,
kafka-source
Kamelet 指定test-topic、test-topic-2 和 test-topic-3
Kafka 主题和topic-name-matches-filter-action
Kamelet 指定从topic-test
主题过滤事件数据:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic, test-topic-2, test-topic-3" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: kafka-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: kafka-source properties: bootstrapServers: "broker.url:9092" password: "testpassword" topic: "test-topic, test-topic-2, test-topic-3" user: "testuser" securityProtocol: "PLAINTEXT" // only for AMQ streams steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
Copy to Clipboard Copied! 如果要过滤
kafka-source
Kamelet 以外的源 Kamelet 主题,您必须提供 Kafka 主题信息。您可以使用insert-header-action
Kamelet 将 Kafka topic 字段添加为中间步骤,在 Kamelet Binding 中的topic-name-matches-filter-action
步骤前,如下例所示:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: insert-header-action properties: name: "KAFKA.topic" value: "test-topic" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log-by-topic spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 steps: - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: insert-header-action properties: name: "KAFKA.topic" value: "test-topic" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: topic-name-matches-filter-action properties: regex: "test-topic" sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
Copy to Clipboard Copied! - 保存 Kamelet Binding YAML 文件。
第 3 章 使用 Kamelet 连接到 Knative
您可以将 Kamelets 连接到 Knative 目的地(频道或代理)。Red Hat OpenShift Serverless 基于开源 Knative 项目,它通过启用企业级无服务器平台在混合和多云环境之间提供可移植性和一致性。OpenShift Serverless 包括对 Knative Eventing 和 Knative Serving 组件的支持。
Red Hat OpenShift Serverless、Knative Eventing 和 Knative Serving 可让您使用带有无服务器应用程序的事件驱动的架构,使用发布订阅或事件流模型分离事件制作者和消费者之间的关系。Knative Eventing 使用标准 HTTP POST 请求来发送和接收事件创建者和用户之间的事件。这些事件符合 CloudEvents 规范,它允许在任何编程语言中创建、解析、发送和接收事件。
您可以使用 Kamelets 将 CloudEvents 发送到 Knative,并将其从 Knative 发送到事件消费者。kamelets 可将消息转换为 CloudEvents,您可以使用它们应用 CloudEvents 中任何预处理和后处理数据。
3.1. 使用 Kamelets 连接到 Knative 概述
如果使用 Knative 流处理框架,您可以使用 Kamelets 将服务和应用程序连接到 Knative 目标(频道或代理)。
图 3.1 演示了将源和接收器 Kamelets 连接到 Knative 目的地的流。

图 3.1:带有 Kamelets 和 Knative 频道的数据流
以下是使用 Kamelets 和 Kamelet Bindings 将应用程序和服务连接到 Knative 目的地的基本步骤概述:
设置 Knative:
- 通过安装 Camel K 和 OpenShift Serverless operator 来准备 OpenShift 集群。
- 安装所需的 Knative Serving 和 Eventing 组件。
- 创建 Knative 频道或代理。
- 决定您要连接到 Knative 频道或代理的服务或应用程序。
- 查看 Kamelet Catalog,以查找您要添加到集成的源和接收器组件的 Kamelets。另外,确定您要使用的每个 Kamelet 所需的配置参数。
创建 Kamelet Bindings:
- 创建一个 Kamelet Binding,将源 Kamelet 连接到 Knative 频道(或代理)。
- 创建一个 Kamelet Binding,将 Knative 频道(或代理)连接到接收器 Kamelet。
- 另外,还可通过将一个或多个 action Kamelets 作为 Kamelet Binding 中的中间步骤,操作在 Knative 频道(或代理)和数据源或接收器间传递的数据。
- (可选)定义如何在 Kamelet Binding 中处理错误。
- 将 Kamelet Bindings 作为资源应用到项目。
Camel K operator 为每个 Kamelet Binding 生成单独的 Camel 集成。
当您将 Kamelet Binding 配置为使用 Knative 频道或代理作为事件源时,Camel K operator 会将对应的集成化为 Knative Serving 服务,以利用 Knative 提供的自动扩展功能。
3.2. 设置 Knative
设置 Knative 涉及安装所需的 OpenShift operator 并创建 Knative 频道。
3.2.1. 准备 OpenShift 集群
要使用 Kamelets 和 OpenShift Serverless,请安装以下 Operator、组件和 CLI 工具:
Red Hat Integration - Camel K operator 和 CLI 工具 - 操作器安装和管理 Camel K - 一个轻量级集成框架,可在 OpenShift 上的云中原生运行。
kamel
CLI 工具允许您访问所有 Camel K 功能。请参阅 安装 Camel K 中的安装说明。
-
OpenShift Serverless operator - 提供一系列 API,它允许容器、微服务和功能运行 "serverless"。无服务器应用程序可按需扩展和缩减(从零),并由多个事件源触发。安装 OpenShift Serverless Operator 时,它会自动创建
knative-serving
命名空间(用于安装 Knative Serving 组件)和knative-eventing
命名空间(安装 Knative Eventing 组件是必需的)。 - Knative Eventing 组件
- Knative Serving 组件
-
Knative CLI 工具(
kn
)- 允许您从命令行或 Shell 脚本中创建 Knative 资源。
3.2.2. 创建 Knative 频道
Knative 频道是转发事件的自定义资源。事件源或生成程序将事件发送到频道后,可使用订阅将这些事件发送到多个 Knative 服务或其他 sink。
本例使用 InMemoryChannel
频道,用于 OpenShift Serverless 进行开发目的。请注意,InMemoryChannel
类型频道有以下限制:
- 事件没有持久性。如果 Pod 停机,则 Pod 上的事件将会丢失。
-
InMemoryChannel
频道没有实现事件排序,因此同时接收到的两个事件可能会以任何顺序传送给订阅者。 - 如果订阅者拒绝某个事件,则不会默认重新发送尝试。您可以通过修改 Subscription 对象中的 delivery 规格来配置重新发送尝试。
先决条件
- OpenShift Serverless operator、Knative Eventing 和 Knative Serving 组件已安装在 OpenShift Container Platform 集群中。
-
已安装 OpenShift Serverless CLI (
kn
)。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
- 登录您的 OpenShift 集群。
打开您要在其中创建集成应用程序的项目。例如:
oc project camel-k-knative
使用 Knative (
kn
) CLI 命令创建频道kn channel create <channel_name> --type <channel_type>
例如,要创建一个名为
mychannel
的频道:kn channel create mychannel --type messaging.knative.dev:v1:InMemoryChannel
要确认频道现在存在,请输入以下命令列出所有现有频道:
kn 频道列表
您应该在列表中看到您的频道。
3.2.3. 创建 Knative 代理
Knative 代理是一个自定义资源,它定义了一个事件网格来收集 CloudEvents 池。OpenShift Serverless 提供了一个 default Knative 代理,您可以使用 kn
CLI 创建该代理。
您可以在 Kamelet Binding 中使用代理,例如,当应用程序处理多个事件类型时,您不想为每个事件类型创建频道。
先决条件
- OpenShift Serverless operator、Knative Eventing 和 Knative Serving 组件已安装在 OpenShift Container Platform 集群中。
-
已安装 OpenShift Serverless CLI (
kn
)。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
- 登录您的 OpenShift 集群。
打开您要在其中创建集成应用程序的项目。例如:
oc project camel-k-knative
使用此 Knative (
kn
) CLI 命令创建代理:kn 代理创建 default
要确认代理现在存在,请输入以下命令列出所有现有代理:
kn 代理列表
您应该在列表中看到 default 代理。
3.3. 在 Kamelet Binding 中将数据源连接到 Knative 目的地
要将数据源连接到 Knative 目的地(频道或代理),您可以创建一个 Kamelet Binding,如图 3.2 所示。
图 3.2 将数据源连接到 Knative 目的地
Knative 目的地可以是 Knative 频道或 Knative 代理。
当您向频道发送数据时,该频道只有一个事件类型。您不需要在 Kamelet Binding 中为频道指定任何属性值。
当您将数据发送到代理时,因为代理可以处理多个事件类型,所以您必须在 Kamelet Binding 中引用代理时为 type 属性指定一个值。
先决条件
您知道要发送事件的 Knative 频道或代理的名称和类型。
此流程中的示例使用名为
mychannel
的InMemoryChannel
频道或名为default
的代理。对于代理示例,type
属性值对于coffee
事件是 coffee。您知道您要添加到 Camel 集成和所需的实例参数中的 Kamelet。
此流程的示例 Kamelet 是
coffee-source
Kamelet。它有一个可选参数period
,用于指定发送每个事件的频率。您可以将代码从 Example 源 Kamelet 复制到名为coffee-source.kamelet.yaml
文件的文件,然后运行以下命令将其作为资源添加到命名空间中:oc apply -f coffee-source.kamelet.yaml
流程
要将数据源连接到 Knative 目的地,请创建一个 Kamelet Binding:
在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
Copy to Clipboard Copied! 为 Kamelet Binding 添加一个名称。在本例中,名称是
coffees-to-knative
,因为绑定将coffee-source
Kamelet 连接到 Knative 目的地。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: sink:
Copy to Clipboard Copied! 对于 Kamelet Binding 的源,指定一个数据源 Kamelet (例如,
coffee-source
Kamelet 生成包含 coffee)的事件,并为 Kamelet 配置任何参数。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
Copy to Clipboard Copied! 对于 Kamelet Binding 的 sink,请指定 Knative 频道或代理以及所需参数。
本例将 Knative 频道指定为接收器:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel name: mychannel
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel name: mychannel
Copy to Clipboard Copied! 本例将 Knative 代理指定为接收器:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Broker apiVersion: eventing.knative.dev/v1 name: default properties: type: coffee
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffees-to-knative spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Broker apiVersion: eventing.knative.dev/v1 name: default properties: type: coffee
Copy to Clipboard Copied! -
保存 YAML 文件(例如,
coffees-to-knative.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间中:
oc apply -f <kamelet binding filename>
例如:
oc apply -f coffees-to-knative.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
的状态:oc get kameletbindings
查看其集成的状态:
oc get integrations
查看集成日志:
kamel logs <integration> -n <project>
例如:
kamel logs coffees-to-knative -n my-camel-knative
另请参阅
3.4. 在 Kamelet Binding 中将 Knative 目的地连接到数据接收器
要将 Knative 目的地连接到数据接收器,您可以创建一个 Kamelet Binding,如图 3.3 所示。
图 3.3 将 Knative 目的地连接到数据接收器
Knative 目的地可以是 Knative 频道或 Knative 代理。
当您从频道发送数据时,该频道只有一个事件类型。您不需要在 Kamelet Binding 中为频道指定任何属性值。
当您从代理发送数据时,因为代理可以处理多个事件类型,所以您必须在 Kamelet Binding 中引用代理时为 type 属性指定一个值。
先决条件
您知道 Knative 频道的名称和类型,或者您要接收事件的代理名称。对于代理,您也知道要接收的事件类型。
此流程中的示例使用名为 mychannel 或名为 mybroker 和 coffee 事件(用于 type 属性)的 InMemoryChannel 频道。这些是相同的示例目的地,它们用于从将数据源 连接到 Kamelet Binding 中的 Knative 频道中的 coffee 源接收事件。
您知道您要添加到 Camel 集成和所需的实例参数中的 Kamelet。
此流程的 Kamelet 示例是
log-sink
Kamelet,它在 Kamelet Catalog 中提供,可用于测试和调试。指定显示数据的消息正文的showStreams
参数。
流程
要将 Knative 频道连接到数据接收器,请创建一个 Kamelet Binding:
在您选择的编辑器中,使用以下基本结构创建一个 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
Copy to Clipboard Copied! 为 Kamelet Binding 添加一个名称。在本例中,名称是
knative-to-log
,因为绑定将 Knative 目的地连接到log-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: sink:
Copy to Clipboard Copied! 对于 Kamelet Binding 的源,指定 Knative 频道或代理以及所需参数。
本例将 Knative 频道指定为源:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: ref: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel name: mychannel sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: ref: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel name: mychannel sink:
Copy to Clipboard Copied! 本例将 Knative 代理指定为源:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: ref: kind: Broker apiVersion: eventing.knative.dev/v1 name: default properties: type: coffee sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: ref: kind: Broker apiVersion: eventing.knative.dev/v1 name: default properties: type: coffee sink:
Copy to Clipboard Copied! 对于 Kamelet Binding 的接收器,指定数据消费者 Kamelet (例如,
log-sink
Kamelet)并为 Kamelet 配置任何参数,例如:apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: ref: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel name: mychannel sink: ref: apiVersion: camel.apache.org/v1alpha1 kind: Kamelet name: log-sink properties: showStreams: true
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: knative-to-log spec: source: ref: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel name: mychannel sink: ref: apiVersion: camel.apache.org/v1alpha1 kind: Kamelet name: log-sink properties: showStreams: true
Copy to Clipboard Copied! -
保存 YAML 文件(如
knative-to-log.yaml
)。 - 登录您的 OpenShift 项目。
将 Kamelet Binding 作为资源添加到 OpenShift 命名空间中:
oc apply -f <kamelet binding filename>
例如:
oc apply -f knative-to-log.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。构建可能需要几分钟时间。查看
KameletBinding
的状态:oc get kameletbindings
查看集成的状态:
oc get integrations
查看集成日志:
kamel logs <integration> -n <project>
例如:
kamel logs knative-to-log -n my-camel-knative
在输出中,您应该看到 coffee 事件,例如:
[1] INFO [sink] (vert.x-worker-thread-1) {"id":254,"uid":"8e180ef7-8924-4fc7-ab81-d6058618cc42","blend_name":"Good-morning Star","origin":"Santander, Colombia","variety":"Kaffa","notes":"delicate, creamy, lemongrass, granola, soil","intensifier":"sharp"} [1] INFO [sink] (vert.x-worker-thread-2) {"id":8169,"uid":"3733c3a5-4ad9-43a3-9acc-d4cd43de6f3d","blend_name":"Caf? Java","origin":"Nayarit, Mexico","variety":"Red Bourbon","notes":"unbalanced, full, granola, bittersweet chocolate, nougat","intensifier":"delicate"}
[1] INFO [sink] (vert.x-worker-thread-1) {"id":254,"uid":"8e180ef7-8924-4fc7-ab81-d6058618cc42","blend_name":"Good-morning Star","origin":"Santander, Colombia","variety":"Kaffa","notes":"delicate, creamy, lemongrass, granola, soil","intensifier":"sharp"} [1] INFO [sink] (vert.x-worker-thread-2) {"id":8169,"uid":"3733c3a5-4ad9-43a3-9acc-d4cd43de6f3d","blend_name":"Caf? Java","origin":"Nayarit, Mexico","variety":"Red Bourbon","notes":"unbalanced, full, granola, bittersweet chocolate, nougat","intensifier":"delicate"}
Copy to Clipboard Copied! 要停止正在运行的集成,请删除关联的 Kamelet Binding 资源:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/knative-to-log
另请参阅
第 4 章 创建 Kamelets
4.1. kamelet 结构
Kamelet 通常以 YAML 域特定语言编写。文件名前缀是 Kamelet 的名称。例如,名为 FTP sink 的 Kamelet 具有文件名 ftp-sink.kamelet.yaml
。
请注意,在 OpenShift 中,一个 Kamelet 是一个资源,显示 Kamelet 的名称(而不是文件名)。
在高级别上,一个 Kamelet 资源描述:
-
包含 Kamelet 和其他信息的 metadata 部分,如 Kamelet 类型(
源
、sink
或action
)。 - 包含可用于配置 Kamelet 的一组参数的定义(JSON-schema 规格)。
-
可选
type
部分,其中包含 Kamelet 预期的输入和输出信息。 - YAML DSL 中的 Camel 模板,用于定义 Kamelet 的实施。
下图显示了 Kamelet 及其部分的示例。
Kamelet 结构示例
telegram-text-source.kamelet.yaml apiVersion: camel.apache.org/v1alpha1 kind: Kamelet metadata: name: telegram-source annotations: camel.apache.org/catalog.version: "master-SNAPSHOT" camel.apache.org/kamelet.icon: "data:image/..." camel.apache.org/provider: "Red Hat" camel.apache.org/kamelet.group: "Telegram" labels: camel.apache.org/kamelet.type: "source" spec: definition: title: "Telegram Source" description: |- Receive all messages that people send to your telegram bot. To create a bot, contact the @botfather account using the Telegram app. The source attaches the following headers to the messages: - chat-id / ce-chatid: the ID of the chat where the message comes from required: - authorizationToken type: object properties: authorizationToken: title: Token description: The token to access your bot on Telegram, that you can obtain from the Telegram "Bot Father". type: string format: password x-descriptors: - urn:alm:descriptor:com.tectonic.ui:password types: out: mediaType: application/json dependencies: - "camel:jackson" - "camel:kamelet" - "camel:telegram" template: from: uri: telegram:bots parameters: authorizationToken: "{{authorizationToken}}" steps: - set-header: name: chat-id simple: "${header[CamelTelegramChatId]}" - set-header: name: ce-chatid simple: "${header[CamelTelegramChatId]}" - marshal: json: {} - to: "kamelet:sink"
telegram-text-source.kamelet.yaml
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
name: telegram-source
annotations:
camel.apache.org/catalog.version: "master-SNAPSHOT"
camel.apache.org/kamelet.icon: "data:image/..."
camel.apache.org/provider: "Red Hat"
camel.apache.org/kamelet.group: "Telegram"
labels:
camel.apache.org/kamelet.type: "source"
spec:
definition:
title: "Telegram Source"
description: |-
Receive all messages that people send to your telegram bot.
To create a bot, contact the @botfather account using the
Telegram app.
The source attaches the following headers to the messages:
- chat-id / ce-chatid: the ID of the chat where the
message comes from
required:
- authorizationToken
type: object
properties:
authorizationToken:
title: Token
description: The token to access your bot on Telegram, that you
can obtain from the Telegram "Bot Father".
type: string
format: password
x-descriptors:
- urn:alm:descriptor:com.tectonic.ui:password
types:
out:
mediaType: application/json
dependencies:
- "camel:jackson"
- "camel:kamelet"
- "camel:telegram"
template:
from:
uri: telegram:bots
parameters:
authorizationToken: "{{authorizationToken}}"
steps:
- set-header:
name: chat-id
simple: "${header[CamelTelegramChatId]}"
- set-header:
name: ce-chatid
simple: "${header[CamelTelegramChatId]}"
- marshal:
json: {}
- to: "kamelet:sink"
- Kamelet ID - 当您要引用 Kamelet 时,在 Camel K 集成中使用此 ID。
- 注解(如 icon)为 Kamelet 提供显示功能。
- 标签允许用户查询 Kamelets (例如: kind: "source", "sink", 或 "action")
- JSON-schema 规格格式的 Kamelet 和参数的描述。
- 输出的介质类型(可以包括 schema)。
- 定义 Kamelet 行为的路由模板。
4.2. 源 Kamelet 示例
以下是 coffee-source
Kamelet 示例的内容:
apiVersion: camel.apache.org/v1alpha1 kind: Kamelet metadata: name: coffee-source labels: camel.apache.org/kamelet.type: "source" spec: definition: title: "Coffee Source" description: "Retrieve a random coffee from a catalog of coffees" properties: period: title: Period description: The interval between two events in seconds type: integer default: 1000 types: out: mediaType: application/json template: from: uri: timer:tick parameters: period: "{{period}}" steps: - to: "https://random-data-api.com/api/coffee/random_coffee" - to: "kamelet:sink"
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
name: coffee-source
labels:
camel.apache.org/kamelet.type: "source"
spec:
definition:
title: "Coffee Source"
description: "Retrieve a random coffee from a catalog of coffees"
properties:
period:
title: Period
description: The interval between two events in seconds
type: integer
default: 1000
types:
out:
mediaType: application/json
template:
from:
uri: timer:tick
parameters:
period: "{{period}}"
steps:
- to: "https://random-data-api.com/api/coffee/random_coffee"
- to: "kamelet:sink"