2.6. 自定义事件源


如果您需要从 Knative 中没有包含在 Knative 的事件制作者或发出没有 CloudEvent 格式的事件的制作者中入站事件,您可以通过创建自定义事件源来实现此目标。您可以使用以下方法之一创建自定义事件源:

  • 通过创建接收器绑定,将 PodSpecable 对象用作事件源。
  • 通过创建容器源,将容器用作事件源。

2.6.1. 接收器(sink)绑定

SinkBinding 对象支持将事件产品与交付寻址分离。接收器绑定用于将 事件制作者 连接到事件消费者(sink) 。event producer 是一个 Kubernetes 资源,用于嵌入 PodSpec 模板并生成事件。sink 是一个可寻址的 Kubernetes 对象,可以接收事件。

SinkBinding 对象将环境变量注入到 sink 的 PodTemplateSpec 中,这意味着应用程序代码不需要直接与 Kubernetes API 交互来定位事件目的地。这些环境变量如下:

K_SINK
解析 sink 的 URL。
K_CE_OVERRIDES
指定出站事件覆盖的 JSON 对象。
注意

SinkBinding 对象目前不支持服务的自定义修订名称。

2.6.1.1. 使用 YAML 创建接收器绑定

使用 YAML 文件创建 Knative 资源使用声明性 API,它允许您以可重复的方式描述事件源。要使用 YAML 创建接收器绑定,您必须创建一个 YAML 文件来定义 SinkBinding 对象,然后使用 oc apply 命令应用它。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
  • 安装 OpenShift CLI (oc) 。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务或事件接收器,在日志中转储传入的信息。

    1. 创建服务 YAML 文件:

      服务 YAML 文件示例

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase

    2. 创建服务:

      $ oc apply -f <filename>
  2. 创建将事件定向到该服务的接收器绑定实例。

    1. 创建接收器绑定 YAML 文件:

      服务 YAML 文件示例

      apiVersion: sources.knative.dev/v1alpha1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job 1
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      1
      在本例中,任何具有标签 app: heartbeat-cron 的作业都将被绑定到事件 sink。
    2. 创建接收器绑定:

      $ oc apply -f <filename>
  3. 创建 CronJob 对象。

    1. 创建 cron 任务 YAML 文件:

      Cron Job YAML 文件示例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      要使用接收器绑定,您必须手动在 Knative 资源中添加 bindings.knative.dev/include=true 标签。

      例如,要将此标签添加到 CronJob 资源,请将以下行添加到 Job 资源 YAML 定义中:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. 创建 cron job:

      $ oc apply -f <filename>
  4. 输入以下命令并检查输出,检查是否正确映射了控制器:

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    输出示例

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron

验证

您可以通过查看消息 dumper 功能日志,来验证 Kubernetes 事件是否已发送到 Knative 事件。

  1. 输入命令:

    $ oc get pods
  2. 输入命令:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

2.6.1.2. 使用 Knative CLI 创建接收器绑定

您可以使用 kn source binding create 命令通过 Knative (kn) CLI 创建接收器绑定。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

先决条件

  • 在集群中安装了 OpenShift Serverless Operator、Knative Serving 和 Knative Eventing。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。
  • 安装 Knative (kn) CLI。
  • 安装 OpenShift CLI (oc) 。
注意

以下操作过程要求您创建 YAML 文件。

如果更改了示例中使用的 YAML 文件的名称,则需要更新对应的 CLI 命令。

流程

  1. 要检查接收器绑定是否已正确设置,请创建一个 Knative 事件显示服务或事件 sink,在日志中转储传入的信息:

    $ kn service create event-display --image quay.io/openshift-knative/showcase
  2. 创建将事件定向到该服务的接收器绑定实例:

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. 创建 CronJob 对象。

    1. 创建 cron 任务 YAML 文件:

      Cron Job YAML 文件示例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      要使用接收器绑定,您必须手动在 Knative CR 中添加 bindings.knative.dev/include=true 标签。

      例如,要将此标签添加到 CronJob CR,请将以下行添加到 Job CR YAML 定义中:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. 创建 cron job:

      $ oc apply -f <filename>
  4. 输入以下命令并检查输出,检查是否正确映射了控制器:

    $ kn source binding describe bind-heartbeat

    输出示例

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

验证

您可以通过查看消息 dumper 功能日志,来验证 Kubernetes 事件是否已发送到 Knative 事件。

  • 您可以输入以下命令来查看消息转储程序功能日志:

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

2.6.1.2.1. Knative CLI sink 标记

