3.4. JobSink


事件处理通常在短时间内完成,比如几分钟。这样可确保 HTTP 连接保持打开状态,服务不会预先缩减。

维护长时间运行的连接会增加失败的风险,从而导致处理重启和重复请求重试。

您可以使用 JobSink 来使用完整的 Kubernetes batch/v1 作业资源和功能以及 Kubernetes 作业排队系统(如 Kue)支持长时间运行的异步作业和任务。

3.4.1. 使用 JobSink

当事件发送到 Job Sink 时,Eventing 会创建一个作业,并将接收的事件作为 JSON 文件挂载到 /etc/jobsink-event/event

流程

  1. 创建一个 JobSink 对象定义作为一个 YAML 文件:

    JobSink YAML

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-logger
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "cat" ]
                  args:
                    - "/etc/jobsink-event/event"
    Copy to Clipboard Toggle word wrap

  2. 应用 JobSink YAML 文件:

    $ oc apply -f <job-sink-file.yaml>
    Copy to Clipboard Toggle word wrap
  3. 验证 JobSink 已就绪:

    $ oc get jobsinks.sinks.knative.dev
    Copy to Clipboard Toggle word wrap

    输出示例:

    NAME              URL                                                                          AGE   READY   REASON
    job-sink-logger   http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger   5s    True
    Copy to Clipboard Toggle word wrap
  4. 触发 JobSinkJobSink 可以由任何事件源或触发器触发。

    $ oc run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
       -H "content-type: application/json"  \
       -H "ce-specversion: 1.0" \
       -H "ce-source: my/curl/command" \
       -H "ce-type: my.demo.event" \
       -H "ce-id: 123" \
       -d '{"details":"JobSinkDemo"}' \
       http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
    Copy to Clipboard Toggle word wrap
  5. 验证已创建了 作业

    $ oc logs job-sink-loggerszoi6-dqbtq
    Copy to Clipboard Toggle word wrap

    输出示例:

    {"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
    Copy to Clipboard Toggle word wrap
注意

Job Sink 为其接收的每个唯一事件创建一个作业。

事件通过其 sourceid 属性的组合唯一标识。

如果在该事件的 作业 已存在时收到具有相同属性的事件,则不会创建另一个 作业

3.4.2. 读取作业事件文件

流程

  • 读取 事件 文件,并使用任何 CloudEvents JSON deserializer 进行反序列化。以下示例演示了如何使用 CloudEvents Go SDK 读取和处理事件:

    package mytask
    
    import (
        "encoding/json"
        "fmt"
        "os"
    
        cloudevents "github.com/cloudevents/sdk-go/v2"
    )
    
    func handleEvent() error {
        eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
        if err != nil {
            return err
        }
    
        event := &cloudevents.Event{}
        if err := json.Unmarshal(eventBytes, event); err != nil {
            return err
        }
    
        fmt.Println(event)
    
        return nil
    }
    Copy to Clipboard Toggle word wrap

3.4.3. 设置自定义事件文件挂载路径

您可以在 JobSink 定义中设置自定义 事件 文件挂载路径。

流程

  • 在容器定义中,包括 volumeMounts 配置并根据需要设置。

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-custom-mount-path
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5
    
                  # The event will be available in a file at `/etc/custom-path/event`
                  volumeMounts:
                    - name: "jobsink-event"
                      mountPath: "/etc/custom-path"
                      readOnly: true
    Copy to Clipboard Toggle word wrap

3.4.4. 清理完成的作业

您可以通过在 JobSink 定义中设置 ttlSecondsAfterFinished 值来清理已完成的作业。例如,将值设置为 600 会在完成后删除 600 秒完成的作业 600 秒(10 分钟)。

流程

  • 在定义中,将 ttlSecondsAfterFinished 的值设置为所需的数量。

    ttlSecondsAfterFinished 设置为 600 的示例

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-example
    spec:
      job:
        spec:
          ttlSecondsAfterFinished: 600
    Copy to Clipboard Toggle word wrap

3.4.5. 模拟 FailJob 操作

流程

  • 通过在 JobSink 定义中包含错误模拟命令来触发 FailJob 操作。

    JobSink 失败示例

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-failure
    spec:
      job:
        metadata:
          labels:
            my-label: my-value
        spec:
          completions: 12
          parallelism: 3
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]        # example command simulating a bug which triggers the FailJob action
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5 && exit 42
          backoffLimit: 6
          podFailurePolicy:
            rules:
              - action: FailJob
                onExitCodes:
                  containerName: main      # optional
                  operator: In             # one of: In, NotIn
                  values: [ 42 ]
              - action: Ignore             # one of: Ignore, FailJob, Count
                onPodConditions:
                  - type: DisruptionTarget   # indicates Pod disruption
    Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat