5.2. 事件源


5.2.1. 事件源

Knative 事件源可以是生成或导入云事件的任何 Kubernetes 对象,并将这些事件转发到另一个端点,称为接收器(sink)。事件源对于开发对事件做出反应的分布式系统至关重要。

您可以使用 OpenShift Container Platform Web 控制台、Knative (kn) CLI 或应用 YAML 文件的 Developer 视角创建和管理 Knative 事件源。

目前,OpenShift Serverless 支持以下事件源类型:

API 服务器源
将 Kubernetes API 服务器事件引入 Knative。每次创建、更新或删除 Kubernetes 资源时,API 服务器源会发送一个新事件。
Ping 源
根据指定的 cron 计划生成带有固定有效负载的事件。
Kafka 事件源
将 Kafka 集群连接到接收器作为事件源。

您还可以创建自定义事件源

5.2.2. Administrator 视角中的事件源

事件源对于开发对事件做出反应的分布式系统至关重要。

5.2.2.1. 使用 Administrator 视角创建事件源

Knative 事件源可以是生成或导入云事件的任何 Kubernetes 对象,并将这些事件转发到另一个端点,称为接收器(sink)

先决条件

  • OpenShift Serverless Operator 和 Knative Eventing 已安装在 OpenShift Container Platform 集群中。
  • 您已登录到 Web 控制台,且处于 Administrator 视角。
  • 具有集群管理员 OpenShift Container Platform 的权限。

流程

  1. 在 OpenShift Container Platform Web 控制台的 Administrator 视角中,导航到 Serverless Eventing
  2. Create 列表中,选择 Event Source。您将被定向到 Event Sources 页面。
  3. 选择您要创建的事件源类型。

5.2.3. 创建 API 服务器源

API 服务器源是一个事件源,可用于将事件接收器(sink),如 Knative 服务,连接到 Kubernetes API 服务器。API 服务器源监视 Kubernetes 事件并将其转发到 Knative Eventing 代理。

5.2.3.1. 使用 Web 控制台创建 API 服务器源

在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建 API 服务器源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 已安装 OpenShift CLI(oc)。
流程

如果要重新使用现有服务帐户,您可以修改现有的 ServiceAccount 资源,使其包含所需的权限,而不是创建新资源。

  1. 以 YAML 文件形式,为事件源创建服务帐户、角色和角色绑定:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    将这个命名空间更改为已选择安装事件源的命名空间。
  2. 应用 YAML 文件:

    $ oc apply -f <filename>
  3. Developer 视角中,导航到 +Add Event Source。此时会显示 Event Sources 页面。
  4. 可选:如果您的事件源有多个供应商,请从 Providers 列表中选择所需的供应商,以过滤供应商的可用事件源。
  5. 选择 ApiServerSource,然后点 Create Event Source。此时会显示 Create Event Source 页面。
  6. 使用 Form viewYAML view 配置 ApiServerSource 设置:

    注意

    您可以在 Form viewYAML view 间进行切换。在不同视图间切换时数据会被保留。

    1. 输入 v1 作为 APIVERSIONEvent 作为 KIND
    2. 为您创建的服务帐户选择 Service Account Name
    3. 为事件源选择 SinkSink 可以是一个 资源,如频道、代理或服务,也可以是一个 URI
  7. Create

验证

  • 创建 API 服务器源后,您会在 Topology 视图中看到它连接到接收器的服务。

    ApiServerSource Topology view
注意

如果使用 URI sink,请右键点击 URI sink Edit URI 来修改 URI。

删除 API 服务器源

  1. 导航到 Topology 视图。
  2. 右键点击 API 服务器源并选择 Delete ApiServerSource

    删除 ApiServerSource

5.2.3.2. 使用 Knative CLI 创建 API 服务器源

您可以使用 kn source apiserver create 命令,使用 kn CLI 创建 API 服务器源。使用 kn CLI 创建 API 服务器源可提供比直接修改 YAML 文件更精简且直观的用户界面。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 已安装 OpenShift CLI(oc)。
  • 已安装 Knative (kn) CLI。
流程

如果要重新使用现有服务帐户,您可以修改现有的 ServiceAccount 资源,使其包含所需的权限,而不是创建新资源。

  1. 以 YAML 文件形式,为事件源创建服务帐户、角色和角色绑定:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    将这个命名空间更改为已选择安装事件源的命名空间。
  2. 应用 YAML 文件:

    $ oc apply -f <filename>
  3. 创建具有事件 sink 的 API 服务器源。在以下示例中,sink 是一个代理:

    $ kn source apiserver create <event_source_name> --sink broker:<broker_name> --resource "event:v1" --service-account <service_account_name> --mode Resource
  4. 要检查 API 服务器源是否已正确设置,请创建一个 Knative 服务,在日志中转储传入的信息:

    $ kn service create <service_name> --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  5. 如果您使用代理作为事件 sink,请创建一个触发器将事件从 default 代理过滤到服务:

    $ kn trigger create <trigger_name> --sink ksvc:<service_name>
  6. 通过在 default 命名空间中启动 pod 来创建事件:

    $ oc create deployment hello-node --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  7. 通过检查以下命令生成的输出来检查是否正确映射了控制器:

    $ kn source apiserver describe <source_name>

    输出示例

    Name:                mysource
    Namespace:           default
    Annotations:         sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:                 3m
    ServiceAccountName:  events-sa
    Mode:                Resource
    Sink:
      Name:       default
      Namespace:  default
      Kind:       Broker (eventing.knative.dev/v1)
    Resources:
      Kind:        event (v1)
      Controller:  false
    Conditions:
      OK TYPE                     AGE REASON
      ++ Ready                     3m
      ++ Deployed                  3m
      ++ SinkProvided              3m
      ++ SufficientPermissions     3m
      ++ EventTypesProvided        3m

