5.10. カスタムイベントソース


Knative に含まれていないイベントプロデューサーや、CloudEvent 形式ではないイベントを生成するプロデューサーからイベントを Ingress する必要がある場合は、カスタムイベントソースを使用してこれを実行できます。カスタムイベントソースは、次のいずれかの方法で作成できます。

  • シンクバインディングを作成して、PodSpecable オブジェクトをイベントソースとして使用します。
  • コンテナーソースを作成して、コンテナーをイベントソースとして使用します。

5.10.1. シンクバインディング

SinkBinding オブジェクトは、イベント生成を配信アドレス指定から切り離すことをサポートします。シンクバインディングは、イベントプロデューサー をイベントコンシューマーまたは シンク に接続するために使用されます。イベントプロデューサーは、PodSpec テンプレートを組み込む Kubernetes リソースであり、イベントを生成します。シンクは、イベントを受信できるアドレス指定可能な Kubernetes オブジェクトです。

SinkBinding オブジェクトは、環境変数をシンクの PodTemplateSpec に挿入します。つまり、アプリケーションコードが Kubernetes API と直接対話してイベントの宛先を見つける必要はありません。これらの環境変数は以下のとおりです。

K_SINK
解決されたシンクの URL。
K_CE_OVERRIDES
アウトバウンドイベントの上書きを指定する JSON オブジェクト。
注記

現在、SinkBinding オブジェクトはサービスのカスタムリビジョン名をサポートしません。

5.10.1.1. YAML を使用したシンクバインディングの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用してシンクバインディングを作成するには、SinkBinding オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift CLI (oc) をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。

    1. サービス YAML ファイルを作成します。

      サービス YAML ファイルの例

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest

    2. サービスを作成します。

      $ oc apply -f <filename>
  2. イベントをサービスに転送するシンクバインディングインスタンスを作成します。

    1. シンクバインディング YAML ファイルを作成します。

      サービス YAML ファイルの例

      apiVersion: sources.knative.dev/v1alpha1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job 1
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      1
      この例では、ラベル app: heartbeat-cron を指定したジョブがイベントシンクにバインドされます。
    2. シンクバインディングを作成します。

      $ oc apply -f <filename>
  3. CronJob オブジェクトを作成します。

    1. cron ジョブの YAML ファイルを作成します。

      cron ジョブの YAML ファイルの例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      シンクバインディングを使用するには、bindings.knative.dev/include=true ラベルを Knative リソースに手動で追加する必要があります。

      たとえば、このラベルを CronJob インスタンスに追加するには、以下の行を Job リソースの YAML 定義に追加します。

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. cron ジョブを作成します。

      $ oc apply -f <filename>
  4. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    出力例

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron

検証

メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。

  1. コマンドを入力します。

    $ oc get pods
  2. コマンドを入力します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.10.1.2. Knative CLI を使用したシンクバインディングの作成

kn source binding create コマンドを使用し、Knative (kn) を使用してシンクバインディングを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Knative (kn) CLI をインストールしている。
  • OpenShift CLI (oc) をインストールしている。
注記

以下の手順では、YAML ファイルを作成する必要があります。

サンプルで使用されたもので YAML ファイルの名前を変更する場合は、必ず対応する CLI コマンドを更新する必要があります。

手順

  1. シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。

    $ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. イベントをサービスに転送するシンクバインディングインスタンスを作成します。

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. CronJob オブジェクトを作成します。

    1. cron ジョブの YAML ファイルを作成します。

      cron ジョブの YAML ファイルの例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      シンクバインディングを使用するには、bindings.knative.dev/include=true ラベルを Knative CR に手動で追加する必要があります。

      たとえば、このラベルを CronJob CR に追加するには、以下の行を Job CR の YAML 定義に追加します。

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. cron ジョブを作成します。

      $ oc apply -f <filename>
  4. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ kn source binding describe bind-heartbeat

    出力例

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

検証

メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。

  • 以下のコマンドを入力して、メッセージダンパー機能ログを表示します。

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.10.1.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

5.10.1.3. Web コンソールを使用したシンクバインディングの作成

