5.2. イベントソース


5.2.1. イベントソース

Knative イベントソース には、クラウドイベントの生成またはインポート、これらのイベントの別のエンドポイントへのリレー (sink とも呼ばれる) を行う Kubernetes オブジェクトを指定できます。イベントに対応する分散システムを開発するには、イベントのソースが重要になります。

OpenShift Container Platform Web コンソールの Developer パースペクティブ、Knative (kn) CLI を使用するか、YAML ファイルを適用することで、Knative イベントソースを作成および管理できます。

現時点で、OpenShift Serverless は以下のイベントソースタイプをサポートします。

API サーバーソース
Kubernetes API サーバーイベントを Knative に送ります。API サーバーソースは、Kubernetes リソースが作成、更新、または削除されるたびに新規イベントを送信します。
Ping ソース
指定された cron スケジュールに、固定ペイロードを使用してイベントを生成します。
Kafka イベントソース
Kafka クラスターをイベントソースとしてシンクに接続します。

カスタムイベントソース を作成することもできます。

5.2.2. Administrator パースペクティブのイベントソース

イベントに対応する分散システムを開発するには、イベントのソースが重要になります。

5.2.2.1. Administrator パースペクティブを使用したイベントソースの作成

Knative イベントソース には、クラウドイベントの生成またはインポート、これらのイベントの別のエンドポイントへのリレー (sink とも呼ばれる) を行う Kubernetes オブジェクトを指定できます。

前提条件

  • OpenShift Serverless Operator および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • Web コンソールにログインしており、Administrator パースペクティブを使用している。
  • OpenShift Container Platform のクラスター管理者パーミッションがある。

手順

  1. OpenShift Container Platform Web コンソールの Administrator パースペクティブで、 Serverless Eventing に移動します。
  2. Create 一覧で、Event Source を選択します。Event Sources ページに移動します。
  3. 作成するイベントソースタイプを選択します。

5.2.3. API サーバーソースの作成

API サーバーソースは、Knative サービスなどのイベントシンクを Kubernetes API サーバーに接続するために使用できるイベントソースです。API サーバーソースは Kubernetes イベントを監視し、それらを Knative Eventing ブローカーに転送します。

5.2.3.1. Web コンソールを使用した API サーバーソースの作成

Knative Eventing がクラスターにインストールされると、Web コンソールを使用して API サーバーソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。
手順

既存のサービスアカウントを再利用する必要がある場合には、既存の ServiceAccount リソースを変更して、新規リソースを作成せずに、必要なパーミッションを含めることができます。

  1. イベントソースのサービスアカウント、ロールおよびロールバインディングを YAML ファイルとして作成します。

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    この namespace を、イベントソースのインストールに選択した namespace に変更します。
  2. YAML ファイルを適用します。

    $ oc apply -f <filename>
  3. Developer パースペクティブで、+Add Event Source に移動します。Event Sources ページが表示されます。
  4. オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
  5. ApiServerSource を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
  6. Form view または YAML view を使用して、ApiServerSource 設定を設定します。

    注記

    Form viewYAML view 間で切り換えることができます。ビューの切り替え時に、データは永続化されます。

    1. APIVERSIONv1 を、KINDEvent を入力します。
    2. 作成したサービスアカウントの Service Account Name を選択します。
    3. イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
  7. Create をクリックします。

検証

  • API サーバーソースの作成後、これが Topology ビューでシンクされるサービスに接続されていることを確認できます。

    ApiServerSource Topology ビュー
注記

URI シンクが使用される場合、URI sink Edit URI を右クリックして URI を変更します。

API サーバーソースの削除

  1. Topology ビューに移動します。
  2. API サーバーソースを右クリックし、Delete ApiServerSource を選択します。

    ApiServerSource の削除

5.2.3.2. Knative CLI を使用した API サーバーソースの作成

kn source apiserver create コマンドを使用し、kn CLI を使用して API サーバーソースを作成できます。API サーバーソースを作成するために kn CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

前提条件

  • OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。
  • Knative (kn) CLI をインストールしている。
手順

既存のサービスアカウントを再利用する必要がある場合には、既存の ServiceAccount リソースを変更して、新規リソースを作成せずに、必要なパーミッションを含めることができます。

  1. イベントソースのサービスアカウント、ロールおよびロールバインディングを YAML ファイルとして作成します。

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    この namespace を、イベントソースのインストールに選択した namespace に変更します。
  2. YAML ファイルを適用します。

    $ oc apply -f <filename>
  3. イベントシンクを持つ API サーバーソースを作成します。次の例では、シンクはブローカーです。

    $ kn source apiserver create <event_source_name> --sink broker:<broker_name> --resource "event:v1" --service-account <service_account_name> --mode Resource
  4. API サーバーソースが正しく設定されていることを確認するには、受信メッセージをログにダンプする Knative サービスを作成します。

    $ kn service create <service_name> --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  5. ブローカーをイベントシンクとして使用した場合は、トリガーを作成して、default のブローカーからサービスへのイベントをフィルターリングします。

    $ kn trigger create <trigger_name> --sink ksvc:<service_name>
  6. デフォルト namespace で Pod を起動してイベントを作成します。

    $ oc create deployment hello-node --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  7. 以下のコマンドを入力し、生成される出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ kn source apiserver describe <source_name>

    出力例

    Name:                mysource
    Namespace:           default
    Annotations:         sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:                 3m
    ServiceAccountName:  events-sa
    Mode:                Resource
    Sink:
      Name:       default
      Namespace:  default
      Kind:       Broker (eventing.knative.dev/v1)
    Resources:
      Kind:        event (v1)
      Controller:  false
    Conditions:
      OK TYPE                     AGE REASON
      ++ Ready                     3m
      ++ Deployed                  3m
      ++ SinkProvided              3m
      ++ SufficientPermissions     3m
      ++ EventTypesProvided        3m

