6.2. Kafka コンシューマーアプリケーションの例
この Java ベースの Kafka コンシューマーアプリケーションは、Kafka トピックからのメッセージを消費する自己完結型アプリケーションの例です。クライアントは、Kafka Consumer
API を使用して、指定されたトピックからメッセージを非同期に取得して処理し、いくつかのエラー処理を行います。メッセージが正常に処理された後にオフセットをコミットすることにより、少なくとも 1 回のセマンティクスに従います。
クライアントは、パーティション処理のための ConsumerRebalanceListener
インターフェイスと、オフセットをコミットするための OffsetCommitCallback
インターフェイスを実装します。
Kafka コンシューマーアプリケーションを実行するには、Consumer
クラスの main
メソッドを実行します。クライアントは、NUM_MESSAGES
メッセージ (設定例では 50) が消費されるまで、Kafka トピックからのメッセージを消費します。コンシューマーは、複数のスレッドが同時に安全にアクセスできるように設計されていません。
このサンプルクライアントは、特定のユースケース向けに、より複雑な Kafka コンシューマーを構築するための基本基盤を提供します。セキュアな接続の実装 など、追加の機能を組み込むことができます。
前提条件
-
指定された
BOOTSTRAP_SERVERS
で実行されている Kafka ブローカー -
メッセージが消費される
TOPIC_NAME
という名前の Kafka トピック。 - クライアント依存関係
Kafka コンシューマーアプリケーションを実装する前に、プロジェクトに必要な依存関係を含める必要があります。Java ベースの Kafka クライアントの場合は、Kafka クライアント JAR が含まれます。この JAR ファイルには、クライアントの構築と実行に必要な Kafka ライブラリーが含まれています。
Maven プロジェクトの pom.xml
ファイルに依存関係を追加する方法は、「Kafka クライアントの依存関係を Maven プロジェクトに追加する」 を参照してください。
Configuration
Consumer
クラスで指定された次の定数を使用してコンシューマーアプリケーションを設定できます。
BOOTSTRAP_SERVERS
- Kafka ブローカーに接続するためのアドレスおよびポート。
GROUP_ID
- コンシューマーグループの識別子。
POLL_TIMEOUT_MS
- 各ポーリング中に新しいメッセージを待機する最大時間。
TOPIC_NAME
- メッセージを消費する Kafka トピックの名前。
NUM_MESSAGES
- 停止する前に消費するメッセージの数。
PROCESSING_DELAY_MS
- メッセージ送信間の遅延 (ミリ秒単位)。これにより、メッセージの処理時間をシミュレートでき、テストに役立ちます。
コンシューマーアプリケーションの例
import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.LongDeserializer; import static java.time.Duration.ofMillis; import static java.util.Collections.singleton; public class Consumer implements ConsumerRebalanceListener, OffsetCommitCallback { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; private static final String GROUP_ID = "my-group"; private static final long POLL_TIMEOUT_MS = 1_000L; private static final String TOPIC_NAME = "my-topic"; private static final long NUM_MESSAGES = 50; private static final long PROCESSING_DELAY_MS = 1_000L; private KafkaConsumer<Long, byte[]> kafkaConsumer; protected AtomicLong messageCount = new AtomicLong(0); private Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>(); public static void main(String[] args) { new Consumer().run(); } public void run() { System.out.println("Running consumer"); try (var consumer = createKafkaConsumer()) { 1 kafkaConsumer = consumer; consumer.subscribe(singleton(TOPIC_NAME), this); 2 System.out.printf("Subscribed to %s%n", TOPIC_NAME); while (messageCount.get() < NUM_MESSAGES) { 3 try { ConsumerRecords<Long, byte[]> records = consumer.poll(ofMillis(POLL_TIMEOUT_MS)); 4 if (!records.isEmpty()) { 5 for (ConsumerRecord<Long, byte[]> record : records) { System.out.printf("Record fetched from %s-%d with offset %d%n", record.topic(), record.partition(), record.offset()); sleep(PROCESSING_DELAY_MS); 6 pendingOffsets.put(new TopicPartition(record.topic(), record.partition()), 7 new OffsetAndMetadata(record.offset() + 1, null)); if (messageCount.incrementAndGet() == NUM_MESSAGES) { break; } } consumer.commitAsync(pendingOffsets, this); 8 pendingOffsets.clear(); } } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { 9 System.out.println("Invalid or no offset found, and auto.reset.policy unset, using latest"); consumer.seekToEnd(e.partitions()); consumer.commitSync(); } catch (Exception e) { System.err.println(e.getMessage()); if (!retriable(e)) { e.printStackTrace(); System.exit(1); } } } } } private KafkaConsumer<Long, byte[]> createKafkaConsumer() { Properties props = new Properties(); 10 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 11 props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); 12 props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); 13 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); 14 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 15 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 16 return new KafkaConsumer<>(props); } private void sleep(long ms) { 17 try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); } } private boolean retriable(Exception e) { 18 if (e == null) { return false; } else if (e instanceof IllegalArgumentException || e instanceof UnsupportedOperationException || !(e instanceof RebalanceInProgressException) || !(e instanceof RetriableException)) { return false; } else { return true; } } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 19 System.out.printf("Assigned partitions: %s%n", partitions); } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 20 System.out.printf("Revoked partitions: %s%n", partitions); kafkaConsumer.commitSync(pendingOffsets); pendingOffsets.clear(); } @Override public void onPartitionsLost(Collection<TopicPartition> partitions) { 21 System.out.printf("Lost partitions: {}", partitions); } @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { 22 if (e != null) { System.err.println("Failed to commit offsets"); if (!retriable(e)) { e.printStackTrace(); System.exit(1); } } } }
- 1
- クライアントは、
createKafkaConsumer
メソッドを使用して Kafka コンシューマーを作成します。 - 2
- コンシューマーは特定のトピックをサブスクライブします。トピックをサブスクライブすると、確認メッセージが出力されます。
- 3
- 消費されるメッセージの最大数は、
NUM_MESSAGES
定数値によって決まります。 - 4
- メッセージを取得するための次のポーリングは、リバランスを避けるために
session.timeout.ms
以内に呼び出す必要があります。 - 5
- Kafka から取得したバッチメッセージを含む
records
オブジェクトが空でないことを確認する条件。records
オブジェクトが空の場合、処理する新しいメッセージがないため、プロセスはスキップされます。 - 6
- 指定されたミリ秒数の間、メッセージ取得プロセスに遅延を導入するメソッド。
- 7
- コンシューマーは、
pendingOffsets
マップを使用して、コミットする必要がある、消費されたメッセージのオフセットを保存します。 - 8
- メッセージのバッチを処理した後、コンシューマーは
commitAsync
メソッドを使用してオフセットを非同期的にコミットし、少なくとも 1 回のセマンティクスを実装します。 - 9
- メッセージを消費し、自動リセットポリシーが設定されていない場合に、致命的でないエラーと致命的なエラーを処理するための捕捉。致命的ではないエラーの場合、コンシューマーはパーティションの末尾までシークし、利用可能な最新のオフセットから消費を開始します。例外を再試行できない場合は、スタックトレースが出力され、コンシューマーは終了します。
- 10
- クライアントは、提供された設定を使用して
KafkaConsumer
インスタンスを作成します。プロパティーファイルを使用することも、設定を直接追加することもできます。基本設定の詳細については、4章Kafka クラスターに接続するためのクライアントアプリケーションの設定 を参照してください。 - 11
- Kafka ブローカーへの接続。
- 12
- ランダムに生成された UUID を使用するプロデューサーの一意のクライアント ID。クライアント ID は必須ではありませんが、リクエストのソースを追跡するのに役立ちます。
- 13
- コンシューマーがパーティションへの割り当てを調整するためのグループ ID。
- 14
- キーおよび値をバイト配列として処理するための適切なデシリアライザークラス。
- 15
- 自動オフセットコミットを無効にする設定。
- 16
- パーティションに対してコミットされたオフセットが見つからない場合に、コンシューマーが最も古い利用可能なオフセットからメッセージの消費を開始するための設定。
- 17
- メッセージを消費するプロセスに指定されたミリ秒数の遅延を導入するメソッド。メッセージの送信を担当するスレッドが一時停止中に中断されると、
InterruptedException
エラーがスローされます。 - 18
- 例外の後にメッセージのコミットを再試行するかどうかを確認するメソッド。Null 例外と指定された例外は再試行されません。また、
RebalanceInProgressException
またはRetriableException
インターフェイスを実装していない例外も再試行されません。このメソッドをカスタマイズして、他のエラーを含めることができます。 - 19
- コンシューマーに割り当てられているパーティションのリストを示すメッセージをコンソールに出力するメソッド。
- 20
- コンシューマーグループのリバランス中にコンシューマーがパーティションの所有権を失いそうなときに呼び出されるメソッド。このメソッドは、コンシューマーから取り消されるパーティションのリストを出力します。保留中のオフセットはすべてコミットされます。
- 21
- コンシューマーグループのリバランス中にコンシューマーがパーティションの所有権を失ったが、保留中のオフセットをコミットできなかった場合に呼び出されるメソッド。このメソッドは、コンシューマーによって失われたパーティションのリストを出力します。
- 22
- コンシューマーがオフセットを Kafka にコミットするときに呼び出されるメソッド。オフセットのコミット時にエラーが発生した場合は、エラーメッセージが出力されます。このメソッドは例外をチェックし、それが致命的なエラーであるか致命的ではないエラーであるかに基づいて適切なアクションを実行します。エラーが致命的ではない場合、オフセットコミットプロセスは続行されます。エラーが致命的である場合、スタックトレースが出力され、コンシューマーは終了します。
エラー処理
コンシューマーアプリケーションによって捕捉された致命的な例外:
InterruptedException
-
一時停止中に現在のスレッドが中断された場合にスローされるエラー。中断は通常、コンシューマーを停止またはシャットダウンするときに発生します。例外は
RuntimeException
として再スローされ、コンシューマーを終了します。 IllegalArgumentException
- コンシューマーが無効または不適切な引数を受け取ったときにスローされるエラー。たとえば、トピックが欠落している場合、例外がスローされます。
UnsupportedOperationException
-
操作がサポートされていない場合、またはメソッドが実装されていない場合にスローされるエラー。たとえば、サポートされていないコンシューマー設定を使用しようとしたり、
KafkaConsumer
クラスでサポートされていないメソッドを呼び出そうとした場合、例外がスローされます。
コンシューマーアプリケーションによって捕捉された致命的ではない例外:
OffsetOutOfRangeException
-
通常は、オフセットが対象のパーティションの有効なオフセット範囲外にある場合や、自動リセットポリシーが有効でない場合など、コンシューマーがパーティションに無効なオフセットがないか検索しようとするとエラーが出力されます。回復するには、コンシューマーはパーティションの最後まで検索してオフセットを同期的にコミットします (
commitSync
)。自動リセットポリシーが有効な場合、コンシューマーは設定に応じてパーティションの先頭または末尾を検索します。 NoOffsetForPartitionException
-
パーティションにコミットされたオフセットがない場合、または要求されたオフセットが無効で、自動リセットポリシーが有効になっていない場合に出力されるエラー。回復するには、コンシューマーはパーティションの最後まで検索してオフセットを同期的にコミットします (
commitSync
)。自動リセットポリシーが有効な場合、コンシューマーは設定に応じてパーティションの先頭または末尾を検索します。 RebalanceInProgressException
- コンシューマーグループのリバランス中にパーティションが割り当てられているときにスローされるエラー。コンシューマーがリバランスを実行しているときは、オフセットコミットを完了できません。
RetriableException
-
Kafka クライアントライブラリーによって提供される
RetriableException
インターフェイスを実装する例外に対してスローされるエラー。
致命的ではないエラーの場合、コンシューマーはメッセージの処理を続行します。