AMQ Streams Kafka Bridge の使用
AMQ Streams Kafka Bridge を使用した Kafka クラスターへの接続
概要
多様性を受け入れるオープンソースの強化
Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。これは大規模な取り組みであるため、これらの変更は今後の複数のリリースで段階的に実施されます。詳細は、Red Hat CTO である Chris Wright のメッセージ をご覧ください。
第1章 Kafka Bridge の概要
AMQ Streams Kafka Bridge を使用して、Kafka クラスターに HTTP リクエストを送信します。
Kafka Bridge を使用して、HTTP クライアントアプリケーションを Kafka クラスターと統合できます。
HTTP クライアント統合
1.1. Kafka Bridge の実行
AMQ Streams Kafka Bridge をインストールして、Kafka クラスターと同じ環境で実行します。
Kafka Bridge インストールアーティファクトをダウンロードしてホストマシンに追加できます。ローカル環境で Kafka Bridge を試すには、Kafka Bridge クイックスタート を参照してください。
OpenShift に AMQ Streams をデプロイした場合は、AMQ Streams Cluster Operator を使用して、Kafka Bridge を OpenShift クラスターにデプロイできます。Cluster Operator によって OpenShift namespace にデプロイされた実行中の Kafka クラスターが必要です。OpenShift クラスターの外部で Kafka Bridge にアクセスするようにデプロイメントを設定できます。
関連情報
1.2. Kafka Bridge インターフェース
Kafka Bridge では、HTTP ベースのクライアントと Kafka クラスターとの対話を可能にする RESTful インターフェースが提供されます。 また、クライアントアプリケーションによる Kafka プロトコルの変換は必要なく、Web API コネクションの利点が AMQ Streams に提供されます。
API には consumers
と topics
の 2 つの主なリソースがあります。これらのリソースは、Kafka クラスターでコンシューマーおよびプロデューサーと対話するためにエンドポイント経由で公開され、アクセスが可能になります。リソースと関係があるのは Kafka ブリッジのみで、Kafka に直接接続されたコンシューマーやプロデューサーとは関係はありません。
1.2.1. HTTP リクエスト
Kafka Bridge は、以下の方法で Kafka クラスターへの HTTP リクエストをサポートします。
- トピックにメッセージを送信する。
- トピックからメッセージを取得する。
- トピックのパーティションリストを取得する。
- コンシューマーを作成および削除する。
- コンシューマーをトピックにサブスクライブし、このようなトピックからメッセージを受信できるようにする。
- コンシューマーがサブスクライブしているトピックの一覧を取得する。
- トピックからコンシューマーのサブスクライブを解除する。
- パーティションをコンシューマーに割り当てる。
- コンシューマーオフセットの一覧をコミットする。
- パーティションで検索して、コンシューマーが最初または最後のオフセットの位置、または指定のオフセットの位置からメッセージを受信できるようにする。
上記の方法で、JSON 応答と HTTP 応答コードのエラー処理を行います。メッセージは JSON またはバイナリー形式で送信できます。
クライアントは、ネイティブの Kafka プロトコルを使用する必要なくメッセージを生成して使用できます。
1.3. Kafka クラスターへの接続の保護
Kafka Bridge と Kafka クラスターの間で以下を設定できます。
- TLS または SASL ベースの認証
- TLS 暗号化接続
プロパティーファイル を使用して、認証用に Kafka Bridge を設定します。
また、Kafka ブローカーで ACL を使用することで、Kafka Bridge を使用して消費および生成できるトピックを制限することができます。
関連情報
1.4. Kafka Bridge HTTP インターフェイスの保護
HTTP クライアントと Kafka Bridge の間の認証と暗号化は、Kafka Bridge によって直接サポートされていません。クライアントから Kafka Bridge に送信される要求は、認証または暗号化なしで送信されます。リクエストでは、HTTPS ではなく HTTP を使用する必要があります。
Kafka Bridge を次のツールと組み合わせて、保護することができます。
- どの Pod が Kafka Bridge にアクセスできるかを定義するネットワークポリシーとファイアウォール
- リバースプロキシー (OAuth 2.0 など)
- API ゲートウェイ
1.5. Kafka Bridge へのリクエスト
データ形式と HTTP ヘッダーを指定し、有効なリクエストが Kafka Bridge に送信されるようにします。
1.5.1. コンテンツタイプヘッダー
API リクエストおよびレスポンス本文は、常に JSON としてエンコードされます。
コンシューマー操作の実行時に、
POST
リクエストの本文が空でない場合は、以下のContent-Type
ヘッダーが含まれている必要があります。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Content-Type: application/vnd.kafka.v2+json
Content-Type: application/vnd.kafka.v2+json
プロデューサー操作を行う場合、
POST
リクエストには、生成されるメッセージの埋め込みデータ形式を示すContent-Type
ヘッダーを指定する必要があります。これはjson
またはbinary
のいずれかになります。埋め込みデータ形式 Content-Type ヘッダー JSON
Content-Type: application/vnd.kafka.json.v2+json
バイナリー
Content-Type: application/vnd.kafka.binary.v2+json
次のセクションで説明どおり、埋め込みデータ形式はコンシューマーごとに設定されます。
POST
リクエストのボディが空の場合は、Content-Type
を設定しないでください。空のボディーを使用して、デフォルト値のコンシューマーを作成できます。
1.5.2. 埋め込みデータ形式
埋め込みデータ形式は、Kafka メッセージが Kafka Bridge によりプロデューサーからコンシューマーに HTTP で送信される際の形式です。サポートされる埋め込みデータ形式には、JSON とバイナリーの 2 種類があります。
/consumers/groupid
エンドポイントを使用してコンシューマーを作成する場合、POST
リクエスト本文で JSON またはバイナリーいずれかの埋め込みデータ形式を指定する必要があります。これは、以下の例のように format
フィールドで指定します。
{ "name": "my-consumer", "format": "binary", # ... }
{
"name": "my-consumer",
"format": "binary",
# ...
}
- 1
- バイナリー埋め込みデータ形式。
コンシューマーの作成時に指定する埋め込みデータ形式は、コンシューマーが消費する Kafka メッセージのデータ形式と一致する必要があります。
バイナリー埋め込みデータ形式を指定する場合は、以降のプロデューサーリクエストで、リクエスト本文にバイナリーデータが Base64 でエンコードされた文字列として含まれる必要があります。たとえば、/topics/topicname
エンドポイントを使用してメッセージを送信する場合は、records.value
を Base64 でエンコードする必要があります。
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
{
"records": [
{
"key": "my-key",
"value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
},
]
}
プロデューサーリクエストには、埋め込みデータ形式に対応する Content-Type
ヘッダーも含まれる必要があります (例: Content-Type: application/vnd.kafka.binary.v2+json
)。
1.5.3. メッセージの形式
/topics
エンドポイントを使用してメッセージを送信する場合は、records
パラメーターのリクエストボディーにメッセージペイロードを入力します。
records
パラメーターには、以下のオプションフィールドを含めることができます。
-
Message
headers
-
Message
key
-
Message
value
-
Destination
partition
/topicsへのPOST
リクエストの例
curl -X POST \ http://localhost:8080/topics/my-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" "partition": 2 "headers": [ { "key": "key1", "value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" } ] }, ] }'
curl -X POST \
http://localhost:8080/topics/my-topic \
-H 'content-type: application/vnd.kafka.json.v2+json' \
-d '{
"records": [
{
"key": "my-key",
"value": "sales-lead-0001"
"partition": 2
"headers": [
{
"key": "key1",
"value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ=="
}
]
},
]
}'
- 1
- バイナリー形式のヘッダー値。Base64 としてエンコードされます。
1.5.4. Accept ヘッダー
コンシューマーを作成したら、以降のすべての GET リクエストには Accept
ヘッダーが以下のような形式で含まれる必要があります。
Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
EMBEDDED-DATA-FORMAT
は json
または binary
です。
たとえば、サブスクライブされたコンシューマーのレコードを JSON 埋め込みデータ形式で取得する場合、この Accept ヘッダーが含まれるようにします。
Accept: application/vnd.kafka.json.v2+json
Accept: application/vnd.kafka.json.v2+json
1.6. CORS
CORS (Cross-Origin Resource Sharing) を使用すると、Kafka Bridge HTTP の設定で Kafka クラスターにアクセスするために許可されるメソッドおよび元の URL を指定できます。
Kafka Bridge の CORS 設定例
...
# ...
http.cors.enabled=true
http.cors.allowedOrigins=https://strimzi.io
http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
CORS では、異なるドメイン上のオリジンソース間での シンプルな リクエストおよび プリフライト リクエストが可能です。
シンプルなリクエストは、GET
、HEAD
、POST
の各メソッドを使った標準的なリクエストに適しています。
プリフライトリクエストは、実際のリクエストが安全に送信できることを確認する最初のチェックとして HTTP OPTIONS リクエストを送信します。確認時に、実際のリクエストが送信されます。プリフライトリクエストは、PUT
やDELETE
など、より高い安全性が求められるメソッドや、非標準のヘッダーを使用するメソッドに適しています。
すべての要求には、HTTP 要求のソースであるヘッダーの origins 値が必要です。
1.6.1. シンプルなリクエスト
たとえば、この単純なリクエストヘッダーは、オリジンを https://strimzi.io
と指定します。
Origin: https://strimzi.io
Origin: https://strimzi.io
ヘッダー情報がリクエストに追加されます。
curl -v -X GET HTTP-ADDRESS/bridge-consumer/records \ -H 'Origin: https://strimzi.io'\ -H 'content-type: application/vnd.kafka.v2+json'
curl -v -X GET HTTP-ADDRESS/bridge-consumer/records \
-H 'Origin: https://strimzi.io'\
-H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridgeからの応答では、Access-Control-Allow-Origin
ヘッダーが返されます。
HTTP/1.1 200 OK Access-Control-Allow-Origin: *
HTTP/1.1 200 OK
Access-Control-Allow-Origin: *
- 1
- アスタリスク(
*
)を返すと、リソースをどのドメインでもアクセスできることが分かります。
1.6.2. プリフライトリクエスト
最初のプリフライトリクエストは、OPTIONS
メソッドを使ってKafka Bridgeに送信されます。HTTP OPTIONS リクエストはヘッダー情報を送信し、Kafka Bridge が実際のリクエストを許可することを確認します。
ここでは、プリフライトリクエストは https://strimzi.io
から POST
リクエストが有効であることを確認します。
OPTIONS /my-group/instances/my-user/subscription HTTP/1.1 Origin: https://strimzi.io Access-Control-Request-Method: POST Access-Control-Request-Headers: Content-Type
OPTIONS /my-group/instances/my-user/subscription HTTP/1.1
Origin: https://strimzi.io
Access-Control-Request-Method: POST
Access-Control-Request-Headers: Content-Type
OPTIONS
は、プリフライトリクエストのヘッダー情報に追加されます。
curl -v -X OPTIONS -H 'Origin: https://strimzi.io' \ -H 'Access-Control-Request-Method: POST' \ -H 'content-type: application/vnd.kafka.v2+json'
curl -v -X OPTIONS -H 'Origin: https://strimzi.io' \
-H 'Access-Control-Request-Method: POST' \
-H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridge は最初のリクエストに応答し、リクエストが受け入れられることを確認します。応答ヘッダーは、許可されるオリジン、メソッド、およびヘッダーを返します。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH Access-Control-Allow-Headers: content-type
HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://strimzi.io
Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH
Access-Control-Allow-Headers: content-type
オリジンまたはメソッドが拒否されると、エラーメッセージが返されます。
プリフライトリクエストで確認されたため、実際のリクエストには Access-Control-Request-Method
ヘッダーは必要ありませんが、元のヘッダーが必要です。
curl -v -X POST HTTP-ADDRESS/topics/bridge-topic \ -H 'Origin: https://strimzi.io' \ -H 'content-type: application/vnd.kafka.v2+json'
curl -v -X POST HTTP-ADDRESS/topics/bridge-topic \
-H 'Origin: https://strimzi.io' \
-H 'content-type: application/vnd.kafka.v2+json'
応答は、送信元 URL が許可されることを示します。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io
HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://strimzi.io
関連情報
1.7. Kafka Bridge のロガーの設定
AMQ Streams Kafka ブリッジを使用すると、関連する OpenAPI 仕様で定義される操作ごとに異なるログレベルを設定できます。
各操作には、ブリッジが HTTP クライアントから要求を受信する対応するための API エンドポイントがあります。各エンドポイントのログレベルを変更すると、受信および送信 HTTP 要求に関する詳細なログ情報を作成できます。
ロガーは log4j.properties
ファイルで定義されます。このファイルには healthy
および ready
エンドポイントの以下のデフォルト設定が含まれています。
log4j.logger.http.openapi.operation.healthy=WARN, out log4j.additivity.http.openapi.operation.healthy=false log4j.logger.http.openapi.operation.ready=WARN, out log4j.additivity.http.openapi.operation.ready=false
log4j.logger.http.openapi.operation.healthy=WARN, out
log4j.additivity.http.openapi.operation.healthy=false
log4j.logger.http.openapi.operation.ready=WARN, out
log4j.additivity.http.openapi.operation.ready=false
その他すべての操作のログレベルは、デフォルトで INFO
に設定されます。ロガーは以下のようにフォーマットされます。
log4j.logger.http.openapi.operation.<operation_id>
log4j.logger.http.openapi.operation.<operation_id>
<operation_id>
は、特定の操作の識別子です。
Open API 仕様で定義されている操作のリスト
-
createConsumer
-
deleteConsumer
-
subscribe
-
unsubscribe
-
poll
-
assign
-
commit
-
send
-
sendToPartition
-
seekToBeginning
-
seekToEnd
-
seek
-
healthy
-
ready
-
openapi
第2章 Kafka Bridge クイックスタート
このクイックスタートを使用して、ローカルの開発環境で AMQ Streams の Kafka Bridge を試すことができます。
次の方法を学習します。
- Kafka クラスターのトピックおよびパーティションへのメッセージを生成する。
- Kafka Bridge コンシューマーを作成する。
- 基本的なコンシューマー操作を実行する (たとえば、コンシューマーをトピックにサブスクライブする、生成したメッセージを取得するなど)。
このクイックスタートでは、HTTP リクエストはターミナルにコピーおよび貼り付けできる curl コマンドを使用します。
前提条件を確認し、本章に指定されている順序でタスクを行うようにしてください。
データ形式について
このクイックスタートでは、バイナリーではなく JSON 形式でメッセージを生成および消費します。
クイックスタートの前提条件
- Kafka クラスターがホストマシンで実行しています。
2.1. Kafka Bridge アーカイブのダウンロード
AMQ Streams Kafka Bridge の zip 形式のディストリビューションをダウンロードできます。
手順
- カスタマーポータル から、最新バージョンの AMQ Streams Kafka Bridge アーカイブをダウンロードします。
2.2. Kafka Bridge プロパティーの設定
この手順では、AMQ Streams Kafka Bridge によって使用される Kafka および HTTP 接続プロパティーを設定する方法を説明します。
Kafka 関連のプロパティーに適切な接頭辞を使用して、他の Kafka クライアントと同様に Kafka Bridge を設定します。
-
kafka.
は、サーバー接続やセキュリティーなど、プロデューサーとコンシューマーに適用される一般的な設定用です。 -
kafka.consumer.
は、コンシューマーにのみ渡されるコンシューマー固有の設定用です。 -
kafka.producer.
は、プロデューサーにのみ渡されるプロデューサー固有の設定用です。
HTTP プロパティーは、Kafka クラスターへの HTTP アクセスを有効にする他に、CPRS (Cross-Origin Resource Sharing) により Kafka Bridge のアクセス制御を有効化または定義する機能を提供します。CORS は、複数のオリジンから指定のリソースにブラウザーでアクセスできるようにする HTTP メカニズムです。CORS を設定するには、許可されるリソースオリジンのリストと、それらにアクセスする HTTP メソッドを定義します。リクエストの追加の HTTP ヘッダーには Kafka クラスターへのアクセスが許可される CORS オリジンが記述されています。
手順
AMQ Streams Kafka Bridge のインストールアーカイブにある
application.properties
ファイルを編集します。プロパティーファイルを使用して、Kafka および HTTP 関連のプロパティーを指定し、分散トレースを有効にします。
Kafka コンシューマーおよびプロデューサーに固有のプロパティーなど、標準の Kafka 関連のプロパティーを設定します。
以下を使用します。
-
kafka.bootstrap.servers
: Kafkaクラスターへのホスト/ポート接続を定義。 -
kafka.producer.acks
: HTTP クライアントに確認を提供。 kafka.consumer.auto.offset.reset
: Kafka のオフセットのリセットを管理する方法を決定。Kafka プロパティーの設定に関する詳細は、Apache Kafka の Web サイトを参照してください。
-
Kafka クラスターへの HTTP アクセスを有効にするために HTTP 関連のプロパティーを設定します。
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow bridge.id=my-bridge http.enabled=true http.host=0.0.0.0 http.port=8080 http.cors.enabled=true http.cors.allowedOrigins=https://strimzi.io http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
bridge.id=my-bridge http.enabled=true http.host=0.0.0.0 http.port=8080
1 http.cors.enabled=true
2 http.cors.allowedOrigins=https://strimzi.io
3 http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
4
2.3. Kafka Bridge のインストール
この手順に従って、AMQ Streams Kafka Bridge をインストールします。
手順
- まだ行っていない場合は、Kafka Bridge インストールアーカイブを任意のディレクトリーに解凍します。
設定プロパティーをパラメーターとして使用して、Kafka Bridge スクリプトを実行します。
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow ./bin/kafka_bridge_run.sh --config-file=<path>/configfile.properties
./bin/kafka_bridge_run.sh --config-file=<path>/configfile.properties
インストールが成功したことをログで確認します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
2.4. トピックおよびパーティションへのメッセージの作成
Kafka Bridge を使用して、トピックエンドポイントを使用して JSON 形式で Kafka トピックへのメッセージを生成します。
topics エンドポイントを使用して、トピックへのメッセージを JSON 形式で生成できます。リクエスト本文でメッセージの宛先パーティションを指定できます。partitions エンドポイントは、全メッセージの単一の宛先パーティションをパスパラメーターとして指定する代替方法を提供します。
この手順では、メッセージは bridge-quickstart-topic
と呼ばれるトピックに生成されます。
前提条件
Kafka クラスターには、3 つのパーティションを持つトピックがあります。
kafka-topics.sh
ユーティリティーを使用してトピックを作成できます。3 つのパーティションを使用したトピック作成の例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1
トピックが作成されたことを確認する
Copy to Clipboard Copied! Toggle word wrap Toggle overflow bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
OpenShift に AMQ Streams をデプロイした場合は、KafkaTopic
カスタムリソースを使用してトピックを作成できます。
手順
Kafka Bridge を使用して、作成したトピックに 3 つのメッセージを生成します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'
-
sales-lead-0001
は、キーのハッシュに基づいてパーティションに送信されます。 -
sales-lead-0002
は、パーティション 2 に直接送信されます。 -
sales-lead-0003
は、ラウンドロビン方式を使用してbridge-quickstart-topic
トピックのパーティションに送信されます。
-
リクエストが正常に行われると、Kafka Bridge は
offsets
アレイを200
コードとapplication/vnd.kafka.v2+json
のcontent-type
ヘッダーとともに返します。各メッセージで、offsets
アレイは以下を記述します。- メッセージが送信されたパーティション。
パーティションの現在のメッセージオフセット。
応答例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow #... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
追加のトピック要求
他の curl 要求を実行して、トピックおよびパーティションに関する情報を見つけます。
- トピックの一覧表示
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X GET \ http://localhost:8080/topics
curl -X GET \ http://localhost:8080/topics
応答例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow [ "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog", "bridge-quickstart-topic", "my-topic" ]
[ "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog", "bridge-quickstart-topic", "my-topic" ]
- トピック設定およびパーティションの詳細の取得
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic
応答例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow { "name": "bridge-quickstart-topic", "configs": { "compression.type": "producer", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "message.downconversion.enable": "true", "segment.jitter.ms": "0", "cleanup.policy": "delete", "flush.ms": "9223372036854775807", "follower.replication.throttled.replicas": "", "segment.bytes": "1073741824", "retention.ms": "604800000", "flush.messages": "9223372036854775807", "message.format.version": "2.8-IV1", "max.compaction.lag.ms": "9223372036854775807", "file.delete.delay.ms": "60000", "max.message.bytes": "1048588", "min.compaction.lag.ms": "0", "message.timestamp.type": "CreateTime", "preallocate": "false", "index.interval.bytes": "4096", "min.cleanable.dirty.ratio": "0.5", "unclean.leader.election.enable": "false", "retention.bytes": "-1", "delete.retention.ms": "86400000", "segment.ms": "604800000", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.index.bytes": "10485760" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ] }
{ "name": "bridge-quickstart-topic", "configs": { "compression.type": "producer", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "message.downconversion.enable": "true", "segment.jitter.ms": "0", "cleanup.policy": "delete", "flush.ms": "9223372036854775807", "follower.replication.throttled.replicas": "", "segment.bytes": "1073741824", "retention.ms": "604800000", "flush.messages": "9223372036854775807", "message.format.version": "2.8-IV1", "max.compaction.lag.ms": "9223372036854775807", "file.delete.delay.ms": "60000", "max.message.bytes": "1048588", "min.compaction.lag.ms": "0", "message.timestamp.type": "CreateTime", "preallocate": "false", "index.interval.bytes": "4096", "min.cleanable.dirty.ratio": "0.5", "unclean.leader.election.enable": "false", "retention.bytes": "-1", "delete.retention.ms": "86400000", "segment.ms": "604800000", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.index.bytes": "10485760" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ] }
- 特定のトピックのパーティションの一覧表示
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions
応答例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ]
[ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }, { "partition": 1, "leader": 2, "replicas": [ { "broker": 2, "leader": true, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true } ] }, { "partition": 2, "leader": 1, "replicas": [ { "broker": 1, "leader": true, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true }, { "broker": 0, "leader": false, "in_sync": true } ] } ]
- 特定のトピックパーティションの詳細の一覧表示
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0
応答例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }
{ "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true }, { "broker": 1, "leader": false, "in_sync": true }, { "broker": 2, "leader": false, "in_sync": true } ] }
- 特定のトピックパーティションのオフセットの一覧表示
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets
curl -X GET \ http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets
応答例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow { "beginning_offset": 0, "end_offset": 1 }
{ "beginning_offset": 0, "end_offset": 1 }
次のステップ
トピックおよびパーティションへのメッセージを作成したら、Kafka Bridge コンシューマーを作成します。
2.5. Kafka Bridge コンシューマーの作成
Kafka クラスターで何らかのコンシューマー操作を実行するには、まず consumers エンドポイントを使用してコンシューマーを作成する必要があります。コンシューマーは Kafka Bridge コンシューマー と呼ばれます。
手順
bridge-quickstart-consumer-group
という名前の新しいコンシューマーグループに Kafka Bridge コンシューマーを作成します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "bridge-quickstart-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": false, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "bridge-quickstart-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": false, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'
-
コンシューマーには
bridge-quickstart-consumer
という名前を付け、埋め込みデータ形式はjson
として設定します。 - 一部の基本的な設定が定義されます。
コンシューマーはログへのオフセットに自動でコミットしません。これは、
enable.auto.commit
がfalse
に設定されているからです。このクイックスタートでは、オフセットを跡で手作業でコミットします。リクエストが正常に行われると、Kafka Bridge はレスポンス本文でコンシューマー ID (
instance_id
) とベース URL (base_uri
) を200
コードとともに返します。応答例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow #... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
-
コンシューマーには
-
ベース URL (
base_uri
) をコピーし、このクイックスタートの他のコンシューマー操作で使用します。
次のステップ
上記で作成した Kafka Bridge コンシューマーをトピックにサブスクライブできます。
2.6. Kafka Bridge コンシューマーのトピックへのサブスクライブ
Kafka Bridge コンシューマーを作成したら、subscription エンドポイントを使用して、1 つ以上のトピックにサブスクライブします。サブスクライブすると、コンシューマーはトピックに生成されたすべてのメッセージの受信を開始します。
手順
前述の トピックおよびパーティションへのメッセージの作成 の手順ですでに作成した
bridge-quickstart-topic
トピックに、コンシューマーをサブスクライブします。Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "bridge-quickstart-topic" ] }'
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "bridge-quickstart-topic" ] }'
topics
アレイには、例のような単一のトピック、または複数のトピックを含めることができます。正規表現に一致する複数のトピックにコンシューマーをサブスクライブする場合は、topics
アレイの代わりにtopic_pattern
文字列を使用できます。リクエストが正常に行われると、Kafka Bridge によって
204
(No Content) コードのみが返されます。
次のステップ
Kafka Bridge コンシューマーをトピックにサブスクライブしたら、コンシューマーからメッセージを取得 できます。
2.7. Kafka Bridge コンシューマーからの最新メッセージの取得
records エンドポイントからデータを要求して、Kafka Bridge コンシューマーから最新のメッセージを取得します。実稼働環境では、HTTP クライアントはこのエンドポイントを繰り返し (ループで) 呼び出すことができます。
手順
- 「トピックおよびパーティションへのメッセージの生成」の説明に従い、Kafka Bridge コンシューマーに新たなメッセージを生成します。
GET
リクエストをrecords
エンドポイントに送信します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Kafka Bridge コンシューマーを作成し、サブスクライブすると、最初の GET リクエストによって空のレスポンスが返されます。これは、ポーリング操作がリバランスプロセスを開始してパーティションを割り当てるからです。
手順 2 を繰り返し、Kafka Bridge コンシューマーからメッセージを取得します。
Kafka Bridge は、レスポンス本文でメッセージのアレイ (トピック名、キー、値、パーティション、オフセットの記述) を
200
コードとともに返します。メッセージはデフォルトで最新のオフセットから取得されます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...
HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...
注記空のレスポンスが返される場合は、「トピックおよびパーティションへのメッセージの生成」の説明に従い、コンシューマーに対して追加のレコードを生成し、メッセージの取得を再試行します。
次のステップ
Kafka Bridge コンシューマーからメッセージを取得したら、ログへのオフセットをコミットします。
2.8. ログへのオフセットのコミット
offsets エンドポイントを使用して、Kafka Bridge コンシューマーによって受信されるすべてのメッセージに対して、手動でオフセットをログにコミットします。この操作が必要なのは、前述の Kafka Bridge コンシューマーの作成 で作成した Kafka Bridge コンシューマー が enable.auto.commit
の設定で false
に指定されているからです。
手順
bridge-quickstart-consumer
のオフセットをログにコミットします。Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
リクエスト本文は送信されないので、オフセットはコンシューマーによって受信されたすべてのレコードに対してコミットされます。この代わりに、リクエスト本文に、オフセットをコミットするトピックおよびパーティションを指定するアレイ (OffsetCommitSeekList) を含めることができます。
リクエストが正常に行われると、Kafka Bridge は
204
コードのみを返します。
次のステップ
オフセットをログにコミットしたら、オフセットをシークのエンドポイントを試行します。
2.9. パーティションのオフセットのシーク
positions エンドポイントを使用して、Kafka Bridge コンシューマーを設定し、パーティションのメッセージを特定のオフセットから取得し、さらに最新のオフセットから取得します。これは Apache Kafka では、シーク操作と呼ばれます。
手順
quickstart-bridge-topic
トピックで、パーティション 0 の特定のオフセットをシークします。Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "offsets": [ { "topic": "bridge-quickstart-topic", "partition": 0, "offset": 2 } ] }'
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "offsets": [ { "topic": "bridge-quickstart-topic", "partition": 0, "offset": 2 } ] }'
リクエストが正常に行われると、Kafka Bridge は
204
コードのみを返します。GET
リクエストをrecords
エンドポイントに送信します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Kafka Bridge は、シークしたオフセットからのメッセージを返します。
同じパーティションの最後のオフセットをシークし、デフォルトのメッセージ取得動作を復元します。この時点で、positions/end エンドポイントを使用します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "bridge-quickstart-topic", "partition": 0 } ] }'
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "bridge-quickstart-topic", "partition": 0 } ] }'
リクエストが正常に行われると、Kafka Bridge は別の
204
コードを返します。
また、positions/beginning エンドポイントを使用して、1 つ以上のパーティションの最初のオフセットをシークすることもできます。
次のステップ
このクイックスタートでは、AMQ Streams Kafka Bridge を使用して Kafka クラスターの一般的な操作をいくつか実行しました。これで、すでに作成した Kafka Bridge コンシューマーを削除 できます。
2.10. Kafka Bridge コンシューマーの削除
このクイックスタートを通して使用した Kafa Bridge コンシューマーを削除します。
手順
DELETE
リクエストを instances エンドポイントに送信し、Kafka Bridge コンシューマーを削除します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
リクエストが正常に行われると、Kafka Bridge は
204
コードを返します。
第3章 AMQ Streams Kafka Bridge API リファレンス
3.1. 概要
AMQ Streams Kafka Bridge は、HTTP ベースのクライアントアプリケーションを Kafka クラスターと統合するための RESTAPI を提供します。API を使用して、ネイティブの Kafka プロトコルではなく、HTTP を介してコンシューマーを作成および管理し、レコードを送受信できます。
3.1.1. バージョン情報
バージョン : 0.1.0
3.1.2. タグ
- コンシューマー: Kafka クラスターにコンシューマーを作成し、トピックのサブスクライブ、処理されたレコードの取得、オフセットのコミットなどの一般的なアクションを実行するためのコンシューマー操作。
- プロデューサー: 指定されたトピックまたはトピックパーティションにレコードを送信するプロデューサー操作。
- シーク: コンシューマーが特定のオフセット位置からメッセージの受信を開始できるようにするシーク操作。
- トピック: 指定されたトピックまたはトピックパーティションにメッセージを送信するトピック操作。任意で、リクエストにメッセージキーを含めます。トピックとトピックメタデータを取得することもできます。
3.1.3. 消費
-
application/json
3.1.4. 生成されるもの
-
application/json
3.2. 定義
3.2.1. AssignedTopicPartitions
Type : < string, < integer (int32) > array > map
3.2.2. BridgeInfo
Kafka Bridge インスタンスに関する情報。
名前 | スキーマ |
---|---|
bridge_version | 文字列 |
3.2.3. コンシューマー
名前 | 説明 | スキーマ |
---|---|---|
auto.offset.reset |
コンシューマーのオフセットの位置をリセットします。 | 文字列 |
consumer.request.timeout.ms | コンシューマーが要求のメッセージを待機する最大時間をミリ秒単位で設定します。応答なしでタイムアウト期間に達すると、エラーが返されます。 | 整数 |
enable.auto.commit |
| ブール値 |
fetch.min.bytes | コンシューマーが受信するデータの最小量をバイト単位で設定します。ブローカーは、送信するデータがこの量を超えるまで待機します。 | 整数 |
format |
コンシューマーに許可されるメッセージ形式。 | 文字列 |
isolation.level |
| 文字列 |
name | コンシューマーインスタンスの一意の名前。この名前は、コンシューマーグループの範囲内で一意です。名前は URL で使用されます。 | 文字列 |
3.2.4. ConsumerRecord
名前 | スキーマ |
---|---|
headers | |
key | 文字列 |
offset | integer (int64) |
partition | 整数 (int32) |
topic | 文字列 |
value | 文字列 |
3.2.5. ConsumerRecordList
Type : < ConsumerRecord > array
3.2.6. CreatedConsumer
名前 | 説明 | スキーマ |
---|---|---|
base_uri | このコンシューマーインスタンスに対する後続のリクエストの URI を構築するのに使用されるベース URI。 | 文字列 |
instance_id | グループ内のコンシューマーインスタンスの一意の ID。 | 文字列 |
3.2.7. Error
名前 | スキーマ |
---|---|
error_code | 整数 (int32) |
message | 文字列 |
3.2.8. KafkaHeader
名前 | 説明 | スキーマ |
---|---|---|
key | 文字列 | |
value |
バイナリー形式のヘッダー値: base64 でエンコードした | 文字列 (バイト) |
3.2.9. KafkaHeaderList
Type : < KafkaHeader > array
3.2.10. OffsetCommitSeek
名前 | スキーマ |
---|---|
offset | integer (int64) |
partition | 整数 (int32) |
topic | 文字列 |
3.2.11. OffsetCommitSeekList
名前 | スキーマ |
---|---|
offsets | < OffsetCommitSeek > array |
3.2.12. OffsetRecordSent
名前 | スキーマ |
---|---|
offset | integer (int64) |
partition | 整数 (int32) |
3.2.13. OffsetRecordSentList
名前 | スキーマ |
---|---|
offsets | < OffsetRecordSent > array |
3.2.14. OffsetsSummary
名前 | スキーマ |
---|---|
beginning_offset | integer (int64) |
end_offset | integer (int64) |
3.2.15. パーティション
名前 | スキーマ |
---|---|
partition | 整数 (int32) |
topic | 文字列 |
3.2.16. PartitionMetadata
名前 | スキーマ |
---|---|
leader | 整数 (int32) |
partition | 整数 (int32) |
replicas | < Replica > アレイ |
3.2.17. Partitions
名前 | スキーマ |
---|---|
partitions | < Partition > アレイ |
3.2.18. ProducerRecord
名前 | スキーマ |
---|---|
headers | |
partition | 整数 (int32) |
3.2.19. ProducerRecordList
名前 | スキーマ |
---|---|
records | < ProducerRecord > array |
3.2.20. ProducerRecordToPartition
Type : おぷジェクト
3.2.21. ProducerRecordToPartitionList
名前 | スキーマ |
---|---|
records | < ProducerRecordToPartition > array |
3.2.22. レプリカ
名前 | スキーマ |
---|---|
broker | 整数 (int32) |
in_sync | ブール値 |
leader | ブール値 |
3.2.23. SubscribedTopicList
名前 | スキーマ |
---|---|
partitions | < AssignedTopicPartitions > array |
topics |
3.2.24. TopicMetadata
名前 | 説明 | スキーマ |
---|---|---|
configs | トピックごとの設定のオーバーライド | < string, string > マップ |
name | トピックの名前 | 文字列 |
partitions | < PartitionMetadata > array |
3.2.25. トピック
名前 | 説明 | スキーマ |
---|---|---|
topic_pattern | 複数のトピックを照合するための正規表現トピックパターン | 文字列 |
topics | < 文字列 > 配列 |
3.3. パス
3.3.1. GET /
3.3.1.1. 説明
Kafka Bridge インスタンスに関する情報を JSON 形式で取得します。
3.3.1.2. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | Kafka Bridge インスタンスに関する情報。 |
3.3.1.3. 生成されるもの
-
application/json
3.3.1.4. HTTP 応答の例
3.3.1.4.1. Response 200
{ "bridge_version" : "0.16.0" }
{
"bridge_version" : "0.16.0"
}
3.3.2. POST /consumers/{groupid}
3.3.2.1. 説明
指定されたコンシューマーグループにコンシューマーインスタンスを作成します。任意で、コンシューマー名とサポートされている設定オプションを指定できます。これは、このコンシューマーインスタンスに対する後続のリクエストの URL を構築するために使用する必要があるベース URI を返します。
3.3.2.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | コンシューマーを作成するコンシューマーグループの ID。 | 文字列 |
ボディー |
body | コンシューマーの名前と設定。この名前は、コンシューマーグループの範囲内で一意です。名前が指定されていない場合は、ランダムに生成された名前が割り当てられます。すべてのパラメーターはオプションです。サポートされている設定オプションを次の例に示します。 |
3.3.2.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | コンシューマーは正常に作成されました。 | |
409 | 指定された名前のコンシューマーインスタンスは、Kafka Bridge にすでに存在します。 | |
422 | 1 つ以上のコンシューマー設定オプションに無効な値があります。 |
3.3.2.4. 消費
-
application/vnd.kafka.v2+json
3.3.2.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.2.6. タグ
- コンシューマー
3.3.2.7. HTTP 要求の例
3.3.2.7.1. 要求のボディー
{ "name" : "consumer1", "format" : "binary", "auto.offset.reset" : "earliest", "enable.auto.commit" : false, "fetch.min.bytes" : 512, "consumer.request.timeout.ms" : 30000, "isolation.level" : "read_committed" }
{
"name" : "consumer1",
"format" : "binary",
"auto.offset.reset" : "earliest",
"enable.auto.commit" : false,
"fetch.min.bytes" : 512,
"consumer.request.timeout.ms" : 30000,
"isolation.level" : "read_committed"
}
3.3.2.8. HTTP 応答の例
3.3.2.8.1. Response 200
{ "instance_id" : "consumer1", "base_uri" : "http://localhost:8080/consumers/my-group/instances/consumer1" }
{
"instance_id" : "consumer1",
"base_uri" : "http://localhost:8080/consumers/my-group/instances/consumer1"
}
3.3.2.8.2. Response 409
{ "error_code" : 409, "message" : "A consumer instance with the specified name already exists in the Kafka Bridge." }
{
"error_code" : 409,
"message" : "A consumer instance with the specified name already exists in the Kafka Bridge."
}
3.3.2.8.3. Response 422
{ "error_code" : 422, "message" : "One or more consumer configuration options have invalid values." }
{
"error_code" : 422,
"message" : "One or more consumer configuration options have invalid values."
}
3.3.3. DELETE /consumers/{groupid}/instances/{name}
3.3.3.1. 説明
指定されたコンシューマーインスタンスを削除します。この操作のリクエストは、このコンシューマーの作成に使用された /consumers/{groupid}
への POST
リクエストからの応答で返されたベース URL (ホストおよびポートを含む) を使用する必要があります。
3.3.3.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | コンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | 削除するコンシューマーの名前。 | 文字列 |
3.3.3.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | コンシューマーは正常に削除されました。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つかりませんでした。 |
3.3.3.4. 消費
-
application/vnd.kafka.v2+json
3.3.3.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.3.6. タグ
- コンシューマー
3.3.3.7. HTTP 応答の例
3.3.3.7.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.4. POST /consumers/{groupid}/instances/{name}/assignments
3.3.4.1. 説明
1 つ以上のトピックパーティションをコンシューマーに割り当てます。
3.3.4.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | コンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | トピックパーティションを割り当てるコンシューマーの名前。 | 文字列 |
ボディー |
body | コンシューマーに割り当てるトピックパーティションのリスト。 |
3.3.4.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | パーティションは正常に割り当てられました。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つかりませんでした。 | |
409 | トピック、パーティション、およびパターンへのサブスクリプションは相互に排他的です。 |
3.3.4.4. 消費
-
application/vnd.kafka.v2+json
3.3.4.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.4.6. タグ
- コンシューマー
3.3.4.7. HTTP 要求の例
3.3.4.7.1. 要求のボディー
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
{
"partitions" : [ {
"topic" : "topic",
"partition" : 0
}, {
"topic" : "topic",
"partition" : 1
} ]
}
3.3.4.8. HTTP 応答の例
3.3.4.8.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.4.8.2. Response 409
{ "error_code" : 409, "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive." }
{
"error_code" : 409,
"message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
3.3.5. POST /consumers/{groupid}/instances/{name}/offsets
3.3.5.1. 説明
コンシューマーオフセットのリストをコミットします。コンシューマーによってフェッチされたすべてのレコードのオフセットをコミットするには、リクエストの本文を空のままにします。
3.3.5.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | コンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | コンシューマーの名前。 | 文字列 |
ボディー |
body | コンシューマーオフセットコミットログにコミットするコンシューマーオフセットのリスト。オフセットをコミットする 1 つ以上のトピックパーティションを指定できます。 |
3.3.5.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | コミットは正常に行われました。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つかりませんでした。 |
3.3.5.4. 消費
-
application/vnd.kafka.v2+json
3.3.5.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.5.6. タグ
- コンシューマー
3.3.5.7. HTTP 要求の例
3.3.5.7.1. 要求のボディー
{ "offsets" : [ { "topic" : "topic", "partition" : 0, "offset" : 15 }, { "topic" : "topic", "partition" : 1, "offset" : 42 } ] }
{
"offsets" : [ {
"topic" : "topic",
"partition" : 0,
"offset" : 15
}, {
"topic" : "topic",
"partition" : 1,
"offset" : 42
} ]
}
3.3.5.8. HTTP 応答の例
3.3.5.8.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.6. POST /consumers/{groupid}/instances/{name}/positions
3.3.6.1. 説明
サブスクライブされたコンシューマーが、次に特定のトピックパーティションからレコードのセットをフェッチするときに、特定のオフセットからオフセットをフェッチするように設定します。これは、コンシューマーのデフォルトのフェッチ動作をオーバーライドします。1 つ以上のトピックパーティションを指定できます。
3.3.6.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | コンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | サブスクライブされたコンシューマーの名前。 | 文字列 |
ボディー |
body | サブスクライブされたコンシューマーが次にレコードをフェッチするパーティションオフセットのリスト。 |
3.3.6.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | シークは正常に実行されました。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つからなかったか、指定されたコンシューマーインスタンスに指定されたパーティションの 1 つが割り当てられていませんでした。 |
3.3.6.4. 消費
-
application/vnd.kafka.v2+json
3.3.6.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.6.6. タグ
- コンシューマー
- Seek
3.3.6.7. HTTP 要求の例
3.3.6.7.1. 要求のボディー
{ "offsets" : [ { "topic" : "topic", "partition" : 0, "offset" : 15 }, { "topic" : "topic", "partition" : 1, "offset" : 42 } ] }
{
"offsets" : [ {
"topic" : "topic",
"partition" : 0,
"offset" : 15
}, {
"topic" : "topic",
"partition" : 1,
"offset" : 42
} ]
}
3.3.6.8. HTTP 応答の例
3.3.6.8.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.7. POST /consumers/{groupid}/instances/{name}/positions/beginning
3.3.7.1. 説明
1 つ以上の指定されたトピックパーティションの最初のオフセットをシークする (そしてその後読み取る) ようにサブスクライブされたコンシューマーを設定します。
3.3.7.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | サブスクライブされたコンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | サブスクライブされたコンシューマーの名前。 | 文字列 |
ボディー |
body | コンシューマーがサブスクライブしているトピックパーティションのリスト。コンシューマーは、指定されたパーティションの最初のオフセットを探します。 |
3.3.7.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | 正常に実行されたものを最初にシークします。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つからなかったか、指定されたコンシューマーインスタンスに指定されたパーティションの 1 つが割り当てられていませんでした。 |
3.3.7.4. 消費
-
application/vnd.kafka.v2+json
3.3.7.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.7.6. タグ
- コンシューマー
- Seek
3.3.7.7. HTTP 要求の例
3.3.7.7.1. 要求のボディー
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
{
"partitions" : [ {
"topic" : "topic",
"partition" : 0
}, {
"topic" : "topic",
"partition" : 1
} ]
}
3.3.7.8. HTTP 応答の例
3.3.7.8.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.8. POST /consumers/{groupid}/instances/{name}/positions/end
3.3.8.1. 説明
1 つ以上の指定されたトピックパーティションの終わりでオフセットをシークする (そしてその後読み取る) ようにサブスクライブされたコンシューマーを構成します。
3.3.8.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | サブスクライブされたコンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | サブスクライブされたコンシューマーの名前。 | 文字列 |
ボディー |
body | コンシューマーがサブスクライブしているトピックパーティションのリスト。コンシューマーは、指定されたパーティションの最後のオフセットを探します。 |
3.3.8.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | 最後に正常に実行されたものをシークします。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つからなかったか、指定されたコンシューマーインスタンスに指定されたパーティションの 1 つが割り当てられていませんでした。 |
3.3.8.4. 消費
-
application/vnd.kafka.v2+json
3.3.8.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.8.6. タグ
- コンシューマー
- Seek
3.3.8.7. HTTP 要求の例
3.3.8.7.1. 要求のボディー
{ "partitions" : [ { "topic" : "topic", "partition" : 0 }, { "topic" : "topic", "partition" : 1 } ] }
{
"partitions" : [ {
"topic" : "topic",
"partition" : 0
}, {
"topic" : "topic",
"partition" : 1
} ]
}
3.3.8.8. HTTP 応答の例
3.3.8.8.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.9. GET /consumers/{groupid}/instances/{name}/records
3.3.9.1. 説明
メッセージ値、トピック、パーティションなど、サブスクライブされたコンシューマーのレコードを取得します。この操作のリクエストは、このコンシューマーの作成に使用された /consumers/{groupid}
への POST
リクエストからの応答で返されたベース URL (ホストおよびポートを含む) を使用する必要があります。
3.3.9.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | サブスクライブされたコンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | レコードを取得するサブスクライブされたコンシューマーの名前。 | 文字列 |
クエリー |
max_bytes | 応答に含めることができるエンコードされていないキーと値の最大サイズ (バイト単位)。それ以外の場合は、コード 422 のエラー応答が返されます。 | 整数 |
クエリー |
timeout | HTTP Bridge が要求をタイムアウトする前にレコードの取得に費やす最大時間 (ミリ秒単位)。 | 整数 |
3.3.9.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | ポーリング要求は正常に実行されました。 | |
404 | 指定されたコンシューマーインスタンスが見つかりませんでした。 | |
406 |
コンシューマー作成リクエストで使用された | |
422 | 応答が、コンシューマーが受信できる最大バイト数を超えています |
3.3.9.4. 生成されるもの
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
-
application/vnd.kafka.v2+json
3.3.9.5. タグ
- コンシューマー
3.3.9.6. HTTP 応答の例
3.3.9.6.1. Response 200
[ { "topic" : "topic", "key" : "key1", "value" : { "foo" : "bar" }, "partition" : 0, "offset" : 2 }, { "topic" : "topic", "key" : "key2", "value" : [ "foo2", "bar2" ], "partition" : 1, "offset" : 3 } ]
[ {
"topic" : "topic",
"key" : "key1",
"value" : {
"foo" : "bar"
},
"partition" : 0,
"offset" : 2
}, {
"topic" : "topic",
"key" : "key2",
"value" : [ "foo2", "bar2" ],
"partition" : 1,
"offset" : 3
} ]
[ { "topic": "test", "key": "a2V5", "value": "Y29uZmx1ZW50", "partition": 1, "offset": 100, }, { "topic": "test", "key": "a2V5", "value": "a2Fma2E=", "partition": 2, "offset": 101, } ]
[
{
"topic": "test",
"key": "a2V5",
"value": "Y29uZmx1ZW50",
"partition": 1,
"offset": 100,
},
{
"topic": "test",
"key": "a2V5",
"value": "a2Fma2E=",
"partition": 2,
"offset": 101,
}
]
3.3.9.6.2. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.9.6.3. Response 406
{ "error_code" : 406, "message" : "The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request." }
{
"error_code" : 406,
"message" : "The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request."
}
3.3.9.6.4. Response 422
{ "error_code" : 422, "message" : "Response exceeds the maximum number of bytes the consumer can receive" }
{
"error_code" : 422,
"message" : "Response exceeds the maximum number of bytes the consumer can receive"
}
3.3.10. POST /consumers/{groupid}/instances/{name}/subscription
3.3.10.1. 説明
コンシューマーを 1 つ以上のトピックにサブスクライブします。コンシューマーがサブスクライブするトピックを (トピック
タイプの) リストで、または topic_pattern
フィールドとして記述できます。各呼び出しは、サブスクライバーのサブスクリプションを置き換えます。
3.3.10.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | サブスクライブされたコンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | トピックをサブスクライブするコンシューマーの名前。 | 文字列 |
ボディー |
body | コンシューマーがサブスクライブするトピックのリスト。 |
3.3.10.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | コンシューマーは正常にサブスクライブしました。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つかりませんでした。 | |
409 | トピック、パーティション、およびパターンへのサブスクリプションは相互に排他的です。 | |
422 |
( |
3.3.10.4. 消費
-
application/vnd.kafka.v2+json
3.3.10.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.10.6. タグ
- コンシューマー
3.3.10.7. HTTP 要求の例
3.3.10.7.1. 要求のボディー
{ "topics" : [ "topic1", "topic2" ] }
{
"topics" : [ "topic1", "topic2" ]
}
3.3.10.8. HTTP 応答の例
3.3.10.8.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.10.8.2. Response 409
{ "error_code" : 409, "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive." }
{
"error_code" : 409,
"message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
3.3.10.8.3. Response 422
{ "error_code" : 422, "message" : "A list (of Topics type) or a topic_pattern must be specified." }
{
"error_code" : 422,
"message" : "A list (of Topics type) or a topic_pattern must be specified."
}
3.3.11. GET /consumers/{groupid}/instances/{name}/subscription
3.3.11.1. 説明
コンシューマーがサブスクライブしているトピックのリストを取得します。
3.3.11.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | サブスクライブされたコンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | サブスクライブされたコンシューマーの名前。 | 文字列 |
3.3.11.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | サブスクライブされたトピックとパーティションのリスト。 | |
404 | 指定されたコンシューマーインスタンスが見つかりませんでした。 |
3.3.11.4. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.11.5. タグ
- コンシューマー
3.3.11.6. HTTP 応答の例
3.3.11.6.1. Response 200
{ "topics" : [ "my-topic1", "my-topic2" ], "partitions" : [ { "my-topic1" : [ 1, 2, 3 ] }, { "my-topic2" : [ 1 ] } ] }
{
"topics" : [ "my-topic1", "my-topic2" ],
"partitions" : [ {
"my-topic1" : [ 1, 2, 3 ]
}, {
"my-topic2" : [ 1 ]
} ]
}
3.3.11.6.2. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.12. DELETE /consumers/{groupid}/instances/{name}/subscription
3.3.12.1. 説明
すべてのトピックからコンシューマーの登録を解除します。
3.3.12.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
groupid | サブスクライブされたコンシューマーが属するコンシューマーグループの ID。 | 文字列 |
パス |
name | トピックからサブスクライブを解除するコンシューマーの名前。 | 文字列 |
3.3.12.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
204 | コンシューマーは正常にサブスクライブを解除しました。 | コンテンツなし |
404 | 指定されたコンシューマーインスタンスが見つかりませんでした。 |
3.3.12.4. タグ
- コンシューマー
3.3.12.5. HTTP 応答の例
3.3.12.5.1. Response 404
{ "error_code" : 404, "message" : "The specified consumer instance was not found." }
{
"error_code" : 404,
"message" : "The specified consumer instance was not found."
}
3.3.13. GET /healthy
3.3.13.1. 説明
ブリッジが実行しているかどうかを確認します。これは、必ずしもリクエストを受け入れる準備ができていることを意味するわけではありません。
3.3.13.2. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | ブリッジは問題なし | コンテンツなし |
3.3.14. GET /openapi
3.3.14.1. 説明
OpenAPI v2 仕様を JSON 形式で取得します。
3.3.14.2. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | JSON 形式の OpenAPI v2 仕様が正常に取得されました。 | 文字列 |
3.3.14.3. 生成されるもの
-
application/json
3.3.15. GET /ready
3.3.15.1. 説明
ブリッジの準備ができており、リクエストを受け入れることができるかどうかを確認してください。
3.3.15.2. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | ブリッジの準備完了 | コンテンツなし |
3.3.16. GET /topics
3.3.16.1. 説明
すべてのトピックのリストを取得します。
3.3.16.2. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | トピックのリスト。 | < 文字列 > 配列 |
3.3.16.3. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.16.4. タグ
- トピック
3.3.16.5. HTTP 応答の例
3.3.16.5.1. Response 200
[ "topic1", "topic2" ]
[ "topic1", "topic2" ]
3.3.17. POST /topics/{topicname}
3.3.17.1. 説明
1 つ以上のレコードを特定のトピックに送信し、任意でパーティション、キー、またはその両方を指定します。
3.3.17.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
topicname | レコードの送信先またはメタデータの取得元のトピックの名前。 | 文字列 |
ボディー |
body |
3.3.17.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | レコードは正常に送信されました。 | |
404 | 指定されたトピックが見つかりませんでした。 | |
422 | レコードリストが無効です。 |
3.3.17.4. 消費
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
3.3.17.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.17.6. タグ
- プロデューサー
- トピック
3.3.17.7. HTTP 要求の例
3.3.17.7.1. 要求のボディー
{ "records" : [ { "key" : "key1", "value" : "value1" }, { "value" : "value2", "partition" : 1 }, { "value" : "value3" } ] }
{
"records" : [ {
"key" : "key1",
"value" : "value1"
}, {
"value" : "value2",
"partition" : 1
}, {
"value" : "value3"
} ]
}
3.3.17.8. HTTP 応答の例
3.3.17.8.1. Response 200
{ "offsets" : [ { "partition" : 2, "offset" : 0 }, { "partition" : 1, "offset" : 1 }, { "partition" : 2, "offset" : 2 } ] }
{
"offsets" : [ {
"partition" : 2,
"offset" : 0
}, {
"partition" : 1,
"offset" : 1
}, {
"partition" : 2,
"offset" : 2
} ]
}
3.3.17.8.2. Response 404
{ "error_code" : 404, "message" : "The specified topic was not found." }
{
"error_code" : 404,
"message" : "The specified topic was not found."
}
3.3.17.8.3. Response 422
{ "error_code" : 422, "message" : "The record list contains invalid records." }
{
"error_code" : 422,
"message" : "The record list contains invalid records."
}
3.3.18. GET /topics/{topicname}
3.3.18.1. 説明
特定のトピックに関するメタデータを取得します。
3.3.18.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
topicname | レコードの送信先またはメタデータの取得元のトピックの名前。 | 文字列 |
3.3.18.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | トピックのメタデータ |
3.3.18.4. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.18.5. タグ
- トピック
3.3.18.6. HTTP 応答の例
3.3.18.6.1. Response 200
{ "name" : "topic", "offset" : 2, "configs" : { "cleanup.policy" : "compact" }, "partitions" : [ { "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }, { "partition" : 2, "leader" : 2, "replicas" : [ { "broker" : 1, "leader" : false, "in_sync" : true }, { "broker" : 2, "leader" : true, "in_sync" : true } ] } ] }
{
"name" : "topic",
"offset" : 2,
"configs" : {
"cleanup.policy" : "compact"
},
"partitions" : [ {
"partition" : 1,
"leader" : 1,
"replicas" : [ {
"broker" : 1,
"leader" : true,
"in_sync" : true
}, {
"broker" : 2,
"leader" : false,
"in_sync" : true
} ]
}, {
"partition" : 2,
"leader" : 2,
"replicas" : [ {
"broker" : 1,
"leader" : false,
"in_sync" : true
}, {
"broker" : 2,
"leader" : true,
"in_sync" : true
} ]
} ]
}
3.3.19. GET /topics/{topicname}/partitions
3.3.19.1. 説明
トピックのパーティションのリストを取得します。
3.3.19.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
topicname | レコードの送信先またはメタデータの取得元のトピックの名前。 | 文字列 |
3.3.19.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | パーティションのリスト | < PartitionMetadata > array |
404 | 指定されたトピックが見つかりませんでした。 |
3.3.19.4. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.19.5. タグ
- トピック
3.3.19.6. HTTP 応答の例
3.3.19.6.1. Response 200
[ { "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }, { "partition" : 2, "leader" : 2, "replicas" : [ { "broker" : 1, "leader" : false, "in_sync" : true }, { "broker" : 2, "leader" : true, "in_sync" : true } ] } ]
[ {
"partition" : 1,
"leader" : 1,
"replicas" : [ {
"broker" : 1,
"leader" : true,
"in_sync" : true
}, {
"broker" : 2,
"leader" : false,
"in_sync" : true
} ]
}, {
"partition" : 2,
"leader" : 2,
"replicas" : [ {
"broker" : 1,
"leader" : false,
"in_sync" : true
}, {
"broker" : 2,
"leader" : true,
"in_sync" : true
} ]
} ]
3.3.19.6.2. Response 404
{ "error_code" : 404, "message" : "The specified topic was not found." }
{
"error_code" : 404,
"message" : "The specified topic was not found."
}
3.3.20. POST /topics/{topicname}/partitions/{partitionid}
3.3.20.1. 説明
任意でキーを指定して、1 つ以上のレコードを特定のトピックパーティションに送信します。
3.3.20.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
partitionid | レコードを送信したり、メタデータを取得したりするパーティションの ID。 | 整数 |
パス |
topicname | レコードの送信先またはメタデータの取得元のトピックの名前。 | 文字列 |
ボディー |
body | 値 (必須) とキー (任意) を含む、特定のトピックパーティションに送信するレコードのリスト。 |
3.3.20.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | レコードは正常に送信されました。 | |
404 | 指定されたトピックパーティションが見つかりませんでした。 | |
422 | レコードが無効です。 |
3.3.20.4. 消費
-
application/vnd.kafka.json.v2+json
-
application/vnd.kafka.binary.v2+json
3.3.20.5. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.20.6. タグ
- プロデューサー
- トピック
3.3.20.7. HTTP 要求の例
3.3.20.7.1. 要求のボディー
{ "records" : [ { "key" : "key1", "value" : "value1" }, { "value" : "value2" } ] }
{
"records" : [ {
"key" : "key1",
"value" : "value1"
}, {
"value" : "value2"
} ]
}
3.3.20.8. HTTP 応答の例
3.3.20.8.1. Response 200
{ "offsets" : [ { "partition" : 2, "offset" : 0 }, { "partition" : 1, "offset" : 1 }, { "partition" : 2, "offset" : 2 } ] }
{
"offsets" : [ {
"partition" : 2,
"offset" : 0
}, {
"partition" : 1,
"offset" : 1
}, {
"partition" : 2,
"offset" : 2
} ]
}
3.3.20.8.2. Response 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
{
"error_code" : 404,
"message" : "The specified topic partition was not found."
}
3.3.20.8.3. Response 422
{ "error_code" : 422, "message" : "The record is not valid." }
{
"error_code" : 422,
"message" : "The record is not valid."
}
3.3.21. GET /topics/{topicname}/partitions/{partitionid}
3.3.21.1. 説明
トピックパーティションのパーティションメタデータを取得します。
3.3.21.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
partitionid | レコードを送信したり、メタデータを取得したりするパーティションの ID。 | 整数 |
パス |
topicname | レコードの送信先またはメタデータの取得元のトピックの名前。 | 文字列 |
3.3.21.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | パーティションメタデータ | |
404 | 指定されたトピックパーティションが見つかりませんでした。 |
3.3.21.4. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.21.5. タグ
- トピック
3.3.21.6. HTTP 応答の例
3.3.21.6.1. Response 200
{ "partition" : 1, "leader" : 1, "replicas" : [ { "broker" : 1, "leader" : true, "in_sync" : true }, { "broker" : 2, "leader" : false, "in_sync" : true } ] }
{
"partition" : 1,
"leader" : 1,
"replicas" : [ {
"broker" : 1,
"leader" : true,
"in_sync" : true
}, {
"broker" : 2,
"leader" : false,
"in_sync" : true
} ]
}
3.3.21.6.2. Response 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
{
"error_code" : 404,
"message" : "The specified topic partition was not found."
}
3.3.22. GET /topics/{topicname}/partitions/{partitionid}/offsets
3.3.22.1. 説明
トピックパーティションのオフセットの概要を取得します。
3.3.22.2. パラメーター
タイプ | Name | 説明 | スキーマ |
---|---|---|---|
パス |
partitionid | パーティションの ID。 | 整数 |
パス |
topicname | パーティションを含むトピックの名前。 | 文字列 |
3.3.22.3. 応答
HTTP コード | 説明 | スキーマ |
---|---|---|
200 | トピックパーティションのオフセットの要約。 | |
404 | 指定されたトピックパーティションが見つかりませんでした。 |
3.3.22.4. 生成されるもの
-
application/vnd.kafka.v2+json
3.3.22.5. タグ
- トピック
3.3.22.6. HTTP 応答の例
3.3.22.6.1. Response 200
{ "beginning_offset" : 10, "end_offset" : 50 }
{
"beginning_offset" : 10,
"end_offset" : 50
}
3.3.22.6.2. Response 404
{ "error_code" : 404, "message" : "The specified topic partition was not found." }
{
"error_code" : 404,
"message" : "The specified topic partition was not found."
}
付録A サブスクリプションの使用
AMQ Streams は、ソフトウェアサブスクリプションから提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。
アカウントへのアクセス
- access.redhat.com に移動します。
- アカウントがない場合は、作成します。
- アカウントにログインします。
サブスクリプションのアクティベート
- access.redhat.com に移動します。
- サブスクリプション に移動します。
- Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。
Zip および Tar ファイルのダウンロード
zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合は、この手順は必要ありません。
- ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
- INTEGRATION AND AUTOMATION カテゴリーで、AMQ Streams for Apache Kafka エントリーを見つけます。
- 必要な AMQ Streams 製品を選択します。Software Downloads ページが開きます。
- コンポーネントの Download リンクをクリックします。
改訂日時: 2022-07-12 13:29:11 +1000