2.6. カスタムイベントソース
Knative に含まれていないイベントプロデューサーや、CloudEvent
形式ではないイベントを生成するプロデューサーからイベントを Ingress する必要がある場合は、カスタムイベントソースを使用してこれを実行できます。カスタムイベントソースは、次のいずれかの方法で作成できます。
-
シンクバインディングを作成して、
PodSpecable
オブジェクトをイベントソースとして使用します。 - コンテナーソースを作成して、コンテナーをイベントソースとして使用します。
2.6.1. シンクバインディング
SinkBinding
オブジェクトは、イベント生成を配信アドレス指定から切り離すことをサポートします。シンクバインディングは、イベントプロデューサー をイベントコンシューマーまたは シンク に接続するために使用されます。イベントプロデューサーは、PodSpec
テンプレートを組み込む Kubernetes リソースであり、イベントを生成します。シンクは、イベントを受信できるアドレス指定可能な Kubernetes オブジェクトです。
SinkBinding
オブジェクトは、環境変数をシンクの PodTemplateSpec
に挿入します。つまり、アプリケーションコードが Kubernetes API と直接対話してイベントの宛先を見つける必要はありません。これらの環境変数は以下のとおりです。
K_SINK
- 解決されたシンクの URL。
K_CE_OVERRIDES
- アウトバウンドイベントの上書きを指定する JSON オブジェクト。
現在、SinkBinding
オブジェクトはサービスのカスタムリビジョン名をサポートしません。
2.6.1.1. YAML を使用したシンクバインディングの作成
YAML ファイルを使用して Knative リソースを作成する場合は、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述できます。YAML を使用してシンクバインディングを作成するには、SinkBinding
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
-
OpenShift CLI (
oc
) がインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。
サービス YAML ファイルを作成します。
サービス YAML ファイルの例
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/showcase
サービスを作成します。
$ oc apply -f <filename>
イベントをサービスに転送するシンクバインディングインスタンスを作成します。
シンクバインディング YAML ファイルを作成します。
サービス YAML ファイルの例
apiVersion: sources.knative.dev/v1alpha1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: batch/v1 kind: Job 1 selector: matchLabels: app: heartbeat-cron sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
- 1
- この例では、ラベル
app: heartbeat-cron
を指定したジョブがイベントシンクにバインドされます。
シンクバインディングを作成します。
$ oc apply -f <filename>
CronJob
オブジェクトを作成します。cron ジョブの YAML ファイルを作成します。
cron ジョブの YAML ファイルの例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
重要シンクバインディングを使用するには、
bindings.knative.dev/include=true
ラベルを Knative リソースに手動で追加する必要があります。たとえば、このラベルを
CronJob
インスタンスに追加するには、以下の行をJob
リソースの YAML 定義に追加します。jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"
cron ジョブを作成します。
$ oc apply -f <filename>
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml
出力例
spec: sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display namespace: default subject: apiVersion: batch/v1 kind: Job namespace: default selector: matchLabels: app: heartbeat-cron
検証
メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。
コマンドを入力します。
$ oc get pods
コマンドを入力します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
2.6.1.2. Knative CLI を使用したシンクバインディングの作成
kn source binding create
コマンドを使用し、Knative (kn
) を使用してシンクバインディングを作成できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
Knative (
kn
) CLI をインストールしている。 -
OpenShift CLI (
oc
) がインストールされている。
以下の手順では、YAML ファイルを作成する必要があります。
サンプルで使用されたもので YAML ファイルの名前を変更する場合は、必ず対応する CLI コマンドを更新する必要があります。
手順
シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。
$ kn service create event-display --image quay.io/openshift-knative/showcase
イベントをサービスに転送するシンクバインディングインスタンスを作成します。
$ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
CronJob
オブジェクトを作成します。cron ジョブの YAML ファイルを作成します。
cron ジョブの YAML ファイルの例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
重要シンクバインディングを使用するには、
bindings.knative.dev/include=true
ラベルを Knative CR に手動で追加する必要があります。たとえば、このラベルを
CronJob
CR に追加するには、以下の行をJob
CR の YAML 定義に追加します。jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"
cron ジョブを作成します。
$ oc apply -f <filename>
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ kn source binding describe bind-heartbeat
出力例
Name: bind-heartbeat Namespace: demo-2 Annotations: sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ... Age: 2m Subject: Resource: job (batch/v1) Selector: app: heartbeat-cron Sink: Name: event-display Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 2m
検証
メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。
以下のコマンドを入力して、メッセージダンパー機能ログを表示します。
$ oc get pods
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
2.6.1.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合は、--sink
フラグを使用して、そのリソースからイベントが送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
2.6.1.3. Web コンソールを使用したシンクバインディングの作成
Knative Eventing がクラスターにインストールされると、Web コンソールを使用して シンクバインディングを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
シンクとして使用する Knative サービスを作成します。
-
Developer パースペクティブで、+Add
YAML に移動します。 サンプル YAML をコピーします。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/showcase
- Create をクリックします。
-
Developer パースペクティブで、+Add
イベントソースとして使用される
CronJob
リソースを作成し、1 分ごとにイベントを送信します。-
Developer パースペクティブで、+Add
YAML に移動します。 サンプル YAML をコピーします。
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "*/1 * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: true 1 spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
- 1
bindings.knative.dev/include: true
ラベルを含めるようにしてください。OpenShift Serverless のデフォルトの namespace 選択動作は包含モードを使用します。
- Create をクリックします。
-
Developer パースペクティブで、+Add
直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace にシンクバインディングを作成します。
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
Sink Binding を選択し、Create Event Source をクリックします。Create Event Source ページが表示されます。
注記Form view または YAML view を使用して Sink Binding 設定を設定し、ビューを切り替えることができます。ビューの切り替え時に、データは永続化されます。
-
apiVersion フィールドに
batch/v1
を入力します。 Kind フィールドに
Job
と入力します。注記CronJob
の種類は OpenShift Serverless シンクバインディングで直接サポートされていないため、Kind フィールドは cron ジョブオブジェクト自体ではなく、cron ジョブで作成されるJob
オブジェクトをターゲットにする必要があります。Target セクションで、イベントシンクを選択します。これは Resource または URI のいずれかです。
-
Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。この例では、前の手順で作成した
event-display
サービスをターゲット Resource として使用します。 - URI を選択して、イベントのルーティング先となる URI (Uniform Resource Identifier) を指定します。
-
Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。この例では、前の手順で作成した
Match labels セクションで以下を実行します。
-
Name フィールドに
app
と入力します。 Value フィールドに
heartbeat-cron
と入力します。注記ラベルセレクターは、リソース名ではなくシンクバインディングで cron ジョブを使用する場合に必要になります。これは、cron ジョブで作成されたジョブには予測可能な名前がなく、名前に無作為に生成される文字列が含まれているためです。たとえば、
hearthbeat-cron-1cc23f
になります。
-
Name フィールドに
- Create をクリックします。
-
Developer パースペクティブで、+Add
検証
Topology ページおよび Pod ログを表示して、シンクバインディング、シンク、および cron ジョブが正常に作成され、機能していることを確認できます。
- Developer パースペクティブで、Topology に移動します。
シンクバインディング、シンク、およびハートビートの cron ジョブを表示します。
- シンクバインディングが追加されると、正常なジョブが cron ジョブによって登録されていることを確認します。つまり、シンクバインディングは cron ジョブで作成されたジョブが正常に再設定されることを意味します。
イベント表示
サービスを参照して、ハートビート cron ジョブによって生成されたイベントを確認します。
2.6.1.4. シンクバインディング参照
シンクバインディングを作成して、PodSpecable
オブジェクトをイベントソースとして使用できます。SinkBinding
オブジェクトを作成するときに、複数のパラメーターを設定できます。
SinkBinding
オブジェクトは以下のパラメーターをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
API バージョンを指定します (例: | 必須 |
|
このリソースオブジェクトを | 必須 |
|
| 必須 |
|
この | 必須 |
| シンクとして使用する URI に解決するオブジェクトへの参照。 | 必須 |
| ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。 | 必須 |
| 上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。 | オプション |
2.6.1.4.1. Subject パラメーター
Subject
パラメーターは、ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。Subject
定義に複数のフィールドを設定できます。
Subject
定義は、以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
| 参照先の API バージョン。 | 必須 |
| 参照先の種類。 | 必須 |
| 参照先の namespace。省略されている場合、デフォルトはオブジェクトの namespace に設定されます。 | オプション |
| 参照先の名前。 |
|
| 参照先のセレクター。 |
|
| ラベルセレクターの要件のリストです。 |
|
| セレクターが適用されるラベルキー。 |
|
|
キーと値のセットの関係を表します。有効な演算子は |
|
|
文字列値の配列。 |
|
|
キーと値のペアのマップ。 |
|
サブジェクトパラメーターの例
以下の YAML の場合は、default
namespace の mysubject
という名前の Deployment
オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: apps/v1 kind: Deployment namespace: default name: mysubject ...
以下の YAML の場合は、default
namespace にラベル working=example
が設定された Job
オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: batch/v1 kind: Job namespace: default selector: matchLabels: working: example ...
以下の YAML の場合は、default
namespace にラベル working=example
または working=sample
が含まれる Pod
オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: v1 kind: Pod namespace: default selector: - matchExpression: key: working operator: In values: - example - sample ...
2.6.1.4.2. CloudEvent オーバーライド
ceOverrides
定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides
定義に複数のフィールドを設定できます。
ceOverrides
の定義は、以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
アウトバウンドイベントで追加または上書きされる属性を指定します。各 | オプション |
拡張子として許可されるのは、有効な CloudEvent
属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type
属性を変更することはできません。
CloudEvent オーバーライドの例
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: ... ceOverrides: extensions: extra: this is an extra attribute additional: 42
これにより、subject
に K_CE_OVERRIDES
環境変数が設定されます。
出力例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
2.6.1.4.3. include ラベル
シンクバインディングを使用するには、bindings.knative.dev/include: "true"
ラベルをリソースまたはリソースが含まれる namespace のいずれかに割り当てる必要があります。リソース定義にラベルが含まれていない場合、クラスター管理者は以下を実行してこれを namespace に割り当てることができます。
$ oc label namespace <namespace> bindings.knative.dev/include=true
2.6.1.5. Service Mesh とシンクバインディングの統合
前提条件
- Service Mesh を OpenShift Serverless と統合しました。
手順
ServiceMeshMemberRoll
のメンバーである namespace にService
を作成します。apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: <namespace> 1 spec: template: metadata: annotations: sidecar.istio.io/inject: "true" 2 sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: containers: - image: quay.io/openshift-knative/showcase
Service
リソースを適用します。$ oc apply -f <filename>
SinkBinding
リソースを作成します。apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat namespace: <namespace> 1 spec: subject: apiVersion: batch/v1 kind: Job 2 selector: matchLabels: app: heartbeat-cron sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
SinkBinding
リソースを適用します。$ oc apply -f <filename>
CronJob
を作成します。apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron namespace: <namespace> 1 spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: metadata: annotations: sidecar.istio.io/inject: "true" 2 sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
CronJob
リソースを適用します。$ oc apply -f <filename>
検証
イベントが Knative イベントシンクに送信されたことを確認するには、メッセージダンパー機能のログを調べます。
以下のコマンドを入力します。
$ oc get pods
以下のコマンドを入力します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
2.6.2. コンテナーソース
コンテナーソースは、イベントを生成し、イベントをシンクに送信するコンテナーイメージを作成します。コンテナーソースを使用して、イメージ URI を使用するコンテナーイメージおよび ContainerSource
オブジェクトを作成して、カスタムイベントソースを作成できます。
2.6.2.1. コンテナーイメージを作成するためのガイドライン
コンテナーソースコントローラーには、K_SINK
および K_CE_OVERRIDES
の 2 つの環境変数が注入されます。これらの変数は、それぞれ sink
および ceOverrides
仕様から解決されます。イベントは、K_SINK
環境変数で指定されたシンク URI に送信されます。メッセージは、CloudEvent
HTTP 形式を使用して POST
として送信する必要があります。
コンテナーイメージの例
以下は、ハートビートコンテナーイメージの例になります。
package main import ( "context" "encoding/json" "flag" "fmt" "log" "os" "strconv" "time" duckv1 "knative.dev/pkg/apis/duck/v1" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/kelseyhightower/envconfig" ) type Heartbeat struct { Sequence int `json:"id"` Label string `json:"label"` } var ( eventSource string eventType string sink string label string periodStr string ) func init() { flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)") flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)") flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") flag.StringVar(&label, "label", "", "a special label") flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") } type envConfig struct { // Sink URL where to send heartbeat cloud events Sink string `envconfig:"K_SINK"` // CEOverrides are the CloudEvents overrides to be applied to the outbound event. CEOverrides string `envconfig:"K_CE_OVERRIDES"` // Name of this pod. Name string `envconfig:"POD_NAME" required:"true"` // Namespace this pod exists in. Namespace string `envconfig:"POD_NAMESPACE" required:"true"` // Whether to run continuously or exit. OneShot bool `envconfig:"ONE_SHOT" default:"false"` } func main() { flag.Parse() var env envConfig if err := envconfig.Process("", &env); err != nil { log.Printf("[ERROR] Failed to process env var: %s", err) os.Exit(1) } if env.Sink != "" { sink = env.Sink } var ceOverrides *duckv1.CloudEventOverrides if len(env.CEOverrides) > 0 { overrides := duckv1.CloudEventOverrides{} err := json.Unmarshal([]byte(env.CEOverrides), &overrides) if err != nil { log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err) os.Exit(1) } ceOverrides = &overrides } p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink)) if err != nil { log.Fatalf("failed to create http protocol: %s", err.Error()) } c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow()) if err != nil { log.Fatalf("failed to create client: %s", err.Error()) } var period time.Duration if p, err := strconv.Atoi(periodStr); err != nil { period = time.Duration(5) * time.Second } else { period = time.Duration(p) * time.Second } if eventSource == "" { eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name) log.Printf("Heartbeats Source: %s", eventSource) } if len(label) > 0 && label[0] == '"' { label, _ = strconv.Unquote(label) } hb := &Heartbeat{ Sequence: 0, Label: label, } ticker := time.NewTicker(period) for { hb.Sequence++ event := cloudevents.NewEvent("1.0") event.SetType(eventType) event.SetSource(eventSource) event.SetExtension("the", 42) event.SetExtension("heart", "yes") event.SetExtension("beats", true) if ceOverrides != nil && ceOverrides.Extensions != nil { for n, v := range ceOverrides.Extensions { event.SetExtension(n, v) } } if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil { log.Printf("failed to set cloudevents data: %s", err.Error()) } log.Printf("sending cloudevent to %s", sink) if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) { log.Printf("failed to send cloudevent: %v", res) } if env.OneShot { return } // Wait for next tick <-ticker.C } }
以下は、以前のハートビートコンテナーイメージを参照するコンテナーソースの例です。
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: template: spec: containers: # This corresponds to a heartbeats image URI that you have built and published - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats name: heartbeats args: - --period=1 env: - name: POD_NAME value: "example-pod" - name: POD_NAMESPACE value: "event-test" sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: showcase ...
2.6.2.2. Knative CLI を使用したコンテナーソースの作成および管理
kn source container
コマンドを使用し、Knative (kn
) CLI を使用してコンテナーソースを作成および管理できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。
コンテナーソースの作成
$ kn source container create <container_source_name> --image <image_uri> --sink <sink>
コンテナーソースの削除
$ kn source container delete <container_source_name>
コンテナーソースの記述
$ kn source container describe <container_source_name>
既存のコンテナーソースをリスト表示
$ kn source container list
既存のコンテナーソースを YAML 形式でリスト表示
$ kn source container list -o yaml
コンテナーソースの更新
このコマンドにより、既存のコンテナーソースのイメージ URI が更新されます。
$ kn source container update <container_source_name> --image <image_uri>
2.6.2.3. Web コンソールを使用したコンテナーソースの作成
Knative Eventing がクラスターにインストールされると、Web コンソールを使用してコンテナーソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - Container Source を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
Form view または YAML view を使用して、Container Source 設定を設定します。
注記Form view と YAML view 間で切り換えることができます。ビューの切り替え時に、データは永続化されます。
- Image フィールドに、コンテナーソースが作成したコンテナーで実行するイメージの URI を入力します。
- Name フィールドにイメージの名前を入力します。
- オプション: Arguments フィールドで、コンテナーに渡す引数を入力します。
- オプション: Environment variables フィールドで、コンテナーに設定する環境変数を追加します。
Target セクションで、イベントシンクを選択します。これは Resource または URI のいずれかです。
- Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。
- URI を選択して、イベントのルーティング先となる URI (Uniform Resource Identifier) を指定します。
- コンテナーソースの設定が完了したら、Create をクリックします。
2.6.2.4. コンテナーソースのリファレンス
ContainerSource
オブジェクトを作成することにより、コンテナーをイベントソースとして使用できます。ContainerSource
オブジェクトを作成するときに、複数のパラメーターを設定できます。
ContainerSource
オブジェクトは以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
API バージョンを指定します (例: | 必須 |
|
このリソースオブジェクトを | 必須 |
|
| 必須 |
|
この | 必須 |
| シンクとして使用する URI に解決するオブジェクトへの参照。 | 必須 |
|
| 必須 |
| 上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。 | オプション |
テンプレートパラメーターの例
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: template: spec: containers: - image: quay.io/openshift-knative/heartbeats:latest name: heartbeats args: - --period=1 env: - name: POD_NAME value: "mypod" - name: POD_NAMESPACE value: "event-test" ...
2.6.2.4.1. CloudEvent オーバーライド
ceOverrides
定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides
定義に複数のフィールドを設定できます。
ceOverrides
の定義は、以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
アウトバウンドイベントで追加または上書きされる属性を指定します。各 | オプション |
拡張子として許可されるのは、有効な CloudEvent
属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type
属性を変更することはできません。
CloudEvent オーバーライドの例
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: ... ceOverrides: extensions: extra: this is an extra attribute additional: 42
これにより、subject
に K_CE_OVERRIDES
環境変数が設定されます。
出力例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
2.6.2.5. Service Mesh と ContainerSource の統合
前提条件
- Service Mesh を OpenShift Serverless と統合しました。
手順
ServiceMeshMemberRoll
のメンバーである namespace にService
を作成します。apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: <namespace> 1 spec: template: metadata: annotations: sidecar.istio.io/inject: "true" 2 sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: containers: - image: quay.io/openshift-knative/showcase
Service
リソースを適用します。$ oc apply -f <filename>
ServiceMeshMemberRoll
のメンバーである namespace にContainerSource
オブジェクトを作成し、event-display
に設定されたシンクを作成します。apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats namespace: <namespace> 1 spec: template: metadata: 2 annotations: sidecar.istio.io/inject: "true" sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: containers: - image: quay.io/openshift-knative/heartbeats:latest name: heartbeats args: - --period=1s env: - name: POD_NAME value: "example-pod" - name: POD_NAMESPACE value: "event-test" sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
ContainerSource
リソースを適用します。$ oc apply -f <filename>
検証
イベントが Knative イベントシンクに送信されたことを確認するには、メッセージダンパー機能のログを調べます。
以下のコマンドを入力します。
$ oc get pods
以下のコマンドを入力します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }