7.3.3. トピックおよびパーティションへのメッセージの作成
Kafka Bridge をデプロイし、そのサービスを公開した後、topics エンドポイントを使用して、トピックへのメッセージを JSON 形式で生成できます。以下に示すように、メッセージの宛先パーティションをリクエスト本文に指定できます。partitions エンドポイントは、全メッセージの単一の宛先パーティションをパスパラメーターとして指定する代替方法を提供します。
手順
テキストエディターを使用して、3 つのパーティションがある Kafka トピックの YAML 定義を作成します。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: bridge-quickstart-topic labels: strimzi.io/cluster: <kafka-cluster-name>1 spec: partitions: 32 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824-
ファイルを
bridge-quickstart-topic.yamlとしてexamples/topicディレクトリーに保存します。 OpenShift クラスターにトピックを作成します。
oc apply -f examples/topic/bridge-quickstart-topic.yamlKafka Bridge を使用して、作成したトピックに 3 つのメッセージを生成します。
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'-
sales-lead-0001は、キーのハッシュに基づいてパーティションに送信されます。 -
sales-lead-0002は、パーティション 2 に直接送信されます。 -
sales-lead-0003は、ラウンドロビン方式を使用してbridge-quickstart-topicトピックのパーティションに送信されます。
-
リクエストが正常に行われると、Kafka Bridge は
offsetsアレイを200コードとapplication/vnd.kafka.v2+jsonのcontent-typeヘッダーとともに返します。各メッセージで、offsetsアレイは以下を記述します。- メッセージが送信されたパーティション。
パーティションの現在のメッセージオフセット。
応答例
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
追加のトピック要求
他の curl 要求を実行して、トピックおよびパーティションに関する情報を検索します。
- トピックの一覧表示
curl -X GET \ http://localhost:8080/topics応答例
[ "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog", "bridge-quickstart-topic", "my-topic" ]- トピック設定およびパーティションの詳細の取得
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic応答例
{ "name": "bridge-quickstart-topic", "configs": { "compression.type": "producer", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "message.downconversion.enable": "true", "segment.jitter.ms": "0", "cleanup.policy": "delete", "flush.ms": "9223372036854775807", "follower.replication.throttled.replicas": "", "segment.bytes": "1073741824", "retention.ms": "604800000", "flush.messages": "9223372036854775807", "message.format.version": "2.8-IV1", "max.compaction.lag.ms": "9223372036854775807", "file.delete.delay.ms": "60000", "max.message.bytes": "1048588", "min.compaction.lag.ms": "0", "message.timestamp.type": "CreateTime", "preallocate": "false", "index.interval.bytes": "4096", "min.cleanable.dirty.ratio": "0.5", "unclean.leader.election.enable": "false", "retention.bytes": "-1", "delete.retention.ms": "86400000", "segment.ms": "604800000", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.index.bytes": "10485760" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ] }- 特定のトピックのパーティションを一覧表示する
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions応答例
[ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ]- 特定のトピックパーティションの詳細を一覧表示します。
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0応答例
{ "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }- 特定のトピックパーティションのオフセットを一覧表示します。
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets応答例
{ "beginning_offset": 0, "end_offset": 1 }
次のステップ
トピックおよびパーティションへのメッセージを作成したら、Kafka Bridge コンシューマーを作成します。