2.3. トピックおよびパーティションへのメッセージの作成
Kafka Bridge で、トピックエンドポイントを使用して Kafka トピックへのメッセージを JSON 形式で生成します。
topics エンドポイントを使用して、トピックへのメッセージを JSON 形式で生成できます。リクエスト本文でメッセージの宛先パーティションを指定できます。partitions エンドポイントは、すべてのメッセージに対して単一の宛先パーティションをパスパラメーターとして指定するための代替方法を提供します。
この手順では、メッセージが bridge-quickstart-topic
と呼ばれるトピックに生成されます。
前提条件
Kafka クラスターにはトピックが 1 つあり、そのトピックが 3 つのパーティションに分割されている。
kafka-topics.sh
ユーティリティーを使用してトピックを作成できる。3 つのパーティションを使用したトピック作成の例
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1
トピックが作成されたことの確認
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
OpenShift に Streams for Apache Kafka をデプロイした場合は、KafkaTopic
カスタムリソースを使用してトピックを作成できます。
手順
Kafka Bridge を使用して、作成したトピックに 3 つのメッセージを生成します。
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'
-
sales-lead-0001
は、キーのハッシュに基づいてパーティションに送信されます。 -
sales-lead-0002
は、パーティション 2 に直接送信されます。 -
sales-lead-0003
は、ラウンドロビン方式を使用してbridge-quickstart-topic
トピックのパーティションに送信されます。
-
リクエストが正常に行われると、Kafka Bridge は
offsets
配列を200
コードとapplication/vnd.kafka.v2+json
のcontent-type
ヘッダーとともに返します。各メッセージで、offsets
配列は以下を記述します。- メッセージが送信されたパーティション。
パーティションの現在のメッセージオフセット。
レスポンス例
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
追加のトピックリクエスト
他の curl リクエストを実行して、トピックおよびパーティションに関する情報を見つけます。
- トピックのリスト表示
curl -X GET \ http://localhost:8080/topics
レスポンス例
[ "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog", "bridge-quickstart-topic", "my-topic" ]
- トピック設定およびパーティションの詳細の取得
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic
レスポンス例
{ "name": "bridge-quickstart-topic", "configs": { "compression.type": "producer", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "message.downconversion.enable": "true", "segment.jitter.ms": "0", "cleanup.policy": "delete", "flush.ms": "9223372036854775807", "follower.replication.throttled.replicas": "", "segment.bytes": "1073741824", "retention.ms": "604800000", "flush.messages": "9223372036854775807", "message.format.version": "2.8-IV1", "max.compaction.lag.ms": "9223372036854775807", "file.delete.delay.ms": "60000", "max.message.bytes": "1048588", "min.compaction.lag.ms": "0", "message.timestamp.type": "CreateTime", "preallocate": "false", "index.interval.bytes": "4096", "min.cleanable.dirty.ratio": "0.5", "unclean.leader.election.enable": "false", "retention.bytes": "-1", "delete.retention.ms": "86400000", "segment.ms": "604800000", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.index.bytes": "10485760" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ] }
- 特定のトピックのパーティションのリスト表示
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions
レスポンス例
[ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ]
- 特定のトピックパーティションの詳細のリスト表示
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0
レスポンス例
{ "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }
- 特定のトピックパーティションのオフセットのリスト表示
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets
レスポンス例
{ "beginning_offset": 0, "end_offset": 1 }
次のステップ
トピックおよびパーティションへのメッセージを作成したので、Kafka Bridge コンシューマーを作成します。