5.2. イベントソース
5.2.1. イベントソース
Knative イベントソース には、クラウドイベントの生成またはインポート、これらのイベントの別のエンドポイントへのリレー (sink とも呼ばれる) を行う Kubernetes オブジェクトを指定できます。イベントに対応する分散システムを開発するには、イベントのソースが重要になります。
OpenShift Container Platform Web コンソールの Developer パースペクティブ、Knative (kn
) CLI を使用するか、YAML ファイルを適用することで、Knative イベントソースを作成および管理できます。
現時点で、OpenShift Serverless は以下のイベントソースタイプをサポートします。
- API サーバーソース
- Kubernetes API サーバーイベントを Knative に送ります。API サーバーソースは、Kubernetes リソースが作成、更新、または削除されるたびに新規イベントを送信します。
- Ping ソース
- 指定された cron スケジュールに、固定ペイロードを使用してイベントを生成します。
- Kafka イベントソース
- Kafka クラスターをイベントソースとしてシンクに接続します。
カスタムイベントソース を作成することもできます。
5.2.2. Administrator パースペクティブのイベントソース
イベントに対応する分散システムを開発するには、イベントのソースが重要になります。
5.2.2.1. Administrator パースペクティブを使用したイベントソースの作成
Knative イベントソース には、クラウドイベントの生成またはインポート、これらのイベントの別のエンドポイントへのリレー (sink とも呼ばれる) を行う Kubernetes オブジェクトを指定できます。
前提条件
- OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- Web コンソールにログインしており、Administrator パースペクティブを使用している。
- OpenShift Container Platform のクラスター管理者パーミッションがある。
手順
-
OpenShift Container Platform Web コンソールの Administrator パースペクティブで、 Serverless
Eventing に移動します。 - Create 一覧で、Event Source を選択します。Event Sources ページに移動します。
- 作成するイベントソースタイプを選択します。
5.2.3. API サーバーソースの作成
API サーバーソースは、Knative サービスなどのイベントシンクを Kubernetes API サーバーに接続するために使用できるイベントソースです。API サーバーソースは Kubernetes イベントを監視し、それらを Knative Eventing ブローカーに転送します。
5.2.3.1. Web コンソールを使用した API サーバーソースの作成
Knative Eventing がクラスターにインストールされると、Web コンソールを使用して API サーバーソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。
既存のサービスアカウントを再利用する必要がある場合には、既存の ServiceAccount
リソースを変更して、新規リソースを作成せずに、必要なパーミッションを含めることができます。
イベントソースのサービスアカウント、ロールおよびロールバインディングを YAML ファイルとして作成します。
apiVersion: v1 kind: ServiceAccount metadata: name: events-sa namespace: default 1 --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: event-watcher namespace: default 2 rules: - apiGroups: - "" resources: - events verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: k8s-ra-event-watcher namespace: default 3 roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: event-watcher subjects: - kind: ServiceAccount name: events-sa namespace: default 4
YAML ファイルを適用します。
$ oc apply -f <filename>
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
- ApiServerSource を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
Form view または YAML view を使用して、ApiServerSource 設定を設定します。
注記Form view と YAML view 間で切り換えることができます。ビューの切り替え時に、データは永続化されます。
-
APIVERSION に
v1
を、KIND にEvent
を入力します。 - 作成したサービスアカウントの Service Account Name を選択します。
- イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
-
APIVERSION に
- Create をクリックします。
検証
API サーバーソースの作成後、これが Topology ビューでシンクされるサービスに接続されていることを確認できます。
URI シンクが使用される場合、URI sink
API サーバーソースの削除
- Topology ビューに移動します。
API サーバーソースを右クリックし、Delete ApiServerSource を選択します。
5.2.3.2. Knative CLI を使用した API サーバーソースの作成
kn source apiserver create
コマンドを使用し、kn
CLI を使用して API サーバーソースを作成できます。API サーバーソースを作成するために kn
CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。
前提条件
- OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
OpenShift CLI (
oc
) がインストールされている。 -
Knative (
kn
) CLI をインストールしている。
既存のサービスアカウントを再利用する必要がある場合には、既存の ServiceAccount
リソースを変更して、新規リソースを作成せずに、必要なパーミッションを含めることができます。
イベントソースのサービスアカウント、ロールおよびロールバインディングを YAML ファイルとして作成します。
apiVersion: v1 kind: ServiceAccount metadata: name: events-sa namespace: default 1 --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: event-watcher namespace: default 2 rules: - apiGroups: - "" resources: - events verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: k8s-ra-event-watcher namespace: default 3 roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: event-watcher subjects: - kind: ServiceAccount name: events-sa namespace: default 4
YAML ファイルを適用します。
$ oc apply -f <filename>
イベントシンクを持つ API サーバーソースを作成します。次の例では、シンクはブローカーです。
$ kn source apiserver create <event_source_name> --sink broker:<broker_name> --resource "event:v1" --service-account <service_account_name> --mode Resource
API サーバーソースが正しく設定されていることを確認するには、受信メッセージをログにダンプする Knative サービスを作成します。
$ kn service create <service_name> --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
ブローカーをイベントシンクとして使用した場合は、トリガーを作成して、
default
のブローカーからサービスへのイベントをフィルターリングします。$ kn trigger create <trigger_name> --sink ksvc:<service_name>
デフォルト namespace で Pod を起動してイベントを作成します。
$ oc create deployment hello-node --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
以下のコマンドを入力し、生成される出力を検査して、コントローラーが正しくマップされていることを確認します。
$ kn source apiserver describe <source_name>
出力例
Name: mysource Namespace: default Annotations: sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer Age: 3m ServiceAccountName: events-sa Mode: Resource Sink: Name: default Namespace: default Kind: Broker (eventing.knative.dev/v1) Resources: Kind: event (v1) Controller: false Conditions: OK TYPE AGE REASON ++ Ready 3m ++ Deployed 3m ++ SinkProvided 3m ++ SufficientPermissions 3m ++ EventTypesProvided 3m
検証
メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative に送信されていることを確認できます。
Pod を取得します。
$ oc get pods
Pod のメッセージダンパー機能ログを表示します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.apiserver.resource.update datacontenttype: application/json ... Data, { "apiVersion": "v1", "involvedObject": { "apiVersion": "v1", "fieldPath": "spec.containers{hello-node}", "kind": "Pod", "name": "hello-node", "namespace": "default", ..... }, "kind": "Event", "message": "Started container", "metadata": { "name": "hello-node.159d7608e3a3572c", "namespace": "default", .... }, "reason": "Started", ... }
API サーバーソースの削除
トリガーを削除します。
$ kn trigger delete <trigger_name>
イベントソースを削除します。
$ kn source apiserver delete <source_name>
サービスアカウント、クラスターロール、およびクラスターバインディングを削除します。
$ oc delete -f authentication.yaml
5.2.3.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合、--sink
フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
5.2.3.3. YAML ファイルを使用した API サーバーソースの作成
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用して API サーバーソースを作成するには、ApiServerSource
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
- OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
API サーバーソース YAML ファイルで定義されるものと同じ namespace に
default
ブローカーを作成している。 -
OpenShift CLI (
oc
) をインストールしている。
既存のサービスアカウントを再利用する必要がある場合には、既存の ServiceAccount
リソースを変更して、新規リソースを作成せずに、必要なパーミッションを含めることができます。
イベントソースのサービスアカウント、ロールおよびロールバインディングを YAML ファイルとして作成します。
apiVersion: v1 kind: ServiceAccount metadata: name: events-sa namespace: default 1 --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: event-watcher namespace: default 2 rules: - apiGroups: - "" resources: - events verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: k8s-ra-event-watcher namespace: default 3 roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: event-watcher subjects: - kind: ServiceAccount name: events-sa namespace: default 4
YAML ファイルを適用します。
$ oc apply -f <filename>
API サーバーソースを YAML ファイルとして作成します。
apiVersion: sources.knative.dev/v1alpha1 kind: ApiServerSource metadata: name: testevents spec: serviceAccountName: events-sa mode: Resource resources: - apiVersion: v1 kind: Event sink: ref: apiVersion: eventing.knative.dev/v1 kind: Broker name: default
ApiServerSource
YAML ファイルを適用します。$ oc apply -f <filename>
API サーバーソースが正しく設定されていることを確認するには、受信メッセージをログにダンプする Knative サービスを YAML ファイルとして作成します。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
Service
YAML ファイルを適用します。$ oc apply -f <filename>
直接の手順で作成下サービスに、
default
ブローカーからイベントをフィルターするTrigger
オブジェクトを YAML ファイルとして作成します。apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: event-display-trigger namespace: default spec: broker: default subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
Trigger
YAML ファイルを適用します。$ oc apply -f <filename>
デフォルト namespace で Pod を起動してイベントを作成します。
$ oc create deployment hello-node --image=quay.io/openshift-knative/knative-eventing-sources-event-display
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ oc get apiserversource.sources.knative.dev testevents -o yaml
出力例
apiVersion: sources.knative.dev/v1alpha1 kind: ApiServerSource metadata: annotations: creationTimestamp: "2020-04-07T17:24:54Z" generation: 1 name: testevents namespace: default resourceVersion: "62868" selfLink: /apis/sources.knative.dev/v1alpha1/namespaces/default/apiserversources/testevents2 uid: 1603d863-bb06-4d1c-b371-f580b4db99fa spec: mode: Resource resources: - apiVersion: v1 controller: false controllerSelector: apiVersion: "" kind: "" name: "" uid: "" kind: Event labelSelector: {} serviceAccountName: events-sa sink: ref: apiVersion: eventing.knative.dev/v1 kind: Broker name: default
検証
Kubernetes イベントが Knative に送信されていることを確認するには、メッセージダンパー機能ログを確認します。
以下のコマンドを入力して Pod を取得します。
$ oc get pods
以下のコマンドを入力して、Pod のメッセージダンパー機能ログを表示します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.apiserver.resource.update datacontenttype: application/json ... Data, { "apiVersion": "v1", "involvedObject": { "apiVersion": "v1", "fieldPath": "spec.containers{hello-node}", "kind": "Pod", "name": "hello-node", "namespace": "default", ..... }, "kind": "Event", "message": "Started container", "metadata": { "name": "hello-node.159d7608e3a3572c", "namespace": "default", .... }, "reason": "Started", ... }
API サーバーソースの削除
トリガーを削除します。
$ oc delete -f trigger.yaml
イベントソースを削除します。
$ oc delete -f k8s-events.yaml
サービスアカウント、クラスターロール、およびクラスターバインディングを削除します。
$ oc delete -f authentication.yaml
5.2.4. ping ソースの作成
ping ソースは、一定のペイロードを使用して ping イベントをイベントコンシューマーに定期的に送信するために使用されるイベントソースです。ping ソースを使用すると、タイマーと同様にイベントの送信をスケジュールできます。
5.2.4.1. Web コンソールを使用した ping ソースの作成
Knative Eventing がクラスターにインストールされると、Web コンソールを使用して ping ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
PingSource が機能していることを確認するには、受信メッセージをサービスのログにダンプする単純な Knative サービスを作成します。
-
Developer パースペクティブで、+Add
YAML に移動します。 サンプル YAML をコピーします。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
- Create をクリックします。
-
Developer パースペクティブで、+Add
直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace に ping ソースを作成します。
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
Ping Source を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
注記Form view または YAML view を使用して PingSource 設定を設定し、これらのビューを切り換えることができます。ビューの切り替え時に、データは永続化されます。
-
Schedule の値を入力します。この例では、値は
*/2 * * * *
であり、2 分ごとにメッセージを送信する PingSource を作成します。 - オプション: Data の値を入力できます。これはメッセージのペイロードです。
-
Sink を選択します。これは Resource または URI のいずれかになります。この例では、直前の手順で作成された
event-display
サービスが Resources シンクとして使用されます。 - Create をクリックします。
-
Developer パースペクティブで、+Add
検証
Topology ページを表示して、ping ソースが作成され、シンクに接続されていることを確認できます。
- Developer パースペクティブで、Topology に移動します。
ping ソースおよびシンクを表示します。
ping ソースの削除
- Topology ビューに移動します。
- API サーバーソースを右クリックし、Delete Ping Source を選択します。
5.2.4.2. Knative CLI を使用した ping ソースの作成
kn source ping create
コマンドを使用し、Knative (kn
) CLI を使用して ping ソースを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
-
Knative (
kn
) CLI をインストールしている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
オプション: この手順の検証手順を使用する場合は、OpenShift CLI (
oc
) をインストールします。
手順
ping ソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする単純な Knative サービスを作成します。
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
要求する必要のある ping イベントのセットごとに、PingSource をイベントコンシューマーと同じ namespace に作成します。
$ kn source ping create test-ping-source \ --schedule "*/2 * * * *" \ --data '{"message": "Hello world!"}' \ --sink ksvc:event-display
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ kn source ping describe test-ping-source
出力例
Name: test-ping-source Namespace: default Annotations: sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer Age: 15s Schedule: */2 * * * * Data: {"message": "Hello world!"} Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 8s ++ Deployed 8s ++ SinkProvided 15s ++ ValidSchedule 15s ++ EventTypeProvided 15s ++ ResourcesCorrect 15s
検証
シンク Pod のログを確認して、Kubernetes イベントが Knative イベントに送信されていることを確認できます。
デフォルトで、Knative サービスは、トラフィックが 60 秒以内に受信されない場合に Pod を終了します。本書の例では、新たに作成される Pod で各メッセージが確認されるように 2 分ごとにメッセージを送信する ping ソースを作成します。
作成された新規 Pod を監視します。
$ watch oc get pods
Ctrl+C を使用して Pod の監視をキャンセルし、作成された Pod のログを確認します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.sources.ping source: /apis/v1/namespaces/default/pingsources/test-ping-source id: 99e4f4f6-08ff-4bff-acf1-47f61ded68c9 time: 2020-04-07T16:16:00.000601161Z datacontenttype: application/json Data, { "message": "Hello world!" }
ping ソースの削除
ping ソースを削除します。
$ kn delete pingsources.sources.knative.dev <ping_source_name>
5.2.4.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合、--sink
フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
5.2.4.3. YAML を使用した ping ソースの作成
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用してサーバーレス ping を作成するには、PingSource
オブジェクトを定義する YAML ファイルを作成し、oc apply
を使用してこれを適用する必要があります。
PingSource
オブジェクトの例
apiVersion: sources.knative.dev/v1 kind: PingSource metadata: name: test-ping-source spec: schedule: "*/2 * * * *" 1 data: '{"message": "Hello world!"}' 2 sink: 3 ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
-
OpenShift CLI (
oc
) をインストールしている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
ping ソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする単純な Knative サービスを作成します。
サービス YAML ファイルを作成します。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
サービスを作成します。
$ oc apply -f <filename>
要求する必要のある ping イベントのセットごとに、ping ソースをイベントコンシューマーと同じ namespace に作成します。
ping ソースの YAML ファイルを作成します。
apiVersion: sources.knative.dev/v1 kind: PingSource metadata: name: test-ping-source spec: schedule: "*/2 * * * *" data: '{"message": "Hello world!"}' sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
ping ソースを作成します。
$ oc apply -f <filename>
以下のコマンドを入力し、コントローラーが正しくマップされていることを確認します。
$ oc get pingsource.sources.knative.dev <ping_source_name> -oyaml
出力例
apiVersion: sources.knative.dev/v1 kind: PingSource metadata: annotations: sources.knative.dev/creator: developer sources.knative.dev/lastModifier: developer creationTimestamp: "2020-04-07T16:11:14Z" generation: 1 name: test-ping-source namespace: default resourceVersion: "55257" selfLink: /apis/sources.knative.dev/v1/namespaces/default/pingsources/test-ping-source uid: 3d80d50b-f8c7-4c1b-99f7-3ec00e0a8164 spec: data: '{ value: "hello" }' schedule: '*/2 * * * *' sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display namespace: default
検証
シンク Pod のログを確認して、Kubernetes イベントが Knative イベントに送信されていることを確認できます。
デフォルトで、Knative サービスは、トラフィックが 60 秒以内に受信されない場合に Pod を終了します。本書の例では、新たに作成される Pod で各メッセージが確認されるように 2 分ごとにメッセージを送信する PingSource を作成します。
作成された新規 Pod を監視します。
$ watch oc get pods
Ctrl+C を使用して Pod の監視をキャンセルし、作成された Pod のログを確認します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.sources.ping source: /apis/v1/namespaces/default/pingsources/test-ping-source id: 042ff529-240e-45ee-b40c-3a908129853e time: 2020-04-07T16:22:00.000791674Z datacontenttype: application/json Data, { "message": "Hello world!" }
ping ソースの削除
ping ソースを削除します。
$ oc delete -f <filename>
コマンドの例
$ oc delete -f ping-source.yaml
5.2.5. Kafka ソース
Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn
)CLI を使用するか、KafkaSource
オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc
) を使用して適用します。
5.2.5.1. Web コンソールを使用した Kafka イベントソースの作成
Knative Kafka をクラスターにインストールした後、Web コンソールを使用して Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - Web コンソールにログインしている。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
- Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
- Event Sources ページで、Type セクションの Kafka Source を選択します。
Kafka Source 設定を設定します。
- ブートストラップサーバー のコンマ区切りの一覧を追加します。
- トピック のコンマ区切りの一覧を追加します。
- コンシューマーグループ を追加します。
- 作成したサービスアカウントの Service Account Name を選択します。
- イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
- Kafka イベントソースの Name を入力します。
- Create をクリックします。
検証
Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。
- Developer パースペクティブで、Topology に移動します。
Kafka イベントソースおよびシンクを表示します。
5.2.5.2. Knative CLI を使用した Kafka イベントソースの作成
kn source kafka create
コマンドを使用し、Knative (kn
) CLI を使用して Kafka ソースを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。
前提条件
-
OpenShift Serverless Operator、Knative Eventing、Knative Serving、および
KnativeKafka
カスタムリソース (CR) がクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
Knative (
kn
) CLI をインストールしている。 -
オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (
oc
) をインストールします。
手順
Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。
$ kn service create event-display \ --image quay.io/openshift-knative/knative-eventing-sources-event-display
KafkaSource
CR を作成します。$ kn source kafka create <kafka_source_name> \ --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \ --topics <topic_name> --consumergroup my-consumer-group \ --sink event-display
注記このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。
--servers
、--topics
、および--consumergroup
オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup
オプションは任意です。オプション: 作成した
KafkaSource
CR の詳細を表示します。$ kn source kafka describe <kafka_source_name>
出力例
Name: example-kafka-source Namespace: kafka Age: 1h BootstrapServers: example-cluster-kafka-bootstrap.kafka.svc:9092 Topics: example-topic ConsumerGroup: example-consumer-group Sink: Name: event-display Namespace: default Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 1h ++ Deployed 1h ++ SinkProvided 1h
検証手順
Kafka インスタンスをトリガーし、メッセージをトピックに送信します。
$ oc -n kafka run kafka-producer \ -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \ --restart=Never -- bin/kafka-console-producer.sh \ --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic
プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。
-
Kafka クラスターが
kafka
namespace にインストールされている。 -
KafkaSource
オブジェクトは、my-topic
トピックを使用するように設定されている。
-
Kafka クラスターが
ログを表示して、メッセージが到達していることを確認します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic subject: partition:46#0 id: partition:46/offset:0 time: 2021-03-10T11:21:49.4Z Extensions, traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00 Data, Hello!
5.2.5.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合、--sink
フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
5.2.5.3. YAML を使用した Kafka イベントソースの作成
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ソースを作成するには、KafkaSource
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
-
OpenShift Serverless Operator、Knative Serving、および
KnativeKafka
カスタムリソースがクラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
-
OpenShift CLI (
oc
) をインストールしている。
手順
KafkaSource
オブジェクトを YAML ファイルとして作成します。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: <source_name> spec: consumerGroup: <group_name> 1 bootstrapServers: - <list_of_bootstrap_servers> topics: - <list_of_topics> 2 sink: - <list_of_sinks> 3
重要OpenShift Serverless 上の
KafkaSource
オブジェクトの API のv1beta1
バージョンのみがサポートされます。非推奨となったv1alpha1
バージョンの API は使用しないでください。KafkaSource
オブジェクトの例apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
KafkaSource
YAML ファイルを適用します。$ oc apply -f <filename>
検証
以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。
$ oc get pods
出力例
NAME READY STATUS RESTARTS AGE kafkasource-kafka-source-5ca0248f-... 1/1 Running 0 13m
5.2.5.4. Kafka ソースの SASL 認証の設定
Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。
前提条件
- OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
-
OpenShift Serverless Operator、Knative Eventing、および
KnativeKafka
CR は、OpenShift Container Platform クラスターにインストールされている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- Kafka クラスターのユーザー名およびパスワードがある。
-
使用する SASL メカニズムを選択している (例:
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
)。 -
TLS が有効にされている場合、Kafka クラスターの
ca.crt
証明書ファイルも必要になります。 -
OpenShift (
oc
) CLI がインストールされている。
手順
選択された namespace にシークレットとして証明書ファイルを作成します。
$ oc create secret -n <namespace> generic <kafka_auth_secret> \ --from-file=ca.crt=caroot.pem \ --from-literal=password="SecretPassword" \ --from-literal=saslType="SCRAM-SHA-512" \ 1 --from-literal=user="my-sasl-user"
- 1
- SASL タイプは
PLAIN
、SCRAM-SHA-256
、またはSCRAM-SHA-512
です。
Kafka ソースを作成または変更して、次の
spec
設定が含まれるようにします。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <kafka_auth_secret> key: user password: secretKeyRef: name: <kafka_auth_secret> key: password type: secretKeyRef: name: <kafka_auth_secret> key: saslType tls: enable: true caCert: 1 secretKeyRef: name: <kafka_auth_secret> key: ca.crt ...
- 1
- Red Hat OpenShift Streams for Apache Kafka などのパブリッククラウド Kafka サービスを使用している場合は、
caCert
仕様は必要ありません。
5.2.6. カスタムイベントソース
Knative に含まれていないイベントプロデューサーや、CloudEvent
形式ではないイベントを生成するプロデューサーからイベントを Ingress する必要がある場合は、カスタムイベントソースを使用してこれを実行できます。カスタムイベントソースは、次のいずれかの方法で作成できます。
-
シンクバインディングを作成して、
PodSpecable
オブジェクトをイベントソースとして使用します。 - コンテナーソースを作成して、コンテナーをイベントソースとして使用します。
5.2.6.1. シンクバインディング
SinkBinding
オブジェクトは、イベント生成を配信アドレス指定から切り離すことをサポートします。シンクバインディングは、イベントプロデューサー をイベントコンシューマーまたは シンク に接続するために使用されます。イベントプロデューサーは、PodSpec
テンプレートを組み込む Kubernetes リソースであり、イベントを生成します。シンクは、イベントを受信できるアドレス指定可能な Kubernetes オブジェクトです。
SinkBinding
オブジェクトは、環境変数をシンクの PodTemplateSpec
に挿入します。つまり、アプリケーションコードが Kubernetes API と直接対話してイベントの宛先を見つける必要はありません。これらの環境変数は以下のとおりです。
K_SINK
- 解決されたシンクの URL。
K_CE_OVERRIDES
- アウトバウンドイベントの上書きを指定する JSON オブジェクト。
現在、SinkBinding
オブジェクトはサービスのカスタムリビジョン名をサポートしません。
5.2.6.1.1. YAML を使用したシンクバインディングの作成
YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用してシンクバインディングを作成するには、SinkBinding
オブジェクトを定義する YAML ファイルを作成し、oc apply
コマンドを使用してそれを適用する必要があります。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
-
OpenShift CLI (
oc
) をインストールしている。 - OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。
サービス YAML ファイルを作成します。
サービス YAML ファイルの例
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
サービスを作成します。
$ oc apply -f <filename>
イベントをサービスに転送するシンクバインディングインスタンスを作成します。
シンクバインディング YAML ファイルを作成します。
サービス YAML ファイルの例
apiVersion: sources.knative.dev/v1alpha1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: batch/v1 kind: Job 1 selector: matchLabels: app: heartbeat-cron sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
- 1
- この例では、ラベル
app: heartbeat-cron
を指定したジョブがイベントシンクにバインドされます。
シンクバインディングを作成します。
$ oc apply -f <filename>
CronJob
オブジェクトを作成します。cron ジョブの YAML ファイルを作成します。
cron ジョブの YAML ファイルの例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
重要シンクバインディングを使用するには、
bindings.knative.dev/include=true
ラベルを Knative リソースに手動で追加する必要があります。たとえば、このラベルを
CronJob
インスタンスに追加するには、以下の行をJob
リソースの YAML 定義に追加します。jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"
cron ジョブを作成します。
$ oc apply -f <filename>
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml
出力例
spec: sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display namespace: default subject: apiVersion: batch/v1 kind: Job namespace: default selector: matchLabels: app: heartbeat-cron
検証
メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。
コマンドを入力します。
$ oc get pods
コマンドを入力します。
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
5.2.6.1.2. Knative CLI を使用したシンクバインディングの作成
kn source binding create
コマンドを使用し、Knative (kn
) を使用してシンクバインディングを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
-
Knative (
kn
) CLI をインストールしている。 -
OpenShift CLI (
oc
) をインストールしている。
以下の手順では、YAML ファイルを作成する必要があります。
サンプルで使用されたもので YAML ファイルの名前を変更する場合は、必ず対応する CLI コマンドを更新する必要があります。
手順
シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。
$ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
イベントをサービスに転送するシンクバインディングインスタンスを作成します。
$ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
CronJob
オブジェクトを作成します。cron ジョブの YAML ファイルを作成します。
cron ジョブの YAML ファイルの例
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "* * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true" spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats:latest args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
重要シンクバインディングを使用するには、
bindings.knative.dev/include=true
ラベルを Knative CR に手動で追加する必要があります。たとえば、このラベルを
CronJob
CR に追加するには、以下の行をJob
CR の YAML 定義に追加します。jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: "true"
cron ジョブを作成します。
$ oc apply -f <filename>
以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。
$ kn source binding describe bind-heartbeat
出力例
Name: bind-heartbeat Namespace: demo-2 Annotations: sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ... Age: 2m Subject: Resource: job (batch/v1) Selector: app: heartbeat-cron Sink: Name: event-display Resource: Service (serving.knative.dev/v1) Conditions: OK TYPE AGE REASON ++ Ready 2m
検証
メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。
以下のコマンドを入力して、メッセージダンパー機能ログを表示します。
$ oc get pods
$ oc logs $(oc get pod -o name | grep event-display) -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.eventing.samples.heartbeat source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596 time: 2019-10-18T15:23:20.809775386Z contenttype: application/json Extensions, beats: true heart: yes the: 42 Data, { "id": 1, "label": "" }
5.2.6.1.2.1. Knative CLI シンクフラグ
Knative (kn
) CLI を使用してイベントソースを作成する場合、--sink
フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
以下の例では、サービスの http://event-display.svc.cluster.local
をシンクとして使用するシンクバインディングを作成します。
シンクフラグを使用したコマンドの例
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \ 1
--ce-override "sink=bound"
- 1
http://event-display.svc.cluster.local
のsvc
は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel
およびbroker
が含まれます。
5.2.6.1.3. Web コンソールを使用したシンクバインディングの作成
Knative Eventing がクラスターにインストールされると、Web コンソールを使用して シンクバインディングを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
シンクとして使用する Knative サービスを作成します。
-
Developer パースペクティブで、+Add
YAML に移動します。 サンプル YAML をコピーします。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: spec: containers: - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
- Create をクリックします。
-
Developer パースペクティブで、+Add
イベントソースとして使用される
CronJob
リソースを作成し、1 分ごとにイベントを送信します。-
Developer パースペクティブで、+Add
YAML に移動します。 サンプル YAML をコピーします。
apiVersion: batch/v1 kind: CronJob metadata: name: heartbeat-cron spec: # Run every minute schedule: "*/1 * * * *" jobTemplate: metadata: labels: app: heartbeat-cron bindings.knative.dev/include: true 1 spec: template: spec: restartPolicy: Never containers: - name: single-heartbeat image: quay.io/openshift-knative/heartbeats args: - --period=1 env: - name: ONE_SHOT value: "true" - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
- 1
bindings.knative.dev/include: true
ラベルを含めるようにしてください。OpenShift Serverless のデフォルトの namespace 選択動作は包含モードを使用します。
- Create をクリックします。
-
Developer パースペクティブで、+Add
直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace にシンクバインディングを作成します。
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
Sink Binding を選択し、Create Event Source をクリックします。Create Event Source ページが表示されます。
注記Form view または YAML view を使用して Sink Binding 設定を設定し、ビューを切り替えることができます。ビューの切り替え時に、データは永続化されます。
-
apiVersion フィールドに
batch/v1
を入力します。 Kind フィールドに
Job
と入力します。注記CronJob
の種類は OpenShift Serverless シンクバインディングで直接サポートされていないため、Kind フィールドは cron ジョブオブジェクト自体ではなく、cron ジョブで作成されるJob
オブジェクトをターゲットにする必要があります。-
Sink を選択します。これは Resource または URI のいずれかになります。この例では、直前の手順で作成された
event-display
サービスが Resources シンクとして使用されます。 Match labels セクションで以下を実行します。
-
Name フィールドに
app
と入力します。 Value フィールドに
heartbeat-cron
と入力します。注記ラベルセレクターは、リソース名ではなくシンクバインディングで cron ジョブを使用する場合に必要になります。これは、cron ジョブで作成されたジョブには予測可能な名前がなく、名前に無作為に生成される文字列が含まれているためです。たとえば、
hearthbeat-cron-1cc23f
になります。
-
Name フィールドに
- Create をクリックします。
-
Developer パースペクティブで、+Add
検証
Topology ページおよび Pod ログを表示して、シンクバインディング、シンク、および cron ジョブが正常に作成され、機能していることを確認できます。
- Developer パースペクティブで、Topology に移動します。
シンクバインディング、シンク、およびハートビートの cron ジョブを表示します。
- シンクバインディングが追加されると、正常なジョブが cron ジョブによって登録されていることを確認します。つまり、シンクバインディングは cron ジョブで作成されたジョブが正常に再設定されることを意味します。
-
event-display
サービス Pod のログを参照し、ハートビート cron ジョブで生成されるイベントを表示します。
5.2.6.1.4. シンクバインディング参照
シンクバインディングを作成して、PodSpecable
オブジェクトをイベントソースとして使用できます。SinkBinding
オブジェクトを作成するときに、複数のパラメーターを設定できます。
SinkBinding
オブジェクトは以下のパラメーターをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
API バージョンを指定します (例: | 必須 |
|
このリソースオブジェクトを | 必須 |
|
| 必須 |
|
この | 必須 |
| シンクとして使用する URI に解決するオブジェクトへの参照。 | 必須 |
| ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。 | 必須 |
| 上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。 | 任意 |
5.2.6.1.4.1. Subject パラメーター
Subject
パラメーターは、ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。Subject
定義に複数のフィールドを設定できます。
Subject
定義は、以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
| 参照先の API バージョン。 | 必須 |
| 参照先の種類。 | 必須 |
| 参照先の namespace。省略されている場合、デフォルトはオブジェクトの namespace に設定されます。 | 任意 |
| 参照先の名前。 |
|
| 参照先のセレクター。 |
|
| ラベルセレクターの要件の一覧です。 |
|
| セレクターが適用されるラベルキー。 |
|
|
キーと値のセットの関係を表します。有効な演算子は |
|
|
文字列値の配列。 |
|
|
キーと値のペアのマップ。 |
|
サブジェクトパラメーターの例
以下の YAML の場合は、default
namespace の mysubject
という名前の Deployment
オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: apps/v1 kind: Deployment namespace: default name: mysubject ...
以下の YAML の場合、default
namespace にラベル working=example
が設定された Job
オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: batch/v1 kind: Job namespace: default selector: matchLabels: working: example ...
以下の YAML の場合、default
namespace にラベル working=example
または working=sample
が含まれる Pod
オブジェクトが選択されます。
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: subject: apiVersion: v1 kind: Pod namespace: default selector: - matchExpression: key: working operator: In values: - example - sample ...
5.2.6.1.4.2. CloudEvent オーバーライド
ceOverrides
定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides
定義に複数のフィールドを設定できます。
ceOverrides
の定義は、以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
アウトバウンドイベントで追加または上書きされる属性を指定します。各 | 任意 |
拡張子として許可されるのは、有効な CloudEvent
属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type
属性を変更することはできません。
CloudEvent オーバーライドの例
apiVersion: sources.knative.dev/v1 kind: SinkBinding metadata: name: bind-heartbeat spec: ... ceOverrides: extensions: extra: this is an extra attribute additional: 42
これにより、subject
に K_CE_OVERRIDES
環境変数が設定されます。
出力例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
5.2.6.1.4.3. include ラベル
シンクバインディングを使用するには、bindings.knative.dev/include: "true"
ラベルをリソースまたはリソースが含まれる namespace のいずれかに割り当てる必要があります。リソース定義にラベルが含まれていない場合には、クラスター管理者は以下を実行してこれを namespace に割り当てることができます。
$ oc label namespace <namespace> bindings.knative.dev/include=true
5.2.6.2. コンテナーソース
コンテナーソースは、イベントを生成し、イベントをシンクに送信するコンテナーイメージを作成します。コンテナーソースを使用して、イメージ URI を使用するコンテナーイメージおよび ContainerSource
オブジェクトを作成して、カスタムイベントソースを作成できます。
5.2.6.2.1. コンテナーイメージを作成するためのガイドライン
コンテナーソースコントローラーには、K_SINK
および K_CE_OVERRIDES
の 2 つの環境変数が注入されます。これらの変数は、それぞれ sink
および ceOverrides
仕様から解決されます。イベントは、K_SINK
環境変数で指定されたシンク URI に送信されます。メッセージは、CloudEvent
HTTP 形式を使用して POST
として送信する必要があります。
コンテナーイメージの例
以下は、ハートビートコンテナーイメージの例になります。
package main import ( "context" "encoding/json" "flag" "fmt" "log" "os" "strconv" "time" duckv1 "knative.dev/pkg/apis/duck/v1" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/kelseyhightower/envconfig" ) type Heartbeat struct { Sequence int `json:"id"` Label string `json:"label"` } var ( eventSource string eventType string sink string label string periodStr string ) func init() { flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)") flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)") flag.StringVar(&sink, "sink", "", "the host url to heartbeat to") flag.StringVar(&label, "label", "", "a special label") flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") } type envConfig struct { // Sink URL where to send heartbeat cloud events Sink string `envconfig:"K_SINK"` // CEOverrides are the CloudEvents overrides to be applied to the outbound event. CEOverrides string `envconfig:"K_CE_OVERRIDES"` // Name of this pod. Name string `envconfig:"POD_NAME" required:"true"` // Namespace this pod exists in. Namespace string `envconfig:"POD_NAMESPACE" required:"true"` // Whether to run continuously or exit. OneShot bool `envconfig:"ONE_SHOT" default:"false"` } func main() { flag.Parse() var env envConfig if err := envconfig.Process("", &env); err != nil { log.Printf("[ERROR] Failed to process env var: %s", err) os.Exit(1) } if env.Sink != "" { sink = env.Sink } var ceOverrides *duckv1.CloudEventOverrides if len(env.CEOverrides) > 0 { overrides := duckv1.CloudEventOverrides{} err := json.Unmarshal([]byte(env.CEOverrides), &overrides) if err != nil { log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err) os.Exit(1) } ceOverrides = &overrides } p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink)) if err != nil { log.Fatalf("failed to create http protocol: %s", err.Error()) } c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow()) if err != nil { log.Fatalf("failed to create client: %s", err.Error()) } var period time.Duration if p, err := strconv.Atoi(periodStr); err != nil { period = time.Duration(5) * time.Second } else { period = time.Duration(p) * time.Second } if eventSource == "" { eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name) log.Printf("Heartbeats Source: %s", eventSource) } if len(label) > 0 && label[0] == '"' { label, _ = strconv.Unquote(label) } hb := &Heartbeat{ Sequence: 0, Label: label, } ticker := time.NewTicker(period) for { hb.Sequence++ event := cloudevents.NewEvent("1.0") event.SetType(eventType) event.SetSource(eventSource) event.SetExtension("the", 42) event.SetExtension("heart", "yes") event.SetExtension("beats", true) if ceOverrides != nil && ceOverrides.Extensions != nil { for n, v := range ceOverrides.Extensions { event.SetExtension(n, v) } } if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil { log.Printf("failed to set cloudevents data: %s", err.Error()) } log.Printf("sending cloudevent to %s", sink) if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) { log.Printf("failed to send cloudevent: %v", res) } if env.OneShot { return } // Wait for next tick <-ticker.C } }
以下は、以前のハートビートコンテナーイメージを参照するコンテナーソースの例です。
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: template: spec: containers: # This corresponds to a heartbeats image URI that you have built and published - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats name: heartbeats args: - --period=1 env: - name: POD_NAME value: "example-pod" - name: POD_NAMESPACE value: "event-test" sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: example-service ...
5.2.6.2.2. Knative CLI を使用したコンテナーソースの作成および管理
kn source container
コマンドを使用し、Knative (kn
) CLI を使用してコンテナーソースを作成および管理できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。
コンテナーソースを作成します。
$ kn source container create <container_source_name> --image <image_uri> --sink <sink>
コンテナーソースの削除
$ kn source container delete <container_source_name>
コンテナーソースを記述します。
$ kn source container describe <container_source_name>
既存のコンテナーソースを一覧表示
$ kn source container list
既存のコンテナーソースを YAML 形式で一覧表示
$ kn source container list -o yaml
コンテナーソースを更新します。
このコマンドにより、既存のコンテナーソースのイメージ URI が更新されます。
$ kn source container update <container_source_name> --image <image_uri>
5.2.6.2.3. Web コンソールを使用したコンテナーソースの作成
Knative Eventing がクラスターにインストールされると、Web コンソールを使用してコンテナーソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。
前提条件
- OpenShift Container Platform Web コンソールにログインしている。
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
手順
-
Developer パースペクティブで、+Add
Event Source に移動します。Event Sources ページが表示されます。 - Container Source を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
Form view または YAML view を使用して、Container Source 設定を設定します。
注記Form view と YAML view 間で切り換えることができます。ビューの切り替え時に、データは永続化されます。
- Image フィールドに、コンテナーソースが作成したコンテナーで実行するイメージの URI を入力します。
- Name フィールドにイメージの名前を入力します。
- オプション: Arguments フィールドで、コンテナーに渡す引数を入力します。
- オプション: Environment variables フィールドで、コンテナーに設定する環境変数を追加します。
Sink セクションで、コンテナーソースからのイベントがルーティングされるシンクを追加します。Form ビューを使用している場合は、以下のオプションから選択できます。
- Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。
- URI を選択して、コンテナーソースからのイベントのルーティング先を指定します。
- コンテナーソースの設定が完了したら、Create をクリックします。
5.2.6.2.4. コンテナーソースのリファレンス
ContainerSource
オブジェクトを作成することにより、コンテナーをイベントソースとして使用できます。ContainerSource
オブジェクトを作成するときに、複数のパラメーターを設定できます。
ContainerSource
オブジェクトは以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
API バージョンを指定します (例: | 必須 |
|
このリソースオブジェクトを | 必須 |
|
| 必須 |
|
この | 必須 |
| シンクとして使用する URI に解決するオブジェクトへの参照。 | 必須 |
|
| 必須 |
| 上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。 | 任意 |
テンプレートパラメーターの例
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: template: spec: containers: - image: quay.io/openshift-knative/heartbeats:latest name: heartbeats args: - --period=1 env: - name: POD_NAME value: "mypod" - name: POD_NAMESPACE value: "event-test" ...
5.2.6.2.4.1. CloudEvent オーバーライド
ceOverrides
定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides
定義に複数のフィールドを設定できます。
ceOverrides
の定義は、以下のフィールドをサポートします。
フィールド | 説明 | 必須またはオプション |
---|---|---|
|
アウトバウンドイベントで追加または上書きされる属性を指定します。各 | 任意 |
拡張子として許可されるのは、有効な CloudEvent
属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type
属性を変更することはできません。
CloudEvent オーバーライドの例
apiVersion: sources.knative.dev/v1 kind: ContainerSource metadata: name: test-heartbeats spec: ... ceOverrides: extensions: extra: this is an extra attribute additional: 42
これにより、subject
に K_CE_OVERRIDES
環境変数が設定されます。
出力例
{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
5.2.7. 開発者パースペクティブを使用してイベントソースをシンクに接続する
OpenShift Container Platform Web コンソールを使用してイベントソースを作成する場合、イベントがソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。
5.2.7.1. Developer パースペクティブを使用してイベントソースをシンクに接続します。
前提条件
- OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
- Web コンソールにログインしており、Developer パースペクティブを使用している。
- OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
- Knative サービス、チャネル、ブローカーなどのシンクを作成している。
手順
-
+Add
Event Source に移動して任意のタイプのイベントソースを作成し、作成するイベントソースを選択します。 - イベントソースの作成 フォームビューの シンク セクションで、リソース リストからシンクを選択します。
- Create をクリックします。
検証
Topology ページを表示して、イベントソースが作成され、シンクに接続されていることを確認できます。
- Developer パースペクティブで、Topology に移動します。
- イベントソースを表示し、接続されたシンクをクリックして、右側のパネルにシンクの詳細を表示します。