当使用 Knative (kn) CLI 创建事件源时,您可以使用 --sink 标志指定事件从该资源发送到的接收器。sink 可以是任何可寻址或可调用的资源,可以从其他资源接收传入的事件。

以下示例创建使用服务 http://event-display.svc.cluster.local 的接收器绑定作为接收器:

使用 sink 标记的命令示例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.local 中的 svc 确定接收器是一个 Knative 服务。其他默认的接收器前缀包括 channelbroker

2.6.1.3. 使用 Web 控制台创建接收器绑定

在集群中安装 Knative Eventing 后,您可以使用 web 控制台创建接收器绑定。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. 创建 Knative 服务以用作接收器:

    1. Developer 视角中,导航到 +Add YAML
    2. 复制 YAML 示例:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase
    3. Create
  2. 创建用作事件源的 CronJob 资源,并每分钟发送一个事件。

    1. Developer 视角中,导航到 +Add YAML
    2. 复制 YAML 示例:

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: true 1
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      1
      确保包含 bindings.knative.dev/include: true 标签。OpenShift Serverless 的默认命名空间选择行为使用包含模式。
    3. Create
  3. 在与上一步中创建的服务相同的命名空间中创建接收器绑定,或您要将事件发送到的任何其他接收器。

    1. Developer 视角中,导航到 +Add Event Source。此时会显示 Event Sources 页面。
    2. 可选:如果您的事件源有多个供应商,请从 Providers 列表中选择所需的供应商,以过滤供应商的可用事件源。
    3. 选择 Sink Binding,然后单击 Create Event Source。此时会显示 Create Event Source 页面。

      注意

      您可以使用 Form viewYAML view 配置 Sink Binding 设置,并可以在两者间切换。在不同视图间切换时数据会被保留。

    4. apiVersion 字段中,输入 batch/v1
    5. Kind 字段中,输入 Job

      注意

      OpenShift Serverless sink 绑定不支持 CronJob kind,因此 Kind 字段必须以 cron 任务创建的 Job 对象为目标,而不是 cron 作业对象本身。

    6. Target 部分中,选择您的事件 sink。这可以是 Resource 或一个 URI

      1. 选择 Resource 来使用频道、代理或服务作为事件源的事件 sink。在本例中,上一步中创建的 event-display 服务用作目标 资源
      2. 选择 URI 来指定事件路由到的 Uniform Resource Identifier (URI)。
    7. Match labels 部分:

      1. Name 字段中输入 app
      2. Value 字段中输入 heartbeat-cron

        注意

        使用带有接收器绑定的 cron 任务时,需要标签选择器,而不是资源名称。这是因为,Cron Job 创建的作业没有可预测的名称,并在名称中包含随机生成的字符串。例如,hearthbeat-cron-1cc23f.

    8. Create

验证

您可以通过查看 Topology 页面和 pod 日志来验证接收器绑定、接收器和 cron 任务是否已创建并正常工作。

  1. Developer 视角中,导航到 Topology
  2. 查看接收器绑定、接收器和心跳 cron 任务。

    在 Topology 视图中查看接收器绑定和服务
  3. 观察在添加了接收器绑定后 cron 任务正在注册成功的作业。这意味着接收器绑定成功重新配置由 cron 任务创建的作业。
  4. 浏览 event-display 服务,以查看 heartbeats cron 作业生成的事件。

    在 Web UI 中查看事件

2.6.1.4. 接收器绑定引用

您可以通过创建接收器绑定,将 PodSpecable 对象用作事件源。您可以在创建 SinkBinding 对象时配置多个参数。

SinkBinding 对象支持以下参数:

字段描述必需或可选

apiVersion

指定 API 版本,如 sources.knative.dev/v1

必需

kind

将此资源对象标识为 SinkBinding 对象。

必需

metadata

指定唯一标识 SinkBinding 对象的元数据。例如,名称

必需

spec

指定此 SinkBinding 对象的配置信息。

必需

spec.sink

对解析为 URI 作为 sink 的对象的引用。

必需

spec.subject

提及通过绑定实施来增强运行时合同的资源。

必需

spec.ceOverrides

定义覆盖来控制发送到 sink 的事件的输出格式和修改。

选填

2.6.1.4.1. 主题参数

Subject 参数引用通过绑定实施来增强运行时合同的资源。您可以为 Subject 定义配置多个字段。

Subject 定义支持以下字段:

字段描述必需或可选

apiVersion

引用的 API 版本。

必需

kind

引用的类型。

必需

namespace

引用的命名空间。如果省略,则默认为对象的命名空间。

选填

name

引用的名称。

如果配置 选择器,请不要使用。

selector

引用的选择器。

如果配置 名称,请不要使用。

selector.matchExpressions

标签选择器要求列表。

仅使用 matchExpressionsmatchLabels 中的一个。

selector.matchExpressions.key

选择器应用到的标签键。

使用 matchExpressions 时需要此项。

selector.matchExpressions.operator

代表键与一组值的关系。有效的运算符为 InNotInExistsDoesNotExist

使用 matchExpressions 时需要此项。

selector.matchExpressions.values

字符串值数组。如果 operator 参数值是 InNotIn,则值数组必须是非空的。如果 operator 参数值是 ExistsDoesNotExist,则值数组必须为空。这个数组会在策略性合并补丁中被替换。

使用 matchExpressions 时需要此项。

selector.matchLabels

键值对映射.matchLabels 映射中的每个键值对等同于 matchExpressions 元素,其中 key 字段是 matchLabels.<key>OperatorInvalues 数组仅包含 matchLabels.<value>

仅使用 matchExpressionsmatchLabels 中的一个。

主题参数示例

根据以下 YAML,选择 default 命名空间中名为 mysubjectDeployment 对象:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

根据以下 YAML,可以选择在 default 命名空间中带有 working=example 标签的 Job 对象:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

根据以下 YAML,可以选择在 default 命名空间中带有标签 working=exampleworking=samplePod 对象:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      - matchExpression:
        key: working
        operator: In
        values:
          - example
          - sample
  ...
2.6.1.4.2. CloudEvent 覆盖

ceOverrides 定义提供覆盖控制发送到 sink 的 CloudEvent 输出格式和修改。您可以为 ceOverrides 定义配置多个字段。

ceOverrides 定义支持以下字段:

字段描述必需或可选

extensions

指定在出站事件中添加或覆盖哪些属性。每个 extensions 键值对在事件上作为属性扩展进行独立设置。

选填

注意

仅允许有效的 CloudEvent 属性名称作为扩展。您无法从扩展覆盖配置设置 spec 定义的属性。例如,您无法修改 type 属性。

CloudEvent Overrides 示例

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

这会在 主题 上设置 K_CE_OVERRIDES 环境变量:

输出示例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

2.6.1.4.3. include 标签

要使用接收器绑定,您需要为资源或包含资源的命名空间分配 bindings.knative.dev/include: "true" 标签。如果资源定义不包括该标签,集群管理员可以通过运行以下命令将它附加到命名空间:

$ oc label namespace <namespace> bindings.knative.dev/include=true

2.6.1.5. 将 Service Mesh 与 SinkBinding 集成

先决条件

  • 您已将 Service Mesh 与 OpenShift Serverless 集成。

流程

  1. 在作为 Service MeshMemberRoll 的成员的命名空间中创建一个 Service。

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> 1
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" 2
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    1
    作为 ServiceMeshMemberRoll 的成员的命名空间。
    2
    将 Service Mesh sidecar 注入 Knative 服务 pod。
  2. 应用 Service 资源。

    $ oc apply -f <filename>
  3. 创建 SinkBinding

    apiVersion: sources.knative.dev/v1
    kind: SinkBinding
    metadata:
      name: bind-heartbeat
      namespace: <namespace> 1
    spec:
      subject:
        apiVersion: batch/v1
        kind: Job 2
        selector:
          matchLabels:
            app: heartbeat-cron
    
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    1
    作为 ServiceMeshMemberRoll 的成员的命名空间。
    2
    在本例中,任何带有标签 app: heartbeat-cron 的作业都绑定到事件 sink。
  4. 应用 SinkBinding 资源。

    $ oc apply -f <filename>
  5. 创建 CronJob

    apiVersion: batch/v1
    kind: CronJob
    metadata:
      name: heartbeat-cron
      namespace: <namespace> 1
    spec:
      # Run every minute
      schedule: "* * * * *"
      jobTemplate:
        metadata:
          labels:
            app: heartbeat-cron
            bindings.knative.dev/include: "true"
        spec:
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "true" 2
                sidecar.istio.io/rewriteAppHTTPProbers: "true"
            spec:
              restartPolicy: Never
              containers:
                - name: single-heartbeat
                  image: quay.io/openshift-knative/heartbeats:latest
                  args:
                    - --period=1
                  env:
                    - name: ONE_SHOT
                      value: "true"
                    - name: POD_NAME
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.name
                    - name: POD_NAMESPACE
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.namespace
    1
    作为 ServiceMeshMemberRoll 的成员的命名空间。
    2
    将 Service Mesh sidecar 注入 CronJob pod。
  6. 应用 CronJob 资源。

    $ oc apply -f <filename>

验证

要验证事件是否已发送到 Knative 事件 sink,请查看消息转储程序功能日志。

  1. 输入以下命令:

    $ oc get pods
  2. 输入以下命令:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

2.6.2. 容器源

容器源创建容器镜像来生成事件并将事件发送到 sink。您可以通过创建容器镜像和使用您的镜像 URI 的 ContainerSource 对象,使用容器源创建自定义事件源。

2.6.2.1. 创建容器镜像的指南

两个环境变量由容器源控制器注入:K_SINKK_CE_OVERRIDES。这些变量分别从 sinkandceOverrides spec 解析。事件发送到 K_SINK 环境变量中指定的 sink URI。该消息必须使用 CloudEvent HTTP 格式作为 POST 发送。

容器镜像示例

以下是心跳容器镜像的示例:

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	duckv1 "knative.dev/pkg/apis/duck/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
	Sequence int    `json:"id"`
	Label    string `json:"label"`
}

var (
	eventSource string
	eventType   string
	sink        string
	label       string
	periodStr   string
)

func init() {
	flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
	flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
	flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
	flag.StringVar(&label, "label", "", "a special label")
	flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
	// Sink URL where to send heartbeat cloud events
	Sink string `envconfig:"K_SINK"`

	// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
	CEOverrides string `envconfig:"K_CE_OVERRIDES"`

	// Name of this pod.
	Name string `envconfig:"POD_NAME" required:"true"`

	// Namespace this pod exists in.
	Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

	// Whether to run continuously or exit.
	OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}

func main() {
	flag.Parse()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	if env.Sink != "" {
		sink = env.Sink
	}

	var ceOverrides *duckv1.CloudEventOverrides
	if len(env.CEOverrides) > 0 {
		overrides := duckv1.CloudEventOverrides{}
		err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
		if err != nil {
			log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
			os.Exit(1)
		}
		ceOverrides = &overrides
	}

	p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
	if err != nil {
		log.Fatalf("failed to create http protocol: %s", err.Error())
	}

	c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
	if err != nil {
		log.Fatalf("failed to create client: %s", err.Error())
	}

	var period time.Duration
	if p, err := strconv.Atoi(periodStr); err != nil {
		period = time.Duration(5) * time.Second
	} else {
		period = time.Duration(p) * time.Second
	}

	if eventSource == "" {
		eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
		log.Printf("Heartbeats Source: %s", eventSource)
	}

	if len(label) > 0 && label[0] == '"' {
		label, _ = strconv.Unquote(label)
	}
	hb := &Heartbeat{
		Sequence: 0,
		Label:    label,
	}
	ticker := time.NewTicker(period)
	for {
		hb.Sequence++

		event := cloudevents.NewEvent("1.0")
		event.SetType(eventType)
		event.SetSource(eventSource)
		event.SetExtension("the", 42)
		event.SetExtension("heart", "yes")
		event.SetExtension("beats", true)

		if ceOverrides != nil && ceOverrides.Extensions != nil {
			for n, v := range ceOverrides.Extensions {
				event.SetExtension(n, v)
			}
		}

		if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
			log.Printf("failed to set cloudevents data: %s", err.Error())
		}

		log.Printf("sending cloudevent to %s", sink)
		if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
			log.Printf("failed to send cloudevent: %v", res)
		}

		if env.OneShot {
			return
		}

		// Wait for next tick
		<-ticker.C
	}
}

以下是引用先前心跳容器镜像的容器源示例:

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        # This corresponds to a heartbeats image URI that you have built and published
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "example-pod"
            - name: POD_NAMESPACE
              value: "event-test"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: showcase
...

2.6.2.2. 使用 Knative CLI 创建和管理容器源

您可以使用 kn source container 命令来使用 Knative (kn) 创建和管理容器源。使用 Knative CLI 创建事件源提供了比直接修改 YAML 文件更精简且直观的用户界面。

创建容器源

$ kn source container create <container_source_name> --image <image_uri> --sink <sink>

删除容器源

$ kn source container delete <container_source_name>

描述容器源

$ kn source container describe <container_source_name>

列出现有容器源

$ kn source container list

以 YAML 格式列出现有容器源

$ kn source container list -o yaml

更新容器源

此命令为现有容器源更新镜像 URI:

$ kn source container update <container_source_name> --image <image_uri>

2.6.2.3. 使用 Web 控制台创建容器源

在集群中安装 Knative Eventing 后,您可以使用 Web 控制台创建容器源。使用 OpenShift Container Platform Web 控制台提供了一个简化且直观的用户界面来创建事件源。

