6.2. Kafka コンシューマーアプリケーションの例


この Java ベースの Kafka コンシューマーアプリケーションは、Kafka トピックからのメッセージを消費する自己完結型アプリケーションの例です。クライアントは、Kafka Consumer API を使用して、指定されたトピックからメッセージを非同期に取得して処理し、いくつかのエラー処理を行います。メッセージが正常に処理された後にオフセットをコミットすることにより、少なくとも 1 回のセマンティクスに従います。

クライアントは、パーティション処理のための ConsumerRebalanceListener インターフェイスと、オフセットをコミットするための OffsetCommitCallback インターフェイスを実装します。

Kafka コンシューマーアプリケーションを実行するには、Consumer クラスの main メソッドを実行します。クライアントは、NUM_MESSAGES メッセージ (設定例では 50) が消費されるまで、Kafka トピックからのメッセージを消費します。コンシューマーは、複数のスレッドが同時に安全にアクセスできるように設計されていません。

このサンプルクライアントは、特定のユースケース向けに、より複雑な Kafka コンシューマーを構築するための基本基盤を提供します。セキュアな接続の実装 など、追加の機能を組み込むことができます。

前提条件

  • 指定された BOOTSTRAP_SERVERS で実行されている Kafka ブローカー
  • メッセージが消費される TOPIC_NAME という名前の Kafka トピック。
  • クライアント依存関係

Kafka コンシューマーアプリケーションを実装する前に、プロジェクトに必要な依存関係を含める必要があります。Java ベースの Kafka クライアントの場合は、Kafka クライアント JAR が含まれます。この JAR ファイルには、クライアントの構築と実行に必要な Kafka ライブラリーが含まれています。

Maven プロジェクトの pom.xml ファイルに依存関係を追加する方法は、「Kafka クライアントの依存関係を Maven プロジェクトに追加する」 を参照してください。

Configuration

Consumer クラスで指定された次の定数を使用してコンシューマーアプリケーションを設定できます。

BOOTSTRAP_SERVERS
Kafka ブローカーに接続するためのアドレスおよびポート。
GROUP_ID
コンシューマーグループの識別子。
POLL_TIMEOUT_MS
各ポーリング中に新しいメッセージを待機する最大時間。
TOPIC_NAME
メッセージを消費する Kafka トピックの名前。
NUM_MESSAGES
停止する前に消費するメッセージの数。
PROCESSING_DELAY_MS
メッセージ送信間の遅延 (ミリ秒単位)。これにより、メッセージの処理時間をシミュレートでき、テストに役立ちます。

コンシューマーアプリケーションの例

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

import static java.time.Duration.ofMillis;
import static java.util.Collections.singleton;

public class Consumer implements ConsumerRebalanceListener, OffsetCommitCallback {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-group";
    private static final long POLL_TIMEOUT_MS = 1_000L;
    private static final String TOPIC_NAME = "my-topic";
    private static final long NUM_MESSAGES = 50;
    private static final long PROCESSING_DELAY_MS = 1_000L;

    private KafkaConsumer<Long, byte[]> kafkaConsumer;
    protected AtomicLong messageCount = new AtomicLong(0);
    private Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();

    public static void main(String[] args) {
        new Consumer().run();
    }

