Chapter 79. Kafka
Kafka Component Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!
Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their
pom.xml for this component:
URI format Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!
kafka:server:port[?options]
kafka:server:port[?options]
Options Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!
|
Property
|
Default
|
Description
|
|---|---|---|
|
zookeeperHost
|
|
The zookeeper host to use
|
|
zookeeperPort
|
2181
|
The zookeeper port to use
|
| zookeeperConnect | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. | |
|
topic
|
|
The topic to use
|
| groupId | ||
| partitioner | ||
| consumerStreams | 10 | |
| clientId | ||
| zookeeperSessionTimeoutMs | ||
| zookeeperConnectionTimeoutMs | ||
| zookeeperSyncTimeMs | ||
| consumersCount |
1
|
Camel 2.15.0: The number of consumers that connect to Kafka server. |
| batchSize |
100
|
Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once.
|
| barrierAwaitTimeoutMs |
10000
|
Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for barrierAwaitTimeoutMs.
|
You can append query options to the URI in the following format,
?option=value&option=value&...
Producer Options Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!
|
Property
|
Default
|
Description
|
|---|---|---|
|
producerType
|
|
|
| compressionCodec | ||
| compressedTopics | ||
| messageSendMaxRetries | ||
| retryBackoffMs | ||
| topicMetadataRefreshIntervalMs | ||
| sendBufferBytes | ||
| requestRequiredAcks | ||
| requestTimeoutMs | ||
| queueBufferingMaxMs | ||
| queueBufferingMaxMessages | ||
| queueEnqueueTimeoutMs | ||
| batchNumMessages | ||
| serializerClass | ||
| keySerializerClass |
Consumer Options Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!
|
Property
|
Default
|
Description
|
|---|---|---|
|
consumerId
|
|
|
| socketTimeoutMs | ||
| socketReceiveBufferBytes | ||
| fetchMessageMaxBytes | ||
| autoCommitEnable | ||
| autoCommitIntervalMs | ||
| queuedMaxMessages | ||
| rebalanceMaxRetries | ||
| fetchMinBytes | ||
| fetchWaitMaxMs | ||
| rebalanceBackoffMs | ||
| refreshLeaderBackoffMs | ||
| autoOffsetReset | ||
| consumerTimeoutMs |
Samples Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See unit tests of camel-kafka for more examples
Endpoints Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods
- createProducer() will create a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern for consuming message exchanges from the endpoint via a Processor when creating a Consumer
- createPollingConsumer() implements the Polling Consumer pattern for consuming message exchanges from the endpoint via a PollingConsumer
See Also Copy linkLink copied to clipboard!
Copy linkLink copied to clipboard!