Fuse 6 is no longer supported
As of February 2025, Red Hat Fuse 6 is no longer supported. If you are using Fuse 6, please upgrade to Red Hat build of Apache Camel.이 콘텐츠는 선택한 언어로 제공되지 않습니다.
Chapter 83. Kafka
Kafka Component 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!
Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their
pom.xml
for this component:
URI format 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!
kafka:server:port[?options]
kafka:server:port[?options]
Options 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!
Property
|
Default
|
Description
|
---|---|---|
zookeeperHost
|
|
The zookeeper host to use
|
zookeeperPort
|
2181
|
The zookeeper port to use
|
zookeeperConnect | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. | |
topic
|
|
The topic to use
|
groupId | ||
partitioner | ||
consumerStreams | 10 | |
clientId | ||
zookeeperSessionTimeoutMs | ||
zookeeperConnectionTimeoutMs | ||
zookeeperSyncTimeMs | ||
consumersCount |
1
|
Camel 2.15.0: The number of consumers that connect to Kafka server. |
batchSize |
100
|
Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once.
|
barrierAwaitTimeoutMs |
10000
|
Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize , it will wait for barrierAwaitTimeoutMs .
|
You can append query options to the URI in the following format,
?option=value&option=value&...
Producer Options 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!
Property
|
Default
|
Description
|
---|---|---|
producerType
|
|
|
compressionCodec | ||
compressedTopics | ||
messageSendMaxRetries | ||
retryBackoffMs | ||
topicMetadataRefreshIntervalMs | ||
sendBufferBytes | ||
requestRequiredAcks | ||
requestTimeoutMs | ||
queueBufferingMaxMs | ||
queueBufferingMaxMessages | ||
queueEnqueueTimeoutMs | ||
batchNumMessages | ||
serializerClass | ||
keySerializerClass |
Consumer Options 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!
Property
|
Default
|
Description
|
---|---|---|
consumerId
|
|
|
socketTimeoutMs | ||
socketReceiveBufferBytes | ||
fetchMessageMaxBytes | ||
autoCommitEnable | ||
autoCommitIntervalMs | ||
queuedMaxMessages | ||
rebalanceMaxRetries | ||
fetchMinBytes | ||
fetchWaitMaxMs | ||
rebalanceBackoffMs | ||
refreshLeaderBackoffMs | ||
autoOffsetReset | ||
consumerTimeoutMs |
Samples 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See unit tests of camel-kafka for more examples
Endpoints 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods
- createProducer() will create a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern for consuming message exchanges from the endpoint via a Processor when creating a Consumer
- createPollingConsumer() implements the Polling Consumer pattern for consuming message exchanges from the endpoint via a PollingConsumer
See Also 링크 복사링크가 클립보드에 복사되었습니다!
링크 복사링크가 클립보드에 복사되었습니다!