第6章 Kafka Bridge
本章では、AMQ Streams Kafka Bridge について概説し、その REST API を使用して AMQ Streams と対話するために役立つ情報を提供します。
- ローカル環境で Kafka Bridge を試すには、本章で後述する 「Kafka Bridge クイックスタート」 を参照してください。
- 詳細な設定手順は、「Kafka Bridge クラスターの設定」 を参照してください。
- API ドキュメントは、「Kafka Bridge API reference」を参照してください。
6.1. Kafka Bridge の概要
AMQ Streams Kafka Bridge をインターフェースとして使用し、Kafka クラスターに対して特定タイプの HTTP リクエストを行うことができます。
6.1.1. Kafka Bridge インターフェース
Kafka Bridge では、HTTP ベースのクライアントと Kafka クラスターとの対話を可能にする RESTful インターフェースが提供されます。 また、クライアントアプリケーションによる Kafka プロトコルの変換は必要なく、Web API コネクションの利点が AMQ Streams に提供されます。
API には consumers
と topics
の 2 つの主なリソースがあります。これらのリソースは、Kafka クラスターでコンシューマーおよびプロデューサーと対話するためにエンドポイント経由で公開され、アクセスが可能になります。リソースと関係があるのは Kafka ブリッジのみで、Kafka に直接接続されたコンシューマーやプロデューサーとは関係はありません。
6.1.1.1. HTTP リクエスト
Kafka Bridge は、以下の方法で Kafka クラスターへの HTTP リクエストをサポートします。
- トピックにメッセージを送信する。
- トピックからメッセージを取得する。
- トピックのパーティションリストを取得する。
- コンシューマーを作成および削除する。
- コンシューマーをトピックにサブスクライブし、このようなトピックからメッセージを受信できるようにする。
- コンシューマーがサブスクライブしているトピックの一覧を取得する。
- トピックからコンシューマーのサブスクライブを解除する。
- パーティションをコンシューマーに割り当てる。
- コンシューマーオフセットの一覧をコミットする。
- パーティションで検索して、コンシューマーが最初または最後のオフセットの位置、または指定のオフセットの位置からメッセージを受信できるようにする。
上記の方法で、JSON 応答と HTTP 応答コードのエラー処理を行います。メッセージは JSON またはバイナリー形式で送信できます。
クライアントは、ネイティブの Kafka プロトコルを使用する必要なくメッセージを生成して使用できます。
その他のリソース
- リクエストおよび応答の例など、API ドキュメントを確認するには、『Strimzi Kafka Bridge Documentation 』を参照してください。
6.1.2. Kafka Bridge でサポートされるクライアント
Kafka Bridge を使用して、内部および外部の HTTP クライアントアプリケーションの両方を Kafka クラスターに統合できます。
- 内部クライアント
-
内部クライアントとは、Kafka Bridge 自体と同じ OpenShift クラスターで実行されるコンテナーベースの HTTP クライアントのことです。内部クライアントは、ホストの Kafka Bridge および
KafkaBridge
のカスタムリソースで定義されたポートにアクセスできます。 - 外部クライアント
- 外部クライアントとは、Kafka Bridge がデプロイおよび実行される OpenShift クラスター外部で実行される HTTP クライアントのことです。外部クライアントは、OpenShift Route、ロードバランサーサービス、または Ingress を使用して Kafka Bridge にアクセスできます。
HTTP 内部および外部クライアントの統合
6.1.3. Kafka Bridge のセキュリティー保護
AMQ Streams には、現在 Kafka Bridge の暗号化、認証、または承認は含まれていません。そのため、外部クライアントから Kafka Bridge に送信されるリクエストは以下のようになります。
- 暗号化されず、HTTPS ではなく HTTP を使用する必要がある。
- 認証なしで送信される。
ただし、以下のような他の方法で Kafka Bridge をセキュアにできます。
- Kafka Bridge にアクセスできる Pod を定義する OpenShift ネットワークポリシー。
- 認証または承認によるリバースプロキシー (例: OAuth2 プロキシー)。
- API ゲートウェイ。
- TLS 終端をともなう Ingress または OpenShift ルート。
Kafka Bridge では、Kafka Broker への接続時に TLS 暗号化と、TLS および SASL 認証がサポートされます。OpenShift クラスター内で以下を設定できます。
- Kafka Bridge と Kafka クラスター間の TLS または SASL ベースの認証。
- Kafka Bridge と Kafka クラスター間の TLS 暗号化接続。
詳細は、「Kafka Bridge の設定」 を参照してください。
Kafka ブローカーで ACL を使用することで、Kafka Bridge を使用して消費および生成できるトピックを制限することができます。
6.1.4. OpenShift 外部の Kafka Bridge へのアクセス
デプロイメント後、AMQ Streams Kafka Bridge には同じ OpenShift クラスターで実行しているアプリケーションのみがアクセスできます。これらのアプリケーションは、kafka-bridge-name-bridge-service
サービスを使用して API にアクセスします。
OpenShift クラスター外部で実行しているアプリケーションに Kafka Bridge がアクセスできるようにする場合は、以下の機能のいずれかを使用して Kafka Bridge を手動で公開できます。
- LoadBalancer または NodePort タイプのサービス
- Ingress リソース
- OpenShift ルート
サービスを作成する場合には、selector
で以下のラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。
# ...
selector:
strimzi.io/cluster: kafka-bridge-name 1
strimzi.io/kind: KafkaBridge
#...
- 1
- OpenShift クラスターでの Kafka Bridge カスタムリソースの名前。
6.1.5. Kafka Bridge へのリクエスト
データ形式と HTTP ヘッダーを指定し、有効なリクエストが Kafka Bridge に送信されるようにします。
6.1.5.1. コンテンツタイプヘッダー
API リクエストおよびレスポンス本文は、常に JSON としてエンコードされます。
コンシューマー操作の実行時に、
POST
リクエストの本文が空でない場合は、以下のContent-Type
ヘッダーが含まれている必要があります。Content-Type: application/vnd.kafka.v2+json
プロデューサー操作の実行時に、
POST
リクエストは、生成されたメッセージの 埋め込みデータ形式 を指定するContent-Type
ヘッダーを提供する必要があります。これは、json
またはbinary
のいずれかになります。埋め込みデータ形式 Content-Type ヘッダー JSON
Content-Type: application/vnd.kafka.json.v2+json
バイナリー
Content-Type: application/vnd.kafka.binary.v2+json
次のセクションで説明どおり、埋め込みデータ形式はコンシューマーごとに設定されます。
POST
リクエストに空のボディーがある場合は、Content-Type
を設定しないでください。空のボディーを使用して、デフォルト値のコンシューマーを作成できます。
6.1.5.2. 埋め込みデータ形式
埋め込みデータ形式は、Kafka メッセージが Kafka Bridge によりプロデューサーからコンシューマーに HTTP で送信される際の形式です。サポートされる埋め込みデータ形式には、JSON とバイナリーの 2 種類があります。
/consumers/groupid
エンドポイントを使用してコンシューマーを作成する場合、POST
リクエスト本文で JSON またはバイナリーいずれかの埋め込みデータ形式を指定する必要があります。これは、以下の例のように format
フィールドで指定します。
{
"name": "my-consumer",
"format": "binary", 1
...
}
- 1
- バイナリー埋め込みデータ形式。
コンシューマーの作成時に指定する埋め込みデータ形式は、コンシューマーが消費する Kafka メッセージのデータ形式と一致する必要があります。
バイナリー埋め込みデータ形式を指定する場合は、以降のプロデューサーリクエストで、リクエスト本文にバイナリーデータが Base64 でエンコードされた文字列として含まれる必要があります。たとえば、/topics/topicname
エンドポイントを使用してメッセージを送信する場合は、records.value
を Base64 でエンコードする必要があります。
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
プロデューサーリクエストは、埋め込みデータ形式に対応する Content-Type
ヘッダーも提供する必要があります (例: Content-Type: application/vnd.kafka.binary.v2+json
)。
6.1.5.3. メッセージの形式
/topics
エンドポイントを使用してメッセージを送信する場合は、records
パラメーターでリクエストボディーにメッセージペイロードを入力します。
records
パラメーターには、以下のオプションフィールドを含めることができます。
-
メッセージの
headers
-
メッセージの
key
-
メッセージの
value
-
宛先の
partition
トピックへの POST
リクエストの例
curl -X POST \
http://localhost:8080/topics/my-topic \
-H 'content-type: application/vnd.kafka.json.v2+json' \
-d '{
"records": [
{
"key": "my-key",
"value": "sales-lead-0001"
"partition": 2
"headers": [
{
"key": "key1",
"value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
}
]
},
]
}'
- 1
- バイナリー形式のヘッダー値。Base64 としてエンコードされます。
6.1.5.4. Accept ヘッダー
コンシューマーを作成したら、以降のすべての GET リクエストには Accept
ヘッダーが以下のような形式で含まれる必要があります。
Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
EMBEDDED-DATA-FORMAT
は、json
または binary
のどちらかです。
たとえば、サブスクライブされたコンシューマーのレコードを JSON 埋め込みデータ形式で取得する場合、この Accept ヘッダーが含まれるようにします。
Accept: application/vnd.kafka.json.v2+json
6.1.6. CORS
CORS (Cross-Origin Resource Sharing) を使用すると、Kafka Bridge HTTP の設定 で Kafka クラスターにアクセスするために許可されるメソッドおよび元の URL を指定できます。
Kafka Bridge の CORS 設定例
# ... cors: allowedOrigins: "https://strimzi.io" allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH" # ...
CORS では、異なるドメイン上のオリジンソース間での シンプルな リクエストおよび プリフライト リクエストが可能です。
シンプルなリクエストは、GET
、HEAD
、および POST
メソッドを使用する標準のリクエストに適しています。
プリフライトリクエストは、実際のリクエストが安全に送信できることを確認する最初のチェックとして HTTP OPTIONS リクエストを送信します。確認時に、実際のリクエストが送信されます。プリフライトリクエストは、PUT
および DELETE
などのより安全な手段が必要な方法や標準でないヘッダーを使用する方法に適しています。
すべての要求には、HTTP リクエストのソースであるヘッダーの Origin
値が必要です。
6.1.6.1. シンプルなリクエスト
たとえば、このシンプルなリクエストヘッダーは、オリジンを https://strimzi.io
と指定します。
Origin: https://strimzi.io
ヘッダー情報がリクエストに追加されます。
curl -v -X GET HTTP-ADDRESS/bridge-consumer/records \
-H 'Origin: https://strimzi.io'\
-H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridge からの応答として、Access-Control-Allow-Origin
ヘッダーが返されます。
HTTP/1.1 200 OK
Access-Control-Allow-Origin: * 1
- 1
- 返されたアスタリスク (
*
) は、どのドメインでもリソースにアクセスできることを意味します。
6.1.6.2. プリフライトリクエスト
最初のプリフライトリクエストは、OPTIONS
メソッドを使用して Kafka Bridge に送信されます。HTTP OPTIONS リクエストはヘッダー情報を送信し、Kafka Bridge が実際のリクエストを許可することを確認します。
ここでは、プリフライトリクエストは https://strimzi.io
からの POST
リクエストが有効であることを確認します。
OPTIONS /my-group/instances/my-user/subscription HTTP/1.1 Origin: https://strimzi.io Access-Control-Request-Method: POST 1 Access-Control-Request-Headers: Content-Type 2
プリフライトリクエストのヘッダー情報に OPTIONS
が追加されます。
curl -v -X OPTIONS -H 'Origin: https://strimzi.io' \ -H 'Access-Control-Request-Method: POST' \ -H 'content-type: application/vnd.kafka.v2+json'
Kafka Bridge は最初のリクエストに応答し、リクエストが受け入れられることを確認します。応答ヘッダーは、許可されるオリジン、メソッド、およびヘッダーを返します。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH Access-Control-Allow-Headers: content-type
オリジンまたはメソッドが拒否されると、エラーメッセージが返されます。
プリフライトリクエストで確認されたため、実際のリクエストには Access-Control-Request-Method
ヘッダーは必要ありませんが、オリジンのヘッダーが必要です。
curl -v -X POST HTTP-ADDRESS/topics/bridge-topic \
-H 'Origin: https://strimzi.io' \
-H 'content-type: application/vnd.kafka.v2+json'
応答は、送信元 URL が許可されることを示します。
HTTP/1.1 200 OK Access-Control-Allow-Origin: https://strimzi.io
その他のリソース
Fetch CORS 仕様
6.1.7. Kafka Bridge API リソース
リクエストやレスポンスの例などを含む REST API エンドポイントおよび説明の完全リストは、「Kafka Bridge API reference」を参照してください。
6.1.8. Kafka Bridge デプロイメント
Cluster Operator を使用して、Kafka Bridge を OpenShift クラスターにデプロイします。
Kafka Bridge をデプロイすると、Cluster Operator により OpenShift クラスターに Kafka Bridge オブジェクトが作成されます。オブジェクトには、デプロイメント、サービス、および Pod が含まれ、それぞれ Kafka Bridge のカスタムリソースに付与された名前が付けられます。
その他のリソース
- デプロイメントの手順は『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「Kafka Bridge を OpenShift クラスターへデプロイ」を参照してください。
- Kafka Bridge の設定に関する詳細は、「Kafka Bridge クラスターの設定」 を参照してください。
-
KafkaBridge
リソースのホストおよびポートの設定に関する詳細は、「Kafka Bridge の設定」 を参照してください。 - 外部クライアントの統合に関する詳細は、「OpenShift 外部の Kafka Bridge へのアクセス」 を参照してください。
6.2. Kafka Bridge クイックスタート
このクイックスタートを使用して、ローカルの開発環境で AMQ Streams の Kafka Bridge を試すことができます。以下の方法について説明します。
- OpenShift クラスターに Kafka Bridge をデプロイする。
- ポート転送を使用して Kafka Bridge サービスをローカルマシンに公開する。
- Kafka クラスターのトピックおよびパーティションへのメッセージを生成する。
- Kafka Bridge コンシューマーを作成する。
- 基本的なコンシューマー操作を実行する (たとえば、コンシューマーをトピックにサブスクライブする、生成したメッセージを取得するなど)。
このクイックスタートでは、HTTP リクエストはターミナルにコピーおよび貼り付けできる curl コマンドを使用します。OpenShift クラスターへのアクセスが必要になります。ローカルの OpenShift クラスターを実行および管理するには、Minikube、CodeReady Containers、または MiniShift などのツールを使用します。
前提条件を確認し、本章に指定されている順序でタスクを行うようにしてください。
データ形式について
このクイックスタートでは、バイナリーではなく JSON 形式でメッセージを生成および消費します。リクエスト例で使用されるデータ形式および HTTP ヘッダーの詳細は、「Kafka Bridge へのリクエスト」 を参照してください。
クイックスタートの前提条件
- ローカルまたはリモート OpenShift クラスターにアクセスできるクラスター管理者権限が必要です。
- AMQ Streams がインストールされている必要があります。
- Cluster Operator によってデプロイされた稼働中の Kafka クラスターが OpenShift namespace に必要です。
- Entity Operator がデプロイされ、Kafka クラスターの一部として稼働している必要があります。
6.2.1. OpenShift クラスターへの Kafka Bridge のデプロイメント
AMQ Streams には、AMQ Streams Kafka Bridge の設定を指定する YAML サンプルが含まれています。このファイルに最小限の変更を加え、Kafka Bridge のインスタンスを OpenShift クラスターにデプロイします。
手順
examples/bridge/kafka-bridge.yaml
ファイルを編集します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: quickstart 1 spec: replicas: 1 bootstrapServers: <cluster-name>-kafka-bootstrap:9092 2 http: port: 8080
Kafka Bridge を OpenShift クラスターにデプロイします。
oc apply -f examples/bridge/kafka-bridge.yaml
quickstart-bridge
デプロイメント、サービス、および他の関連リソースが OpenShift クラスターに作成されます。Kafka Bridge が正常にデプロイされたことを確認します。
oc get deployments
NAME READY UP-TO-DATE AVAILABLE AGE quickstart-bridge 1/1 1 1 34m my-cluster-connect 1/1 1 1 24h my-cluster-entity-operator 1/1 1 1 24h #...
次のステップ
Kafka Bridge を OpenShift クラスターにデプロイしたら、Kafka Bridge サービスをローカルマシンに公開します。
その他のリソース
- Kafka Bridge の設定に関する詳細は、「Kafka Bridge クラスターの設定」 を参照してください。
6.2.2. Kafka Bridge サービスのローカルマシンへの公開
次に、ポート転送を使用して AMQ Streams の Kafka Bridge サービスを http://localhost:8080 上でローカルマシンに公開します。
ポート転送は、開発およびテストの目的でのみ適切です。
手順
OpenShift クラスターの Pod の名前をリストします。
oc get pods -o name pod/kafka-consumer # ... pod/quickstart-bridge-589d78784d-9jcnr pod/strimzi-cluster-operator-76bcf9bc76-8dnfm
ポート
8080
でquickstart-bridge
Pod に接続します。oc port-forward pod/quickstart-bridge-589d78784d-9jcnr 8080:8080 &
注記ローカルマシンのポート 8080 がすでに使用中の場合は、代わりの HTTP ポート (
8008
など) を使用します。
これで、API リクエストがローカルマシンのポート 8080 から Kafka Bridge Pod のポート 8080 に転送されるようになります。
6.2.3. トピックおよびパーティションへのメッセージの作成
次に、topics エンドポイントを使用して、トピックへのメッセージを JSON 形式で生成します。以下に示すように、メッセージの宛先パーティションをリクエスト本文に指定できます。partitions エンドポイントは、全メッセージの単一の宛先パーティションをパスパラメーターとして指定する代替方法を提供します。
手順
テキストエディターを使用して、3 つのパーティションがある Kafka トピックの YAML 定義を作成します。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: bridge-quickstart-topic labels: strimzi.io/cluster: <kafka-cluster-name> 1 spec: partitions: 3 2 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
-
ファイルを
bridge-quickstart-topic.yaml
としてexamples/topic
ディレクトリーに保存します。 OpenShift クラスターにトピックを作成します。
oc apply -f examples/topic/bridge-quickstart-topic.yaml
Kafka Bridge を使用して、作成したトピックに 3 つのメッセージを生成します。
curl -X POST \ http://localhost:8080/topics/bridge-quickstart-topic \ -H 'content-type: application/vnd.kafka.json.v2+json' \ -d '{ "records": [ { "key": "my-key", "value": "sales-lead-0001" }, { "value": "sales-lead-0002", "partition": 2 }, { "value": "sales-lead-0003" } ] }'
-
sales-lead-0001
は、キーのハッシュに基づいてパーティションに送信されます。 -
sales-lead-0002
は、パーティション 2 に直接送信されます。 -
sales-lead-0003
は、ラウンドロビン方式を使用してbridge-quickstart-topic
トピックのパーティションに送信されます。
-
リクエストが正常に行われると、Kafka Bridge は
offsets
アレイを200
コードとapplication/vnd.kafka.v2+json
のcontent-type
ヘッダーとともに返します。各メッセージで、offsets
アレイは以下を記述します。- メッセージが送信されたパーティション。
パーティションの現在のメッセージオフセット。
応答の例
#... { "offsets":[ { "partition":0, "offset":0 }, { "partition":2, "offset":0 }, { "partition":0, "offset":1 } ] }
次のステップ
トピックおよびパーティションへのメッセージを作成したら、Kafka Bridge コンシューマーを作成します。
その他のリソース
- API リファレンスドキュメントの「POST /topics/{topicname}」
- API リファレンスドキュメントの「POST /topics/{topicname}/partitions/{partitionid}」
6.2.4. Kafka Bridge コンシューマーの作成
Kafka クラスターで何らかのコンシューマー操作を実行するには、まず consumers エンドポイントを使用してコンシューマーを作成する必要があります。コンシューマーは Kafka Bridge コンシューマー と呼ばれます。
手順
bridge-quickstart-consumer-group
という名前の新しいコンシューマーグループに Kafka Bridge コンシューマーを作成します。curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "name": "bridge-quickstart-consumer", "auto.offset.reset": "earliest", "format": "json", "enable.auto.commit": false, "fetch.min.bytes": 512, "consumer.request.timeout.ms": 30000 }'
-
コンシューマーには
bridge-quickstart-consumer
という名前を付け、埋め込みデータ形式はjson
として設定します。 - 一部の基本的な設定が定義されます。
コンシューマーはログへのオフセットに自動でコミットしません。これは、
enable.auto.commit
がfalse
に設定されているからです。このクイックスタートでは、オフセットを跡で手作業でコミットします。リクエストが正常に行われると、Kafka Bridge はレスポンス本文でコンシューマー ID (
instance_id
) とベース URL (base_uri
) を200
コードとともに返します。応答の例
#... { "instance_id": "bridge-quickstart-consumer", "base_uri":"http://<bridge-name>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer" }
-
コンシューマーには
-
ベース URL (
base_uri
) をコピーし、このクイックスタートの他のコンシューマー操作で使用します。
次のステップ
上記で作成した Kafka Bridge コンシューマーをトピックにサブスクライブできます。
その他のリソース
- API リファレンスドキュメントの「POST /consumers/{groupid}」
6.2.5. Kafka Bridge コンシューマーのトピックへのサブスクライブ
Kafka Bridge コンシューマーを作成したら、subscription エンドポイントを使用して、1 つ以上のトピックにサブスクライブします。サブスクライブすると、コンシューマーはトピックに生成されたすべてのメッセージの受信を開始します。
手順
前述の「トピックおよびパーティションへのメッセージの作成」の手順ですでに作成した
bridge-quickstart-topic
トピックに、コンシューマーをサブスクライブします。curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "topics": [ "bridge-quickstart-topic" ] }'
topics
アレイには、例のような単一のトピック、または複数のトピックを含めることができます。正規表現に一致する複数のトピックにコンシューマーをサブスクライブする場合は、topics
アレイの代わりにtopic_pattern
文字列を使用できます。リクエストが正常に行われると、Kafka Bridge によって
204
(No Content) コードのみが返されます。
次のステップ
Kafka Bridge コンシューマーをトピックにサブスクライブしたら、コンシューマーからメッセージを取得できます。
その他のリソース
- API リファレンスドキュメントの「POST /consumers/{groupid}/instances/{name}/subscription」
6.2.6. Kafka Bridge コンシューマーからの最新メッセージの取得
次に、records エンドポイントからデータをリクエストすることで、Kafka Bridge コンシューマーから最新メッセージを取得します。実稼働環境では、HTTP クライアントはこのエンドポイントを繰り返し (ループで) 呼び出すことができます。
手順
- 「トピックおよびパーティションへのメッセージの生成」の説明に従い、Kafka Bridge コンシューマーに新たなメッセージを生成します。
GET
リクエストをrecords
エンドポイントに送信します。curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Kafka Bridge コンシューマーを作成し、サブスクライブすると、最初の GET リクエストによって空のレスポンスが返されます。これは、ポーリング操作がリバランスプロセスを開始してパーティションを割り当てるからです。
手順 2 を繰り返し、Kafka Bridge コンシューマーからメッセージを取得します。
Kafka Bridge は、レスポンス本文でメッセージのアレイ (トピック名、キー、値、パーティション、オフセットの記述) を
200
コードとともに返します。メッセージはデフォルトで最新のオフセットから取得されます。HTTP/1.1 200 OK content-type: application/vnd.kafka.json.v2+json #... [ { "topic":"bridge-quickstart-topic", "key":"my-key", "value":"sales-lead-0001", "partition":0, "offset":0 }, { "topic":"bridge-quickstart-topic", "key":null, "value":"sales-lead-0003", "partition":0, "offset":1 }, #...
注記空のレスポンスが返される場合は、「トピックおよびパーティションへのメッセージの生成」の説明に従い、コンシューマーに対して追加のレコードを生成し、メッセージの取得を再試行します。
次のステップ
Kafka Bridge コンシューマーからメッセージを取得したら、ログへのオフセットをコミットします。
その他のリソース
- API リファレンスドキュメントの「GET /consumers/{groupid}/instances/{name}/records」
6.2.7. ログへのオフセットのコミット
次に、offsets エンドポイントを使用して、Kafka Bridge コンシューマーによって受信されるすべてのメッセージに対して、手動でオフセットをログにコミットします。この操作が必要なのは、前述の「Kafka Bridge コンシューマーの作成」で作成した Kafka Bridge コンシューマー が enable.auto.commit
の設定で false
に指定されているからです。
手順
bridge-quickstart-consumer
のオフセットをログにコミットします。curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
リクエスト本文は送信されないので、オフセットはコンシューマーによって受信されたすべてのレコードに対してコミットされます。この代わりに、リクエスト本文に、オフセットをコミットするトピックおよびパーティションを指定するアレイ (OffsetCommitSeekList) を含めることができます。
リクエストが正常に行われると、Kafka Bridge は
204
コードのみを返します。
次のステップ
オフセットをログにコミットしたら、オフセットをシークのエンドポイントを試行します。
その他のリソース
- API リファレンスドキュメントの「POST /consumers/{groupid}/instances/{name}/offsets」
6.2.8. パーティションのオフセットのシーク
次に、positions エンドポイントを使用して、Kafka Bridge コンシューマーを設定することで、パーティションのメッセージを特定のオフセットから取得し、さらに最新のオフセットから取得します。これは Apache Kafka では、シーク操作と呼ばれます。
手順
quickstart-bridge-topic
トピックで、パーティション 0 の特定のオフセットをシークします。curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "offsets": [ { "topic": "bridge-quickstart-topic", "partition": 0, "offset": 2 } ] }'
リクエストが正常に行われると、Kafka Bridge は
204
コードのみを返します。GET
リクエストをrecords
エンドポイントに送信します。curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \ -H 'accept: application/vnd.kafka.json.v2+json'
Kafka Bridge は、シークしたオフセットからのメッセージを返します。
同じパーティションの最後のオフセットをシークし、デフォルトのメッセージ取得動作を復元します。この時点で、positions/end エンドポイントを使用します。
curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \ -H 'content-type: application/vnd.kafka.v2+json' \ -d '{ "partitions": [ { "topic": "bridge-quickstart-topic", "partition": 0 } ] }'
リクエストが正常に行われると、Kafka Bridge は別の
204
コードを返します。
また、positions/beginning エンドポイントを使用して、1 つ以上のパーティションの最初のオフセットをシークすることもできます。
次のステップ
このクイックスタートでは、AMQ Streams Kafka Bridge を使用して Kafka クラスターの一般的な操作をいくつか実行しました。これで、すでに作成した Kafka Bridge コンシューマーを削除 できます。
その他のリソース
- API リファレンスドキュメントの「POST /consumers/{groupid}/instances/{name}/positions」
- API リファレンスドキュメントの「POST /consumers/{groupid}/instances/{name}/positions/beginning」
- API リファレンスドキュメントの「POST /consumers/{groupid}/instances/{name}/positions/end」
6.2.9. Kafka Bridge コンシューマーの削除
最後に、このクイックスタートを通して使用した Kafa Bridge コンシューマーを削除します。
手順
DELETE
リクエストを instances エンドポイントに送信し、Kafka Bridge コンシューマーを削除します。curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
リクエストが正常に行われると、Kafka Bridge は
204
コードのみを返します。
その他のリソース
- API リファレンスドキュメントの「DELETE /consumers/{groupid}/instances/{name}」