検証

メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative に送信されていることを確認できます。

  1. Pod を取得します。

    $ oc get pods
  2. Pod のメッセージダンパー機能ログを表示します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{hello-node}",
          "kind": "Pod",
          "name": "hello-node",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "hello-node.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }

API サーバーソースの削除

  1. トリガーを削除します。

    $ kn trigger delete <trigger_name>
  2. イベントソースを削除します。

    $ kn source apiserver delete <source_name>
  3. サービスアカウント、クラスターロール、およびクラスターバインディングを削除します。

    $ oc delete -f authentication.yaml
5.2.3.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

5.2.3.3. YAML ファイルを使用した API サーバーソースの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用して API サーバーソースを作成するには、ApiServerSource オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • API サーバーソース YAML ファイルで定義されるものと同じ namespace に default ブローカーを作成している。
  • OpenShift CLI (oc) をインストールしている。
手順

既存のサービスアカウントを再利用する必要がある場合には、既存の ServiceAccount リソースを変更して、新規リソースを作成せずに、必要なパーミッションを含めることができます。

  1. イベントソースのサービスアカウント、ロールおよびロールバインディングを YAML ファイルとして作成します。

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default 1
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default 2
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default 3
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default 4
    1 2 3 4
    この namespace を、イベントソースのインストールに選択した namespace に変更します。
  2. YAML ファイルを適用します。

    $ oc apply -f <filename>
  3. API サーバーソースを YAML ファイルとして作成します。

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      name: testevents
    spec:
      serviceAccountName: events-sa
      mode: Resource
      resources:
        - apiVersion: v1
          kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
  4. ApiServerSource YAML ファイルを適用します。

    $ oc apply -f <filename>
  5. API サーバーソースが正しく設定されていることを確認するには、受信メッセージをログにダンプする Knative サービスを YAML ファイルとして作成します。

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  6. Service YAML ファイルを適用します。

    $ oc apply -f <filename>
  7. 直接の手順で作成下サービスに、default ブローカーからイベントをフィルターする Trigger オブジェクトを YAML ファイルとして作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      name: event-display-trigger
      namespace: default
    spec:
      broker: default
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  8. Trigger YAML ファイルを適用します。

    $ oc apply -f <filename>
  9. デフォルト namespace で Pod を起動してイベントを作成します。

    $ oc create deployment hello-node --image=quay.io/openshift-knative/knative-eventing-sources-event-display
  10. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ oc get apiserversource.sources.knative.dev testevents -o yaml

    出力例

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      annotations:
      creationTimestamp: "2020-04-07T17:24:54Z"
      generation: 1
      name: testevents
      namespace: default
      resourceVersion: "62868"
      selfLink: /apis/sources.knative.dev/v1alpha1/namespaces/default/apiserversources/testevents2
      uid: 1603d863-bb06-4d1c-b371-f580b4db99fa
    spec:
      mode: Resource
      resources:
      - apiVersion: v1
        controller: false
        controllerSelector:
          apiVersion: ""
          kind: ""
          name: ""
          uid: ""
        kind: Event
        labelSelector: {}
      serviceAccountName: events-sa
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default

検証

Kubernetes イベントが Knative に送信されていることを確認するには、メッセージダンパー機能ログを確認します。

  1. 以下のコマンドを入力して Pod を取得します。

    $ oc get pods
  2. 以下のコマンドを入力して、Pod のメッセージダンパー機能ログを表示します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{hello-node}",
          "kind": "Pod",
          "name": "hello-node",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "hello-node.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }

API サーバーソースの削除

  1. トリガーを削除します。

    $ oc delete -f trigger.yaml
  2. イベントソースを削除します。

    $ oc delete -f k8s-events.yaml
  3. サービスアカウント、クラスターロール、およびクラスターバインディングを削除します。

    $ oc delete -f authentication.yaml

5.2.4. ping ソースの作成

ping ソースは、一定のペイロードを使用して ping イベントをイベントコンシューマーに定期的に送信するために使用されるイベントソースです。ping ソースを使用すると、タイマーと同様にイベントの送信をスケジュールできます。

5.2.4.1. Web コンソールを使用した ping ソースの作成