验证

您可以通过查看消息转储程序功能日志来验证 Kubernetes 事件是否已发送到 Knative。

  1. 获取 pod:

    $ oc get pods
  2. 查看 pod 的消息转储程序功能日志:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{hello-node}",
          "kind": "Pod",
          "name": "hello-node",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "hello-node.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }

删除 API 服务器源

  1. 删除触发器:

    $ kn trigger delete <trigger_name>
  2. 删除事件源:

    $ kn source apiserver delete <source_name>
  3. 删除服务帐户、集群角色和集群绑定:

    $ oc delete -f authentication.yaml
5.2.3.2.1. Knative CLI sink 标记

当使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

以下示例创建使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 sink 标记的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker

5.2.3.3. 使用 YAML 文件创建 API 服务器源

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以可重复的方式描述事件源。要使用 YAML 创建 API 服务器源,您必须创建一个 YAML 文件来定义 ApiServerSource 对象,然后使用 oc apply 命令应用它。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator 和 Knative Eventing。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您已在与 API 服务器源 YAML 文件中定义的相同的命名空间中创建 default 代理。
  • 安装 OpenShift CLI (oc) 。
流程

如果要重新使用现有服务帐户,您可以修改现有的 ServiceAccount 资源,使其包含所需的权限,而不是创建新资源。

  1. 以 YAML 文件形式,为事件源创建服务帐户、角色和角色绑定:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    将这个命名空间更改为已选择安装事件源的命名空间。
  2. 应用 YAML 文件:

    $ oc apply -f <filename>
  3. 将 API 服务器源创建为 YAML 文件:

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      name: testevents
    spec:
      serviceAccountName: events-sa
      mode: Resource
      resources:
        - apiVersion: v1
          kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
  4. 应用 ApiServerSource YAML 文件:

    $ oc apply -f <filename>
  5. 要检查 API 服务器源是否已正确设置,请创建一个 Knative 服务作为 YAML 文件,在日志中转储传入的信息:

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  6. 应用 Service YAML 文件:

    $ oc apply -f <filename>
  7. 创建一个 Trigger 对象作为一个 YAML 文件,该文件将事件从 default 代理过滤到上一步中创建的服务:

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      name: event-display-trigger
      namespace: default
    spec:
      broker: default
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  8. 应用 Trigger YAML 文件:

    $ oc apply -f <filename>
  9. 通过在 default 命名空间中启动 pod 来创建事件:

    $ oc create deployment hello-node --image=quay.io/openshift-knative/knative-eventing-sources-event-display
  10. 输入以下命令并检查输出,检查是否正确映射了控制器:

    $ oc get apiserversource.sources.knative.dev testevents -o yaml

    输出示例

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      annotations:
      creationTimestamp: "2020-04-07T17:24:54Z"
      generation: 1
      name: testevents
      namespace: default
      resourceVersion: "62868"
      selfLink: /apis/sources.knative.dev/v1alpha1/namespaces/default/apiserversources/testevents2
      uid: 1603d863-bb06-4d1c-b371-f580b4db99fa
    spec:
      mode: Resource
      resources:
      - apiVersion: v1
        controller: false
        controllerSelector:
          apiVersion: ""
          kind: ""
          name: ""
          uid: ""
        kind: Event
        labelSelector: {}
      serviceAccountName: events-sa
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default

验证

要验证 Kubernetes 事件是否已发送到 Knative,您可以查看消息转储程序功能日志。

  1. 输入以下命令来获取 pod:

    $ oc get pods
  2. 输入以下命令来查看 pod 的消息转储程序功能日志:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{hello-node}",
          "kind": "Pod",
          "name": "hello-node",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "hello-node.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }

删除 API 服务器源

  1. 删除触发器:

    $ oc delete -f trigger.yaml
  2. 删除事件源:

    $ oc delete -f k8s-events.yaml
  3. 删除服务帐户、集群角色和集群绑定:

    $ oc delete -f authentication.yaml

5.2.4. 创建 ping 源

ping 源是一个事件源,可用于定期向事件消费者发送带有恒定有效负载的 ping 事件。ping 源可以用来调度发送事件,类似于计时器。

5.2.4.1. 使用 Web 控制台创建 ping 源