Knative Eventing がクラスターにインストールされると、Web コンソールを使用して シンクバインディングを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. シンクとして使用する Knative サービスを作成します。

    1. Developer パースペクティブで、+Add YAML に移動します。
    2. サンプル YAML をコピーします。

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Create をクリックします。
  2. イベントソースとして使用される CronJob リソースを作成し、1 分ごとにイベントを送信します。

    1. Developer パースペクティブで、+Add YAML に移動します。
    2. サンプル YAML をコピーします。

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: true 1
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      1
      bindings.knative.dev/include: true ラベルを含めるようにしてください。OpenShift Serverless のデフォルトの namespace 選択動作は包含モードを使用します。
    3. Create をクリックします。
  3. 直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace にシンクバインディングを作成します。

    1. Developer パースペクティブで、+Add Event Source に移動します。Event Sources ページが表示されます。
    2. オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
    3. Sink Binding を選択し、Create Event Source をクリックします。Create Event Source ページが表示されます。

      注記

      Form view または YAML view を使用して Sink Binding 設定を設定し、ビューを切り替えることができます。ビューの切り替え時に、データは永続化されます。

    4. apiVersion フィールドに batch/v1 を入力します。
    5. Kind フィールドに Job と入力します。

      注記

      CronJob の種類は OpenShift Serverless シンクバインディングで直接サポートされていないため、Kind フィールドは cron ジョブオブジェクト自体ではなく、cron ジョブで作成される Job オブジェクトをターゲットにする必要があります。

    6. Sink を選択します。これは Resource または URI のいずれかになります。この例では、直前の手順で作成された event-display サービスが Resources シンクとして使用されます。
    7. Match labels セクションで以下を実行します。

      1. Name フィールドに app と入力します。
      2. Value フィールドに heartbeat-cron と入力します。

        注記

        ラベルセレクターは、リソース名ではなくシンクバインディングで cron ジョブを使用する場合に必要になります。これは、cron ジョブで作成されたジョブには予測可能な名前がなく、名前に無作為に生成される文字列が含まれているためです。たとえば、hearthbeat-cron-1cc23f になります。

    8. Create をクリックします。

検証

Topology ページおよび Pod ログを表示して、シンクバインディング、シンク、および cron ジョブが正常に作成され、機能していることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. シンクバインディング、シンク、およびハートビートの cron ジョブを表示します。

    View the sink binding and service in the Topology view
  3. シンクバインディングが追加されると、正常なジョブが cron ジョブによって登録されていることを確認します。つまり、シンクバインディングは cron ジョブで作成されたジョブが正常に再設定されることを意味します。
  4. event-display サービス Pod のログを参照し、ハートビート cron ジョブで生成されるイベントを表示します。

5.10.1.4. シンクバインディング参照

シンクバインディングを作成して、PodSpecable オブジェクトをイベントソースとして使用できます。SinkBinding オブジェクトを作成するときに、複数のパラメーターを設定できます。

SinkBinding オブジェクトは以下のパラメーターをサポートします。

フィールド説明必須またはオプション

apiVersion

API バージョンを指定します (例: sources.knative.dev/v1)。

必須

kind

このリソースオブジェクトを SinkBinding オブジェクトとして特定します。

必須

metadata

SinkBinding オブジェクトを一意に識別するメタデータを指定します。たとえば、name です。

必須

spec

この SinkBinding オブジェクトの設定情報を指定します。

必須

spec.sink

シンクとして使用する URI に解決するオブジェクトへの参照。

必須

spec.subject

ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。

必須

spec.ceOverrides

上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。

任意

5.10.1.4.1. Subject パラメーター

Subject パラメーターは、ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。Subject 定義に複数のフィールドを設定できます。

Subject 定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

apiVersion

参照先の API バージョン。

必須

kind

参照先の種類。

必須

namespace

参照先の namespace。省略されている場合、デフォルトはオブジェクトの namespace に設定されます。

任意

name

参照先の名前。

selector を設定する場合は、使用しないでください。

selector

参照先のセレクター。

name を設定する場合は、使用しないでください。

selector.matchExpressions

ラベルセレクターの要件の一覧です。

matchExpressions または matchLabels のいずれかのみを使用します。

selector.matchExpressions.key

セレクターが適用されるラベルキー。

matchExpressions を使用する場合に必須です。

selector.matchExpressions.operator

キーと値のセットの関係を表します。有効な演算子は InNotInExists、および DoesNotExist です。

matchExpressions を使用する場合に必須です。

selector.matchExpressions.values

文字列値の配列。operator パラメーターの値が In または NotIn の場合、値配列が空でないようにする必要があります。operator パラメーターの値が Exists または DoesNotExist の場合、値の配列は空である必要があります。この配列は、ストラテジーに基づいたマージパッチの適用中に置き換えられます。

