Fuse 6 is no longer supported
As of February 2025, Red Hat Fuse 6 is no longer supported. If you are using Fuse 6, please upgrade to Red Hat build of Apache Camel.第90章 Kafka
Kafka コンポーネント リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
Camel 2.13 で利用可能
kafka: コンポーネントは、Apache Kafka メッセージブローカーとの通信に使用されます。
Maven ユーザーは、このコンポーネントの以下の依存関係を
pom.xml
に追加する必要があります。
Camel 2.17 以降: Kafka Java クライアントを使用するため、Scala は使用されなくなりました。
Camel 2.16 以前: 選択した Scala ライブラリーの Maven 依存関係も追加する必要があります。camel-kafka はその依存関係を含みませんが、提供を想定します。たとえば、Scala 2.10.4 を使用するには、以下を追加します。
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.4</version> </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
URI 形式 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
kafka:server:port[?options]
kafka:server:port[?options]
オプション リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
プロパティー
|
デフォルト
|
説明
|
---|---|---|
zookeeperHost
|
|
使用する zookeeper ホスト
|
zookeeperPort
|
2181
|
使用する zookeeper ポート
|
zookeeperConnect
|
Camel 2.13.3/2.14.1: 使用している場合は、zookeeperHost/zookeeperPort は使用されません。 | |
topic
|
|
使用するトピック
|
groupId
|
||
partitioner
|
||
consumerStreams
|
10 | |
clientId
|
||
zookeeperSessionTimeoutMs
|
||
zookeeperConnectionTimeoutMs
|
||
zookeeperSyncTimeMs
|
||
consumersCount
|
1
|
Camel 2.15.0: Kafka サーバーに接続するコンシューマーの数。 |
batchSize
|
100
|
Camel 2.15.0: BatchingConsumerTask が 1 度処理する batchSize 。
|
barrierAwaitTimeoutMs
|
10000
|
Camel 2.15.0: BatchingConsumerTask のエクスチェンジが batchSize を超える場合は、barrierAwaitTimeoutMs まで待機します。
|
bridgeEndpoint
|
false
|
Camel 2.16.0: bridgeEndpoint が true の場合、プロデューサーはメッセージのトピックヘッダー設定を無視します。
|
以下の形式で URI にクエリーオプションを追加できます。
?option=value&option=value&...
プロデューサーオプション リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
プロパティー
|
デフォルト
|
説明
|
---|---|---|
producerType
|
sync
|
以下の値を使用できます。
|
compressionCodec
|
||
compressedTopics
|
||
messageSendMaxRetries
|
||
retryBackoffMs
|
||
topicMetadataRefreshIntervalMs
|
||
sendBufferBytes
|
||
requestRequiredAcks
|
||
requestTimeoutMs
|
||
queueBufferingMaxMs
|
||
queueBufferingMaxMessages
|
||
queueEnqueueTimeoutMs
|
||
batchNumMessages
|
||
serializerClass
|
||
keySerializerClass
|
コンシューマーオプション リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
プロパティー
|
デフォルト
|
説明
|
---|---|---|
consumerId
|
|
|
socketTimeoutMs
|
||
socketReceiveBufferBytes
|
||
fetchMessageMaxBytes
|
||
autoCommitEnable
|
||
autoCommitIntervalMs
|
||
queuedMaxMessages
|
||
rebalanceMaxRetries
|
||
fetchMinBytes
|
||
fetchWaitMaxMs
|
||
rebalanceBackoffMs
|
||
refreshLeaderBackoffMs
|
||
autoOffsetReset
|
||
consumerTimeoutMs
|
サンプル リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
メッセージの使用:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
メッセージを生成します。
その他の例は、camel-kafka のユニットテスト を参照してください。
エンドポイント リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
Camel は、Endpoint インターフェイスを使用した Message Endpoint パターンをサポートします。エンドポイントは通常コンポーネントによって 作成 され、エンドポイントは通常 URI を介して DSL で参照され ます。
エンドポイントから以下のメソッドを使用できます。
- createProducer () は、メッセージエクスチェンジをエンドポイントに送信するために Producer を作成します。
- createPollingConsumer () は、PollingConsumer経由でエンドポイントからメッセージエクスチェンジを消費するために Polling Consumer パターンを実装します。
関連項目 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!