Fuse 6 is no longer supported
As of February 2025, Red Hat Fuse 6 is no longer supported. If you are using Fuse 6, please upgrade to Red Hat build of Apache Camel.Este conteúdo não está disponível no idioma selecionado.
Chapter 79. Kafka
Kafka Component Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!
Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their
pom.xml for this component:
URI format Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!
kafka:server:port[?options]
kafka:server:port[?options]
Options Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!
|
Property
|
Default
|
Description
|
|---|---|---|
|
zookeeperHost
|
|
The zookeeper host to use
|
|
zookeeperPort
|
2181
|
The zookeeper port to use
|
| zookeeperConnect | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. | |
|
topic
|
|
The topic to use
|
| groupId | ||
| partitioner | ||
| consumerStreams | 10 | |
| clientId | ||
| zookeeperSessionTimeoutMs | ||
| zookeeperConnectionTimeoutMs | ||
| zookeeperSyncTimeMs | ||
| consumersCount |
1
|
Camel 2.15.0: The number of consumers that connect to Kafka server. |
| batchSize |
100
|
Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once.
|
| barrierAwaitTimeoutMs |
10000
|
Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for barrierAwaitTimeoutMs.
|
You can append query options to the URI in the following format,
?option=value&option=value&...
Producer Options Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!
|
Property
|
Default
|
Description
|
|---|---|---|
|
producerType
|
|
|
| compressionCodec | ||
| compressedTopics | ||
| messageSendMaxRetries | ||
| retryBackoffMs | ||
| topicMetadataRefreshIntervalMs | ||
| sendBufferBytes | ||
| requestRequiredAcks | ||
| requestTimeoutMs | ||
| queueBufferingMaxMs | ||
| queueBufferingMaxMessages | ||
| queueEnqueueTimeoutMs | ||
| batchNumMessages | ||
| serializerClass | ||
| keySerializerClass |
Consumer Options Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!
|
Property
|
Default
|
Description
|
|---|---|---|
|
consumerId
|
|
|
| socketTimeoutMs | ||
| socketReceiveBufferBytes | ||
| fetchMessageMaxBytes | ||
| autoCommitEnable | ||
| autoCommitIntervalMs | ||
| queuedMaxMessages | ||
| rebalanceMaxRetries | ||
| fetchMinBytes | ||
| fetchWaitMaxMs | ||
| rebalanceBackoffMs | ||
| refreshLeaderBackoffMs | ||
| autoOffsetReset | ||
| consumerTimeoutMs |
Samples Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See unit tests of camel-kafka for more examples
Endpoints Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods
- createProducer() will create a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern for consuming message exchanges from the endpoint via a Processor when creating a Consumer
- createPollingConsumer() implements the Polling Consumer pattern for consuming message exchanges from the endpoint via a PollingConsumer
See Also Copiar o linkLink copiado para a área de transferência!
Copiar o linkLink copiado para a área de transferência!