Knative Eventing がクラスターにインストールされると、Web コンソールを使用して ping ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. PingSource が機能していることを確認するには、受信メッセージをサービスのログにダンプする単純な Knative サービスを作成します。

    1. Developer パースペクティブで、+Add YAML に移動します。
    2. サンプル YAML をコピーします。

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Create をクリックします。
  2. 直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace に ping ソースを作成します。

    1. Developer パースペクティブで、+Add Event Source に移動します。Event Sources ページが表示されます。
    2. オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
    3. Ping Source を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。

      注記

      Form view または YAML view を使用して PingSource 設定を設定し、これらのビューを切り換えることができます。ビューの切り替え時に、データは永続化されます。

    4. Schedule の値を入力します。この例では、値は */2 * * * * であり、2 分ごとにメッセージを送信する PingSource を作成します。
    5. オプション: Data の値を入力できます。これはメッセージのペイロードです。
    6. Sink を選択します。これは Resource または URI のいずれかになります。この例では、直前の手順で作成された event-display サービスが Resources シンクとして使用されます。
    7. Create をクリックします。

検証

Topology ページを表示して、ping ソースが作成され、シンクに接続されていることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. ping ソースおよびシンクを表示します。

    Topology ビューでの ping ソースおよびサービスの表示

ping ソースの削除

  1. Topology ビューに移動します。
  2. API サーバーソースを右クリックし、Delete Ping Source を選択します。

5.2.4.2. Knative CLI を使用した ping ソースの作成

kn source ping create コマンドを使用し、Knative (kn) CLI を使用して ping ソースを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • Knative (kn) CLI をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • オプション: この手順の検証手順を使用する場合は、OpenShift CLI (oc) をインストールします。

手順

  1. ping ソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする単純な Knative サービスを作成します。

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. 要求する必要のある ping イベントのセットごとに、PingSource をイベントコンシューマーと同じ namespace に作成します。

    $ kn source ping create test-ping-source \
        --schedule "*/2 * * * *" \
        --data '{"message": "Hello world!"}' \
        --sink ksvc:event-display
  3. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ kn source ping describe test-ping-source

    出力例

    Name:         test-ping-source
    Namespace:    default
    Annotations:  sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:          15s
    Schedule:     */2 * * * *
    Data:         {"message": "Hello world!"}
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE                 AGE REASON
      ++ Ready                 8s
      ++ Deployed              8s
      ++ SinkProvided         15s
      ++ ValidSchedule        15s
      ++ EventTypeProvided    15s
      ++ ResourcesCorrect     15s

検証

シンク Pod のログを確認して、Kubernetes イベントが Knative イベントに送信されていることを確認できます。

デフォルトで、Knative サービスは、トラフィックが 60 秒以内に受信されない場合に Pod を終了します。本書の例では、新たに作成される Pod で各メッセージが確認されるように 2 分ごとにメッセージを送信する ping ソースを作成します。

  1. 作成された新規 Pod を監視します。

    $ watch oc get pods
  2. Ctrl+C を使用して Pod の監視をキャンセルし、作成された Pod のログを確認します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 99e4f4f6-08ff-4bff-acf1-47f61ded68c9
      time: 2020-04-07T16:16:00.000601161Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

ping ソースの削除

  • ping ソースを削除します。

    $ kn delete pingsources.sources.knative.dev <ping_source_name>
5.2.4.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

5.2.4.3. YAML を使用した ping ソースの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用してサーバーレス ping を作成するには、PingSource オブジェクトを定義する YAML ファイルを作成し、oc apply を使用してこれを適用する必要があります。

PingSource オブジェクトの例

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *" 1
  data: '{"message": "Hello world!"}' 2
  sink: 3
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

1
CRON 式 を使用して指定されるイベントのスケジュール。
2
JSON でエンコードされたデータ文字列として表現されるイベントメッセージの本体。
3
これらはイベントコンシューマーの詳細です。この例では、event-display という名前の Knative サービスを使用しています。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift CLI (oc) をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. ping ソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする単純な Knative サービスを作成します。

    1. サービス YAML ファイルを作成します。

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    2. サービスを作成します。

      $ oc apply -f <filename>
  2. 要求する必要のある ping イベントのセットごとに、ping ソースをイベントコンシューマーと同じ namespace に作成します。

    1. ping ソースの YAML ファイルを作成します。

      apiVersion: sources.knative.dev/v1
      kind: PingSource
      metadata:
        name: test-ping-source
      spec:
        schedule: "*/2 * * * *"
        data: '{"message": "Hello world!"}'
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
    2. ping ソースを作成します。

      $ oc apply -f <filename>
  3. 以下のコマンドを入力し、コントローラーが正しくマップされていることを確認します。

    $ oc get pingsource.sources.knative.dev <ping_source_name> -oyaml

    出力例

    apiVersion: sources.knative.dev/v1
    kind: PingSource
    metadata:
      annotations:
        sources.knative.dev/creator: developer
        sources.knative.dev/lastModifier: developer
      creationTimestamp: "2020-04-07T16:11:14Z"
      generation: 1
      name: test-ping-source
      namespace: default
      resourceVersion: "55257"
      selfLink: /apis/sources.knative.dev/v1/namespaces/default/pingsources/test-ping-source
      uid: 3d80d50b-f8c7-4c1b-99f7-3ec00e0a8164
    spec:
      data: '{ value: "hello" }'
      schedule: '*/2 * * * *'
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default