在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建 ping 源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 要验证 ping 源是否可以工作,请创建一个简单的 Knative 服务,在服务日志中转储传入的信息。

    1. Developer 视角中,导航到 +Add YAML
    2. 复制 YAML 示例:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Create
  2. 在与上一步中创建的服务相同的命名空间中创建一个 ping 源,或您要将事件发送到的任何其他接收器。

    1. Developer 视角中,导航到 +Add Event Source。此时会显示 Event Sources 页面。
    2. 可选:如果您的事件源有多个供应商,请从 Providers 列表中选择所需的供应商,以过滤供应商的可用事件源。
    3. 选择 Ping Source,然后点击 Create Event Source。此时会显示 Create Event Source 页面。

      注意

      您可以使用 Form viewYAML view 配置 PingSource 设置,并可以在两者间切换。在不同视图间切换时数据会被保留。

    4. Schedule 输入一个值。在本例中,值为 */2 * * *,它会创建一个 PingSource,每两分钟发送一条消息。
    5. 可选:您可以为 Data 输入一个值,它是消息的有效负载。
    6. 选择一个 Sink。这可以是 Resource 或一个 URI。在这个示例中,上一步中创建的 event-display 服务被用作 Resource sink。
    7. Create

验证

您可以通过查看 Topology 页面来验证 ping 源是否已创建并连接到接收器。

  1. Developer 视角中,导航到 Topology
  2. 查看 ping 源和接收器。

    在 Topology 视图中查看 ping 源和服务

删除 ping 源

  1. 导航到 Topology 视图。
  2. 右键单击 API 服务器源,再选择 Delete Ping Source

5.2.4.2. 使用 Knative CLI 创建 ping 源

您可以使用 kn source ping create 命令,通过 Knative (kn) CLI 创建 ping 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
  • 已安装 Knative (kn) CLI。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 可选: 如果要使用此流程验证步骤,请安装 OpenShift CLI (oc) 。

流程

  1. 要验证 ping 源是否可以工作,请创建一个简单的 Knative 服务,在服务日志中转储传入的信息:

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. 对于您要请求的每一组 ping 事件,请在与事件消费者相同的命名空间中创建一个 ping 源:

    $ kn source ping create test-ping-source \
        --schedule "*/2 * * * *" \
        --data '{"message": "Hello world!"}' \
        --sink ksvc:event-display
  3. 输入以下命令并检查输出,检查是否正确映射了控制器:

    $ kn source ping describe test-ping-source

    输出示例

    Name:         test-ping-source
    Namespace:    default
    Annotations:  sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:          15s
    Schedule:     */2 * * * *
    Data:         {"message": "Hello world!"}
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE                 AGE REASON
      ++ Ready                 8s
      ++ Deployed              8s
      ++ SinkProvided         15s
      ++ ValidSchedule        15s
      ++ EventTypeProvided    15s
      ++ ResourcesCorrect     15s

验证

您可以通过查看 sink pod 的日志来验证 Kubernetes 事件是否已发送到 Knative 事件。

默认情况下,如果在 60 秒内都没有流量,Knative 服务会终止其 Pod。本指南中演示的示例创建了一个 ping 源,每 2 分钟发送一条消息,因此每个消息都应该在新创建的 pod 中观察到。

  1. 查看新创建的 pod:

    $ watch oc get pods
  2. 使用 Ctrl+C 取消查看 pod,然后查看所创建 pod 的日志:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 99e4f4f6-08ff-4bff-acf1-47f61ded68c9
      time: 2020-04-07T16:16:00.000601161Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

删除 ping 源

  • 删除 ping 源:

    $ kn delete pingsources.sources.knative.dev <ping_source_name>
5.2.4.2.1. Knative CLI sink 标记

当使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

以下示例创建使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 sink 标记的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker

5.2.4.3. 使用 YAML 创建 ping 源

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以可重复的方式描述事件源。要使用 YAML 创建无服务器 ping 源,您必须创建一个 YAML 文件来定义 PingSource 对象,然后使用 oc apply 来应用它。

PingSource 对象示例

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *" 1
  data: '{"message": "Hello world!"}' 2
  sink: 3
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

1
事件指定的调度使用 CRON 格式
2
事件消息正文以 JSON 编码的数据字符串表示。
3
这些是事件消费者的详情。在这个示例中,我们使用名为 event-display 的 Knative 服务。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 要验证 ping 源是否可以工作,请创建一个简单的 Knative 服务,在服务日志中转储传入的信息。

    1. 创建服务 YAML 文件:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    2. 创建服务:

      $ oc apply -f <filename>
  2. 对于您要请求的每一组 ping 事件,请在与事件消费者相同的命名空间中创建一个 ping 源。

    1. 为 ping 源创建 YAML 文件:

      apiVersion: sources.knative.dev/v1
      kind: PingSource
      metadata:
        name: test-ping-source
      spec:
        schedule: "*/2 * * * *"
        data: '{"message": "Hello world!"}'
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
    2. 创建 ping 源:

      $ oc apply -f <filename>
  3. 输入以下命令检查是否正确映射了控制器:

    $ oc get pingsource.sources.knative.dev <ping_source_name> -oyaml

    输出示例

    apiVersion: sources.knative.dev/v1
    kind: PingSource
    metadata:
      annotations:
        sources.knative.dev/creator: developer
        sources.knative.dev/lastModifier: developer
      creationTimestamp: "2020-04-07T16:11:14Z"
      generation: 1
      name: test-ping-source
      namespace: default
      resourceVersion: "55257"
      selfLink: /apis/sources.knative.dev/v1/namespaces/default/pingsources/test-ping-source
      uid: 3d80d50b-f8c7-4c1b-99f7-3ec00e0a8164
    spec:
      data: '{ value: "hello" }'
      schedule: '*/2 * * * *'
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default