matchExpressions を使用する場合に必須です。

selector.matchLabels

キーと値のペアのマップ。matchLabels マップの各キーと値のペアは matchExpressions の要素と同じです。ここで、キーフィールドは matchLabels.<key> で、operatorIn で、values の配列には matchLabels.<value> のみが含まれます。

matchExpressions または matchLabels のいずれかのみを使用します。

サブジェクトパラメーターの例

以下の YAML の場合は、default namespace の mysubject という名前の Deployment オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

以下の YAML の場合、default namespace にラベル working=example が設定された Job オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

以下の YAML の場合、default namespace にラベル working=example または working=sample が含まれる Pod オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      - matchExpression:
        key: working
        operator: In
        values:
          - example
          - sample
  ...
5.10.1.4.2. CloudEvent オーバーライド

ceOverrides 定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides 定義に複数のフィールドを設定できます。

ceOverrides の定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

extensions

アウトバウンドイベントで追加または上書きされる属性を指定します。各 extensions のキーと値のペアは、属性拡張機能としてイベントに個別に設定されます。

任意

注記

拡張子として許可されるのは、有効な CloudEvent 属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type 属性を変更することはできません。

CloudEvent オーバーライドの例

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

これにより、subjectK_CE_OVERRIDES 環境変数が設定されます。

出力例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

5.10.1.4.3. include ラベル

シンクバインディングを使用するには、bindings.knative.dev/include: "true" ラベルをリソースまたはリソースが含まれる namespace のいずれかに割り当てる必要があります。リソース定義にラベルが含まれていない場合には、クラスター管理者は以下を実行してこれを namespace に割り当てることができます。

$ oc label namespace <namespace> bindings.knative.dev/include=true

5.10.2. コンテナーソース

コンテナーソースは、イベントを生成し、イベントをシンクに送信するコンテナーイメージを作成します。コンテナーソースを使用して、イメージ URI を使用するコンテナーイメージおよび ContainerSource オブジェクトを作成して、カスタムイベントソースを作成できます。

5.10.2.1. コンテナーイメージを作成するためのガイドライン

コンテナーソースコントローラーには、K_SINK および K_CE_OVERRIDES の 2 つの環境変数が注入されます。これらの変数は、それぞれ sink および ceOverrides 仕様から解決されます。イベントは、K_SINK 環境変数で指定されたシンク URI に送信されます。メッセージは、CloudEvent HTTP 形式を使用して POST として送信する必要があります。

コンテナーイメージの例

以下は、ハートビートコンテナーイメージの例になります。

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	duckv1 "knative.dev/pkg/apis/duck/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
	Sequence int    `json:"id"`
	Label    string `json:"label"`
}

var (
	eventSource string
	eventType   string
	sink        string
	label       string
	periodStr   string
)

func init() {
	flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
	flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
	flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
	flag.StringVar(&label, "label", "", "a special label")
	flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
	// Sink URL where to send heartbeat cloud events
	Sink string `envconfig:"K_SINK"`

	// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
	CEOverrides string `envconfig:"K_CE_OVERRIDES"`

	// Name of this pod.
	Name string `envconfig:"POD_NAME" required:"true"`

	// Namespace this pod exists in.
	Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

	// Whether to run continuously or exit.
	OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}

func main() {
	flag.Parse()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	if env.Sink != "" {
		sink = env.Sink
	}

	var ceOverrides *duckv1.CloudEventOverrides
	if len(env.CEOverrides) > 0 {
		overrides := duckv1.CloudEventOverrides{}
		err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
		if err != nil {
			log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
			os.Exit(1)
		}
		ceOverrides = &overrides
	}

	p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
	if err != nil {
		log.Fatalf("failed to create http protocol: %s", err.Error())
	}

	c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
	if err != nil {
		log.Fatalf("failed to create client: %s", err.Error())
	}

	var period time.Duration
	if p, err := strconv.Atoi(periodStr); err != nil {
		period = time.Duration(5) * time.Second
	} else {
		period = time.Duration(p) * time.Second
	}

	if eventSource == "" {
		eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
		log.Printf("Heartbeats Source: %s", eventSource)
	}

	if len(label) > 0 && label[0] == '"' {
		label, _ = strconv.Unquote(label)
	}
	hb := &Heartbeat{
		Sequence: 0,
		Label:    label,
	}
	ticker := time.NewTicker(period)
	for {
		hb.Sequence++

		event := cloudevents.NewEvent("1.0")
		event.SetType(eventType)
		event.SetSource(eventSource)
		event.SetExtension("the", 42)
		event.SetExtension("heart", "yes")
		event.SetExtension("beats", true)

		if ceOverrides != nil && ceOverrides.Extensions != nil {
			for n, v := range ceOverrides.Extensions {
				event.SetExtension(n, v)
			}
		}

		if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
			log.Printf("failed to set cloudevents data: %s", err.Error())
		}

		log.Printf("sending cloudevent to %s", sink)
		if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
			log.Printf("failed to send cloudevent: %v", res)
		}

		if env.OneShot {
			return
		}

		// Wait for next tick
		<-ticker.C
	}
}