検証

シンク Pod のログを確認して、Kubernetes イベントが Knative イベントに送信されていることを確認できます。

デフォルトで、Knative サービスは、トラフィックが 60 秒以内に受信されない場合に Pod を終了します。本書の例では、新たに作成される Pod で各メッセージが確認されるように 2 分ごとにメッセージを送信する PingSource を作成します。

  1. 作成された新規 Pod を監視します。

    $ watch oc get pods
  2. Ctrl+C を使用して Pod の監視をキャンセルし、作成された Pod のログを確認します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 042ff529-240e-45ee-b40c-3a908129853e
      time: 2020-04-07T16:22:00.000791674Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

ping ソースの削除

  • ping ソースを削除します。

    $ oc delete -f <filename>

    コマンドの例

    $ oc delete -f ping-source.yaml

5.2.5. Kafka ソース

Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn)CLI を使用するか、KafkaSource オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc) を使用して適用します。

5.2.5.1. Web コンソールを使用した Kafka イベントソースの作成

Knative Kafka をクラスターにインストールした後、Web コンソールを使用して Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • Web コンソールにログインしている。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
  2. Event Sources ページで、Type セクションの Kafka Source を選択します。
  3. Kafka Source 設定を設定します。

    1. ブートストラップサーバー のコンマ区切りの一覧を追加します。
    2. トピック のコンマ区切りの一覧を追加します。
    3. コンシューマーグループ を追加します。
    4. 作成したサービスアカウントの Service Account Name を選択します。
    5. イベントソースの Sink を選択します。Sink は、チャネル、ブローカー、またはサービスなどの Resource、または URI のいずれかになります。
    6. Kafka イベントソースの Name を入力します。
  4. Create をクリックします。

検証

Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. Kafka イベントソースおよびシンクを表示します。

    Topology ビューでの Kafka ソースおよびサービスの表示

5.2.5.2. Knative CLI を使用した Kafka イベントソースの作成

kn source kafka create コマンドを使用し、Knative (kn) CLI を使用して Kafka ソースを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving、および KnativeKafka カスタムリソース (CR) がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • Knative (kn) CLI をインストールしている。
  • オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (oc) をインストールします。

手順

  1. Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. KafkaSource CR を作成します。

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注記

    このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。

    --servers--topics、および --consumergroup オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup オプションは任意です。

  3. オプション: 作成した KafkaSource CR の詳細を表示します。

    $ kn source kafka describe <kafka_source_name>

    出力例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

検証手順

  1. Kafka インスタンスをトリガーし、メッセージをトピックに送信します。

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。

    • Kafka クラスターが kafka namespace にインストールされている。
    • KafkaSource オブジェクトは、my-topic トピックを使用するように設定されている。
  2. ログを表示して、メッセージが到達していることを確認します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

5.2.5.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

5.2.5.3. YAML を使用した Kafka イベントソースの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述することができます。YAML を使用して Kafka ソースを作成するには、KafkaSource オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. KafkaSource オブジェクトを YAML ファイルとして作成します。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    コンシューマーグループは、同じグループ ID を使用し、トピックからデータを消費するコンシューマーのグループです。
    2
    トピックは、データの保存先を提供します。各トピックは、1 つまたは複数のパーティションに分割されます。
    3
    シンクは、イベントがソースから送信される場所を指定します。
    重要

    OpenShift Serverless 上の KafkaSource オブジェクトの API の v1beta1 バージョンのみがサポートされます。非推奨となった v1alpha1 バージョンの API は使用しないでください。

    KafkaSource オブジェクトの例

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. KafkaSource YAML ファイルを適用します。

    $ oc apply -f <filename>

検証

  • 以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。

    $ oc get pods

    出力例

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

5.2.5.4. Kafka ソースの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効にされている場合、Kafka クラスターの ca.crt 証明書ファイルも必要になります。
  • OpenShift (oc) CLI がインストールされている。

手順

  1. 選択された namespace にシークレットとして証明書ファイルを作成します。

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ 1
      --from-literal=user="my-sasl-user"
    1
    SASL タイプは PLAINSCRAM-SHA-256、または SCRAM-SHA-512 です。
  2. Kafka ソースを作成または変更して、次の spec 設定が含まれるようにします。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: 1
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1
    Red Hat OpenShift Streams for Apache Kafka などのパブリッククラウド Kafka サービスを使用している場合は、caCert 仕様は必要ありません。

5.2.6. カスタムイベントソース

Knative に含まれていないイベントプロデューサーや、CloudEvent 形式ではないイベントを生成するプロデューサーからイベントを Ingress する必要がある場合は、カスタムイベントソースを使用してこれを実行できます。カスタムイベントソースは、次のいずれかの方法で作成できます。

  • シンクバインディングを作成して、PodSpecable オブジェクトをイベントソースとして使用します。
  • コンテナーソースを作成して、コンテナーをイベントソースとして使用します。

