第90章 Kafka


Kafka コンポーネント

Camel 2.13 で利用可能
kafka: コンポーネントは、Apache Kafka メッセージブローカーとの通信に使用されます。
Maven ユーザーは、このコンポーネントの以下の依存関係を pom.xml に追加する必要があります。
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>2.17.0.redhat-630xxx</version>
    <!-- use the same version as your Camel core version -->
</dependency>
Copy to Clipboard Toggle word wrap
Camel 2.17 以降: Kafka Java クライアントを使用するため、Scala は使用されなくなりました。
Camel 2.16 以前: 選択した Scala ライブラリーの Maven 依存関係も追加する必要があります。camel-kafka はその依存関係を含みませんが、提供を想定します。たとえば、Scala 2.10.4 を使用するには、以下を追加します。
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>
Copy to Clipboard Toggle word wrap

URI 形式

kafka:server:port[?options]
Copy to Clipboard Toggle word wrap

オプション

Expand
プロパティー
デフォルト
説明
zookeeperHost
使用する zookeeper ホスト
zookeeperPort
2181
使用する zookeeper ポート
zookeeperConnect Camel 2.13.3/2.14.1: 使用している場合は、zookeeperHost/zookeeperPort は使用されません。
topic
使用するトピック
groupId
partitioner
consumerStreams 10
clientId
zookeeperSessionTimeoutMs
zookeeperConnectionTimeoutMs
zookeeperSyncTimeMs
consumersCount 1 Camel 2.15.0: Kafka サーバーに接続するコンシューマーの数。
batchSize 100 Camel 2.15.0: BatchingConsumerTask が 1 度処理する batchSize
barrierAwaitTimeoutMs 10000 Camel 2.15.0: BatchingConsumerTask のエクスチェンジが batchSize を超える場合は、barrierAwaitTimeoutMs まで待機します。
bridgeEndpoint false Camel 2.16.0: bridgeEndpointtrue の場合、プロデューサーはメッセージのトピックヘッダー設定を無視します。
以下の形式で URI にクエリーオプションを追加できます。 ?option=value&option=value&...

プロデューサーオプション

Expand
プロパティー
デフォルト
説明
producerType
sync
以下の値を使用できます。
  • sync : メッセージ/バッチを即座に送信し、応答を受け取るまで待機します。
  • async : 送信するメッセージ/バッチをキューに入れます。queueBufferingMaxMs または batchNumMessages でこのキューからポーリングするブローカー(Kafka ノード)ごとにスレッドがあります。
compressionCodec
compressedTopics
messageSendMaxRetries
retryBackoffMs
topicMetadataRefreshIntervalMs
sendBufferBytes
requestRequiredAcks
requestTimeoutMs
queueBufferingMaxMs
queueBufferingMaxMessages
queueEnqueueTimeoutMs
batchNumMessages
serializerClass
keySerializerClass

コンシューマーオプション

Expand
プロパティー
デフォルト
説明
consumerId
socketTimeoutMs
socketReceiveBufferBytes
fetchMessageMaxBytes
autoCommitEnable
autoCommitIntervalMs
queuedMaxMessages
rebalanceMaxRetries
fetchMinBytes
fetchWaitMaxMs
rebalanceBackoffMs
refreshLeaderBackoffMs
autoOffsetReset
consumerTimeoutMs

サンプル

メッセージの使用:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Copy to Clipboard Toggle word wrap
メッセージを生成します。
その他の例は、camel-kafka のユニットテスト を参照してください。

エンドポイント

Camel は、Endpoint インターフェイスを使用した Message Endpoint パターンをサポートします。エンドポイントは通常コンポーネントによって 作成 され、エンドポイントは通常 URI を介して DSL で参照され ます
エンドポイントから以下のメソッドを使用できます。

関連項目

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat