1.2. 连接源和接收器
当您要连接两个或多个组件(外部应用程序或服务)时,请使用 Kamelets。每个 Kamelet 基本上是一个带有配置属性的路由模板。您需要知道您要从(一个源)获取数据的组件以及您要将数据发送到哪个组件(接收器)。您可以通过在 Kamelet Binding 中添加 Kamelets 来连接源和接收器组件,如图 1.1 所示。

图 1.1: Kamelet Binding source to sink
以下是在 Kamelet Binding 中使用 Kamelets 的步骤概述:
- 安装 Camel K operator。它包括一个 Kamelets 目录,作为 OpenShift 项目中的资源。
- 创建一个 Kamelet Binding。决定要在 Kamelet Binding 中连接的服务或应用程序。
- 查看 Kamelet Catalog,以查找您要使用的源和接收器组件的 Kamelets。
- 对于您要包含在 Kamelet Binding 中的每个 Kamelet,请确定您需要设置的配置属性。
- 在 Kamelet Binding 代码中,添加对每个 Kamelet 的引用并配置所需属性。
- 将 Kamelet Binding 作为资源应用到 OpenShift 项目中。
Camel K operator 使用 Kamelet Binding 来生成和运行集成。
1.2.1. 查看 Kamelet 目录
安装 Camel K 操作器时,它包含一个可在 Camel K 集成中使用的 Kamelets 目录。
前提条件
您在工作命名空间或集群范围内安装了 Camel K operator,如 安装 Camel K 所述。
流程
查看安装 Camel K operator 的 Kamelets 列表:
- 在终端窗口中,登录到您的 OpenShift 集群。
查看可用的 Kamelets 列表取决于如何安装 Camel K operator (在特定命名空间或 cluster-mode 中):
如果 Camel K Operator 在 cluster-mode 中安装,请使用这个命令查看可用的 Kamelets:
oc get kamelet -n openshift-operators
如果 Camel K Operator 安装在特定命名空间中:
打开安装 Camel K operator 的项目。
oc project <camelk-project>
例如,如果 Camel K 操作器安装在
my-camel-k-project
项目中:oc project my-camel-k-project
运行以下命令:
oc get kamelets
有关红帽支持的 Kamelets 列表,请参阅 Red Hat Integration 发行注记 。
1.2.1.1. 在 Kamelet Catalog 中添加自定义 Kamelet
如果您没有在符合您的要求的目录中看到 Kamelet,则 Camel DSL 开发人员可创建自定义 Kamelet,如 Apache Camel Kamelets Developers 指南 (社区文档)中所述。Kamelet 以 YAML
格式编码,根据惯例,它有一个 .kamelet.yaml
文件扩展名。
先决条件
- Camel DSL 开发人员为您提供了一个自定义 Kamelet 文件。
- Kamelet 名称对于安装 Camel K operator 的 OpenShift 命名空间必须是唯一的。
流程
使自定义 Kamelet 作为 OpenShift 命名空间中的资源提供:
-
将 Kamelet
YAML
文件(如custom-sink.kamelet.yaml
)下载到本地文件夹。 - 登录到您的 OpenShift 集群。
在终端窗口中,打开安装 Camel K operator 的项目,如
my-camel-k-project
:oc project my-camel-k-project
运行
oc apply
命令将自定义 Kamelet 作为资源添加到命名空间中:oc apply -f <custom-kamelet-filename>
例如,使用以下命令添加位于当前目录中的
custom-sink.kamelet.yaml
文件:oc apply -f custom-sink.kamelet.yaml
要验证 Kamelet 作为一个资源可用,请使用以下命令查看当前命名空间中的所有 Kamelets 的字母顺序列表,然后查找您的自定义 Kamelet:
oc get kamelets
1.2.1.2. 确定 Kamelet 的配置参数
在 Kamelet Binding 中,当您添加对 Kamelet 的引用时,您可以指定 Kamelet 的名称并配置 Kamelet 的参数。
前提条件
- 您已在工作命名空间或集群范围内安装了 Camel K operator。
流程
要确定 Kamelet 的名称和参数:
- 在终端窗口中,登录到您的 OpenShift 集群。
打开 Kamelet 的 YAML 文件:
oc describe kamelets/<kamelet-name>
例如,要查看
ftp-source
Kamelet 的代码,如果当前命名空间中安装了 Camel K operator,请使用以下命令:oc describe kamelets/ftp-source
如果 Camel K Operator 在 cluster-mode 中安装,使用以下命令:
oc describe -n openshift-operators kamelets/ftp-source
在 YAML 文件中,向下滚动到
spec.definition
部分(使用 JSON-schema 格式编写),以查看 Kamelet 的属性列表。在本节的末尾,必填字段列出了在引用 Kamelet 时必须配置的属性。例如,以下代码是
ftp-source
Kamelet 的spec.definition
部分的摘录。本节详细介绍了所有 Kamelet 的配置属性。此 Kamelet 的必要属性是connectionHost
,connectionPort
,username
,password
, 和directoryName
:spec: definition: title: "FTP Source" description: |- Receive data from an FTP Server. required: - connectionHost - connectionPort - username - password - directoryName type: object properties: connectionHost: title: Connection Host description: Hostname of the FTP server type: string connectionPort: title: Connection Port description: Port of the FTP server type: string default: 21 username: title: Username description: The username to access the FTP server type: string password: title: Password description: The password to access the FTP server type: string format: password x-descriptors: - urn:alm:descriptor:com.tectonic.ui:password directoryName: title: Directory Name description: The starting directory type: string passiveMode: title: Passive Mode description: Sets passive mode connection type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' recursive: title: Recursive description: If a directory, will look for files in all the sub-directories as well. type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' idempotent: title: Idempotency description: Skip already processed files. type: boolean default: true x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
spec: definition: title: "FTP Source" description: |- Receive data from an FTP Server. required: - connectionHost - connectionPort - username - password - directoryName type: object properties: connectionHost: title: Connection Host description: Hostname of the FTP server type: string connectionPort: title: Connection Port description: Port of the FTP server type: string default: 21 username: title: Username description: The username to access the FTP server type: string password: title: Password description: The password to access the FTP server type: string format: password x-descriptors: - urn:alm:descriptor:com.tectonic.ui:password directoryName: title: Directory Name description: The starting directory type: string passiveMode: title: Passive Mode description: Sets passive mode connection type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' recursive: title: Recursive description: If a directory, will look for files in all the sub-directories as well. type: boolean default: false x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' idempotent: title: Idempotency description: Skip already processed files. type: boolean default: true x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
Copy to Clipboard Copied!
另请参阅
1.2.2. 在 Kamelet Binding 中连接源和接收器组件
在 Kamelet Binding 中,您连接源和接收器组件。
此流程中的示例使用以下 Kamelets,如图 1.2 所示:
-
示例源 Kamelet 名为
coffee-source
。这个简单的 Kamelet 从网站目录中检索有关 coffee 类型的随机生成数据。它有一个参数(句点
-整数值
),它决定了检索 coffee 数据的频率(以秒为单位)。不需要该参数,因为有一个默认值(1000 秒)。 -
sink Kamelet 示例名为
log-sink
。它检索数据并将其输出到日志文件。log-sink
Kamelet 在 Kamelet Catalog 中提供。

