3.4. JobSink
イベント処理は通常、数分などの短い時間枠内で完了します。これにより、HTTP 接続が開いたままになり、サービスが早期にスケールダウンすることがなくなります。
長時間接続を維持すると、障害のリスクが高まり、処理の再起動やリクエストの再試行が繰り返される可能性があります。
JobSink を使用すると、batch/v1 Job リソースと機能全体、および Kueue などの Kubernetes ジョブキューシステムを使用して、長時間実行される非同期ジョブとタスクをサポートできます。
3.4.1. JobSink の使用 リンクのコピーリンクがクリップボードにコピーされました!
イベントが JobSink に送信されると、Eventing は Job を作成し、受信したイベントを JSON ファイルとして /etc/jobsink-event/event にマウントします。
手順
JobSinkオブジェクト定義を YAML ファイルとして作成します。JobSink YAML
apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-logger spec: job: spec: completions: 1 parallelism: 1 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "cat" ] args: - "/etc/jobsink-event/event"JobSinkYAML ファイルを適用します。$ oc apply -f <job-sink-file.yaml>JobSinkの準備ができていることを確認します。$ oc get jobsinks.sinks.knative.dev出力例:
NAME URL AGE READY REASON job-sink-logger http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger 5s TrueJobSinkをトリガーします。JobSinkは、任意のイベントソースまたはトリガーによってトリガーできます。$ oc run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \ -H "content-type: application/json" \ -H "ce-specversion: 1.0" \ -H "ce-source: my/curl/command" \ -H "ce-type: my.demo.event" \ -H "ce-id: 123" \ -d '{"details":"JobSinkDemo"}' \ http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-loggerJobが作成されたことを確認します。$ oc logs job-sink-loggerszoi6-dqbtq出力例:
{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
JobSink は、受信した一意のイベントごとに Job を作成します。
イベントは、source 属性と id 属性の組み合わせによって一意に識別されます。
同じ属性を持つイベントを受信した際に、そのイベントの Job がすでに存在する場合は、別の Job は作成されません。
3.4.2. Job イベントファイルの読み取り リンクのコピーリンクがクリップボードにコピーされました!
手順
eventファイルを読み取り、任意の CloudEvents JSON デシリアライザーを使用してデシリアライズします。次の例は、CloudEvents Go SDK を使用してイベントを読み取って処理する方法を示しています。package mytask import ( "encoding/json" "fmt" "os" cloudevents "github.com/cloudevents/sdk-go/v2" ) func handleEvent() error { eventBytes, err := os.ReadFile("/etc/jobsink-event/event") if err != nil { return err } event := &cloudevents.Event{} if err := json.Unmarshal(eventBytes, event); err != nil { return err } fmt.Println(event) return nil }
3.4.3. カスタムイベントファイルのマウントパスの設定 リンクのコピーリンクがクリップボードにコピーされました!
JobSink 定義でカスタム event ファイルのマウントパスを設定できます。
手順
コンテナー定義内に
volumeMounts設定を含め、必要に応じて設定します。apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-custom-mount-path spec: job: spec: completions: 1 parallelism: 1 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "bash" ] args: - -c - echo "Hello world!" && sleep 5 # The event will be available in a file at `/etc/custom-path/event` volumeMounts: - name: "jobsink-event" mountPath: "/etc/custom-path" readOnly: true
3.4.4. 終了したジョブのクリーンアップ リンクのコピーリンクがクリップボードにコピーされました!
JobSink 定義で ttlSecondsAfterFinished 値を設定することで、完了したジョブをクリーンアップできます。たとえば、値を 600 に設定すると、完了したジョブは完了後 600 秒 (10 分) で削除されます。
手順
定義では、
ttlSecondsAfterFinishedの値を必要な量に設定します。ttlSecondsAfterFinished を 600 に設定した場合の例
apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-example spec: job: spec: ttlSecondsAfterFinished: 600
3.4.5. FailJob アクションのシミュレーション リンクのコピーリンクがクリップボードにコピーされました!
手順
JobSink 定義にバグをシミュレートするコマンドを含めることで、
FailJobアクションをトリガーします。JobSink の失敗例
apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-failure spec: job: metadata: labels: my-label: my-value spec: completions: 12 parallelism: 3 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "bash" ] # example command simulating a bug which triggers the FailJob action args: - -c - echo "Hello world!" && sleep 5 && exit 42 backoffLimit: 6 podFailurePolicy: rules: - action: FailJob onExitCodes: containerName: main # optional operator: In # one of: In, NotIn values: [ 42 ] - action: Ignore # one of: Ignore, FailJob, Count onPodConditions: - type: DisruptionTarget # indicates Pod disruption