Chapter 3. Event sinks


3.1. Event sinks

When you create an event source, you can specify an event sink where events are sent to from the source. An event sink is an addressable or a callable resource that can receive incoming events from other resources. Knative services, channels, and brokers are all examples of event sinks. There is also a specific Apache Kafka sink type available.

Addressable objects receive and acknowledge an event delivered over HTTP to an address defined in their status.address.url field. As a special case, the core Kubernetes Service object also fulfills the addressable interface.
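For example, an addressable resource such as a Knative service exposes its sink address in its status. The following excerpt is illustrative only; the event-display name and default namespace are assumptions:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: default
status:
  # The address that event producers use to deliver events to this sink.
  address:
    url: http://event-display.default.svc.cluster.local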

Callable objects are able to receive an event delivered over HTTP and transform the event, returning 0 or 1 new events in the HTTP response. These returned events may be further processed in the same way that events from an external event source are processed.

3.1.1. Knative CLI sink flag

When you create an event source by using the Knative (kn) CLI, you can specify a sink where events are sent to from that resource by using the --sink flag. The sink can be any addressable or callable resource that can receive incoming events from other resources.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

Example command using the sink flag

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1 The svc prefix in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.
Tip

You can configure which CRs can be used with the --sink flag for Knative (kn) CLI commands by Customizing kn.
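For example, a sink mapping entry in the kn configuration file (typically ~/.config/kn/config.yaml) might look like the following sketch. The account prefix and the accounts custom resource are illustrative assumptions, and the exact configuration keys can vary with your kn version:

# Maps the "account" prefix so that --sink account:<name> resolves to an
# Account custom resource in the example.com/v1 API group.
eventing:
  sink-mappings:
    - prefix: account
      group: example.com
      version: v1
      resource: accounts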

3.2. Creating event sinks

When you create an event source, you can specify an event sink where events are sent to from the source. An event sink is an addressable or a callable resource that can receive incoming events from other resources. Knative services, channels, and brokers are all examples of event sinks. There is also a specific Apache Kafka sink type available.

For information about creating resources that can be used as event sinks, such as Knative services, channels, and brokers, see the documentation for each resource type.
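For example, a minimal Knative service that can act as an event sink might look like the following sketch. It uses the event_display image from the upstream Knative examples; the name and namespace are illustrative:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: default
spec:
  template:
    spec:
      containers:
        # Logs every CloudEvent it receives, which is useful for verifying sinks.
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display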

3.3. Sink for Apache Kafka

Apache Kafka sinks are a type of event sink that are available if a cluster administrator has enabled Apache Kafka on your cluster. You can send events directly from an event source to a Kafka topic by using a Kafka sink.

3.3.1. Creating an Apache Kafka sink by using YAML

You can create a Kafka sink that sends events to a Kafka topic. By default, a Kafka sink uses the binary content mode, which is more efficient than the structured mode. To create a Kafka sink by using YAML, you must create a YAML file that defines a KafkaSink object, then apply it by using the oc apply command.
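If you need the structured content mode instead, you can set it explicitly in the KafkaSink spec. The following snippet is a sketch; it assumes the optional contentMode field is available in the KafkaSink API version installed on your cluster:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: <sink-name>
  namespace: <namespace>
spec:
  topic: <topic-name>
  bootstrapServers:
    - <bootstrap-server>
  # Optional: "binary" (default) or "structured" CloudEvents content mode.
  contentMode: structured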

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource (CR) are installed on your cluster.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have access to a Red Hat AMQ Streams (Kafka) cluster that contains the topic you want to send events to.
  • You have installed the OpenShift CLI (oc).

Procedure

  1. Create a KafkaSink object definition as a YAML file:

    Kafka sink YAML

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
        - <bootstrap-server>

  2. To create the Kafka sink, apply the KafkaSink YAML file:

    $ oc apply -f <filename>
  3. Configure an event source so that the sink is specified in its spec:

    Example of a Kafka sink connected to an API server source

    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> 1
      namespace: <namespace> 2
    spec:
      serviceAccountName: <service-account-name> 3
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> 4

    1 The name of the event source.
    2 The namespace of the event source.
    3 The service account for the event source.
    4 The Kafka sink name.

3.3.2. Creating an Apache Kafka sink by using the web console

You can create a Kafka sink that sends events to a Kafka topic in the OpenShift Container Platform web console. By default, a Kafka sink uses the binary content mode, which is more efficient than the structured mode.

As a developer, you can create an event sink to receive events from a particular source and send them to a Kafka topic.

Prerequisites

  • You have installed the OpenShift Serverless Operator, with Knative Serving, Knative Eventing, and Knative broker for Apache Kafka APIs, from the OperatorHub.
  • You have created a Kafka topic in your Kafka environment.

Procedure

  1. Navigate to the +Add view.
  2. Click Event Sink in the Eventing catalog.
  3. Search for KafkaSink in the catalog items and click it.
  4. Click Create Event Sink.
  5. In the form view, type the URL of the bootstrap server, which is a combination of host name and port.
  6. Type the name of the topic used to send event data.
  7. Type the name of the event sink.
  8. Click Create.

Verification

  1. Navigate to the Topology view.
  2. Click the created event sink to view its details in the right panel.

3.3.3. Configuring security for Apache Kafka sinks

Transport Layer Security (TLS) is used by Apache Kafka clients and servers to encrypt traffic between Knative and Kafka, as well as for authentication. TLS is the only supported method of traffic encryption for the Knative broker implementation for Apache Kafka.

Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, users must provide credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.

Prerequisites

  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resources (CRs) are installed on your OpenShift Container Platform cluster.
  • Kafka sink is enabled in the KnativeKafka CR.
  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
  • You have a Kafka cluster CA certificate stored as a .pem file.
  • You have a Kafka cluster client certificate and a key stored as .pem files.
  • You have installed the OpenShift (oc) CLI.
  • You have chosen the SASL mechanism to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.

Procedure

  1. Create the certificate files as a secret in the same namespace as your KafkaSink object:

    Important

    Certificates and keys must be in PEM format.

    • For authentication using SASL without encryption:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<username> \
        --from-literal=password=<password>
    • For authentication using SASL and encryption using TLS:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-literal=user=<username> \
        --from-literal=password=<password>

      1 The ca.crt can be omitted to use the system’s root CA set if you are using a public cloud managed Kafka service.
    • For authentication and encryption using TLS:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>

      1 The ca.crt can be omitted to use the system’s root CA set if you are using a public cloud managed Kafka service.
  2. Create or modify a KafkaSink object and add a reference to your secret in the auth spec:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink_name>
      namespace: <namespace>
    spec:
      ...
      auth:
        secret:
          ref:
            name: <secret_name>
  3. Apply the KafkaSink object:

    $ oc apply -f <filename>

3.4. JobSink

Event processing is usually expected to complete within a short time frame, such as a few minutes, so that the HTTP connection can stay open and the service is not scaled down prematurely.

Maintaining long-running connections increases the risk of failure, potentially leading to processing restarts and repeated request retries.

You can use JobSink to support long-running asynchronous jobs and tasks using the full Kubernetes batch/v1 Job resource and features and Kubernetes job queuing systems such as Kueue.
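Because a JobSink embeds a full batch/v1 Job template, you can attach queuing metadata to the Jobs it creates. The following sketch is assumption-based: it presumes Kueue is installed and that a LocalQueue named user-queue exists in the namespace.

apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-queued
spec:
  job:
    metadata:
      labels:
        # Standard Kueue label that selects the LocalQueue admitting this Job.
        kueue.x-k8s.io/queue-name: user-queue
    spec:
      # Create the Job suspended so that Kueue decides when it may start.
      suspend: true
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "cat" ]
              args:
                - "/etc/jobsink-event/event"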

3.4.1. Using JobSink

When an event is sent to a JobSink, Eventing creates a Job and mounts the received event as a JSON file at /etc/jobsink-event/event.

Procedure

  1. Create a JobSink object definition as a YAML file:

    JobSink YAML

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-logger
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "cat" ]
                  args:
                    - "/etc/jobsink-event/event"

  2. Apply the JobSink YAML file:

    $ oc apply -f <job-sink-file.yaml>
  3. Verify that the JobSink is ready:

    $ oc get jobsinks.sinks.knative.dev

    Example output:

    NAME              URL                                                                          AGE   READY   REASON
    job-sink-logger   http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger   5s    True
  4. Trigger the JobSink. A JobSink can be triggered by any event source or trigger. The following example sends a CloudEvent directly to the JobSink address by using a temporary curl pod:

    $ oc run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
       -H "content-type: application/json"  \
       -H "ce-specversion: 1.0" \
       -H "ce-source: my/curl/command" \
       -H "ce-type: my.demo.event" \
       -H "ce-id: 123" \
       -d '{"details":"JobSinkDemo"}' \
       http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
  5. Verify that a Job was created by viewing the logs of its pod. The pod name includes generated suffixes:

    $ oc logs job-sink-loggerszoi6-dqbtq

    Example output:

    {"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
Note

JobSink creates a Job for each unique event it receives.

An event is uniquely identified by the combination of its source and id attributes.

If an event with the same attributes is received while a Job for that event already exists, another Job will not be created.
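Instead of sending events manually with curl, you can also have broker events create Jobs by referencing the JobSink from a Trigger. The following is a minimal sketch, assuming a broker named default exists in the same namespace as the JobSink:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: job-sink-trigger
  namespace: default
spec:
  broker: default
  subscriber:
    ref:
      # Deliver matching events from the broker to the JobSink created above.
      apiVersion: sinks.knative.dev/v1alpha1
      kind: JobSink
      name: job-sink-logger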

3.4.2. Reading the Job event file

Procedure

  • Read the event file and deserialize it by using any CloudEvents JSON deserializer. The following example demonstrates how to read and process an event by using the CloudEvents Go SDK:

    package mytask

    import (
        "encoding/json"
        "fmt"
        "os"

        cloudevents "github.com/cloudevents/sdk-go/v2"
    )

    func handleEvent() error {
        // Read the JSON-serialized CloudEvent that JobSink mounted into the container.
        eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
        if err != nil {
            return err
        }

        // Deserialize the file contents into a CloudEvents SDK event object.
        event := &cloudevents.Event{}
        if err := json.Unmarshal(eventBytes, event); err != nil {
            return err
        }

        // Process the event; here it is printed for demonstration purposes.
        fmt.Println(event)

        return nil
    }

3.4.3. Setting custom event file mount path

You can set a custom event file mount path in your JobSink definition.

Procedure

  • Inside your container definition, include a volumeMounts entry for the jobsink-event volume and set the mount path as required.

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-custom-mount-path
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5
    
                  # The event will be available in a file at `/etc/custom-path/event`
                  volumeMounts:
                    - name: "jobsink-event"
                      mountPath: "/etc/custom-path"
                      readOnly: true

3.4.4. Cleaning up finished jobs

You can clean up finished jobs by setting a ttlSecondsAfterFinished value in your JobSink definition. For example, setting the value to 600 removes completed jobs 600 seconds (10 minutes) after they finish.

Procedure

  • In your definition, set the value of ttlSecondsAfterFinished to the required amount.

    Example of ttlSecondsAfterFinished set to 600

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-example
    spec:
      job:
        spec:
          ttlSecondsAfterFinished: 600

3.4.5. Simulating FailJob action

Procedure

  • Trigger a FailJob action by including a command that simulates a bug in your JobSink definition.

    Example of JobSink failure

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-failure
    spec:
      job:
        metadata:
          labels:
            my-label: my-value
        spec:
          completions: 12
          parallelism: 3
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]        # example command simulating a bug which triggers the FailJob action
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5 && exit 42
          backoffLimit: 6
          podFailurePolicy:
            rules:
              - action: FailJob
                onExitCodes:
                  containerName: main      # optional
                  operator: In             # one of: In, NotIn
                  values: [ 42 ]
              - action: Ignore             # one of: Ignore, FailJob, Count
                onPodConditions:
                  - type: DisruptionTarget   # indicates Pod disruption

3.5. Getting started with IntegrationSink

Important

OpenShift Serverless IntegrationSink feature is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.

The IntegrationSink API is a Knative Eventing custom resource (CR) that enables you to send events from Knative into external systems. It leverages selected Kamelets from the Apache Camel project.

Kamelets act as reusable connectors that can function either as sources or sinks. By using an IntegrationSink API, you can reliably deliver CloudEvents produced within Knative Eventing to third-party services and external systems.

OpenShift Serverless supports the following Kamelet sinks:

  • AWS Simple Storage Service (S3)
  • AWS Simple Notification Service (SNS)
  • AWS Simple Queue Service (SQS)
  • Generic logger sink

3.5.1. Creating AWS credentials

Several IntegrationSink API resources require access to Amazon Web Services (AWS) services such as S3, SNS, and SQS. To connect securely, you must create a Kubernetes Secret containing valid AWS credentials in the namespace where the IntegrationSink API resource will be created.

The Secret must include an AWS access key ID and secret access key with sufficient permissions to access the target AWS service. This Secret will then be referenced in each IntegrationSink configuration.

Prerequisites

  • You have an AWS account with an access key ID and secret access key that provide access to the relevant service.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You have identified the namespace in which the IntegrationSink resource will be created.

Procedure

  • Create the secret by running the following command:

    $ oc -n <namespace> create secret generic my-secret \
      --from-literal=aws.accessKey=<accessKey> \
      --from-literal=aws.secretKey=<secretKey>

    Replace <namespace> with the namespace where the IntegrationSink resource will be created, and substitute <accessKey> and <secretKey> with the appropriate AWS credentials.

3.5.2. AWS S3 with IntegrationSink

The Amazon Web Services (AWS) Simple Storage Service (S3) Kamelet allows you to deliver CloudEvents from Knative Eventing into an Amazon S3 bucket. This integration makes it possible to store events as objects for long-term storage, analytics, or downstream processing.

To configure an IntegrationSink API for AWS S3, you must reference a Kubernetes Secret with valid AWS credentials, specify the Amazon S3 bucket Amazon Resource Name (ARN) and its region, and configure the source that produces the CloudEvents.

3.5.2.1. Creating an IntegrationSink for AWS S3

You can create an IntegrationSink API resource to deliver CloudEvents from Knative Eventing to an Amazon Simple Storage Service (S3) bucket by applying a YAML manifest. This enables you to persist event data as objects in S3, where it can be stored for long-term use, processed by analytics tools, or consumed by downstream applications.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You know the namespace where the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents to be delivered to this sink.

Procedure

  1. Save the following YAML manifest as integration-sink-aws-s3.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-sink-aws-s3
      namespace: knative-samples
    spec:
      aws:
        s3:
          arn: "arn:aws:s3:::my-bucket"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-sink-aws-s3.yaml

    This manifest configures the IntegrationSink API to deliver CloudEvents to the Amazon S3 bucket identified by its ARN (arn:aws:s3:::my-bucket) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.
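For example, an event source such as a PingSource can deliver events to this sink by referencing the IntegrationSink object in its sink field. The following sketch uses illustrative names and a five-minute schedule:

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-to-s3
  namespace: knative-samples
spec:
  schedule: "*/5 * * * *"
  contentType: application/json
  data: '{"message": "Hello from PingSource"}'
  sink:
    ref:
      # Deliver the generated events to the S3 IntegrationSink created above.
      apiVersion: sinks.knative.dev/v1alpha1
      kind: IntegrationSink
      name: integration-sink-aws-s3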

3.5.3. AWS SNS with IntegrationSink

The Amazon Web Services (AWS) Simple Notification Service (SNS) Kamelet enables you to deliver CloudEvents from Knative Eventing to an SNS topic. This integration is useful when you need broad fan-out to subscribers such as email, SMS, HTTP endpoints, SQS queues, or Lambda functions through SNS subscriptions.

To configure an IntegrationSink API resource for AWS SNS, reference a Kubernetes Secret with valid AWS credentials, specify the SNS topic Amazon Resource Name (ARN) and region, and configure the event source that will produce the CloudEvents.

3.5.3.1. Creating an IntegrationSink for AWS SNS

You can create an IntegrationSink API resource to publish CloudEvents from Knative Eventing to an Amazon Simple Notification Service (SNS) topic by applying a YAML manifest.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You know the namespace where the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents to be delivered to this sink.

Procedure

  1. Save the following YAML manifest as integration-sink-aws-sns.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-sink-aws-sns
      namespace: knative-samples
    spec:
      aws:
        sns:
          arn: "arn:aws:sns:<region>:<account>:my-topic"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-sink-aws-sns.yaml

    This manifest configures the IntegrationSink API to publish CloudEvents to the SNS topic identified by its ARN in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.

3.5.4. AWS SQS with IntegrationSink

The Amazon Web Services (AWS) Simple Queue Service (SQS) Kamelet allows you to send CloudEvents from Knative Eventing into an SQS queue. This integration is useful when you need reliable, decoupled message delivery between event producers and consumers, or when downstream systems process events asynchronously from a queue.

To configure an IntegrationSink API resource for AWS SQS, you must reference a Kubernetes Secret with valid AWS credentials, specify the queue Amazon Resource Name (ARN) and region, and configure the event source that produces the CloudEvents.

3.5.4.1. Creating an IntegrationSink for AWS SQS

You can create an IntegrationSink API resource to send CloudEvents to an Amazon SQS queue by applying a YAML manifest.

Prerequisites

  • You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You know the namespace where the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents that will be delivered to this sink.

Procedure

  1. Save the following YAML manifest as integration-sink-aws-sqs.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-sink-aws-sqs
      namespace: knative-samples
    spec:
      aws:
        sqs:
          arn: "arn:aws:sqs:<region>:<account>:my-queue"
          region: "eu-north-1"
        auth:
          secret:
            ref:
              name: "my-secret"
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-sink-aws-sqs.yaml

    This manifest configures the IntegrationSink API to send CloudEvents to the SQS queue identified by its ARN in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.

3.5.5. Generic logger sink with IntegrationSink

The Log Sink Kamelet allows you to output CloudEvents from Knative Eventing directly to the application log. This sink is primarily used for debugging or testing event flows, as it provides visibility into the event payloads and metadata without requiring an external system.

To configure an IntegrationSink API resource for the logger, you can set the logging level and optionally display event headers.

You can create an IntegrationSink API resource to log CloudEvents by applying a YAML manifest.

Prerequisites

  • You have the OpenShift CLI (oc) installed and are logged in to the cluster.
  • You know the namespace where the IntegrationSink resource will be created.
  • A Knative broker or another event source exists to produce CloudEvents to be delivered to this sink.

Procedure

  1. Save the following YAML manifest as integration-log-sink.yaml:

    apiVersion: sinks.knative.dev/v1alpha1
    kind: IntegrationSink
    metadata:
      name: integration-log-sink
      namespace: knative-samples
    spec:
      log:
        showHeaders: true
        level: INFO
  2. Apply the manifest by running the following command:

    $ oc apply -f integration-log-sink.yaml

    This manifest configures the IntegrationSink API resource to log all CloudEvents it receives at the INFO level. The showHeaders option is set to true, which means the HTTP headers of the event will also be logged.
