5.2. 事件源
5.2.1. 事件源
Knative 事件源可以是生成或导入云事件的任何 Kubernetes 对象,并将这些事件转发到另一个端点,称为接收器(sink)。事件源对于开发对事件做出反应的分布式系统至关重要。
您可以使用 OpenShift Container Platform Web 控制台、Knative (kn
) CLI 或应用 YAML 文件的 Developer 视角创建和管理 Knative 事件源。
目前,OpenShift Serverless 支持以下事件源类型:
您还可以创建自定义事件源。
5.2.2. Administrator 视角中的事件源
事件源对于开发对事件做出反应的分布式系统至关重要。
5.2.2.1. 使用 Administrator 视角创建事件源
Knative 事件源可以是生成或导入云事件的任何 Kubernetes 对象,并将这些事件转发到另一个端点,称为接收器(sink)。
先决条件
- OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
- 您已登录到 Web 控制台,且处于 Administrator 视角。
- 具有集群管理员 OpenShift Container Platform 的权限。
流程
-
在 OpenShift Container Platform Web 控制台的 Administrator 视角中,导航到 Serverless
Eventing。 - 在 Create 列表中,选择 Event Source。您将被定向到 Event Sources 页面。
- 选择您要创建的事件源类型。
5.2.3. 创建 API 服务器源
API 服务器源是一个事件源,可用于将事件接收器(sink),如 Knative 服务,连接到 Kubernetes API 服务器。API 服务器源监视 Kubernetes 事件并将其转发到 Knative Eventing 代理。
5.2.3.1. 使用 Web 控制台创建 API 服务器源
在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建 API 服务器源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。
先决条件
- 已登陆到 OpenShift Container Platform Web 控制台。
- 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。
如果要重新使用现有服务帐户,您可以修改现有的 ServiceAccount
资源,使其包含所需的权限,而不是创建新资源。
以 YAML 文件形式,为事件源创建服务帐户、角色和角色绑定:
apiVersion: v1 kind: ServiceAccount metadata: name: events-sa namespace: default 1 --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: event-watcher namespace: default 2 rules: - apiGroups: - "" resources: - events verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: k8s-ra-event-watcher namespace: default 3 roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: event-watcher subjects: - kind: ServiceAccount name: events-sa namespace: default 4
应用 YAML 文件:
$ oc apply -f <filename>
-
在 Developer 视角中,导航到 +Add
Event Source。此时会显示 Event Sources 页面。 - 可选:如果您的事件源有多个供应商,请从 Providers 列表中选择所需的供应商,以过滤供应商的可用事件源。
- 选择 ApiServerSource,然后点 Create Event Source。此时会显示 Create Event Source 页面。
使用 Form view 或 YAML view 配置 ApiServerSource 设置:
注意您可以在 Form view 和 YAML view 间进行切换。在不同视图间切换时数据会被保留。
-
输入
v1
作为 APIVERSION 和Event
作为 KIND。 - 为您创建的服务帐户选择 Service Account Name。
- 为事件源选择 Sink。Sink 可以是一个 资源,如频道、代理或服务,也可以是一个 URI。
-
输入
- 点 Create。
验证
创建 API 服务器源后,您会在 Topology 视图中看到它连接到接收器的服务。
如果使用 URI sink,请右键点击 URI sink
删除 API 服务器源
- 导航到 Topology 视图。
右键点击 API 服务器源并选择 Delete ApiServerSource。
5.2.3.2. 使用 Knative CLI 创建 API 服务器源
您可以使用 kn source apiserver create
命令,使用 kn
CLI 创建 API 服务器源。使用 kn
CLI 创建 API 服务器源可提供比直接修改 YAML 文件更精简且直观的用户界面。
先决条件
- 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
已安装 OpenShift CLI(
oc
)。 -
已安装 Knative (
kn
) CLI。
如果要重新使用现有服务帐户,您可以修改现有的 ServiceAccount
资源,使其包含所需的权限,而不是创建新资源。
以 YAML 文件形式,为事件源创建服务帐户、角色和角色绑定:
apiVersion: v1 kind: ServiceAccount metadata: name: events-sa namespace: default 1 --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: event-watcher namespace: default 2 rules: - apiGroups: - "" resources: - events verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: k8s-ra-event-watcher namespace: default 3 roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: event-watcher subjects: - kind: ServiceAccount name: events-sa namespace: default 4
应用 YAML 文件:
$ oc apply -f <filename>
创建具有事件 sink 的 API 服务器源。在以下示例中,sink 是一个代理:
$ kn source apiserver create <event_source_name> --sink broker:<broker_name> --resource "event:v1" --service-account <service_account_name> --mode Resource
要检查 API 服务器源是否已正确设置,请创建一个 Knative 服务,在日志中转储传入的信息:
$ kn service create <service_name> --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
如果您使用代理作为事件 sink,请创建一个触发器将事件从
default
代理过滤到服务:$ kn trigger create <trigger_name> --sink ksvc:<service_name>
通过在 default 命名空间中启动 pod 来创建事件:
$ oc create deployment hello-node --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
通过检查以下命令生成的输出来检查是否正确映射了控制器:
$ kn source apiserver describe <source_name>
输出示例
Name: mysource Namespace: default Annotations: sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer Age: 3m ServiceAccountName: events-sa Mode: Resource Sink: Name: default Namespace: default Kind: Broker (eventing.knative.dev/v1) Resources: Kind: event (v1) Controller: false Conditions: OK TYPE AGE REASON ++ Ready 3m ++ Deployed 3m ++ SinkProvided 3m ++ SufficientPermissions 3m ++ EventTypesProvided 3m
验证
您可以通过查看消息转储程序功能日志来验证 Kubernetes 事件是否已发送到 Knative。
获取 pod:
$ oc get pods
查看 pod 的消息转储程序功能日志:
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.apiserver.resource.update datacontenttype: application/json ... Data, { "apiVersion": "v1", "involvedObject": { "apiVersion": "v1", "fieldPath": "spec.containers{hello-node}", "kind": "Pod", "name": "hello-node", "namespace": "default", ..... }, "kind": "Event", "message": "Started container", "metadata": { "name": "hello-node.159d7608e3a3572c", "namespace": "default", .... }, "reason": "Started", ... }
删除 API 服务器源
删除触发器:
$ kn trigger delete <trigger_name>
删除事件源:
$ kn source apiserver delete <source_name>
删除服务帐户、集群角色和集群绑定:
$ oc delete -f authentication.yaml
5.2.3.2.1. Knative CLI sink 标记
当使用 Knative (kn
) CLI 创建事件源时,您可以使用 --sink
标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。
以下示例创建使用服务 http://event-display.svc.cluster.local
的接收器绑定作为接收器:
使用 sink 标记的命令示例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
中的svc
确定接收器是一个 Knative 服务。其他默认的接收器前缀包括channel
和broker
。
5.2.3.3. 使用 YAML 文件创建 API 服务器源
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以可重复的方式描述事件源。要使用 YAML 创建 API 服务器源,您必须创建一个 YAML 文件来定义 ApiServerSource
对象,然后使用 oc apply
命令应用它。
先决条件
- 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
您已在与 API 服务器源 YAML 文件中定义的相同的命名空间中创建
default
代理。 -
安装 OpenShift CLI (
oc
) 。
如果要重新使用现有服务帐户,您可以修改现有的 ServiceAccount
资源,使其包含所需的权限,而不是创建新资源。
以 YAML 文件形式,为事件源创建服务帐户、角色和角色绑定:
apiVersion: v1 kind: ServiceAccount metadata: name: events-sa namespace: default 1 --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: event-watcher namespace: default 2 rules: - apiGroups: - "" resources: - events verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: k8s-ra-event-watcher namespace: default 3 roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: event-watcher subjects: - kind: ServiceAccount name: events-sa namespace: default 4
应用 YAML 文件:
$ oc apply -f <filename>
将 API 服务器源创建为 YAML 文件:
apiVersion: sources.knative.dev/v1alpha1 kind: ApiServerSource metadata: name: testevents spec: serviceAccountName: events-sa mode: Resource resources: - apiVersion: v1 kind: Event sink: ref: apiVersion: eventing.knative.dev/v1 kind: Broker name: default
应用
ApiServerSource
YAML 文件:$ oc apply -f <filename>
要检查 API 服务器源是否已正确设置,请创建一个 Knative 服务作为 YAML 文件,在日志中转储传入的信息:
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
应用
Service
YAML 文件:$ oc apply -f <filename>
创建一个
Trigger
对象作为一个 YAML 文件,该文件将事件从default
代理过滤到上一步中创建的服务:apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: event-display-trigger namespace: default spec: broker: default subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
应用
Trigger
YAML 文件:$ oc apply -f <filename>
通过在 default 命名空间中启动 pod 来创建事件:
$ oc create deployment hello-node --image=quay.io/openshift-knative/knative-eventing-sources-event-display
输入以下命令并检查输出,检查是否正确映射了控制器:
$ oc get apiserversource.sources.knative.dev testevents -o yaml
输出示例
apiVersion: sources.knative.dev/v1alpha1 kind: ApiServerSource metadata: annotations: creationTimestamp: "2020-04-07T17:24:54Z" generation: 1 name: testevents namespace: default resourceVersion: "62868" selfLink: /apis/sources.knative.dev/v1alpha1/namespaces/default/apiserversources/testevents2 uid: 1603d863-bb06-4d1c-b371-f580b4db99fa spec: mode: Resource resources: - apiVersion: v1 controller: false controllerSelector: apiVersion: "" kind: "" name: "" uid: "" kind: Event labelSelector: {} serviceAccountName: events-sa sink: ref: apiVersion: eventing.knative.dev/v1 kind: Broker name: default
验证
要验证 Kubernetes 事件是否已发送到 Knative,您可以查看消息转储程序功能日志。
输入以下命令来获取 pod:
$ oc get pods
输入以下命令来查看 pod 的消息转储程序功能日志:
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.apiserver.resource.update datacontenttype: application/json ... Data, { "apiVersion": "v1", "involvedObject": { "apiVersion": "v1", "fieldPath": "spec.containers{hello-node}", "kind": "Pod", "name": "hello-node", "namespace": "default", ..... }, "kind": "Event", "message": "Started container", "metadata": { "name": "hello-node.159d7608e3a3572c", "namespace": "default", .... }, "reason": "Started", ... }
删除 API 服务器源
删除触发器:
$ oc delete -f trigger.yaml
删除事件源:
$ oc delete -f k8s-events.yaml
删除服务帐户、集群角色和集群绑定:
$ oc delete -f authentication.yaml
5.2.4. 创建 ping 源
ping 源是一个事件源,可用于定期向事件消费者发送带有恒定有效负载的 ping 事件。ping 源可以用来调度发送事件,类似于计时器。
5.2.4.1. 使用 Web 控制台创建 ping 源
在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建 ping 源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。
先决条件
- 已登陆到 OpenShift Container Platform Web 控制台。
- 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
要验证 ping 源是否可以工作,请创建一个简单的 Knative 服务,在服务日志中转储传入的信息。
-
在 Developer 视角中,导航到 +Add
YAML。 复制 YAML 示例:
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
- 点 Create。
-
在 Developer 视角中,导航到 +Add
在与上一步中创建的服务相同的命名空间中创建一个 ping 源,或您要将事件发送到的任何其他接收器。
-
在 Developer 视角中,导航到 +Add
Event Source。此时会显示 Event Sources 页面。 - 可选:如果您的事件源有多个供应商,请从 Providers 列表中选择所需的供应商,以过滤供应商的可用事件源。
选择 Ping Source,然后点击 Create Event Source。此时会显示 Create Event Source 页面。
注意您可以使用 Form view 或 YAML view 配置 PingSource 设置,并可以在两者间切换。在不同视图间切换时数据会被保留。
-
为 Schedule 输入一个值。在本例中,值为
*/2 * * *
,它会创建一个 PingSource,每两分钟发送一条消息。 - 可选:您可以为 Data 输入一个值,它是消息的有效负载。
-
选择一个 Sink。这可以是 Resource 或一个 URI。在这个示例中,上一步中创建的
event-display
服务被用作 Resource sink。 - 点 Create。
-
在 Developer 视角中,导航到 +Add
验证
您可以通过查看 Topology 页面来验证 ping 源是否已创建并连接到接收器。
- 在 Developer 视角中,导航到 Topology。
查看 ping 源和接收器。
删除 ping 源
- 导航到 Topology 视图。
- 右键单击 API 服务器源,再选择 Delete Ping Source。
5.2.4.2. 使用 Knative CLI 创建 ping 源
您可以使用 kn source ping create
命令,通过 Knative (kn
) CLI 创建 ping 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。
先决条件
- 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
-
已安装 Knative (
kn
) CLI。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
可选: 如果要使用此流程验证步骤,请安装 OpenShift CLI (
oc
) 。
流程
要验证 ping 源是否可以工作,请创建一个简单的 Knative 服务,在服务日志中转储传入的信息:
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
对于您要请求的每一组 ping 事件,请在与事件消费者相同的命名空间中创建一个 ping 源:
$ kn source ping create test-ping-source \ --schedule "*/2 * * * *" \ --data '{"message": "Hello world!"}' \ --sink ksvc:event-display
输入以下命令并检查输出,检查是否正确映射了控制器:
$ kn source ping describe test-ping-source
输出示例
Name: test-ping-source Namespace: default Annotations: sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer Age: 15s Schedule: */2 * * * * Data: {"message": "Hello world!"} Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 8s ++ Deployed 8s ++ SinkProvided 15s ++ ValidSchedule 15s ++ EventTypeProvided 15s ++ ResourcesCorrect 15s
验证
您可以通过查看 sink pod 的日志来验证 Kubernetes 事件是否已发送到 Knative 事件。
默认情况下,如果在 60 秒内都没有流量,Knative 服务会终止其 Pod。本指南中演示的示例创建了一个 ping 源,每 2 分钟发送一条消息,因此每个消息都应该在新创建的 pod 中观察到。
查看新创建的 pod:
$ watch oc get pods
使用 Ctrl+C 取消查看 pod,然后查看所创建 pod 的日志:
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.sources.ping source: /apis/v1/namespaces/default/pingsources/test-ping-source id: 99e4f4f6-08ff-4bff-acf1-47f61ded68c9 time: 2020-04-07T16:16:00.000601161Z datacontenttype: application/json Data, { "message": "Hello world!" }
删除 ping 源
删除 ping 源:
$ kn delete pingsources.sources.knative.dev <ping_source_name>
5.2.4.2.1. Knative CLI sink 标记
当使用 Knative (kn
) CLI 创建事件源时,您可以使用 --sink
标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。
以下示例创建使用服务 http://event-display.svc.cluster.local
的接收器绑定作为接收器:
使用 sink 标记的命令示例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
中的svc
确定接收器是一个 Knative 服务。其他默认的接收器前缀包括channel
和broker
。
5.2.4.3. 使用 YAML 创建 ping 源
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以可重复的方式描述事件源。要使用 YAML 创建无服务器 ping 源,您必须创建一个 YAML 文件来定义 PingSource
对象,然后使用 oc apply
来应用它。
PingSource
对象示例
apiVersion: sources.knative.dev/v1 kind: PingSource metadata: name: test-ping-source spec: schedule: "*/2 * * * *" 1 data: '{"message": "Hello world!"}' 2 sink: 3 ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
先决条件
- 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
-
安装 OpenShift CLI (
oc
) 。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
要验证 ping 源是否可以工作,请创建一个简单的 Knative 服务,在服务日志中转储传入的信息。
创建服务 YAML 文件:
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
创建服务:
$ oc apply -f <filename>
对于您要请求的每一组 ping 事件,请在与事件消费者相同的命名空间中创建一个 ping 源。
为 ping 源创建 YAML 文件:
apiVersion: sources.knative.dev/v1 kind: PingSource metadata: name: test-ping-source spec: schedule: "*/2 * * * *" data: '{"message": "Hello world!"}' sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
创建 ping 源:
$ oc apply -f <filename>
输入以下命令检查是否正确映射了控制器:
$ oc get pingsource.sources.knative.dev <ping_source_name> -oyaml
输出示例
apiVersion: sources.knative.dev/v1 kind: PingSource metadata: annotations: sources.knative.dev/creator: developer sources.knative.dev/lastModifier: developer creationTimestamp: "2020-04-07T16:11:14Z" generation: 1 name: test-ping-source namespace: default resourceVersion: "55257" selfLink: /apis/sources.knative.dev/v1/namespaces/default/pingsources/test-ping-source uid: 3d80d50b-f8c7-4c1b-99f7-3ec00e0a8164 spec: data: '{ value: "hello" }' schedule: '*/2 * * * *' sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display namespace: default
验证
您可以通过查看 sink pod 的日志来验证 Kubernetes 事件是否已发送到 Knative 事件。
默认情况下,如果在 60 秒内都没有流量,Knative 服务会终止其 Pod。本指南中演示的示例创建了一个 PingSource,每 2 分钟发送一条消息,因此每个消息都应该在新创建的 pod 中观察到。
查看新创建的 pod:
$ watch oc get pods
使用 Ctrl+C 取消查看 pod,然后查看所创建 pod 的日志:
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.sources.ping source: /apis/v1/namespaces/default/pingsources/test-ping-source id: 042ff529-240e-45ee-b40c-3a908129853e time: 2020-04-07T16:22:00.000791674Z datacontenttype: application/json Data, { "message": "Hello world!" }
删除 ping 源
删除 ping 源:
$ oc delete -f <filename>
示例命令
$ oc delete -f ping-source.yaml
5.2.5. Kafka 源
您可以创建一个 Kafka 源从 Apache Kafka 集群中读取事件,并将这些事件传递给接收器。您可以使用 OpenShift Container Platform web 控制台、Knative (kn
) CLI 或直接创建 KafkaSource
对象并使用 OpenShift CLI (oc
) 创建 Kafka 源来应用它。
5.2.5.1. 使用 Web 控制台创建 Kafka 事件源
在集群中安装了 Knative Kafka 后,您可以使用 Web 控制台创建 Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建 Kafka 源。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在集群中。 - 已登陆到 web 控制台。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
- 在 Developer 视角中,进入到 +Add 页面并选择 Event Source。
- 在 Event Sources 页面中,在 Type 部分选择 Kafka Source。
配置 Kafka Source 设置:
- 添加用逗号分开的 Bootstrap 服务器列表。
- 添加以逗号分隔的标题列表。
- 添加一个 消费者组。
- 为您创建的服务帐户选择 Service Account Name。
- 为事件源选择 Sink。Sink 可以是一个 资源,如频道、代理或服务,也可以是一个 URI。
- 输入 Kafka 事件源的 名称。
- 点 Create。
验证
您可以通过查看 Topology 页面来验证 Kafka 事件源是否已创建并连接到接收器。
- 在 Developer 视角中,导航到 Topology。
查看 Kafka 事件源和接收器。
5.2.5.2. 使用 Knative CLI 创建 Kafka 事件源
您可以使用 kn source kafka create
命令,使用 Knative (kn
) CLI 创建 Kafka 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。
先决条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving 和
KnativeKafka
自定义资源(CR)已安装在集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
-
已安装 Knative (
kn
) CLI。 -
可选:如果您想要使用此流程中的验证步骤,已安装 OpenShift CLI (
oc
)。
流程
要验证 Kafka 事件源是否可以工作,请创建一个 Knative 服务,在服务日志中转储传入的事件:
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
创建
KafkaSource
CR:$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
注意将此命令中的占位符值替换为源名称、引导服务器和主题的值。
--servers
、--topics
和--consumergroup
选项指定到 Kafka 集群的连接参数。--consumergroup
选项是可选的。可选:查看您创建的
KafkaSource
CR 的详情:$ kn source kafka describe <kafka_source_name>
输出示例
Name: example-kafka-source Namespace: kafka Age: 1h BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092 Topics: example-topic ConsumerGroup: example-consumer-group Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 1h ++ Deployed 1h ++ SinkProvided 1h
验证步骤
触发 Kafka 实例将信息发送到主题:
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
在提示符后输入信息。这个命令假设:
-
Kafka 集群安装在
kafka
命名空间中。 -
KafkaSource
对象已被配置为使用my-topic
主题。
-
Kafka 集群安装在
通过查看日志来验证消息是否显示:
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic subject: partition:46#0 id: partition:46/offset:0 time: 2021-03-10T11:21:49.4Z Extensions, traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00 Data, Hello!
5.2.5.2.1. Knative CLI sink 标记
当使用 Knative (kn
) CLI 创建事件源时,您可以使用 --sink
标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。
以下示例创建使用服务 http://event-display.svc.cluster.local
的接收器绑定作为接收器:
使用 sink 标记的命令示例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
中的svc
确定接收器是一个 Knative 服务。其他默认的接收器前缀包括channel
和broker
。
5.2.5.3. 使用 YAML 创建 Kafka 事件源
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个 YAML 文件来定义 KafkaSource
对象,然后使用 oc apply
命令应用它。
先决条件
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
自定义资源已安装在集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
-
安装 OpenShift CLI (
oc
) 。
流程
创建
KafkaSource
对象作为 YAML 文件:apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: <source_name> spec: consumerGroup: <group_name> 1 bootstrapServers: - <list_of_bootstrap_servers> topics: - <list_of_topics> 2 sink: - <list_of_sinks> 3
重要仅支持 OpenShift Serverless 上的
KafkaSource
对象的v1beta1
API 版本。不要使用这个 API 的v1alpha1
版本,因为这个版本现已弃用。KafkaSource
对象示例apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
应用
KafkaSource
YAML 文件:$ oc apply -f <filename>
验证
输入以下命令验证 Kafka 事件源是否已创建:
$ oc get pods
输出示例
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
5.2.5.4. 为 Kafka 源配置 SASL 身份验证
Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。
先决条件
- 在 OpenShift Container Platform 上具有集群或专用管理员权限。
-
OpenShift Serverless Operator、Knative Eventing 和
KnativeKafka
CR 已安装在 OpenShift Container Platform 集群中。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您有一个 Kafka 集群的用户名和密码。
-
您已选择使用 SASL 机制,例如
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。 -
如果启用了 TLS,您还需要 Kafka 集群的
ca.crt
证书文件。 -
已安装 OpenShift (
oc
) CLI。
流程
在所选命名空间中创建证书文件作为 secret:
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ 1 --from-literal=user="my-sasl-user"
- 1
- SASL 类型可以是
PLAIN
、SCRAM-SHA-256
或SCRAM-SHA-512
。
创建或修改 Kafka 源,使其包含以下
spec
配置:apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <kafka_auth_secret> key: user password: secretKeyRef: name: <kafka_auth_secret> key: password type: secretKeyRef: name: <kafka_auth_secret> key: saslType tls: enable: true caCert: 1 secretKeyRef: name: <kafka_auth_secret> key: ca.crt ...
- 1
- 如果您使用公有云 Kafka 服务,则不需要
caCert
spec,如 Red Hat OpenShift Streams for Apache Kafka。
5.2.6. 自定义事件源
如果您需要从 Knative 中没有包含在 Knative 的事件制作者或发出没有 CloudEvent
格式的事件的制作者中入站事件,您可以通过创建自定义事件源来实现此目标。您可以使用以下方法之一创建自定义事件源:
-
通过创建接收器绑定,将
PodSpecable
对象用作事件源。 - 通过创建容器源,将容器用作事件源。
5.2.6.1. 接收器(sink)绑定
SinkBinding
对象支持将事件产品与交付寻址分离。接收器绑定用于将 事件制作者 连接到事件消费者(sink) 。event producer 是一个 Kubernetes 资源,用于嵌入 PodSpec
模板并生成事件。sink 是一个可寻址的 Kubernetes 对象,可以接收事件。
SinkBinding
对象将环境变量注入到 sink 的 PodTemplateSpec
中,这意味着应用程序代码不需要直接与 Kubernetes API 交互来定位事件目的地。这些环境变量如下:
K_SINK
- 解析 sink 的 URL。
K_CE_OVERRIDES
- 指定出站事件覆盖的 JSON 对象。
SinkBinding
对象目前不支持服务的自定义修订名称。
5.2.6.1.1. 使用 YAML 创建接收器绑定
使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以可重复的方式描述事件源。要使用 YAML 创建接收器绑定,您必须创建一个 YAML 文件来定义 SinkBinding
对象,然后使用 oc apply
命令应用它。
先决条件
- 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
-
安装 OpenShift CLI (
oc
) 。 - 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务或事件接收器,在日志中转储传入的信息。
创建服务 YAML 文件:
服务 YAML 文件示例
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
创建服务:
$ oc apply -f <filename>
创建将事件定向到该服务的接收器绑定实例。
创建接收器绑定 YAML 文件:
服务 YAML 文件示例
apiVersion: sources.knative.dev/v1alpha1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: batch/v1 kind: Job 1 selector: matchLabels: app: heartbeat-cron sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
- 1
- 在本例中,任何具有标签
app: heartbeat-cron
的作业都将被绑定到事件 sink。
创建接收器绑定:
$ oc apply -f <filename>
创建
CronJob
对象。创建 cron 任务 YAML 文件:
Cron Job YAML 文件示例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
重要要使用接收器绑定,您必须手动在 Knative 资源中添加
bindings.knative.dev/include=true
标签。例如,要将此标签添加到
CronJob
资源,请将以下行添加到Job
资源 YAML 定义中:jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"
创建 cron job:
$ oc apply -f <filename>
输入以下命令并检查输出,检查是否正确映射了控制器:
$ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml
输出示例
spec: sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display namespace: default subject: apiVersion: batch/v1 kind: Job namespace: default selector: matchLabels: app: heartbeat-cron
验证
您可以通过查看消息 dumper 功能日志,来验证 Kubernetes 事件是否已发送到 Knative 事件。
输入命令:
$ oc get pods
输入命令:
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
5.2.6.1.2. 使用 Knative CLI 创建接收器绑定
您可以使用 kn source binding create
命令通过 Knative (kn
) CLI 创建接收器绑定。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。
先决条件
- 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
-
安装 Knative (
kn
) CLI。 -
安装 OpenShift CLI (
oc
) 。
以下操作过程要求您创建 YAML 文件。
如果更改了示例中使用的 YAML 文件的名称,则需要更新对应的 CLI 命令。
流程
要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务或事件 sink,在日志中转储传入的信息:
$ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
创建将事件定向到该服务的接收器绑定实例:
$ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
创建
CronJob
对象。创建 cron 任务 YAML 文件:
Cron Job YAML 文件示例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
重要要使用接收器绑定,您必须手动在 Knative CR 中添加
bindings.knative.dev/include=true
标签。例如,要将此标签添加到
CronJob
CR,请将以下行添加到Job
CR YAML 定义中:jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"
创建 cron job:
$ oc apply -f <filename>
输入以下命令并检查输出,检查是否正确映射了控制器:
$ kn source binding describe bind-heartbeat
输出示例
Name: bind-heartbeat Namespace: demo-2 Annotations: sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ... Age: 2m Subject: Resource: job (batch/v1) Selector: app: heartbeat-cron Sink: Name: event-display Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 2m
验证
您可以通过查看消息 dumper 功能日志,来验证 Kubernetes 事件是否已发送到 Knative 事件。
您可以输入以下命令来查看消息转储程序功能日志:
$ oc get pods
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
输出示例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
5.2.6.1.2.1. Knative CLI sink 标记
当使用 Knative (kn
) CLI 创建事件源时,您可以使用 --sink
标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。
以下示例创建使用服务 http://event-display.svc.cluster.local
的接收器绑定作为接收器:
使用 sink 标记的命令示例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
中的svc
确定接收器是一个 Knative 服务。其他默认的接收器前缀包括channel
和broker
。
5.2.6.1.3. 使用 Web 控制台创建接收器绑定
在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建接收器绑定。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。
先决条件
- 已登陆到 OpenShift Container Platform Web 控制台。
- OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
创建 Knative 服务以用作接收器:
-
在 Developer 视角中,导航到 +Add
YAML。 复制 YAML 示例:
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
- 点 Create。
-
在 Developer 视角中,导航到 +Add
创建用作事件源的
CronJob
资源,并每分钟发送一个事件。-
在 Developer 视角中,导航到 +Add
YAML。 复制 YAML 示例:
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "*/1 * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: true 1 spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
- 1
- 确保包含
bindings.knative.dev/include: true
标签。OpenShift Serverless 的默认命名空间选择行为使用包含模式。
- 点 Create。
-
在 Developer 视角中,导航到 +Add
在与上一步中创建的服务相同的命名空间中创建接收器绑定,或您要将事件发送到的任何其他接收器。
-
在 Developer 视角中,导航到 +Add
Event Source。此时会显示 Event Sources 页面。 - 可选:如果您的事件源有多个供应商,请从 Providers 列表中选择所需的供应商,以过滤供应商的可用事件源。
选择 Sink Binding,然后单击 Create Event Source。此时会显示 Create Event Source 页面。
注意您可以使用 Form view 或 YAML view 配置 Sink Binding 设置,并可以在两者间切换。在不同视图间切换时数据会被保留。
-
在 apiVersion 字段中,输入
batch/v1
。 在 Kind 字段中,输入
Job
。注意OpenShift Serverless sink 绑定不支持
CronJob
kind,因此 Kind 字段必须以 cron 任务创建的Job
对象为目标,而不是 cron 作业对象本身。-
选择一个 Sink。这可以是 Resource 或一个 URI。在这个示例中,上一步中创建的
event-display
服务被用作 Resource sink。 在 Match labels 部分:
-
在 Name 字段中输入
app
。 在 Value 字段中输入
heartbeat-cron
。注意使用带有接收器绑定的 cron 任务时,需要标签选择器,而不是资源名称。这是因为,Cron Job 创建的作业没有可预测的名称,并在名称中包含随机生成的字符串。例如,
hearthbeat-cron-1cc23f
.
-
在 Name 字段中输入
- 点 Create。
-
在 Developer 视角中,导航到 +Add
验证
您可以通过查看 Topology 页面和 pod 日志来验证接收器绑定、接收器和 cron 任务是否已创建并正常工作。
- 在 Developer 视角中,导航到 Topology。
查看接收器绑定、接收器和心跳 cron 任务。
- 观察在添加了接收器绑定后 cron 任务正在注册成功的作业。这意味着接收器绑定成功重新配置由 cron 任务创建的作业。
-
浏览
event-display
服务 pod 的日志,以查看 heartbeats cron 作业生成的事件。
5.2.6.1.4. 接收器绑定引用
您可以通过创建接收器绑定,将 PodSpecable
对象用作事件源。您可以在创建 SinkBinding
对象时配置多个参数。
SinkBinding
对象支持以下参数:
字段 | 描述 | 必需或可选 |
---|---|---|
|
指定 API 版本,如 | 必需 |
|
将此资源对象标识为 | 必需 |
|
指定唯一标识 | 必需 |
|
指定此 | 必需 |
| 对解析为 URI 作为 sink 的对象的引用。 | 必需 |
| 提及通过绑定实施来增强运行时合同的资源。 | 必需 |
| 定义覆盖来控制发送到 sink 的事件的输出格式和修改。 | 选填 |
5.2.6.1.4.1. 主题参数
Subject
参数引用通过绑定实施来增强运行时合同的资源。您可以为 Subject
定义配置多个字段。
Subject
定义支持以下字段:
字段 | 描述 | 必需或可选 |
---|---|---|
| 引用的 API 版本。 | 必需 |
| 引用的类型。 | 必需 |
| 引用的命名空间。如果省略,则默认为对象的命名空间。 | 选填 |
| 引用的名称。 |
如果配置 |
| 引用的选择器。 |
如果配置 |
| 标签选择器要求列表。 |
仅使用 |
| 选择器应用到的标签键。 |
使用 |
|
代表键与一组值的关系。有效的运算符为 |
使用 |
|
字符串值数组。如果 |
使用 |
|
键值对映射. |
仅使用 |
主题参数示例
根据以下 YAML,选择 default
命名空间中名为 mysubject
的 Deployment
对象:
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: apps/v1 kind: Deployment namespace: default name: mysubject ...
根据以下 YAML,可以选择在 default
命名空间中带有 working=example
标签的 Job
对象:
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: batch/v1 kind: Job namespace: default selector: matchLabels: working: example ...
根据以下 YAML,可以选择在 default
命名空间中带有标签 working=example
或 working=sample
的 Pod
对象:
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: v1 kind: Pod namespace: default selector: - matchExpression: key: working operator: In values: - example - sample ...
5.2.6.1.4.2. CloudEvent 覆盖
ceOverrides
定义提供覆盖控制发送到 sink 的 CloudEvent 输出格式和修改。您可以为 ceOverrides
定义配置多个字段。
ceOverrides
定义支持以下字段:
字段 | 描述 | 必需或可选 |
---|---|---|
|
指定在出站事件中添加或覆盖哪些属性。每个 | 选填 |
仅允许有效的 CloudEvent
属性名称作为扩展。您无法从扩展覆盖配置设置 spec 定义的属性。例如,您无法修改 type
属性。
CloudEvent Overrides 示例
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: ... ceOverrides: extensions: extra: this is an extra attribute additional: 42
这会在 主题
上设置 K_CE_OVERRIDES
环境变量:
输出示例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
5.2.6.1.4.3. include 标签
要使用接收器绑定,您需要为资源或包含资源的命名空间分配 bindings.knative.dev/include: "true"
标签。如果资源定义不包括该标签,集群管理员可以通过运行以下命令将它附加到命名空间:
$ oc label namespace <namespace> bindings.knative.dev/include=true
5.2.6.2. 容器源
容器源创建容器镜像来生成事件并将事件发送到 sink。您可以通过创建容器镜像和使用您的镜像 URI 的 ContainerSource
对象,使用容器源创建自定义事件源。
5.2.6.2.1. 创建容器镜像的指南
两个环境变量由容器源控制器注入:K_SINK
和 K_CE_OVERRIDES
。这些变量分别从 sink
和 andceOverrides
spec 解析。事件发送到 K_SINK
环境变量中指定的 sink URI。该消息必须使用 CloudEvent
HTTP 格式作为 POST
发送。
容器镜像示例
以下是心跳容器镜像的示例:
package main import ( "context" "encoding/json" "flag" "fmt" "log" "os" "strconv" "time" duckv1 "knative.dev/pkg/apis/duck/v1" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/kelseyhightower/envconfig" ) type Heartbeat struct { Sequence int `json:"id"` Label string `json:"label"` } var ( eventSource string eventType string sink string label string periodStr string ) func init() { flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)") flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)") flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") flag.StringVar(&label, "label", "", "a special label") flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") } type envConfig struct { // Sink URL where to send heartbeat cloud events Sink string `envconfig:"K_SINK"` // CEOverrides are the CloudEvents overrides to be applied to the outbound event. CEOverrides string `envconfig:"K_CE_OVERRIDES"` // Name of this pod. Name string `envconfig:"POD_NAME" required:"true"` // Namespace this pod exists in. Namespace string `envconfig:"POD_NAMESPACE" required:"true"` // Whether to run continuously or exit. OneShot bool `envconfig:"ONE_SHOT" default:"false"` } func main() { flag.Parse() var env envConfig if err := envconfig.Process("", &env); err != nil { log.Printf("[ERROR] Failed to process env var: %s", err) os.Exit(1) } if env.Sink != "" { sink = env.Sink } var ceOverrides *duckv1.CloudEventOverrides if len(env.CEOverrides) > 0 { overrides := duckv1.CloudEventOverrides{} err := json.Unmarshal([]byte(env.CEOverrides), &overrides) if err != nil { log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err) os.Exit(1) } ceOverrides = &overrides } p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink)) if err != nil { log.Fatalf("failed to create http protocol: %s", err.Error()) } c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow()) if err != nil { log.Fatalf("failed to create client: %s", err.Error()) } var period time.Duration if p, err := strconv.Atoi(periodStr); err != nil { period = time.Duration(5) * time.Second } else { period = time.Duration(p) * time.Second } if eventSource == "" { eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name) log.Printf("Heartbeats Source: %s", eventSource) } if len(label) > 0 && label[0] == '"' { label, _ = strconv.Unquote(label) } hb := &Heartbeat{ Sequence: 0, Label: label, } ticker := time.NewTicker(period) for { hb.Sequence++ event := cloudevents.NewEvent("1.0") event.SetType(eventType) event.SetSource(eventSource) event.SetExtension("the", 42) event.SetExtension("heart", "yes") event.SetExtension("beats", true) if ceOverrides != nil && ceOverrides.Extensions != nil { for n, v := range ceOverrides.Extensions { event.SetExtension(n, v) } } if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil { log.Printf("failed to set cloudevents data: %s", err.Error()) } log.Printf("sending cloudevent to %s", sink) if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) { log.Printf("failed to send cloudevent: %v", res) } if env.OneShot { return } // Wait for next tick <-ticker.C } }
以下是引用先前心跳容器镜像的容器源示例:
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: template: spec: containers: # This corresponds to a heartbeats image URI that you have built and published - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats name: heartbeats args: - --period=1 env: - name: POD_NAME value: "example-pod" - name: POD_NAMESPACE value: "event-test" sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: example-service ...
5.2.6.2.2. 使用 Knative CLI 创建和管理容器源
您可以使用 kn source container
命令来使用 Knative (kn
) 创建和管理容器源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。
创建容器源
$ kn source container create <container_source_name> --image <image_uri> --sink <sink>
删除容器源
$ kn source container delete <container_source_name>
描述容器源
$ kn source container describe <container_source_name>
列出现有容器源
$ kn source container list
以 YAML 格式列出现有容器源
$ kn source container list -o yaml
更新容器源
此命令为现有容器源更新镜像 URI:
$ kn source container update <container_source_name> --image <image_uri>
5.2.6.2.3. 使用 Web 控制台创建容器源
在集群中安装 Knative Eventing 后,您可以使用 Web 控制台创建容器源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。
先决条件
- 已登陆到 OpenShift Container Platform Web 控制台。
- OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
流程
-
在 Developer 视角中,导航到 +Add
Event Source。此时会显示 Event Sources 页面。 - 选择 Container Source,然后点 Create Event Source。此时会显示 Create Event Source 页面。
使用 Form view 或 YAML 视图配置 Container Source 设置:
注意您可以在 Form view 和 YAML view 间进行切换。在不同视图间切换时数据会被保留。
- 在 Image 字段中,输入您要在由容器源创建的容器中运行的镜像的 URI。
- 在 Name 字段中输入镜像的名称。
- 可选:在 Arguments 参数字段中,输入要传递给容器的任何参数。
- 可选:在 Environment variables 字段中,添加容器中要设置的任何环境变量。
在 Sink 部分,添加一个接收器,其中将容器源的事件路由到其中。如果使用 Form 视图,您可以从以下选项中选择:
- 选择 Resource 使用频道、代理或服务作为事件源的接收器。
- 选择 URI,以指定容器源的事件路由到的位置。
- 配置完容器源后,点 Create。
5.2.6.2.4. 容器源参考
您可以通过创建 ContainerSource
对象来使用容器作为事件源。您可以在创建 ContainerSource
对象时配置多个参数。
ContainerSource
对象支持以下字段:
字段 | 描述 | 必需或可选 |
---|---|---|
|
指定 API 版本,如 | 必需 |
|
将此资源对象标识为 | 必需 |
|
指定唯一标识 | 必需 |
|
指定此 | 必需 |
| 对解析为 URI 作为 sink 的对象的引用。 | 必需 |
|
| 必需 |
| 定义覆盖来控制发送到 sink 的事件的输出格式和修改。 | 选填 |
模板参数示例
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: template: spec: containers: - image: quay.io/openshift-knative/heartbeats:latest name: heartbeats args: - --period=1 env: - name: POD_NAME value: "mypod" - name: POD_NAMESPACE value: "event-test" ...
5.2.6.2.4.1. CloudEvent 覆盖
ceOverrides
定义提供覆盖控制发送到 sink 的 CloudEvent 输出格式和修改。您可以为 ceOverrides
定义配置多个字段。
ceOverrides
定义支持以下字段:
字段 | 描述 | 必需或可选 |
---|---|---|
|
指定在出站事件中添加或覆盖哪些属性。每个 | 选填 |
仅允许有效的 CloudEvent
属性名称作为扩展。您无法从扩展覆盖配置设置 spec 定义的属性。例如,您无法修改 type
属性。
CloudEvent Overrides 示例
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: ... ceOverrides: extensions: extra: this is an extra attribute additional: 42
这会在 主题
上设置 K_CE_OVERRIDES
环境变量:
输出示例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
5.2.7. 使用 Developer 视角将事件源连接到接收器(sink)
当使用 OpenShift Container Platform Web 控制台创建事件源时,您可以指定事件从该源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。
5.2.7.1. 使用 Developer 视角将事件源连接到接收器(sink)
先决条件
- OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
- 已登陆到 web 控制台,且处于 Developer 视角。
- 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
- 您已创建了 sink,如 Knative 服务、频道或代理。
流程
-
进入 +Add
Event Source 并选择您要创建的事件源类型,创建任何类型的事件源。 - 在 Create Event Source 表单视图的 Sink 部分,在 Resource 列表中选择您的接收器。
- 点 Create。
验证
您可以通过查看 Topology 页面来验证事件源是否已创建并连接到 sink。
- 在 Developer 视角中,导航到 Topology。
- 查看事件源并点连接的接收器查看右侧面板中的接收器详情。