Chapter 4. Extending Camel Kafka Connector
This chapter explains how to extend and customize Camel Kafka connectors and components. Camel Kafka Connector provides an easy way to configure Camel components directly in the Kafka Connect framework, without needing to write code. However, in some scenarios, you might want to extend and customize Camel Kafka Connector for specific use cases.
4.1. Configuring a Camel Kafka connector aggregator
In some scenarios using a Camel Kafka sink connector, you might want to add an aggregator to batch up your Kafka records before sending them to the external sink system. Typically, this involves defining a specific batch size and timeout for aggregation of records. When complete, the aggregate record is sent to the external system.
You can configure aggregation settings in your Camel Kafka Connector properties using one of the aggregators provided by Apache Camel, or you can implement a custom aggregator in Java. This section describes how to configure the Camel aggregator settings in your Camel Kafka Connector properties.
Prerequisites
- You must have installed Camel Kafka Connector, for example, see Section 2.2, “Installing AMQ Streams and Kafka Connect S2I on OpenShift”.
- You must have deployed your sink connector, for example, see Section 2.3, “Deploying Camel Kafka Connector using Kafka Connect S2I on OpenShift”. This section shows an example using the AWS S3 sink connector.
Procedure
Configure your sink connector and aggregator settings in Camel Kafka Connector properties, depending on your installation platform:
- OpenShift
The following example shows the AWS S3 sink connector and aggregator configuration in a custom resource:
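A minimal sketch of such a custom resource follows, assuming a Strimzi KafkaConnector resource, illustrative connector, cluster, topic, and bucket names, and the StringAggregator class shipped with Camel Kafka Connector; verify the exact connector class and aggregation property names against the connector reference for your release:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    camel.sink.path.bucketNameOrArn: my-bucket
    # Aggregator settings (names may vary by release): batch up to
    # 10 records, or flush the batch after a 5000 ms timeout
    camel.beans.aggregate: "#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator"
    camel.aggregation.size: 10
    camel.aggregation.timeout: 5000
```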
- Red Hat Enterprise Linux
The following example shows the AWS S3 sink connector and aggregator configuration in the CamelAwss3SinkConnector.properties file:
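A minimal sketch of such a properties file, using the same illustrative names and the StringAggregator class shipped with Camel Kafka Connector; the aggregation property names are assumptions to check against your connector reference:

```properties
name=CamelAwss3SinkConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
topics=my-topic
camel.sink.path.bucketNameOrArn=my-bucket
# Aggregator settings (names may vary by release): batch up to
# 10 records, or flush the batch after a 5000 ms timeout
camel.beans.aggregate=#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator
camel.aggregation.size=10
camel.aggregation.timeout=5000
```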
4.2. Writing a custom Camel Kafka connector aggregator
In some scenarios using a Camel Kafka sink connector, you might want to add an aggregator to batch up your Kafka records before sending them to the external sink system. Typically, this involves defining a specific batch size and timeout for aggregation of records. When complete, the aggregate record is sent to the external system.
You can implement your own aggregator or configure one of the aggregators provided by Apache Camel. This section describes how to implement a custom aggregator in Java using the Camel AggregationStrategy interface.
Prerequisites
- You must have Red Hat Fuse installed.
Procedure
Write your own custom aggregator by implementing the Camel AggregationStrategy interface. A typical implementation works as follows:
- The oldExchange and newExchange objects correspond to the Kafka records arriving at the aggregator.
- Each newExchange body is concatenated with the oldExchange body and separated using the System line separator.
- This process continues until the batch size is completed or the timeout is reached.
- Add your custom aggregator code to your existing Camel Kafka connector. See Section 4.4, “Extending Camel Kafka connectors using Maven archetypes”.
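The concatenation behavior described above can be sketched as follows, assuming Camel 3, where AggregationStrategy is an interface in the org.apache.camel package; the class name StringAggregator is illustrative:

```java
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

public class StringAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // The first record in a batch has no previous exchange to merge with
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        // Concatenate the new record body with the accumulated body,
        // separated by the System line separator
        oldExchange.getIn().setBody(oldBody + System.lineSeparator() + newBody);
        return oldExchange;
    }
}
```

The Kafka Connect runtime calls aggregate once per incoming record until the configured batch size or timeout is reached, at which point the accumulated exchange is sent to the sink system.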
4.3. Configuring Camel data formats in Camel Kafka Connector
Camel Kafka Connector provides marshaling/unmarshaling of Camel data formats for sink and source connectors. For example, these formats include Apache Avro, Base64, Google Protobuf, JSON, SOAP, Zip file, and many more.
Typically, you would use a Camel DataFormat in your Camel DSL to marshal and unmarshal messages to and from different Camel data formats. For example, if you are receiving messages from a Camel File or JMS component and want to unmarshal the payload for further processing, you can use a DataFormat to implement this in the Camel DSL.
Using Camel Kafka Connector, you can simply configure marshaling and unmarshaling of Camel data formats using properties in your connector configuration. This section shows how to configure marshaling for the Camel Zip file data format using the camel.sink.marshal: zipfile property.
Prerequisites
- You must have Camel Kafka Connector installed on OpenShift or Red Hat Enterprise Linux.
- You must have already built your connector starting from an archetype and edited your pom.xml to add the required dependencies. See Section 4.4, “Extending Camel Kafka connectors using Maven archetypes”.
Procedure
Configure the connector settings for marshaling or unmarshaling the data format in your Camel Kafka Connector configuration, depending on your installation platform:
- OpenShift
The following example shows the AWS S3 sink connector and Camel Zip data format configuration in a custom resource:
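A minimal sketch of such a custom resource, assuming a Strimzi KafkaConnector resource and illustrative connector, cluster, topic, and bucket names; only the camel.sink.marshal property is taken from this section, so verify the remaining settings against your connector reference:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    camel.sink.path.bucketNameOrArn: my-bucket
    # Marshal each record payload into a Zip file before sending it to S3
    camel.sink.marshal: zipfile
```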
- Red Hat Enterprise Linux
The following example shows the AWS S3 sink connector and Camel Zip data format configuration in the CamelAwss3SinkConnector.properties file:
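A minimal sketch of such a properties file, using illustrative names; only the camel.sink.marshal property is taken from this section:

```properties
name=CamelAwss3SinkConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
topics=my-topic
camel.sink.path.bucketNameOrArn=my-bucket
# Marshal each record payload into a Zip file before sending it to S3
camel.sink.marshal=zipfile
```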
4.4. Extending Camel Kafka connectors using Maven archetypes
In some scenarios, you might need to extend your Camel Kafka Connector system. For example, when using a sink connector, you might want to add a custom aggregator to batch up your Kafka records before sending them to the external sink system. Alternatively, you might want to configure a connector for marshaling or unmarshaling of Camel data formats, such as Apache Avro, Google Protobuf, JSON, or Zip file.
You can extend an existing Camel Kafka connector using the Maven camel-kafka-connector-extensible-archetype. An archetype is a Maven project template, which provides a consistent way of generating a project. This section describes how to use the archetype to create a Maven project to be extended and how to add your project dependencies.
Using Maven archetypes to write additional Kafka Connect converters or transformers is not included in the Technology Preview and has community support only.
Prerequisites
- You must have Apache Maven installed.
Procedure
- Enter the mvn archetype:generate command to create a Maven project to extend Camel Kafka Connector. Enter values for each of the properties when prompted. For example, you can extend a camel-aws2-s3-kafka-connector.
- Enter Y to confirm your properties.
- Enter the dependencies that you need in the pom.xml for the created Maven project.
- Build the Maven project to create a .zip or tar.gz file for your extended Camel Kafka connector:

  mvn clean package
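As a sketch, the mvn archetype:generate invocation for the extensible archetype might look like the following; the archetype groupId and the version placeholder are assumptions to verify against the Camel Kafka Connector release you are using:

```shell
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.camel.kafkaconnector \
  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype \
  -DarchetypeVersion=<version>
```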