5.2.6.1. シンクバインディング

SinkBinding オブジェクトは、イベント生成を配信アドレス指定から切り離すことをサポートします。シンクバインディングは、イベントプロデューサー をイベントコンシューマーまたは シンク に接続するために使用されます。イベントプロデューサーは、PodSpec テンプレートを組み込む Kubernetes リソースであり、イベントを生成します。シンクは、イベントを受信できるアドレス指定可能な Kubernetes オブジェクトです。

SinkBinding オブジェクトは、環境変数をシンクの PodTemplateSpec に挿入します。つまり、アプリケーションコードが Kubernetes API と直接対話してイベントの宛先を見つける必要はありません。これらの環境変数は以下のとおりです。

K_SINK
解決されたシンクの URL。
K_CE_OVERRIDES
アウトバウンドイベントの上書きを指定する JSON オブジェクト。
注記

現在、SinkBinding オブジェクトはサービスのカスタムリビジョン名をサポートしません。

5.2.6.1.1. YAML を使用したシンクバインディングの作成

YAML ファイルを使用して Knative リソースを作成する場合、宣言的 API を使用するため、再現性の高い方法でイベントソースを宣言的に記述することができます。YAML を使用してシンクバインディングを作成するには、SinkBinding オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift CLI (oc) をインストールしている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。

    1. サービス YAML ファイルを作成します。

      サービス YAML ファイルの例

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest

    2. サービスを作成します。

      $ oc apply -f <filename>
  2. イベントをサービスに転送するシンクバインディングインスタンスを作成します。

    1. シンクバインディング YAML ファイルを作成します。

      サービス YAML ファイルの例

      apiVersion: sources.knative.dev/v1alpha1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job 1
          selector:
            matchLabels:
              app: heartbeat-cron
      
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      1
      この例では、ラベル app: heartbeat-cron を指定したジョブがイベントシンクにバインドされます。
    2. シンクバインディングを作成します。

      $ oc apply -f <filename>
  3. CronJob オブジェクトを作成します。

    1. cron ジョブの YAML ファイルを作成します。

      cron ジョブの YAML ファイルの例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      シンクバインディングを使用するには、bindings.knative.dev/include=true ラベルを Knative リソースに手動で追加する必要があります。

      たとえば、このラベルを CronJob インスタンスに追加するには、以下の行を Job リソースの YAML 定義に追加します。

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. cron ジョブを作成します。

      $ oc apply -f <filename>
  4. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    出力例

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron

検証

メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。

  1. コマンドを入力します。

    $ oc get pods
  2. コマンドを入力します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.2.6.1.2. Knative CLI を使用したシンクバインディングの作成

kn source binding create コマンドを使用し、Knative (kn) を使用してシンクバインディングを作成できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Knative (kn) CLI をインストールしている。
  • OpenShift CLI (oc) をインストールしている。
注記

以下の手順では、YAML ファイルを作成する必要があります。

サンプルで使用されたもので YAML ファイルの名前を変更する場合は、必ず対応する CLI コマンドを更新する必要があります。

手順

  1. シンクバインディングが正しく設定されていることを確認するには、受信メッセージをダンプする Knative イベント表示サービスまたはイベントシンクを作成します。

    $ kn service create event-display --image quay.io/openshift-knative/knative-eventing-sources-event-display:latest
  2. イベントをサービスに転送するシンクバインディングインスタンスを作成します。

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. CronJob オブジェクトを作成します。

    1. cron ジョブの YAML ファイルを作成します。

      cron ジョブの YAML ファイルの例

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      重要

      シンクバインディングを使用するには、bindings.knative.dev/include=true ラベルを Knative CR に手動で追加する必要があります。

      たとえば、このラベルを CronJob CR に追加するには、以下の行を Job CR の YAML 定義に追加します。

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. cron ジョブを作成します。

      $ oc apply -f <filename>
  4. 以下のコマンドを入力し、出力を検査して、コントローラーが正しくマップされていることを確認します。

    $ kn source binding describe bind-heartbeat

    出力例

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

検証

メッセージダンパー機能ログを確認して、Kubernetes イベントが Knative イベントシンクに送信されていることを確認できます。

  • 以下のコマンドを入力して、メッセージダンパー機能ログを表示します。

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

5.2.6.1.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合、--sink フラグを使用して、イベントがリソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。
5.2.6.1.3. Web コンソールを使用したシンクバインディングの作成

Knative Eventing がクラスターにインストールされると、Web コンソールを使用して シンクバインディングを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. シンクとして使用する Knative サービスを作成します。

    1. Developer パースペクティブで、+Add YAML に移動します。
    2. サンプル YAML をコピーします。

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest
    3. Create をクリックします。
  2. イベントソースとして使用される CronJob リソースを作成し、1 分ごとにイベントを送信します。

    1. Developer パースペクティブで、+Add YAML に移動します。
    2. サンプル YAML をコピーします。

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: true 1
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace
      1
      bindings.knative.dev/include: true ラベルを含めるようにしてください。OpenShift Serverless のデフォルトの namespace 選択動作は包含モードを使用します。
    3. Create をクリックします。
  3. 直前の手順で作成したサービスと同じ namespace、またはイベントの送信先となる他のシンクと同じ namespace にシンクバインディングを作成します。

    1. Developer パースペクティブで、+Add Event Source に移動します。Event Sources ページが表示されます。
    2. オプション: イベントソースに複数のプロバイダーがある場合は、Providers 一覧から必要なプロバイダーを選択し、プロバイダーから利用可能なイベントソースをフィルターします。
    3. Sink Binding を選択し、Create Event Source をクリックします。Create Event Source ページが表示されます。

      注記

      Form view または YAML view を使用して Sink Binding 設定を設定し、ビューを切り替えることができます。ビューの切り替え時に、データは永続化されます。

    4. apiVersion フィールドに batch/v1 を入力します。
    5. Kind フィールドに Job と入力します。

      注記

      CronJob の種類は OpenShift Serverless シンクバインディングで直接サポートされていないため、Kind フィールドは cron ジョブオブジェクト自体ではなく、cron ジョブで作成される Job オブジェクトをターゲットにする必要があります。

    6. Sink を選択します。これは Resource または URI のいずれかになります。この例では、直前の手順で作成された event-display サービスが Resources シンクとして使用されます。
    7. Match labels セクションで以下を実行します。

      1. Name フィールドに app と入力します。
      2. Value フィールドに heartbeat-cron と入力します。

        注記

        ラベルセレクターは、リソース名ではなくシンクバインディングで cron ジョブを使用する場合に必要になります。これは、cron ジョブで作成されたジョブには予測可能な名前がなく、名前に無作為に生成される文字列が含まれているためです。たとえば、hearthbeat-cron-1cc23f になります。

    8. Create をクリックします。

検証

Topology ページおよび Pod ログを表示して、シンクバインディング、シンク、および cron ジョブが正常に作成され、機能していることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. シンクバインディング、シンク、およびハートビートの cron ジョブを表示します。

    Topology ビューでのシンクバインディングおよびサービスの表示
  3. シンクバインディングが追加されると、正常なジョブが cron ジョブによって登録されていることを確認します。つまり、シンクバインディングは cron ジョブで作成されたジョブが正常に再設定されることを意味します。
  4. event-display サービス Pod のログを参照し、ハートビート cron ジョブで生成されるイベントを表示します。
5.2.6.1.4. シンクバインディング参照

シンクバインディングを作成して、PodSpecable オブジェクトをイベントソースとして使用できます。SinkBinding オブジェクトを作成するときに、複数のパラメーターを設定できます。

SinkBinding オブジェクトは以下のパラメーターをサポートします。

フィールド説明必須またはオプション

apiVersion

API バージョンを指定します (例: sources.knative.dev/v1)。

必須

kind

このリソースオブジェクトを SinkBinding オブジェクトとして特定します。

必須

metadata

SinkBinding オブジェクトを一意に識別するメタデータを指定します。たとえば、name です。

必須

spec

この SinkBinding オブジェクトの設定情報を指定します。

必須

spec.sink

シンクとして使用する URI に解決するオブジェクトへの参照。

必須

spec.subject

ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。

必須

spec.ceOverrides

上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。

任意

5.2.6.1.4.1. Subject パラメーター

Subject パラメーターは、ランタイムコントラクトがバインディング実装によって拡張されるリソースを参照します。Subject 定義に複数のフィールドを設定できます。

Subject 定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

apiVersion

参照先の API バージョン。

必須

kind

参照先の種類。

必須

namespace

参照先の namespace。省略されている場合、デフォルトはオブジェクトの namespace に設定されます。

任意

name

参照先の名前。

selector を設定する場合は、使用しないでください。

selector

参照先のセレクター。

name を設定する場合は、使用しないでください。

selector.matchExpressions

ラベルセレクターの要件の一覧です。

matchExpressions または matchLabels のいずれかのみを使用します。

selector.matchExpressions.key

セレクターが適用されるラベルキー。

matchExpressions を使用する場合に必須です。

selector.matchExpressions.operator

キーと値のセットの関係を表します。有効な演算子は InNotInExists、および DoesNotExist です。

matchExpressions を使用する場合に必須です。

selector.matchExpressions.values

文字列値の配列。operator パラメーターの値が In または NotIn の場合、値配列が空でないようにする必要があります。operator パラメーターの値が Exists または DoesNotExist の場合、値の配列は空である必要があります。この配列は、ストラテジーに基づいたマージパッチの適用中に置き換えられます。

matchExpressions を使用する場合に必須です。

selector.matchLabels

キーと値のペアのマップ。matchLabels マップの各キーと値のペアは matchExpressions の要素と同じです。ここで、キーフィールドは matchLabels.<key> で、operatorIn で、values の配列には matchLabels.<value> のみが含まれます。

matchExpressions または matchLabels のいずれかのみを使用します。

サブジェクトパラメーターの例