验证

您可以通过查看 sink pod 的日志来验证 Kubernetes 事件是否已发送到 Knative 事件。

默认情况下,如果在 60 秒内都没有流量,Knative 服务会终止其 Pod。本指南中演示的示例创建了一个 PingSource,每 2 分钟发送一条消息,因此每个消息都应该在新创建的 pod 中观察到。

  1. 查看新创建的 pod:

    $ watch oc get pods
  2. 使用 Ctrl+C 取消查看 pod,然后查看所创建 pod 的日志:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 042ff529-240e-45ee-b40c-3a908129853e
      time: 2020-04-07T16:22:00.000791674Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

删除 ping 源

  • 删除 ping 源:

    $ oc delete -f <filename>

    示例命令

    $ oc delete -f ping-source.yaml

5.2.5. Kafka 源

您可以创建一个 Kafka 源从 Apache Kafka 集群中读取事件,并将这些事件传递给接收器。您可以使用 OpenShift Container Platform web 控制台、Knative (kn) CLI 或直接创建 KafkaSource 对象并使用 OpenShift CLI (oc) 创建 Kafka 源来应用它。

5.2.5.1. 使用 Web 控制台创建 Kafka 事件源

在集群中安装了 Knative Kafka 后,您可以使用 Web 控制台创建 Kafka 源。使用 OpenShift Container Platform Web 控制台提供了一个简化的用户界面来创建 Kafka 源。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在集群中。
  • 已登陆到 web 控制台。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,进入到 +Add 页面并选择 Event Source
  2. Event Sources 页面中,在 Type 部分选择 Kafka Source
  3. 配置 Kafka Source 设置:

    1. 添加用逗号分开的 Bootstrap 服务器列表。
    2. 添加以逗号分隔的标题列表。
    3. 添加一个 消费者组
    4. 为您创建的服务帐户选择 Service Account Name
    5. 为事件源选择 SinkSink 可以是一个 资源,如频道、代理或服务,也可以是一个 URI
    6. 输入 Kafka 事件源的 名称
  4. Create

验证

您可以通过查看 Topology 页面来验证 Kafka 事件源是否已创建并连接到接收器。

  1. Developer 视角中,导航到 Topology
  2. 查看 Kafka 事件源和接收器。

    在 Topology 视图中查看 Kafka 源和服务

5.2.5.2. 使用 Knative CLI 创建 Kafka 事件源

您可以使用 kn source kafka create 命令,使用 Knative (kn) CLI 创建 Kafka 源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

先决条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving 和 KnativeKafka 自定义资源(CR)已安装在集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 已安装 Knative (kn) CLI。
  • 可选:如果您想要使用此流程中的验证步骤,已安装 OpenShift CLI (oc)。

流程

  1. 要验证 Kafka 事件源是否可以工作,请创建一个 Knative 服务,在服务日志中转储传入的事件:

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. 创建 KafkaSource CR:

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注意

    将此命令中的占位符值替换为源名称、引导服务器和主题的值。

    --servers--topics--consumergroup 选项指定到 Kafka 集群的连接参数。--consumergroup 选项是可选的。

  3. 可选:查看您创建的 KafkaSource CR 的详情:

    $ kn source kafka describe <kafka_source_name>

    输出示例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

验证步骤

  1. 触发 Kafka 实例将信息发送到主题:

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    在提示符后输入信息。这个命令假设:

    • Kafka 集群安装在 kafka 命名空间中。
    • KafkaSource 对象已被配置为使用 my-topic 主题。
  2. 通过查看日志来验证消息是否显示:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

5.2.5.2.1. Knative CLI sink 标记

当使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

以下示例创建使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 sink 标记的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker

5.2.5.3. 使用 YAML 创建 Kafka 事件源

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以声明性的方式描述应用程序,并以可重复的方式描述应用程序。要使用 YAML 创建 Kafka 源,您必须创建一个 YAML 文件来定义 KafkaSource 对象,然后使用 oc apply 命令应用它。

先决条件

  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka 自定义资源已安装在集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您可以访问 Red Hat AMQ Streams(Kafka)集群,该集群会生成您要导入的 Kafka 信息。
  • 安装 OpenShift CLI (oc) 。

