3.4. JobSink
事件处理通常在短时间内完成,比如几分钟。这样可确保 HTTP 连接保持打开状态,服务不会预先缩减。
维护长时间运行的连接会增加失败的风险,从而导致处理重启和重复请求重试。
您可以使用 JobSink 来使用完整的 Kubernetes batch/v1 作业资源和功能以及 Kubernetes 作业排队系统(如 Kue)支持长时间运行的异步作业和任务。
3.4.1. 使用 JobSink 复制链接链接已复制到粘贴板!
当事件发送到 时,Eventing 会创建一个作业,并将接收的事件作为 JSON 文件挂载到 Job Sink/etc/jobsink-event/event。
流程
创建一个
JobSink对象定义作为一个 YAML 文件:JobSink YAML
apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-logger spec: job: spec: completions: 1 parallelism: 1 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "cat" ] args: - "/etc/jobsink-event/event"应用
JobSinkYAML 文件:$ oc apply -f <job-sink-file.yaml>验证
JobSink已就绪:$ oc get jobsinks.sinks.knative.dev输出示例:
NAME URL AGE READY REASON job-sink-logger http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger 5s True触发
JobSink。JobSink可以由任何事件源或触发器触发。$ oc run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \ -H "content-type: application/json" \ -H "ce-specversion: 1.0" \ -H "ce-source: my/curl/command" \ -H "ce-type: my.demo.event" \ -H "ce-id: 123" \ -d '{"details":"JobSinkDemo"}' \ http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger验证已创建了
作业:$ oc logs job-sink-loggerszoi6-dqbtq输出示例:
{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
Job Sink 为其接收的每个唯一事件创建一个作业。
事件通过其 source 和 id 属性的组合唯一标识。
如果在该事件的 作业 已存在时收到具有相同属性的事件,则不会创建另一个 作业。
3.4.2. 读取作业事件文件 复制链接链接已复制到粘贴板!
流程
读取
事件文件,并使用任何 CloudEvents JSON deserializer 进行反序列化。以下示例演示了如何使用 CloudEvents Go SDK 读取和处理事件:package mytask import ( "encoding/json" "fmt" "os" cloudevents "github.com/cloudevents/sdk-go/v2" ) func handleEvent() error { eventBytes, err := os.ReadFile("/etc/jobsink-event/event") if err != nil { return err } event := &cloudevents.Event{} if err := json.Unmarshal(eventBytes, event); err != nil { return err } fmt.Println(event) return nil }
3.4.3. 设置自定义事件文件挂载路径 复制链接链接已复制到粘贴板!
您可以在 JobSink 定义中设置自定义 事件 文件挂载路径。
流程
在容器定义中,包括
volumeMounts配置并根据需要设置。apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-custom-mount-path spec: job: spec: completions: 1 parallelism: 1 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "bash" ] args: - -c - echo "Hello world!" && sleep 5 # The event will be available in a file at `/etc/custom-path/event` volumeMounts: - name: "jobsink-event" mountPath: "/etc/custom-path" readOnly: true
3.4.4. 清理完成的作业 复制链接链接已复制到粘贴板!
您可以通过在 JobSink 定义中设置 ttlSecondsAfterFinished 值来清理已完成的作业。例如,将值设置为 600 会在完成后删除 600 秒完成的作业 600 秒(10 分钟)。
流程
在定义中,将
ttlSecondsAfterFinished的值设置为所需的数量。ttlSecondsAfterFinished 设置为 600 的示例
apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-example spec: job: spec: ttlSecondsAfterFinished: 600
3.4.5. 模拟 FailJob 操作 复制链接链接已复制到粘贴板!
流程
通过在 JobSink 定义中包含错误模拟命令来触发
FailJob操作。JobSink 失败示例
apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-failure spec: job: metadata: labels: my-label: my-value spec: completions: 12 parallelism: 3 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "bash" ] # example command simulating a bug which triggers the FailJob action args: - -c - echo "Hello world!" && sleep 5 && exit 42 backoffLimit: 6 podFailurePolicy: rules: - action: FailJob onExitCodes: containerName: main # optional operator: In # one of: In, NotIn values: [ 42 ] - action: Ignore # one of: Ignore, FailJob, Count onPodConditions: - type: DisruptionTarget # indicates Pod disruption