以下の YAML の場合は、default namespace の mysubject という名前の Deployment オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

以下の YAML の場合、default namespace にラベル working=example が設定された Job オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

以下の YAML の場合、default namespace にラベル working=example または working=sample が含まれる Pod オブジェクトが選択されます。

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      - matchExpression:
        key: working
        operator: In
        values:
          - example
          - sample
  ...
5.2.6.1.4.2. CloudEvent オーバーライド

ceOverrides 定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides 定義に複数のフィールドを設定できます。

ceOverrides の定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

extensions

アウトバウンドイベントで追加または上書きされる属性を指定します。各 extensions のキーと値のペアは、属性拡張機能としてイベントに個別に設定されます。

任意

注記

拡張子として許可されるのは、有効な CloudEvent 属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type 属性を変更することはできません。

CloudEvent オーバーライドの例

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

これにより、subjectK_CE_OVERRIDES 環境変数が設定されます。

出力例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

5.2.6.1.4.3. include ラベル

シンクバインディングを使用するには、bindings.knative.dev/include: "true" ラベルをリソースまたはリソースが含まれる namespace のいずれかに割り当てる必要があります。リソース定義にラベルが含まれていない場合には、クラスター管理者は以下を実行してこれを namespace に割り当てることができます。

$ oc label namespace <namespace> bindings.knative.dev/include=true

5.2.6.2. コンテナーソース

コンテナーソースは、イベントを生成し、イベントをシンクに送信するコンテナーイメージを作成します。コンテナーソースを使用して、イメージ URI を使用するコンテナーイメージおよび ContainerSource オブジェクトを作成して、カスタムイベントソースを作成できます。

5.2.6.2.1. コンテナーイメージを作成するためのガイドライン

コンテナーソースコントローラーには、K_SINK および K_CE_OVERRIDES の 2 つの環境変数が注入されます。これらの変数は、それぞれ sink および ceOverrides 仕様から解決されます。イベントは、K_SINK 環境変数で指定されたシンク URI に送信されます。メッセージは、CloudEvent HTTP 形式を使用して POST として送信する必要があります。

コンテナーイメージの例

以下は、ハートビートコンテナーイメージの例になります。

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	duckv1 "knative.dev/pkg/apis/duck/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
	Sequence int    `json:"id"`
	Label    string `json:"label"`
}

var (
	eventSource string
	eventType   string
	sink        string
	label       string
	periodStr   string
)

func init() {
	flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
	flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
	flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
	flag.StringVar(&label, "label", "", "a special label")
	flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
	// Sink URL where to send heartbeat cloud events
	Sink string `envconfig:"K_SINK"`

	// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
	CEOverrides string `envconfig:"K_CE_OVERRIDES"`

	// Name of this pod.
	Name string `envconfig:"POD_NAME" required:"true"`

	// Namespace this pod exists in.
	Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

	// Whether to run continuously or exit.
	OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}

func main() {
	flag.Parse()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	if env.Sink != "" {
		sink = env.Sink
	}

	var ceOverrides *duckv1.CloudEventOverrides
	if len(env.CEOverrides) > 0 {
		overrides := duckv1.CloudEventOverrides{}
		err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
		if err != nil {
			log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
			os.Exit(1)
		}
		ceOverrides = &overrides
	}

	p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
	if err != nil {
		log.Fatalf("failed to create http protocol: %s", err.Error())
	}

	c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
	if err != nil {
		log.Fatalf("failed to create client: %s", err.Error())
	}

	var period time.Duration
	if p, err := strconv.Atoi(periodStr); err != nil {
		period = time.Duration(5) * time.Second
	} else {
		period = time.Duration(p) * time.Second
	}

	if eventSource == "" {
		eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
		log.Printf("Heartbeats Source: %s", eventSource)
	}

	if len(label) > 0 && label[0] == '"' {
		label, _ = strconv.Unquote(label)
	}
	hb := &Heartbeat{
		Sequence: 0,
		Label:    label,
	}
	ticker := time.NewTicker(period)
	for {
		hb.Sequence++

		event := cloudevents.NewEvent("1.0")
		event.SetType(eventType)
		event.SetSource(eventSource)
		event.SetExtension("the", 42)
		event.SetExtension("heart", "yes")
		event.SetExtension("beats", true)

		if ceOverrides != nil && ceOverrides.Extensions != nil {
			for n, v := range ceOverrides.Extensions {
				event.SetExtension(n, v)
			}
		}

		if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
			log.Printf("failed to set cloudevents data: %s", err.Error())
		}

		log.Printf("sending cloudevent to %s", sink)
		if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
			log.Printf("failed to send cloudevent: %v", res)
		}

		if env.OneShot {
			return
		}

		// Wait for next tick
		<-ticker.C
	}
}

以下は、以前のハートビートコンテナーイメージを参照するコンテナーソースの例です。

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        # This corresponds to a heartbeats image URI that you have built and published
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "example-pod"
            - name: POD_NAMESPACE
              value: "event-test"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: example-service
...
5.2.6.2.2. Knative CLI を使用したコンテナーソースの作成および管理