流程

  1. 创建 KafkaSource 对象作为 YAML 文件:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    用户组是一组使用相同组群 ID 的用户,并消耗一个标题中的数据。
    2
    主题提供数据存储的目的地。每个主题都被分成一个或多个分区。
    3
    sink 指定事件从源发送到的位置。
    重要

    仅支持 OpenShift Serverless 上的 KafkaSource 对象的 v1beta1 API 版本。不要使用这个 API 的 v1alpha1 版本,因为这个版本现已弃用。

    KafkaSource 对象示例

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. 应用 KafkaSource YAML 文件:

    $ oc apply -f <filename>

验证

  • 输入以下命令验证 Kafka 事件源是否已创建:

    $ oc get pods

    输出示例

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

5.2.5.4. 为 Kafka 源配置 SASL 身份验证

Apache Kafka 使用 简单身份验证和安全层 (SASL) 进行身份验证。如果在集群中使用 SASL 身份验证,用户则必须向 Knative 提供凭证才能与 Kafka 集群通信,否则无法生成或消耗事件。

先决条件

  • 在 OpenShift Container Platform 上具有集群或专用管理员权限。
  • OpenShift Serverless Operator、Knative Eventing 和 KnativeKafka CR 已安装在 OpenShift Container Platform 集群中。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您有一个 Kafka 集群的用户名和密码。
  • 您已选择使用 SASL 机制,例如 PLAINSCRAM-SHA-256SCRAM-SHA-512
  • 如果启用了 TLS,您还需要 Kafka 集群的 ca.crt 证书文件。
  • 已安装 OpenShift (oc) CLI。

流程

  1. 在所选命名空间中创建证书文件作为 secret:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ 1
      --from-literal=user="my-sasl-user"
    1
    SASL 类型可以是 PLAINSCRAM-SHA-256SCRAM-SHA-512
  2. 创建或修改 Kafka 源,使其包含以下 spec 配置:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: 1
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1
    如果您使用公有云 Kafka 服务,则不需要 caCert spec,如 Red Hat OpenShift Streams for Apache Kafka。

5.2.6. 自定义事件源

如果您需要从 Knative 中没有包含在 Knative 的事件制作者或发出没有 CloudEvent 格式的事件的制作者中入站事件,您可以通过创建自定义事件源来实现此目标。您可以使用以下方法之一创建自定义事件源:

  • 通过创建接收器绑定,将 PodSpecable 对象用作事件源。
  • 通过创建容器源,将容器用作事件源。

5.2.6.1. 接收器(sink)绑定

SinkBinding 对象支持将事件产品与交付寻址分离。接收器绑定用于将 事件制作者 连接到事件消费者(sink) 。event producer 是一个 Kubernetes 资源,用于嵌入 PodSpec 模板并生成事件。sink 是一个可寻址的 Kubernetes 对象,可以接收事件。

SinkBinding 对象将环境变量注入到 sink 的 PodTemplateSpec 中,这意味着应用程序代码不需要直接与 Kubernetes API 交互来定位事件目的地。这些环境变量如下:

K_SINK
解析 sink 的 URL。
K_CE_OVERRIDES
指定出站事件覆盖的 JSON 对象。
注意

SinkBinding 对象目前不支持服务的自定义修订名称。

5.2.6.1.1. 使用 YAML 创建接收器绑定

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以可重复的方式描述事件源。要使用 YAML 创建接收器绑定,您必须创建一个 YAML 文件来定义 SinkBinding 对象,然后使用 oc apply 命令应用它。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务或事件接收器,在日志中转储传入的信息。

    1. 创建服务 YAML 文件:

      服务 YAML 文件示例

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest

    2. 创建服务:

      $ oc apply -f <filename>
  2. 创建将事件定向到该服务的接收器绑定实例。

    1. 创建接收器绑定 YAML 文件:

      服务 YAML 文件示例

      apiVersion: sources.knative.dev/v1alpha1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job 1
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      1
      在本例中,任何具有标签 app: heartbeat-cron 的作业都将被绑定到事件 sink。
    2. 创建接收器绑定:

      $ oc apply -f <filename>
  3. 创建 CronJob 对象。

    1. 创建 cron 任务 YAML 文件:

      Cron Job YAML 文件示例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      要使用接收器绑定,您必须手动在 Knative 资源中添加 bindings.knative.dev/include=true 标签。

      例如,要将此标签添加到 CronJob 资源,请将以下行添加到 Job 资源 YAML 定义中:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. 创建 cron job:

      $ oc apply -f <filename>
  4. 输入以下命令并检查输出,检查是否正确映射了控制器:

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    输出示例

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron

验证

您可以通过查看消息 dumper 功能日志,来验证 Kubernetes 事件是否已发送到 Knative 事件。

  1. 输入命令:

    $ oc get pods
  2. 输入命令:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.2.6.1.2. 使用 Knative CLI 创建接收器绑定

您可以使用 kn source binding create 命令通过 Knative (kn) CLI 创建接收器绑定。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 安装 Knative (kn) CLI。
  • 安装 OpenShift CLI (oc) 。
注意

以下操作过程要求您创建 YAML 文件。

