Chapter 47. Kafka Batch Source


Receive data from Kafka topics in batch through Plain Login Module and commit them manually through KafkaManualCommit..

47.1. Configuration Options

The following table summarizes the configuration options available for the kafka-batch-source Kamelet:

Expand
PropertyNameDescriptionTypeDefaultExample

bootstrapServers *

Bootstrap Servers

Comma separated list of Kafka Broker URLs

string

  

password *

Password

Password to authenticate to kafka

string

  

topic *

Topic Names

Comma separated list of Kafka topic names

string

  

user *

Username

Username to authenticate to Kafka

string

  

allowManualCommit

Allow Manual Commit

Whether to allow doing manual commits

boolean

False

 

autoCommitEnable

Auto Commit Enable

If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer

boolean

True

 

autoOffsetReset

Auto Offset Reset

What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none

string

latest

 

batchSize

Batch Dimension

The maximum number of records returned in a single call to poll()

integer

500

 

batchingIntervalMs

Batching Interval

In consumer batching mode, then this option is specifying a time in millis, to trigger batch completion eager when the current batch size has not reached the maximum size defined by maxPollRecords. Notice the trigger is not exact at the given interval, as this can only happen between kafka polls (see pollTimeoutMs option).

integer

  

consumerGroup

Consumer Group

A string that uniquely identifies the group of consumers to which this source belongs

string

 

my-group-id

deserializeHeaders

Automatically Deserialize Headers

When enabled the Kamelet source will deserialize all message headers to String representation.

boolean

True

 

maxPollIntervalMs

Max Poll Interval

The maximum delay between invocations of poll() when using consumer group management

integer

  

pollOnError

Poll On Error Behavior

What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP

string

ERROR_HANDLER

 

pollTimeout

Poll Timeout Interval

The timeout used when polling the KafkaConsumer

integer

5000

 

saslMechanism

SASL Mechanism

The Simple Authentication and Security Layer (SASL) Mechanism used.

string

PLAIN

 

securityProtocol

Security Protocol

Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported

string

SASL_SSL

 

topicIsPattern

Topic Is Pattern

Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern.

boolean

False

 

* = Fields marked with an asterisk are mandatory.

47.2. Dependencies

47.2.1. Quarkus dependencies

<dependencies>
  <dependency>
    <groupId>org.apache.camel.quarkus</groupId>
    <artifact>camel-quarkus-core</artifact>
  </dependency>
  <dependency>
    <groupId>org.apache.camel.quarkus</groupId>
    <artifact>camel-quarkus-kafka</artifact>
  </dependency>
  <dependency>
    <groupId>org.apache.camel.quarkus</groupId>
    <artifact>camel-quarkus-kamelet</artifact>
  </dependency>
  <dependency>
    <groupId>org.apache.camel.kamelets</groupId>
    <artifact>camel-kamelets-utils</artifact>
    <version>4.8.5</version>
  </dependency>
</dependencies>

47.3. Usage

47.3.1. Camel JBang usage

47.3.1.1. Prerequisites for JBang

  • Install JBang.
  • You have executed the following command:

    jbang app install camel@apache/camel

47.3.1.2. Running a route with JBang

Suppose you have a file named route.yaml with this content:

- route:
    from:
      uri: "kamelet:timer-source"
      parameters:
        period: 10000
        message: 'test'
      steps:
        - to:
            uri: "kamelet:log-sink"

You can now run it directly through the following command.

camel run route.yaml

47.3.2. Knative Source

You can use the kafka-batch-source Kamelet as a Knative source by binding it to a Knative object.

kafka-batch-source-binding.yaml

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: kafka-batch-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: kafka-batch-source
    properties:
      bootstrapServers: "The Bootstrap Servers"
      password: "The Password"
      topic: "The Topic Names"
      user: "The Username"
  sink:
    ref:
      kind: Channel
      apiVersion: messaging.knative.dev/v1
      name: mychannel

47.3.3. Kafka Source

You can use the kafka-batch-source Kamelet as a Kafka source by binding it to a Kafka topic.

kafka-batch-source-binding.yaml

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: kafka-batch-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: kafka-batch-source
    properties:
      bootstrapServers: "The Bootstrap Servers"
      password: "The Password"
      topic: "The Topic Names"
      user: "The Username"
  sink:
    ref:
      kind: KafkaTopic
      apiVersion: kafka.strimzi.io/v1beta1
      name: my-topic

47.4. Kamelets source file

https://github.com/jboss-fuse/camel-kamelets/blob/camel-kamelets-4.10.3-branch/kamelets/kafka-batch-source.kamelet.yaml

Red Hat logoGithubredditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust. Explore our recent updates.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

Theme

© 2026 Red Hat
Back to top