图 1.2: Kamelet Binding 示例
先决条件
- 您知道如何创建和编辑 Camel K 集成。
- Red Hat Integration - Camel K operator 安装在 OpenShift 命名空间或集群中,您已下载 Red Hat Integration Camel K CLI 工具,如 安装 Camel K 所述。
- 您知道您要添加到 Camel K 集成中的 Kamelets 及其所需实例参数。
要使用的 Kamelets 包括在 Kamelet 目录中。
在本例中,
log-sink
Kamelet 在 Kamelet Catalog 中提供。如果要在本示例中使用 source Kamelet,将coffee-source
代码复制并 保存到名为coffee-source.kamelet.yaml
的本地文件中,然后运行以下命令将其添加到 Kamelet Catalog 中:oc apply -f coffee-source.kamelet.yaml
流程
- 登录到您的 OpenShift 集群。
打开安装 Camel K operator 的工作项目。如果您在 cluster-mode 中安装 Camel K operator,则它可供集群中的任何项目使用。
例如,要打开名为
my-camel-k-project
的现有项目:oc project my-camel-k-project
使用以下选项之一创建一个新的 Kamelet Binding:
-
使用
kamel bind
命令创建并运行 Kamelet Binding (此选项对于命令行定义是 conducive 的简单 Kamelet Bindings 非常有用) 创建一个 YAML 文件来定义 Kamelet Binding,然后使用
oc apply
命令运行它(此选项在 Kamelet Binding 配置更为复杂时很有用)。使用 kamel bind 命令创建一个新的 Kamelet Binding
使用以下
kamel 绑定
语法指定 source 和 sink Kamelets 以及任何配置参数:kamel bind <kamelet-source> -p “<property>=<property-value>” <kamelet-sink> -p “<property>=<property-value>”
kamel bind <kamelet-source> -p “<property>=<property-value>” <kamelet-sink> -p “<property>=<property-value>”
Copy to Clipboard Copied! 例如:
kamel bind coffee-source -p “source.period=5000” log-sink -p "sink.showStreams=true"
kamel bind coffee-source -p “source.period=5000” log-sink -p "sink.showStreams=true"
Copy to Clipboard Copied! Camel K operator 生成一个
KameletBinding
资源,并运行对应的 Camel K 集成。使用 YAML 文件创建一个新的 Kamelet Binding
在您选择的编辑器中,创建一个具有以下结构的 YAML 文件:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: spec: source: sink:
Copy to Clipboard Copied! 为 Kamelet Binding 添加一个名称。
在本例中,名称是
coffee-to-log
,因为绑定将coffee-source
Kamelet 连接到log-sink
Kamelet。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: sink:
Copy to Clipboard Copied! 指定源 Kamelet (如
coffee-source
)并为 Kamelet 配置任何参数。注: 在本例中,参数在 Kamelet Binding 的 YAML 文件中定义。另外,您可以在属性文件、ConfigMap 或 Secret 中配置 Kamelet 的参数,如 配置 Kamelet 实例参数 中所述。
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink:
Copy to Clipboard Copied! 指定 sink Kamelet (如
log-sink
)并为 Kamelet 配置任何参数。使用log-sink
Kamelet 的可选showStreams
参数来显示消息正文。apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-log spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: log-sink properties: showStreams: true
Copy to Clipboard Copied! -
保存 YAML 文件(例如,
coffee-to-log.yaml
)。 将
KameletBinding
作为资源添加到 OpenShift 命名空间中:oc apply -f <kamelet-binding>.yaml
例如:
oc apply -f coffee-to-log.yaml
Camel K operator 使用
KameletBinding
资源生成并运行 Camel K 集成。
-
使用
查看 Kamelet Binding 的状态:
oc get kameletbindings
-
要查看相应集成的状态:
oc get integrations
查看输出:
要从命令行查看日志,请打开终端窗口,然后输入以下命令:
kamel log <integration-name>
例如,如果集成名称为
coffee-to-log
,请使用以下命令:kamel log coffee-to-log
查看 OpenShift Web 控制台的日志:
- 选择 Workloads > Pods。
单击 Camel K integration 的 pod 的名称,然后单击 Logs。
您应该看到类似以下示例的 coffee 事件列表:
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
INFO [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
Copy to Clipboard Copied!
要停止集成,请删除 Kamelet Binding:
oc delete kameletbindings/<kameletbinding-name>
例如:
oc delete kameletbindings/coffee-to-log
后续步骤
(可选):
- 添加 action Kamelets 作为中间步骤,如 向 Kamelet Binding 添加操作 中所述。
- 向 Kamelet Binding 添加错误处理,如 将错误处理程序策略添加到 Kamelet Binding 中所述。
1.2.3. 配置 Kamelet 实例参数
引用 Kamelet 时,您可以选择定义 Kamelet 的实例参数:
直接在指定 Kamelet URI 的 Kamelet Binding 中。在以下示例中,由 Telegram BotFather 提供的 bot 授权令牌是
123456
:from("kamelet:telegram-source?authorizationToken=123456")
全局配置 Kamelet 属性(因此您不必使用以下格式在 URI 中提供值):
"camel.kamelet.<kamelet-name>.<property-name>=<value>”
如使用 Camel K 开发和管理集成 中的 配置 Camel K 集成 章节中所述,您可以通过以下方式配置 Kamelet 参数:
- 将它们定义为属性
- 在属性文件中定义它们
- 在 OpenShift ConfigMap 或 Secret 中定义它们
另请参阅
1.2.4. 连接到事件频道
Kamelets 的最常见用例是使用 Kamelet Binding 来将它们连接到事件频道: Kafka 主题或 Knative 目的地(频道或代理)。这样做的好处在于,数据源和接收器相互独立,并"不感知"。这种分离允许单独开发和管理您的业务场景中的组件。如果您有多个数据接收器和源作为业务场景的一部分,则分离各种组件变得更为重要。例如,如果需要关闭事件 sink,事件源不受影响。另外,如果其他 sink 使用相同的源,则不会受到影响。
图 1.3 演示了将源和接收器 Kamelets 连接到事件频道的流。

图 1.3:将源和接收器 Kamelets 连接到事件频道
如果您使用 Apache Kafka stream-processing 框架,有关如何连接到 Kafka 主题的详情,请参阅 连接到 Kafka with Kamelets。
如果使用 Knative 无服务器框架,了解如何连接到 Knative 目标(频道或代理),请参阅 连接到 Knative with Kamelets。
1.2.5. 连接到显式 Camel URI
您可以创建一个 Kamelet Binding,其中 Kamelet 将事件发送到或从显式 Camel URI 接收事件。通常,您可以将源 Kamelet 绑定到可接收事件的 URI (即,您可以在 Kamelet Binding 中将 URI 指定为 sink)。接收事件的 Camel URI 示例是 HTTP 或 HTTPS 端点。
也可以将 URI 指定为 Kamelet Binding 中的源,但并不常见。发送事件的 Camel URI 示例是计时器、邮件或 FTP 端点。
要将 Kamelet 连接到 Camel URI,请按照 Kamelet Binding 和 sink.uri
字段中的 Connecting source 和 sink 组件 中的步骤进行操作,而不是 Kamelet,指定一个显式 Camel URI。
在以下示例中,sink 的 URI 是一个虚构 URI (https://mycompany.com/event-service):
apiVersion: camel.apache.org/v1alpha1 kind: KameletBinding metadata: name: coffee-to-event-service spec: source: ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: coffee-source properties: period: 5000 sink: uri: https://mycompany.com/event-service
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: coffee-to-event-service
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: coffee-source
properties:
period: 5000
sink:
uri: https://mycompany.com/event-service