此内容没有您所选择的语言版本。

Chapter 2. Connecting to Kafka with Kamelets


Apache Kafka is an open-source, distributed, publish-subscribe messaging system for creating fault-tolerant, real-time data feeds. Kafka quickly stores and replicates data for a large number of consumers (external connections).

Kafka can help you build solutions that process streaming events. A distributed, event-driven architecture requires a "backbone" that captures, communicates and helps process events. Kafka can serve as the communication backbone that connects your data sources and events to applications.

You can use Kamelets to configure communication between Kafka and external resources. Kamelets allow you to configure how data moves from one endpoint to another in a Kafka stream-processing framework without writing code. Kamelets are route templates that you configure by specifying parameter values.

For example, Kafka stores data in a binary form. You can use Kamelets to serialize and deserialize the data for sending to, and receiving from, external connections. With Kamelets, you can validate the schema and make changes to the data, such as adding to it, filtering it, or masking it. Kamelets can also handle and process errors.

2.1. Overview of connecting to Kafka with Kamelets

If you use an Apache Kafka stream-processing framework, you can use Kamelets to connect services and applications to a Kafka topic. The Kamelet Catalog provides the following Kamelets specifically for making connections to a Kafka topic:

  • kafka-sink - Moves events from a data producer to a Kafka topic. In a Kamelet Binding, specify the kafka-sink Kamelet as the sink.
  • kafka-source - Moves events from a Kafka topic to a data consumer. In a Kamelet Binding, specify the kafka-source Kamelet as the source.

Figure 2.1 illustrates the flow of connecting source and sink Kamelets to a Kafka topic.

kafkafow generic

Figure 2.1: Data flow with Kamelets and a Kafka topic

Here is an overview of the basic steps for using Kamelets and Kamelet Bindings to connect applications and services to a Kafka topic:

  1. Set up Kafka:

    1. Install the needed OpenShift operators.

      • For OpenShift Streams for Apache Kafka, install the Camel K operator, the Camel K CLI, and the Red Hat OpenShift Application Services (RHOAS) CLI.
      • For AMQ streams, install the Camel K and AMQ streams operators and the Camel K CLI.
    2. Create a Kafka instance. A Kafka instance operates as a message broker. A broker contains topics and orchestrates the storage and passing of messages.
    3. Create a Kafka topic. A topic provides a destination for the storage of data.
    4. Obtain Kafka authentication credentials.
  2. Determine which services or applications you want to connect to your Kafka topic.
  3. View the Kamelet Catalog to find the Kamelets for the source and sink components that you want to add to your integration. Also, determine the required configuration parameters for each Kamelet that you want to use.
  4. Create Kamelet Bindings:

    • Create a Kamelet Binding that connects a data source (a component that produces data) to the Kafka topic (by using the kafka-sink Kamelet).
    • Create a Kamelet Binding that connects the kafka topic (by using kafka-source Kamelet) to a data sink (a component that consumes data).
  5. Optionally, manipulate the data that passes between the Kafka topic and the data source or sink by adding one or more action Kamelets as intermediary steps within a Kamelet Binding.
  6. Optionally, define how to handle errors within a Kamelet Binding.
  7. Apply the Kamelet Bindings as resources to the project.

    The Camel K operator generates a separate Camel K integration for each Kamelet Binding.

2.2. Setting up Kafka

Setting up Kafka involves installing the required OpenShift operators, creating a Kafka instance, and creating a Kafka topic.

Use one of these Red Hat products to set up Kafka:

  • Red Hat Advanced Message Queuing (AMQ) streams - A self-managed Apache Kafka offering. AMQ Streams is based on open source Strimzi and is included as part of Red Hat Integration. AMQ Streams is a distributed and scalable streaming platform based on Apache Kafka that includes a publish/subscribe messaging broker. Kafka Connect provides a framework to integrate Kafka-based systems with external systems. Using Kafka Connect, you can configure source and sink connectors to stream data from external systems into and out of a Kafka broker.
  • Red Hat OpenShift Streams for Apache Kafka - A managed cloud service that simplifies the process of running Apache Kafka. It provides a streamlined developer experience for building, deploying, and scaling new cloud-native applications or modernizing existing systems.

2.2.1. Setting up Kafka by using AMQ streams

AMQ Streams simplifies the process of running Apache Kafka in an OpenShift cluster.

2.2.1.1. Preparing your OpenShift cluster for AMQ Streams

