Chapter 3. Event sinks
3.1. Event sinks
When you create an event source, you can specify an event sink to which events from the source are sent. An event sink is an addressable or a callable resource that can receive incoming events from other resources. Knative services, channels, and brokers are all examples of event sinks. There is also a specific Apache Kafka sink type available.
3.1.1. Understanding addressable and callable objects
Learn how addressable and callable objects handle events in Knative Eventing.
Addressable objects receive and acknowledge events delivered over HTTP to an address defined in the status.address.url field. The core Kubernetes Service object also satisfies the addressable interface.
Callable objects receive events delivered over HTTP, transform the events, and return 0 or 1 new events in the HTTP response. These returned events follow the same processing flow as events from an external event source.
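For example, you can read the address that an addressable resource exposes directly from its status. The following is a minimal sketch that assumes a broker named default exists in the current namespace:

$ oc get broker default -o jsonpath='{.status.address.url}'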
You can configure which CRs can be used with the --sink flag for Knative (kn) CLI commands by Customizing kn.
3.1.2. Knative CLI sink flag
When you create an event source by using the Knative (kn) CLI, you can use the --sink flag to specify a sink to which events from that resource are sent. The sink can be any addressable or callable resource that can receive incoming events from other resources.
The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:
$ kn source binding create bind-heartbeat \
--namespace sinkbinding-example \
--subject "Job:batch/v1:app=heartbeat-cron" \
--sink http://event-display.svc.cluster.local \
--ce-override "sink=bound"
The svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.
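For example, the following variant targets a broker instead of a service by using the broker prefix. This is a sketch that assumes a broker named default exists in the sinkbinding-example namespace:

$ kn source binding create bind-heartbeat-broker \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink broker:default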
3.2. Creating event sinks
When you create an event source, you can specify an event sink to which events from the source are sent. An event sink is an addressable or a callable resource that can receive incoming events from other resources. Knative services, channels, and brokers are all examples of event sinks. There is also a specific Apache Kafka sink type available.
3.3. Sink for Apache Kafka
Apache Kafka sinks are a type of event sink that are available if a cluster administrator has enabled Apache Kafka on your cluster. You can send events directly from an event source to a Kafka topic by using a Kafka sink.
3.3.1. Creating an Apache Kafka sink by using YAML
You can create a Kafka sink that sends events to a Kafka topic. By default, a Kafka sink uses the binary content mode, which is more efficient than the structured mode. To create a Kafka sink by using YAML, you must create a YAML file that defines a KafkaSink object, then apply it by using the oc apply command.
Prerequisites
- You have installed the OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource (CR) on your cluster.
- You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
- You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.
- You have installed the OpenShift CLI (oc).
Procedure
- Create a KafkaSink object definition as a YAML file:

  Kafka sink YAML

  apiVersion: eventing.knative.dev/v1alpha1
  kind: KafkaSink
  metadata:
    name: <sink_name>
    namespace: <namespace>
  spec:
    topic: <topic_name>
    bootstrapServers:
      - <bootstrap_server>

- To create the Kafka sink, apply the KafkaSink YAML file:

  $ oc apply -f <filename>

- Configure an event source so that the sink is specified in its spec:
  The following example displays a Kafka sink connected to an API server source:

  apiVersion: sources.knative.dev/v1alpha2
  kind: ApiServerSource
  metadata:
    name: <source_name>
    namespace: <namespace>
  spec:
    serviceAccountName: <service_account_name>
    mode: Resource
    resources:
      - apiVersion: v1
        kind: Event
    sink:
      ref:
        apiVersion: eventing.knative.dev/v1alpha1
        kind: KafkaSink
        name: <sink_name>

  - name: <source_name>: The name of the event source.
  - namespace: <namespace>: The namespace of the event source.
  - serviceAccountName: <service_account_name>: The service account for the event source.
  - name: <sink_name>: The Kafka sink name.
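Before pointing an event source at the sink, you can optionally verify that the KafkaSink is ready by listing it and checking the READY column:

$ oc get kafkasinks.eventing.knative.dev -n <namespace>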
3.3.2. Creating an event sink for Apache Kafka by using the OpenShift Container Platform web console
You can create a Kafka sink that sends events to a Kafka topic in the OpenShift Container Platform web console. By default, a Kafka sink uses the binary content mode, which is more efficient than the structured mode.
As a developer, you can create an event sink to receive events from a particular source and send them to a Kafka topic.
Prerequisites
- You have installed the OpenShift Serverless Operator with Knative Serving, Knative Eventing, and the Knative broker for Apache Kafka APIs from the OperatorHub.
- You have created a Kafka topic in your Kafka environment.
Procedure
- Navigate to the +Add view.
- Click Event Sink in the Eventing catalog.
- Search for KafkaSink in the catalog items and click it.
- Click Create Event Sink.
- In the form view, type the URL of the bootstrap server, which is a combination of hostname and port.
- Type the name of the topic to send event data.
- Type the name of the event sink.
- Click Create.
Verification
- Navigate to the Topology view.
- Click the created event sink to view its details in the right panel.
3.3.3. Configuring security for Apache Kafka sinks
Transport Layer Security (TLS) is used by Apache Kafka clients and servers to encrypt traffic between Knative and Kafka, and for authentication. TLS is the only supported method of traffic encryption for the Knative broker implementation for Apache Kafka.
Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, users must give credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.
Prerequisites
- You have installed the OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resources (CRs) on your OpenShift Container Platform cluster.
- You have enabled Kafka sink in the KnativeKafka CR.
- You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.
- You have a Kafka cluster CA certificate stored as a .pem file.
- You have a Kafka cluster client certificate and a key stored as .pem files.
- You have installed the OpenShift (oc) CLI.
- You have chosen the SASL mechanism to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.
Procedure
- Create the certificate files as a secret in the same namespace as your KafkaSink object:

  Important: Certificates and keys must use Privacy-Enhanced Mail (PEM) format.

  For authentication using SASL without encryption:

  $ oc create secret -n <namespace> generic <secret_name> \
    --from-literal=protocol=SASL_PLAINTEXT \
    --from-literal=sasl.mechanism=<sasl_mechanism> \
    --from-literal=user=<username> \
    --from-literal=password=<password>

  For authentication using SASL and encryption using TLS:

  $ oc create secret -n <namespace> generic <secret_name> \
    --from-literal=protocol=SASL_SSL \
    --from-literal=sasl.mechanism=<sasl_mechanism> \
    --from-file=ca.crt=<my_caroot.pem_file_path> \
    --from-literal=user=<username> \
    --from-literal=password=<password>

  The ca.crt file can be omitted to use the system's root CA set if you are using a public cloud managed Kafka service.

  For authentication and encryption using TLS:

  $ oc create secret -n <namespace> generic <secret_name> \
    --from-literal=protocol=SSL \
    --from-file=ca.crt=<my_caroot.pem_file_path> \
    --from-file=user.crt=<my_cert.pem_file_path> \
    --from-file=user.key=<my_key.pem_file_path>

  The ca.crt file can be omitted to use the system's root CA set if you are using a public cloud managed Kafka service.

- Create or modify a KafkaSink object and add a reference to your secret in the auth spec:

  apiVersion: eventing.knative.dev/v1alpha1
  kind: KafkaSink
  metadata:
    name: <sink_name>
    namespace: <namespace>
  spec:
    ...
    auth:
      secret:
        ref:
          name: <secret_name>
    ...

- Apply the KafkaSink object by running the following command:

  $ oc apply -f <filename>
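If your Kafka cluster is managed by Red Hat AMQ Streams, the cluster CA certificate is typically stored in a secret named <cluster_name>-cluster-ca-cert. The following sketch, which assumes that naming convention, extracts the certificate into a .pem file:

$ oc get secret <cluster_name>-cluster-ca-cert \
  -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.pem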
3.4. JobSink
Event processing usually must complete within a short time frame, such as a few minutes, because the HTTP connection to the service remains open while the event is processed and the service must not scale down prematurely.
Long-running connections increase the risk of failures, which can trigger restarts and repeated retries. You can use JobSink to run long-running asynchronous jobs by using Kubernetes batch/v1 Job resources and Job queuing systems such as Kueue.
3.4.1. Using JobSink
When an event is sent to a JobSink, Eventing creates a Job and mounts the received event as a JSON file at /etc/jobsink-event/event.
Procedure
- Create a JobSink object definition as a YAML file. The following example displays JobSink YAML:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: JobSink
  metadata:
    name: job-sink-logger
  spec:
    job:
      spec:
        completions: 1
        parallelism: 1
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: main
                image: docker.io/library/bash:5
                command: [ "cat" ]
                args:
                  - "/etc/jobsink-event/event"

- Apply the JobSink YAML file by running the following command:

  $ oc apply -f <job-sink-file.yaml>

- Verify JobSink is ready by running the following command:

  $ oc get jobsinks.sinks.knative.dev

  You get an output similar to the following example:

  NAME              URL                                                                           AGE   READY   REASON
  job-sink-logger   http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger   5s    True

- Trigger a JobSink. A JobSink can be triggered by any event source or trigger.

  $ oc run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
    -H "content-type: application/json" \
    -H "ce-specversion: 1.0" \
    -H "ce-source: my/curl/command" \
    -H "ce-type: my.demo.event" \
    -H "ce-id: 123" \
    -d '{"details":"JobSinkDemo"}' \
    http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger

- Verify a Job is created by running the following command:

  $ oc logs job-sink-loggerszoi6-dqbtq

  You get an output similar to the following example:

  {"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
JobSink creates a Job for each unique event it receives.
An event is uniquely identified by the combination of its source and id attributes.
If an event with the same attributes is received while a Job for that event already exists, another Job will not be created.
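For example, resending the earlier curl request with the same ce-source and ce-id headers does not create a second Job, while changing ce-id to a new value does. You can confirm this by listing the Jobs in the namespace:

$ oc get jobs -n default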
3.4.2. Reading the Job event file
You can read the Job event file to inspect event data generated during Job execution.
Procedure
- Read the event file and deserialize it by using any CloudEvents JSON deserializer. The following example demonstrates how to read and process an event by using the CloudEvents Go SDK:

  package mytask

  import (
      "encoding/json"
      "fmt"
      "os"

      cloudevents "github.com/cloudevents/sdk-go/v2"
  )

  func handleEvent() error {
      // Read the serialized event that JobSink mounted into the container.
      eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
      if err != nil {
          return err
      }

      // Deserialize the JSON payload into a CloudEvents Event object.
      event := &cloudevents.Event{}
      if err := json.Unmarshal(eventBytes, event); err != nil {
          return err
      }

      fmt.Println(event)

      return nil
  }
3.4.3. Setting custom event file mount path
You can set a custom event file mount path in your JobSink definition.
Procedure
- Inside your container definition, include the volumeMounts configuration and set it as required:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: JobSink
  metadata:
    name: job-sink-custom-mount-path
  spec:
    job:
      spec:
        completions: 1
        parallelism: 1
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: main
                image: docker.io/library/bash:5
                command: [ "bash" ]
                args:
                  - -c
                  - echo "Hello world!" && sleep 5
                # The event will be available in a file at `/etc/custom-path/event`
                volumeMounts:
                  - name: "jobsink-event"
                    mountPath: "/etc/custom-path"
                    readOnly: true
3.4.4. Cleaning up finished jobs
You can clean up finished jobs by setting a ttlSecondsAfterFinished value in your JobSink definition. For example, setting the value to 600 removes completed jobs 600 seconds (10 minutes) after they finish.
Procedure
- In your definition, set the value of ttlSecondsAfterFinished to the required amount. The following example displays ttlSecondsAfterFinished set to 600:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: JobSink
  metadata:
    name: job-sink-example
  spec:
    job:
      spec:
        ttlSecondsAfterFinished: 600
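After the TTL expires, the Kubernetes TTL-after-finished controller deletes the Job automatically. You can observe the cleanup by watching the Jobs in the namespace:

$ oc get jobs -w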
3.4.5. Simulating FailJob action
Simulate a FailJob action to test failure handling and retry behavior in Job execution.
Procedure
- Trigger a FailJob action by including a bug-simulating command in your JobSink definition. The following example displays a JobSink failure:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: JobSink
  metadata:
    name: job-sink-failure
  spec:
    job:
      metadata:
        labels:
          my-label: my-value
      spec:
        completions: 12
        parallelism: 3
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: main
                image: docker.io/library/bash:5
                command: [ "bash" ]
                # example command simulating a bug which triggers the FailJob action
                args:
                  - -c
                  - echo "Hello world!" && sleep 5 && exit 42
        backoffLimit: 6
        podFailurePolicy:
          rules:
            - action: FailJob
              onExitCodes:
                containerName: main      # optional
                operator: In             # one of: In, NotIn
                values: [ 42 ]
            - action: Ignore             # one of: Ignore, FailJob, Count
              onPodConditions:
                - type: DisruptionTarget # indicates Pod disruption
3.5. Getting started with IntegrationSink
The IntegrationSink API is a Knative Eventing custom resource (CR) that sends events from Knative to external systems by using selected Kamelets from the Apache Camel project.
Kamelets act as reusable connectors that function as sources or sinks. You can use the IntegrationSink API to deliver CloudEvents from Knative Eventing to external services, including AWS Simple Storage Service (S3), AWS Simple Notification Service (SNS), AWS Simple Queue Service (SQS), and a generic logger sink.
OpenShift Serverless IntegrationSink feature is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.
For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.
3.5.1. Creating AWS credentials
Several IntegrationSink API resources require access to Amazon Web Services (AWS) services such as AWS Simple Storage Service (S3), AWS Simple Notification Service (SNS), and AWS Simple Queue Service (SQS). To connect securely, you must create a Kubernetes Secret containing valid AWS credentials in the namespace where the IntegrationSink API resource will be created.
The Secret must include an AWS access key ID and secret access key with enough permissions to access the target AWS service. This Secret will then be referenced in each IntegrationSink configuration.
Prerequisites
- You have an AWS account with an access key ID and secret access key that give access to the relevant service.
- You have the OpenShift CLI (oc) installed and are logged in to the cluster.
- You have identified the namespace in which the IntegrationSink resource will be created.
Procedure
- Create the secret by running the following command:

  $ oc -n <namespace> create secret generic my-secret \
    --from-literal=aws.accessKey=<accessKey> \
    --from-literal=aws.secretKey=<secretKey>

  Replace <namespace> with the namespace where the IntegrationSink resource will be created, and substitute <accessKey> and <secretKey> with the appropriate AWS credentials.
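To confirm that the secret exists before referencing it from an IntegrationSink, you can list it by running the following command:

$ oc -n <namespace> get secret my-secret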
3.5.2. AWS S3 with IntegrationSink
You can use the Amazon Web Services (AWS) Simple Storage Service (S3) Kamelet to deliver CloudEvents from Knative Eventing to an Amazon S3 bucket. Store events as objects for long-term storage, analytics, or downstream processing.
To configure an IntegrationSink API for AWS S3, reference a Kubernetes Secret with valid AWS credentials, specify the S3 bucket Amazon Resource Name (ARN) and region, and configure the event source that produces CloudEvents.
3.5.2.1. Creating an IntegrationSink for AWS S3
You can create an IntegrationSink API resource to deliver CloudEvents from Knative Eventing to an Amazon Simple Storage Service (S3) bucket. Persist event data as objects in S3 for long-term use, analytics processing, or downstream applications. Apply a YAML manifest to create the IntegrationSink.
Prerequisites
- You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
- You have the OpenShift CLI (oc) installed and logged in to the cluster.
- You have identified the namespace where you will create the IntegrationSink resource.
- A Knative broker or another event source exists to produce CloudEvents and send them to this sink.
Procedure
- Save the following YAML manifest as integration-sink-aws-s3.yaml:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: IntegrationSink
  metadata:
    name: integration-sink-aws-s3
    namespace: knative-samples
  spec:
    aws:
      s3:
        arn: "arn:aws:s3:::my-bucket"
        region: "eu-north-1"
      auth:
        secret:
          ref:
            name: "my-secret"

- Apply the manifest by running the following command:

  $ oc apply -f integration-sink-aws-s3.yaml

This manifest configures the IntegrationSink API to deliver CloudEvents to the Amazon S3 bucket identified by its Amazon Resource Name (ARN) (arn:aws:s3:::my-bucket) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.
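An IntegrationSink is addressable, so a broker can deliver events to it through a trigger. The following is a minimal sketch, assuming a broker named default exists in the knative-samples namespace:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: s3-sink-trigger
  namespace: knative-samples
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: sinks.knative.dev/v1alpha1
      kind: IntegrationSink
      name: integration-sink-aws-s3

The same pattern applies to the SNS, SQS, and logger sinks described in the following sections.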
3.5.3. AWS SNS with IntegrationSink
You can use the Amazon Web Services (AWS) Simple Notification Service (SNS) Kamelet to deliver CloudEvents from Knative Eventing to an SNS topic. Distribute events to subscribers such as email, Short Message Service (SMS), HTTP endpoints, Amazon SQS queues, and AWS Lambda functions.
To configure an IntegrationSink API resource for AWS SNS, reference a Kubernetes Secret with valid AWS credentials, specify the SNS topic Amazon Resource Name (ARN) and region, and configure the event source that produces CloudEvents.
3.5.3.1. Creating an IntegrationSink for AWS SNS
You can create an IntegrationSink API resource to publish CloudEvents from Knative Eventing to an Amazon Simple Notification Service (SNS) topic by applying a YAML manifest.
Prerequisites
- You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
- You have the OpenShift CLI (oc) installed and logged in to the cluster.
- You have identified the namespace where you will create the IntegrationSink resource.
- A Knative broker or another event source exists to produce CloudEvents and send them to this sink.
Procedure
- Save the following YAML manifest as integration-sink-aws-sns.yaml:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: IntegrationSink
  metadata:
    name: integration-sink-aws-sns
    namespace: knative-samples
  spec:
    aws:
      sns:
        arn: "arn:aws:sns:<region>:<account>:my-topic"
        region: "eu-north-1"
      auth:
        secret:
          ref:
            name: "my-secret"

- Apply the manifest by running the following command:

  $ oc apply -f integration-sink-aws-sns.yaml

This manifest configures the IntegrationSink API to publish CloudEvents to the SNS topic identified by its Amazon Resource Name (ARN) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.
3.5.4. AWS SQS with IntegrationSink
The Amazon Web Services (AWS) Simple Queue Service (SQS) Kamelet allows you to send CloudEvents from Knative Eventing into an SQS queue. This integration is useful when you need reliable, decoupled message delivery between event producers and consumers, or when downstream systems process events asynchronously from a queue.
To configure an IntegrationSink API resource for AWS SQS, you must reference a Kubernetes Secret with valid AWS credentials, specify the queue Amazon Resource Name (ARN) and region, and configure the event source that produces the CloudEvents.
3.5.4.1. Creating an IntegrationSink for AWS SQS
You can create an IntegrationSink API resource to send CloudEvents to an Amazon Simple Queue Service (SQS) queue by applying a YAML manifest.
Prerequisites
- You have created your AWS credentials and stored them in a Kubernetes Secret in the same namespace as the resource.
- You have the OpenShift CLI (oc) installed and logged in to the cluster.
- You have identified the namespace where you will create the IntegrationSink resource.
- A Knative broker or another event source exists to produce CloudEvents and send them to this sink.
Procedure
- Save the following YAML manifest as integration-sink-aws-sqs.yaml:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: IntegrationSink
  metadata:
    name: integration-sink-aws-sqs
    namespace: knative-samples
  spec:
    aws:
      sqs:
        arn: "arn:aws:sqs:<region>:<account>:my-queue"
        region: "eu-north-1"
      auth:
        secret:
          ref:
            name: "my-secret"

- Apply the manifest by running the following command:

  $ oc apply -f integration-sink-aws-sqs.yaml

This manifest configures the IntegrationSink API to send CloudEvents to the SQS queue identified by its Amazon Resource Name (ARN) in the eu-north-1 region. The auth.secret.ref.name field references the Kubernetes Secret (my-secret) that stores your AWS credentials.
3.5.5. Generic logger sink with IntegrationSink
You can use the Log Sink Kamelet to output CloudEvents from Knative Eventing to the application log. Use this sink for debugging or testing event flows, because it provides visibility into event payloads and metadata without requiring an external system.
To configure an IntegrationSink API resource for the logger, you can set the logging level and optionally display event headers.
3.5.5.1. Creating an IntegrationSink for the Logger
You can create an IntegrationSink API resource to log CloudEvents by applying a YAML manifest.
Prerequisites
- You have the OpenShift CLI (oc) installed and logged in to the cluster.
- You have identified the namespace where you will create the IntegrationSink resource.
- A Knative broker or another event source exists to produce CloudEvents and send them to this sink.
Procedure
- Save the following YAML manifest as integration-log-sink.yaml:

  apiVersion: sinks.knative.dev/v1alpha1
  kind: IntegrationSink
  metadata:
    name: integration-log-sink
    namespace: knative-samples
  spec:
    log:
      showHeaders: true
      level: INFO

- Apply the manifest by running the following command:

  $ oc apply -f integration-log-sink.yaml

This manifest configures the IntegrationSink API resource to log all CloudEvents it receives at the INFO level. You can set the showHeaders option to true to log the HTTP headers of the event.
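To verify the sink, you can retrieve its address, send a test CloudEvent to it, and then inspect the logs of the pod that backs the sink. The following is a minimal sketch, assuming the sink exposes the standard addressable status:

$ oc -n knative-samples get integrationsink integration-log-sink \
  -o jsonpath='{.status.address.url}'

Post a CloudEvent to the returned URL, for example with a curl pod as shown in the JobSink section, and then check the sink pod logs with oc logs.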