kn source container コマンドを使用し、Knative (kn) CLI を使用してコンテナーソースを作成および管理できます。イベントソースを作成するために Knative CLI を使用すると、YAML ファイルを直接修正するよりも合理的で直感的なユーザーインターフェイスが得られます。

コンテナーソースを作成します。

$ kn source container create <container_source_name> --image <image_uri> --sink <sink>

コンテナーソースの削除

$ kn source container delete <container_source_name>

コンテナーソースを記述します。

$ kn source container describe <container_source_name>

既存のコンテナーソースを一覧表示

$ kn source container list

既存のコンテナーソースを YAML 形式で一覧表示

$ kn source container list -o yaml

コンテナーソースを更新します。

このコマンドにより、既存のコンテナーソースのイメージ URI が更新されます。

$ kn source container update <container_source_name> --image <image_uri>
5.2.6.2.3. Web コンソールを使用したコンテナーソースの作成

Knative Eventing がクラスターにインストールされると、Web コンソールを使用してコンテナーソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、イベントソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Container Platform Web コンソールにログインしている。
  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Developer パースペクティブで、+Add Event Source に移動します。Event Sources ページが表示されます。
  2. Container Source を選択してから Create Event Source をクリックします。Create Event Source ページが表示されます。
  3. Form view または YAML view を使用して、Container Source 設定を設定します。

    注記

    Form viewYAML view 間で切り換えることができます。ビューの切り替え時に、データは永続化されます。

    1. Image フィールドに、コンテナーソースが作成したコンテナーで実行するイメージの URI を入力します。
    2. Name フィールドにイメージの名前を入力します。
    3. オプション: Arguments フィールドで、コンテナーに渡す引数を入力します。
    4. オプション: Environment variables フィールドで、コンテナーに設定する環境変数を追加します。
    5. Sink セクションで、コンテナーソースからのイベントがルーティングされるシンクを追加します。Form ビューを使用している場合は、以下のオプションから選択できます。

      1. Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。
      2. URI を選択して、コンテナーソースからのイベントのルーティング先を指定します。
  4. コンテナーソースの設定が完了したら、Create をクリックします。
5.2.6.2.4. コンテナーソースのリファレンス

ContainerSource オブジェクトを作成することにより、コンテナーをイベントソースとして使用できます。ContainerSource オブジェクトを作成するときに、複数のパラメーターを設定できます。

ContainerSource オブジェクトは以下のフィールドをサポートします。

フィールド説明必須またはオプション

apiVersion

API バージョンを指定します (例: sources.knative.dev/v1)。

必須

kind

このリソースオブジェクトを ContainerSource オブジェクトとして特定します。

必須

metadata

ContainerSource オブジェクトを一意に識別するメタデータを指定します。たとえば、name です。

必須

spec

この ContainerSource オブジェクトの設定情報を指定します。

必須

spec.sink

シンクとして使用する URI に解決するオブジェクトへの参照。

必須

spec.template

ContainerSource オブジェクトの template 仕様。

必須

spec.ceOverrides

上書きを定義して、シンクに送信されたイベントへの出力形式および変更を制御します。

任意

テンプレートパラメーターの例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "mypod"
            - name: POD_NAMESPACE
              value: "event-test"
  ...

5.2.6.2.4.1. CloudEvent オーバーライド

ceOverrides 定義は、シンクに送信される CloudEvent の出力形式および変更を制御するオーバーライドを提供します。ceOverrides 定義に複数のフィールドを設定できます。

ceOverrides の定義は、以下のフィールドをサポートします。

フィールド説明必須またはオプション

extensions

アウトバウンドイベントで追加または上書きされる属性を指定します。各 extensions のキーと値のペアは、属性拡張機能としてイベントに個別に設定されます。

任意

注記

拡張子として許可されるのは、有効な CloudEvent 属性名のみです。拡張機能オーバーライド設定から仕様定義属性を設定することはできません。たとえば、type 属性を変更することはできません。

CloudEvent オーバーライドの例

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

これにより、subjectK_CE_OVERRIDES 環境変数が設定されます。

出力例

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

5.2.7. 開発者パースペクティブを使用してイベントソースをシンクに接続する

OpenShift Container Platform Web コンソールを使用してイベントソースを作成する場合、イベントがソースから送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

5.2.7.1. Developer パースペクティブを使用してイベントソースをシンクに接続します。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および Knative Eventing が OpenShift Container Platform クラスターにインストールされている。
  • Web コンソールにログインしており、Developer パースペクティブを使用している。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Knative サービス、チャネル、ブローカーなどのシンクを作成している。

手順

  1. +Add Event Source に移動して任意のタイプのイベントソースを作成し、作成するイベントソースを選択します。
  2. イベントソースの作成 フォームビューの シンク セクションで、リソース リストからシンクを選択します。
  3. Create をクリックします。

検証

Topology ページを表示して、イベントソースが作成され、シンクに接続されていることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. イベントソースを表示し、接続されたシンクをクリックして、右側のパネルにシンクの詳細を表示します。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.