To use Camel K or Kamelets and Red Hat AMQ Streams, you must install the following operators and tools:

  • Red Hat Integration - AMQ Streams operator - Manages the communication between your Openshift Cluster and AMQ Streams for Apache Kafka instances.
  • Red Hat Integration - Camel K operator - Installs and manages Camel K - a lightweight integration framework that runs natively in the cloud on OpenShift.
  • Camel K CLI tool - Allows you to access all Camel K features.

Prerequisites

  • You are familiar with Apache Kafka concepts.
  • You can access an OpenShift 4.6 (or later) cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and the Camel K CLI on your local system.
  • You installed the OpenShift CLI tool (oc) so that you can interact with the OpenShift cluster at the command line.

Procedure

To set up Kafka by using AMQ Streams:

  1. Log in to your OpenShift cluster’s web console.
  2. Create or open a project in which you plan to create your integration, for example my-camel-k-kafka.
  3. Install the Camel K operator and Camel K CLI as described in Installing Camel K.
  4. Install the AMQ streams operator:

    1. From any project, select Operators > OperatorHub.
    2. In the Filter by Keyword field, type AMQ Streams.
    3. Click the Red Hat Integration - AMQ Streams card and then click Install.

      The Install Operator page opens.

    4. Accept the defaults and then click Install.
  5. Select Operators > Installed Operators to verify that the Camel K and AMQ Streams operators are installed.

2.2.1.2. Setting up a Kafka topic with AMQ Streams

A Kafka topic provides a destination for the storage of data in a Kafka instance. You must set up a Kafka topic before you can send data to it.

Prerequisites

  • You can access an OpenShift cluster.
  • You installed the Red Hat Integration - Camel K and Red Hat Integration - AMQ Streams operators as described in Preparing your OpenShift cluster.
  • You installed the OpenShift CLI (oc) and the Camel K CLI (kamel).

Procedure

To set up a Kafka topic by using AMQ Streams:

  1. Log in to your OpenShift cluster’s web console.
  2. Select Projects and then click the project in which you installed the Red Hat Integration - AMQ Streams operator. For example, click the my-camel-k-kafka project.
  3. Select Operators > Installed Operators and then click Red Hat Integration - AMQ Streams.
  4. Create a Kafka cluster:

    1. Under Kafka, click Create instance.
    2. Type a name for the cluster, for example kafka-test.
    3. Accept the other defaults and then click Create.

      The process to create the Kafka instance might take a few minutes to complete.

      When the status is ready, continue to the next step.

  5. Create a Kafka topic:

    1. Select Operators > Installed Operators and then click Red Hat Integration - AMQ Streams.
    2. Under Kafka Topic, click Create Kafka Topic.
    3. Type a name for the topic, for example test-topic.
    4. Accept the other defaults and then click Create.

2.2.2. Setting up Kafka by using OpenShift streams

Red Hat OpenShift Streams for Apache Kafka is a managed cloud service that simplifies the process of running Apache Kafka.

To use OpenShift Streams for Apache Kafka, you must be logged into your Red Hat account.

2.2.2.1. Preparing your OpenShift cluster for OpenShift Streams

To use the Red Hat OpenShift Streams for Apache Kafka managed cloud service, you must install the following operators and tools:

  • OpenShift Application Services (RHOAS) CLI - Allows you to manage your application services from a terminal.
  • Red Hat Integration - Camel K operator Installs and manages Camel K - a lightweight integration framework that runs natively in the cloud on OpenShift.
  • Camel K CLI tool - Allows you to access all Camel K features.

Prerequisites

  • You are familiar with Apache Kafka concepts.
  • You can access an OpenShift 4.6 (or later) cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and Apache Camel K CLI on your local system.
  • You installed the OpenShift CLI tool (oc) so that you can interact with the OpenShift cluster at the command line.

Procedure

  1. Log in to your OpenShift web console with a cluster admin account.
  2. Create the OpenShift project for your Camel K or Kamelets application.

    1. Select Home > Projects.
    2. Click Create Project.
    3. Type the name of the project, for example my-camel-k-kafka, then click Create.
  3. Download and install the RHOAS CLI as described in Getting started with the rhoas CLI.
  4. Install the Camel K operator and Camel K CLI as described in Installing Camel K.
  5. To verify that the Red Hat Integration - Camel K operator is installed, click Operators > Installed Operators.

2.2.2.2. Setting up a Kafka topic with RHOAS

Kafka organizes messages around topics. Each topic has a name. Applications send messages to topics and retrieve messages from topics. A Kafka topic provides a destination for the storage of data in a Kafka instance. You must set up a Kafka topic before you can send data to it.

