4.2. 基本的なコンシューマークライアントの設定
コンシューマークライアントを開発する場合は、次のように設定します。
- Kafka クラスターへの接続
- Kafka ブローカーから取得したバイトを、クライアントアプリケーションが理解できるメッセージキーに変換するデシリアライザー
- Kafka ブローカーから取得したバイトを、クライアントアプリケーションが理解できるメッセージ値に変換するデシリアライザー
通常、コンシューマーグループ ID も追加して、コンシューマーをコンシューマーグループに関連付けます。コンシューマーグループは、1 つ以上のトピックからの大きなデータストリームの処理を並列コンシューマーに分散するための論理エンティティーです。コンシューマーは group.id
でグループ化され、メッセージをメンバー全体に分散できます。特定のコンシューマーグループでは、各トピックパーティションが単一のコンシューマーによって読み取られます。1 人のコンシューマーが多くのパーティションを処理できます。並列処理を最大限に高めるには、パーティションごとに 1 つのコンシューマーを作成します。パーティションより多くのコンシューマーが存在する場合、一部のコンシューマーはアイドル状態のままになり、障害が発生した場合に引き継げるように準備が整っています。
基本的なコンシューマークライアント設定プロパティー
client.id = my-consumer-id 1 group.id = my-group-id 2 bootstrap.servers = my-cluster-kafka-bootstrap:9092 3 key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 4 value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 5
コンシューマークライアント設定をコードに直接追加する
Properties props = new Properties(); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id"); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
KafkaConsumer
は、受信するメッセージの文字列キーと値のタイプを指定します。使用されるシリアライザーは、Kafka から受信したバイトを指定された型に変換できる必要があります。
各コンシューマーグループには一意の group.id
が必要です。同じ group.id
を持つコンシューマーを再起動すると、停止する前に中断したところからメッセージの消費が再開されます。