如果更改了示例中使用的 YAML 文件的名称,则需要更新对应的 CLI 命令。

流程

  1. 要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务或事件 sink,在日志中转储传入的信息:

    $ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. 创建将事件定向到该服务的接收器绑定实例:

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. 创建 CronJob 对象。

    1. 创建 cron 任务 YAML 文件:

      Cron Job YAML 文件示例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      要使用接收器绑定,您必须手动在 Knative CR 中添加 bindings.knative.dev/include=true 标签。

      例如,要将此标签添加到 CronJob CR,请将以下行添加到 Job CR YAML 定义中:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. 创建 cron job:

      $ oc apply -f <filename>
  4. 输入以下命令并检查输出,检查是否正确映射了控制器:

    $ kn source binding describe bind-heartbeat

    输出示例

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

验证

您可以通过查看消息 dumper 功能日志,来验证 Kubernetes 事件是否已发送到 Knative 事件。

  • 您可以输入以下命令来查看消息转储程序功能日志:

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.2.6.1.2.1. Knative CLI sink 标记

当使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

以下示例创建使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 sink 标记的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker
5.2.6.1.3. 使用 Web 控制台创建接收器绑定

在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建接收器绑定。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 创建 Knative 服务以用作接收器:

    1. Developer 视角中,导航到 +Add YAML
    2. 复制 YAML 示例:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Create
  2. 创建用作事件源的 CronJob 资源,并每分钟发送一个事件。

    1. Developer 视角中,导航到 +Add YAML
    2. 复制 YAML 示例:

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: true 1
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      1
      确保包含 bindings.knative.dev/include: true 标签。OpenShift Serverless 的默认命名空间选择行为使用包含模式。
    3. Create
  3. 在与上一步中创建的服务相同的命名空间中创建接收器绑定,或您要将事件发送到的任何其他接收器。

    1. Developer 视角中,导航到 +Add Event Source。此时会显示 Event Sources 页面。
    2. 可选:如果您的事件源有多个供应商,请从 Providers 列表中选择所需的供应商,以过滤供应商的可用事件源。
    3. 选择 Sink Binding,然后单击 Create Event Source。此时会显示 Create Event Source 页面。

      注意

      您可以使用 Form viewYAML view 配置 Sink Binding 设置,并可以在两者间切换。在不同视图间切换时数据会被保留。

    4. apiVersion 字段中,输入 batch/v1
    5. Kind 字段中,输入 Job

      注意

      OpenShift Serverless sink 绑定不支持 CronJob kind,因此 Kind 字段必须以 cron 任务创建的 Job 对象为目标,而不是 cron 作业对象本身。

    6. 选择一个 Sink。这可以是 Resource 或一个 URI。在这个示例中,上一步中创建的 event-display 服务被用作 Resource sink。
    7. Match labels 部分:

      1. Name 字段中输入 app
      2. Value 字段中输入 heartbeat-cron

        注意

        使用带有接收器绑定的 cron 任务时,需要标签选择器,而不是资源名称。这是因为,Cron Job 创建的作业没有可预测的名称,并在名称中包含随机生成的字符串。例如,hearthbeat-cron-1cc23f.

    8. Create

验证

您可以通过查看 Topology 页面和 pod 日志来验证接收器绑定、接收器和 cron 任务是否已创建并正常工作。

  1. Developer 视角中,导航到 Topology
  2. 查看接收器绑定、接收器和心跳 cron 任务。

    在 Topology 视图中查看接收器绑定和服务
  3. 观察在添加了接收器绑定后 cron 任务正在注册成功的作业。这意味着接收器绑定成功重新配置由 cron 任务创建的作业。
  4. 浏览 event-display 服务 pod 的日志,以查看 heartbeats cron 作业生成的事件。
5.2.6.1.4. 接收器绑定引用

您可以通过创建接收器绑定,将 PodSpecable 对象用作事件源。您可以在创建 SinkBinding 对象时配置多个参数。

SinkBinding 对象支持以下参数:

字段描述必需或可选

apiVersion

指定 API 版本,如 sources.knative.dev/v1

必需

kind

将此资源对象标识为 SinkBinding 对象。

必需

metadata

指定唯一标识 SinkBinding 对象的元数据。例如,名称

必需

spec

指定此 SinkBinding 对象的配置信息。

必需

spec.sink

对解析为 URI 作为 sink 的对象的引用。

必需

spec.subject

提及通过绑定实施来增强运行时合同的资源。

必需

spec.ceOverrides

定义覆盖来控制发送到 sink 的事件的输出格式和修改。

选填

5.2.6.1.4.1. 主题参数

Subject 参数引用通过绑定实施来增强运行时合同的资源。您可以为 Subject 定义配置多个字段。

Subject 定义支持以下字段:

字段描述必需或可选

apiVersion

引用的 API 版本。

必需

kind

引用的类型。

必需

namespace

引用的命名空间。如果省略,则默认为对象的命名空间。

选填

name

引用的名称。

如果配置 选择器,请不要使用。

selector

引用的选择器。

如果配置 名称,请不要使用。

selector.matchExpressions