Prerequisites

  • You can access an OpenShift cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and the Camel K CLI on your local system.
  • You installed the OpenShift CLI (oc) , the Camel K CLI (kamel) , and RHOAS CLI (rhoas) tools as described in Preparing your OpenShift cluster.
  • You installed the Red Hat Integration - Camel K operator as described in Preparing your OpenShift cluster.
  • You are logged in to the Red Hat Cloud site.

Procedure

To set up a Kafka topic by using Red Hat OpenShift Streams for Apache Kafka:

  1. From the command line, log in to your OpenShift cluster.
  2. Open your project, for example:

    oc project my-camel-k-kafka

  3. Verify that the Camel K operator is installed in your project:

    oc get csv

    The result lists the Red Hat Camel K operator and indicates that it is in the Succeeded phase.

  4. Prepare and connect a Kafka instance to RHOAS:

    1. Login to the RHOAS CLI by using this command:

      rhoas login

    2. Create a kafka instance, for example kafka-test:

      rhoas kafka create kafka-test

      The process to create the Kafka instance might take a few minutes to complete.

  5. To check the status of your Kafka instance:

    rhoas status

    You can also view the status in the web console:

    https://cloud.redhat.com/application-services/streams/kafkas/

    When the status is ready, continue to the next step.

  6. Create a new Kafka topic:

    rhoas kafka topic create --name test-topic

  7. Connect your Kafka instance (cluster) with the Openshift Application Services instance:

    rhoas cluster connect

  8. Follow the script instructions for obtaining a credential token.

    You should see output similar to the following:

    Token Secret "rh-cloud-services-accesstoken-cli" created successfully
    Service Account Secret "rh-cloud-services-service-account" created successfully
    KafkaConnection resource "kafka-test" has been created
    KafkaConnection successfully installed on your cluster.

2.2.2.3. Obtaining Kafka credentials

To connect your applications or services to a Kafka instance, you must first obtain the following Kafka credentials:

  • Obtain the bootstrap URL.
  • Create a service account with credentials (username and password).

For OpenShift Streams, the authentication protocol is SASL_SSL.

Prerequisite

  • You have created a Kafka instance, and it has a ready status.
  • You have created a Kafka topic.

Procedure

  1. Obtain the Kafka Broker URL (Bootstrap URL):

    rhoas status

    This command returns output similar to the following:

      Kafka
      ---------------------------------------------------------------
      ID:                     1ptdfZRHmLKwqW6A3YKM2MawgDh
      Name:                   my-kafka
      Status:                 ready
      Bootstrap URL:        my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443
  2. To obtain a username and password, create a service account by using the following syntax:

    rhoas service-account create --name "<account-name>" --file-format json

    Note

    When creating a service account, you can choose the file format and location to save the credentials. For more information, type rhoas service-account create --help

    For example:

    rhoas service-account create --name "my-service-acct" --file-format json

    The service account is created and saved to a JSON file.

  3. To verify your service account credentials, view the credentials.json file:

    cat credentials.json

    This command returns output similar to the following:

    {"clientID":"srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094", "password":"facf3df1-3c8d-4253-aa87-8c95ca5e1225"}
  4. Grant permission for sending and receiving messages to or from the Kakfa topic. Use the following command, where clientID is the value provided in the credentials.json file (from Step 3).

    rhoas kafka acl grant-access --producer --consumer --service-account $CLIENT_ID --topic test-topic --group all

    For example:

    rhoas kafka acl grant-access --producer --consumer --service-account srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094 --topic test-topic --group all

2.3. Connecting a data source to a Kafka topic in a Kamelet Binding

To connect a data source to a Kafka topic, you create a Kamelet Binding as illustrated in Figure 2.2.

Connecting a data source to a Kafka topic Figure 2.2 Connecting a data source to a Kafka topic

Prerequisites

  • You know the name of the Kafka topic to which you want to send events.

    The example in this procedure uses test-topic for receiving events.

  • You know the values of the following parameters for your Kafka instance:

    • bootstrapServers - A comma separated list of Kafka Broker URLs.
    • password - The password to authenticate to Kafka. For OpenShift Streams, this is the password in the credentials.json file. For an unauthenticated kafka instance on AMQ Streams, you can specify any non-empty string.
    • user - The user name to authenticate to Kafka. For OpenShift Streams, this is the clientID in the credentials.json file. For an unauthenticated kafka instance on AMQ Streams, you can specify any non-empty string.

      For information on how to obtain these values when you use OpenShift Streams, see Obtaining Kafka credentials.

    • securityProtocol - You know the security protocol for communicating with the Kafka brokers. For a Kafka cluster on OpenShift Streams, it is SASL_SSL (the default). For a Kafka cluster on AMQ streams, it is PLAINTEXT.
  • You know which Kamelets you want to add to your Camel K integration and the required instance parameters.

    The example Kamelets for this procedure are:

    • The coffee-source Kamelet - It has an optional parameter, period, that specifies how often to send each event. You can copy the code from Example source Kamelet to a file named coffee-source.kamelet.yaml file and then run the following command to add it as a resource to your namespace:

      oc apply -f coffee-source.kamelet.yaml

    • The kafka-sink Kamelet provided in the Kamelet Catalog. You use the kafka-sink Kamelet because the Kafka topic is receiving data (it is the data consumer) in this binding.

