Chapter 4. Extending Camel Kafka Connector
This chapter explains how to extend and customize Camel Kafka connectors and components. Camel Kafka Connector provides an easy way to configure Camel components directly in the Kafka Connect framework, without needing to write code. However, in some scenarios, you might want to extend and customize Camel Kafka Connector for specific use cases.
4.1. Configuring a Camel Kafka connector aggregator
In some scenarios using a Camel Kafka sink connector, you might want to add an aggregator to batch up your Kafka records before sending them to the external sink system. Typically, this involves defining a specific batch size and timeout for aggregation of records. When complete, the aggregate record is sent to the external system.
You can configure aggregation settings in your Camel Kafka Connector properties using one of the aggregators provided by Apache Camel, or you can implement a custom aggregator in Java. This section describes how to configure the Camel aggregator settings in your Camel Kafka Connector properties.
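The size and timeout semantics described above can be sketched in plain Java. This is an illustration of the batching behavior only, not the connector's internal implementation; the `BatchSketch` class and its method names are invented for this example, and mirror the roles of the `camel.beans.aggregation.size` and `camel.beans.aggregation.timeout` properties:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of size/timeout batching semantics (not the
// connector's actual code). Records accumulate until either the batch
// size is reached or the timeout elapses, then flush as one unit.
public class BatchSketch {
    private final int batchSize;       // cf. camel.beans.aggregation.size
    private final long timeoutMillis;  // cf. camel.beans.aggregation.timeout
    private final List<String> buffer = new ArrayList<>();
    private long firstRecordTime = -1;

    public BatchSketch(int batchSize, long timeoutMillis) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
    }

    // Returns the completed batch when full or timed out, otherwise null.
    public List<String> add(String record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstRecordTime = nowMillis;
        }
        buffer.add(record);
        boolean full = buffer.size() >= batchSize;
        boolean timedOut = (nowMillis - firstRecordTime) >= timeoutMillis;
        if (full || timedOut) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }

    public static void main(String[] args) {
        BatchSketch batcher = new BatchSketch(3, 5000);
        System.out.println(batcher.add("a", 0) == null);    // true: batch not complete
        System.out.println(batcher.add("b", 1000) == null); // true
        System.out.println(batcher.add("c", 2000));         // [a, b, c]: size reached
    }
}
```

With `camel.beans.aggregation.size: 10` and `camel.beans.aggregation.timeout: 5000` as in the examples below, the connector would send at most ten records per aggregate, or fewer if five seconds pass first.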
Prerequisites
- You must have installed Camel Kafka Connector, for example, see Section 2.2, “Installing AMQ Streams and Kafka Connect S2I on OpenShift”.
- You must have deployed your sink connector, for example, see Section 2.3, “Deploying Camel Kafka Connector using Kafka Connect S2I on OpenShift”. This section shows an example using the AWS S3 sink connector.
Procedure
Configure your sink connector and aggregator settings in Camel Kafka Connector properties, depending on your installation platform:
- OpenShift
The following example shows the AWS S3 sink connector and aggregator configuration in a custom resource:
oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: s3-topic
    camel.sink.path.bucketNameOrArn: camel-kafka-connector
    camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    # Camel aggregator settings
    camel.beans.aggregate: class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
    camel.beans.aggregation.size: 10
    camel.beans.aggregation.timeout: 5000
    camel.component.aws2-s3.accessKey: xxxx
    camel.component.aws2-s3.secretKey: yyyy
    camel.component.aws2-s3.region: region
EOF
- Red Hat Enterprise Linux
The following example shows the AWS S3 sink connector and aggregator configuration in the CamelAwss3SinkConnector.properties file:

name=CamelAWS2S3SinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
camel.sink.path.bucketNameOrArn=camel-kafka-connector
camel.component.aws2-s3.access-key=xxxx
camel.component.aws2-s3.secret-key=yyyy
camel.component.aws2-s3.region=eu-west-1
camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
# Camel aggregator settings
camel.beans.aggregate=class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
camel.beans.aggregation.size=10
camel.beans.aggregation.timeout=5000
4.2. Writing a custom Camel Kafka connector aggregator
In some scenarios using a Camel Kafka sink connector, you might want to add an aggregator to batch up your Kafka records before sending them to the external sink system. Typically, this involves defining a specific batch size and timeout for aggregation of records. When complete, the aggregate record is sent to the external system.
You can implement your own aggregator or configure one of the aggregators provided by Apache Camel. This section describes how to implement a custom aggregator in Java using the Camel AggregationStrategy interface.
Prerequisites
- You must have Red Hat Fuse installed.
Procedure
Write your own custom aggregator by implementing the Camel AggregationStrategy interface, for example:

package org.apache.camel.kafkaconnector.aggregator;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

public class StringAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // 1
        // append the old body to the new body
        if (oldExchange == null) {
            return newExchange;
        }
        String body = oldExchange.getIn().getBody(String.class); // 2
        if (body != null) {
            Message newIn = newExchange.getIn();
            String newBody = newIn.getBody(String.class);
            if (newBody != null) {
                body += System.lineSeparator() + newBody;
            }
            newIn.setBody(body);
        }
        return newExchange; // 3
    }
}
1. The oldExchange and newExchange objects correspond to the Kafka records arriving at the aggregator.
2. In this case, each newExchange body is concatenated with the oldExchange body and separated using the System line separator.
3. This process continues until the batch size is completed or the timeout is reached.
- Add your custom aggregator code to your existing Camel Kafka connector. See Section 4.4, “Extending Camel Kafka connectors using Maven archetypes”.
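Outside of a running connector, the merge step performed by the aggregate method above can be exercised in plain Java. This is a sketch only: the Exchange and Message plumbing is replaced by plain strings, and the `MergeSketch` class and `merge` method are illustrative names, not part of Camel:

```java
// Plain-Java sketch of the merge step in StringAggregator.aggregate:
// the previous aggregate body and the new record body are joined with
// the system line separator. Exchange/Message plumbing is omitted.
public class MergeSketch {

    // Mirrors the body handling in aggregate(oldExchange, newExchange).
    static String merge(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;  // first record: nothing to append to yet
        }
        if (newBody == null) {
            return oldBody;  // keep the aggregate unchanged
        }
        return oldBody + System.lineSeparator() + newBody;
    }

    public static void main(String[] args) {
        String batch = merge(null, "record-1");
        batch = merge(batch, "record-2");
        batch = merge(batch, "record-3");
        // Three records joined into one line-separated aggregate body
        System.out.println(batch);
    }
}
```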
4.3. Configuring Camel data formats in Camel Kafka Connector
Camel Kafka Connector provides marshaling and unmarshaling of Camel data formats for sink and source connectors. These formats include Apache Avro, Base64, Google Protobuf, JSON, SOAP, Zip file, and many more.
Typically, you would use a Camel DataFormat in your Camel DSL to marshal and unmarshal messages to and from different Camel data formats. For example, if you are receiving messages from a Camel File or JMS component and want to unmarshal the payload for further processing, you can use a DataFormat to implement this in the Camel DSL.
Using Camel Kafka Connector, you can simply configure marshaling and unmarshaling of Camel data formats using properties in your connector configuration. This section shows how to configure marshaling for the Camel Zip file data format using the camel.sink.marshal: zipfile property.
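Conceptually, marshaling with the zipfile data format means the message body is written as a single entry of a zip archive before being sent to the sink. The round trip can be sketched with the standard java.util.zip API; this is an illustration of the format, not the connector's or Camel's implementation, and the `ZipFormatSketch` class and entry name are invented for this example:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Sketch of what zipfile marshaling produces: the message body becomes
// a single entry in a zip archive. Uses java.util.zip directly; this is
// not the connector's code.
public class ZipFormatSketch {

    // Marshal: wrap the body in a zip archive under the given entry name.
    static byte[] marshal(String entryName, byte[] body) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ZipOutputStream zip = new ZipOutputStream(out)) {
            zip.putNextEntry(new ZipEntry(entryName));
            zip.write(body);
            zip.closeEntry();
        }
        return out.toByteArray();
    }

    // Unmarshal: read the first entry's bytes back out of the archive.
    static byte[] unmarshal(byte[] zipped) throws Exception {
        try (ZipInputStream zip = new ZipInputStream(new ByteArrayInputStream(zipped))) {
            zip.getNextEntry();
            return zip.readAllBytes();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] zipped = marshal("payload.txt", "hello".getBytes());
        System.out.println(new String(unmarshal(zipped))); // prints "hello"
    }
}
```

This is why the keyName in the examples below ends in .zip: each aggregated record lands in the S3 bucket as a zip archive.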
Prerequisites
- You must have Camel Kafka Connector installed on OpenShift or Red Hat Enterprise Linux.
- You must have already built your connector starting from an archetype and edited your pom.xml to add the required dependencies. See Section 4.4, “Extending Camel Kafka connectors using Maven archetypes”.
Procedure
Configure the connector settings for marshaling or unmarshaling the data format in your Camel Kafka Connector configuration, depending on your installation platform:
- OpenShift
The following example shows the AWS S3 sink connector and Camel Zip data format configuration in a custom resource:
oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: s3-topic
    camel.sink.path.bucketNameOrArn: camel-kafka-connector
    camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip
    # Camel data format setting
    camel.sink.marshal: zipfile
    camel.component.aws2-s3.accessKey: xxxx
    camel.component.aws2-s3.secretKey: yyyy
    camel.component.aws2-s3.region: region
EOF
- Red Hat Enterprise Linux
The following example shows the AWS S3 sink connector and Camel Zip data format configuration in the CamelAwss3SinkConnector.properties file:

name=CamelAWS2S3SinkConnector
connector.class=org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=mytopic
# Camel data format setting
camel.sink.marshal=zipfile
camel.sink.path.bucketNameOrArn=camel-kafka-connector
camel.component.aws2-s3.access-key=xxxx
camel.component.aws2-s3.secret-key=yyyy
camel.component.aws2-s3.region=eu-west-1
camel.sink.endpoint.keyName=${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}.zip
4.4. Extending Camel Kafka connectors using Maven archetypes
In some scenarios, you might need to extend your Camel Kafka Connector system. For example, when using a sink connector, you might want to add a custom aggregator to batch up your Kafka records before sending them to the external sink system. Alternatively, you might want to configure a connector for marshaling or unmarshaling of Camel data formats, such as Apache Avro, Google Protobuf, JSON, or Zip file.
You can extend an existing Camel Kafka connector using the Maven camel-kafka-connector-extensible-archetype. An archetype is a Maven project template, which provides a consistent way of generating a project. This section describes how to use the archetype to create a Maven project to be extended and how to add your project dependencies.
Using Maven archetypes to write additional Kafka Connect converters or transformers is not included in the Technology Preview and has community support only.
Prerequisites
- You must have Apache Maven installed.
Procedure
Enter the mvn archetype:generate command to create a Maven project to extend Camel Kafka Connector. For example:

$ mvn archetype:generate \
  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes \
  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype \
  -DarchetypeVersion=CONNECTOR_VERSION
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM)
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.4.0] found in catalog remote
Enter values for each of the properties when prompted. The following example extends a camel-aws2-s3-kafka-connector:

Define value for property 'groupId': org.apache.camel.kafkaconnector.extended
Define value for property 'artifactId': myconnector-extended
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' org.apache.camel.kafkaconnector.extended: :
Define value for property 'camel-kafka-connector-name': camel-aws2-s3-kafka-connector
[INFO] Using property: camel-kafka-connector-version = CONNECTOR_VERSION
Confirm properties configuration:
groupId: org.apache.camel.kafkaconnector.extended
artifactId: myconnector-extended
version: 1.0-SNAPSHOT
package: org.apache.camel.kafkaconnector.extended
camel-kafka-connector-name: camel-aws2-s3-kafka-connector
camel-kafka-connector-version: CONNECTOR_VERSION
Enter Y to confirm your properties:

Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:CONNECTOR_VERSION
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.extended
[INFO] Parameter: artifactId, Value: myconnector-extended
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.extended
[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector/extended
[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector.extended
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector.extended
[INFO] Parameter: camel-kafka-connector-name, Value: camel-aws2-s3-kafka-connector
[INFO] Parameter: camel-kafka-connector-version, Value: CONNECTOR_VERSION
[INFO] Parameter: artifactId, Value: myconnector-extended
[INFO] Project created from Archetype in dir: /home/workspace/myconnector-extended
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:44 min
[INFO] Finished at: 2020-09-04T08:55:00+02:00
[INFO] ------------------------------------------------------------------------
- Add the dependencies that you need to the pom.xml of the created Maven project.
- Build the Maven project to create a .zip or .tar.gz file for your extended Camel Kafka connector:

mvn clean package