2.6. カスタムイベントソース
Knative に含まれていないイベントプロデューサーや、CloudEvent 形式ではないイベントを生成するプロデューサーからイベントを Ingress する必要がある場合は、カスタムイベントソースを使用してこれを実行できます。カスタムイベントソースは、次のいずれかの方法で作成できます。
-
シンクバインディングを作成して、
PodSpecableオブジェクトをイベントソースとして使用します。 - コンテナーソースを作成して、コンテナーをイベントソースとして使用します。
2.6.1. シンクバインディング リンクのコピーリンクがクリップボードにコピーされました!
SinkBinding オブジェクトは、イベント生成を配信アドレス指定から切り離すことをサポートします。シンクバインディングは、イベントプロデューサー をイベントコンシューマーまたは シンク に接続するために使用されます。イベントプロデューサーは、PodSpec テンプレートを組み込む Kubernetes リソースであり、イベントを生成します。シンクは、イベントを受信できるアドレス指定可能な Kubernetes オブジェクトです。
SinkBinding オブジェクトは、環境変数をシンクの PodTemplateSpec に挿入します。つまり、アプリケーションコードが Kubernetes API と直接対話してイベントの宛先を見つける必要はありません。これらの環境変数は以下のとおりです。
K_SINK- 解決されたシンクの URL。
K_CE_OVERRIDES- アウトバウンドイベントの上書きを指定する JSON オブジェクト。
現在、SinkBinding オブジェクトはサービスのカスタムリビジョン名をサポートしません。
2.6.1.1. YAML を使用したシンクバインディングの作成 リンクのコピーリンクがクリップボードにコピーされました!
YAML ファイルを使用して Knative リソースを作成する場合は、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述できます。YAML を使用してシンクバインディングを作成するには、SinkBinding オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
-
OpenShift CLI (
oc) がインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。
サービス YAML ファイルを作成します。
サービス YAML ファイルの例
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/showcaseサービスを作成します。
$ oc apply -f <filename>
イベントをサービスに転送するシンクバインディングインスタンスを作成します。
シンクバインディング YAML ファイルを作成します。
サービス YAML ファイルの例
apiVersion: sources.knative.dev/v1alpha1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: batch/v1 kind: Job1 selector: matchLabels: app: heartbeat-cron sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display- 1
- この例では、ラベル
app: heartbeat-cronを指定したジョブがイベントシンクにバインドされます。
シンクバインディングを作成します。
$ oc apply -f <filename>
CronJobオブジェクトを作成します。cron ジョブの YAML ファイルを作成します。
cron ジョブの YAML ファイルの例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace重要シンクバインディングを使用するには、
bindings.knative.dev/include=trueラベルを Knative リソースに手動で追加する必要があります。たとえば、このラベルを
CronJobインスタンスに追加するには、以下の行をJobリソースの YAML 定義に追加します。jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"cron ジョブを作成します。
$ oc apply -f <filename>
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml出力例
spec: sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display namespace: default subject: apiVersion: batch/v1 kind: Job namespace: default selector: matchLabels: app: heartbeat-cron
検証
メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。
コマンドを入力します。
$ oc get podsコマンドを入力します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
2.6.1.2. Knative CLI を使用したシンクバインディングの作成 リンクのコピーリンクがクリップボードにコピーされました!
kn source binding create コマンドを使用し、Knative (kn) を使用してシンクバインディングを作成できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
Knative (
kn) CLI をインストールしている。 -
OpenShift CLI (
oc) がインストールされている。
以下の手順では、YAML ファイルを作成する必要があります。
サンプルで使用されたもので YAML ファイルの名前を変更する場合は、必ず対応する CLI コマンドを更新する必要があります。
手順
シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。
$ kn service create event-display --image quay.io/openshift-knative/showcaseイベントをサービスに転送するシンクバインディングインスタンスを作成します。
$ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-displayCronJobオブジェクトを作成します。cron ジョブの YAML ファイルを作成します。
cron ジョブの YAML ファイルの例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace重要シンクバインディングを使用するには、
bindings.knative.dev/include=trueラベルを Knative CR に手動で追加する必要があります。たとえば、このラベルを
CronJobCR に追加するには、以下の行をJobCR の YAML 定義に追加します。jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"cron ジョブを作成します。
$ oc apply -f <filename>
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ kn source binding describe bind-heartbeat出力例
Name: bind-heartbeat Namespace: demo-2 Annotations: sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ... Age: 2m Subject: Resource: job (batch/v1) Selector: app: heartbeat-cron Sink: Name: event-display Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 2m
検証
メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。
以下のコマンドを入力して、メッセージダンパー機能ログを表示します。
$ oc get pods$ oc logs $(oc get pod -o name | grep event-display) -c user-container出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
2.6.1.2.1. Knative CLI シンクフラグ リンクのコピーリンクがクリップボードにコピーされました!
Knative (kn) CLI を使用してイベントソースを作成する場合は、--sink フラグを使用して、そのリソースからイベントが送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.localのsvcは、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channelおよびbrokerが含まれます。
2.6.1.3. Web コンソールを使用したシンクバインディングの作成 リンクのコピーリンクがクリップボードにコピーされました!
Knative Eventing がクラスターにインストールされると、Web コンソールを使用して シンクバインディングを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
シンクとして使用する Knative サービスを作成します。
-
Developer パースペクティブで、+Add
YAML に移動します。 サンプル YAML をコピーします。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/showcase- Create をクリックします。
-
Developer パースペクティブで、+Add
イベントソースとして使用される
CronJobリソースを作成し、1 分ごとにイベントを送信します。-
Developer パースペクティブで、+Add
YAML に移動します。 サンプル YAML をコピーします。
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "*/1 * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: true1 spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace- 1
bindings.knative.dev/include: trueラベルを含めるようにしてください。OpenShift Serverless のデフォルトの namespace 選択動作は包含モードを使用します。
- Create をクリックします。
-
Developer パースペクティブで、+Add
直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace にシンクバインディングを作成します。
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
Sink Binding を選択し、Create Event Source をクリックします。Create Event Source ページが表示されます。
注記Form view または YAML view を使用して Sink Binding 設定を設定し、ビューを切り替えることができます。ビューの切り替え時に、データは永続化されます。
-
apiVersion フィールドに
batch/v1を入力します。 Kind フィールドに
Jobと入力します。注記CronJobの種類は OpenShift Serverless シンクバインディングで直接サポートされていないため、Kind フィールドは cron ジョブオブジェクト自体ではなく、cron ジョブで作成されるJobオブジェクトをターゲットにする必要があります。Target セクションで、イベントシンクを選択します。これは Resource または URI のいずれかです。
-
Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。この例では、前の手順で作成した
event-displayサービスをターゲット Resource として使用します。 - URI を選択して、イベントのルーティング先となる URI (Uniform Resource Identifier) を指定します。
-
Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。この例では、前の手順で作成した
Match labels セクションで以下を実行します。
-
Name フィールドに
appと入力します。 Value フィールドに
heartbeat-cronと入力します。注記ラベルセレクターは、リソース名ではなくシンクバインディングで cron ジョブを使用する場合に必要になります。これは、cron ジョブで作成されたジョブには予測可能な名前がなく、名前に無作為に生成される文字列が含まれているためです。たとえば、
hearthbeat-cron-1cc23fになります。
-
Name フィールドに
- Create をクリックします。
-
Developer パースペクティブで、+Add
検証
Topology ページおよび Pod ログを表示して、シンクバインディング、シンク、および cron ジョブが正常に作成され、機能していることを確認できます。
- Developer パースペクティブで、Topology に移動します。
シンクバインディング、シンク、およびハートビートの cron ジョブを表示します。
- シンクバインディングが追加されると、正常なジョブが cron ジョブによって登録されていることを確認します。つまり、シンクバインディングは cron ジョブで作成されたジョブが正常に再設定されることを意味します。
イベント表示サービスを参照して、ハートビート cron ジョブによって生成されたイベントを確認します。
2.6.1.4. シンクバインディング参照 リンクのコピーリンクがクリップボードにコピーされました!
シンクバインディングを作成して、PodSpecable オブジェクトをイベントソースとして使用できます。SinkBinding オブジェクトを作成するときに、複数のパラメーターを設定できます。
SinkBinding オブジェクトは以下のパラメーターをサポートします。
| フィールド | 説明 | 必須またはオプション |
|---|---|---|
|
|
API バージョンを指定します (例: | 必須 |
|
|
このリソースオブジェクトを | 必須 |
|
|
| 必須 |
|
|
この | 必須 |
|
| シンクとして使用する URI に解決するオブジェクトへの参照。 | 必須 |
|
| ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。 | 必須 |
|
| 上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。 | オプション |
2.6.1.4.1. Subject パラメーター リンクのコピーリンクがクリップボードにコピーされました!
Subject パラメーターは、ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。Subject 定義に複数のフィールドを設定できます。
Subject 定義は、以下のフィールドをサポートします。
| フィールド | 説明 | 必須またはオプション |
|---|---|---|
|
| 参照先の API バージョン。 | 必須 |
|
| 参照先の種類。 | 必須 |
|
| 参照先の namespace。省略されている場合、デフォルトはオブジェクトの namespace に設定されます。 | オプション |
|
| 参照先の名前。 |
|
|
| 参照先のセレクター。 |
|
|
| ラベルセレクターの要件のリストです。 |
|
|
| セレクターが適用されるラベルキー。 |
|
|
|
キーと値のセットの関係を表します。有効な演算子は |
|
|
|
文字列値の配列。 |
|
|
|
キーと値のペアのマップ。 |
|
サブジェクトパラメーターの例
以下の YAML の場合は、default namespace の mysubject という名前の Deployment オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
name: bind-heartbeat
spec:
subject:
apiVersion: apps/v1
kind: Deployment
namespace: default
name: mysubject
...
以下の YAML の場合は、default namespace にラベル working=example が設定された Job オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
name: bind-heartbeat
spec:
subject:
apiVersion: batch/v1
kind: Job
namespace: default
selector:
matchLabels:
working: example
...
以下の YAML の場合は、default namespace にラベル working=example または working=sample が含まれる Pod オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
name: bind-heartbeat
spec:
subject:
apiVersion: v1
kind: Pod
namespace: default
selector:
- matchExpression:
key: working
operator: In
values:
- example
- sample
...
2.6.1.4.2. CloudEvent オーバーライド リンクのコピーリンクがクリップボードにコピーされました!
ceOverrides 定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides 定義に複数のフィールドを設定できます。
ceOverrides の定義は、以下のフィールドをサポートします。
| フィールド | 説明 | 必須またはオプション |
|---|---|---|
|
|
アウトバウンドイベントで追加または上書きされる属性を指定します。各 | オプション |
拡張子として許可されるのは、有効な CloudEvent 属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type 属性を変更することはできません。
CloudEvent オーバーライドの例
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
name: bind-heartbeat
spec:
...
ceOverrides:
extensions:
extra: this is an extra attribute
additional: 42
これにより、subject に K_CE_OVERRIDES 環境変数が設定されます。
出力例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
2.6.1.4.3. include ラベル リンクのコピーリンクがクリップボードにコピーされました!
シンクバインディングを使用するには、bindings.knative.dev/include: "true" ラベルをリソースまたはリソースが含まれる namespace のいずれかに割り当てる必要があります。リソース定義にラベルが含まれていない場合、クラスター管理者は以下を実行してこれを namespace に割り当てることができます。
$ oc label namespace <namespace> bindings.knative.dev/include=true
2.6.1.5. Service Mesh とシンクバインディングの統合 リンクのコピーリンクがクリップボードにコピーされました!
前提条件
- Service Mesh を OpenShift Serverless と統合しました。
手順
ServiceMeshMemberRollのメンバーである namespace にServiceを作成します。apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: <namespace>1 spec: template: metadata: annotations: sidecar.istio.io/inject: "true"2 sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: containers: - image: quay.io/openshift-knative/showcaseServiceリソースを適用します。$ oc apply -f <filename>SinkBindingリソースを作成します。apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat namespace: <namespace>1 spec: subject: apiVersion: batch/v1 kind: Job2 selector: matchLabels: app: heartbeat-cron sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-displaySinkBindingリソースを適用します。$ oc apply -f <filename>CronJobを作成します。apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron namespace: <namespace>1 spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: metadata: annotations: sidecar.istio.io/inject: "true"2 sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespaceCronJobリソースを適用します。$ oc apply -f <filename>
検証
イベントが Knative イベントシンクに送信されたことを確認するには、メッセージダンパー機能のログを調べます。
以下のコマンドを入力します。
$ oc get pods以下のコマンドを入力します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
2.6.2. コンテナーソース リンクのコピーリンクがクリップボードにコピーされました!
コンテナーソースは、イベントを生成し、イベントをシンクに送信するコンテナーイメージを作成します。コンテナーソースを使用して、イメージ URI を使用するコンテナーイメージおよび ContainerSource オブジェクトを作成して、カスタムイベントソースを作成できます。
2.6.2.1. コンテナーイメージを作成するためのガイドライン リンクのコピーリンクがクリップボードにコピーされました!
コンテナーソースコントローラーには、K_SINK および K_CE_OVERRIDES の 2 つの環境変数が注入されます。これらの変数は、それぞれ sink および ceOverrides 仕様から解決されます。イベントは、K_SINK 環境変数で指定されたシンク URI に送信されます。メッセージは、CloudEvent HTTP 形式を使用して POST として送信する必要があります。
コンテナーイメージの例
以下は、ハートビートコンテナーイメージの例になります。
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"strconv"
"time"
duckv1 "knative.dev/pkg/apis/duck/v1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
)
type Heartbeat struct {
Sequence int `json:"id"`
Label string `json:"label"`
}
var (
eventSource string
eventType string
sink string
label string
periodStr string
)
func init() {
flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
flag.StringVar(&label, "label", "", "a special label")
flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}
type envConfig struct {
// Sink URL where to send heartbeat cloud events
Sink string `envconfig:"K_SINK"`
// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
CEOverrides string `envconfig:"K_CE_OVERRIDES"`
// Name of this pod.
Name string `envconfig:"POD_NAME" required:"true"`
// Namespace this pod exists in.
Namespace string `envconfig:"POD_NAMESPACE" required:"true"`
// Whether to run continuously or exit.
OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}
func main() {
flag.Parse()
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}
if env.Sink != "" {
sink = env.Sink
}
var ceOverrides *duckv1.CloudEventOverrides
if len(env.CEOverrides) > 0 {
overrides := duckv1.CloudEventOverrides{}
err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
if err != nil {
log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
os.Exit(1)
}
ceOverrides = &overrides
}
p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
if err != nil {
log.Fatalf("failed to create http protocol: %s", err.Error())
}
c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
if err != nil {
log.Fatalf("failed to create client: %s", err.Error())
}
var period time.Duration
if p, err := strconv.Atoi(periodStr); err != nil {
period = time.Duration(5) * time.Second
} else {
period = time.Duration(p) * time.Second
}
if eventSource == "" {
eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
log.Printf("Heartbeats Source: %s", eventSource)
}
if len(label) > 0 && label[0] == '"' {
label, _ = strconv.Unquote(label)
}
hb := &Heartbeat{
Sequence: 0,
Label: label,
}
ticker := time.NewTicker(period)
for {
hb.Sequence++
event := cloudevents.NewEvent("1.0")
event.SetType(eventType)
event.SetSource(eventSource)
event.SetExtension("the", 42)
event.SetExtension("heart", "yes")
event.SetExtension("beats", true)
if ceOverrides != nil && ceOverrides.Extensions != nil {
for n, v := range ceOverrides.Extensions {
event.SetExtension(n, v)
}
}
if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
log.Printf("failed to set cloudevents data: %s", err.Error())
}
log.Printf("sending cloudevent to %s", sink)
if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
log.Printf("failed to send cloudevent: %v", res)
}
if env.OneShot {
return
}
// Wait for next tick
<-ticker.C
}
}
以下は、以前のハートビートコンテナーイメージを参照するコンテナーソースの例です。
apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
name: test-heartbeats
spec:
template:
spec:
containers:
# This corresponds to a heartbeats image URI that you have built and published
- image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
name: heartbeats
args:
- --period=1
env:
- name: POD_NAME
value: "example-pod"
- name: POD_NAMESPACE
value: "event-test"
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: showcase
...
2.6.2.2. Knative CLI を使用したコンテナーソースの作成および管理 リンクのコピーリンクがクリップボードにコピーされました!
kn source container コマンドを使用し、Knative (kn) CLI を使用してコンテナーソースを作成および管理できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。
コンテナーソースの作成
$ kn source container create <container_source_name> --image <image_uri> --sink <sink>
コンテナーソースの削除
$ kn source container delete <container_source_name>
コンテナーソースの記述
$ kn source container describe <container_source_name>
既存のコンテナーソースをリスト表示
$ kn source container list
既存のコンテナーソースを YAML 形式でリスト表示
$ kn source container list -o yaml
コンテナーソースの更新
このコマンドにより、既存のコンテナーソースのイメージ URI が更新されます。
$ kn source container update <container_source_name> --image <image_uri>
2.6.2.3. Web コンソールを使用したコンテナーソースの作成 リンクのコピーリンクがクリップボードにコピーされました!
Knative Eventing がクラスターにインストールされると、Web コンソールを使用してコンテナーソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - Container Source を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
Form view または YAML view を使用して、Container Source 設定を設定します。
注記Form view と YAML view 間で切り換えることができます。ビューの切り替え時に、データは永続化されます。
- Image フィールドに、コンテナーソースが作成したコンテナーで実行するイメージの URI を入力します。
- Name フィールドにイメージの名前を入力します。
- オプション: Arguments フィールドで、コンテナーに渡す引数を入力します。
- オプション: Environment variables フィールドで、コンテナーに設定する環境変数を追加します。
Target セクションで、イベントシンクを選択します。これは Resource または URI のいずれかです。
- Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。
- URI を選択して、イベントのルーティング先となる URI (Uniform Resource Identifier) を指定します。
- コンテナーソースの設定が完了したら、Create をクリックします。
2.6.2.4. コンテナーソースのリファレンス リンクのコピーリンクがクリップボードにコピーされました!
ContainerSource オブジェクトを作成することにより、コンテナーをイベントソースとして使用できます。ContainerSource オブジェクトを作成するときに、複数のパラメーターを設定できます。
ContainerSource オブジェクトは以下のフィールドをサポートします。
| フィールド | 説明 | 必須またはオプション |
|---|---|---|
|
|
API バージョンを指定します (例: | 必須 |
|
|
このリソースオブジェクトを | 必須 |
|
|
| 必須 |
|
|
この | 必須 |
|
| シンクとして使用する URI に解決するオブジェクトへの参照。 | 必須 |
|
|
| 必須 |
|
| 上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。 | オプション |
テンプレートパラメーターの例
apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
name: test-heartbeats
spec:
template:
spec:
containers:
- image: quay.io/openshift-knative/heartbeats:latest
name: heartbeats
args:
- --period=1
env:
- name: POD_NAME
value: "mypod"
- name: POD_NAMESPACE
value: "event-test"
...
2.6.2.4.1. CloudEvent オーバーライド リンクのコピーリンクがクリップボードにコピーされました!
ceOverrides 定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides 定義に複数のフィールドを設定できます。
ceOverrides の定義は、以下のフィールドをサポートします。
| フィールド | 説明 | 必須またはオプション |
|---|---|---|
|
|
アウトバウンドイベントで追加または上書きされる属性を指定します。各 | オプション |
拡張子として許可されるのは、有効な CloudEvent 属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type 属性を変更することはできません。
CloudEvent オーバーライドの例
apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
name: test-heartbeats
spec:
...
ceOverrides:
extensions:
extra: this is an extra attribute
additional: 42
これにより、subject に K_CE_OVERRIDES 環境変数が設定されます。
出力例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
2.6.2.5. Service Mesh と ContainerSource の統合 リンクのコピーリンクがクリップボードにコピーされました!
前提条件
- Service Mesh を OpenShift Serverless と統合しました。
手順
ServiceMeshMemberRollのメンバーである namespace にServiceを作成します。apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: <namespace>1 spec: template: metadata: annotations: sidecar.istio.io/inject: "true"2 sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: containers: - image: quay.io/openshift-knative/showcaseServiceリソースを適用します。$ oc apply -f <filename>ServiceMeshMemberRollのメンバーである namespace にContainerSourceオブジェクトを作成し、event-displayに設定されたシンクを作成します。apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats namespace: <namespace>1 spec: template: metadata:2 annotations: sidecar.istio.io/inject: "true" sidecar.istio.io/rewriteAppHTTPProbers: "true" spec: containers: - image: quay.io/openshift-knative/heartbeats:latest name: heartbeats args: - --period=1s env: - name: POD_NAME value: "example-pod" - name: POD_NAMESPACE value: "event-test" sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-displayContainerSourceリソースを適用します。$ oc apply -f <filename>
検証
イベントが Knative イベントシンクに送信されたことを確認するには、メッセージダンパー機能のログを調べます。
以下のコマンドを入力します。
$ oc get pods以下のコマンドを入力します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }