Chapter 4. Extending Camel Kafka Connector

This chapter explains how to extend and customize Camel Kafka connectors and components. Camel Kafka Connector provides an easy way to configure Camel components directly in the Kafka Connect framework, without needing to write code. However, in some scenarios, you might want to extend and customize Camel Kafka Connector for specific use cases.

4.1. Configuring a Camel Kafka connector aggregator

In some scenarios using a Camel Kafka sink connector, you might want to add an aggregator to batch up your Kafka records before sending them to the external sink system. Typically, this involves defining a specific batch size and timeout for aggregation of records. When complete, the aggregate record is sent to the external system.

You can configure aggregation settings in your Camel Kafka Connector properties using one of the aggregators provided by Apache Camel, or you can implement a custom aggregator in Java. This section describes how to configure the Camel aggregator settings in your Camel Kafka Connector properties.
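The interplay of batch size and timeout can be pictured with a small plain-Java simulation. This is only a sketch and not Camel Kafka Connector code: the BatchCompletionSketch class, the TimedRecord type, and the batch method are hypothetical names, and the sketch assumes that the timeout counts inactivity since the most recent record in the batch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Camel or Kafka classes are involved.
class BatchCompletionSketch {

    // Hypothetical stand-in for a Kafka record: arrival time plus payload.
    static final class TimedRecord {
        final long arrivalMillis;
        final String body;

        TimedRecord(long arrivalMillis, String body) {
            this.arrivalMillis = arrivalMillis;
            this.body = body;
        }
    }

    // Groups records into batches. A batch is emitted when it holds `size`
    // records, or when the gap since its most recent record reaches
    // `timeoutMillis` (assumption: the timeout measures inactivity).
    static List<List<String>> batch(List<TimedRecord> records, int size, long timeoutMillis) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long lastArrival = 0;

        for (TimedRecord record : records) {
            // The timeout would have fired before this record arrived.
            if (!current.isEmpty() && record.arrivalMillis - lastArrival >= timeoutMillis) {
                emitted.add(current);
                current = new ArrayList<>();
            }
            current.add(record.body);
            lastArrival = record.arrivalMillis;

            // Size-based completion.
            if (current.size() == size) {
                emitted.add(current);
                current = new ArrayList<>();
            }
        }
        // Whatever is left is emitted once the timeout eventually elapses.
        if (!current.isEmpty()) {
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<TimedRecord> records = new ArrayList<>();
        records.add(new TimedRecord(0, "a"));
        records.add(new TimedRecord(100, "b"));
        records.add(new TimedRecord(6000, "c")); // arrives after a 5900 ms gap
        records.add(new TimedRecord(6100, "d"));
        records.add(new TimedRecord(6200, "e"));
        // With size=3 and timeout=5000 this prints [[a, b], [c, d, e]]
        System.out.println(batch(records, 3, 5000));
    }
}
```

In this run the first batch closes on the timeout with only two records, and the second closes as soon as it reaches the configured size.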

Prerequisites

Procedure

  • Configure your sink connector and aggregator settings in Camel Kafka Connector properties, depending on your installation platform:

    OpenShift

    The following example shows the AWS S3 sink connector and aggregator configuration in a custom resource:

    oc apply -f - << EOF
    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: s3-sink-connector
      namespace: myproject
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
      tasksMax: 1
      config:
        key.converter: org.apache.kafka.connect.storage.StringConverter
        value.converter: org.apache.kafka.connect.storage.StringConverter
        topics: s3-topic
        camel.sink.path.bucketNameOrArn: camel-kafka-connector
        camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
        # Camel aggregator settings
        camel.beans.aggregate: class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
        camel.beans.aggregation.size: 10
        camel.beans.aggregation.timeout: 5000
        camel.component.aws2-s3.accessKey: xxxx
        camel.component.aws2-s3.secretKey: yyyy
        camel.component.aws2-s3.region: region
    EOF

    Red Hat Enterprise Linux

    The following example shows the AWS S3 sink connector and aggregator configuration in the CamelAwss3SinkConnector.properties file:

    name=CamelAWS2S3SinkConnector
    connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    
    topics=mytopic
    
    camel.sink.path.bucketNameOrArn=camel-kafka-connector
    
    camel.component.aws2-s3.access-key=xxxx
    camel.component.aws2-s3.secret-key=yyyy
    camel.component.aws2-s3.region=eu-west-1
    
    camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    
    # Camel aggregator settings
    camel.beans.aggregate=class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
    camel.beans.aggregation.size=10
    camel.beans.aggregation.timeout=5000

4.2. Writing a custom Camel Kafka connector aggregator

In some scenarios using a Camel Kafka sink connector, you might want to add an aggregator to batch up your Kafka records before sending them to the external sink system. Typically, this involves defining a specific batch size and timeout for aggregation of records. When complete, the aggregate record is sent to the external system.

You can implement your own aggregator or configure one of the aggregators provided by Apache Camel. This section describes how to implement a custom aggregator in Java using the Camel AggregationStrategy interface.

Prerequisites

  • You must have Red Hat Fuse installed.

Procedure

  1. Write your own custom aggregator by implementing the Camel AggregationStrategy interface, for example:

    package org.apache.camel.kafkaconnector.aggregator;
    
    import org.apache.camel.AggregationStrategy;
    import org.apache.camel.Exchange;
    import org.apache.camel.Message;
    
    public class StringAggregator implements AggregationStrategy {
    
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // (1)
            // First record of a batch: there is nothing to append to yet
            if (oldExchange == null) {
                return newExchange;
            }
    
            // Append the new body to the body aggregated so far
            String body = oldExchange.getIn().getBody(String.class); // (2)
            if (body != null) {
                Message newIn = newExchange.getIn();
                String newBody = newIn.getBody(String.class);
                if (newBody != null) {
                    body += System.lineSeparator() + newBody;
                }
    
                newIn.setBody(body);
            }
            return newExchange; // (3)
        }
    }

    (1) The oldExchange object holds the records aggregated so far (it is null for the first record in a batch), and the newExchange object holds the Kafka record that has just arrived at the aggregator.
    (2) In this case, each newExchange body is appended to the oldExchange body, separated by the system line separator.
    (3) This process continues until the batch size is reached or the timeout expires.
  2. Add your custom aggregator code to your existing Camel Kafka connector. See Section 4.4, “Extending Camel Kafka connectors using Maven archetypes”.
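The way an aggregation strategy is applied pairwise to a stream of records can be traced with a plain-Java sketch. It is not part of Camel Kafka Connector: plain strings stand in for the exchange bodies, and the PairwiseFoldSketch class and its fold method are hypothetical names used only for illustration. The aggregate method mirrors the body handling of the StringAggregator example above.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; message bodies stand in for Camel exchanges.
class PairwiseFoldSketch {

    // oldBody is the aggregate so far, or null for the first record.
    static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody; // nothing aggregated yet
        }
        if (newBody == null) {
            return oldBody; // nothing to append
        }
        return oldBody + System.lineSeparator() + newBody;
    }

    // Applies aggregate() once per record, carrying the result forward,
    // which is how a batch of records collapses into one payload.
    static String fold(List<String> bodies) {
        String soFar = null;
        for (String body : bodies) {
            soFar = aggregate(soFar, body);
        }
        return soFar;
    }

    public static void main(String[] args) {
        // Prints the three bodies, one per line.
        System.out.println(fold(Arrays.asList("record-1", "record-2", "record-3")));
    }
}
```

Because each call receives the result of the previous one, a batch of N records triggers N calls, and only the first call sees a null aggregate.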

4.3. Configuring Camel data formats in Camel Kafka Connector

Camel Kafka Connector provides marshaling/unmarshaling of Camel data formats for sink and source connectors. For example, these formats include Apache Avro, Base64, Google Protobuf, JSON, SOAP, Zip file, and many more.

Typically, you would use a Camel DataFormat in your Camel DSL to marshal and unmarshal messages to and from different Camel data formats. For example, if you are receiving messages from a Camel File or JMS component and want to unmarshal the payload for further processing, you can use a DataFormat to implement this in the Camel DSL.

Using Camel Kafka Connector, you can simply configure marshaling and unmarshaling of Camel data formats using properties in your connector configuration. This section shows how to configure marshaling for the Camel Zip file data format using the camel.sink.marshal: zipfile property.
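To make the effect of marshaling to a Zip file more concrete, the following plain-Java sketch wraps a payload in a zip archive with a single entry and reads it back. It uses only java.util.zip and is not the Camel implementation; the ZipMarshalSketch class, its method names, and the entry name record.txt are assumptions chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Plain-Java illustration only; not the Camel zipfile data format itself.
class ZipMarshalSketch {

    // "Marshal": wrap the payload in a zip archive that holds one entry.
    static byte[] marshal(String entryName, byte[] payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ZipOutputStream zip = new ZipOutputStream(buffer)) {
            zip.putNextEntry(new ZipEntry(entryName));
            zip.write(payload);
            zip.closeEntry();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // "Unmarshal": read the content of the first entry back out.
    static byte[] unmarshal(byte[] archive) {
        try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(archive))) {
            if (zip.getNextEntry() == null) {
                throw new IOException("archive has no entries");
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = zip.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] zipped = marshal("record.txt", "hello".getBytes(StandardCharsets.UTF_8));
        // Prints hello
        System.out.println(new String(unmarshal(zipped), StandardCharsets.UTF_8));
    }
}
```

With the connector, this wrapping is applied for you when the marshal property is set, which is why the example key names in this section end in .zip.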

Prerequisites

Procedure

  • Configure the connector settings for marshaling or unmarshaling the data format in your Camel Kafka Connector configuration, depending on your installation platform:

    OpenShift

    The following example shows the AWS S3 sink connector and Camel Zip data format configuration in a custom resource:

    oc apply -f - << EOF
    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: s3-sink-connector
      namespace: myproject
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
      class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
      tasksMax: 1
      config:
        key.converter: org.apache.kafka.connect.storage.StringConverter
        value.converter: org.apache.kafka.connect.storage.StringConverter
        topics: s3-topic
        camel.sink.path.bucketNameOrArn: camel-kafka-connector
        camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip
        # Camel data format setting
        camel.sink.marshal: zipfile
        camel.component.aws2-s3.accessKey: xxxx
        camel.component.aws2-s3.secretKey: yyyy
        camel.component.aws2-s3.region: region
    EOF

    Red Hat Enterprise Linux

    The following example shows the AWS S3 sink connector and Camel Zip data format configuration in the CamelAwss3SinkConnector.properties file:

    name=CamelAWS2S3SinkConnector
    connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    
    topics=mytopic
    
    # Camel data format setting
    camel.sink.marshal=zipfile
    
    camel.sink.path.bucketNameOrArn=camel-kafka-connector
    
    camel.component.aws2-s3.access-key=xxxx
    camel.component.aws2-s3.secret-key=yyyy
    camel.component.aws2-s3.region=eu-west-1
    
    camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip

4.4. Extending Camel Kafka connectors using Maven archetypes

In some scenarios, you might need to extend your Camel Kafka Connector system. For example, when using a sink connector, you might want to add a custom aggregator to batch up your Kafka records before sending them to the external sink system. Alternatively, you might want to configure a connector for marshaling or unmarshaling of Camel data formats, such as Apache Avro, Google Protobuf, JSON, or Zip file.

You can extend an existing Camel Kafka connector using the Maven camel-kafka-connector-extensible-archetype. An archetype is a Maven project template, which provides a consistent way of generating a project. This section describes how to use the archetype to create a Maven project to be extended and how to add your project dependencies.

Note

Using Maven archetypes to write additional Kafka Connect converters or transformers is not included in the Technology Preview and has community support only.

Prerequisites

  • You must have Apache Maven installed.

Procedure

  1. Enter the mvn archetype:generate command to create a Maven project to extend Camel Kafka Connector. For example:

    $ mvn archetype:generate \
        -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes \
        -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype \
        -DarchetypeVersion=CONNECTOR_VERSION
    [INFO] Scanning for projects...
    [INFO]
    [INFO] ------------------< org.apache.maven:standalone-pom >-------------------
    [INFO] Building Maven Stub Project (No POM) 1
    [INFO] --------------------------------[ pom ]---------------------------------
    [INFO]
    [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
    [INFO]
    [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
    [INFO]
    [INFO]
    [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
    [INFO] Generating project in Interactive mode
    [INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote
  2. Enter values for each of the properties when prompted. The following example extends a camel-aws2-s3-kafka-connector:

    Define value for property 'groupId': org.apache.camel.kafkaconnector.extended
    Define value for property 'artifactId': myconnector-extended
    Define value for property 'version' 1.0-SNAPSHOT: :
    Define value for property 'package' org.apache.camel.kafkaconnector.extended: :
    Define value for property 'camel-kafka-connector-name': camel-aws2-s3-kafka-connector
    [INFO] Using property: camel-kafka-connector-version = CONNECTOR_VERSION
    Confirm properties configuration:
    groupId: org.apache.camel.kafkaconnector.extended
    artifactId: myconnector-extended
    version: 1.0-SNAPSHOT
    package: org.apache.camel.kafkaconnector.extended
    camel-kafka-connector-name: camel-aws2-s3-kafka-connector
    camel-kafka-connector-version: CONNECTOR_VERSION
  3. Enter Y to confirm your properties:

    Y: : Y
    [INFO] ----------------------------------------------------------------------------
    [INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:CONNECTOR_VERSION
    [INFO] ----------------------------------------------------------------------------
    [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.extended
    [INFO] Parameter: artifactId, Value: myconnector-extended
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.extended
    [INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/extended
    [INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.extended
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.extended
    [INFO] Parameter: camel-kafka-connector-name, Value: camel-aws2-s3-kafka-connector
    [INFO] Parameter: camel-kafka-connector-version, Value: CONNECTOR_VERSION
    [INFO] Parameter: artifactId, Value: myconnector-extended
    [INFO] Project created from Archetype in dir: /home/workspace/myconnector-extended
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  05:44 min
    [INFO] Finished at: 2020-09-04T08:55:00+02:00
    [INFO] ------------------------------------------------------------------------
  4. Enter the dependencies that you need in the pom.xml for the created Maven project.
  5. Build the Maven project to create a .zip or .tar.gz file for your extended Camel Kafka connector:

    mvn clean package