    public void run() {
        System.out.println("Running consumer");
        try (var consumer = createKafkaConsumer()) { 1
            kafkaConsumer = consumer;
            consumer.subscribe(singleton(TOPIC_NAME), this); 2
            System.out.printf("Subscribed to %s%n", TOPIC_NAME);
            while (messageCount.get() < NUM_MESSAGES) { 3
                try {
                    ConsumerRecords<Long, byte[]> records = consumer.poll(ofMillis(POLL_TIMEOUT_MS)); 4
                    if (!records.isEmpty()) { 5
                        for (ConsumerRecord<Long, byte[]> record : records) {
                            System.out.printf("Record fetched from %s-%d with offset %d%n",
                                record.topic(), record.partition(), record.offset());
                            sleep(PROCESSING_DELAY_MS); 6

                            pendingOffsets.put(new TopicPartition(record.topic(), record.partition()), 7
                                new OffsetAndMetadata(record.offset() + 1, null));
                            if (messageCount.incrementAndGet() == NUM_MESSAGES) {
                                break;
                            }
                        }
                        consumer.commitAsync(pendingOffsets, this); 8
                        pendingOffsets.clear();
                    }
                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { 9
                    System.out.println("Invalid or no offset found, and auto.reset.policy unset, using latest");
                    consumer.seekToEnd(e.partitions());
                    consumer.commitSync();
                } catch (Exception e) {
                    System.err.println(e.getMessage());
                    if (!retriable(e)) {
                        e.printStackTrace();
                        System.exit(1);
                    }
                }
            }
        }
    }

    private KafkaConsumer<Long, byte[]> createKafkaConsumer() {
        Properties props = new Properties(); 10
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 11
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); 12
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); 13
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); 14
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 15
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 16
        return new KafkaConsumer<>(props);
    }

    private void sleep(long ms) { 17
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private boolean retriable(Exception e) { 18
        if (e == null) {
            return false;
        } else if (e instanceof IllegalArgumentException
            || e instanceof UnsupportedOperationException
            || !(e instanceof RebalanceInProgressException)
            || !(e instanceof RetriableException)) {
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 19
        System.out.printf("Assigned partitions: %s%n", partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 20
        System.out.printf("Revoked partitions: %s%n", partitions);
        kafkaConsumer.commitSync(pendingOffsets);
        pendingOffsets.clear();
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) { 21
        System.out.printf("Lost partitions: {}", partitions);
    }

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { 22
        if (e != null) {
            System.err.println("Failed to commit offsets");
            if (!retriable(e)) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }
}

1
クライアントは、createKafkaConsumer メソッドを使用して Kafka コンシューマーを作成します。
2
コンシューマーは特定のトピックをサブスクライブします。トピックをサブスクライブすると、確認メッセージが出力されます。
3
消費されるメッセージの最大数は、NUM_MESSAGES 定数値によって決まります。
4
メッセージを取得するための次のポーリングは、リバランスを避けるために session.timeout.ms 以内に呼び出す必要があります。
5
Kafka から取得したバッチメッセージを含む records オブジェクトが空でないことを確認する条件。records オブジェクトが空の場合、処理する新しいメッセージがないため、プロセスはスキップされます。
6
指定されたミリ秒数の間、メッセージ取得プロセスに遅延を導入するメソッド。
7
コンシューマーは、pendingOffsets マップを使用して、コミットする必要がある、消費されたメッセージのオフセットを保存します。
8
メッセージのバッチを処理した後、コンシューマーは commitAsync メソッドを使用してオフセットを非同期的にコミットし、少なくとも 1 回のセマンティクスを実装します。
9
メッセージを消費し、自動リセットポリシーが設定されていない場合に、致命的でないエラーと致命的なエラーを処理するための捕捉。致命的ではないエラーの場合、コンシューマーはパーティションの末尾までシークし、利用可能な最新のオフセットから消費を開始します。例外を再試行できない場合は、スタックトレースが出力され、コンシューマーは終了します。
10
クライアントは、提供された設定を使用して KafkaConsumer インスタンスを作成します。プロパティーファイルを使用することも、設定を直接追加することもできます。基本設定の詳細については、4章Kafka クラスターに接続するためのクライアントアプリケーションの設定 を参照してください。
11
Kafka ブローカーへの接続。
12
ランダムに生成された UUID を使用するプロデューサーの一意のクライアント ID。クライアント ID は必須ではありませんが、リクエストのソースを追跡するのに役立ちます。
13
コンシューマーがパーティションへの割り当てを調整するためのグループ ID。
14
キーおよび値をバイト配列として処理するための適切なデシリアライザークラス。
15
自動オフセットコミットを無効にする設定。
16
パーティションに対してコミットされたオフセットが見つからない場合に、コンシューマーが最も古い利用可能なオフセットからメッセージの消費を開始するための設定。
17
メッセージを消費するプロセスに指定されたミリ秒数の遅延を導入するメソッド。メッセージの送信を担当するスレッドが一時停止中に中断されると、InterruptedException エラーがスローされます。
18
例外の後にメッセージのコミットを再試行するかどうかを確認するメソッド。Null 例外と指定された例外は再試行されません。また、RebalanceInProgressException または RetriableException インターフェイスを実装していない例外も再試行されません。このメソッドをカスタマイズして、他のエラーを含めることができます。
19
コンシューマーに割り当てられているパーティションのリストを示すメッセージをコンソールに出力するメソッド。
20
コンシューマーグループのリバランス中にコンシューマーがパーティションの所有権を失いそうなときに呼び出されるメソッド。このメソッドは、コンシューマーから取り消されるパーティションのリストを出力します。保留中のオフセットはすべてコミットされます。
21
コンシューマーグループのリバランス中にコンシューマーがパーティションの所有権を失ったが、保留中のオフセットをコミットできなかった場合に呼び出されるメソッド。このメソッドは、コンシューマーによって失われたパーティションのリストを出力します。
22
コンシューマーがオフセットを Kafka にコミットするときに呼び出されるメソッド。オフセットのコミット時にエラーが発生した場合は、エラーメッセージが出力されます。このメソッドは例外をチェックし、それが致命的なエラーであるか致命的ではないエラーであるかに基づいて適切なアクションを実行します。エラーが致命的ではない場合、オフセットコミットプロセスは続行されます。エラーが致命的である場合、スタックトレースが出力され、コンシューマーは終了します。

エラー処理

コンシューマーアプリケーションによって捕捉された致命的な例外:

InterruptedException
一時停止中に現在のスレッドが中断された場合にスローされるエラー。中断は通常、コンシューマーを停止またはシャットダウンするときに発生します。例外は RuntimeException として再スローされ、コンシューマーを終了します。
IllegalArgumentException
コンシューマーが無効または不適切な引数を受け取ったときにスローされるエラー。たとえば、トピックが欠落している場合、例外がスローされます。
UnsupportedOperationException
操作がサポートされていない場合、またはメソッドが実装されていない場合にスローされるエラー。たとえば、サポートされていないコンシューマー設定を使用しようとしたり、KafkaConsumer クラスでサポートされていないメソッドを呼び出そうとした場合、例外がスローされます。

コンシューマーアプリケーションによって捕捉された致命的ではない例外:

OffsetOutOfRangeException
通常は、オフセットが対象のパーティションの有効なオフセット範囲外にある場合や、自動リセットポリシーが有効でない場合など、コンシューマーがパーティションに無効なオフセットがないか検索しようとするとエラーが出力されます。回復するには、コンシューマーはパーティションの最後まで検索してオフセットを同期的にコミットします (commitSync)。自動リセットポリシーが有効な場合、コンシューマーは設定に応じてパーティションの先頭または末尾を検索します。
NoOffsetForPartitionException
パーティションにコミットされたオフセットがない場合、または要求されたオフセットが無効で、自動リセットポリシーが有効になっていない場合に出力されるエラー。回復するには、コンシューマーはパーティションの最後まで検索してオフセットを同期的にコミットします (commitSync)。自動リセットポリシーが有効な場合、コンシューマーは設定に応じてパーティションの先頭または末尾を検索します。
RebalanceInProgressException
コンシューマーグループのリバランス中にパーティションが割り当てられているときにスローされるエラー。コンシューマーがリバランスを実行しているときは、オフセットコミットを完了できません。
RetriableException
Kafka クライアントライブラリーによって提供される RetriableException インターフェイスを実装する例外に対してスローされるエラー。

致命的ではないエラーの場合、コンシューマーはメッセージの処理を続行します。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.