Procedure

To connect a data source to a Kafka topic, create a Kamelet Binding:

  1. In an editor of your choice, create a YAML file with the following basic structure:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. Add a name for the Kamelet Binding. For this example, the name is coffees-to-kafka because the binding connects the coffee-source Kamelet to the kafka-sink Kamelet.

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
      sink:
  3. For the Kamelet Binding’s source, specify a data source Kamelet (for example, the coffee-source Kamelet produces events that contain data about coffee) and configure any parameters for the Kamelet.

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
  4. For the Kamelet Binding’s sink, specify the kafka-sink Kamelet and its required properties.

    For example, when the Kafka cluster is on OpenShift Streams:

    • For the user property, specify the clientID, for example: srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094
    • For the password property, specify the password, for example: facf3df1-3c8d-4253-aa87-8c95ca5e1225
    • You do not need to set the securityProtocol property.

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443"
            password: "facf3df1-3c8d-4253-aa87-8c95ca5e1225"
            topic: "test-topic"
            user: "srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094"

      For another example, when the Kafka cluster is on AMQ Streams, set the securityProtocol property to “PLAINTEXT”:

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "broker.url:9092"
            password: "testpassword"
            topic: "test-topic"
            user: "testuser"
            securityProtocol: "PLAINTEXT"
  5. Save the YAML file (for example, coffees-to-kafka.yaml).
  6. Log into your OpenShift project.
  7. Add the Kamelet Binding as a resource to your OpenShift namespace:

    oc apply -f <kamelet binding filename>

    For example:

    oc apply -f coffees-to-kafka.yaml

    The Camel K operator generates and runs a Camel K integration by using the KameletBinding resource. It might take a few minutes to build.

  8. To see the status of the KameletBinding resource:

    oc get kameletbindings

  9. To see the status of their integrations:

    oc get integrations

  10. To view the integration’s log:

    kamel logs <integration> -n <project>

    For example:

    kamel logs coffees-to-kafka -n my-camel-k-kafka

2.4. Connecting a Kafka topic to a data sink in a Kamelet Binding

To connect a Kafka topic to a data sink, you create a Kamelet Binding as illustrated in Figure 2.3.

Connecting a Kafka topic to a data sink Figure 2.3 Connecting a Kafka topic to a data sink

Prerequisites

  • You know the name of the Kafka topic from which you want to send events. The example in this procedure uses test-topic for sending events. It is the same topic that you used to receive events from the coffee source in Connecting a data source to a Kafka topic in a Kamelet Binding.
  • You know the values of the following parameters for your Kafka instance:

    • bootstrapServers - A comma separated list of Kafka Broker URLs.
    • password - The password to authenticate to Kafka.
    • user - The user name to authenticate to Kafka.

      For information on how to obtain these values when you use OpenShift Streams, see Obtaining Kafka credentials.

  • You know the security protocol for communicating with the Kafka brokers. For a Kafka cluster on OpenShift Streams, it is SASL_SSL (the default). For a Kafka cluster on AMQ streams, it is PLAINTEXT.
  • You know which Kamelets you want to add to your Camel K integration and the required instance parameters. The example Kamelets for this procedure are provided in the Kamelet Catalog:

    • The kafka-source Kamelet - Use the kafka-source Kamelet because the Kafka topic is sending data (it is the data producer) in this binding. The example values for the required parameters are:

      • bootstrapServers - "broker.url:9092"
      • password - "testpassword"
      • user - "testuser"
      • topic - "test-topic"
      • securityProtocol - For a Kafka cluster on OpenShift Streams, you do not need to set this parameter because SASL_SSL is the default value. For a Kafka cluster on AMQ streams, this parameter value is “PLAINTEXT”.
    • The log-sink Kamelet - Use the log-sink to log the data that it receives from the kafka-source Kamelet. Optionally, specify the showStreams parameter to show the message body of the data. The log-sink Kamelet is useful for debugging purposes.

Procedure