标签选择器要求列表。

仅使用 matchExpressionsmatchLabels 中的一个。

selector.matchExpressions.key

选择器应用到的标签键。

使用 matchExpressions 时需要此项。

selector.matchExpressions.operator

代表键与一组值的关系。有效的运算符为 InNotInExistsDoesNotExist

使用 matchExpressions 时需要此项。

selector.matchExpressions.values

字符串值数组。如果 operator 参数值是 InNotIn,则值数组必须是非空的。如果 operator 参数值是 ExistsDoesNotExist,则值数组必须为空。这个数组会在策略性合并补丁中被替换。

使用 matchExpressions 时需要此项。

selector.matchLabels

键值对映射.matchLabels 映射中的每个键值对等同于 matchExpressions 元素,其中 key 字段是 matchLabels.<key>OperatorInvalues 数组仅包含 matchLabels.<value>

仅使用 matchExpressionsmatchLabels 中的一个。

主题参数示例

根据以下 YAML,选择 default 命名空间中名为 mysubjectDeployment 对象:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

根据以下 YAML,可以选择在 default 命名空间中带有 working=example 标签的 Job 对象:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

根据以下 YAML,可以选择在 default 命名空间中带有标签 working=exampleworking=samplePod 对象:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      - matchExpression:
        key: working
        operator: In
        values:
          - example
          - sample
  ...
5.2.6.1.4.2. CloudEvent 覆盖

ceOverrides 定义提供覆盖控制发送到 sink 的 CloudEvent 输出格式和修改。您可以为 ceOverrides 定义配置多个字段。

ceOverrides 定义支持以下字段:

字段描述必需或可选

extensions

指定在出站事件中添加或覆盖哪些属性。每个 extensions 键值对在事件上作为属性扩展进行独立设置。

选填

注意

仅允许有效的 CloudEvent 属性名称作为扩展。您无法从扩展覆盖配置设置 spec 定义的属性。例如,您无法修改 type 属性。

CloudEvent Overrides 示例

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

这会在 主题 上设置 K_CE_OVERRIDES 环境变量:

输出示例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

5.2.6.1.4.3. include 标签

要使用接收器绑定,您需要为资源或包含资源的命名空间分配 bindings.knative.dev/include: "true" 标签。如果资源定义不包括该标签,集群管理员可以通过运行以下命令将它附加到命名空间:

$ oc label namespace <namespace> bindings.knative.dev/include=true

5.2.6.2. 容器源

容器源创建容器镜像来生成事件并将事件发送到 sink。您可以通过创建容器镜像和使用您的镜像 URI 的 ContainerSource 对象,使用容器源创建自定义事件源。

5.2.6.2.1. 创建容器镜像的指南

两个环境变量由容器源控制器注入:K_SINKK_CE_OVERRIDES。这些变量分别从 sinkandceOverrides spec 解析。事件发送到 K_SINK 环境变量中指定的 sink URI。该消息必须使用 CloudEvent HTTP 格式作为 POST 发送。

容器镜像示例

以下是心跳容器镜像的示例:

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	duckv1 "knative.dev/pkg/apis/duck/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
	Sequence int    `json:"id"`
	Label    string `json:"label"`
}

var (
	eventSource string
	eventType   string
	sink        string
	label       string
	periodStr   string
)

func init() {
	flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
	flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
	flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
	flag.StringVar(&label, "label", "", "a special label")
	flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
	// Sink URL where to send heartbeat cloud events
	Sink string `envconfig:"K_SINK"`

	// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
	CEOverrides string `envconfig:"K_CE_OVERRIDES"`

	// Name of this pod.
	Name string `envconfig:"POD_NAME" required:"true"`

	// Namespace this pod exists in.
	Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

	// Whether to run continuously or exit.
	OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}

func main() {
	flag.Parse()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	if env.Sink != "" {
		sink = env.Sink
	}

	var ceOverrides *duckv1.CloudEventOverrides
	if len(env.CEOverrides) > 0 {
		overrides := duckv1.CloudEventOverrides{}
		err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
		if err != nil {
			log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
			os.Exit(1)
		}
		ceOverrides = &overrides
	}

	p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
	if err != nil {
		log.Fatalf("failed to create http protocol: %s", err.Error())
	}

	c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
	if err != nil {
		log.Fatalf("failed to create client: %s", err.Error())
	}

	var period time.Duration
	if p, err := strconv.Atoi(periodStr); err != nil {
		period = time.Duration(5) * time.Second
	} else {
		period = time.Duration(p) * time.Second
	}

	if eventSource == "" {
		eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
		log.Printf("Heartbeats Source: %s", eventSource)
	}

	if len(label) > 0 && label[0] == '"' {
		label, _ = strconv.Unquote(label)
	}
	hb := &Heartbeat{
		Sequence: 0,
		Label:    label,
	}
	ticker := time.NewTicker(period)
	for {
		hb.Sequence++

		event := cloudevents.NewEvent("1.0")
		event.SetType(eventType)
		event.SetSource(eventSource)
		event.SetExtension("the", 42)
		event.SetExtension("heart", "yes")
		event.SetExtension("beats", true)

		if ceOverrides != nil && ceOverrides.Extensions != nil {
			for n, v := range ceOverrides.Extensions {
				event.SetExtension(n, v)
			}
		}

		if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
			log.Printf("failed to set cloudevents data: %s", err.Error())
		}

		log.Printf("sending cloudevent to %s", sink)
		if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
			log.Printf("failed to send cloudevent: %v", res)
		}

		if env.OneShot {
			return
		}

		// Wait for next tick
		<-ticker.C
	}
}

