此内容没有您所选择的语言版本。
Chapter 90. Kafka
Kafka Component
Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their
pom.xml
for this component:
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-kafka</artifactId> <version>2.17.0.redhat-630xxx</version> <!-- use the same version as your Camel core version --> </dependency>
Camel 2.17 or newer: Scala is no longer used, as we use the Kafka Java client.
Camel 2.16 or older: You must also add a Maven dependency for your chosen Scala library. camel-kafka does not include that dependency, but assumes its provided. For example, to use Scala 2.10.4 add:
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.4</version> </dependency>
URI format
kafka:server:port[?options]
Options
Property
|
Default
|
Description
|
---|---|---|
zookeeperHost
|
|
The zookeeper host to use
|
zookeeperPort
|
2181
|
The zookeeper port to use
|
zookeeperConnect
|
Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. | |
topic
|
|
The topic to use
|
groupId
|
||
partitioner
|
||
consumerStreams
|
10 | |
clientId
|
||
zookeeperSessionTimeoutMs
|
||
zookeeperConnectionTimeoutMs
|
||
zookeeperSyncTimeMs
|
||
consumersCount
|
1
|
Camel 2.15.0: The number of consumers that connect to Kafka server. |
batchSize
|
100
|
Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once.
|
barrierAwaitTimeoutMs
|
10000
|
Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize , it will wait for barrierAwaitTimeoutMs .
|
bridgeEndpoint
|
false
|
Camel 2.16.0: If the bridgeEndpoint is true , the producer will ignore the topic header setting of the message.
|
You can append query options to the URI in the following format,
?option=value&option=value&...
Producer Options
Property
|
Default
|
Description
|
---|---|---|
producerType
|
sync
|
Can have the following values:
|
compressionCodec
|
||
compressedTopics
|
||
messageSendMaxRetries
|
||
retryBackoffMs
|
||
topicMetadataRefreshIntervalMs
|
||
sendBufferBytes
|
||
requestRequiredAcks
|
||
requestTimeoutMs
|
||
queueBufferingMaxMs
|
||
queueBufferingMaxMessages
|
||
queueEnqueueTimeoutMs
|
||
batchNumMessages
|
||
serializerClass
|
||
keySerializerClass
|
Consumer Options
Property
|
Default
|
Description
|
---|---|---|
consumerId
|
|
|
socketTimeoutMs
|
||
socketReceiveBufferBytes
|
||
fetchMessageMaxBytes
|
||
autoCommitEnable
|
||
autoCommitIntervalMs
|
||
queuedMaxMessages
|
||
rebalanceMaxRetries
|
||
fetchMinBytes
|
||
fetchWaitMaxMs
|
||
rebalanceBackoffMs
|
||
refreshLeaderBackoffMs
|
||
autoOffsetReset
|
||
consumerTimeoutMs
|
Samples
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See unit tests of camel-kafka for more examples
Endpoints
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods
- createProducer() will create a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern for consuming message exchanges from the endpoint via a Processor when creating a Consumer
- createPollingConsumer() implements the Polling Consumer pattern for consuming message exchanges from the endpoint via a PollingConsumer