先决条件

  • 已登陆到 OpenShift Container Platform Web 控制台。
  • OpenShift Serverless Operator、Knative Serving 和 Knative Eventing 已在 OpenShift Container Platform 集群中安装。
  • 您已创建了一个项目,或者具有适当的角色和权限访问项目,以便在 OpenShift Container Platform 中创建应用程序和其他工作负载。

流程

  1. Developer 视角中,导航到 +Add Event Source。此时会显示 Event Sources 页面。
  2. 选择 Container Source,然后点 Create Event Source。此时会显示 Create Event Source 页面。
  3. 使用 Form viewYAML 视图配置 Container Source 设置:

    注意

    您可以在 Form viewYAML view 间进行切换。在不同视图间切换时数据会被保留。

    1. Image 字段中,输入您要在由容器源创建的容器中运行的镜像的 URI。
    2. Name 字段中输入镜像的名称。
    3. 可选:在 Arguments 参数字段中,输入要传递给容器的任何参数。
    4. 可选:在 Environment variables 字段中,添加容器中要设置的任何环境变量。
    5. Target 部分中,选择您的事件 sink。这可以是 Resource 或一个 URI

      1. 选择 Resource 来使用频道、代理或服务作为事件源的事件 sink。
      2. 选择 URI 来指定事件路由到的 Uniform Resource Identifier (URI)。
  4. 配置完容器源后,点 Create

2.6.2.4. 容器源参考

您可以通过创建 ContainerSource 对象来使用容器作为事件源。您可以在创建 ContainerSource 对象时配置多个参数。

ContainerSource 对象支持以下字段:

字段描述必需或可选

apiVersion

指定 API 版本,如 sources.knative.dev/v1

必需

kind

将此资源对象标识为 ContainerSource 对象。

必需

metadata

指定唯一标识 ContainerSource 对象的元数据。例如,name

必需

spec

指定此 ContainerSource 对象的配置信息。

必需

spec.sink

对解析为 URI 作为 sink 的对象的引用。

必需

spec.template

ContainerSource 对象的 template 规格。

必需

spec.ceOverrides

定义覆盖来控制发送到 sink 的事件的输出格式和修改。

选填

模板参数示例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "mypod"
            - name: POD_NAMESPACE
              value: "event-test"
  ...

2.6.2.4.1. CloudEvent 覆盖

ceOverrides 定义提供覆盖控制发送到 sink 的 CloudEvent 输出格式和修改。您可以为 ceOverrides 定义配置多个字段。

ceOverrides 定义支持以下字段:

字段描述必需或可选

extensions

指定在出站事件中添加或覆盖哪些属性。每个 extensions 键值对在事件上作为属性扩展进行独立设置。

选填

注意

仅允许有效的 CloudEvent 属性名称作为扩展。您无法从扩展覆盖配置设置 spec 定义的属性。例如,您无法修改 type 属性。

CloudEvent Overrides 示例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

这会在 主题 上设置 K_CE_OVERRIDES 环境变量:

输出示例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

2.6.2.5. 将 Service Mesh 与 ContainerSource 集成

先决条件

  • 您已将 Service Mesh 与 OpenShift Serverless 集成。

流程

  1. 在作为 Service MeshMemberRoll 的成员的命名空间中创建一个 Service。

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> 1
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" 2
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    1
    作为 ServiceMeshMemberRoll 的成员的命名空间。
    2
    将 Service Mesh sidecar 注入 Knative 服务 pod。
  2. 应用 Service 资源。

    $ oc apply -f <filename>
  3. 在作为 ServiceMeshMemberRoll 的成员的命名空间中创建一个 ContainerSource,将 sink 设置为 event-display

    apiVersion: sources.knative.dev/v1
    kind: ContainerSource
    metadata:
      name: test-heartbeats
      namespace: <namespace> 1
    spec:
      template:
        metadata: 2
          annotations:
            sidecar.istio.io/inject: "true"
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
            - image: quay.io/openshift-knative/heartbeats:latest
              name: heartbeats
              args:
                - --period=1s
              env:
                - name: POD_NAME
                  value: "example-pod"
                - name: POD_NAMESPACE
                  value: "event-test"
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    1
    命名空间是 ServiceMeshMemberRoll 的一部分。
    2
    启用 Service Mesh 与 ContainerSource集成
  4. 应用 ContainerSource 资源。

    $ oc apply -f <filename>

验证

要验证事件是否已发送到 Knative 事件 sink,请查看消息转储程序功能日志。

  1. 输入以下命令:

    $ oc get pods
  2. 输入以下命令:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    输出示例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.