此内容没有您所选择的语言版本。

Chapter 90. Kafka


Kafka Component

Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>2.17.0.redhat-630xxx</version>
    <!-- use the same version as your Camel core version -->
</dependency>
Camel 2.17 or newer: Scala is no longer used, as we use the Kafka Java client.
Camel 2.16 or older: You must also add a Maven dependency for your chosen Scala library. camel-kafka does not include that dependency, but assumes its provided. For example, to use Scala 2.10.4 add:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>

URI format

kafka:server:port[?options]

Options

Property
Default
Description
zookeeperHost
The zookeeper host to use
zookeeperPort
2181
The zookeeper port to use
zookeeperConnect Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used.
topic
The topic to use
groupId
partitioner
consumerStreams 10
clientId
zookeeperSessionTimeoutMs
zookeeperConnectionTimeoutMs
zookeeperSyncTimeMs
consumersCount 1 Camel 2.15.0: The number of consumers that connect to Kafka server.
batchSize 100 Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once.
barrierAwaitTimeoutMs 10000 Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for barrierAwaitTimeoutMs.
bridgeEndpoint false Camel 2.16.0: If the bridgeEndpoint is true, the producer will ignore the topic header setting of the message.
You can append query options to the URI in the following format, ?option=value&option=value&...

Producer Options

Property
Default
Description
producerType
sync
Can have the following values:
  • sync - send message/batch immediately, and wait until response is received.
  • async - queue the message/batch to send. There is a thread per broker (Kafka node) which polls from this queue upon queueBufferingMaxMs or batchNumMessages.
compressionCodec
compressedTopics
messageSendMaxRetries
retryBackoffMs
topicMetadataRefreshIntervalMs
sendBufferBytes
requestRequiredAcks
requestTimeoutMs
queueBufferingMaxMs
queueBufferingMaxMessages
queueEnqueueTimeoutMs
batchNumMessages
serializerClass
keySerializerClass

Consumer Options

Property
Default
Description
consumerId
socketTimeoutMs
socketReceiveBufferBytes
fetchMessageMaxBytes
autoCommitEnable
autoCommitIntervalMs
queuedMaxMessages
rebalanceMaxRetries
fetchMinBytes
fetchWaitMaxMs
rebalanceBackoffMs
refreshLeaderBackoffMs
autoOffsetReset
consumerTimeoutMs

Samples

Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See unit tests of camel-kafka for more examples

Endpoints

Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods

See Also

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.