以下是引用先前心跳容器镜像的容器源示例:

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        # This corresponds to a heartbeats image URI that you have built and published
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "example-pod"
            - name: POD_NAMESPACE
              value: "event-test"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: example-service
...
5.2.6.2.2. 使用 Knative CLI 创建和管理容器源

您可以使用 kn source container 命令来使用 Knative (kn) 创建和管理容器源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

创建容器源

$ kn source container create <container_source_name> --image <image_uri> --sink <sink>

删除容器源

$ kn source container delete <container_source_name>

描述容器源

$ kn source container describe <container_source_name>

列出现有容器源

$ kn source container list

以 YAML 格式列出现有容器源

$ kn source container list -o yaml

更新容器源

此命令为现有容器源更新镜像 URI:

$ kn source container update <container_source_name> --image <image_uri>
5.2.6.2.3. 使用 Web 控制台创建容器源

在集群中安装 Knative Eventing 后,您可以使用 Web 控制台创建容器源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,导航到 +Add Event Source。此时会显示 Event Sources 页面。
  2. 选择 Container Source,然后点 Create Event Source。此时会显示 Create Event Source 页面。
  3. 使用 Form viewYAML 视图配置 Container Source 设置:

    注意

    您可以在 Form viewYAML view 间进行切换。在不同视图间切换时数据会被保留。

    1. Image 字段中,输入您要在由容器源创建的容器中运行的镜像的 URI。
    2. Name 字段中输入镜像的名称。
    3. 可选:在 Arguments 参数字段中,输入要传递给容器的任何参数。
    4. 可选:在 Environment variables 字段中,添加容器中要设置的任何环境变量。
    5. Sink 部分,添加一个接收器,其中将容器源的事件路由到其中。如果使用 Form 视图,您可以从以下选项中选择:

      1. 选择 Resource 使用频道、代理或服务作为事件源的接收器。
      2. 选择 URI,以指定容器源的事件路由到的位置。
  4. 配置完容器源后,点 Create
5.2.6.2.4. 容器源参考

您可以通过创建 ContainerSource 对象来使用容器作为事件源。您可以在创建 ContainerSource 对象时配置多个参数。

ContainerSource 对象支持以下字段:

字段描述必需或可选

apiVersion

指定 API 版本,如 sources.knative.dev/v1

必需

kind

将此资源对象标识为 ContainerSource 对象。

必需

metadata

指定唯一标识 ContainerSource 对象的元数据。例如,name

必需

spec

指定此 ContainerSource 对象的配置信息。

必需

spec.sink

对解析为 URI 作为 sink 的对象的引用。

必需

spec.template

ContainerSource 对象的 template 规格。

必需

spec.ceOverrides

定义覆盖来控制发送到 sink 的事件的输出格式和修改。

选填

模板参数示例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "mypod"
            - name: POD_NAMESPACE
              value: "event-test"
  ...

5.2.6.2.4.1. CloudEvent 覆盖

ceOverrides 定义提供覆盖控制发送到 sink 的 CloudEvent 输出格式和修改。您可以为 ceOverrides 定义配置多个字段。

ceOverrides 定义支持以下字段:

字段描述必需或可选

extensions

指定在出站事件中添加或覆盖哪些属性。每个 extensions 键值对在事件上作为属性扩展进行独立设置。

选填

注意

仅允许有效的 CloudEvent 属性名称作为扩展。您无法从扩展覆盖配置设置 spec 定义的属性。例如,您无法修改 type 属性。

CloudEvent Overrides 示例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

这会在 主题 上设置 K_CE_OVERRIDES 环境变量:

输出示例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

5.2.7. 使用 Developer 视角将事件源连接到接收器(sink)

当使用 OpenShift Container Platform Web 控制台创建事件源时,您可以指定事件从该源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

5.2.7.1. 使用 Developer 视角将事件源连接到接收器(sink)

先决条件

  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
  • 已登陆到 web 控制台,且处于 Developer 视角。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 您已创建了 sink,如 Knative 服务、频道或代理。

流程

  1. 进入 +Add Event Source 并选择您要创建的事件源类型,创建任何类型的事件源。
  2. Create Event Source 表单视图的 Sink 部分,在 Resource 列表中选择您的接收器。
  3. Create

验证

您可以通过查看 Topology 页面来验证事件源是否已创建并连接到 sink。

  1. Developer 视角中,导航到 Topology
  2. 查看事件源并点连接的接收器查看右侧面板中的接收器详情。
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.