Eventing


Red Hat OpenShift Serverless 1.37

Using event-driven architectures with OpenShift Serverless

Red Hat OpenShift Documentation Team

Abstract

This document provides information about Eventing features such as event sources and sinks, brokers, triggers, channels, and subscriptions.

Chapter 1. Knative Eventing

Knative Eventing on OpenShift Container Platform enables developers to use an event-driven architecture with serverless applications. An event-driven architecture is based on the concept of decoupled relationships between event producers and event consumers.

Event producers create events, and event sinks, or consumers, receive events. Knative Eventing uses standard HTTP POST requests to send and receive events between event producers and sinks. These events conform to the CloudEvents specifications, which enables creating, parsing, sending, and receiving events in any programming language.
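As a rough illustration of such a request, the following Python sketch builds a CloudEvent in the binary content mode of the CloudEvents HTTP binding, where context attributes travel as ce- prefixed headers and the payload is the request body. The function names, the event type, and the source value are hypothetical and are not part of OpenShift Serverless.

```python
import json
import urllib.request
import uuid


def build_cloudevent(event_type, source, data):
    """Return (headers, body) for a binary-mode CloudEvent HTTP POST."""
    headers = {
        # Required CloudEvents context attributes, sent as ce-* headers.
        "ce-specversion": "1.0",
        "ce-id": str(uuid.uuid4()),
        "ce-type": event_type,
        "ce-source": source,
        # The payload format is carried by the regular Content-Type header.
        "Content-Type": "application/json",
    }
    body = json.dumps(data).encode("utf-8")
    return headers, body


def send_cloudevent(url, event_type, source, data):
    """POST the event to a sink or broker URL and return the HTTP status code."""
    headers, body = build_cloudevent(event_type, source, data)
    request = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    # Hypothetical event type and source; no network call is made here.
    headers, body = build_cloudevent(
        "com.example.order.created", "/example/producer", {"orderId": 42}
    )
    print(headers["ce-type"], body.decode("utf-8"))
```

To deliver the event, call send_cloudevent with the URL of a sink or broker that is reachable from where the code runs.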

1.1. Knative Eventing use cases

Knative Eventing supports the following use cases:

Publish an event without creating a consumer
You can send events to a broker as an HTTP POST, and use a binding to decouple the destination configuration from the application that produces events.
Consume an event without creating a publisher
You can use a trigger to consume events from a broker based on event attributes. The application receives events as an HTTP POST.
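The attribute-based selection that a trigger performs can be pictured as an exact-match comparison between the filter and the event attributes. The following is a minimal sketch of that idea, assuming exact matching on string attributes. The function name and the sample attributes are hypothetical, and the code is not the broker implementation.

```python
def event_matches(filter_attributes, event_attributes):
    """Return True if every filter attribute equals the event's attribute."""
    return all(
        event_attributes.get(name) == expected
        for name, expected in filter_attributes.items()
    )


if __name__ == "__main__":
    event = {"type": "com.example.order.created", "source": "/example/producer"}
    print(event_matches({"type": "com.example.order.created"}, event))  # True
    print(event_matches({"type": "com.example.order.deleted"}, event))  # False
    print(event_matches({}, event))  # True: an empty filter accepts every event
```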

To enable delivery to multiple types of sinks, Knative Eventing defines the following generic interfaces that can be implemented by multiple Kubernetes resources:

Addressable resources
Able to receive and acknowledge an event delivered over HTTP to an address defined in the status.address.url field of the resource. The Kubernetes Service resource also satisfies the addressable interface.
Callable resources
Able to receive an event delivered over HTTP and transform it, returning 0 or 1 new events in the HTTP response payload. These returned events may be further processed in the same way that events from an external event source are processed.
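The difference between the two interfaces can be sketched as a single request handler: an addressable resource only acknowledges the event, while a callable resource may also return one new event in the response. The following Python sketch is illustrative only. The function name, the event types, and the choice of status codes are assumptions, not behavior defined by this document.

```python
import json


def handle_event(headers, body):
    """Process one delivered event and return (status, reply_headers, reply_body).

    An empty reply body models "0 new events"; a body with ce-* headers
    models "1 new event".
    """
    # Collect CloudEvents attributes from the ce-* headers (case-insensitive).
    attributes = {
        name.lower()[3:]: value
        for name, value in headers.items()
        if name.lower().startswith("ce-")
    }
    data = json.loads(body) if body else {}

    # Hypothetical rule: only events of this type produce a reply event.
    if attributes.get("type") != "com.example.order.created":
        return 202, {}, b""  # acknowledged, no new event

    reply_headers = {
        "ce-specversion": "1.0",
        "ce-id": attributes.get("id", "") + "-reply",
        "ce-type": "com.example.order.accepted",
        "ce-source": "/example/transformer",
        "Content-Type": "application/json",
    }
    reply_body = json.dumps(
        {"orderId": data.get("orderId"), "accepted": True}
    ).encode("utf-8")
    return 200, reply_headers, reply_body
```

Keeping the handler a pure function makes it straightforward to wrap in any HTTP server and to test without a cluster.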

Chapter 2. Event sources

2.1. Event sources

A Knative event source can be any Kubernetes object that generates or imports cloud events, and relays those events to another endpoint, known as a sink. Sourcing events is critical to developing a distributed system that reacts to events.

You can create and manage Knative event sources by using the OpenShift Container Platform web console, the Knative (kn) CLI, or YAML files.

Currently, OpenShift Serverless supports the following event source types:

API server source
Brings Kubernetes API server events into Knative. The API server source sends a new event each time a Kubernetes resource is created, updated or deleted.
Ping source
Produces events with a fixed payload on a specified cron schedule.
Kafka event source
Connects an Apache Kafka cluster to a sink as an event source.

You can also create a custom event source.

2.1.1. Creating an event source

A Knative event source can be any Kubernetes object that generates or imports cloud events, and relays those events to another endpoint, known as a sink.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You have cluster-admin privileges on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.

Procedure

  1. In the OpenShift Container Platform web console, navigate to Serverless → Eventing.
  2. In the Create list, select Event Source. The web console opens the Event Source page.
  3. Select the event source type that you want to create.

2.2. Creating an API server source

The API server source is an event source that can be used to connect an event sink, such as a Knative service, to the Kubernetes API server. The API server source watches for Kubernetes events and forwards them to the Knative Eventing broker.

2.2.1. Creating an API server source by using the web console

After installing Knative Eventing on your cluster, you can create an API server source by using the web console. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create an event source.

Prerequisites

  • You have logged in to the OpenShift Container Platform web console.
  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift CLI (oc).

Procedure

If you want to re-use an existing service account, you can modify your existing ServiceAccount resource to include the required permissions instead of creating a new resource.

  1. Create a service account, role, and role binding for the event source as a YAML file:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default

    Change each of the four namespace values to the namespace that you have selected for installing the event source.
  2. Apply the YAML file:

    $ oc apply -f <filename>
  3. Navigate to +Add → Event Source. The Event Sources page is displayed.
  4. Optional: If you have many providers for your event sources, select the required provider from the Providers list to filter the available event sources from the provider.
  5. Select ApiServerSource and then click Create Event Source. The Create Event Source page is displayed.
  6. Configure the ApiServerSource settings by using the Form view or YAML view:

    Note

    You can switch between the Form view and YAML view. The console preserves the data when you switch between the views.

    1. Enter v1 as the APIVERSION and Event as the KIND.
    2. Select the Service Account Name for the service account that you created.
    3. In the Target section, select your event sink. This can be either a Resource or a URI:

      1. Select Resource to use a channel, broker, or service as an event sink for the event source.
      2. Select URI to specify the Uniform Resource Identifier (URI) where the system routes events.
  7. Click Create.
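Because the namespace appears in four places in the service account, role, and role binding from step 1, it can help to generate the file rather than edit it by hand. The following Python sketch renders the same three manifests for a chosen namespace. The function name and the my-project namespace are hypothetical, and this is only one possible way to prepare the file.

```python
from string import Template

# Manifest text mirrors the service account, role, and role binding shown in
# the procedure; only the namespace is parameterized.
RBAC_TEMPLATE = Template("""\
apiVersion: v1
kind: ServiceAccount
metadata:
  name: events-sa
  namespace: $namespace
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: event-watcher
  namespace: $namespace
rules:
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - get
      - list
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: k8s-ra-event-watcher
  namespace: $namespace
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: event-watcher
subjects:
  - kind: ServiceAccount
    name: events-sa
    namespace: $namespace
""")


def render_rbac(namespace):
    """Return the three manifests with every namespace field set."""
    return RBAC_TEMPLATE.substitute(namespace=namespace)


if __name__ == "__main__":
    # "my-project" is a hypothetical namespace. Redirect the output to a file
    # and pass that file to `oc apply -f`.
    print(render_rbac("my-project"))
```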

Verification

  • After you create the API server source, check the connection to the event sink in the Topology view.

    ApiServerSource Topology view

Note

If you use a URI sink, you can change the URI by right-clicking URI sink → Edit URI.

Deleting the API server source

  1. Navigate to the Topology view.
  2. Right-click the API server source and select Delete ApiServerSource.

2.2.2. Creating an API server source by using the Knative CLI

You can create an API server source by using the kn source apiserver create command. Using the Knative (kn) CLI to create an API server source provides a more streamlined and intuitive user interface than modifying YAML files directly.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift CLI (oc).
  • You have installed the Knative (kn) CLI.

Procedure

If you want to re-use an existing service account, you can modify your existing ServiceAccount resource to include the required permissions instead of creating a new resource.

  1. Create a service account, role, and role binding for the event source as a YAML file:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default

    Change each of the four namespace values to the namespace that you have selected for installing the event source.
  2. Apply the YAML file:

    $ oc apply -f <filename>
  3. Create an API server source that has an event sink. In the following example, the sink is a broker:

    $ kn source apiserver create <event_source_name> --sink broker:<broker_name> --resource "event:v1" --service-account <service_account_name> --mode Resource
  4. To verify the API server source setup, create a Knative service that dumps incoming messages to its log.

    $ kn service create event-display --image quay.io/openshift-knative/showcase
  5. If you used a broker as an event sink, create a trigger to filter events from the default broker to the service:

    $ kn trigger create <trigger_name> --sink ksvc:event-display
  6. Create events by launching a pod in the default namespace:

    $ oc create deployment event-origin --image quay.io/openshift-knative/showcase
  7. Check that the controller maps correctly by inspecting the output from the following command:

    $ kn source apiserver describe <source_name>

    You get an output similar to the following example:

    Name:                mysource
    Namespace:           default
    Annotations:         sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:                 3m
    ServiceAccountName:  events-sa
    Mode:                Resource
    Sink:
      Name:       default
      Namespace:  default
      Kind:       Broker (eventing.knative.dev/v1)
    Resources:
      Kind:        event (v1)
      Controller:  false
    Conditions:
      OK TYPE                     AGE REASON
      ++ Ready                     3m
      ++ Deployed                  3m
      ++ SinkProvided              3m
      ++ SufficientPermissions     3m
      ++ EventTypesProvided        3m
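When the kn commands in this procedure are scripted, assembling the arguments as a list avoids shell quoting mistakes. The following Python sketch builds the kn source apiserver create invocation from step 3 by using only the flags shown in that step. The function name and the sample values are hypothetical.

```python
import shlex


def apiserver_source_command(name, broker, service_account,
                             resource="event:v1", mode="Resource"):
    """Return the argument list for `kn source apiserver create`."""
    return [
        "kn", "source", "apiserver", "create", name,
        "--sink", f"broker:{broker}",
        "--resource", resource,
        "--service-account", service_account,
        "--mode", mode,
    ]


if __name__ == "__main__":
    argv = apiserver_source_command("mysource", "default", "events-sa")
    # Print a copy-pasteable command line. To execute the command instead,
    # pass argv to subprocess.run on a machine where kn is installed.
    print(shlex.join(argv))
```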

Verification

To verify that Kubernetes sends events to Knative, examine the event-display logs or use a web browser to view the events.

  • To view the events in a web browser, open the link returned by the following command:

    $ kn service describe event-display -o url

    The following example shows the browser page:

    Example visualization of ApiServerSource event
  • To see the logs in the terminal, view the event-display logs for the pods by running the following command:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    You get an output similar to the following example:

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{event-origin}",
          "kind": "Pod",
          "name": "event-origin",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "event-origin.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }

Deleting the API server source

  1. Delete the trigger:

    $ kn trigger delete <trigger_name>
  2. Delete the event source:

    $ kn source apiserver delete <source_name>
  3. Delete the service account, role, and role binding:

    $ oc delete -f authentication.yaml

2.2.2.1. Knative CLI sink flag

When you create an event source by using the Knative (kn) CLI, you can use the --sink flag to specify the sink to which that resource sends events. The sink can be any addressable or callable resource that can receive incoming events from other resources.

The following example command creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \
  --ce-override "sink=bound"

The svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.
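The forms of --sink value that appear in this document (a URI, broker:<broker_name>, and ksvc:event-display) can be told apart with a small amount of string handling. The following Python sketch classifies a value accordingly. It is not the kn implementation, the function name is hypothetical, and treating an unprefixed name as a Knative service is an assumption of this sketch.

```python
def classify_sink(value):
    """Split a --sink value into (kind, target)."""
    # A value with a scheme separator is treated as a plain URI.
    if "://" in value:
        return "uri", value
    prefix, separator, name = value.partition(":")
    if separator and prefix in ("ksvc", "broker", "channel"):
        return prefix, name
    # Assumption: a bare name without a known prefix refers to a Knative service.
    return "ksvc", value


if __name__ == "__main__":
    for sink in ("http://event-display.svc.cluster.local",
                 "broker:default",
                 "ksvc:event-display"):
        print(sink, "->", classify_sink(sink))
```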

2.2.3. Creating an API server source by using YAML files

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe event sources declaratively and in a reproducible manner. To create an API server source by using YAML, you must create a YAML file that defines an ApiServerSource object, then apply it by using the oc apply command.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have created the default broker in the same namespace as the one defined in the API server source YAML file.
  • You have installed the OpenShift CLI (oc).

Procedure

If you want to re-use an existing service account, you can modify your existing ServiceAccount resource to include the required permissions instead of creating a new resource.

  1. Create a service account, role, and role binding for the event source as a YAML file:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: events-sa
      namespace: default
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: event-watcher
      namespace: default
    rules:
      - apiGroups:
          - ""
        resources:
          - events
        verbs:
          - get
          - list
          - watch
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: k8s-ra-event-watcher
      namespace: default
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: event-watcher
    subjects:
      - kind: ServiceAccount
        name: events-sa
        namespace: default

    Change each of the four namespace values to the namespace that you have selected for installing the event source.
  2. Apply the YAML file:

    $ oc apply -f <filename>
  3. Create an API server source as a YAML file:

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      name: testevents
    spec:
      serviceAccountName: events-sa
      mode: Resource
      resources:
        - apiVersion: v1
          kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
  4. Apply the ApiServerSource YAML file:

    $ oc apply -f <filename>
  5. To verify the API server source configuration, create a Knative service as a YAML file that dumps incoming messages to its log:

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - image: quay.io/openshift-knative/showcase
  6. Apply the Service YAML file:

    $ oc apply -f <filename>
  7. Create a Trigger object as a YAML file that filters events from the default broker to the service created in the earlier step:

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      name: event-display-trigger
      namespace: default
    spec:
      broker: default
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  8. Apply the Trigger YAML file:

    $ oc apply -f <filename>
  9. Create events by launching a pod in the default namespace:

    $ oc create deployment event-origin --image=quay.io/openshift-knative/showcase
  10. Check that the controller maps correctly by entering the following command and inspecting the output:

    $ oc get apiserversource.sources.knative.dev testevents -o yaml

    Example output

    apiVersion: sources.knative.dev/v1alpha1
    kind: ApiServerSource
    metadata:
      annotations:
      creationTimestamp: "2020-04-07T17:24:54Z"
      generation: 1
      name: testevents
      namespace: default
      resourceVersion: "62868"
      selfLink: /apis/sources.knative.dev/v1alpha1/namespaces/default/apiserversources/testevents2
      uid: 1603d863-bb06-4d1c-b371-f580b4db99fa
    spec:
      mode: Resource
      resources:
      - apiVersion: v1
        controller: false
        controllerSelector:
          apiVersion: ""
          kind: ""
          name: ""
          uid: ""
        kind: Event
        labelSelector: {}
      serviceAccountName: events-sa
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
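The Trigger object from step 7 can also be generated programmatically, for example when the broker or service name varies between environments. The following Python sketch builds the same object as a dictionary and prints it as JSON, which oc apply -f accepts in addition to YAML. The function name is hypothetical.

```python
import json


def build_trigger(name, namespace, broker, service):
    """Return a Trigger manifest that routes events from a broker to a Knative service."""
    return {
        "apiVersion": "eventing.knative.dev/v1",
        "kind": "Trigger",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "broker": broker,
            "subscriber": {
                "ref": {
                    "apiVersion": "serving.knative.dev/v1",
                    "kind": "Service",
                    "name": service,
                }
            },
        },
    }


if __name__ == "__main__":
    trigger = build_trigger("event-display-trigger", "default", "default", "event-display")
    # Save the output to a file and apply it with `oc apply -f <filename>`.
    print(json.dumps(trigger, indent=2))
```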

Verification

To verify that Kubernetes sends events to Knative, check the event-display logs or view the events in a web browser.

  • To view the events in a web browser, open the link returned by the following command:

    $ oc get ksvc event-display -o jsonpath='{.status.url}'

    Figure 2.1. Example browser page

    Example visualization of ApiServerSource event
  • To see the logs in the terminal, view the event-display logs for the pods by entering the following command:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.apiserver.resource.update
      datacontenttype: application/json
      ...
    Data,
      {
        "apiVersion": "v1",
        "involvedObject": {
          "apiVersion": "v1",
          "fieldPath": "spec.containers{event-origin}",
          "kind": "Pod",
          "name": "event-origin",
          "namespace": "default",
           .....
        },
        "kind": "Event",
        "message": "Started container",
        "metadata": {
          "name": "event-origin.159d7608e3a3572c",
          "namespace": "default",
          ....
        },
        "reason": "Started",
        ...
      }
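The Data section of the log output is a Kubernetes Event object in JSON form, so it can be condensed for quick inspection. The following Python sketch summarizes such a payload in one line. The sample is limited to fields that are visible in the example output, and the function name is hypothetical.

```python
import json

# Sample payload restricted to fields shown in the example output above.
SAMPLE = """
{
  "apiVersion": "v1",
  "involvedObject": {"kind": "Pod", "name": "event-origin", "namespace": "default"},
  "kind": "Event",
  "message": "Started container",
  "reason": "Started"
}
"""


def summarize(event_data):
    """Return a one-line summary of a Kubernetes Event payload."""
    involved = event_data.get("involvedObject", {})
    target = "{} {}/{}".format(
        involved.get("kind", "?"),
        involved.get("namespace", "?"),
        involved.get("name", "?"),
    )
    return "{}: {} ({})".format(
        target, event_data.get("reason", "?"), event_data.get("message", "")
    )


if __name__ == "__main__":
    print(summarize(json.loads(SAMPLE)))
```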

Deleting the API server source

  1. Delete the trigger:

    $ oc delete -f trigger.yaml
  2. Delete the event source:

    $ oc delete -f k8s-events.yaml
  3. Delete the service account, role, and role binding:

    $ oc delete -f authentication.yaml

2.3. Creating a ping source

A ping source is an event source that can be used to periodically send ping events with a constant payload to an event consumer. A ping source can be used to schedule sending events, similar to a timer.

2.3.1. Creating a ping source by using the web console

After Knative Eventing is installed on your cluster, you can create a ping source by using the web console. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create an event source.

Prerequisites

  • You have logged in to the OpenShift Container Platform web console.
  • The OpenShift Serverless Operator, Knative Serving and Knative Eventing are installed on the cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. To verify that the ping source is working, create a simple Knative service that dumps incoming messages to the logs of the service.

    1. Navigate to +Add → YAML.
    2. Copy the example YAML:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase
    3. Click Create.
  2. Create a ping source in the same namespace as the service created in the previous step, or any other sink that you want to send events to.

    1. Navigate to +Add → Event Source. The Event Sources page is displayed.
    2. Optional: If you have multiple providers for your event sources, select the required provider from the Providers list to filter the available event sources from the provider.
    3. Select Ping Source and then click Create Event Source. The Create Event Source page is displayed.

      Note

      You can configure the PingSource settings by using the Form view or YAML view and can switch between the views. The data is persisted when switching between the views.

    4. Enter a value for Schedule. In this example, the value is */2 * * * *, which creates a PingSource that sends a message every two minutes.
    5. Optional: You can enter a value for Data, which is the message payload.
    6. In the Target section, select your event sink. This can be either a Resource or a URI:

      1. Select Resource to use a channel, broker, or service as an event sink for the event source. In this example, the event-display service created in the previous step is used as the target Resource.
      2. Select URI to specify a Uniform Resource Identifier (URI) where the events are routed to.
    7. Click Create.

Verification

You can verify that the ping source was created and is connected to the sink by viewing the Topology page.

  1. Navigate to Topology.
  2. View the ping source and sink.

    View the ping source and service in the Topology view
  3. View the event-display service in the web browser. You should see the ping source events in the web UI.

    View the ping source events in the web UI

Deleting the ping source

  1. Navigate to the Topology view.
  2. Right-click the ping source and select Delete Ping Source.

2.3.2. Creating a ping source by using the Knative CLI

You can use the kn source ping create command to create a ping source by using the Knative (kn) CLI. Using the Knative CLI to create event sources provides a more streamlined and intuitive user interface than modifying YAML files directly.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Serving and Knative Eventing on the cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • Optional: If you want to use the verification steps for this procedure, install the OpenShift CLI (oc).

Procedure

  1. To verify that the ping source is working, create a simple Knative service that dumps incoming messages to the service logs:

    $ kn service create event-display \
        --image quay.io/openshift-knative/showcase
  2. For each set of ping events that you want to request, create a ping source in the same namespace as the event consumer:

    $ kn source ping create test-ping-source \
        --schedule "*/2 * * * *" \
        --data '{"message": "Hello world!"}' \
        --sink ksvc:event-display
  3. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ kn source ping describe test-ping-source

    You get an output similar to the following example:

    Name:         test-ping-source
    Namespace:    default
    Annotations:  sources.knative.dev/creator=developer, sources.knative.dev/lastModifier=developer
    Age:          15s
    Schedule:     */2 * * * *
    Data:         {"message": "Hello world!"}
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE                 AGE REASON
      ++ Ready                 8s
      ++ Deployed              8s
      ++ SinkProvided         15s
      ++ ValidSchedule        15s
      ++ EventTypeProvided    15s
      ++ ResourcesCorrect     15s

Verification

You can verify that the ping events reached the Knative event sink by checking the logs of the sink pod.

By default, Knative services terminate their pods if they do not receive traffic within 60 seconds. The example in this guide creates a ping source that sends a message every 2 minutes, so each message is displayed in a newly created pod.

  1. Watch for new pods created:

    $ watch oc get pods
  2. Cancel watching the pods using Ctrl+C, then check the logs of the created pod:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 99e4f4f6-08ff-4bff-acf1-47f61ded68c9
      time: 2020-04-07T16:16:00.000601161Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

2.3.2.1. Knative CLI sink flag

When you create an event source by using the Knative (kn) CLI, you can use the --sink flag to specify the sink to which that resource sends events. The sink can be any addressable or callable resource that can receive incoming events from other resources.

The following example command creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \
  --ce-override "sink=bound"

The svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.

2.3.3. Creating a ping source by using YAML

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe event sources declaratively and in a reproducible manner. To create a serverless ping source by using YAML, you must create a YAML file that defines a PingSource object, then apply it by using oc apply.

Example PingSource object

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *"
  data: '{"message": "Hello world!"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

  • schedule: The schedule of the event, specified by using a cron expression.
  • data: The event message body, expressed as a JSON-encoded data string.
  • sink: The details of the event consumer. This example uses a Knative service named event-display.
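A cron expression has five fields: minute, hour, day of month, month, and day of week. To make the example schedule concrete, the following Python sketch expands a single cron field into the values it matches, so that */2 in the minute field yields every even minute. The function name is hypothetical and the sketch covers only the common field forms listed in its docstring, not every cron extension.

```python
def expand_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches.

    Supports "*", "*/step", "a-b", "a-b/step", single numbers, and
    comma-separated combinations of these forms.
    """
    values = set()
    for part in field.split(","):
        spec, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if spec == "*":
            start, end = low, high
        elif "-" in spec:
            start, end = (int(bound) for bound in spec.split("-"))
        else:
            start = end = int(spec)
        values.update(range(start, end + 1, step))
    return sorted(values)


if __name__ == "__main__":
    minutes = expand_field("*/2", 0, 59)
    # The first matching minutes of each hour for the schedule "*/2 * * * *".
    print(minutes[:5])
    print(len(minutes), "events per hour")
```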

Prerequisites

  • The OpenShift Serverless Operator, Knative Serving and Knative Eventing are installed on the cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. To verify that the ping source is working, create a simple Knative service that dumps incoming messages to the service’s logs.

    1. Create a service YAML file:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase
    2. Create the service:

      $ oc apply -f <filename>
  2. For each set of ping events that you want to request, create a ping source in the same namespace as the event consumer.

    1. Create a YAML file for the ping source:

      apiVersion: sources.knative.dev/v1
      kind: PingSource
      metadata:
        name: test-ping-source
      spec:
        schedule: "*/2 * * * *"
        data: '{"message": "Hello world!"}'
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
    2. Create the ping source:

      $ oc apply -f <filename>
  3. Check that the controller is mapped correctly by entering the following command:

    $ oc get pingsource.sources.knative.dev <ping_source_name> -o yaml

    Example output

    apiVersion: sources.knative.dev/v1
    kind: PingSource
    metadata:
      annotations:
        sources.knative.dev/creator: developer
        sources.knative.dev/lastModifier: developer
      creationTimestamp: "2020-04-07T16:11:14Z"
      generation: 1
      name: test-ping-source
      namespace: default
      resourceVersion: "55257"
      selfLink: /apis/sources.knative.dev/v1/namespaces/default/pingsources/test-ping-source
      uid: 3d80d50b-f8c7-4c1b-99f7-3ec00e0a8164
    spec:
      data: '{"message": "Hello world!"}'
      schedule: '*/2 * * * *'
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default

Verification

You can verify that the ping events were sent to the Knative event sink by looking at the logs of the sink pod.

By default, Knative services terminate their pods if no traffic is received within a 60-second period. The example shown in this guide creates a PingSource that sends a message every 2 minutes, so each message should be observed in a newly created pod.

  1. Watch for new pods created:

    $ watch oc get pods
  2. Cancel watching the pods using Ctrl+C, then look at the logs of the created pod:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/default/pingsources/test-ping-source
      id: 042ff529-240e-45ee-b40c-3a908129853e
      time: 2020-04-07T16:22:00.000791674Z
      datacontenttype: application/json
    Data,
      {
        "message": "Hello world!"
      }

Deleting the ping source

  • Delete the ping source:

    $ oc delete -f <filename>

    Example command

    $ oc delete -f ping-source.yaml

2.4. Source for Apache Kafka

You can create an Apache Kafka source that reads events from an Apache Kafka cluster and passes these events to a sink. You can create a Kafka source by using the OpenShift Container Platform web console, the Knative (kn) CLI, or by creating a KafkaSource object directly as a YAML file and using the OpenShift CLI (oc) to apply it.

Note

See the documentation for Installing Knative broker for Apache Kafka.

2.4.1. Creating an Apache Kafka event source by using the web console

After the Knative broker implementation for Apache Kafka is installed on your cluster, you can create an Apache Kafka source by using the web console. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create a Kafka source.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your cluster.
  • You have logged in to the web console.
  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Navigate to the +Add page and select Event Source.
  2. In the Event Sources page, select Kafka Source in the Type section.
  3. Configure the Kafka Source settings:

    1. Add a comma-separated list of Bootstrap Servers.
    2. Add a comma-separated list of Topics.
    3. Add a Consumer Group.
    4. Select the Service Account Name for the service account that you created.
    5. In the Target section, select your event sink. This can be either a Resource or a URI:

      1. Select Resource to use a channel, broker, or service as an event sink for the event source.
      2. Select URI to specify a Uniform Resource Identifier (URI) where the events are routed to.
    6. Enter a Name for the Kafka event source.
  4. Click Create.

Verification

You can verify that the Kafka event source was created and is connected to the sink by viewing the Topology page.

  1. Navigate to Topology.
  2. View the Kafka event source and sink.

    View the Kafka source and service in the Topology view

You can use the kn source kafka create command to create a Kafka source by using the Knative (kn) CLI. Using the Knative CLI to create event sources provides a more streamlined and intuitive user interface than modifying YAML files directly.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Eventing, Knative Serving, and the KnativeKafka custom resource (CR) on your cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.
  • You have installed the Knative (kn) CLI.
  • Optional: You have installed the OpenShift CLI (oc) if you want to use the verification steps in this procedure.

Procedure

  1. To verify that the Kafka event source is working, create a Knative service that dumps incoming events into the service logs:

    $ kn service create event-display \
        --image quay.io/openshift-knative/showcase
  2. Create a KafkaSource CR:

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    Note

    Replace the placeholder values in this command with values for your source name, bootstrap servers, and topics.

    The --servers, --topics, and --consumergroup options specify the connection parameters to the Kafka cluster. The --consumergroup option is optional.

  3. Optional: View details about the KafkaSource CR you created:

    $ kn source kafka describe <kafka_source_name>

    You get an output similar to the following example:

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h
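
As a minimal sketch of how the options in step 2 fit together, the following Python helper assembles the argument list for kn source kafka create. The function name build_kafka_source_command and the sample values are hypothetical; only the --servers, --topics, --consumergroup, and --sink options are taken from the procedure above.

```python
import shlex


def build_kafka_source_command(name, servers, topics, sink, consumer_group=None):
    """Return the argument list for `kn source kafka create` (hypothetical helper)."""
    args = ["kn", "source", "kafka", "create", name,
            "--servers", servers,
            "--topics", topics]
    # --consumergroup is optional, so it is only added when a value is given.
    if consumer_group:
        args += ["--consumergroup", consumer_group]
    args += ["--sink", sink]
    return args


# Sample values are placeholders for illustration, not values from a real cluster.
command = build_kafka_source_command(
    name="example-kafka-source",
    servers="example-cluster-kafka-bootstrap.kafka.svc:9092",
    topics="example-topic",
    sink="event-display",
    consumer_group="example-consumer-group",
)
print(shlex.join(command))
```

Building the command as a list rather than a single string avoids shell quoting problems if you later pass it to a process runner.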

Verification

  1. Trigger the Kafka instance to send a message to the topic:

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    Enter the message in the prompt. This command assumes that:

    • The Kafka cluster is installed in the kafka namespace.
    • The KafkaSource object is configured to use the my-topic topic.
  2. Verify that the message arrived by viewing the logs:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    You get an output similar to the following example:

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!
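
In the example output above, the id attribute has the form partition:<n>/offset:<n>. The following Python sketch extracts both numbers from such an ID. The format is inferred only from this example output and is an assumption, not a documented contract; the function name parse_kafka_event_id is hypothetical.

```python
import re

# Pattern inferred from the example output only (assumption).
_ID_PATTERN = re.compile(r"^partition:(\d+)/offset:(\d+)$")


def parse_kafka_event_id(event_id):
    """Return (partition, offset) parsed from an ID such as 'partition:46/offset:0'."""
    match = _ID_PATTERN.match(event_id)
    if match is None:
        raise ValueError(f"unexpected event id format: {event_id!r}")
    return int(match.group(1)), int(match.group(2))


partition, offset = parse_kafka_event_id("partition:46/offset:0")
print(partition, offset)
```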
2.4.2.1. Knative CLI sink flag

When you create an event source by using the Knative (kn) CLI, you can specify a sink where events are sent to from that resource by using the --sink flag. The sink can be any addressable or callable resource that can receive incoming events from other resources.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \
  --ce-override "sink=bound"

svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe applications declaratively and in a reproducible manner. To create a Kafka source by using YAML, you must create a YAML file that defines a KafkaSource object, then apply it by using the oc apply command.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create a KafkaSource object as a YAML file:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> # (1)
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> # (2)
      sink:
      - <list_of_sinks> # (3)

    (1) A consumer group is a group of consumers that use the same group ID, and consume data from a topic.
    (2) A topic provides a destination for the storage of data. Each topic is split into one or more partitions.
    (3) A sink specifies where events are sent to from a source.
    Important

    Only the v1beta1 version of the API for KafkaSource objects on OpenShift Serverless is supported. Do not use the v1alpha1 version of this API, as this version is now deprecated.

    Example KafkaSource object

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. Apply the KafkaSource YAML file:

    $ oc apply -f <filename>
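
If you generate manifests from scripts, the example KafkaSource object can also be built programmatically. The following Python sketch mirrors the fields of the example object above and prints it as JSON. The function name kafka_source_manifest is hypothetical, and the sketch assumes a Knative service as the sink.

```python
import json


def kafka_source_manifest(name, consumer_group, bootstrap_servers, topics, sink_service):
    """Build a KafkaSource manifest as a plain dictionary (hypothetical helper)."""
    return {
        "apiVersion": "sources.knative.dev/v1beta1",
        "kind": "KafkaSource",
        "metadata": {"name": name},
        "spec": {
            "consumerGroup": consumer_group,
            "bootstrapServers": list(bootstrap_servers),
            "topics": list(topics),
            # Assumption: the sink is a Knative service referenced by name.
            "sink": {
                "ref": {
                    "apiVersion": "serving.knative.dev/v1",
                    "kind": "Service",
                    "name": sink_service,
                }
            },
        },
    }


manifest = kafka_source_manifest(
    name="kafka-source",
    consumer_group="knative-group",
    bootstrap_servers=["my-cluster-kafka-bootstrap.kafka:9092"],
    topics=["knative-demo-topic"],
    sink_service="event-display",
)
print(json.dumps(manifest, indent=2))
```

You can save the printed JSON to a file and pass that file to oc apply -f in the same way as a YAML file.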

Verification

  • Verify that the Kafka event source was created by entering the following command:

    $ oc get pods

    Example output

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, users must provide credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.

Prerequisites

  • You have cluster or dedicated administrator permissions on OpenShift Container Platform.
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka CR are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have a username and password for a Kafka cluster.
  • You have chosen the SASL mechanism to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.
  • If TLS is enabled, you also need the ca.crt certificate file for the Kafka cluster.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create the certificate files as secrets in your chosen namespace:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \
      --from-literal=user="my-sasl-user"

    The SASL type, set with the saslType key, can be PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.
  2. Create or modify your Kafka source so that it contains the following spec configuration:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: # (1)
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...

    (1) The caCert spec is not required if you are using a public cloud Kafka service.
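
The net configuration in step 2 repeats the same secret reference for each credential. As a rough sketch, the following Python helper generates that structure from a secret name and checks the SASL mechanism against the three values listed in this procedure. The names kafka_net_spec and check_sasl_type are hypothetical, and the secret keys (user, password, saslType, ca.crt) are assumed to match the keys created in step 1.

```python
SUPPORTED_SASL_TYPES = ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512")


def check_sasl_type(sasl_type):
    """Raise ValueError unless the mechanism is one listed in this procedure."""
    if sasl_type not in SUPPORTED_SASL_TYPES:
        raise ValueError(f"unsupported SASL type: {sasl_type!r}")
    return sasl_type


def _secret_key_ref(secret_name, key):
    return {"secretKeyRef": {"name": secret_name, "key": key}}


def kafka_net_spec(secret_name, include_ca_cert=True):
    """Build the spec.net section of a KafkaSource (hypothetical helper)."""
    tls = {"enable": True}
    # The caCert reference can be left out for a public cloud Kafka service.
    if include_ca_cert:
        tls["caCert"] = _secret_key_ref(secret_name, "ca.crt")
    return {
        "sasl": {
            "enable": True,
            "user": _secret_key_ref(secret_name, "user"),
            "password": _secret_key_ref(secret_name, "password"),
            "type": _secret_key_ref(secret_name, "saslType"),
        },
        "tls": tls,
    }


check_sasl_type("SCRAM-SHA-512")
print(kafka_net_spec("my-kafka-auth-secret", include_ca_cert=False))
```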

You can configure Knative Eventing sources for Apache Kafka (KafkaSource) to be autoscaled using the Custom Metrics Autoscaler Operator, which is based on the Kubernetes Event Driven Autoscaler (KEDA).

Important

Configuring KEDA autoscaling for KafkaSource is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your cluster.

Procedure

  1. In the KnativeKafka custom resource, enable KEDA scaling:

    Example YAML

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      name: knative-kafka
      namespace: knative-eventing
    spec:
      config:
        kafka-features:
          controller-autoscaler-keda: enabled

  2. Apply the KnativeKafka YAML file:

    $ oc apply -f <filename>

2.5. Custom event sources

If you need to ingress events from an event producer that is not included in Knative, or from a producer that emits events which are not in the CloudEvent format, you can do this by creating a custom event source. You can create a custom event source by using one of the following methods:

  • Use a PodSpecable object as an event source, by creating a sink binding.
  • Use a container as an event source, by creating a container source.

2.5.1. Sink binding

The SinkBinding object supports decoupling event production from delivery addressing. Sink binding is used to connect event producers to an event consumer, or sink. An event producer is a Kubernetes resource that embeds a PodSpec template and produces events. A sink is an addressable Kubernetes object that can receive events.

The SinkBinding object injects environment variables into the PodTemplateSpec of the subject, that is, the event producer, which means that the application code does not need to interact directly with the Kubernetes API to locate the event destination. These environment variables are as follows:

K_SINK
The URL of the resolved sink.
K_CE_OVERRIDES
A JSON object that specifies overrides to the outbound event.
Note

The SinkBinding object currently does not support custom revision names for services.
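
To illustrate how an event producer might use these variables, the following Python sketch reads K_SINK and K_CE_OVERRIDES and prepares an HTTP POST request that carries a CloudEvent in binary content mode, where context attributes are sent as ce- prefixed headers. The function name build_event_request, the event type, and the source value are hypothetical. The sketch only builds the request and does not send it.

```python
import json
import os
import urllib.request
import uuid


def build_event_request(data, event_type, source, environ=os.environ):
    """Prepare a CloudEvent POST request for the sink injected by a sink binding."""
    sink_url = environ["K_SINK"]
    # K_CE_OVERRIDES is optional; treat a missing or empty value as no overrides.
    overrides = json.loads(environ.get("K_CE_OVERRIDES") or "{}")
    headers = {
        "Content-Type": "application/json",
        "ce-specversion": "1.0",
        "ce-type": event_type,
        "ce-source": source,
        "ce-id": str(uuid.uuid4()),
    }
    # Each extension override becomes an additional CloudEvent attribute header.
    for name, value in (overrides.get("extensions") or {}).items():
        headers["ce-" + name] = str(value)
    body = json.dumps(data).encode("utf-8")
    return urllib.request.Request(sink_url, data=body, headers=headers, method="POST")


# Example environment, as a sink binding might inject it (values are illustrative).
example_env = {
    "K_SINK": "http://event-display.default.svc.cluster.local",
    "K_CE_OVERRIDES": '{"extensions": {"sink": "bound"}}',
}
request = build_event_request({"id": 1}, "dev.example.heartbeat", "/example/producer", example_env)
print(request.get_method(), request.full_url)
```

Inside a bound pod, you could pass the prepared request to urllib.request.urlopen to deliver the event.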

2.5.1.1. Creating a sink binding by using YAML

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe event sources declaratively and in a reproducible manner. To create a sink binding by using YAML, you must create a YAML file that defines a SinkBinding object, then apply it by using the oc apply command.

Prerequisites

  • The OpenShift Serverless Operator, Knative Serving and Knative Eventing are installed on the cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. To check that sink binding is set up correctly, create a Knative event display service, or event sink, that dumps incoming messages to its log.

    1. Create a service YAML file:

      Example service YAML file

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase

    2. Create the service:

      $ oc apply -f <filename>
  2. Create a sink binding instance that directs events to the service.

    1. Create a sink binding YAML file:

      Example sink binding YAML file

      apiVersion: sources.knative.dev/v1
      kind: SinkBinding
      metadata:
        name: bind-heartbeat
      spec:
        subject:
          apiVersion: batch/v1
          kind: Job # (1)
          selector:
            matchLabels:
              app: heartbeat-cron
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      (1) In this example, any Job with the label app: heartbeat-cron is bound to the event sink.
    2. Create the sink binding:

      $ oc apply -f <filename>
  3. Create a CronJob object.

    1. Create a cron job YAML file:

      Example cron job YAML file

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      Important

      To use sink binding, you must manually add a bindings.knative.dev/include=true label to your Knative resources.

      For example, to add this label to the jobs created by a CronJob resource, add the following lines to the CronJob resource YAML definition:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. Create the cron job:

      $ oc apply -f <filename>
  4. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ oc get sinkbindings.sources.knative.dev bind-heartbeat -oyaml

    Example output

    spec:
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
          namespace: default
      subject:
        apiVersion: batch/v1
        kind: Job
        namespace: default
        selector:
          matchLabels:
            app: heartbeat-cron

Verification

You can verify that the Kubernetes events were sent to the Knative event sink by looking at the message dumper function logs.

  1. List the pods in your project:

    $ oc get pods
  2. View the logs of the event-display pod:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

You can use the kn source binding create command to create a sink binding by using the Knative (kn) CLI. Using the Knative CLI to create event sources provides a more streamlined and intuitive user interface than modifying YAML files directly.

Prerequisites

  • The OpenShift Serverless Operator, Knative Serving and Knative Eventing are installed on the cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the Knative (kn) CLI.
  • You have installed the OpenShift CLI (oc).
Note

The following procedure requires you to create YAML files.

If you change the names of the YAML files from those used in the examples, you must ensure that you also update the corresponding CLI commands.

Procedure

  1. To check that sink binding is set up correctly, create a Knative event display service, or event sink, that dumps incoming messages to its log:

    $ kn service create event-display --image quay.io/openshift-knative/showcase
  2. Create a sink binding instance that directs events to the service:

    $ kn source binding create bind-heartbeat --subject Job:batch/v1:app=heartbeat-cron --sink ksvc:event-display
  3. Create a CronJob object.

    1. Create a cron job YAML file:

      Example cron job YAML file

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "* * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats:latest
                    args:
                      - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      Important

      To use sink binding, you must manually add a bindings.knative.dev/include=true label to your Knative CRs.

      For example, to add this label to the jobs created by a CronJob CR, add the following lines to the CronJob CR YAML definition:

        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true"
    2. Create the cron job:

      $ oc apply -f <filename>
  4. Check that the controller is mapped correctly by entering the following command and inspecting the output:

    $ kn source binding describe bind-heartbeat

    Example output

    Name:         bind-heartbeat
    Namespace:    demo-2
    Annotations:  sources.knative.dev/creator=minikube-user, sources.knative.dev/lastModifier=minikub ...
    Age:          2m
    Subject:
      Resource:   job (batch/v1)
      Selector:
        app:      heartbeat-cron
    Sink:
      Name:       event-display
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE     AGE REASON
      ++ Ready     2m

Verification

You can verify that the Kubernetes events were sent to the Knative event sink by looking at the message dumper function logs.

  • View the message dumper function logs by entering the following commands:

    $ oc get pods
    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

2.5.1.2.1. Knative CLI sink flag

When you create an event source by using the Knative (kn) CLI, you can specify a sink where events are sent to from that resource by using the --sink flag. The sink can be any addressable or callable resource that can receive incoming events from other resources.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \
  --ce-override "sink=bound"

svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.

After Knative Eventing is installed on your cluster, you can create a sink binding by using the web console. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create an event source.

Prerequisites

  • You have logged in to the OpenShift Container Platform web console.
  • The OpenShift Serverless Operator, Knative Serving, and Knative Eventing are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Create a Knative service to use as a sink:

    1. Navigate to +Add → YAML.
    2. Copy the example YAML:

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
      spec:
        template:
          spec:
            containers:
              - image: quay.io/openshift-knative/showcase
    3. Click Create.
  2. Create a CronJob resource that is used as an event source and sends an event every minute.

    1. Navigate to +Add → YAML.
    2. Copy the example YAML:

      apiVersion: batch/v1
      kind: CronJob
      metadata:
        name: heartbeat-cron
      spec:
        # Run every minute
        schedule: "*/1 * * * *"
        jobTemplate:
          metadata:
            labels:
              app: heartbeat-cron
              bindings.knative.dev/include: "true" # (1)
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: single-heartbeat
                    image: quay.io/openshift-knative/heartbeats
                    args:
                    - --period=1
                    env:
                      - name: ONE_SHOT
                        value: "true"
                      - name: POD_NAME
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.name
                      - name: POD_NAMESPACE
                        valueFrom:
                          fieldRef:
                            fieldPath: metadata.namespace

      (1) Ensure that you include the bindings.knative.dev/include: "true" label. The value must be a quoted string, because label values are strings. The default namespace selection behavior of OpenShift Serverless uses inclusion mode.
    3. Click Create.
  3. Create a sink binding in the same namespace as the service created in the previous step, or any other sink that you want to send events to.

    1. Navigate to +Add → Event Source. The Event Sources page is displayed.
    2. Optional: If you have multiple providers for your event sources, select the required provider from the Providers list to filter the available event sources from the provider.
    3. Select Sink Binding and then click Create Event Source. The Create Event Source page is displayed.

      Note

      You can configure the Sink Binding settings by using the Form view or YAML view and can switch between the views. The data is persisted when switching between the views.

    4. In the apiVersion field enter batch/v1.
    5. In the Kind field enter Job.

      Note

      The CronJob kind is not supported directly by OpenShift Serverless sink binding, so the Kind field must target the Job objects created by the cron job, rather than the cron job object itself.

    6. In the Target section, select your event sink. This can be either a Resource or a URI:

      1. Select Resource to use a channel, broker, or service as an event sink for the event source. In this example, the event-display service created in the previous step is used as the target Resource.
      2. Select URI to specify a Uniform Resource Identifier (URI) where the events are routed to.
    7. In the Match labels section:

      1. Enter app in the Name field.
      2. Enter heartbeat-cron in the Value field.

        Note

        The label selector is required when using cron jobs with sink binding, rather than the resource name. This is because jobs created by a cron job do not have a predictable name, and contain a randomly generated string in their name. For example, heartbeat-cron-1cc23f.

    8. Click Create.

Verification

You can verify that the sink binding, sink, and cron job have been created and are working correctly by viewing the Topology page and pod logs.

  1. Navigate to Topology.
  2. View the sink binding, sink, and heartbeats cron job.

    View the sink binding and service in the Topology view
  3. Observe that successful jobs are being registered by the cron job once the sink binding is added. This means that the sink binding is successfully reconfiguring the jobs created by the cron job.
  4. Browse the event-display service to see events produced by the heartbeats cron job.

    View the events in the web UI
2.5.1.4. Sink binding reference

You can use a PodSpecable object as an event source by creating a sink binding. You can configure multiple parameters when creating a SinkBinding object.

SinkBinding objects support the following parameters:

Field | Description | Required or optional
apiVersion | Specifies the API version, for example sources.knative.dev/v1. | Required
kind | Identifies this resource object as a SinkBinding object. | Required
metadata | Specifies metadata that uniquely identifies the SinkBinding object. For example, a name. | Required
spec | Specifies the configuration information for this SinkBinding object. | Required
spec.sink | A reference to an object that resolves to a URI to use as the sink. | Required
spec.subject | References the resources for which the runtime contract is augmented by binding implementations. | Required
spec.ceOverrides | Defines overrides to control the output format and modifications to the event sent to the sink. | Optional

2.5.1.4.1. Subject parameter

The Subject parameter references the resources for which the runtime contract is augmented by binding implementations. You can configure multiple fields for a Subject definition.

The Subject definition supports the following fields:

Field | Description | Required or optional
apiVersion | API version of the referent. | Required
kind | Kind of the referent. | Required
namespace | Namespace of the referent. If omitted, this defaults to the namespace of the object. | Optional
name | Name of the referent. | Do not use if you configure selector.
selector | Selector of the referents. | Do not use if you configure name.
selector.matchExpressions | A list of label selector requirements. | Only use one of either matchExpressions or matchLabels.
selector.matchExpressions.key | The label key that the selector applies to. | Required if using matchExpressions.
selector.matchExpressions.operator | Represents a key’s relationship to a set of values. Valid operators are In, NotIn, Exists and DoesNotExist. | Required if using matchExpressions.
selector.matchExpressions.values | An array of string values. If the operator parameter value is In or NotIn, the values array must be non-empty. If the operator parameter value is Exists or DoesNotExist, the values array must be empty. This array is replaced during a strategic merge patch. | Required if using matchExpressions.
selector.matchLabels | A map of key-value pairs. Each key-value pair in the matchLabels map is equivalent to an element of matchExpressions, where the key field is matchLabels.<key>, the operator is In, and the values array contains only matchLabels.<value>. | Only use one of either matchExpressions or matchLabels.

Subject parameter examples

Given the following YAML, the Deployment object named mysubject in the default namespace is selected:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    namespace: default
    name: mysubject
  ...

Given the following YAML, any Job object with the label working=example in the default namespace is selected:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    namespace: default
    selector:
      matchLabels:
        working: example
  ...

Given the following YAML, any Pod object with the label working=example or working=sample in the default namespace is selected:

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: v1
    kind: Pod
    namespace: default
    selector:
      matchExpressions:
        - key: working
          operator: In
          values:
            - example
            - sample
  ...
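
The selector semantics described in the Subject fields above can be modeled in a few lines. The following Python sketch evaluates matchLabels and matchExpressions against a set of labels, treating each matchLabels entry as an In expression with a single value. It follows standard Kubernetes label selector semantics and is only an illustration, not the code that Knative runs; the function names are hypothetical.

```python
def matches_expression(labels, expression):
    """Evaluate one label selector requirement against a dict of labels."""
    key = expression["key"]
    operator = expression["operator"]
    values = expression.get("values", [])
    if operator == "In":
        return key in labels and labels[key] in values
    if operator == "NotIn":
        # A resource without the key also satisfies NotIn.
        return key not in labels or labels[key] not in values
    if operator == "Exists":
        return key in labels
    if operator == "DoesNotExist":
        return key not in labels
    raise ValueError(f"unknown operator: {operator!r}")


def matches_selector(labels, selector):
    """Return True if the labels satisfy every requirement in the selector."""
    expressions = list(selector.get("matchExpressions", []))
    # Each matchLabels pair is equivalent to an In expression with one value.
    for key, value in selector.get("matchLabels", {}).items():
        expressions.append({"key": key, "operator": "In", "values": [value]})
    return all(matches_expression(labels, expr) for expr in expressions)


selector = {
    "matchExpressions": [
        {"key": "working", "operator": "In", "values": ["example", "sample"]}
    ]
}
print(matches_selector({"working": "sample"}, selector))
print(matches_selector({"working": "other"}, selector))
```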
2.5.1.4.2. CloudEvent overrides

A ceOverrides definition provides overrides that control the CloudEvent’s output format and modifications sent to the sink. You can configure multiple fields for the ceOverrides definition.

A ceOverrides definition supports the following fields:

Field | Description | Required or optional
extensions | Specifies which attributes are added or overridden on the outbound event. Each extensions key-value pair is set independently on the event as an attribute extension. | Optional

Note

Only valid CloudEvent attribute names are allowed as extensions. You cannot set the spec-defined attributes from the extensions override configuration. For example, you cannot modify the type attribute.

CloudEvent Overrides example

apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

This sets the K_CE_OVERRIDES environment variable on the subject:

Example output

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }
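
A bound workload that does not use the Knative libraries can read this variable itself. The following is a minimal sketch that uses only the Go standard library. It assumes the JSON shape shown in the preceding example output, in which every extension value is serialized as a string. The ceOverrides type and the parseOverrides and applyOverrides helpers are hypothetical names used for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// ceOverrides mirrors the JSON shape shown in the example output.
type ceOverrides struct {
	Extensions map[string]string `json:"extensions"`
}

// parseOverrides decodes the value of K_CE_OVERRIDES. An empty value means
// that no overrides are configured.
func parseOverrides(raw string) (ceOverrides, error) {
	var o ceOverrides
	if raw == "" {
		return o, nil
	}
	err := json.Unmarshal([]byte(raw), &o)
	return o, err
}

// applyOverrides copies each extension onto the outbound attribute map,
// replacing any attribute that has the same name.
func applyOverrides(attrs map[string]string, o ceOverrides) {
	for k, v := range o.Extensions {
		attrs[k] = v
	}
}

func main() {
	o, err := parseOverrides(os.Getenv("K_CE_OVERRIDES"))
	if err != nil {
		fmt.Fprintln(os.Stderr, "unparseable K_CE_OVERRIDES:", err)
		os.Exit(1)
	}
	// Placeholder extension attributes for the outbound event.
	attrs := map[string]string{"heart": "yes"}
	applyOverrides(attrs, o)
	fmt.Println(attrs)
}
```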

2.5.1.4.3. The include label

To use a sink binding, you must assign the bindings.knative.dev/include: "true" label to either the resource or the namespace that contains the resource. If the resource definition does not include the label, a cluster administrator can attach it to the namespace by running the following command:

$ oc label namespace <namespace> bindings.knative.dev/include=true
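
The rule stated above can be expressed as a small predicate. This sketch only models the rule as this section describes it and is not the webhook logic that Knative uses; the included function is a hypothetical name.

```go
package main

import "fmt"

const includeLabel = "bindings.knative.dev/include"

// included models the rule stated above: a sink binding applies when either
// the resource or its namespace carries the include label set to "true".
func included(resourceLabels, namespaceLabels map[string]string) bool {
	return resourceLabels[includeLabel] == "true" || namespaceLabels[includeLabel] == "true"
}

func main() {
	job := map[string]string{"app": "heartbeat-cron"}
	ns := map[string]string{includeLabel: "true"}
	fmt.Println(included(job, ns))  // true
	fmt.Println(included(job, nil)) // false
}
```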

You can integrate Service Mesh with SinkBinding event sources by injecting Service Mesh sidecars into both the sink and the bound workload pods. This enables secure communication and advanced networking features for applications that use sink bindings to send events.

Prerequisites

  • You have integrated Service Mesh with OpenShift Serverless.

Procedure

  1. Create a Service in a namespace that is a member of the ServiceMeshMemberRoll.

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> # (1)
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" # (2)
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/showcase

    (1) A namespace that is a member of the ServiceMeshMemberRoll.
    (2) Injects Service Mesh sidecars into the Knative service pods.
  2. Apply the Service resource.

    $ oc apply -f <filename>
  3. Create a SinkBinding resource.

    apiVersion: sources.knative.dev/v1
    kind: SinkBinding
    metadata:
      name: bind-heartbeat
      namespace: <namespace> # (1)
    spec:
      subject:
        apiVersion: batch/v1
        kind: Job # (2)
        selector:
          matchLabels:
            app: heartbeat-cron
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

    (1) A namespace that is a member of the ServiceMeshMemberRoll.
    (2) In this example, any Job with the label app: heartbeat-cron is bound to the event sink.
  4. Apply the SinkBinding resource.

    $ oc apply -f <filename>
  5. Create a CronJob:

    apiVersion: batch/v1
    kind: CronJob
    metadata:
      name: heartbeat-cron
      namespace: <namespace> # (1)
    spec:
      # Run every minute
      schedule: "* * * * *"
      jobTemplate:
        metadata:
          labels:
            app: heartbeat-cron
            bindings.knative.dev/include: "true"
        spec:
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "true" # (2)
                sidecar.istio.io/rewriteAppHTTPProbers: "true"
            spec:
              restartPolicy: Never
              containers:
                - name: single-heartbeat
                  image: quay.io/openshift-knative/heartbeats:latest
                  args:
                    - --period=1
                  env:
                    - name: ONE_SHOT
                      value: "true"
                    - name: POD_NAME
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.name
                    - name: POD_NAMESPACE
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.namespace

    (1) A namespace that is a member of the ServiceMeshMemberRoll.
    (2) Injects Service Mesh sidecars into the CronJob pods.
  6. Apply the CronJob resource.

    $ oc apply -f <filename>

Verification

To verify that the events were sent to the Knative event sink, examine the message dumper function logs.

  1. List the pods in the namespace by entering the following command:

    $ oc get pods
  2. View the logs of the event-display pod by entering the following command:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

2.5.2. Container source

Container sources run a container image that generates events and sends them to a sink. You can use a container source to create a custom event source by creating a container image and a ContainerSource object that uses your image URI.

2.5.2.1. Guidelines for creating a container image

The container source controller injects two environment variables into the container: K_SINK and K_CE_OVERRIDES. The controller resolves their values from the sink and ceOverrides specifications. Your container must send events to the sink URI specified in the K_SINK environment variable, as HTTP POST requests that use the CloudEvents HTTP format.
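
To illustrate what such a POST request looks like without an SDK, the following sketch builds a binary-mode CloudEvents request with the Go standard library only. In binary mode, the context attributes are carried as ce-* HTTP headers and the payload is the request body. The newEventRequest helper and the ID, source, and payload values are placeholders invented for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

// newEventRequest builds a binary-mode CloudEvents request: the required
// context attributes travel as ce-* headers and the payload is the body.
func newEventRequest(sink, id, source, eventType string, data []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, sink, bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	req.Header.Set("ce-specversion", "1.0")
	req.Header.Set("ce-id", id)
	req.Header.Set("ce-source", source)
	req.Header.Set("ce-type", eventType)
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	sink := os.Getenv("K_SINK") // injected by the container source controller
	if sink == "" {
		fmt.Println("K_SINK is not set; nothing to send")
		return
	}
	// The ID, source, and payload values are placeholders for illustration.
	req, err := newEventRequest(sink, "example-id-1", "urn:example:heartbeats",
		"dev.knative.eventing.samples.heartbeat", []byte(`{"id":1,"label":""}`))
	if err != nil {
		fmt.Fprintln(os.Stderr, "building request:", err)
		os.Exit(1)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Fprintln(os.Stderr, "sending event:", err)
		os.Exit(1)
	}
	resp.Body.Close()
	fmt.Println("sink responded with", resp.Status)
}
```

In practice, a CloudEvents SDK handles these details, including ID generation and timestamps.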

Example container images

The following is an example of the source code for a heartbeats container image:

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	duckv1 "knative.dev/pkg/apis/duck/v1"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
	Sequence int    `json:"id"`
	Label    string `json:"label"`
}

var (
	eventSource string
	eventType   string
	sink        string
	label       string
	periodStr   string
)

func init() {
	flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
	flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
	flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
	flag.StringVar(&label, "label", "", "a special label")
	flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats")
}

type envConfig struct {
	// Sink URL where to send heartbeat cloud events
	Sink string `envconfig:"K_SINK"`

	// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
	CEOverrides string `envconfig:"K_CE_OVERRIDES"`

	// Name of this pod.
	Name string `envconfig:"POD_NAME" required:"true"`

	// Namespace this pod exists in.
	Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

	// Whether to run continuously or exit.
	OneShot bool `envconfig:"ONE_SHOT" default:"false"`
}

func main() {
	flag.Parse()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	if env.Sink != "" {
		sink = env.Sink
	}

	var ceOverrides *duckv1.CloudEventOverrides
	if len(env.CEOverrides) > 0 {
		overrides := duckv1.CloudEventOverrides{}
		err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
		if err != nil {
			log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
			os.Exit(1)
		}
		ceOverrides = &overrides
	}

	p, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
	if err != nil {
		log.Fatalf("failed to create http protocol: %s", err.Error())
	}

	c, err := cloudevents.NewClient(p, cloudevents.WithUUIDs(), cloudevents.WithTimeNow())
	if err != nil {
		log.Fatalf("failed to create client: %s", err.Error())
	}

	var period time.Duration
	if p, err := strconv.Atoi(periodStr); err != nil {
		period = time.Duration(5) * time.Second
	} else {
		period = time.Duration(p) * time.Second
	}

	if eventSource == "" {
		eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
		log.Printf("Heartbeats Source: %s", eventSource)
	}

	if len(label) > 0 && label[0] == '"' {
		label, _ = strconv.Unquote(label)
	}
	hb := &Heartbeat{
		Sequence: 0,
		Label:    label,
	}
	ticker := time.NewTicker(period)
	for {
		hb.Sequence++

		event := cloudevents.NewEvent("1.0")
		event.SetType(eventType)
		event.SetSource(eventSource)
		event.SetExtension("the", 42)
		event.SetExtension("heart", "yes")
		event.SetExtension("beats", true)

		if ceOverrides != nil && ceOverrides.Extensions != nil {
			for n, v := range ceOverrides.Extensions {
				event.SetExtension(n, v)
			}
		}

		if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
			log.Printf("failed to set cloudevents data: %s", err.Error())
		}

		log.Printf("sending cloudevent to %s", sink)
		if res := c.Send(context.Background(), event); !cloudevents.IsACK(res) {
			log.Printf("failed to send cloudevent: %v", res)
		}

		if env.OneShot {
			return
		}

		// Wait for next tick
		<-ticker.C
	}
}

The following is an example of a container source that references the earlier heartbeats container image:

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        # This corresponds to a heartbeats image URI that you have built and published
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/heartbeats
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "example-pod"
            - name: POD_NAMESPACE
              value: "event-test"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: showcase
...

You can use the kn source container commands to create and manage container sources by using the Knative (kn) CLI. Using the Knative CLI to create event sources provides a more streamlined and intuitive user interface than modifying YAML files directly.

Procedure

  1. Create a container source by running the following command:

    $ kn source container create <container_source_name> --image <image_uri> --sink <sink>
  2. Delete a container source by running the following command:

    $ kn source container delete <container_source_name>
  3. Describe a container source by running the following command:

    $ kn source container describe <container_source_name>
  4. List existing container sources by running the following command:

    $ kn source container list
  5. List existing container sources in YAML format by running the following command:

    $ kn source container list -o yaml
  6. Update a container source by running the following command, which updates the image URI for an existing container source:

    $ kn source container update <container_source_name> --image <image_uri>

After Knative Eventing is installed on your cluster, you can create a container source by using the web console. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create an event source.

Prerequisites

  • You have logged in to the OpenShift Container Platform web console.
  • The OpenShift Serverless Operator, Knative Serving, and Knative Eventing are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Navigate to +Add → Event Source. The Event Sources page is displayed.
  2. Select Container Source and then click Create Event Source. The Create Event Source page is displayed.
  3. Configure the Container Source settings by using the Form view or YAML view:

    Note

    You can switch between the Form view and YAML view. The data is persisted when switching between the views.

    1. In the Image field, enter the URI of the image that you want to run in the container created by the container source.
    2. In the Name field, enter the name of the image.
    3. Optional: In the Arguments field, enter any arguments to be passed to the container.
    4. Optional: In the Environment variables field, add any environment variables to set in the container.
    5. In the Target section, select your event sink. This can be either a Resource or a URI:

      1. Select Resource to use a channel, broker, or service as an event sink for the event source.
      2. Select URI to specify a Uniform Resource Identifier (URI) where the events are routed to.
  4. After you have finished configuring the container source, click Create.

2.5.2.4. Container source reference

You can use a container as an event source, by creating a ContainerSource object. You can configure many parameters when creating a ContainerSource object.

ContainerSource objects support the following fields:

Field | Description | Required or optional
apiVersion | Specifies the API version, for example sources.knative.dev/v1. | Required
kind | Identifies this resource object as a ContainerSource object. | Required
metadata | Specifies metadata that uniquely identifies the ContainerSource object. For example, a name. | Required
spec | Specifies the configuration information for this ContainerSource object. | Required
spec.sink | A reference to an object that resolves to a URI to use as the sink. | Required
spec.template | A template spec for the ContainerSource object. | Required
spec.ceOverrides | Defines overrides to control the output format and modifications to the event sent to the sink. | Optional

Template parameter example

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: quay.io/openshift-knative/heartbeats:latest
          name: heartbeats
          args:
            - --period=1
          env:
            - name: POD_NAME
              value: "mypod"
            - name: POD_NAMESPACE
              value: "event-test"
  ...

2.5.2.4.1. CloudEvent overrides

A ceOverrides definition provides overrides that control the CloudEvent output format and modifications sent to the sink. You can configure many fields for the ceOverrides definition.

A ceOverrides definition supports the following fields:

Field | Description | Required or optional
extensions | Specifies the attributes that the system adds to or overrides on the outbound event. The system sets each extensions key-value pair as an independent attribute extension on the event. | Optional

Note

Use only valid CloudEvent attribute names as extensions. Do not set specification-defined attributes in the extensions override configuration. For example, you cannot change the type attribute.

CloudEvent Overrides example

apiVersion: sources.knative.dev/v1
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  ...
  ceOverrides:
    extensions:
      extra: this is an extra attribute
      additional: 42

This sets the K_CE_OVERRIDES environment variable on the subject:

Example output

{ "extensions": { "extra": "this is an extra attribute", "additional": "42" } }

You can integrate Service Mesh with ContainerSource event sources by injecting service mesh sidecars into the container source pods. This enables secure service-to-service communication and advanced traffic management features for your event-driven applications.

Prerequisites

  • You have integrated Service Mesh with OpenShift Serverless.

Procedure

  1. Create a Service in a namespace that is a member of the ServiceMeshMemberRoll.

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> # (1)
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" # (2)
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/showcase

    (1) A namespace that is a member of the ServiceMeshMemberRoll.
    (2) Injects Service Mesh sidecars into the Knative service pods.
  2. Apply the Service resource.

    $ oc apply -f <filename>
  3. Create a ContainerSource object in a namespace that is a member of the ServiceMeshMemberRoll, with the sink set to event-display.

    apiVersion: sources.knative.dev/v1
    kind: ContainerSource
    metadata:
      name: test-heartbeats
      namespace: <namespace> # (1)
    spec:
      template:
        metadata: # (2)
          annotations:
            sidecar.istio.io/inject: "true"
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
            - image: quay.io/openshift-knative/heartbeats:latest
              name: heartbeats
              args:
                - --period=1s
              env:
                - name: POD_NAME
                  value: "example-pod"
                - name: POD_NAMESPACE
                  value: "event-test"
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

    (1) A namespace that is a member of the ServiceMeshMemberRoll.
    (2) Enables Service Mesh integration with a ContainerSource object.
  4. Apply the ContainerSource resource.

    $ oc apply -f <filename>

Verification

To verify that events reached the Knative event sink, check the message dumper function logs.

  1. List the pods in the namespace by entering the following command:

    $ oc get pods
  2. View the logs of the event-display pod by entering the following command:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing/test/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

2.6. Connecting an event source to an event sink

When you create an event source by using the OpenShift Container Platform web console, you can specify a target event sink that events are sent to from that source. The event sink can be any addressable or callable resource that can receive incoming events from other resources.

2.6.1. Connect an event source to an event sink

You can connect an event source to an event sink by using the OpenShift Container Platform web console to establish event-driven workflows. The event sink can be a Knative service, channel, broker, or a custom URI that receives and processes events from the source.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Serving, and Knative Eventing on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have created an event sink, such as a Knative service, channel, or broker.

Procedure

  1. Create an event source of any type by navigating to +Add → Event Source and selecting the event source type that you want to create.
  2. In the Target section of the Create Event Source form view, select your event sink. This can be either a Resource or a URI:

    1. Select Resource to use a channel, broker, or service as an event sink for the event source.
    2. Select URI to specify the Uniform Resource Identifier (URI) where the system routes events.
  3. Click Create.

Verification

Verify that the event source exists and is connected to the sink by viewing the Topology page:

  1. Navigate to Topology.
  2. View the event source and click the connected event sink to see the sink details in the right panel.

2.7. Getting started with IntegrationSource

Important

OpenShift Serverless IntegrationSource feature is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.

The IntegrationSource API is a Knative Eventing custom resource (CR) that enables you to connect to external systems by leveraging selected Kamelets from the Apache Camel project.

Kamelets act as reusable connectors that can function either as sources or sinks. By using an IntegrationSource API, you can consume data from external systems and forward the data as CloudEvents into Knative Eventing.

OpenShift Serverless supports the following Kamelet sources:

  • AWS DynamoDB Streams
  • AWS S3
  • AWS SQS
  • Timer Source

2.7.1. Creating AWS credentials

To connect to Amazon Web Services (AWS) resources, the IntegrationSource requires a Kubernetes Secret that contains valid AWS credentials. You must create this Secret in the same namespace as the IntegrationSource resource, and it must include both the AWS access key and the secret key.

Prerequisites

  • You have an AWS account with an access key ID and secret access key that allow access to the AWS service that you want to use, for example the Amazon DynamoDB Streams service.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You have identified the namespace in which the IntegrationSource resource will be created.

Procedure

  • Create the Secret by running the following command:

    $ oc -n <namespace> create secret generic my-secret \
      --from-literal=aws.accessKey=<accessKey> \
      --from-literal=aws.secretKey=<secretKey>

    Replace <namespace> with the namespace where the IntegrationSource resource resides, and substitute <accessKey> and <secretKey> with the corresponding AWS credentials.

2.7.2. AWS DynamoDB Streams with IntegrationSource

The Amazon Web Services (AWS) DynamoDB Streams Kamelet allows you to consume changes from an Amazon DynamoDB table and forward them into Knative Eventing. This integration makes it easier to react to database changes and propagate events within your serverless applications.

To configure an IntegrationSource for DynamoDB Streams, you must provide valid AWS credentials, specify the DynamoDB table and its region, and define the event sink, such as a Knative broker.

To create an IntegrationSource API that receives events from Amazon DynamoDB Streams, apply a YAML manifest.

Prerequisites

  • You have created a Kubernetes Secret containing valid Amazon Web Services (AWS) credentials with access to Amazon DynamoDB Streams.
  • You have the OpenShift CLI (oc) installed and are logged in to the target cluster.
  • You have identified the namespace where you will create the IntegrationSource resource.
  • A Knative broker or another event sink exists to receive the events that the IntegrationSource resource produces.

Procedure

  1. Save the following YAML manifest as integration-source-aws-ddb.yaml:

    apiVersion: sources.knative.dev/v1alpha1
    kind: IntegrationSource
    metadata:
      name: integration-source-aws-ddb
      namespace: knative-samples
    spec:
      aws:
        ddbStreams:
          table: "my-table"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-source-aws-ddb.yaml

    This manifest configures the IntegrationSource API to read change events from the DynamoDB table my-table in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials. The system sends events to the default Knative broker defined in the sink section.

2.7.3. AWS S3 with IntegrationSource

The Amazon Web Services (AWS) Simple Storage Service (S3) Kamelet allows you to consume object storage events such as file uploads or updates from an Amazon S3 bucket and forward them into Knative Eventing. This integration helps OpenShift Serverless applications react to changes in object storage, such as new files or data updates, without the need for polling.

To configure an IntegrationSource for S3, you must provide valid AWS credentials, specify the Amazon S3 bucket Amazon Resource Name (ARN) and its region, and define the event sink, such as a Knative broker.

2.7.3.1. Creating an IntegrationSource for AWS S3

To create an IntegrationSource API that receives data from an Amazon S3 bucket, apply a YAML manifest.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the IntegrationSource resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the target cluster.
  • You have identified the namespace where you will create the IntegrationSource resource.
  • A Knative broker or another event sink exists to receive the events that the IntegrationSource resource produces.

Procedure

  1. Save the following YAML manifest as integration-source-aws-s3.yaml:

    apiVersion: sources.knative.dev/v1alpha1
    kind: IntegrationSource
    metadata:
      name: integration-source-aws-s3
      namespace: knative-samples
    spec:
      aws:
        s3:
          arn: "arn:aws:s3:::my-bucket"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-source-aws-s3.yaml

    This manifest configures the IntegrationSource API to monitor the Amazon S3 bucket identified by its Amazon Resource Name (ARN), arn:aws:s3:::my-bucket, in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials. The system sends events to the default Knative broker defined in the sink section.

2.7.4. AWS SQS with IntegrationSource

The Amazon Web Services (AWS) Simple Queue Service (SQS) Kamelet enables you to consume messages from an Amazon SQS queue and forward them into Knative Eventing. This integration allows OpenShift Serverless applications to react to queued messages, supporting event-driven architectures that decouple producers and consumers.

To configure an IntegrationSource for SQS, you must provide valid AWS credentials, specify the Amazon SQS queue Amazon Resource Name (ARN) and its region, and define the event sink, such as a Knative broker. The Amazon Resource Name (ARN) uniquely identifies the SQS queue across all AWS accounts and regions.

2.7.4.1. Creating an IntegrationSource for SQS

To create an IntegrationSource API that receives messages from an Amazon Simple Queue Service (SQS) queue, apply a YAML manifest.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the IntegrationSource resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the target cluster.
  • You have identified the namespace where you will create the IntegrationSource resource.
  • A Knative broker or another event sink exists to receive the events that the IntegrationSource resource produces.

Procedure

  1. Save the following YAML manifest as integration-source-aws-sqs.yaml:

    apiVersion: sources.knative.dev/v1alpha1
    kind: IntegrationSource
    metadata:
      name: integration-source-aws-sqs
      namespace: knative-samples
    spec:
      aws:
        sqs:
          arn: "arn:aws:sqs:eu-north-1:123456789012:my-queue"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default

    Note

    Ensure that the Amazon Resource Name (ARN) accurately reflects the SQS queue region, account number, and queue name. For example, arn:aws:sqs:<region>:<account_id>:<queue_name>.

  2. Apply the manifest by running the following command:

    $ oc apply -f integration-source-aws-sqs.yaml

    This manifest configures the IntegrationSource API to monitor an Amazon SQS queue identified by its Amazon Resource Name (ARN) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials. The system sends events to the default Knative broker defined in the sink section.
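
Because a mistyped ARN is easy to miss, it can help to check the value before applying the manifest. The following sketch splits an ARN of the form given in the preceding note and exposes its parts. It is a plain string check under that assumed layout, not an AWS SDK call, and the sqsARN type and parseSQSARN function are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// sqsARN holds the parts of an ARN of the form
// arn:aws:sqs:<region>:<account_id>:<queue_name>.
type sqsARN struct {
	Region    string
	AccountID string
	Queue     string
}

// parseSQSARN splits an SQS ARN into its parts and rejects values that do
// not follow the six-field layout.
func parseSQSARN(arn string) (sqsARN, error) {
	parts := strings.Split(arn, ":")
	if len(parts) != 6 || parts[0] != "arn" || parts[2] != "sqs" {
		return sqsARN{}, fmt.Errorf("not an SQS ARN: %q", arn)
	}
	for _, p := range parts[3:] {
		if p == "" {
			return sqsARN{}, fmt.Errorf("empty field in ARN: %q", arn)
		}
	}
	return sqsARN{Region: parts[3], AccountID: parts[4], Queue: parts[5]}, nil
}

func main() {
	a, err := parseSQSARN("arn:aws:sqs:eu-north-1:123456789012:my-queue")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Compare the ARN region with the value used in the region field.
	fmt.Println(a.Region == "eu-north-1", a.AccountID, a.Queue) // true 123456789012 my-queue
}
```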

2.7.5. Timer source with IntegrationSource

The Timer Kamelet produces periodic messages with a custom payload and forwards them into Knative Eventing. This is useful for testing event flows or generating scheduled events without relying on external systems.

To configure an IntegrationSource for the Timer Kamelet, you specify the interval at which messages are generated and the message content, and then define the event sink, such as a Knative broker.

2.7.5.1. Creating an IntegrationSource for timer

To create an IntegrationSource API that produces periodic messages with a custom payload, apply a YAML manifest.

Prerequisites

  • You have the OpenShift CLI (oc) installed and are logged in to the target cluster.
  • You have identified the namespace where you will create the IntegrationSource resource.
  • A Knative broker or another event sink exists to receive the timer events.

Procedure

  1. Save the following YAML manifest as integration-source-timer.yaml:

    apiVersion: sources.knative.dev/v1alpha1
    kind: IntegrationSource
    metadata:
      name: integration-source-timer
      namespace: knative-samples
    spec:
      timer:
        period: 2000
        message: "Hello from Timer source"
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: default
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-source-timer.yaml

    This manifest configures the IntegrationSource to emit a message "Hello from Timer source" every 2000 milliseconds (2 seconds). The system sends events to the default Knative broker defined in the sink section.
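
The period value is expressed in milliseconds. As a rough analogy only, the following Go sketch shows the same behavior with a ticker. It does not reflect how the Timer Kamelet is implemented, and the periodDuration and emit functions are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// periodDuration converts a period in milliseconds, the unit that the
// manifest uses, into a time.Duration.
func periodDuration(ms int) time.Duration {
	return time.Duration(ms) * time.Millisecond
}

// emit calls send once per period until count messages have been sent.
func emit(period time.Duration, count int, message string, send func(string)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for i := 0; i < count; i++ {
		<-ticker.C
		send(message)
	}
}

func main() {
	fmt.Println(periodDuration(2000)) // 2s

	// A short period keeps the demonstration quick; the manifest uses 2000.
	emit(periodDuration(20), 3, "Hello from Timer source", func(m string) {
		fmt.Println(m)
	})
}
```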

Chapter 3. Event sinks

3.1. Event sinks

When you create an event source, you can specify an event sink to which the source sends events. An event sink is an addressable or a callable resource that can receive incoming events from other resources. Knative services, channels, and brokers are all examples of event sinks. There is also a specific Apache Kafka sink type available.

Addressable objects receive and acknowledge an event delivered over HTTP to an address defined in their status.address.url field. As a special case, the core Kubernetes Service object also fulfills the addressable interface.

Callable objects are able to receive an event delivered over HTTP and transform the event, returning 0 or 1 new events in the HTTP response. These returned events may be further processed in the same way that events from an external event source are processed.
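
The callable behavior can be sketched as an HTTP handler that answers with either no event or one event. This is a minimal illustration that uses only the Go standard library and binary content mode. The reply type, the reply source, the 202 status for the empty case, and the replyType and handle names are assumptions made for the example. The main function exercises the handler in-process instead of starting a server.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// replyType decides the type of the reply event. Returning ok=false means
// that the handler replies with no event. The mapping is a made-up example.
func replyType(inType string) (outType string, ok bool) {
	if inType == "dev.knative.eventing.samples.heartbeat" {
		return "com.example.heartbeat.seen", true
	}
	return "", false
}

// handle receives a binary-mode CloudEvent and answers with zero or one event.
func handle(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "cannot read body", http.StatusBadRequest)
		return
	}
	outType, ok := replyType(r.Header.Get("ce-type"))
	if !ok {
		// Zero events: acknowledge the delivery with an empty response.
		w.WriteHeader(http.StatusAccepted)
		return
	}
	// One event: return it in the response by using binary content mode.
	w.Header().Set("ce-specversion", "1.0")
	w.Header().Set("ce-id", r.Header.Get("ce-id")+"-reply")
	w.Header().Set("ce-source", "urn:example:callable")
	w.Header().Set("ce-type", outType)
	w.Header().Set("Content-Type", r.Header.Get("Content-Type"))
	w.WriteHeader(http.StatusOK)
	w.Write(body)
}

func main() {
	// Exercise the handler once in-process instead of starting a server.
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(`{"id":1}`))
	req.Header.Set("ce-specversion", "1.0")
	req.Header.Set("ce-id", "1")
	req.Header.Set("ce-source", "urn:example:source")
	req.Header.Set("ce-type", "dev.knative.eventing.samples.heartbeat")
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	handle(rec, req)
	fmt.Println(rec.Code, rec.Header().Get("ce-type"), rec.Body.String())
}
```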

3.1.1. Knative CLI sink flag

When you create an event source by using the Knative (kn) CLI, you can use the --sink flag to specify the sink to which that source sends events. The sink can be any addressable or callable resource that can receive incoming events from other resources.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \
  --ce-override "sink=bound"

The svc segment in http://event-display.svc.cluster.local indicates that the sink is a Knative service. Other default sink prefixes include channel and broker.

Tip

You can configure which CRs can be used with the --sink flag for Knative (kn) CLI commands by Customizing kn.

3.2. Creating event sinks

When you create an event source, you can specify an event sink to which the source sends events. An event sink is an addressable or a callable resource that can receive incoming events from other resources. Knative services, channels, and brokers are all examples of event sinks. There is also a specific Apache Kafka sink type available.

For information about creating resources that can be used as event sinks, see the following documentation:

3.3. Sink for Apache Kafka

Apache Kafka sinks are a type of event sink that is available if a cluster administrator has enabled Apache Kafka on your cluster. You can send events directly from an event source to a Kafka topic by using a Kafka sink.

3.3.1. Creating an Apache Kafka sink by using YAML

You can create a Kafka sink that sends events to a Kafka topic. By default, a Kafka sink uses the binary content mode, which is more efficient than the structured mode. To create a Kafka sink by using YAML, you must create a YAML file that defines a KafkaSink object, then apply it by using the oc apply command.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource (CR) are installed on your cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create a KafkaSink object definition as a YAML file:

    Kafka sink YAML

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
       - <bootstrap-server>

  2. To create the Kafka sink, apply the KafkaSink YAML file:

    $ oc apply -f <filename>
  3. Configure an event source so that the sink is specified in its spec:

    Example of a Kafka sink connected to an API server source

    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> # 1
      namespace: <namespace> # 2
    spec:
      serviceAccountName: <service-account-name> # 3
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> # 4

    1  The name of the event source.
    2  The namespace of the event source.
    3  The service account for the event source.
    4  The Kafka sink name.

3.3.2. Creating an event sink for Apache Kafka by using the OpenShift Container Platform web console

You can create a Kafka sink that sends events to a Kafka topic in the OpenShift Container Platform web console. By default, a Kafka sink uses the binary content mode, which is more efficient than the structured mode.

As a developer, you can create an event sink to receive events from a particular source and send them to a Kafka topic.

Prerequisites

  • You have installed the OpenShift Serverless Operator, with Knative Serving, Knative Eventing, and Knative broker for Apache Kafka APIs, from the OperatorHub.
  • You have created a Kafka topic in your Kafka environment.

Procedure

  1. Navigate to the +Add view.
  2. Click Event Sink in the Eventing catalog.
  3. Search for KafkaSink in the catalog items and click it.
  4. Click Create Event Sink.
  5. In the form view, type the URL of the bootstrap server, which is a combination of hostname and port.

    Event sink creation in OpenShift web console
  6. Type the name of the topic to which event data is sent.
  7. Type the name of the event sink.
  8. Click Create.

Verification

  1. Navigate to the Topology view.
  2. Click the created event sink to view its details in the right panel.

3.3.3. Configuring security for Apache Kafka sinks

Transport Layer Security (TLS) is used by Apache Kafka clients and servers to encrypt traffic between Knative and Kafka, as well as for authentication. TLS is the only supported method of traffic encryption for the Knative broker implementation for Apache Kafka.

Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, users must provide credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resources (CRs) are installed on your OpenShift Container Platform cluster.
  • Kafka sink is enabled in the KnativeKafka CR.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have a Kafka cluster CA certificate stored as a .pem file.
  • You have a Kafka cluster client certificate and a key stored as .pem files.
  • You have installed the OpenShift CLI (oc).
  • You have chosen the SASL mechanism to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.

Procedure

  1. Create the certificate files as a secret in the same namespace as your KafkaSink object:

    Important

    Certificates and keys must be in PEM format.

    • For authentication using SASL without encryption:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<username> \
        --from-literal=password=<password>
    • For authentication using SASL and encryption using TLS:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=<my_caroot.pem_file_path> \
        --from-literal=user=<username> \
        --from-literal=password=<password>

      You can omit the ca.crt entry to use the system's root CA set if you are using a public cloud managed Kafka service.
    • For authentication and encryption using TLS:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>

      You can omit the ca.crt entry to use the system's root CA set if you are using a public cloud managed Kafka service.
  2. Create or modify a KafkaSink object and add a reference to your secret in the auth spec:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
       name: <sink_name>
       namespace: <namespace>
    spec:
    ...
       auth:
         secret:
           ref:
             name: <secret_name>
    ...
  3. Apply the KafkaSink object:

    $ oc apply -f <filename>
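
Before you create the secret, it can help to check that the key set matches the chosen protocol. The following Go sketch is an illustrative helper, not part of Knative or the oc CLI: the function name missingKeys and the table requiredKeys are assumptions, and the key lists are taken only from the three commands in this procedure. The ca.crt key is left out because the procedure allows you to omit it.

```go
package main

import (
	"fmt"
	"sort"
)

// requiredKeys lists, per protocol value, the secret keys that the commands
// in this procedure always set. ca.crt is excluded because it is optional.
var requiredKeys = map[string][]string{
	"SASL_PLAINTEXT": {"sasl.mechanism", "user", "password"},
	"SASL_SSL":       {"sasl.mechanism", "user", "password"},
	"SSL":            {"user.crt", "user.key"},
}

// missingKeys returns the sorted required keys that are absent or empty in
// the secret data. It returns an error for protocol values that this sketch
// does not cover.
func missingKeys(data map[string]string) ([]string, error) {
	required, ok := requiredKeys[data["protocol"]]
	if !ok {
		return nil, fmt.Errorf("protocol %q is not covered by this sketch", data["protocol"])
	}
	var missing []string
	for _, key := range required {
		if data[key] == "" {
			missing = append(missing, key)
		}
	}
	sort.Strings(missing)
	return missing, nil
}

func main() {
	// Hypothetical secret data with the password left out.
	secret := map[string]string{
		"protocol":       "SASL_SSL",
		"sasl.mechanism": "SCRAM-SHA-512",
		"user":           "my-user",
	}
	missing, err := missingKeys(secret)
	fmt.Println(missing, err)
}
```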

3.4. JobSink

Event processing usually completes within a short time frame, such as a few minutes. This ensures that the HTTP connection remains open and the service does not scale down prematurely.

Maintaining long-running connections increases the risk of failure, potentially leading to processing restarts and repeated request retries.

You can use JobSink to support long-running asynchronous jobs and tasks. JobSink uses the full Kubernetes batch/v1 Job resource and its features, and works with Kubernetes job queuing systems such as Kueue.

3.4.1. Using JobSink

When an event is sent to a JobSink, Eventing creates a Job and mounts the received event as a JSON file at /etc/jobsink-event/event.

Procedure

  1. Create a JobSink object definition as a YAML file:

    JobSink YAML

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-logger
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "cat" ]
                  args:
                    - "/etc/jobsink-event/event"

  2. Apply the JobSink YAML file:

    $ oc apply -f <job-sink-file.yaml>
  3. Verify JobSink is ready:

    $ oc get jobsinks.sinks.knative.dev

    Example output:

    NAME              URL                                                                          AGE   READY   REASON
    job-sink-logger   http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger   5s    True
  4. Trigger a JobSink. JobSink can be triggered by any event source or trigger.

    $ oc run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
       -H "content-type: application/json"  \
       -H "ce-specversion: 1.0" \
       -H "ce-source: my/curl/command" \
       -H "ce-type: my.demo.event" \
       -H "ce-id: 123" \
       -d '{"details":"JobSinkDemo"}' \
       http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
  5. Verify a Job is created:

    $ oc logs job-sink-loggerszoi6-dqbtq

    Example output:

    {"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
Note

JobSink creates a Job for each unique event it receives.

An event is uniquely identified by the combination of its source and id attributes.

If an event with the same attributes is received while a Job for that event already exists, another Job will not be created.

3.4.2. Reading the Job event file

Procedure

  • Read the event file and deserialize it by using any CloudEvents JSON deserializer. The following example demonstrates how to read and process an event using CloudEvents Go SDK:

    package mytask
    
    import (
        "encoding/json"
        "fmt"
        "os"
    
        cloudevents "github.com/cloudevents/sdk-go/v2"
    )
    
    func handleEvent() error {
        eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
        if err != nil {
            return err
        }
    
        event := &cloudevents.Event{}
        if err := json.Unmarshal(eventBytes, event); err != nil {
            return err
        }
    
        fmt.Println(event)
    
        return nil
    }

3.4.3. Setting custom event file mount path

You can set a custom event file mount path in your JobSink definition.

Procedure

  • Inside your container definition, include a volumeMounts entry for the jobsink-event volume and set its mountPath as required.

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-custom-mount-path
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5
    
                  # The event will be available in a file at `/etc/custom-path/event`
                  volumeMounts:
                    - name: "jobsink-event"
                      mountPath: "/etc/custom-path"
                      readOnly: true

3.4.4. Cleaning up finished jobs

You can clean up finished jobs by setting a ttlSecondsAfterFinished value in your JobSink definition. For example, setting the value to 600 removes completed jobs 600 seconds (10 minutes) after they finish.

Procedure

  • In your definition, set the value of ttlSecondsAfterFinished to the required amount.

    Example of ttlSecondsAfterFinished set to 600

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-example
    spec:
      job:
        spec:
          ttlSecondsAfterFinished: 600

3.4.5. Simulating FailJob action

Procedure

  • Trigger a FailJob action by including a bug-simulating command in your JobSink definition.

    Example of JobSink failure

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-failure
    spec:
      job:
        metadata:
          labels:
            my-label: my-value
        spec:
          completions: 12
          parallelism: 3
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]        # example command simulating a bug which triggers the FailJob action
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5 && exit 42
          backoffLimit: 6
          podFailurePolicy:
            rules:
              - action: FailJob
                onExitCodes:
                  containerName: main      # optional
                  operator: In             # one of: In, NotIn
                  values: [ 42 ]
              - action: Ignore             # one of: Ignore, FailJob, Count
                onPodConditions:
                  - type: DisruptionTarget   # indicates Pod disruption
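
To see why exit code 42 fails the whole Job in this example, the following Go sketch models how the podFailurePolicy rules above are evaluated. It is a simplified illustration with hypothetical type and function names, not Kubernetes code. It assumes a single container, checks the rules in order, and applies the first rule that matches. When no rule matches, it returns Count to represent the default handling, in which the failure counts toward backoffLimit.

```go
package main

import "fmt"

// rule is a simplified pod failure policy rule. A rule matches either on
// container exit codes (operator and values) or on a Pod condition.
type rule struct {
	action    string // "FailJob", "Ignore", or "Count"
	operator  string // "In" or "NotIn"; unused when condition is set
	values    []int
	condition string // for example "DisruptionTarget"
}

// podFailure describes one failed Pod with a single container.
type podFailure struct {
	exitCode   int
	conditions []string
}

// matches reports whether the rule applies to the failure.
func matches(r rule, f podFailure) bool {
	if r.condition != "" {
		for _, c := range f.conditions {
			if c == r.condition {
				return true
			}
		}
		return false
	}
	if f.exitCode == 0 {
		return false // exit code rules only consider failed containers
	}
	in := false
	for _, v := range r.values {
		if v == f.exitCode {
			in = true
		}
	}
	if r.operator == "In" {
		return in
	}
	return r.operator == "NotIn" && !in
}

// decide returns the action of the first matching rule, or "Count" when no
// rule matches.
func decide(rules []rule, f podFailure) string {
	for _, r := range rules {
		if matches(r, f) {
			return r.action
		}
	}
	return "Count"
}

func main() {
	// The two rules from the JobSink example above.
	policy := []rule{
		{action: "FailJob", operator: "In", values: []int{42}},
		{action: "Ignore", condition: "DisruptionTarget"},
	}
	fmt.Println(decide(policy, podFailure{exitCode: 42}))
	fmt.Println(decide(policy, podFailure{exitCode: 137, conditions: []string{"DisruptionTarget"}}))
	fmt.Println(decide(policy, podFailure{exitCode: 1}))
}
```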

3.5. Getting started with IntegrationSink

Important

OpenShift Serverless IntegrationSink feature is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.

The IntegrationSink API is a Knative Eventing custom resource (CR) that enables you to send events from Knative into external systems. It leverages selected Kamelets from the Apache Camel project.

Kamelets act as reusable connectors that can function either as sources or sinks. By using the IntegrationSink API, you can reliably deliver CloudEvents produced within Knative Eventing to third-party services and external systems.

OpenShift Serverless supports the following Kamelet sinks:

  • AWS Simple Storage Service (S3)
  • AWS Simple Notification Service (SNS)
  • AWS Simple Queue Service (SQS)
  • Generic logger sink

3.5.1. Creating AWS credentials

Several IntegrationSink API resources require access to Amazon Web Services (AWS) services such as S3, SNS, and SQS. To connect securely, you must create a Kubernetes Secret containing valid AWS credentials in the namespace where the IntegrationSink API resource will be created.

The Secret must include an AWS access key ID and secret access key with sufficient permissions to access the target AWS service. This Secret will then be referenced in each IntegrationSink configuration.

Prerequisites

  • You have an AWS account with an access key ID and secret access key that provide access to the relevant service.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You have identified the namespace in which the IntegrationSink resource will be created.

Procedure

  • Create the secret by running the following command:

    $ oc -n <namespace> create secret generic my-secret \
      --from-literal=aws.accessKey=<accessKey> \
      --from-literal=aws.secretKey=<secretKey>

    Replace <namespace> with the namespace where the IntegrationSink resource will be created, and substitute <accessKey> and <secretKey> with the appropriate AWS credentials.

3.5.2. AWS S3 with IntegrationSink

The Amazon Web Services (AWS) Simple Storage Service (S3) Kamelet allows you to deliver CloudEvents from Knative Eventing into an Amazon S3 bucket. This integration makes it possible to store events as objects for long-term storage, analytics, or downstream processing.

To configure an IntegrationSink API for AWS S3, you must reference a Kubernetes Secret with valid AWS credentials, specify the Amazon S3 bucket Amazon Resource Name (ARN) and its region, and configure the source that produces the CloudEvents.

3.5.2.1. Creating an IntegrationSink for AWS S3

You can create an IntegrationSink API resource to deliver CloudEvents from Knative Eventing to an Amazon Simple Storage Service (S3) bucket. Persist event data as objects in S3 for long-term use, analytics processing, or downstream applications. Apply a YAML manifest to create the IntegrationSink.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You have identified the namespace in which the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents and send them to this sink.

Procedure

  1. Save the following YAML manifest as integration-sink-aws-s3.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-sink-aws-s3
      namespace: knative-samples
    spec:
      aws:
        s3:
          arn: "arn:aws:s3:::my-bucket"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-sink-aws-s3.yaml

    This manifest configures the IntegrationSink API to deliver CloudEvents to the Amazon S3 bucket identified by its Amazon Resource Name (ARN) (arn:aws:s3:::my-bucket) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.
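
The ARN in the manifest follows the general AWS layout arn:partition:service:region:account-id:resource. The Go sketch below splits an ARN into those parts; the type and function names are hypothetical, and the parsing is a minimal illustration rather than a full validator. It shows that an S3 bucket ARN leaves the region and account fields empty, which is why the manifest sets region separately.

```go
package main

import (
	"fmt"
	"strings"
)

// arnParts holds the colon-separated fields of an ARN.
type arnParts struct {
	Partition string
	Service   string
	Region    string
	Account   string
	Resource  string
}

// parseARN splits an ARN into its fields. The resource field keeps any
// further colons, because the split is limited to six parts.
func parseARN(s string) (arnParts, error) {
	parts := strings.SplitN(s, ":", 6)
	if len(parts) != 6 || parts[0] != "arn" {
		return arnParts{}, fmt.Errorf("malformed ARN %q", s)
	}
	return arnParts{
		Partition: parts[1],
		Service:   parts[2],
		Region:    parts[3],
		Account:   parts[4],
		Resource:  parts[5],
	}, nil
}

func main() {
	bucket, err := parseARN("arn:aws:s3:::my-bucket")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("service=%s region=%q resource=%s\n", bucket.Service, bucket.Region, bucket.Resource)
}
```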

3.5.3. AWS SNS with IntegrationSink

The Amazon Web Services (AWS) Simple Notification Service (SNS) Kamelet enables you to deliver CloudEvents from Knative Eventing to an SNS topic. This integration is useful when you need broad distribution through SNS subscriptions, for example to email, SMS, HTTP subscribers, SQS queues, or Lambda functions.

To configure an IntegrationSink API resource for AWS SNS, reference a Kubernetes Secret with valid AWS credentials, specify the SNS topic Amazon Resource Name (ARN) and region, and configure the event source that will produce the CloudEvents.

3.5.3.1. Creating an IntegrationSink for AWS SNS

You can create an IntegrationSink API resource to publish CloudEvents from Knative Eventing to an Amazon Simple Notification Service (SNS) topic by applying a YAML manifest.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You have identified the namespace in which the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents and send them to this sink.

Procedure

  1. Save the following YAML manifest as integration-sink-aws-sns.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-sink-aws-sns
      namespace: knative-samples
    spec:
      aws:
        sns:
          arn: "arn:aws:sns:<region>:<account>:my-topic"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-sink-aws-sns.yaml

    This manifest configures the IntegrationSink API to publish CloudEvents to the SNS topic identified by its Amazon Resource Name (ARN) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.

3.5.4. AWS SQS with IntegrationSink

The Amazon Web Services (AWS) Simple Queue Service (SQS) Kamelet allows you to send CloudEvents from Knative Eventing into an SQS queue. This integration is useful when you need reliable, decoupled message delivery between event producers and consumers, or when downstream systems process events asynchronously from a queue.

To configure an IntegrationSink API resource for AWS SQS, you must reference a Kubernetes Secret with valid AWS credentials, specify the queue Amazon Resource Name (ARN) and region, and configure the event source that produces the CloudEvents.

3.5.4.1. Creating an IntegrationSink for AWS SQS

You can create an IntegrationSink API resource to send CloudEvents to an Amazon Simple Queue Service (SQS) queue by applying a YAML manifest.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You have identified the namespace in which the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents and send them to this sink.

Procedure

  1. Save the following YAML manifest as integration-sink-aws-sqs.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-sink-aws-sqs
      namespace: knative-samples
    spec:
      aws:
        sqs:
          arn: "arn:aws:sqs:<region>:<account>:my-queue"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-sink-aws-sqs.yaml

    This manifest configures the IntegrationSink API to send CloudEvents to the SQS queue identified by its Amazon Resource Name (ARN) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.

3.5.5. Generic logger sink with IntegrationSink

The Log Sink Kamelet allows you to output CloudEvents from Knative Eventing directly to the application log. This sink is primarily used for debugging or testing event flows, as it provides visibility into the event payloads and metadata without requiring an external system.

To configure an IntegrationSink API resource for the logger, you can set the logging level and optionally display event headers.

You can create an IntegrationSink API resource to log CloudEvents by applying a YAML manifest.

Prerequisites

  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You have identified the namespace in which the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents and send them to this sink.

Procedure

  1. Save the following YAML manifest as integration-log-sink.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-log-sink
      namespace: knative-samples
    spec:
      log:
        showHeaders: true
        level: INFO
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-log-sink.yaml

    This manifest configures the IntegrationSink API resource to log all CloudEvents it receives at the INFO level. Because the showHeaders option is set to true, the HTTP headers of each event are also logged.

Chapter 4. Brokers

4.1. Brokers

Brokers can be used in combination with triggers to deliver events from an event source to an event sink. Events are sent from an event source to a broker as an HTTP POST request. After events have entered the broker, they can be filtered by CloudEvent attributes using triggers, and sent as an HTTP POST request to an event sink.
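
The producer side of this flow can be sketched in Go with only the standard library. The helper below builds the HTTP POST request for one event in CloudEvents binary content mode, where the attributes travel as ce- prefixed headers. The function name newEventRequest is hypothetical, and the broker URL and attribute values are example values; the sketch only builds the request and does not contact a cluster.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newEventRequest builds a POST request that carries one CloudEvent in
// binary content mode. The caller supplies the broker ingress URL.
func newEventRequest(brokerURL, id, source, eventType string, data []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, brokerURL, bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	req.Header.Set("ce-specversion", "1.0")
	req.Header.Set("ce-id", id)
	req.Header.Set("ce-source", source)
	req.Header.Set("ce-type", eventType)
	req.Header.Set("content-type", "application/json")
	return req, nil
}

func main() {
	req, err := newEventRequest(
		"http://broker-ingress.knative-eventing.svc.cluster.local/test/default",
		"123", "my/curl/command", "my.demo.event",
		[]byte(`{"details":"demo"}`),
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("ce-type"))
	// From inside the cluster, http.DefaultClient.Do(req) would send the event.
}
```

A trigger can then select this event by matching attributes such as type, which is why the attributes are sent as headers rather than embedded in the body.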

Broker event delivery overview

4.2. Broker types

Cluster administrators can set the default broker implementation for a cluster. When you create a broker, the default broker implementation is used unless you specify a different configuration in the Broker object.

Knative provides a default, channel-based broker implementation. This channel-based broker can be used for development and testing purposes, but does not provide adequate event delivery guarantees for production environments. By default, this broker is backed by the InMemoryChannel channel implementation.

If you want to use Apache Kafka to reduce network hops, use the Knative broker implementation for Apache Kafka. Do not configure the channel-based broker to be backed by the KafkaChannel channel implementation.

For production-ready Knative Eventing deployments, Red Hat recommends using the Knative broker implementation for Apache Kafka. The broker is an Apache Kafka native implementation of the Knative broker, which sends CloudEvents directly to the Kafka instance.

The Knative broker has a native integration with Kafka for storing and routing events. This provides better integration with Kafka for the broker and trigger model than other broker types, and reduces network hops. Other benefits of the Knative broker implementation include:

  • At-least-once delivery guarantees
  • Ordered delivery of events, based on the CloudEvents partitioning extension
  • Control plane high availability
  • A horizontally scalable data plane

The Knative broker implementation for Apache Kafka stores incoming CloudEvents as Kafka records, using the binary content mode. This means that all CloudEvent attributes and extensions are mapped as headers on the Kafka record, while the data spec of the CloudEvent corresponds to the value of the Kafka record.
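
The binary content mode mapping can be illustrated with a short Go sketch. The types and the function toBinaryRecord are hypothetical and do not use a Kafka client library. The header naming follows the CloudEvents Kafka protocol binding as an assumption about what the broker writes: attributes become headers with a ce_ prefix, and datacontenttype becomes the content-type header.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a minimal CloudEvent: attributes and extensions by name, plus data.
type event struct {
	Attributes map[string]string
	Data       []byte
}

// header is one Kafka record header.
type header struct {
	Key   string
	Value string
}

// kafkaRecord is a simplified Kafka record with headers and a value.
type kafkaRecord struct {
	Headers []header
	Value   []byte
}

// get returns the value of the first header with the given key, or "".
func (r kafkaRecord) get(key string) string {
	for _, h := range r.Headers {
		if h.Key == key {
			return h.Value
		}
	}
	return ""
}

// toBinaryRecord maps every attribute to a record header and the event data
// to the record value. Attribute names are sorted for stable output.
func toBinaryRecord(e event) kafkaRecord {
	names := make([]string, 0, len(e.Attributes))
	for name := range e.Attributes {
		names = append(names, name)
	}
	sort.Strings(names)

	rec := kafkaRecord{Value: e.Data}
	for _, name := range names {
		key := "ce_" + name
		if name == "datacontenttype" {
			key = "content-type"
		}
		rec.Headers = append(rec.Headers, header{Key: key, Value: e.Attributes[name]})
	}
	return rec
}

func main() {
	rec := toBinaryRecord(event{
		Attributes: map[string]string{
			"specversion":     "1.0",
			"id":              "123",
			"source":          "my/curl/command",
			"type":            "my.demo.event",
			"datacontenttype": "application/json",
		},
		Data: []byte(`{"details":"demo"}`),
	})
	for _, h := range rec.Headers {
		fmt.Printf("%s: %s\n", h.Key, h.Value)
	}
	fmt.Printf("value: %s\n", rec.Value)
}
```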

4.3. Creating brokers

Knative provides a default, channel-based broker implementation. This channel-based broker can be used for development and testing purposes, but does not provide adequate event delivery guarantees for production environments.

If a cluster administrator has configured your OpenShift Serverless deployment to use Apache Kafka as the default broker type, creating a broker by using the default settings creates a Knative broker for Apache Kafka.

If your OpenShift Serverless deployment is not configured to use the Knative broker for Apache Kafka as the default broker type, the channel-based broker is created when you use the default settings in the following procedures.

4.3.1. Creating a broker by using the Knative CLI

You can use brokers in combination with triggers to deliver events from an event source to an event sink. Using the Knative (kn) CLI to create brokers provides a more streamlined and intuitive user interface than modifying YAML files directly. You can use the kn broker create command to create a broker.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Create a broker:

    $ kn broker create <broker_name>

Verification

  1. Use the kn command to list all existing brokers:

    $ kn broker list

    Example output

    NAME      URL                                                                     AGE   CONDITIONS   READY   REASON
    default   http://broker-ingress.knative-eventing.svc.cluster.local/test/default   45s   5 OK / 5     True

  2. Optional: If you are using the OpenShift Container Platform web console, you can navigate to the Topology view and observe that the broker exists:

    View the broker in the web console Topology view

4.3.2. Creating a broker by annotating a trigger

You can use brokers in combination with triggers to deliver events from an event source to an event sink. You can create a broker by adding the eventing.knative.dev/injection: enabled annotation to a Trigger object.

Important

If you create a broker by using the eventing.knative.dev/injection: enabled annotation, you cannot delete this broker without cluster administrator permissions. If you delete the broker without asking a cluster administrator to remove this annotation first, the system recreates the broker.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Create a Trigger object as a YAML file that has the eventing.knative.dev/injection: enabled annotation:

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      annotations:
        eventing.knative.dev/injection: enabled
      name: <trigger_name>
    spec:
      broker: default
      subscriber: # 1
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: <service_name>

    1  Specify details about the event sink, or subscriber, that the trigger sends events to.
  2. Apply the Trigger YAML file:

    $ oc apply -f <filename>

Verification

You can verify broker creation by using the oc CLI or by viewing it in the Topology view in the web console.

  1. Enter the following oc command to get the broker:

    $ oc -n <namespace> get broker default

    Example output

    NAME      READY     REASON    URL                                                                     AGE
    default   True                http://broker-ingress.knative-eventing.svc.cluster.local/test/default   3m56s

  2. Optional: If you are using the OpenShift Container Platform web console, you can navigate to the Topology view and observe that the broker exists:

    View the broker in the web console Topology view

4.3.3. Creating a broker by labeling a namespace

You can use brokers in combination with triggers to deliver events from an event source to an event sink. You can create the default broker automatically by labeling a namespace that you own or have write permissions for.

Note

Brokers created using this method are not removed if you remove the label. You must manually delete them.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have cluster or dedicated administrator permissions if you are using Red Hat OpenShift Service on AWS or OpenShift Dedicated.

Procedure

  • Label a namespace with eventing.knative.dev/injection=enabled:

    $ oc label namespace <namespace> eventing.knative.dev/injection=enabled

Verification

You can verify broker creation by using the oc CLI or by viewing it in the Topology view in the web console.

  1. Use the oc command to get the broker:

    $ oc -n <namespace> get broker <broker_name>

    Example command

    $ oc -n default get broker default

    Example output

    NAME      READY     REASON    URL                                                                     AGE
    default   True                http://broker-ingress.knative-eventing.svc.cluster.local/test/default   3m56s

  2. Optional: If you are using the OpenShift Container Platform web console, you can navigate to the Topology view and observe that the broker exists:

    View the broker in the web console Topology view

4.3.4. Deleting a broker that was created by injection

If you create a broker by injection and later want to delete it, you must delete it manually. Brokers created by using a namespace label or trigger annotation are not deleted permanently if you remove the label or annotation.

Prerequisites

  • You have installed the OpenShift CLI (oc).

Procedure

  1. Remove the eventing.knative.dev/injection=enabled label from the namespace:

    $ oc label namespace <namespace> eventing.knative.dev/injection-

    Removing the label prevents Knative from recreating the broker after you delete it.

  2. Delete the broker from the selected namespace:

    $ oc -n <namespace> delete broker <broker_name>

Verification

  • Use the oc command to get the broker:

    $ oc -n <namespace> get broker <broker_name>

    Example command

    $ oc -n default get broker default

    Example output

    No resources found.
    Error from server (NotFound): brokers.eventing.knative.dev "default" not found

4.3.5. Creating a broker by using the web console

After installing Knative Eventing on your cluster, you can create a broker by using the web console. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create a broker.

Prerequisites

  • You have logged in to the OpenShift Container Platform web console.
  • You have installed the OpenShift Serverless Operator, Knative Serving, and Knative Eventing on the cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Navigate to +Add → Broker. The Broker page is displayed.
  2. Optional: Update the Name of the broker. If you do not update the name, the system names the generated broker default.
  3. Click Create.

Verification

  1. Verify broker creation by viewing broker components on the Topology page.
  2. Navigate to Topology.
  3. View the mt-broker-ingress, mt-broker-filter, and mt-broker-controller components.

    View the broker components in the Topology view

4.3.6. Next steps

4.4. Configuring the default broker backing channel

If you are using a channel-based broker, you can set the default backing channel type for the broker to either InMemoryChannel or KafkaChannel.

Prerequisites

  • You have administrator permissions on OpenShift Container Platform.
  • You have installed the OpenShift Serverless Operator and Knative Eventing on your cluster.
  • You have installed the OpenShift CLI (oc).
  • If you want to use Apache Kafka channels as the default backing channel type, you must also install the KnativeKafka CR on your cluster.

Procedure

  1. Modify the KnativeEventing custom resource (CR) to add configuration details for the config-br-default-channel config map:

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      config: # <1>
        config-br-default-channel:
          channel-template-spec: |
            apiVersion: messaging.knative.dev/v1beta1
            kind: KafkaChannel # <2>
            spec:
              numPartitions: 6 # <3>
              replicationFactor: 3 # <4>

    <1> In spec.config, you can specify the config maps that you want to add modified configurations for.
    <2> The default backing channel type configuration. In this example, the default channel implementation for the cluster is KafkaChannel.
    <3> The number of partitions for the Kafka channel that backs the broker.
    <4> The replication factor for the Kafka channel that backs the broker.
  2. Apply the updated KnativeEventing CR:

    $ oc apply -f <filename>
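
The procedure above shows KafkaChannel as the backing channel. As a hedged sketch of the other option named in this section, the following writes a KnativeEventing CR that selects InMemoryChannel instead. The file name knative-eventing-imc.yaml is hypothetical, and the messaging.knative.dev/v1 API version for InMemoryChannel is an assumption that you should check against your installed version.

```shell
# Write a KnativeEventing CR that sets InMemoryChannel as the default
# backing channel for channel-based brokers.
cat > knative-eventing-imc.yaml <<'EOF'
apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  config:
    config-br-default-channel:
      channel-template-spec: |
        apiVersion: messaging.knative.dev/v1
        kind: InMemoryChannel
EOF

# Show the generated manifest for review.
cat knative-eventing-imc.yaml
```

If your KnativeEventing CR already contains other settings, merge the spec.config entry into it rather than applying this file as is.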

4.5. Configuring the default broker class

You can use the config-br-defaults config map to specify default broker class settings for Knative Eventing. You can specify the default broker class for the entire cluster or for one or more namespaces. Currently, the MTChannelBasedBroker and Kafka broker types are supported.

Prerequisites

  • You have administrator permissions on OpenShift Container Platform.
  • You have installed the OpenShift Serverless Operator and Knative Eventing on your cluster.
  • If you want to use the Knative broker for Apache Kafka as the default broker implementation, you must also install the KnativeKafka CR on your cluster.

Procedure

  • Modify the KnativeEventing custom resource to add configuration details for the config-br-defaults config map:

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      defaultBrokerClass: Kafka # <1>
      config: # <2>
        config-br-defaults: # <3>
          default-br-config: |
            clusterDefault: # <4>
              brokerClass: Kafka
              apiVersion: v1
              kind: ConfigMap
              name: kafka-broker-config # <5>
              namespace: knative-eventing # <6>
            namespaceDefaults: # <7>
              my-namespace:
                brokerClass: MTChannelBasedBroker
                apiVersion: v1
                kind: ConfigMap
                name: config-br-default-channel # <8>
                namespace: knative-eventing # <9>
    ...

    <1> The default broker class for Knative Eventing.
    <2> In spec.config, you can specify the config maps that you want to add modified configurations for.
    <3> The config-br-defaults config map specifies the default settings for any broker that does not specify spec.config settings or a broker class.
    <4> The cluster-wide default broker class configuration. In this example, the default broker class implementation for the cluster is Kafka.
    <5> The kafka-broker-config config map specifies default settings for the Kafka broker. See "Configuring Knative broker for Apache Kafka settings" in the "Additional resources" section.
    <6> The namespace where the kafka-broker-config config map exists.
    <7> The namespace-scoped default broker class configuration. In this example, the default broker class implementation for the my-namespace namespace is MTChannelBasedBroker. You can specify default broker class implementations for multiple namespaces.
    <8> The config-br-default-channel config map specifies the default backing channel for the broker. See "Configuring the default broker backing channel" in the "Additional resources" section.
    <9> The namespace where the config-br-default-channel config map exists.
    Important

    Configuring a namespace-specific default overrides any cluster-wide settings.

4.6. Knative broker implementation for Apache Kafka

For production-ready Knative Eventing deployments, Red Hat recommends using the Knative broker implementation for Apache Kafka. The broker is an Apache Kafka native implementation of the Knative broker, which sends CloudEvents directly to the Kafka instance.

The Knative broker has a native integration with Kafka for storing and routing events. This allows better integration with Kafka for the broker and trigger model over other broker types, and reduces network hops. Other benefits of the Knative broker implementation include:

  • At-least-once delivery guarantees
  • Ordered delivery of events, based on the CloudEvents partitioning extension
  • Control plane high availability
  • A horizontally scalable data plane

The Knative broker implementation for Apache Kafka stores incoming CloudEvents as Kafka records, using the binary content mode. This means that all CloudEvent attributes and extensions are mapped as headers on the Kafka record, while the data spec of the CloudEvent corresponds to the value of the Kafka record.
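
To make the binary content mode mapping concrete, the following sketch prints the Kafka record header name for a few CloudEvent attributes. It assumes the naming used by the CloudEvents Kafka protocol binding, where attributes are carried in headers with a ce_ prefix and datacontenttype is carried in the content-type header. The helper name kafka_header_name is hypothetical and is not part of any Knative tooling.

```shell
# Map a CloudEvent attribute name to the Kafka record header that carries it
# in binary content mode (assumption: CloudEvents Kafka protocol binding naming).
kafka_header_name() {
  case "$1" in
    datacontenttype) printf 'content-type\n' ;;
    *) printf 'ce_%s\n' "$1" ;;
  esac
}

# Print the mapping for the required attributes and the data content type.
for attr in specversion id source type datacontenttype; do
  printf '%-16s -> %s\n' "$attr" "$(kafka_header_name "$attr")"
done
```

The event data itself is not mapped to a header. It becomes the value of the Kafka record.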

If your OpenShift Serverless deployment is not configured to use the Kafka broker as the default broker type, you can use one of the following procedures to create a Kafka-based broker.

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe applications declaratively and in a reproducible manner. To create a Kafka broker by using YAML, you must create a YAML file that defines a Broker object, then apply it by using the oc apply command.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create a Kafka-based broker as a YAML file:

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka # <1>
      name: example-kafka-broker
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config # <2>
        namespace: knative-eventing

    <1> The broker class. If not specified, brokers use the default class as configured by cluster administrators. To use the Kafka broker, this value must be Kafka.
    <2> The default config map for Knative brokers for Apache Kafka. This config map is created when the Kafka broker functionality is enabled on the cluster by a cluster administrator.
  2. Apply the Kafka-based broker YAML file:

    $ oc apply -f <filename>

If you want to use a Kafka broker without allowing it to create its own internal topic, you can use an externally managed Kafka topic instead. To do this, you must create a Kafka Broker object that uses the kafka.eventing.knative.dev/external.topic annotation.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your OpenShift Container Platform cluster.
  • You have access to a Kafka instance such as Red Hat AMQ Streams, and have created a Kafka topic.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create a Kafka-based broker as a YAML file:

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka # <1>
        kafka.eventing.knative.dev/external.topic: <topic_name> # <2>
    ...

    <1> The broker class. If not specified, brokers use the default class as configured by cluster administrators. To use the Kafka broker, this value must be Kafka.
    <2> The name of the Kafka topic that you want to use.
  2. Apply the Kafka-based broker YAML file:

    $ oc apply -f <filename>
Important

The Knative Broker implementation for Apache Kafka with isolated data plane is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.

The Knative Broker implementation for Apache Kafka has two planes:

Control plane
Consists of controllers that talk to the Kubernetes API, watch for custom objects, and manage the data plane.
Data plane
The collection of components that listen for incoming events, talk to Apache Kafka, and send events to the event sinks. The Knative Broker implementation for Apache Kafka data plane is where events flow. The implementation consists of kafka-broker-receiver and kafka-broker-dispatcher deployments.

When you configure a Broker class of Kafka, the Knative Broker implementation for Apache Kafka uses a shared data plane. This means that the kafka-broker-receiver and kafka-broker-dispatcher deployments in the knative-eventing namespace are used for all Apache Kafka Brokers in the cluster.

However, when you configure a Broker class of KafkaNamespaced, the Apache Kafka broker controller creates a new data plane for each namespace where a broker exists. This data plane is used by all KafkaNamespaced brokers in that namespace. This provides isolation between the data planes, so that the kafka-broker-receiver and kafka-broker-dispatcher deployments in the user namespace are used only for the brokers in that namespace.

Important

As a consequence of having separate data planes, this security feature creates more deployments and uses more resources. Unless you have such isolation requirements, use a regular Broker with a class of Kafka.

To create a KafkaNamespaced broker, you must set the eventing.knative.dev/broker.class annotation to KafkaNamespaced.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource on your OpenShift Container Platform cluster.
  • You have access to an Apache Kafka instance, such as Red Hat AMQ Streams, and have created a Kafka topic.
  • You have created a project, or have access to a project, with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create an Apache Kafka-based broker by using a YAML file:

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: KafkaNamespaced # <1>
      name: default
      namespace: my-namespace # <2>
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: my-config # <3>
    ...

    <1> To use the Apache Kafka broker with isolated data planes, the broker class value must be KafkaNamespaced.
    <2> <3> The referenced ConfigMap object my-config must be in the same namespace as the Broker object, in this case my-namespace.
  2. Apply the Apache Kafka-based broker YAML file:

    $ oc apply -f <filename>
Important

The ConfigMap object in spec.config must be in the same namespace as the Broker object:

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: my-namespace
data:
  ...

When you create the first Broker object with the KafkaNamespaced class, the system creates the kafka-broker-receiver and kafka-broker-dispatcher deployments in the namespace. All brokers with the KafkaNamespaced class in the same namespace then use the same data plane. If the namespace has no brokers with the KafkaNamespaced class, the system deletes the data plane.

4.6.2. Configuring Apache Kafka broker settings

You can configure the replication factor, bootstrap servers, and the number of topic partitions for a Kafka broker by creating a config map and referencing this config map in the Kafka Broker object.

Knative Eventing supports the full set of topic config options that Kafka supports. To set these options, you must add a key to the ConfigMap with the default.topic.config. prefix.

Prerequisites

  • You have cluster or dedicated administrator permissions on OpenShift Container Platform.
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource (CR) are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project that has the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Modify the kafka-broker-config config map, or create your own config map that contains the following configuration:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: <config_map_name> # <1>
      namespace: <namespace> # <2>
    data:
      default.topic.partitions: <integer> # <3>
      default.topic.replication.factor: <integer> # <4>
      bootstrap.servers: <list_of_servers> # <5>
      default.topic.config.<config_option>: <value> # <6>

    <1> The config map name.
    <2> The namespace where the config map exists.
    <3> The number of topic partitions for the Kafka broker. This controls how quickly events can be sent to the broker. A higher number of partitions requires greater compute resources.
    <4> The replication factor of topic messages. This protects against data loss. A higher replication factor requires greater compute resources and more storage.
    <5> A comma-separated list of bootstrap servers. The servers can be inside or outside of the OpenShift Container Platform cluster, and identify the Kafka clusters that the broker receives events from and sends events to.
    <6> A topic config option. For more information, see the full set of possible options and values.
    Important

    The default.topic.replication.factor value must be less than or equal to the number of Kafka broker instances in your cluster. For example, if you only have one Kafka broker, the default.topic.replication.factor value should not be more than "1".

    Example Kafka broker config map

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-broker-config
      namespace: knative-eventing
    data:
      default.topic.partitions: "10"
      default.topic.replication.factor: "3"
      bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
      default.topic.config.retention.ms: "3600"

  2. Apply the config map:

    $ oc apply -f <config_map_filename>
  3. Specify the config map for the Kafka Broker object:

    Example Broker object

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      name: <broker_name> # <1>
      namespace: <namespace> # <2>
      annotations:
        eventing.knative.dev/broker.class: Kafka # <3>
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: <config_map_name> # <4>
        namespace: <namespace> # <5>
    ...

    <1> The broker name.
    <2> The namespace where the broker exists.
    <3> The broker class annotation. In this example, the broker is a Kafka broker that uses the class value Kafka.
    <4> The config map name.
    <5> The namespace where the config map exists.
  4. Apply the broker:

    $ oc apply -f <broker_filename>
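
The procedure above requires that default.topic.replication.factor does not exceed the number of Kafka broker instances. A small pre-flight check along the following lines can catch that mistake before you apply the config map. This is a sketch only: the function name check_replication_factor is hypothetical, and you must supply the Kafka broker count yourself.

```shell
# Succeed when the replication factor does not exceed the number of
# Kafka broker instances; otherwise print an error and return 1.
check_replication_factor() {
  replication_factor="$1"
  kafka_broker_count="$2"
  if [ "$replication_factor" -le "$kafka_broker_count" ]; then
    echo "ok: replication factor $replication_factor fits $kafka_broker_count Kafka broker(s)"
  else
    echo "error: replication factor $replication_factor exceeds $kafka_broker_count Kafka broker(s)" >&2
    return 1
  fi
}

# Example calls with values that satisfy the rule.
check_replication_factor 3 3
check_replication_factor 1 1
```

A call such as check_replication_factor 3 1 returns a non-zero status, which matches the single-broker case described in the Important note in step 1.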

Kafka clusters are generally secured by using the TLS or SASL authentication methods. You can configure a Kafka broker or channel to work against a protected Red Hat AMQ Streams cluster by using TLS or SASL.

Note

Red Hat recommends that you enable both SASL and TLS together.

Transport Layer Security (TLS) is used by Apache Kafka clients and servers to encrypt traffic between Knative and Kafka, as well as for authentication. TLS is the only supported method of traffic encryption for the Knative broker implementation for Apache Kafka.

Prerequisites

  • You have cluster or dedicated administrator permissions on OpenShift Container Platform.
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka CR are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have a Kafka cluster CA certificate stored as a .pem file.
  • You have a Kafka cluster client certificate and a key stored as .pem files.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create the certificate files as a secret in the knative-eventing namespace:

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SSL \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    Important

    Use the key names ca.crt, user.crt, and user.key. Do not change them.

  2. Edit the KnativeKafka CR and add a reference to your secret in the broker spec:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, users must provide credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.

Prerequisites

  • You have cluster or dedicated administrator permissions on OpenShift Container Platform.
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka CR are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have a username and password for a Kafka cluster.
  • You have chosen the SASL mechanism to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.
  • If TLS is enabled, you also need the ca.crt certificate file for the Kafka cluster.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create a secret in the knative-eventing namespace that contains the SASL credentials and, if TLS is enabled, the CA certificate:

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SASL_SSL \
      --from-literal=sasl.mechanism=<sasl_mechanism> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=user="my-sasl-user"
    • Use the key names protocol, sasl.mechanism, ca.crt, password, and user. Do not change them.

      Note

      The ca.crt key is optional if the Kafka cluster uses a certificate signed by a public CA whose certificate is already in the system truststore.

  2. Edit the KnativeKafka CR and add a reference to your secret in the broker spec:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

4.7. Managing brokers

After you have created a broker, you can manage your broker by using Knative (kn) CLI commands, or by modifying it in the OpenShift Container Platform web console.

4.7.1. Managing brokers using the CLI

The Knative (kn) CLI provides commands that can be used to describe and list existing brokers.

You can use the kn broker list command to list the existing brokers in your cluster.

Prerequisites

  • The OpenShift Serverless Operator and Knative Eventing are installed on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.

Procedure

  • List all existing brokers:

    $ kn broker list

    Example output

    NAME      URL                                                                     AGE   CONDITIONS   READY   REASON
    default   http://broker-ingress.knative-eventing.svc.cluster.local/test/default   45s   5 OK / 5     True

You can use the kn broker describe command to print information about an existing broker in your cluster.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.

Procedure

  • Describe an existing broker:

    $ kn broker describe <broker_name>

    Example command using the default broker

    $ kn broker describe default

    Example output

    Name:         default
    Namespace:    default
    Annotations:  eventing.knative.dev/broker.class=MTChannelBasedBroker, eventing.knative.dev/creato ...
    Age:          22s
    
    Address:
      URL:    http://broker-ingress.knative-eventing.svc.cluster.local/default/default
    
    Conditions:
      OK TYPE                   AGE REASON
      ++ Ready                  22s
      ++ Addressable            22s
      ++ FilterReady            22s
      ++ IngressReady           22s
      ++ TriggerChannelReady    22s

4.7.2. Connect a broker to a sink

You can connect a broker to an event sink in the OpenShift Container Platform web console by creating a trigger.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Serving, and Knative Eventing on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have created a sink, such as a Knative service or channel.
  • You have created a broker.

Procedure

  1. In the Topology view, point to the broker that you have created. An arrow is displayed. Drag the arrow to the sink that you want to connect to the broker. This action opens the Add Trigger dialog box.
  2. In the Add Trigger dialog box, enter a name for the trigger and click Add.

Verification

  1. Verify that the broker connects to the sink by viewing the Topology page.
  2. Navigate to Topology.
  3. Click the line that connects the broker to the sink to see details about the trigger in the Details panel.

Chapter 5. Triggers

5.1. Triggers overview

Triggers are an essential component in Knative Eventing. A trigger subscribes a sink to a broker and uses defined filters to select which events the sink receives. By creating a Trigger, you can dynamically manage how events are routed within your system, ensuring they reach the appropriate destination based on your business logic.

Brokers can be used in combination with triggers to deliver events from an event source to an event sink. Events are sent from an event source to a broker as an HTTP POST request. After events have entered the broker, they can be filtered by CloudEvent attributes using triggers, and sent as an HTTP POST request to an event sink.

Broker event delivery overview
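
The first hop of this flow, an HTTP POST from a producer to the broker, can be sketched with curl. The broker URL, the event type dev.knative.foo.bar, the source curl-example, and the payload below are hypothetical values. The Ce- headers carry the CloudEvent attributes in binary content mode.

```shell
# Hypothetical broker URL; use the value from your broker's status.address.url.
BROKER_URL="http://broker-ingress.knative-eventing.svc.cluster.local/default/default"

# Send one CloudEvent to the broker as an HTTP POST in binary content mode.
send_test_event() {
  curl -v "$BROKER_URL" \
    -X POST \
    -H "Ce-Id: test-event-1" \
    -H "Ce-Specversion: 1.0" \
    -H "Ce-Type: dev.knative.foo.bar" \
    -H "Ce-Source: curl-example" \
    -H "Content-Type: application/json" \
    -d '{"msg":"Hello"}'
}

# Not invoked here: the broker address is only reachable from inside the cluster.
# send_test_event
```

A trigger whose filter matches type dev.knative.foo.bar would then forward this event to its subscriber as another HTTP POST.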

5.2. Creating triggers

Triggers in Knative Eventing allow you to route events from a broker to a specific subscriber based on your requirements. By defining a Trigger, you can connect event producers to consumers dynamically, ensuring events are delivered to the correct destination. This section describes the steps to create a Trigger, configure its filters, and verify its functionality, whether you are working with simple routing needs or complex event-driven workflows.

The following examples show common configurations for Triggers, demonstrating how to route events to Knative services or custom endpoints.

Example of routing events to a Knative Serving service

The following Trigger routes events of type dev.knative.foo.bar from the default broker to the Knative Serving service named my-service:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.foo.bar
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service
Note

Omitting the filter section routes all events to the subscriber, which is recommended for debugging purposes. It allows you to observe and analyze all incoming events, helping identify issues or validate the flow of events through the broker before applying specific filters. To learn more about filtering, see Advanced trigger filters.

To apply this trigger, you can save the configuration to a file, for example, trigger.yaml, and run the following command:

$ oc apply -f trigger.yaml

Example of routing events to a custom path

This Trigger routes all events from the default broker to a custom path /my-custom-path on the service named my-service:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: my-service
    uri: /my-custom-path

You can save the configuration to a file, for example, custom-path-trigger.yaml, and run the following command:

$ oc apply -f custom-path-trigger.yaml
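
A trigger can also filter on more than one attribute. In the following sketch, an event is delivered only if both the type and source attributes match exactly. The trigger name my-filtered-trigger, the source value my-source, and the file name filtered-trigger.yaml are hypothetical.

```shell
# Write a Trigger manifest that filters on two CloudEvent attributes.
cat > filtered-trigger.yaml <<'EOF'
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-filtered-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.foo.bar
      source: my-source
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service
EOF

# Show the generated manifest for review.
cat filtered-trigger.yaml
```

You could apply it in the same way as the other examples, with oc apply -f filtered-trigger.yaml.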

5.2.1. Creating a trigger

Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create a trigger. After you have installed Knative Eventing on your cluster and created a broker, you can create a trigger by using the web console.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Serving, and Knative Eventing on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have created a broker and a Knative service or other event sink to connect to the trigger.

Procedure

  1. Navigate to the Topology page.
  2. Hover over the broker that you want to create a trigger for, and drag the arrow. The Add Trigger option is displayed.
  3. Click Add Trigger.
  4. Select your sink in the Subscriber list.
  5. Click Add.

Verification

  • After you create the trigger, view it on the Topology page, where a line connects the broker to the event sink.

Deleting a trigger

  1. Navigate to the Topology page.
  2. Click the trigger that you want to delete.
  3. In the Actions menu, select Delete Trigger.

5.2.2. Creating a trigger by using the Knative CLI

You can use the kn trigger create command to create a trigger.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Create a trigger:

    $ kn trigger create <trigger_name> --broker <broker_name> --filter <key=value> --sink <sink_name>

    You can also create a trigger and simultaneously create the default broker by using broker injection:

    $ kn trigger create <trigger_name> --inject-broker --filter <key=value> --sink <sink_name>

    By default, triggers forward all events sent to a broker to sinks that subscribe to that broker. You can use the --filter option to filter events from a broker so that subscribers receive only the events whose attributes match the specified key=value pairs.
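
As a concrete illustration of the command above, the following sketch fills in the placeholders with hypothetical values: a trigger named my-trigger on the default broker that forwards only events of type dev.knative.foo.bar to a Knative service named my-service. The ksvc: prefix marks the sink as a Knative service.

```shell
# Hypothetical values; replace them with your own names.
TRIGGER_NAME="my-trigger"
BROKER_NAME="default"
EVENT_TYPE="dev.knative.foo.bar"
SINK="ksvc:my-service"

# Print the kn command line that these values produce.
trigger_command() {
  echo "kn trigger create $TRIGGER_NAME --broker $BROKER_NAME --filter type=$EVENT_TYPE --sink $SINK"
}

# Create the trigger; requires the kn CLI and a cluster connection.
create_trigger() {
  kn trigger create "$TRIGGER_NAME" \
    --broker "$BROKER_NAME" \
    --filter "type=$EVENT_TYPE" \
    --sink "$SINK"
}

# Only print the command here; call create_trigger to run it.
trigger_command
```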

5.3. List triggers from the command line

Using the Knative (kn) CLI to list triggers provides a streamlined and intuitive user interface.

5.3.1. Listing triggers by using the Knative CLI

You can use the kn trigger list command to list existing triggers in your cluster.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.

Procedure

  1. Print a list of available triggers:

    $ kn trigger list

    Example output

    NAME    BROKER    SINK            AGE   CONDITIONS   READY   REASON
    email   default   ksvc:edisplay   4s    5 OK / 5     True
    ping    default   ksvc:edisplay   32s   5 OK / 5     True

  2. Optional: Print a list of triggers in JSON format:

    $ kn trigger list -o json

5.4. Describe triggers from the command line

Using the Knative (kn) CLI to describe triggers provides a streamlined and intuitive user interface.

You can use the kn trigger describe command to print information about existing triggers in your cluster.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a trigger.

Procedure

  • Enter the command:

    $ kn trigger describe <trigger_name>

    Example output

    Name:         ping
    Namespace:    default
    Labels:       eventing.knative.dev/broker=default
    Annotations:  eventing.knative.dev/creator=kube:admin, eventing.knative.dev/lastModifier=kube:admin
    Age:          2m
    Broker:       default
    Filter:
      type:       dev.knative.event
    
    Sink:
      Name:       edisplay
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE                  AGE REASON
      ++ Ready                  2m
      ++ BrokerReady            2m
      ++ DependencyReady        2m
      ++ Subscribed             2m
      ++ SubscriberResolved     2m

5.5. Connecting a trigger to a sink

You can connect a trigger to a sink, so that events from a broker are filtered before they are sent to the sink. A sink that is connected to a trigger is configured as a subscriber in the Trigger object’s resource spec.

Example of a Trigger object connected to an Apache Kafka sink

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: <trigger_name> # <1>
spec:
...
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: KafkaSink
      name: <kafka_sink_name> # <2>

<1> The name of the trigger being connected to the sink.
<2> The name of a KafkaSink object.

5.6. Filtering triggers from the command line

Using the Knative (kn) CLI to filter events by using triggers provides a streamlined and intuitive user interface. You can use the kn trigger create command, along with the appropriate flags, to filter events by using triggers.

In the following trigger example, the trigger sends only events with the attribute type: dev.knative.samples.helloworld to the event sink:

$ kn trigger create <trigger_name> --broker <broker_name> --filter type=dev.knative.samples.helloworld --sink ksvc:<service_name>

You can also filter events by using multiple attributes. The following example shows how to filter events by using the type, source, and extension attributes:

$ kn trigger create <trigger_name> --broker <broker_name> --sink ksvc:<service_name> \
--filter type=dev.knative.samples.helloworld \
--filter source=dev.knative.samples/helloworldsource \
--filter myextension=my-extension-value
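
The multi-attribute command above can be read as a logical AND: an event reaches the sink only when every --filter key matches exactly. The following Python sketch models that rule. The function name matches_attributes and the dictionary representation of an event are assumptions made for illustration; they are not part of the kn CLI or of Knative.

```python
def matches_attributes(event: dict, required: dict) -> bool:
    """Return True only if every required attribute is present and equal."""
    return all(
        key in event and str(event[key]) == value
        for key, value in required.items()
    )


# Filter values taken from the command above.
required = {
    "type": "dev.knative.samples.helloworld",
    "source": "dev.knative.samples/helloworldsource",
    "myextension": "my-extension-value",
}

# Hypothetical events: one carries all three values, one has a different source.
matching_event = dict(required, id="1")
other_event = dict(required, source="dev.knative.samples/other")

print(matches_attributes(matching_event, required))  # True
print(matches_attributes(other_event, required))     # False
```

In this sketch, an event that lacks one of the filtered attributes is treated as a non-match.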

5.7. Advanced trigger filters

Advanced trigger filters give you more precise control over event routing. You can filter events by exact matches, prefixes, or suffixes, as well as by CloudEvent extensions. This added control makes it easier to fine-tune how events flow, ensuring that only relevant events trigger specific actions.

5.7.1. Advanced trigger filters overview

The advanced trigger filters feature adds a new filters field to triggers that aligns with the filters API field defined in the CloudEvents Subscriptions API. You can specify filter expressions, where each expression evaluates to true or false for each event.

The following example shows a trigger that uses the advanced filters field:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
spec:
  broker: default
  filters:
    - cesql: "source LIKE '%commerce%' AND type IN ('order.created', 'order.updated', 'order.canceled')"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

The filters field has an array of filter expressions, each evaluating to either true or false. If any expression evaluates to false, the event is not sent to the subscriber. Each filter expression uses a specific dialect that determines the type of filter and the set of allowed additional properties within the expression.

5.7.2. Supported filter dialects

You can use dialects to define flexible filter expressions to target specific events.

The advanced trigger filters support the following dialects that offer different ways to match and filter events:

  • exact
  • prefix
  • suffix
  • all
  • any
  • not
  • cesql

Each dialect provides a different method for filtering events based on specific criteria, enabling precise event selection for processing.

5.7.2.1. exact filter dialect

The exact dialect matches an event when the string value of the specified CloudEvent attribute exactly equals the specified string. The comparison is case-sensitive. If the attribute is not a string, the filter converts the attribute to its string representation before comparing it to the specified value.

Example of the exact filter dialect

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  ...
spec:
  ...
  filters:
    - exact:
        type: com.github.push

5.7.2.2. prefix filter dialect

The prefix dialect matches an event when the string value of the specified CloudEvent attribute starts with the specified string. The comparison is case-sensitive. If the attribute is not a string, the filter converts the attribute to its string representation before matching it against the specified value.

Example of the prefix filter dialect

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  ...
spec:
  ...
  filters:
    - prefix:
        type: com.github.

5.7.2.3. suffix filter dialect

The suffix dialect matches an event when the string value of the specified CloudEvent attribute ends with the specified string. The comparison is case-sensitive. If the attribute is not a string, the filter converts the attribute to its string representation before matching it against the specified value.

Example of the suffix filter dialect

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  ...
spec:
  ...
  filters:
    - suffix:
        type: .created
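
The three string dialects differ only in the comparison they apply to the attribute value. The following Python sketch illustrates that difference. The helper names and the dictionary model of an event are hypothetical, Python's str() stands in for the conversion to a string representation, and treating a missing attribute as a non-match is an assumption of this sketch.

```python
from typing import Optional


def attribute_as_string(event: dict, name: str) -> Optional[str]:
    """Return the attribute as a string, or None when the event lacks it."""
    value = event.get(name)
    return None if value is None else str(value)


def exact(event: dict, name: str, expected: str) -> bool:
    value = attribute_as_string(event, name)
    return value is not None and value == expected


def prefix(event: dict, name: str, expected: str) -> bool:
    value = attribute_as_string(event, name)
    return value is not None and value.startswith(expected)


def suffix(event: dict, name: str, expected: str) -> bool:
    value = attribute_as_string(event, name)
    return value is not None and value.endswith(expected)


# Hypothetical event with one string attribute and one integer extension.
event = {"type": "com.github.push", "sequence": 42}

print(exact(event, "type", "com.github.push"))   # True
print(exact(event, "type", "Com.GitHub.Push"))   # False: case-sensitive
print(prefix(event, "type", "com.github."))      # True
print(suffix(event, "type", ".created"))         # False
print(exact(event, "sequence", "42"))            # True: compared as a string
```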

5.7.2.4. all filter dialect

The all filter dialect requires all nested filter expressions to evaluate to true for the event to be processed. If any of the nested expressions returns false, the event is not sent to the subscriber.

Example of the all filter dialect

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  ...
spec:
  ...
  filters:
    - all:
        - exact:
            type: com.github.push
        - exact:
            subject: https://github.com/cloudevents/spec

5.7.2.5. any filter dialect

The any filter dialect requires at least one of the nested filter expressions to evaluate to true. If none of the nested expressions return true, the event is not sent to the subscriber.

Example of the any filter dialect

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  ...
spec:
  ...
  filters:
    - any:
        - exact:
            type: com.github.push
        - exact:
            subject: https://github.com/cloudevents/spec

5.7.2.6. not filter dialect

The not filter dialect requires that the nested filter expression evaluates to false for the event to be processed. If the nested expression evaluates to true, the event is not sent to the subscriber.

Example of the not filter dialect

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  ...
spec:
  ...
  filters:
    - not:
        exact:
          type: com.github.push
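
The all, any, and not dialects compose other expressions, so their behavior can be modeled as a small recursive evaluator. The following Python sketch is an illustration only: it covers the exact dialect as the single leaf type, represents each expression as a single-key dictionary that mirrors the YAML, and uses the hypothetical function names evaluate and passes. It is not the Knative implementation.

```python
def evaluate(expression: dict, event: dict) -> bool:
    """Evaluate one filter expression (a single-key mapping) against an event."""
    dialect, body = next(iter(expression.items()))
    if dialect == "exact":
        return all(
            name in event and str(event[name]) == expected
            for name, expected in body.items()
        )
    if dialect == "all":
        return all(evaluate(nested, event) for nested in body)
    if dialect == "any":
        return any(evaluate(nested, event) for nested in body)
    if dialect == "not":
        return not evaluate(body, event)
    raise ValueError(f"dialect not covered by this sketch: {dialect}")


def passes(filters: list, event: dict) -> bool:
    """An event is delivered only if every top-level expression is true."""
    return all(evaluate(expression, event) for expression in filters)


push = {"exact": {"type": "com.github.push"}}
spec_repo = {"exact": {"subject": "https://github.com/cloudevents/spec"}}

# Hypothetical event: a push, but for a different repository.
event = {"type": "com.github.push", "subject": "https://github.com/knative/eventing"}

print(passes([{"all": [push, spec_repo]}], event))  # False: subject differs
print(passes([{"any": [push, spec_repo]}], event))  # True: type matches
print(passes([{"not": push}], event))               # False: nested filter is true
```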

5.7.2.7. cesql filter dialect

CloudEvents SQL expressions (cesql) allow computing values and matching of CloudEvent attributes against complex expressions that lean on the syntax of Structured Query Language (SQL) WHERE clauses.

The cesql filter dialect uses CloudEvents SQL expressions to filter events. The provided CESQL expression must evaluate to true for the event to be processed.

Example of the cesql filter dialect

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  ...
spec:
  ...
  filters:
    - cesql: "source LIKE '%commerce%' AND type IN ('order.created', 'order.updated', 'order.canceled')"

For more information about the syntax and the features of the cesql filter dialect, see CloudEvents SQL Expression Language.
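
To make the example expression concrete, the following Python sketch expresses the same condition in ordinary code. It does not parse CESQL: a substring test stands in for LIKE '%commerce%' and a set membership test stands in for the IN list. The function name and the sample events are hypothetical.

```python
ORDER_TYPES = {"order.created", "order.updated", "order.canceled"}


def matches_example_expression(event: dict) -> bool:
    """Mirror of: source LIKE '%commerce%' AND type IN (...)."""
    source = str(event.get("source", ""))
    return "commerce" in source and event.get("type") in ORDER_TYPES


print(matches_example_expression(
    {"source": "/shop/commerce/eu", "type": "order.created"}))  # True
print(matches_example_expression(
    {"source": "/shop/commerce/eu", "type": "order.shipped"}))  # False
print(matches_example_expression(
    {"source": "/warehouse", "type": "order.created"}))         # False
```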

5.7.3. Conflict with the existing filter field

You can use the filters field and the existing filter field at the same time. If you enable the new-trigger-filters feature and an object contains both filter and filters, the filters field overrides the filter field. This setup allows you to test the new filters field while maintaining support for existing filters. You can gradually introduce the new field into existing trigger objects.

Example of filters field overriding the filter field:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
spec:
  broker: default
  # Existing filter field. This will be ignored when the new filters field is present.
  filter:
    attributes:
      type: dev.knative.foo.bar
      myextension: my-extension-value
  # New filters field. This takes precedence over the old filter field.
  filters:
    - cesql: "type = 'dev.knative.foo.bar' AND myextension = 'my-extension-value'"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

5.7.4. Legacy attributes filter

The legacy attributes filter enables exact match filtering on any number of CloudEvents attributes, including extensions. Its functionality mirrors the exact filter dialect, and you are encouraged to transition to the exact filter whenever possible. However, for backward compatibility, the attributes filter remains available.

The following example displays how to filter events from the default broker that match the type attribute dev.knative.foo.bar and have the extension myextension with the my-extension-value value:

Example of filtering events with specific attributes

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.foo.bar
      myextension: my-extension-value
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

When both the filters field and the legacy filter field are specified, the filters field takes precedence.

In the following example configuration, events with the dev.knative.a type are delivered, while events with the dev.knative.b type are ignored:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
spec:
  broker: default
  filters:
    - exact:
        type: dev.knative.a
  filter:
    attributes:
      type: dev.knative.b
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service
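
The precedence rule can be sketched as a simple selection step that runs before any matching. In the following Python illustration, the function name effective_filter and the dictionary form of the trigger spec are assumptions, as is the choice to fall back to the legacy field when filters is empty.

```python
def effective_filter(spec: dict):
    """Return the name and value of the field that decides delivery."""
    if spec.get("filters"):
        return "filters", spec["filters"]
    if spec.get("filter"):
        return "filter", spec["filter"]
    return None, None


# Trigger spec mirroring the configuration above.
spec = {
    "broker": "default",
    "filters": [{"exact": {"type": "dev.knative.a"}}],
    "filter": {"attributes": {"type": "dev.knative.b"}},
}

field, value = effective_filter(spec)
print(field)  # filters
print(value)  # [{'exact': {'type': 'dev.knative.a'}}]
```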

5.8. Updating triggers from the command line

Using the Knative (kn) CLI to update triggers provides a streamlined and intuitive user interface.

5.8.1. Updating a trigger by using the Knative CLI

You can use the kn trigger update command with certain flags to update attributes for a trigger.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Update a trigger:

    $ kn trigger update <trigger_name> --filter <key=value> --sink <sink_name> [flags]
    • You can update a trigger to filter exact event attributes that match incoming events. For example, using the type attribute:

      $ kn trigger update <trigger_name> --filter type=knative.dev.event
    • You can remove a filter attribute from a trigger. For example, you can remove the filter attribute with key type:

      $ kn trigger update <trigger_name> --filter type-
    • You can use the --sink parameter to change the event sink of a trigger:

      $ kn trigger update <trigger_name> --sink ksvc:my-event-sink

5.9. Deleting triggers from the command line

Using the Knative (kn) CLI to delete a trigger provides a streamlined and intuitive user interface.

5.9.1. Deleting a trigger by using the Knative CLI

You can use the kn trigger delete command to delete a trigger.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Delete a trigger:

    $ kn trigger delete <trigger_name>

Verification

  1. List existing triggers:

    $ kn trigger list
  2. Verify that the trigger no longer exists:

    Example output

    No triggers found.

5.10. Event delivery order for triggers

In Knative Eventing, the delivery order of events plays a critical role in ensuring messages are processed according to application requirements. When using a Kafka broker, you can specify whether events should be delivered in order or without strict ordering. By configuring the delivery order, you can optimize event handling for use cases that require sequential processing or prioritize performance for unordered delivery.

If you are using a Kafka broker, you can configure the delivery order of events from triggers to event sinks.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Eventing, and Knative Kafka on the cluster.
  • You have created a Kafka broker and it is enabled.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift (oc) CLI.

Procedure

  1. Create or change a Trigger object and set the kafka.eventing.knative.dev/delivery.order annotation, as shown in the following example Trigger YAML file:

    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      name: <trigger_name>
      annotations:
        kafka.eventing.knative.dev/delivery.order: ordered
    # ...

    The supported consumer delivery guarantees are:

    unordered
    An unordered consumer is a non-blocking consumer that delivers messages unordered, while preserving proper offset management.
    ordered
    An ordered consumer is a per-partition blocking consumer that waits for a successful response from the CloudEvent subscriber before it delivers the next message of the partition.

    The default ordering guarantee is unordered.

  2. Apply the Trigger object by using the following command:

    $ oc apply -f <filename>
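
The annotation accepts two values and falls back to unordered when it is absent. The following Python sketch shows one way to read and check the setting from a Trigger manifest that has already been loaded into a dictionary. The function name delivery_order is hypothetical, and raising an error for other values is a choice made for this sketch rather than documented broker behavior.

```python
ANNOTATION = "kafka.eventing.knative.dev/delivery.order"
SUPPORTED = ("ordered", "unordered")


def delivery_order(trigger: dict) -> str:
    """Return the delivery order of a Trigger manifest, defaulting to unordered."""
    annotations = trigger.get("metadata", {}).get("annotations", {})
    value = annotations.get(ANNOTATION, "unordered")
    if value not in SUPPORTED:
        raise ValueError(f"unsupported delivery order: {value}")
    return value


ordered_trigger = {
    "apiVersion": "eventing.knative.dev/v1",
    "kind": "Trigger",
    "metadata": {"name": "example", "annotations": {ANNOTATION: "ordered"}},
}
plain_trigger = {"kind": "Trigger", "metadata": {"name": "example"}}

print(delivery_order(ordered_trigger))  # ordered
print(delivery_order(plain_trigger))    # unordered
```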

Chapter 6. Channels

6.1. Channels and subscriptions

Channels are custom resources that define a single event-forwarding and persistence layer. After events have been sent to a channel from an event source or producer, these events can be sent to multiple Knative services or other sinks by using a subscription.

Channel workflow overview

You can create channels by instantiating a supported Channel object, and configure re-delivery attempts by modifying the delivery spec in a Subscription object.

After you create a Channel object, a mutating admission webhook adds a set of spec.channelTemplate properties for the Channel object based on the default channel implementation. For example, for an InMemoryChannel default implementation, the Channel object looks as follows:

apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: example-channel
  namespace: default
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel

The channel controller then creates the backing channel instance based on the spec.channelTemplate configuration.

Note

The spec.channelTemplate properties cannot be changed after creation, because they are set by the default channel mechanism rather than by the user.

When this mechanism is used with the preceding example, two objects are created: a generic backing channel and an InMemoryChannel channel. If you are using a different default channel implementation, the InMemoryChannel is replaced with one that is specific to your implementation. For example, with the Knative broker for Apache Kafka, the KafkaChannel channel is created.

The backing channel acts as a proxy that copies its subscriptions to the user-created channel object, and sets the user-created channel object status to reflect the status of the backing channel.
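
The defaulting step described in this section can be sketched as a function that fills in spec.channelTemplate on a Channel manifest. The following Python illustration is not the webhook's code: the function name apply_default_template is hypothetical, manifests are plain dictionaries, and leaving an existing template untouched is an assumption of this sketch.

```python
import copy


def apply_default_template(channel: dict, default_template: dict) -> dict:
    """Return a copy of the Channel with spec.channelTemplate filled in if absent."""
    result = copy.deepcopy(channel)
    spec = result.setdefault("spec", {})
    spec.setdefault("channelTemplate", copy.deepcopy(default_template))
    return result


default_template = {
    "apiVersion": "messaging.knative.dev/v1",
    "kind": "InMemoryChannel",
}
channel = {
    "apiVersion": "messaging.knative.dev/v1",
    "kind": "Channel",
    "metadata": {"name": "example-channel", "namespace": "default"},
}

defaulted = apply_default_template(channel, default_template)
print(defaulted["spec"]["channelTemplate"]["kind"])  # InMemoryChannel
```

The input manifest is copied rather than modified, so the original dictionary still has no spec key after the call.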

6.1.1. Channel implementation types

OpenShift Serverless supports the InMemoryChannel and KafkaChannel channel implementations. The InMemoryChannel channel is recommended for development use only due to its limitations. You can use the KafkaChannel channel for a production environment.

The following are limitations of InMemoryChannel type channels:

  • No event persistence is available. If a pod goes down, events on that pod are lost.
  • InMemoryChannel channels do not implement event ordering, so two events that are received in the channel at the same time can be delivered to a subscriber in any order.
  • If a subscriber rejects an event, there are no re-delivery attempts by default. You can configure re-delivery attempts by modifying the delivery spec in the Subscription object.

6.2. Creating channels

6.2.1. Creating a channel

Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create a channel. After installing Knative Eventing on your cluster, create a channel in the web console.

Prerequisites

  • You have logged in to the OpenShift Container Platform web console.
  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Navigate to +Add → Channel.
  2. Select the type of Channel object that you want to create in the Type list.

    Note

    Currently only InMemoryChannel channel objects are supported by default. Knative channels for Apache Kafka are available if you have installed the Knative broker implementation for Apache Kafka on OpenShift Serverless.

  3. Click Create.

Verification

  • Confirm that the channel now exists by navigating to the Topology page.

    View the channel in the Topology view

6.2.2. Creating a channel by using the Knative CLI

Using the Knative (kn) CLI to create channels provides a more streamlined and intuitive user interface than modifying YAML files directly. You can use the kn channel create command to create a channel.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Create a channel:

    $ kn channel create <channel_name> --type <channel_type>

    The channel type is optional. If you specify it, you must use the Group:Version:Kind format. For example, you can create an InMemoryChannel object:

    $ kn channel create mychannel --type messaging.knative.dev:v1:InMemoryChannel

    Example output

    Channel 'mychannel' created in namespace 'default'.
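
The Group:Version:Kind string maps directly onto the apiVersion and kind fields of a channel manifest. The following Python sketch shows that mapping. The function name parse_channel_type is hypothetical, and the validation it performs is an assumption of this sketch rather than a description of how kn parses the flag.

```python
def parse_channel_type(value: str) -> dict:
    """Split a Group:Version:Kind string into apiVersion and kind."""
    parts = value.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected Group:Version:Kind, got: {value!r}")
    group, version, kind = parts
    return {"apiVersion": f"{group}/{version}", "kind": kind}


print(parse_channel_type("messaging.knative.dev:v1:InMemoryChannel"))
# {'apiVersion': 'messaging.knative.dev/v1', 'kind': 'InMemoryChannel'}
```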

Verification

  • To confirm that the channel now exists, list the existing channels and inspect the output:

    $ kn channel list

    Example output

    NAME        TYPE              URL                                                     AGE   READY   REASON
    mychannel   InMemoryChannel   http://mychannel-kn-channel.default.svc.cluster.local   93s   True

Deleting a channel

  • Delete a channel:

    $ kn channel delete <channel_name>

6.2.3. Creating a channel by using YAML

You can use YAML files to create Knative resources through a declarative API, which lets you define channels declaratively and reproduce the configuration consistently. To create a serverless channel by using YAML, you must create a YAML file that defines a Channel object, then apply it by using the oc apply command.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Create a Channel object as a YAML file:

    apiVersion: messaging.knative.dev/v1
    kind: Channel
    metadata:
      name: example-channel
      namespace: default

  2. Apply the YAML file:

    $ oc apply -f <filename>

6.2.4. Creating a channel for Apache Kafka by using YAML

You can use YAML files to create Knative resources through a declarative API, which lets you define channels declaratively and reproduce the configuration consistently. You can create a Knative Eventing channel that uses Kafka topics by creating a Kafka channel. To create a Kafka channel with YAML, define a KafkaChannel object in a YAML file and apply the file by using the oc apply command.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource on your OpenShift Container Platform cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Create a KafkaChannel object as a YAML file:

    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    metadata:
      name: example-channel
      namespace: default
    spec:
      numPartitions: 3
      replicationFactor: 1

    Important

    Only the v1beta1 version of the API for KafkaChannel objects on OpenShift Serverless is supported. Do not use the v1alpha1 version of this API, as this version is now deprecated.

  2. Apply the KafkaChannel YAML file:

    $ oc apply -f <filename>

6.3. Connecting channels to sinks

Events that have been sent to a channel from an event source or producer can be forwarded to one or more sinks by using subscriptions. You can create subscriptions by configuring a Subscription object, which specifies the channel and the sink (also known as a subscriber) that consumes the events sent to that channel.

6.3.1. Creating a subscription

After you have created a channel and an event sink, you can create a subscription to enable event delivery. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create a subscription.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Serving, and Knative Eventing on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You have created an event sink, such as a Knative service, and a channel.
  • You have created a project or have access to a project with the appropriate roles and privileges to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Navigate to the Topology page.
  2. Create a subscription using one of the following methods:

    1. Hover over the channel that you want to create a subscription for, and drag the arrow. The Add Subscription option is displayed.

      Create a subscription for the channel
      1. Select your sink in the Subscriber list.
      2. Click Add.
    2. If the service is available in the Topology view under the same namespace or project as the channel, click the channel that you want to create a subscription for, and drag the arrow directly to a service to immediately create a subscription from the channel to that service.

Verification

  • After you create the subscription, the Topology view shows it as a line that connects the channel to the service.

    Subscription in the Topology view

6.3.2. Creating a subscription by using YAML

After you create a channel and an event sink, create a subscription to deliver events. You can define Knative resources declaratively by using YAML files. To create a subscription, create a YAML file that defines a Subscription object, and then apply the file by using the oc apply command.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Create a Subscription object:

    • Create a YAML file and copy the following sample code into it:

      apiVersion: messaging.knative.dev/v1
      kind: Subscription
      metadata:
        name: my-subscription # <1>
        namespace: default
      spec:
        channel: # <2>
          apiVersion: messaging.knative.dev/v1
          kind: Channel
          name: example-channel
        delivery: # <3>
          deadLetterSink:
            ref:
              apiVersion: serving.knative.dev/v1
              kind: Service
              name: error-handler
        subscriber: # <4>
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      <1> Name of the subscription.
      <2> Configuration settings for the channel that the subscription connects to.
      <3> Configuration settings for event delivery. These settings define how the subscription handles events that the subscriber cannot receive. When you configure this option, the system sends failed events to the deadLetterSink. The system drops the event, does not try redelivery, and logs an error. The deadLetterSink value must be a Destination.
      <4> Configuration settings for the subscriber. The subscriber is the event sink that receives events from the channel.
    • Apply the YAML file:

      $ oc apply -f <filename>
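
The delivery settings described for this Subscription object amount to a single delivery attempt followed by a hand-off to the dead letter sink. The following Python sketch models that flow with plain functions. All names are hypothetical, and a raised exception stands in for a failed HTTP delivery; it is not how Knative itself is implemented.

```python
def deliver(event: dict, subscriber, dead_letter_sink, log: list) -> str:
    """Try the subscriber once; on failure, forward to the dead letter sink."""
    try:
        subscriber(event)
        return "delivered"
    except Exception as error:
        # No redelivery is attempted: the event goes to the dead letter sink
        # and an error is recorded.
        dead_letter_sink(event)
        log.append(f"delivery failed: {error}")
        return "dead-lettered"


received, dead_letters, log = [], [], []


def failing_subscriber(event: dict) -> None:
    raise RuntimeError("subscriber unavailable")


print(deliver({"id": "1"}, received.append, dead_letters.append, log))
print(deliver({"id": "2"}, failing_subscriber, dead_letters.append, log))
print(dead_letters)
```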

6.3.3. Creating a subscription by using the Knative CLI

After you have created a channel and an event sink, you can create a subscription to enable event delivery. Using the Knative (kn) CLI to create subscriptions provides a more streamlined and intuitive user interface than modifying YAML files directly. You can use the kn subscription create command with the appropriate flags to create a subscription.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Create a subscription to connect a sink to a channel:

    $ kn subscription create <subscription_name> \
      --channel <group:version:kind>:<channel_name> \ <1>
      --sink <sink_prefix>:<sink_name> \ <2>
      --sink-dead-letter <sink_prefix>:<sink_name> <3>

    <1> Use --channel to specify the source of CloudEvents to process. You can give the channel name. If you do not use the default InMemoryChannel defined by the Channel custom resource, prefix the channel name with <group:version:kind> for the channel type. For example, use messaging.knative.dev:v1beta1:KafkaChannel for an Apache Kafka channel.
    <2> Use --sink to specify the event destination. By default, the system treats <sink_name> as a Knative service with that name in the same namespace as the subscription. Use one of the following prefixes to specify the sink type:
      • ksvc: A Knative service.
      • channel: A channel to use as the destination. Reference only default channel types.
      • broker: An Eventing broker.
    <3> Optional: Use the --sink-dead-letter flag to specify a sink that receives events when delivery fails.

    Example command

    $ kn subscription create mysubscription --channel mychannel --sink ksvc:event-display

    Example output

    Subscription 'mysubscription' created in namespace 'default'.
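
The --sink value follows a small convention: an optional prefix, a colon, and a name, with a Knative service assumed when no prefix is given. The following Python sketch models only the three prefixes listed in this procedure. The function name parse_sink is hypothetical, and the sketch does not claim to reproduce every form that kn accepts.

```python
SINK_PREFIXES = {
    "ksvc": "Knative service",
    "channel": "channel",
    "broker": "Eventing broker",
}


def parse_sink(value: str) -> tuple:
    """Return (prefix, name); a bare name is treated as a Knative service."""
    prefix, separator, name = value.partition(":")
    if not separator:
        return ("ksvc", value)
    if prefix not in SINK_PREFIXES or not name:
        raise ValueError(f"sink not covered by this sketch: {value!r}")
    return (prefix, name)


print(parse_sink("ksvc:event-display"))  # ('ksvc', 'event-display')
print(parse_sink("event-display"))       # ('ksvc', 'event-display')
print(parse_sink("broker:default"))      # ('broker', 'default')
```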

Verification

  • To confirm that a subscription connects the channel to the event sink, or subscriber, list the existing subscriptions and inspect the output by running the following command:

    $ kn subscription list

    Example output

    NAME             CHANNEL             SUBSCRIBER           REPLY   DEAD LETTER SINK   READY   REASON
    mysubscription   Channel:mychannel   ksvc:event-display                              True

Deleting a subscription

  • Delete a subscription:

    $ kn subscription delete <subscription_name>

6.3.4. Creating a subscription from the Channel tab

After you have created a channel and an event sink, also known as a subscriber, you can create a subscription to enable event delivery. You can create subscriptions by configuring a Subscription object that specifies the channel and the subscriber that receives events. You can also specify some subscriber-specific options, such as how to handle failures.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You have cluster-admin privileges on OpenShift Container Platform, or you have cluster or dedicated administrator privileges on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have created an event sink, such as a Knative service, and a channel.

Procedure

  1. In the OpenShift Container Platform web console, navigate to Serverless → Eventing.
  2. In the Channel tab, click the Options menu (kebab icon) for the channel that you want to add a subscription to.
  3. Click Add Subscription in the list.
  4. In the Add Subscription dialogue box, select a Subscriber for the subscription. The subscriber is the Knative service that receives events from the channel.
  5. Click Add.

6.3.5. Next steps

  • Configure Event delivery parameters that are applied in cases where an event fails to be delivered to an event sink.

6.4. Default channel implementation

You can use the default-ch-webhook config map to specify the default channel implementation of Knative Eventing. You can specify the default channel implementation for the entire cluster or for one or more namespaces. Currently the InMemoryChannel and KafkaChannel channel types are supported.

You can configure the default channel implementation for Knative Eventing at the cluster or namespace level by using the KnativeEventing custom resource (CR). Specify the default channel implementation, such as InMemoryChannel, KafkaChannel, or other supported implementations, for newly created channels.

Prerequisites

  • You have administrator permissions on OpenShift Container Platform.
  • You have installed the OpenShift Serverless Operator and Knative Eventing on your cluster.
  • If you want to use Knative channels for Apache Kafka as the default channel implementation, you must also install the KnativeKafka CR on your cluster.

Procedure

  • Change the KnativeEventing custom resource to add configuration details for the default-ch-webhook config map:

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      config: # (1)
        default-ch-webhook: # (2)
          default-ch-config: |
            clusterDefault: # (3)
              apiVersion: messaging.knative.dev/v1
              kind: InMemoryChannel
              spec:
                delivery:
                  backoffDelay: PT0.5S
                  backoffPolicy: exponential
                  retry: 5
            namespaceDefaults: # (4)
              my-namespace:
                apiVersion: messaging.knative.dev/v1beta1
                kind: KafkaChannel
                spec:
                  numPartitions: 1
                  replicationFactor: 1

    (1) In spec.config, you can specify the config maps that you want to add modified configurations for.
    (2) You can use the default-ch-webhook config map to specify the default channel implementation for the cluster or for one or more namespaces.
    (3) The cluster-wide default channel type configuration. In this example, the default channel implementation for the cluster is InMemoryChannel.
    (4) The namespace-scoped default channel type configuration. In this example, the default channel implementation for the my-namespace namespace is KafkaChannel.
    Important

    Configuring a namespace-specific default overrides any cluster-wide settings.
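
As a minimal sketch of how these defaults are consumed, assuming the KnativeEventing configuration shown above is in place, a generic Channel object that does not name a channel implementation picks up the default for its namespace. The channel name example-channel is hypothetical.

```yaml
# Sketch only: a generic Channel with no implementation specified.
# Assumes the default-ch-webhook settings from the example above.
apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: example-channel      # hypothetical name
  namespace: my-namespace    # namespace with a KafkaChannel default in the example
# No spec is given, so the default channel implementation applies.
# With the example configuration, the backing channel in my-namespace would be
# a KafkaChannel; in any other namespace it would be an InMemoryChannel.
```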

6.5. Security configuration for channels

Transport Layer Security (TLS) is used by Apache Kafka clients and servers to encrypt traffic between Knative and Kafka, as well as for authentication. TLS is the only supported method of traffic encryption for the Knative broker implementation for Apache Kafka.

Prerequisites

  • You have cluster or dedicated administrator permissions on OpenShift Container Platform.
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka CR are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have a Kafka cluster CA certificate stored as a .pem file.
  • You have a Kafka cluster client certificate and a key stored as .pem files.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create the certificate files as secrets in your chosen namespace:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    Important

    Use the key names ca.crt, user.crt, and user.key. Do not change them.

  2. Start editing the KnativeKafka custom resource:

    $ oc edit knativekafka
  3. Reference your secret and the namespace of the secret:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    Note

    Make sure to specify the matching port in the bootstrap server.

    For example:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: tls-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9094
        enabled: true
      source:
        enabled: true
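
The secret from step 1 can also be written declaratively. The following is a sketch under stated assumptions rather than a required format: it reuses the tls-user name and kafka namespace from the example above, and the placeholder values stand for the contents of your own .pem files.

```yaml
# Sketch only: declarative equivalent of the "oc create secret generic" command in step 1.
apiVersion: v1
kind: Secret
metadata:
  name: tls-user        # matches authSecretName in the example above
  namespace: kafka      # matches authSecretNamespace in the example above
type: Opaque
stringData:
  # Keep the key names ca.crt, user.crt, and user.key unchanged.
  ca.crt: |
    <contents_of_caroot.pem>
  user.crt: |
    <contents_of_certificate.pem>
  user.key: |
    <contents_of_key.pem>
```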

Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, users must provide credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.

Prerequisites

  • You have cluster or dedicated administrator permissions on OpenShift Container Platform.
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka CR are installed on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have a username and password for a Kafka cluster.
  • You have chosen the SASL mechanism to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.
  • If TLS is enabled, you also need the ca.crt certificate file for the Kafka cluster.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create a secret in your chosen namespace that contains the SASL credentials and, if TLS is enabled, the CA certificate:

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-literal=protocol="SASL_SSL" \
      --from-literal=sasl.mechanism="SCRAM-SHA-512" \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=user="my-sasl-user"
    • Use the key names protocol, sasl.mechanism, ca.crt, password, and user. Do not change them.

      Note

      The ca.crt key is optional if the Kafka cluster uses a certificate signed by a public CA whose certificate is already in the system truststore.

  2. Start editing the KnativeKafka custom resource:

    $ oc edit knativekafka
  3. Reference your secret and the namespace of the secret:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    Note

    Make sure to specify the matching port in the bootstrap server.

    For example:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: scram-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9093
        enabled: true
      source:
        enabled: true
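
For reference, the SASL secret from step 1 can be sketched as a manifest. This assumes the scram-user name and kafka namespace from the example above; the user name and password are the illustrative values from the command, not real credentials.

```yaml
# Sketch only: declarative equivalent of the SASL secret created in step 1.
apiVersion: v1
kind: Secret
metadata:
  name: scram-user      # matches authSecretName in the example above
  namespace: kafka      # matches authSecretNamespace in the example above
type: Opaque
stringData:
  # Keep the key names protocol, sasl.mechanism, user, password, and ca.crt unchanged.
  protocol: SASL_SSL
  sasl.mechanism: SCRAM-SHA-512
  user: my-sasl-user
  password: SecretPassword
  # Optional if the Kafka cluster certificate is signed by a public CA
  # that is already in the system truststore.
  ca.crt: |
    <contents_of_caroot.pem>
```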

Chapter 7. Subscriptions

7.1. Creating subscriptions

After you have created a channel and an event sink, you can create a subscription to enable event delivery. Subscriptions are created by configuring a Subscription object, which specifies the channel and the sink (also known as a subscriber) to deliver events to.

7.1.1. Creating a subscription

After you have created a channel and an event sink, you can create a subscription to enable event delivery. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to create a subscription.

Prerequisites

  • You have installed the OpenShift Serverless Operator, Knative Serving, and Knative Eventing on your OpenShift Container Platform cluster.
  • You have logged in to the web console.
  • You have created an event sink, such as a Knative service, and a channel.
  • You have created a project or have access to a project with the appropriate roles and privileges to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Navigate to the Topology page.
  2. Create a subscription using one of the following methods:

    1. Hover over the channel that you want to create a subscription for, and drag the arrow. The Add Subscription option is displayed.

      Create a subscription for the channel
      1. Select your sink in the Subscriber list.
      2. Click Add.
    2. If the service is available in the Topology view under the same namespace or project as the channel, click the channel that you want to create a subscription for, and drag the arrow directly to a service to immediately create a subscription from the channel to that service.

Verification

  • After you create the subscription, the Topology view shows it as a line that connects the channel to the service.

    Subscription in the Topology view

7.1.2. Creating a subscription by using YAML

After you create a channel and an event sink, create a subscription to deliver events. You can define Knative resources declaratively by using YAML files. To create a subscription, create a YAML file that defines a Subscription object, and then apply the file by using the oc apply command.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have installed the OpenShift CLI (oc).
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Create a Subscription object:

    • Create a YAML file and copy the following sample code into it:

      apiVersion: messaging.knative.dev/v1
      kind: Subscription
      metadata:
        name: my-subscription # (1)
        namespace: default
      spec:
        channel: # (2)
          apiVersion: messaging.knative.dev/v1
          kind: Channel
          name: example-channel
        delivery: # (3)
          deadLetterSink:
            ref:
              apiVersion: serving.knative.dev/v1
              kind: Service
              name: error-handler
        subscriber: # (4)
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display

      (1) Name of the subscription.
      (2) Configuration settings for the channel that the subscription connects to.
      (3) Configuration settings for event delivery. These settings define how the subscription handles events that the subscriber cannot receive. When you configure this option, the system sends events that fail delivery to the deadLetterSink. The system then drops the event, does not try redelivery, and logs an error. The deadLetterSink value must be a Destination.
      (4) Configuration settings for the subscriber. The subscriber is the event sink that receives events from the channel.
    • Apply the YAML file:

      $ oc apply -f <filename>

After you have created a channel and an event sink, you can create a subscription to enable event delivery. Using the Knative (kn) CLI to create subscriptions provides a more streamlined and intuitive user interface than modifying YAML files directly. You can use the kn subscription create command with the appropriate flags to create a subscription.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have installed the Knative (kn) CLI.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  • Create a subscription to connect a sink to a channel:

    $ kn subscription create <subscription_name> \
      --channel <group:version:kind>:<channel_name> \
      --sink <sink_prefix>:<sink_name> \
      --sink-dead-letter <sink_prefix>:<sink_name>

    • --channel specifies the source of CloudEvents to process. Provide the channel name. If you do not use the default InMemoryChannel defined by the Channel custom resource, prefix the channel name with <group:version:kind> for the channel type. For example, use messaging.knative.dev:v1beta1:KafkaChannel for an Apache Kafka channel.
    • --sink specifies the event destination. By default, the system treats <sink_name> as a Knative service with that name in the same namespace as the subscription. Use one of the following prefixes to specify the sink type:
      ksvc
      A Knative service.
      channel
      A channel to use as the destination. Reference only default channel types.
      broker
      An Eventing broker.
    • Optional: --sink-dead-letter specifies a sink that receives events when delivery fails.

    Example command

    $ kn subscription create mysubscription --channel mychannel --sink ksvc:event-display

    Example output

    Subscription 'mysubscription' created in namespace 'default'.
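
For comparison, the example command corresponds roughly to the following Subscription object. This is a sketch that assumes mychannel is a generic Channel and event-display is a Knative service in the default namespace.

```yaml
# Sketch only: approximate declarative form of
#   kn subscription create mysubscription --channel mychannel --sink ksvc:event-display
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: mysubscription
  namespace: default
spec:
  channel:
    apiVersion: messaging.knative.dev/v1
    kind: Channel          # assumed: no <group:version:kind> prefix was given
    name: mychannel
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service        # the ksvc prefix denotes a Knative service
      name: event-display
```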

Verification

  • To confirm that a subscription connects the channel to the event sink, or subscriber, list the existing subscriptions and inspect the output by running the following command:

    $ kn subscription list

    Example output

    NAME             CHANNEL             SUBSCRIBER           REPLY   DEAD LETTER SINK   READY   REASON
    mysubscription   Channel:mychannel   ksvc:event-display                              True

Deleting a subscription

  • Delete a subscription:

    $ kn subscription delete <subscription_name>

7.1.4. Next steps

7.2. Managing subscriptions

You can use the kn subscription describe command to print information about a subscription in the terminal by using the Knative (kn) CLI. Using the Knative CLI to describe subscriptions provides a more streamlined and intuitive user interface than viewing YAML files directly.

Prerequisites

  • You have installed the Knative (kn) CLI.
  • You have created a subscription in your cluster.

Procedure

  • Describe a subscription:

    $ kn subscription describe <subscription_name>

    Example output

    Name:            my-subscription
    Namespace:       default
    Annotations:     messaging.knative.dev/creator=openshift-user, messaging.knative.dev/lastModifier=min ...
    Age:             43s
    Channel:         Channel:my-channel (messaging.knative.dev/v1)
    Subscriber:
      URI:           http://edisplay.default.example.com
    Reply:
      Name:          default
      Resource:      Broker (eventing.knative.dev/v1)
    DeadLetterSink:
      Name:          my-sink
      Resource:      Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE                  AGE REASON
      ++ Ready                 43s
      ++ AddedToChannel        43s
      ++ ChannelReady          43s
      ++ ReferencesResolved    43s
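
To help read the fields in this output, the following sketch shows a Subscription object that could plausibly produce it. It is reconstructed from the example output only, so treat it as an illustration rather than the manifest that was used.

```yaml
# Sketch only: a Subscription reconstructed from the example output above.
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: my-subscription
  namespace: default
spec:
  channel:                 # shown as "Channel" in the output
    apiVersion: messaging.knative.dev/v1
    kind: Channel
    name: my-channel
  subscriber:              # shown as "Subscriber: URI"
    uri: http://edisplay.default.example.com
  reply:                   # shown as "Reply"
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
  delivery:
    deadLetterSink:        # shown as "DeadLetterSink"
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: my-sink
```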

You can use the kn subscription list command to list existing subscriptions on your cluster by using the Knative (kn) CLI. Using the Knative CLI to list subscriptions provides a streamlined and intuitive user interface.

Prerequisites

  • You have installed the Knative (kn) CLI.

Procedure

  • List subscriptions on your cluster:

    $ kn subscription list

    Example output

    NAME             CHANNEL             SUBSCRIBER           REPLY   DEAD LETTER SINK   READY   REASON
    mysubscription   Channel:mychannel   ksvc:event-display                              True

You can use the kn subscription update command with the appropriate flags to update a subscription from the terminal by using the Knative (kn) CLI. Using the Knative CLI to update subscriptions provides a more streamlined and intuitive user interface than updating YAML files directly.

Prerequisites

  • You have installed the Knative (kn) CLI.
  • You have created a subscription.

Procedure

  • Update a subscription:

    $ kn subscription update <subscription_name> \
      --sink <sink_prefix>:<sink_name> \
      --sink-dead-letter <sink_prefix>:<sink_name>

    • --sink specifies the updated target destination to which the event is delivered. You can specify the type of the sink by using one of the following prefixes:
      ksvc
      A Knative service.
      channel
      A channel to use as the destination. You can reference only default channel types here.
      broker
      An Eventing broker.
    • Optional: --sink-dead-letter specifies a sink that receives events when delivery fails. For more information, see the OpenShift Serverless Event delivery documentation.

    Example command

    $ kn subscription update mysubscription --sink ksvc:event-display

Chapter 8. Event delivery

You can configure event delivery parameters that are applied in cases where an event fails to be delivered to an event sink. Different channel and broker types have their own behavior patterns that are followed for event delivery.

Configuring event delivery parameters, including a dead letter sink, ensures that any events that fail to be delivered to an event sink are retried. Otherwise, undelivered events are dropped.

Important

If an event is successfully delivered to a channel or broker receiver for Apache Kafka, the receiver responds with a 202 status code, which means that the event has been safely stored inside a Kafka topic and is not lost. If the receiver responds with any other status code, the event is not safely stored, and steps must be taken by the user to resolve the issue.

8.1. Configurable event delivery parameters

You can configure the following parameters for event delivery:

Dead letter sink
You can configure the deadLetterSink delivery parameter to store events that fail delivery in the specified event sink. The system drops undelivered events that are not stored in a dead letter sink. The dead letter sink can be any addressable object that conforms to the Knative Eventing sink contract, such as a Knative service, a Kubernetes service, or a URI.
Retries
You can configure the retry delivery parameter with an integer value to set the minimum number of delivery attempts before the system sends the event to the dead letter sink.
Back off delay
You can set the backoffDelay delivery parameter to specify the delay before the system retries event delivery after a failure. Specify the backoffDelay value by using the ISO 8601 duration format. For example, PT1S specifies a delay of 1 second.
Back off policy
You can use the backoffPolicy delivery parameter to specify the retry backoff policy. Specify the policy as either linear or exponential. With the linear policy, the system calculates the backoff delay as backoffDelay * <numberOfRetries>. With the exponential policy, the system calculates the backoff delay as backoffDelay * 2^<numberOfRetries>.

You can configure event delivery parameters for Broker, Trigger, Channel, and Subscription objects. If you configure event delivery parameters for a broker or channel, these parameters propagate to triggers or subscriptions created for those objects. You can also set event delivery parameters for triggers or subscriptions to override the settings for the broker or channel.

Example Broker object

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
# ...
spec:
  delivery:
    deadLetterSink:
      ref:
        apiVersion: eventing.knative.dev/v1alpha1
        kind: KafkaSink
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
# ...

Example Trigger object

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
# ...
spec:
  broker: <broker_name>
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
# ...

Example Channel object

apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
# ...
spec:
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
# ...

Example Subscription object

apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
# ...
spec:
  channel:
    apiVersion: messaging.knative.dev/v1
    kind: Channel
    name: <channel_name>
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: <sink_name>
    backoffDelay: <duration>
    backoffPolicy: <policy_type>
    retry: <integer>
# ...
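
As a worked illustration of the placeholders above, the following sketch fills in concrete values for a Trigger. The trigger name and the broker and service names are hypothetical. The delay figures in the comments apply the formulas from this section and assume that the retry count starts at 1.

```yaml
# Sketch only: illustrative delivery values for a hypothetical Trigger.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-trigger                 # hypothetical name
spec:
  broker: default                  # hypothetical broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display          # hypothetical subscriber
  delivery:
    retry: 3
    backoffPolicy: exponential
    backoffDelay: PT1S             # ISO 8601 duration: 1 second
    # exponential: backoffDelay * 2^<numberOfRetries>
    #   retry 1: 1s * 2^1 = 2s
    #   retry 2: 1s * 2^2 = 4s
    #   retry 3: 1s * 2^3 = 8s
    # linear (for comparison): backoffDelay * <numberOfRetries>
    #   retry 1: 1s, retry 2: 2s, retry 3: 3s
    deadLetterSink:                # receives the event after the retries are used up
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: error-handler        # hypothetical dead letter sink
```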

Chapter 9. Event discovery

9.1. Listing event sources and event source types

You can view a list of all event sources or event source types that exist or are available for use on your OpenShift Container Platform cluster. You can use the Knative (kn) CLI or the OpenShift Container Platform web console to list available event sources or event source types.

Using the Knative (kn) CLI provides a streamlined and intuitive user interface to view available event source types on your cluster.

You can list event source types that you can create and use on your cluster by using the kn source list-types CLI command.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on the cluster.
  • You have installed the Knative (kn) CLI.

Procedure

  1. List the available event source types in the terminal:

    $ kn source list-types

    You get an output similar to the following example:

    TYPE              NAME                                            DESCRIPTION
    ApiServerSource   apiserversources.sources.knative.dev            Watch and send Kubernetes API events to a sink
    PingSource        pingsources.sources.knative.dev                 Periodically send ping events to a sink
    SinkBinding       sinkbindings.sources.knative.dev                Binding for connecting a PodSpecable to a sink
  2. Optional: On OpenShift Container Platform, you can also list the available event source types in YAML format:

    $ kn source list-types -o yaml

You can view a list of all available event source types on your cluster. Using the OpenShift Container Platform web console provides a streamlined and intuitive user interface to view available event source types.

9.3.1. Viewing available event source types

You can view all available event source types in your cluster by using the OpenShift Container Platform web console. You can identify which event sources are installed and create event-driven integrations.

Prerequisites

  • You have logged in to the OpenShift Container Platform web console.
  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

Procedure

  1. Click +Add.
  2. Click Event Source.
  3. View the available event source types.

9.4. Listing event sources from the command line

Using the Knative (kn) CLI provides a streamlined and intuitive user interface to view existing event sources on your cluster.

You can list existing event sources by using the kn source list command.

Prerequisites

  • The OpenShift Serverless Operator and Knative Eventing are installed on the cluster.
  • You have installed the Knative (kn) CLI.

Procedure

  1. List the existing event sources in the terminal:

    $ kn source list

    Example output

    NAME   TYPE              RESOURCE                               SINK          READY
    a1     ApiServerSource   apiserversources.sources.knative.dev   ksvc:eshow2   True
    b1     SinkBinding       sinkbindings.sources.knative.dev       ksvc:eshow3   False
    p1     PingSource        pingsources.sources.knative.dev        ksvc:eshow1   True

  2. Optional: You can list event sources of a specific type only, by using the --type flag:

    $ kn source list --type <event_source_type>

    Example command

    $ kn source list --type PingSource

    Example output

    NAME   TYPE              RESOURCE                               SINK          READY
    p1     PingSource        pingsources.sources.knative.dev        ksvc:eshow1   True
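
To relate the listing to a resource definition, the following sketch shows a PingSource that could appear as the p1 row above. Only the name, type, and sink come from the example output; the schedule and payload are assumptions added for illustration.

```yaml
# Sketch only: a PingSource that would be listed as p1 with sink ksvc:eshow1.
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: p1
spec:
  schedule: "*/2 * * * *"                # assumed cron schedule (every 2 minutes)
  contentType: "application/json"
  data: '{"message": "Hello world!"}'    # assumed payload
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service                      # listed as ksvc:eshow1 in the SINK column
      name: eshow1
```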

Chapter 10. Tuning eventing configuration

You can override the default configurations for some specific deployments by modifying the workloads spec in the KnativeEventing custom resource (CR). Currently, overriding default configuration settings is supported for the eventing-controller, eventing-webhook, and imc-controller fields, as well as for the readiness and liveness fields for probes.

Important

The replicas spec cannot override the number of replicas for deployments that use the Horizontal Pod Autoscaler (HPA), and does not work for the eventing-webhook deployment.

Note

You can only override probes that are defined in the deployment by default.

10.1.1. Overriding deployment configurations

Currently, overriding default configuration settings is supported for the eventing-controller, eventing-webhook, and imc-controller fields, and for the readiness and liveness fields for probes.

In the following example, a KnativeEventing CR overrides the eventing-controller deployment so that:

  • The eventing-controller readiness probe timeout is 10 seconds.
  • The deployment specifies CPU and memory resource limits.
  • The deployment runs with 3 replicas.
  • The deployment includes the example-label: label label.
  • The deployment includes the example-annotation: annotation annotation.
  • The nodeSelector field selects nodes with the disktype: hdd label.

KnativeEventing CR example

apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  workloads:
  - name: eventing-controller
    readinessProbes: # (1)
      - container: controller
        timeoutSeconds: 10
    resources:
    - container: eventing-controller
      requests:
        cpu: 300m
        memory: 100Mi
      limits:
        cpu: 1000m
        memory: 250Mi
    replicas: 3
    labels:
      example-label: label
    annotations:
      example-annotation: annotation
    nodeSelector:
      disktype: hdd

(1) You can use the readiness and liveness probe overrides to override all fields of a probe in a container of a deployment as specified in the Kubernetes API, except for the fields related to the probe handler: exec, grpc, httpGet, and tcpSocket.
Note

The KnativeEventing CR label and annotation settings override the deployment’s labels and annotations for both the deployment itself and the resulting pods.

You can change templates for generating consumer group IDs and topic names used by your triggers, brokers, and channels.

Prerequisites

  • You have cluster or dedicated administrator permissions on OpenShift Container Platform.
  • You have installed the OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource (CR) on your OpenShift Container Platform cluster.
  • You have created a project or have access to a project that has the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. To change templates for generating consumer group IDs and topic names used by your triggers, brokers, and channels, change the KnativeKafka resource:

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      name: knative-kafka
      namespace: knative-eventing
    # ...
    spec:
      config:
        config-kafka-features:
          triggers.consumergroup.template: <template> # (1)
          brokers.topic.template: <template> # (2)
          channels.topic.template: <template> # (3)

    (1) The template for generating the consumer group ID used by your triggers. Use a valid Go text/template value. Defaults to "knative-trigger-{{ .Namespace }}-{{ .Name }}".
    (2) The template for generating Kafka topic names used by your brokers. Use a valid Go text/template value. Defaults to "knative-broker-{{ .Namespace }}-{{ .Name }}".
    (3) The template for generating Kafka topic names used by your channels. Use a valid Go text/template value. Defaults to "messaging-kafka.{{ .Namespace }}.{{ .Name }}".

    Example template configuration

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      name: knative-kafka
      namespace: knative-eventing
    # ...
    spec:
      config:
        config-kafka-features:
          triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .annotations.my-annotation }}"
          brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .annotations.my-annotation }}"
          channels.topic.template: "messaging-kafka.{{ .Namespace }}.{{ .Name }}-{{ .annotations.my-annotation }}"

  2. Apply the KnativeKafka YAML file:

    $ oc apply -f <knative_kafka_filename>
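
To make the effect of a template concrete, the following sketch shows a hypothetical Trigger and, in comments, the names that the default templates listed above would render for it. All object names are assumptions chosen for illustration.

```yaml
# Sketch only: a hypothetical Trigger used to illustrate template rendering.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-trigger
  namespace: my-namespace
spec:
  broker: my-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
# Default trigger template:  knative-trigger-{{ .Namespace }}-{{ .Name }}
# Rendered consumer group:   knative-trigger-my-namespace-my-trigger
#
# Default broker template:   knative-broker-{{ .Namespace }}-{{ .Name }}
# Rendered topic for a broker named my-broker in the same namespace:
#                            knative-broker-my-namespace-my-broker
```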

10.2. High availability

High availability (HA) is a standard feature of Kubernetes APIs that helps to ensure that APIs stay operational if a disruption occurs. In an HA deployment, if an active controller crashes or is deleted, another controller is readily available. This controller takes over processing of the APIs that were being serviced by the controller that is now unavailable.

HA in OpenShift Serverless is available through leader election, which is enabled by default after the Knative Serving or Eventing control plane is installed. When using a leader election HA pattern, instances of controllers are already scheduled and running inside the cluster before they are required. These controller instances compete to use a shared resource, known as the leader election lock. The instance of the controller that has access to the leader election lock resource at any given time is called the leader.

By default, Knative Eventing runs the eventing-controller, eventing-webhook, imc-controller, imc-dispatcher, and mt-broker-controller components with high availability (HA). Each component runs with two replicas. You can change the number of replicas for these components by modifying the spec.high-availability.replicas value in the KnativeEventing custom resource (CR).

Note

For Knative Eventing, the mt-broker-filter and mt-broker-ingress deployments are not scaled by HA. If your environment requires more deployments, scale these components manually.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator and Knative Eventing on your cluster.

Procedure

  1. In the OpenShift Container Platform web console, navigate to OperatorHub → Installed Operators.
  2. Select the knative-eventing namespace.
  3. Click Knative Eventing in the list of Provided APIs for the OpenShift Serverless Operator to go to the Knative Eventing tab.
  4. Click knative-eventing, then go to the YAML tab in the knative-eventing page.
  5. Change the number of replicas in the KnativeEventing CR:

    Example YAML

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      high-availability:
        replicas: 3

  6. You can also specify the number of replicas for a specific workload.

    Note

    Workload-specific configuration overrides the global setting for Knative Eventing.

    Example YAML

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
      high-availability:
        replicas: 3
      workloads:
      - name: mt-broker-filter
        replicas: 3

  7. Verify that the deployment respects the high availability limits:

    Example command

    $ oc get hpa -n knative-eventing

    Example output

    NAME                 REFERENCE                      TARGETS   MINPODS   MAXPODS   REPLICAS   AGE
    broker-filter-hpa    Deployment/mt-broker-filter    1%/70%    3         12        3          112s
    broker-ingress-hpa   Deployment/mt-broker-ingress   1%/70%    3         12        3          112s
    eventing-webhook     Deployment/eventing-webhook    4%/100%   3         7         3          115s
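
The override rule from step 6 of this procedure can be sketched in Python as follows. This is a minimal illustration that treats the spec section of the CR as a plain dictionary. The effective_replicas function is hypothetical and is not part of the Operator, and the sample values are chosen only for the example.

```python
def effective_replicas(spec, workload):
    """Return the replica count that applies to one workload in this sketch."""
    # A workload-specific entry wins over the global high-availability value.
    for override in spec.get("workloads", []):
        if override.get("name") == workload and "replicas" in override:
            return override["replicas"]
    return spec.get("high-availability", {}).get("replicas")


# Sample values for illustration only.
spec = {
    "high-availability": {"replicas": 3},
    "workloads": [{"name": "mt-broker-filter", "replicas": 5}],
}

if __name__ == "__main__":
    print(effective_replicas(spec, "mt-broker-filter"))
    print(effective_replicas(spec, "eventing-webhook"))
```

The sketch does not model which components the Operator actually scales through the global setting. It only shows the precedence between the two values.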

By default, the Knative broker implementation for Apache Kafka runs the kafka-controller and kafka-webhook-eventing components with high availability (HA). Each component runs with two replicas. You can change the number of replicas for these components by modifying the spec.high-availability.replicas value in the KnativeKafka custom resource (CR).

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator and Knative broker for Apache Kafka on your cluster.

Procedure

  1. In the OpenShift Container Platform web console, navigate to OperatorHub → Installed Operators.
  2. Select the knative-eventing namespace.
  3. Click Knative Kafka in the list of Provided APIs for the OpenShift Serverless Operator to go to the Knative Kafka tab.
  4. Click knative-kafka, then go to the YAML tab in the knative-kafka page.
  5. Change the number of replicas in the KnativeKafka CR:

    Example YAML

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      name: knative-kafka
      namespace: knative-eventing
    spec:
      high-availability:
        replicas: 3

10.2.3. Overriding disruption budgets

A Pod Disruption Budget (PDB) is a standard feature of Kubernetes APIs that helps limit the disruption to an application when its pods need to be rescheduled for maintenance reasons.

Procedure

  • Override the default PDB for a specific resource by modifying the minAvailable configuration value in the KnativeEventing custom resource (CR).

Example PDB with a minAvailable setting of 70%

apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  podDisruptionBudgets:
  - name: eventing-webhook
    minAvailable: 70%

Note

If you disable high-availability, for example, by changing the high-availability.replicas value to 1, make sure you also update the corresponding PDB minAvailable value to 0. Otherwise, the pod disruption budget prevents automatic cluster or Operator updates.
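
To see why a percentage-based minAvailable value can block updates when only one replica runs, consider the following Python sketch. It assumes that all replicas are healthy and relies on the Kubernetes behavior of rounding percentage values up to the next whole pod. The function names are hypothetical and are used only for this illustration.

```python
def required_available(min_available, replicas):
    """Translate a minAvailable value (integer or "NN%") into a pod count."""
    if isinstance(min_available, str) and min_available.endswith("%"):
        percent = int(min_available[:-1])
        # Integer ceiling: a percentage is rounded up to the next whole pod.
        return (replicas * percent + 99) // 100
    return int(min_available)


def allowed_disruptions(min_available, replicas):
    """Number of pods that can be evicted voluntarily at the same time."""
    return max(0, replicas - required_available(min_available, replicas))


if __name__ == "__main__":
    print(allowed_disruptions("70%", 1))   # one replica, 70%: no eviction allowed
    print(allowed_disruptions(0, 1))       # one replica, minAvailable 0: one eviction
    print(allowed_disruptions("70%", 10))  # ten replicas, 70%: three evictions
```

With a single replica and minAvailable set to 70%, the required count rounds up to one pod, so no pod can be evicted. This matches the situation that the preceding note warns about.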

With the transport encryption feature, you can transport data and events over secured and encrypted HTTPS connections by using Transport Layer Security (TLS).

The transport-encryption feature flag is an enum configuration that defines how Addressables, such as Broker, Channel, and Sink, accept events. It controls whether Addressables must accept events over HTTP or HTTPS based on the selected setting.

The possible values for transport-encryption are as follows:


disabled

  • Addressables can accept events to HTTPS endpoints.
  • Producers can send events to HTTPS endpoints.

permissive

  • Addressables must accept events on both HTTP and HTTPS endpoints.
  • Addressables must advertise both HTTP and HTTPS endpoints.
  • Producers must send events to HTTPS endpoints, if available.

strict

  • Addressables must not accept events to non-HTTPS endpoints.
  • Addressables must only advertise HTTPS endpoints.
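
Two of the rules listed above can be expressed as a small Python sketch: an addressable in strict mode rejects non-HTTPS endpoints, and a producer prefers an HTTPS endpoint when one is advertised. The function names and URLs are hypothetical, and the sketch does not model how Knative Eventing implements these checks.

```python
from urllib.parse import urlsplit

MODES = ("disabled", "permissive", "strict")


def must_reject(mode, url):
    """Return True when an addressable in this mode must not accept events on url."""
    if mode not in MODES:
        raise ValueError(f"unknown transport-encryption mode: {mode}")
    return mode == "strict" and urlsplit(url).scheme != "https"


def pick_endpoint(advertised_urls):
    """Prefer an HTTPS endpoint when one is advertised, else use the first URL."""
    for url in advertised_urls:
        if urlsplit(url).scheme == "https":
            return url
    return advertised_urls[0] if advertised_urls else None


if __name__ == "__main__":
    # Hypothetical endpoints for illustration only.
    endpoints = ["http://broker.example/ns/b", "https://broker.example/ns/b"]
    print(pick_endpoint(endpoints))
    print(must_reject("strict", endpoints[0]))
```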

ClusterIssuers are Kubernetes resources that represent certificate authorities (CAs) that can generate signed certificates by honoring certificate signing requests. All cert-manager certificates require a referenced issuer in a ready condition to attempt to honor the request. For more details, see Issuer.

Important

For simplicity, this procedure uses a SelfSigned issuer as the root certificate authority. For more details about SelfSigned issuer implications and limitations, see SelfSigned issuers. If you are using a custom public key infrastructure (PKI), you must configure it so its privately signed CA certificates are recognized across the cluster. For more details about cert-manager, see certificate authorities (CAs). You can use any other issuer that is usable for cluster-local services.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator.
  • You have installed the cert-manager Operator for Red Hat OpenShift.
  • You have installed the OpenShift (oc) CLI.

Procedure

  1. Create a SelfSigned ClusterIssuer resource as follows:

    apiVersion: cert-manager.io/v1
    kind: ClusterIssuer
    metadata:
      name: knative-eventing-selfsigned-issuer
    spec:
      selfSigned: {}
  2. Apply the ClusterIssuer resource by running the following command:

    $ oc apply -f <filename>
  3. Create a root certificate by using the SelfSigned ClusterIssuer resource as follows:

    apiVersion: cert-manager.io/v1
    kind: Certificate
    metadata:
      name: knative-eventing-selfsigned-ca
      namespace: cert-manager
    spec:
      secretName: knative-eventing-ca
      isCA: true
      commonName: selfsigned-ca
      privateKey:
        algorithm: ECDSA
        size: 256
      issuerRef:
        name: knative-eventing-selfsigned-issuer
        kind: ClusterIssuer
        group: cert-manager.io
    namespace
    Specify the namespace of the cert-manager Operator for Red Hat OpenShift, which is cert-manager by default.
    secretName
    Specify the secret where the certificate is stored. The name is later used by the ClusterIssuer for Eventing.
  4. Apply the Certificate resource by running the following command:

    $ oc apply -f <filename>

ClusterIssuers are Kubernetes resources that represent certificate authorities (CAs) that can generate signed certificates by honoring certificate signing requests.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator.
  • You have installed the cert-manager Operator for Red Hat OpenShift.
  • You have installed the OpenShift (oc) CLI.

Procedure

  1. Create the knative-eventing-ca-issuer ClusterIssuer resource as follows:

    Every Eventing component uses this issuer to issue its server certificates.

    apiVersion: cert-manager.io/v1
    kind: ClusterIssuer
    metadata:
      name: knative-eventing-ca-issuer
    spec:
      ca:
        secretName: knative-eventing-ca
    secretName
    The name of the secret in the cert-manager namespace (the default namespace for the cert-manager Operator for Red Hat OpenShift) that contains the certificate that Knative Eventing components can use.

    Note

    The ClusterIssuer name must be knative-eventing-ca-issuer.

  2. Apply the ClusterIssuer resource by running the following command:

    $ oc apply -f <filename>

You can enable transport encryption in KnativeEventing by setting the transport-encryption feature to strict.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator.
  • You have installed the cert-manager Operator for Red Hat OpenShift.
  • You have installed the OpenShift (oc) CLI.

Procedure

  1. Enable the transport-encryption feature in the KnativeEventing CR as follows:

    apiVersion: operator.knative.dev/v1beta1
    kind: KnativeEventing
    metadata:
      name: knative-eventing
      namespace: knative-eventing
    spec:
    
      # Other spec fields omitted ...
      # ...
    
      config:
        features:
          transport-encryption: strict
  2. Apply the KnativeEventing resource by running the following command:

    $ oc apply -f <filename>

11.4. Configuring additional CA trust bundles

By default, Eventing clients trust the OpenShift CA bundle configured for custom PKI. For more details, see Configuring a custom PKI.

Note

When a new connection is established, Eventing clients automatically include these CA bundles in their trusted list.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator.
  • You have installed the cert-manager Operator for Red Hat OpenShift.

Procedure

  • Create a CA bundle for Eventing as follows:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: <my_org_eventing_bundle>
      namespace: knative-eventing
      labels:
        networking.knative.dev/trust-bundle: "true"
    data:
      ca.crt: ...
      ca1.crt: ...
      tls.crt: ...
    metadata.name
    Use a unique name to avoid conflicts with existing or future Eventing config maps.
    data
    All keys with valid PEM-encoded CA bundles are trusted by Eventing clients.

To create a custom event source, use a SinkBinding. The SinkBinding can inject the configured CA trust bundles as a projected volume into each container by using the knative-custom-certs directory.

In specific cases, you might inject company-specific CA trust bundles into base container images and automatically configure runtimes, such as OpenJDK or Node.js, to trust those CA bundles. In such cases, you might not need to configure your clients.

By using the my_org_eventing_bundle config map from the previous example, with the ca.crt, ca1.crt, and tls.crt data keys, the knative-custom-certs directory has the following layout:

/knative-custom-certs/ca.crt
/knative-custom-certs/ca1.crt
/knative-custom-certs/tls.crt

You can use these files to add CA trust bundles to HTTP clients that send events to Eventing.

Note

Depending on the runtime, programming language, or library you use, different methods exist for configuring custom CA cert files, such as using command-line flags, environment variables, or reading the content of the files.
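
As one possible approach for a Python client, the following sketch loads every file in the knative-custom-certs directory into a TLS context and uses that context to send an event. The function names, the event type, the event source, and the target URL passed to send_event are hypothetical. The sketch assumes that each file in the directory contains a valid PEM-encoded CA bundle.

```python
import json
import ssl
import urllib.request
import uuid
from pathlib import Path

CERT_DIR = Path("/knative-custom-certs")


def trust_bundle_files(cert_dir=CERT_DIR):
    """Return the sorted regular files in the trust-bundle directory."""
    directory = Path(cert_dir)
    if not directory.is_dir():
        return []
    # is_file() follows symlinks and skips directories, so helper
    # directories that a projected volume may contain are ignored.
    return sorted(p for p in directory.iterdir() if p.is_file())


def build_ssl_context(cert_dir=CERT_DIR):
    """Create a TLS context that trusts the system CAs plus the custom bundles."""
    ctx = ssl.create_default_context()
    for pem in trust_bundle_files(cert_dir):
        ctx.load_verify_locations(cafile=str(pem))
    return ctx


def send_event(url, payload, ctx):
    """POST a CloudEvent in binary content mode; attribute values are examples."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "ce-specversion": "1.0",
            "ce-id": str(uuid.uuid4()),
            "ce-type": "com.example.demo",      # hypothetical event type
            "ce-source": "/example/producer",   # hypothetical event source
        },
    )
    with urllib.request.urlopen(request, context=ctx) as response:
        return response.status
```

A caller would build the context once with build_ssl_context() and pass it to send_event together with the HTTPS address of the Broker or Channel. Because bundles can change over time, a long-running client might rebuild the context periodically.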

If you are using a SelfSigned ClusterIssuer resource, you can add the CA to the Eventing CA trust bundles.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator.
  • You have installed the cert-manager Operator for Red Hat OpenShift.
  • You have installed the OpenShift (oc) CLI.

Procedure

  1. Export the CA from the knative-eventing-ca secret in the cert-manager Operator for Red Hat OpenShift namespace (cert-manager by default) by running the following command:

    $ oc get secret -n cert-manager knative-eventing-ca -o=jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
  2. Create a CA trust bundle in the knative-eventing namespace by running the following command:

    $ oc create configmap -n knative-eventing my-org-selfsigned-ca-bundle --from-file=ca.crt
  3. Label the ConfigMap by running the following command:

    $ oc label configmap -n knative-eventing my-org-selfsigned-ca-bundle networking.knative.dev/trust-bundle=true

11.7. Ensuring seamless CA rotation

Ensuring seamless CA rotation is essential to avoid service downtime or to handle emergencies.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator.
  • You have installed the cert-manager Operator for Red Hat OpenShift.
  • You have installed the OpenShift (oc) CLI.

Procedure

  1. Create a CA certificate.
  2. Add the public key of the new CA certificate to the CA trust bundles.

    Ensure that you also keep the public key of the existing CA.

  3. Ensure all clients use the latest CA trust bundles.

    Knative Eventing components automatically reload the updated CA trust bundles. For custom workloads that consume trust bundles, reload or restart them as needed.

  4. Update the knative-eventing-ca-issuer ClusterIssuer to reference the secret containing the CA certificate that you created in step 1.
  5. Force cert-manager to renew certificates in the knative-eventing namespace.

    For more information about cert-manager, see Reissuance triggered by user actions.

  6. As soon as the CA rotation is fully completed, remove the public key of the old CA from the trust bundle config map.

11.8. Verifying transport encryption in Eventing

To confirm that transport encryption is correctly configured, you can create and test an InMemoryChannel resource. Follow the steps to ensure that it uses HTTPS as expected.

Prerequisites

  • You have cluster administrator permissions on OpenShift Container Platform, or you have cluster or dedicated administrator permissions on Red Hat OpenShift Service on AWS or OpenShift Dedicated.
  • You have installed the OpenShift Serverless Operator.
  • You have installed the cert-manager Operator for Red Hat OpenShift.
  • You have installed the OpenShift (oc) CLI.

Procedure

  1. Create an InMemoryChannel resource as follows:

    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
    metadata:
      name: transport-encryption-test
  2. Apply the InMemoryChannel resource by running the following command:

    $ oc apply -f <filename>
  3. View the InMemoryChannel address by running the following command:

    $ oc get inmemorychannels.messaging.knative.dev transport-encryption-test

    Example output

    NAME                        URL                                                                                           AGE   READY   REASON
    transport-encryption-test   https://imc-dispatcher.knative-eventing.svc.cluster.local/default/transport-encryption-test   17s   True

Knative Eventing provides a mechanism to secure event delivery to prevent unauthorized access. Event-driven systems often rely on multiple producers and consumers of events, and without proper authorization controls, malicious or unintended event flows could compromise the system.

To achieve fine-grained access control, Knative Eventing introduces the EventPolicy custom resource. This resource allows administrators and developers to define which entities are authorized to send events to specific consumers within a namespace. By applying EventPolicies, you can ensure that only trusted event sources or service accounts can send data to selected event consumers, which improves the overall security posture of your event-driven architecture.

Note

You must enable the transport-encryption feature flag along with authentication-oidc to ensure secure and authenticated event delivery.

12.1. Default authorization mode

Two key mechanisms govern Knative Eventing authorization. The EventPolicy custom resource defines explicit rules for event delivery. The default-authorization-mode feature flag applies when a resource does not reference an EventPolicy.

The supported authorization modes are as follows:


allow-all

The system permits all incoming event requests without restriction.

deny-all

The system denies all incoming event requests. To enable event delivery, you must configure an EventPolicy that explicitly allows the requests.

allow-same-namespace

(Default) The system allows only requests from subjects within the same namespace.

To configure the default authorization mode from the OpenShift Serverless Operator, you can edit the KnativeEventing custom resource as shown in the following example:

apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  config:
    features:
      authentication-oidc: "enabled"
      default-authorization-mode: "deny-all"
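
The three authorization modes can be summarized with a short Python sketch. It assumes that the sender is identified by a service account subject of the form system:serviceaccount:<namespace>:<name>. The function names are hypothetical and serve only to illustrate the decision for a resource that no EventPolicy applies to.

```python
def sender_namespace(subject):
    """Extract the namespace from system:serviceaccount:<namespace>:<name>."""
    parts = subject.split(":")
    if len(parts) == 4 and parts[:2] == ["system", "serviceaccount"]:
        return parts[2]
    return None


def default_decision(mode, subject, target_namespace):
    """Return True when the request is allowed under the default mode."""
    if mode == "allow-all":
        return True
    if mode == "deny-all":
        return False
    if mode == "allow-same-namespace":
        return sender_namespace(subject) == target_namespace
    raise ValueError(f"unknown authorization mode: {mode}")


if __name__ == "__main__":
    subject = "system:serviceaccount:namespace-2:pingsource-2"
    print(default_decision("allow-same-namespace", subject, "namespace-1"))
    print(default_decision("allow-same-namespace", subject, "namespace-2"))
```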

12.2. EventPolicy resource overview

The EventPolicy resource defines rules for event delivery. It specifies which entities, such as service accounts or event sources, can send events and which consumers can receive them. You can also define additional criteria that CloudEvents must match before the system accepts them.

An EventPolicy includes the following three primary sections:


.spec.to

Defines the resources such as Brokers or Channels that the EventPolicy applies to.

.spec.from

Specifies which sources or subjects can send events to the targets.

.spec.filters

(Optional) Specifies advanced filtering conditions that the CloudEvent must satisfy before the system accepts it.

The following is an example of an EventPolicy:

apiVersion: eventing.knative.dev/v1alpha1
kind: EventPolicy
metadata:
  name: my-event-policy
  namespace: default
spec:
  to:
    - ref:
        apiVersion: eventing.knative.dev/v1
        kind: Broker
        name: my-broker
    - selector:
        apiVersion: eventing.knative.dev/v1
        kind: Broker
        matchLabels:
          app: special-app
  from:
    - ref:
        apiVersion: sources.knative.dev/v1
        kind: PingSource
        name: my-source
        namespace: another-namespace
    - sub: system:serviceaccount:default:trusted-app
    - sub: "system:serviceaccount:default:other-*"
  filters:
    - cesql: "type IN ('order.created', 'order.updated', 'order.canceled')"
    - exact:
        type: com.github.push

12.3. Defining targets in an EventPolicy

The .spec.to field determines which consumers an EventPolicy applies to. If you do not specify this field, the EventPolicy applies to all resources in the namespace. You can specify multiple targets to broaden the scope of the policy. You can define targets in two main ways.

12.3.1. Specific resource reference

The to.ref method directly references a specific resource by name. For example, referencing a Broker named my-broker applies the EventPolicy only to that specific Broker. This method is most appropriate when you want to protect or restrict access to a well-defined, individual resource.

Example of to.ref method

to:
  - ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: my-broker

12.3.2. Label-based resource selection

The to.selector method uses label selectors to match many resources of a specific type. For example, specifying a label selector with app: special-app applies the EventPolicy to all Brokers that share that label. This method is suitable when you want the same authorization rules applied across a group of similar resources.

Example of to.selector method

to:
  - selector:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      matchLabels:
        app: special-app

The .spec.from field specifies the entities that can send events to the resources listed in .spec.to. You can reference specific event sources or subject service accounts. The field also supports wildcard matching for broader coverage. You can define senders in two main ways.

12.4.1. Event source by name

The from.ref method directly references an event source resource. For example, referencing a PingSource in a different namespace ensures that only that specific source can deliver events to the defined targets.

Example of from.ref method

from:
  - ref:
      apiVersion: sources.knative.dev/v1
      kind: PingSource
      name: my-source
      namespace: another-namespace

12.4.2. Subject-based authorization

The from.sub field specifies subjects, such as service accounts, that can send events. You can use wildcard patterns with * to allow multiple accounts that match a pattern. For example, an EventPolicy can allow one trusted service account and allow all accounts that begin with other- in the same namespace.

Example of from.sub method

from:
  - sub: system:serviceaccount:default:trusted-app
  - sub: "system:serviceaccount:default:other-*"
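
The wildcard behavior can be pictured with the following Python sketch. It assumes that * is only meaningful as the last character of a pattern and then matches any suffix. The subject_matches function is hypothetical and is not the matching code that Knative Eventing uses.

```python
def subject_matches(pattern, subject):
    """Exact match, or prefix match when the pattern ends with '*'."""
    if pattern.endswith("*"):
        return subject.startswith(pattern[:-1])
    return subject == pattern


ALLOWED = [
    "system:serviceaccount:default:trusted-app",
    "system:serviceaccount:default:other-*",
]


def sender_allowed(subject, patterns=ALLOWED):
    """Return True when at least one pattern matches the subject."""
    return any(subject_matches(p, subject) for p in patterns)


if __name__ == "__main__":
    print(sender_allowed("system:serviceaccount:default:other-worker"))
    print(sender_allowed("system:serviceaccount:default:unknown"))
```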

12.5. Advanced CloudEvent filtering criteria

The .spec.filters field is optional and provides advanced criteria for event delivery. These filters restrict events based not only on the source but also on the properties of the CloudEvent.

For example, you can configure filters to allow only events of type com.github.push. You can also configure filters that match a CloudEvents SQL (CESQL) expression for event types such as order.created, order.updated, or order.canceled.

Example of spec.filters method

filters:
  - cesql: "type IN ('order.created', 'order.updated', 'order.canceled')"
  - exact:
      type: com.github.push

Note

Filters apply in addition to the .spec.from field. An event must satisfy both the sender authorization and the filter criteria before the system delivers it.

12.6. EventPolicy status and application

The status of an EventPolicy shows runtime information about resolved sources and indicates whether the policy is active and ready, as shown in the following example:

status:
  from:
    - system:serviceaccount:default:trusted-app
    - "system:serviceaccount:default:other-*"
  conditions:
    - type: Ready
      status: "True"
    - type: SubjectsResolved
      status: "True"

Event consumers, such as Brokers, list the EventPolicies that apply to them in their status:

status:
  policies:
    - name: my-event-policy
      apiVersion: v1alpha1
  conditions:
    - type: Ready
      status: "True"
    - type: EventPoliciesReady
      status: "True"

These conditions indicate whether EventPolicies are ready and have been successfully applied to the resource.

If an incoming request does not satisfy any applicable EventPolicy, the system returns an HTTP 403 Forbidden status code. When many EventPolicies apply to a resource, the system evaluates them in parallel. The system accepts an event if it matches at least one policy. This behavior blocks unauthorized event deliveries while still allowing valid events that satisfy at least one policy.
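
The evaluation rule described above can be sketched in Python as follows. The Policy class and the helper functions are hypothetical. Filters are modeled as plain Python predicates instead of CESQL or exact expressions, and combining the filters of one policy with a logical AND is an assumption of this sketch. The sketch covers only the case where at least one EventPolicy applies to the resource.

```python
from dataclasses import dataclass, field
from typing import Callable, List


def subject_matches(pattern, subject):
    """Exact match, or prefix match when the pattern ends with '*' (assumed)."""
    if pattern.endswith("*"):
        return subject.startswith(pattern[:-1])
    return subject == pattern


@dataclass
class Policy:
    subjects: List[str]
    filters: List[Callable[[dict], bool]] = field(default_factory=list)

    def allows(self, subject, event):
        # The sender must be authorized and the event must pass the filters.
        sender_ok = any(subject_matches(p, subject) for p in self.subjects)
        return sender_ok and all(f(event) for f in self.filters)


def is_authorized(policies, subject, event):
    """True if at least one policy allows the event; False maps to HTTP 403."""
    return any(policy.allows(subject, event) for policy in policies)


if __name__ == "__main__":
    policies = [
        Policy(["system:serviceaccount:default:trusted-app"]),
        Policy(
            ["system:serviceaccount:default:other-*"],
            [lambda e: e.get("type") == "com.github.push"],
        ),
    ]
    event = {"type": "com.github.push"}
    print(is_authorized(policies, "system:serviceaccount:default:other-1", event))
```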

12.7. Applying an EventPolicy

You can apply an EventPolicy to secure event delivery by following these end-to-end steps. The following example configures a Broker in namespace-1 to accept events only from a PingSource running in namespace-2.

Prerequisites

  • You have installed the OpenShift Serverless Operator and Knative Eventing on your OpenShift Container Platform cluster.
  • You have enabled the authentication-oidc feature.

Procedure

  1. Create two namespaces, deploy a Broker in namespace-1, and configure one PingSource in each namespace as shown in the following example:

    apiVersion: v1
    kind: Namespace
    metadata:
      name: namespace-1
    ---
    apiVersion: v1
    kind: Namespace
    metadata:
      name: namespace-2
    ---
    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      name: broker
      namespace: namespace-1
    ---
    # PingSource in namespace-1
    apiVersion: sources.knative.dev/v1
    kind: PingSource
    metadata:
      name: pingsource-1
      namespace: namespace-1
    spec:
      data: '{"message": "Hi from pingsource-1 from namespace-1"}'
      schedule: '*/1 * * * *'
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: broker
          namespace: namespace-1
    ---
    # PingSource in namespace-2
    apiVersion: sources.knative.dev/v1
    kind: PingSource
    metadata:
      name: pingsource-2
      namespace: namespace-2
    spec:
      data: '{"message": "Hi from pingsource-2 from namespace-2"}'
      schedule: '*/1 * * * *'
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1
          kind: Broker
          name: broker
          namespace: namespace-1
  2. Create an event-display service to show incoming events and add a Trigger to connect it to the Broker as shown in the following example:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: event-display
      namespace: namespace-1
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: event-display
      template:
        metadata:
          labels:
            app: event-display
        spec:
          containers:
            - name: event-display
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
              ports:
              - containerPort: 8080
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: event-display
      namespace: namespace-1
    spec:
      selector:
        app: event-display
      ports:
        - name: http
          port: 80
          targetPort: 8080
    ---
    apiVersion: eventing.knative.dev/v1
    kind: Trigger
    metadata:
      name: trigger
      namespace: namespace-1
    spec:
      broker: broker
      subscriber:
        ref:
          apiVersion: v1
          kind: Service
          name: event-display

    At this stage, OIDC remains disabled and no EventPolicy exists, so the event-display service shows events from both PingSources.

  3. Enable OIDC in Knative Eventing by running the following command:

    $ oc -n knative-eventing patch KnativeEventing knative-eventing \
      --type merge \
      -p '{"spec":{"config":{"features":{"authentication-oidc":"enabled"}}}}'
  4. After you enable OIDC, create an EventPolicy that authorizes only the PingSource from namespace-2, as shown in the following example:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: EventPolicy
    metadata:
      name: event-policy
      namespace: namespace-1
    spec:
      to:
        - ref:
            apiVersion: eventing.knative.dev/v1
            kind: Broker
            name: broker
      from:
        - ref:
            apiVersion: sources.knative.dev/v1
            kind: PingSource
            name: pingsource-2
            namespace: namespace-2

Verification

  1. Verify that the EventPolicy is applied by checking the Broker status with the following command:

    $ oc -n namespace-1 get broker broker -o yaml
  2. View logs from the event-display service to confirm only pingsource-2 events arrive by executing the following command:

    $ oc -n namespace-1 logs -f -l app=event-display
  3. Delete the EventPolicy by executing the following command:

    $ oc -n namespace-1 delete eventpolicy event-policy
  4. Check the Broker status to confirm it has returned to the default allow-same-namespace mode by executing the following command:

    $ oc -n namespace-1 get broker broker -o yaml
  5. View the event-display service logs to confirm that only pingsource-1 events from the same namespace appear by executing the following command:

    $ oc -n namespace-1 logs -f -l app=event-display

The kube-rbac-proxy component provides internal authentication and authorization capabilities for Knative Eventing.

You can globally override resource allocation for the kube-rbac-proxy container by using the OpenShift Serverless Operator CR.

Note

You can also override resource allocation for a specific deployment.

The following configuration sets Knative Eventing kube-rbac-proxy minimum and maximum CPU and memory allocation:

KnativeEventing CR example

apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec:
  config:
    deployment:
      "kube-rbac-proxy-cpu-request": "10m"
      "kube-rbac-proxy-memory-request": "20Mi"
      "kube-rbac-proxy-cpu-limit": "100m"
      "kube-rbac-proxy-memory-limit": "100Mi"
kube-rbac-proxy-cpu-request
Sets minimum CPU allocation.
kube-rbac-proxy-memory-request
Sets minimum RAM allocation.
kube-rbac-proxy-cpu-limit
Sets maximum CPU allocation.
kube-rbac-proxy-memory-limit
Sets maximum RAM allocation.

You can globally override resource allocation for the kube-rbac-proxy container by using the OpenShift Serverless Operator CR.

Note

You can also override resource allocation for a specific deployment.

The following configuration sets Knative Kafka kube-rbac-proxy minimum and maximum CPU and memory allocation:

KnativeKafka CR example

apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  name: knative-kafka
  namespace: knative-eventing
spec:
  config:
    deployment:
      "kube-rbac-proxy-cpu-request": "10m"
      "kube-rbac-proxy-memory-request": "20Mi"
      "kube-rbac-proxy-cpu-limit": "100m"
      "kube-rbac-proxy-memory-limit": "100Mi"
kube-rbac-proxy-cpu-request
Sets minimum CPU allocation.
kube-rbac-proxy-memory-request
Sets minimum RAM allocation.
kube-rbac-proxy-cpu-limit
Sets maximum CPU allocation.
kube-rbac-proxy-memory-limit
Sets maximum RAM allocation.

Chapter 14. Event transformation

EventTransform is a Knative API resource that enables declarative transformations of HTTP requests and responses without requiring custom code. With EventTransform, you can:

  • Modify the event attributes
  • Extract data from the event payloads
  • Reshape events to meet the requirements of different systems

EventTransform is designed as a flexible component in an event-driven architecture. You can place it at various points in the event flow to support seamless integration between diverse producers and consumers.

Note

EventTransform is an addressable resource and can be referenced from Knative sources, triggers, and subscriptions.

14.1. Key features of event transformation

You can use the EventTransform resource to simplify event manipulation, improve flexibility, and enable seamless integration across systems.


Declarative transformations

Define event transformations using standard Kubernetes resources without writing custom code.

JSONata expressions

Use JSONata to extract fields, reshape data, and perform complex transformations.

Addressable resource

Reference EventTransform directly from Knative sources, Triggers, or Subscriptions.

Flexible deployment

Place EventTransform at various points in the event flow, depending on your architecture needs.

Sink configuration

Route transformed events to specific destinations by defining a sink.

Reply support

Transform and republish events using a built-in reply feature of Broker.

14.2. Common use cases for event transformation

You can use EventTransform to manipulate and reshape events based on system requirements. The most common use cases include the following:

14.2.1. Field extraction

You can extract specific fields from event payloads and expose them as CloudEvent attributes. This makes it easier to filter and route events downstream.

Example of extracting user ID from an event payload

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: extract-user-id
spec:
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "user.extracted",
        "source": "transform.user-extractor",
        "time": time,
        "userid": data.user.id,
        "data": $
      }
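
To make the mapping easier to follow, the same field extraction can be written as a plain Python function that operates on the decoded input JSON. This is only an analogy for what the JSONata expression describes, not how EventTransform evaluates it. The function name and the sample event are hypothetical, and missing fields become None here, which may differ from how JSONata treats undefined values.

```python
def extract_user_id(event):
    """Build the transformed event from the input JSON object."""
    user = event.get("data", {}).get("user", {})
    return {
        "specversion": "1.0",
        "id": event.get("id"),
        "type": "user.extracted",
        "source": "transform.user-extractor",
        "time": event.get("time"),
        "userid": user.get("id"),  # promoted from data.user.id
        "data": event,             # "$" in JSONata: the whole input object
    }


if __name__ == "__main__":
    # Sample input for illustration only.
    sample = {
        "id": "1234",
        "time": "2024-01-01T00:00:00Z",
        "data": {"user": {"id": "u-42"}},
    }
    print(extract_user_id(sample)["userid"])
```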

14.2.2. Event format conversion

You can convert the structure of an event into a different format so that it remains compatible with diverse consumer systems.

Example of converting an order event to a customer-centric format

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: format-converter
spec:
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: destination-service
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "order.converted",
        "source": "transform.format-converter",
        "time": time,
        "data": {
          "orderId": data.id,
          "customer": {
            "name": data.user.fullName,
            "email": data.user.email
          },
          "items": data.items
        }
      }

14.2.3. Event enrichment

You can add fixed or dynamic metadata such as environment or region to events without requiring any changes in the original producer.

Example of adding environment and region metadata to the event

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: event-enricher
spec:
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,                     /* Add the "id", "type", "source", and "time" attributes based on the input JSON object fields */
        "type": type,
        "source": source,
        "time": time,
        "environment": "production",  /* Add fixed environment and region attributes to the event metadata */
        "region": "us-west-1",
        "data": $                     /* Add the event transform input JSON body as CloudEvent "data" field */
      }

14.2.4. Event response reply transformation

You can transform not only the request sent to a sink but also the response received from the sink, enabling end-to-end event shaping.

Example of transforming both request and reply messages

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: request-reply-transform
spec:
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: processor-service
  jsonata:
    expression: |
      /* Request transformation */
      {
        "specversion": "1.0",
        "id": id,
        "type": "request.transformed",
        "source": source,
        "time": time,
        "data": data
      }
  reply:
    jsonata:
      expression: |
        /* Reply transformation */
        {
          "specversion": "1.0",
          "id": id,
          "type": "reply.transformed",
          "source": "transform.reply-processor",
          "time": time,
          "data": data
        }

14.3. Deployment patterns for event transformation

You can use EventTransform at different points in your event flow depending on your architecture needs. The following patterns are supported:

14.3.1. Source to Broker event transformation

You can route events from a Source to an EventTransform resource, apply transformation logic, and then forward them to a Broker so that only normalized or enriched events are routed for consumption.

You can configure an ApiServerSource resource to send events to an EventTransform resource, which then transforms and routes them to the default Broker, as shown in the following example:

apiVersion: sources.knative.dev/v1
kind: ApiServerSource
metadata:
  name: k8s-events
spec:
  serviceAccountName: event-watcher
  resources:
    - apiVersion: v1
      kind: Event
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: event-transformer
---
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: event-transformer
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
  jsonata:
    expression: |
      # transformation expression

14.3.2. Trigger to Service event transformation

You can route events through Broker → Trigger → EventTransform → Service or Sink. The Broker receives all events, the Trigger filters them by attributes, the EventTransform resource reshapes or enriches the filtered events, and the Service or Sink processes the results.

You can tailor events for specific consumers without changing the original producer or affecting other subscribers. Because transformations apply only after filtering, the system reshapes only relevant events. This behavior improves efficiency and reduces unnecessary processing.

With the following configuration, you can filter events of type original.event.type, route them to an EventTransform, and deliver the transformed events to a Service:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: transform-trigger
spec:
  broker: default
  filter:
    attributes:
      type: original.event.type
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: event-transformer
---
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: event-transformer
spec:
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: destination-service
  jsonata:
    expression: |
      # transformation expression

14.3.3. Broker reply event transformation

You can configure EventTransform without a sink to republish transformed events to the Broker, where additional Triggers or consumers can route and process them.

Note

When using the Broker reply feature, ensure the transformed events do not match the same Trigger that invoked the EventTransform. Otherwise, you risk creating an infinite event loop.

You can filter events of type original.event.type with a Trigger, transform them with EventTransform, and republish them to the Broker with the type transformed.event.type. Changing the type or another attribute lets different Triggers route the event and prevents the original Trigger from processing it again.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: transform-trigger
spec:
  broker: default
  filter:
    attributes:
      type: original.event.type
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: event-transformer
---
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: event-transformer
spec:
  # No sink specified - reply to Broker
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "time": time,
        "type": "transformed.event.type",
        "source": "transform.event-transformer",
        "data": $
      }
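
The loop risk described in the note can be checked before you deploy. The following is a minimal Python sketch, assuming a simplified exact-match attribute filter; the trigger_matches helper and the sample event are hypothetical and are not part of Knative:

```python
def trigger_matches(filter_attributes, event):
    """Return True when every filter attribute equals the event's attribute."""
    return all(event.get(name) == value for name, value in filter_attributes.items())


# Filter taken from the transform-trigger example above.
trigger_filter = {"type": "original.event.type"}

incoming = {"id": "1", "source": "example/source", "type": "original.event.type"}
# What the EventTransform above replies with: same id, new type and source.
replied = {
    **incoming,
    "type": "transformed.event.type",
    "source": "transform.event-transformer",
}

print(trigger_matches(trigger_filter, incoming))  # → True
print(trigger_matches(trigger_filter, replied))   # → False
```

Because the replied event no longer matches the filter, the same Trigger does not pick it up again.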

14.4. Event transformations for JSON with JSONata

JSONata is a lightweight query and transformation language for JSON data. In Knative EventTransform, JSONata expressions reshape events. Use these expressions to extract values from event data, promote fields to CloudEvent attributes, restructure payloads, add computed values, or apply conditional logic during event transformation.

You can specify a JSONata expression in the spec.jsonata.expression field of the EventTransform resource. The expression maps the incoming CloudEvent into a new CloudEvent as shown in the following example:

Example of simple event transformation by using JSONata

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: simple-transform
spec:
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "time": time,
        "type": "transformed.type",
        "source": "transform.simple",
        "data": data
      }

14.4.1. CloudEvent structure in JSONata

When you use JSONata in an EventTransform, the input to the expression is the full CloudEvent object, which includes all standard attributes and the event payload.

Your JSONata expression must produce a valid CloudEvent. At a minimum, include the following fields to comply with the CloudEvents specification:

Field        Requirement    Description
specversion  Must be 1.0    Identifies the CloudEvents specification version.
id           Unique string  Serves as the unique identifier for the event.
type         Required       Indicates the event type, such as com.example.object.created.
source       Required       Identifies the context in which the event occurred.
data         Required       Contains the event payload being delivered.

By explicitly constructing these fields in your JSONata expression, you ensure that the transformed event is valid while still applying reshaping, enrichment, or conditional logic as needed.
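
As a rough illustration, you could check the output of a transformation against the fields listed above before relying on it. This Python sketch is an assumption-laden helper, not a Knative or CloudEvents API; the names REQUIRED_FIELDS and missing_fields are hypothetical:

```python
# Fields that this chapter asks every transformed event to include.
REQUIRED_FIELDS = ("specversion", "id", "type", "source", "data")


def missing_fields(event):
    """Return a list of problems found in a transformed event (empty if none)."""
    problems = [name for name in REQUIRED_FIELDS if name not in event]
    if "specversion" in event and event["specversion"] != "1.0":
        problems.append("specversion must be 1.0")
    return problems


transformed = {
    "specversion": "1.0",
    "id": "12345",
    "type": "transformed.type",
    "source": "transform.simple",
    "data": {"message": "hello"},
}
print(missing_fields(transformed))   # → []
print(missing_fields({"id": "1"}))   # → ['specversion', 'type', 'source', 'data']
```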

14.5. Common transformation patterns

To effectively shape your event data, you can use common JSONata transformation patterns for preserving, extracting, restructuring, or conditionally modifying events.

14.5.1. Preserving the original event structure

You can add or adjust attributes while keeping the rest of the event unchanged, so that downstream consumers receive the original data with minimal modifications.

Example of adding a static attribute while preserving the original event structure

{
  "specversion": "1.0",
  "id": id,
  "type": type,
  "source": source,
  "time": time,
  "data": data,
  "newattribute": "static value"
}

14.5.2. Extracting fields as attributes

You can promote values from the payload to top-level CloudEvent attributes to make filtering and routing easier.

Example of extracting the user ID and region from the payload and exposing them as attributes

{
  "specversion": "1.0",
  "id": id,
  "type": "user.event",
  "source": source,
  "time": time,
  "userid": data.user.id,
  "region": data.region,
  "data": $
}

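In JSONata, the $ symbol represents the entire input object. Using "data": $ places the complete input event, including its original attributes and payload, in the data field of the transformed event while promoting the selected fields to attributes.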
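
To make the effect of $ concrete, the following Python sketch emulates the expression above on a made-up input event. It is only an approximation for illustration: the extract_fields function and the sample values are hypothetical, and Knative evaluates the real JSONata expression, not this code.

```python
def extract_fields(event):
    """Emulate the field-extraction expression in plain Python."""
    data = event.get("data", {})
    return {
        "specversion": "1.0",
        "id": event.get("id"),
        "type": "user.event",
        "source": event.get("source"),
        "time": event.get("time"),
        "userid": data.get("user", {}).get("id"),
        "region": data.get("region"),
        "data": event,  # JSONata `$`: the whole input object, not only its payload
    }


incoming = {
    "specversion": "1.0",
    "id": "evt-1",
    "type": "user.created",
    "source": "example/users",
    "time": "2025-01-01T00:00:00Z",
    "data": {"user": {"id": "u-42"}, "region": "eu-west-1"},
}
result = extract_fields(incoming)
print(result["userid"], result["region"])  # → u-42 eu-west-1
print(result["data"]["type"])              # → user.created
```

The second line of output shows that the original type attribute is still available inside data, even though the top-level type changed.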

14.5.3. Restructuring event data

Use JSONata to transform events into the schema required by another system by restructuring payloads, renaming fields, nesting objects, and performing calculations.

Example of restructuring an order event and calculating the total value of items

{
  "specversion": "1.0",
  "id": order.id,
  "type": "order.transformed",
  "source": "transform.order-processor",
  "time": order.time,
  "orderid": order.id,
  "data": {
    "customer": {
      "id": order.user.id,
      "name": order.user.name
    },
    "items": order.items.{ "sku": sku, "quantity": qty, "price": price },
    "total": $sum(order.items.(price * qty))
  }
}

Given the preceding transformation and the following JSON object as input:

{
  "order": {
    "time": "2024-04-05T17:31:05Z",
    "id": "8a76992e-cbe2-4dbe-96c0-7a951077089d",
    "user": {
      "id": "bd9779ef-cba5-4ad0-b89b-e23913f0a7a7",
      "name": "John Doe"
    },
    "items": [
      {"sku": "KNATIVE-1", "price": 99.99, "qty": 1},
      {"sku": "KNATIVE-2", "price": 129.99, "qty": 2}
    ]
  }
}

The transformation produces the following output:

{
  "specversion": "1.0",
  "id": "8a76992e-cbe2-4dbe-96c0-7a951077089d",
  "type": "order.transformed",
  "source": "transform.order-processor",
  "time": "2024-04-05T17:31:05Z",
  "orderid": "8a76992e-cbe2-4dbe-96c0-7a951077089d",
  "data": {
    "customer": {
      "id": "bd9779ef-cba5-4ad0-b89b-e23913f0a7a7",
      "name": "John Doe"
    },
    "items": [
      {
        "sku": "KNATIVE-1",
        "quantity": 1,
        "price": 99.99
      },
      {
        "sku": "KNATIVE-2",
        "quantity": 2,
        "price": 129.99
      }
    ],
    "total": 359.97
  }
}

Use this pattern to integrate with APIs that require specific structures or calculated fields.
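
The arithmetic behind the total field can be reproduced outside JSONata. This is a small Python sketch that uses the item values from the example input; it uses Decimal to avoid floating-point rounding noise, which is a choice made for this illustration and says nothing about how JSONata represents numbers:

```python
from decimal import Decimal

# Item values copied from the example input; prices are Decimal for exact sums.
order_items = [
    {"sku": "KNATIVE-1", "price": Decimal("99.99"), "qty": 1},
    {"sku": "KNATIVE-2", "price": Decimal("129.99"), "qty": 2},
]

# Equivalent of: order.items.{ "sku": sku, "quantity": qty, "price": price }
items = [
    {"sku": item["sku"], "quantity": item["qty"], "price": item["price"]}
    for item in order_items
]

# Equivalent of: $sum(order.items.(price * qty))
total = sum(item["price"] * item["qty"] for item in order_items)

print([item["quantity"] for item in items])  # → [1, 2]
print(total)                                 # → 359.97
```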

14.5.4. Conditional transformations

With JSONata, you can embed conditional logic directly into your transformation, enabling dynamic event shaping based on attributes or payload values.

Example of applying different types and priorities based on conditions

{
  "specversion": "1.0",
  "id": id,
  "type": type = "order.created" ? "new.order" : "updated.order",
  "source": source,
  "time": time,
  "priority": data.total > 1000 ? "high" : "normal",
  "data": $
}

In the given example:

  • If the event type is order.created, the system sets the new type to new.order. Otherwise, the system sets it to updated.order.
  • If the total field in the payload is greater than 1000, the system adds a priority attribute with the value high. Otherwise, the system sets the value to normal.
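
The same two rules can be written as a short Python sketch, which might help when you test expected results. The shape_order function and the sample events are hypothetical; a missing total is treated as 0 here, which yields the same normal priority as the JSONata expression:

```python
def shape_order(event):
    """Emulate the conditional type and priority rules in plain Python."""
    total = event.get("data", {}).get("total", 0)
    return {
        "specversion": "1.0",
        "id": event.get("id"),
        "type": "new.order" if event.get("type") == "order.created" else "updated.order",
        "source": event.get("source"),
        "time": event.get("time"),
        "priority": "high" if total > 1000 else "normal",
        "data": event,
    }


created = shape_order({"id": "1", "type": "order.created", "data": {"total": 1500}})
changed = shape_order({"id": "2", "type": "order.changed", "data": {"total": 200}})

print(created["type"], created["priority"])  # → new.order high
print(changed["type"], changed["priority"])  # → updated.order normal
```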

14.6. Advanced JSONata features

You can use JSONata to transform events more effectively by applying advanced features such as array processing and built-in functions.

14.6.1. Array processing

JSONata makes it easy to work with arrays in event payloads. You can count items, filter them based on conditions, and calculate total values.

Example of processing items in an order and computing totals

{
  "specversion": "1.0",
  "id": id,
  "type": "order.processed",
  "source": source,
  "time": $now(),
  "itemcount": $count(order.items),
  "multiorder": $count(order.items) > 1,
  "data": {
    "order": order.id,
    "items": order.items[quantity > 1].{
      "product": name,
      "quantity": quantity,
      "lineTotal": price * quantity
    },
    "totalvalue": $sum(order.items.(price * quantity))
  }
}

Given the following input:

{
  "id": "12345",
  "source": "https://example.com/orders",
  "order": {
    "id": "order-67890",
    "items": [
      { "name": "Laptop", "price": 1000, "quantity": 1 },
      { "name": "Mouse", "price": 50, "quantity": 2 },
      { "name": "Keyboard", "price": 80, "quantity": 3 }
    ]
  }
}

The transformation produces the following output:

{
  "specversion": "1.0",
  "id": "12345",
  "type": "order.processed",
  "source": "https://example.com/orders",
  "time": "2025-03-03T09:13:23.753Z",
  "itemcount": 3,
  "multiorder": true,
  "data": {
    "order": "order-67890",
    "items": [
      { "product": "Mouse", "quantity": 2, "lineTotal": 100 },
      { "product": "Keyboard", "quantity": 3, "lineTotal": 240 }
    ],
    "totalvalue": 1340
  }
}

In this example:

  • $count(order.items) counts how many items are in the array.
  • The filter [quantity > 1] selects only items with quantity greater than one.
  • $sum(order.items.(price * quantity)) calculates the total value.
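
To verify the numbers in the example output, the three operations can be mirrored in Python. This sketch uses the example input from this section; the process_order function is hypothetical, and the now parameter is a fixed stand-in for $now():

```python
def process_order(event, now="2025-03-03T09:13:23.753Z"):
    """Emulate the array-processing expression in plain Python."""
    items = event["order"]["items"]
    return {
        "specversion": "1.0",
        "id": event["id"],
        "type": "order.processed",
        "source": event["source"],
        "time": now,  # stands in for $now()
        "itemcount": len(items),       # $count(order.items)
        "multiorder": len(items) > 1,
        "data": {
            "order": event["order"]["id"],
            # order.items[quantity > 1].{...}
            "items": [
                {
                    "product": item["name"],
                    "quantity": item["quantity"],
                    "lineTotal": item["price"] * item["quantity"],
                }
                for item in items
                if item["quantity"] > 1
            ],
            # $sum(order.items.(price * quantity))
            "totalvalue": sum(item["price"] * item["quantity"] for item in items),
        },
    }


event = {
    "id": "12345",
    "source": "https://example.com/orders",
    "order": {
        "id": "order-67890",
        "items": [
            {"name": "Laptop", "price": 1000, "quantity": 1},
            {"name": "Mouse", "price": 50, "quantity": 2},
            {"name": "Keyboard", "price": 80, "quantity": 3},
        ],
    },
}
result = process_order(event)
print(result["itemcount"], result["data"]["totalvalue"])  # → 3 1340
```

Note that the filter affects only the items list. The total is still computed over all three items, which is why it includes the laptop.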

14.6.2. Using built-in functions

JSONata includes a wide range of built-in functions that can manipulate strings, numbers, dates, and arrays. These functions can enrich events with new fields derived from existing data.

Example of adding metadata by using built-in functions

{
  "specversion": "1.0",
  "id": id,
  "type": "user.event",
  "source": source,
  "time": time,
  "timestamp": $now(),
  "username": $lowercase(data.user.name),
  "initials": $join($map($split(data.user.name, " "), function($v) { $substring($v, 0, 1) }), ""),
  "data": $
}

In this example:

  • $now() adds the current timestamp.
  • $lowercase() converts the user name to lowercase.
  • $split() breaks the name into parts, $map() extracts the first letter of each, and $join() combines them into initials.
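
For comparison, the derived fields can be computed with Python string methods. This is a rough equivalent for illustration only; the derive_fields function and the sample name are hypothetical, and the timestamp format is not guaranteed to match the output of $now() exactly:

```python
from datetime import datetime, timezone


def derive_fields(name):
    """Compute the username, initials, and timestamp fields from a user name."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # like $now()
        "username": name.lower(),                              # like $lowercase()
        # like $split(), $map() with $substring(..., 0, 1), and $join()
        "initials": "".join(part[:1] for part in name.split(" ")),
    }


fields = derive_fields("Jane Doe")
print(fields["username"])  # → jane doe
print(fields["initials"])  # → JD
```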

14.7. Transforming replies

You can configure an EventTransform resource to change both incoming requests and the responses returned from a sink in request–reply scenarios.

Note

You must use the same type of transformation for both the request and the reply. For example, if the request transformation uses JSONata, the reply transformation must also use JSONata.

The following example shows how to transform both request and reply events:

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: request-reply-transform
spec:
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: processor-service
  jsonata:
    expression: |
      /* Request transformation */
      {
        "specversion": "1.0",
        "id": id,
        "type": "request.transformed",
        "source": source,
        "time": time,
        "data": data
      }
  reply:
    jsonata:
      expression: |
        /* Reply transformation */
        {
          "specversion": "1.0",
          "id": id,
          "type": "reply.transformed",
          "source": "transform.reply-processor",
          "time": time,
          "data": data
        }

14.8. Event transformation examples

You can use these examples to apply JSONata in EventTransform resources to reshape and enrich events for common business scenarios.

14.8.1. User registration event transformer

You can use the following example to process user registration events. The example normalizes email addresses, derives a display name if it is missing, sets a registration timestamp, and applies default values for region and subscription tier.

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: user-registration-transformer
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "user.registered.processed",
        "source": "transform.user-processor",
        "time": time,
        "userid": data.user.id,
        "region": data.region ? data.region : "unknown",
        "tier": data.subscription.tier ? data.subscription.tier : "free",
        "data": {
          "userId": data.user.id,
          "email": $lowercase(data.user.email),
          "displayName": data.user.name ? data.user.name : $substringBefore(data.user.email, "@"),
          "registrationDate": $now(),
          "subscription": data.subscription ? data.subscription : { "tier": "free" }
        }
      }
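
To see which defaults apply for a sparse registration event, the rules can be sketched in Python. The process_registration function and the sample event are hypothetical, and the registrationDate field is omitted because it depends on the current time:

```python
def process_registration(event):
    """Emulate the defaulting rules of the registration transformer."""
    data = event.get("data", {})
    user = data.get("user", {})
    email = user.get("email", "")
    subscription = data.get("subscription") or {"tier": "free"}
    return {
        "specversion": "1.0",
        "id": event.get("id"),
        "type": "user.registered.processed",
        "source": "transform.user-processor",
        "time": event.get("time"),
        "userid": user.get("id"),
        "region": data.get("region") or "unknown",
        "tier": subscription.get("tier") or "free",
        "data": {
            "userId": user.get("id"),
            "email": email.lower(),
            # Fall back to the part of the original email before "@".
            "displayName": user.get("name") or email.split("@", 1)[0],
            "subscription": subscription,
        },
    }


sparse = {"id": "r-1", "data": {"user": {"id": "u-7", "email": "Jane.Doe@Example.com"}}}
result = process_registration(sparse)
print(result["region"], result["tier"])  # → unknown free
print(result["data"]["email"])           # → jane.doe@example.com
print(result["data"]["displayName"])     # → Jane.Doe
```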

14.8.2. Order processing event transformer

The following example processes order events. It calculates totals, tax, and grand total, determines the order priority, and enriches the event with processing timestamps:

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: order-processor
spec:
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "order.processed",
        "source": "transform.order-processor",
        "time": time,
        "orderid": data.id,
        "customerid": data.customer.id,
        "region": data.region,
        "priority": $sum(data.items.(price * quantity)) > 1000 ? "high" : "standard",
        "data": {
          "orderId": data.id,
          "customer": data.customer,
          "items": data.items.{
            "productId": productId,
            "name": name,
            "quantity": quantity,
            "unitPrice": price,
            "totalPrice": price * quantity
          },
          "total": $sum(data.items.(price * quantity)),
          "tax": $sum(data.items.(price * quantity)) * 0.1,
          "grandTotal": $sum(data.items.(price * quantity)) * 1.1,
          "created": data.created,
          "processed": $now()
        }
      }
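
The totals in this transformer follow simple arithmetic that you can check with a short Python sketch. The item values below are invented for illustration, and Decimal is used only to keep the sums exact; the 10% tax rate comes from the 0.1 and 1.1 factors in the expression:

```python
from decimal import Decimal

# Hypothetical order items; only price and quantity matter for the totals.
items = [
    {"productId": "p-1", "name": "Monitor", "price": Decimal("499.50"), "quantity": 2},
    {"productId": "p-2", "name": "Cable", "price": Decimal("25.00"), "quantity": 1},
]

total = sum(item["price"] * item["quantity"] for item in items)
tax = total * Decimal("0.1")          # "tax": total * 0.1
grand_total = total * Decimal("1.1")  # "grandTotal": total * 1.1
priority = "high" if total > 1000 else "standard"

print(priority)  # → high
```

In this sketch the grand total always equals the total plus the tax, which is a quick consistency check for any real order you test with.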

You can use a container source with Service Mesh.

This procedure describes how to configure a container source with Service Mesh.

Prerequisites

  • You have set up integration of Service Mesh and Serverless.

Procedure

  1. Create a Service in a namespace that is a member of the ServiceMeshMemberRoll:

    Example event-display-service.yaml configuration file

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> # <1>
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" # <2>
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest

    <1> A namespace that is a member of the ServiceMeshMemberRoll.
    <2> This annotation injects Service Mesh sidecars into the Knative service pods.
  2. Apply the Service resource:

    $ oc apply -f event-display-service.yaml
  3. Create a ContainerSource object in a namespace that is a member of the ServiceMeshMemberRoll, with the sink set to the event-display service:

    Example test-heartbeats-containersource.yaml configuration file

    apiVersion: sources.knative.dev/v1
    kind: ContainerSource
    metadata:
      name: test-heartbeats
      namespace: <namespace> # <1>
    spec:
      template:
        metadata: # <2>
          annotations:
            sidecar.istio.io/inject: "true"
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
            # This corresponds to a heartbeats image URI that you have built and published
            - image: quay.io/openshift-knative/heartbeats
              name: heartbeats
              args:
                - --period=1s
              env:
                - name: POD_NAME
                  value: "example-pod"
                - name: POD_NAMESPACE
                  value: "event-test"
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

    <1> A namespace that is part of the ServiceMeshMemberRoll.
    <2> These annotations enable Service Mesh integration with the ContainerSource object.
  4. Apply the ContainerSource resource:

    $ oc apply -f test-heartbeats-containersource.yaml
  5. Optional: Verify that events reach the Knative event sink by checking the message dumper function logs:

    Example command

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

You can use a sink binding with Service Mesh.

16.1. Configuring a sink binding with Service Mesh

This procedure describes how to configure a sink binding with Service Mesh.

Prerequisites

  • You have set up integration of Service Mesh and Serverless.

Procedure

  1. Create a Service object in a namespace that is a member of the ServiceMeshMemberRoll:

    Example event-display-service.yaml configuration file

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: <namespace> # <1>
    spec:
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "true" # <2>
            sidecar.istio.io/rewriteAppHTTPProbers: "true"
        spec:
          containers:
          - image: quay.io/openshift-knative/knative-eventing-sources-event-display:latest

    <1> A namespace that is a member of the ServiceMeshMemberRoll.
    <2> This annotation injects Service Mesh sidecars into the Knative service pods.
  2. Apply the Service object:

    $ oc apply -f event-display-service.yaml
  3. Create a SinkBinding object:

    Example heartbeat-sinkbinding.yaml configuration file

    apiVersion: sources.knative.dev/v1
    kind: SinkBinding
    metadata:
      name: bind-heartbeat
      namespace: <namespace> # <1>
    spec:
      subject:
        apiVersion: batch/v1
        kind: Job # <2>
        selector:
          matchLabels:
            app: heartbeat-cron
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

    <1> A namespace that is part of the ServiceMeshMemberRoll.
    <2> Bind any Job with the label app: heartbeat-cron to the event sink.
  4. Apply the SinkBinding object:

    $ oc apply -f heartbeat-sinkbinding.yaml
  5. Create a CronJob object:

    Example heartbeat-cronjob.yaml configuration file

    apiVersion: batch/v1
    kind: CronJob
    metadata:
      name: heartbeat-cron
      namespace: <namespace> # <1>
    spec:
      # Run every minute
      schedule: "* * * * *"
      jobTemplate:
        metadata:
          labels:
            app: heartbeat-cron
            bindings.knative.dev/include: "true"
        spec:
          template:
            metadata:
              annotations:
                sidecar.istio.io/inject: "true" # <2>
                sidecar.istio.io/rewriteAppHTTPProbers: "true"
            spec:
              restartPolicy: Never
              containers:
                - name: single-heartbeat
                  image: quay.io/openshift-knative/heartbeats:latest
                  args:
                    - --period=1
                  env:
                    - name: ONE_SHOT
                      value: "true"
                    - name: POD_NAME
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.name
                    - name: POD_NAMESPACE
                      valueFrom:
                        fieldRef:
                          fieldPath: metadata.namespace

    <1> A namespace that is part of the ServiceMeshMemberRoll.
    <2> Inject Service Mesh sidecars into the CronJob pods.
  6. Apply the CronJob object:

    $ oc apply -f heartbeat-cronjob.yaml
  7. Optional: Verify that the events reach the Knative event sink by checking the message dumper function logs:

    Example command

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    Example output

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.eventing.samples.heartbeat
      source: https://knative.dev/eventing-contrib/cmd/heartbeats/#event-test/mypod
      id: 2b72d7bf-c38f-4a98-a433-608fbcdd2596
      time: 2019-10-18T15:23:20.809775386Z
      contenttype: application/json
    Extensions,
      beats: true
      heart: yes
      the: 42
    Data,
      {
        "id": 1,
        "label": ""
      }

Legal Notice

Copyright © Red Hat.
Except as otherwise noted below, the text of and illustrations in this documentation are licensed by Red Hat under the Creative Commons Attribution–Share Alike 3.0 Unported license. If you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, the Red Hat logo, JBoss, Hibernate, and RHCE are trademarks or registered trademarks of Red Hat, LLC. or its subsidiaries in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
XFS is a trademark or registered trademark of Hewlett Packard Enterprise Development LP or its subsidiaries in the United States and other countries.
The OpenStack® Word Mark and OpenStack logo are trademarks or registered trademarks of the Linux Foundation, used under license.
All other trademarks are the property of their respective owners.