12.2.5. Kafka Bridge コンシューマーからの最新メッセージの取得
records エンドポイントからデータをリクエストして、Kafka Bridge コンシューマーから最新のメッセージを取得します。実稼働環境では、HTTP クライアントはこのエンドポイントを繰り返し (ループで) 呼び出すことができます。
手順
- 「トピックおよびパーティションへのメッセージの生成」の説明に従い、Kafka Bridge コンシューマーに新たなメッセージを生成します。
GET
リクエストをrecords
エンドポイントに送信します。curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Kafka Bridge コンシューマーを作成し、サブスクライブすると、最初の GET リクエストによって空の応答が返されます。これは、ポーリング操作はリバランスプロセスをトリガーしてパーティションを割り当てます。
手順 2 を繰り返し、Kafka Bridge コンシューマーからメッセージを取得します。
Kafka Bridge は、
200
(OK)コードとともに、トピック名、キー、値、パーティション、および offset with the response body、および offset ボディーに messages ProcessingMode- の配列を返します。メッセージはデフォルトで最新のオフセットから取得されます。HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...
注記空のレスポンスが返される場合は、「トピックおよびパーティションへのメッセージの生成」の説明に従い、コンシューマーに対して追加のレコードを生成し、メッセージの取得を再試行します。
次のステップ
Kafka Bridge コンシューマーからメッセージを取得したら、ログへのオフセットをコミットします。
その他のリソース
- API リファレンスドキュメントの「GET /consumers/{groupid}/instances/{name}/records」