To connect a Kafka topic to a data sink, create a Kamelet Binding:

  1. In an editor of your choice, create a YAML file with the following basic structure:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. Add a name for the Kamelet Binding. For this example, the name is kafka-to-log because the binding connects the kafka-source Kamelet to the log-sink Kamelet.

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
      sink:
  3. For the Kamelet Binding’s source, specify the kafka-source Kamelet and configure its parameters.

    For example, when the Kafka cluster is on OpenShift Streams (you do not need to set the securityProtocol parameter):

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
      sink:

    For example, when the Kafka cluster is on AMQ Streams you must set the securityProtocol parameter to “PLAINTEXT”:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
          securityProtocol: "PLAINTEXT"
      sink:
  4. For the Kamelet Binding’s sink, specify the data consumer Kamelet (for example, the log-sink Kamelet) and configure any parameters for the Kamelet, for example:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  5. Save the YAML file (for example, kafka-to-log.yaml).
  6. Log into your OpenShift project.
  7. Add the Kamelet Binding as a resource to your OpenShift namespace:

    oc apply -f <kamelet binding filename>

    For example:

    oc apply -f kafka-to-log.yaml

    The Camel K operator generates and runs a Camel K integration by using the KameletBinding resource. It might take a few minutes to build.

  8. To see the status of the KameletBinding resource:

    oc get kameletbindings

  9. To see the status of their integrations:

    oc get integrations

  10. To view the integration’s log:

    kamel logs <integration> -n <project>

    For example:

    kamel logs kafka-to-log -n my-camel-k-kafka

    In the output, you should see coffee events, for example:

    INFO  [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
  11. To stop a running integration, delete the associated Kamelet Binding resource:

    oc delete kameletbindings/<kameletbinding-name>

    For example:

    oc delete kameletbindings/kafka-to-log

2.5. Applying operations to data within a Kafka connection

If you want to perform an operation on the data that passes between a Kamelet and a Kafka topic, use action Kamelets as intermediary steps within a Kamelet Binding.

2.5.1. Routing event data to different destination topics

When you configure a connection to a Kafka instance, you can optionally transform the topic information from the event data so that the event is routed to a different Kafka topic. Use one of the following transformation action Kamelets:

  • Regex Router - Modify the topic of a message by using a regular expression and a replacement string. For example, if you want to remove a topic prefix, add a prefix, or remove part of a topic name. Configure the Regex Router Action Kamelet (regex-router-action).
  • TimeStamp - Modify the topic of a message based on the original topic and the message’s timestamp. For example, when using a sink that needs to write to different tables or indexes based on timestamps. For example, when you want to write events from Kafka to Elasticsearch, but each event needs to go to a different index based on information in the event itself. Configure the Timestamp Router Action Kamelet (timestamp-router-action).
  • Message TimeStamp - Modify the topic of a message based on the original topic value and the timestamp field coming from a message value field. Configure the Message Timestamp Router Action Kamelet (message-timestamp-router-action).
  • Predicate - Filter events based on the given JSON path expression by configuring the Predicate Filter Action Kamelet (predicate-filter-action).

Prerequisites

Procedure

To transform the destination topic, use one of the transformation action Kamelets as an intermediary step within the Kamelet Binding.

For details on how to add an action Kamelet to a Kamelet Binding, see Adding an operation to a Kamelet Binding.

2.5.2. Filtering event data for a specific Kafka topic

If you use a source Kamelet that produces records to many different Kafka topics and you want to filter out the records to one Kafka topic, add the topic-name-matches-filter-action Kamelet as an intermediary step in the Kamelet Binding.

Prerequisites

  • You have created a Kamelet Binding in a YAML file.
  • You know the name of the Kafka topic from which you want to filter out event data.

Procedure

  1. Edit the Kamelet Binding to include the topic-name-matches-filter-action Kamelet as an intermediary step between the source and sink Kamelets.

    Typically, you use the kafka-source Kamelet, as the source Kamelet and you supply a topic as the value of the required topic parameter.

    In the following Kamelet Binding example, the kafka-source Kamelet specifies the test-topic, test-topic-2, and test-topic-3 Kafka topics and the topic-name-matches-filter-action Kamelet specifies to filter out the event data from the topic-test topic:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic, test-topic-2, test-topic-3"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
            showStreams: true

    If you want to filter topics coming from a source Kamelet other than the kafka-source Kamelet, you must supply the Kafka topic information. You can use the insert-header-action Kamelet to add a Kafka topic field as an intermediary step, before the topic-name-matches-filter-action step in the Kamelet Binding as shown in the following example:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: insert-header-action
        properties:
          name:  "KAFKA.topic"
          value:  "test-topic"
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  2. Save the Kamelet Binding YAML file.
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.