以下は、以前のハートビートコンテナーイメージを参照するコンテナーソースの例です。

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        # This corresponds to a heartbeats image URI that you have built and published
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "example-pod"
            - name: POD_NAMESPACE
              value: "event-test"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: example-service
...

5.10.2.2. Knative CLI を使用したコンテナーソースの作成および管理

kn source container コマンドを使用し、Knative (kn) CLI を使用してコンテナーソースを作成および管理できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

コンテナーソースを作成します。

$ kn source container create <container_source_name> --image <image_uri> --sink <sink>

コンテナーソースの削除

$ kn source container delete <container_source_name>

コンテナーソースを記述します。

$ kn source container describe <container_source_name>

既存のコンテナーソースを一覧表示

$ kn source container list

既存のコンテナーソースを YAML 形式で一覧表示

$ kn source container list -o yaml

コンテナーソースを更新します。

このコマンドにより、既存のコンテナーソースのイメージ URI が更新されます。

$ kn source container update <container_source_name> --image <image_uri>

5.10.2.3. Web コンソールを使用したコンテナーソースの作成

Knative Eventing がクラスターにインストールされると、Web コンソールを使用してコンテナーソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Developer パースペクティブで、+Add Event Source に移動します。Event Sources ページが表示されます。
  2. Container Source を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
  3. Form view または YAML view を使用して、Container Source 設定を設定します。

    注記

    Form viewYAML view 間で切り換えることができます。ビューの切り替え時に、データは永続化されます。

    1. Image フィールドに、コンテナーソースが作成したコンテナーで実行するイメージの URI を入力します。
    2. Name フィールドにイメージの名前を入力します。
    3. オプション: Arguments フィールドで、コンテナーに渡す引数を入力します。
    4. オプション: Environment variables フィールドで、コンテナーに設定する環境変数を追加します。
    5. Sink セクションで、コンテナーソースからのイベントがルーティングされるシンクを追加します。Form ビューを使用している場合は、以下のオプションから選択できます。

      1. Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。
      2. URI を選択して、コンテナーソースからのイベントのルーティング先を指定します。
  4. コンテナーソースの設定が完了したら、Create をクリックします。

5.10.2.4. コンテナーソースのリファレンス

ContainerSource オブジェクトを作成することにより、コンテナーをイベントソースとして使用できます。ContainerSource オブジェクトを作成するときに、複数のパラメーターを設定できます。

ContainerSource オブジェクトは以下のフィールドをサポートします。

フィールド説明必須またはオプション

apiVersion

API バージョンを指定します (例: sources.knative.dev/v1)。

必須

kind

このリソースオブジェクトを ContainerSource オブジェクトとして特定します。

必須

metadata

ContainerSource オブジェクトを一意に識別するメタデータを指定します。たとえば、name です。

必須

spec

この ContainerSource オブジェクトの設定情報を指定します。

必須

spec.sink

シンクとして使用する URI に解決するオブジェクトへの参照。

必須

spec.template

ContainerSource オブジェクトの template 仕様。

必須

spec.ceOverrides

上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。

任意

テンプレートパラメーターの例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "mypod"
            - name: POD_NAMESPACE
              value: "event-test"
  ...

5.10.2.4.1. CloudEvent オーバーライド

ceOverrides 定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides 定義に複数のフィールドを設定できます。

ceOverrides の定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

extensions

アウトバウンドイベントで追加または上書きされる属性を指定します。各 extensions のキーと値のペアは、属性拡張機能としてイベントに個別に設定されます。

任意

注記

拡張子として許可されるのは、有効な CloudEvent 属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type 属性を変更することはできません。

CloudEvent オーバーライドの例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

これにより、subjectK_CE_OVERRIDES 環境変数が設定されます。

出力例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.