第13章 Kafka Bridge
本章では、AMQ Streams Kafka Bridge on Red Hat Enterprise Linux について概説し、その REST API を使用して AMQ Streams と対話するためのサポートをします。ローカル環境で Kafka Bridge を試すには、本章で後述する 「Kafka Bridge クイックスタート」 を参照してください。
関連情報
- リクエストおよび応答の例など、API ドキュメントを確認するには、『Strimzi Kafka Bridge Documentation 』を参照してください。
- 分散トレーシング用に Kafka Bridge を設定するには、「Kafka Bridge のトレースの有効化」 を参照してください。
13.1. Kafka Bridge の概要
Kafka Bridge では、HTTP ベースのクライアントと Kafka クラスターとの対話を可能にする RESTful インターフェイスが提供されます。また、クライアントアプリケーションが Kafka プロトコルを変換する必要なく、AMQ Streams で Web API コネクションの利点を活用できます。
API には consumers
と topics
の 2 つの主なリソースがあります。これらのリソースは、Kafka クラスターでコンシューマーおよびプロデューサーと対話するためにエンドポイント経由で公開され、アクセスが可能になります。リソースと関係があるのは Kafka ブリッジのみで、Kafka に直接接続されたコンシューマーやプロデューサーとは関係はありません。
HTTP 要求
Kafka Bridge は、以下の方法で Kafka クラスターへの HTTP 要求をサポートします。
- トピックにメッセージを送信する。
- トピックからメッセージを取得する。
- トピックのパーティションリストを取得する。
- コンシューマーを作成および削除する。
- コンシューマーをトピックにサブスクライブし、このようなトピックからメッセージを受信できるようにする。
- コンシューマーがサブスクライブしているトピックの一覧を取得する。
- トピックからコンシューマーのサブスクライブを解除する。
- パーティションをコンシューマーに割り当てる。
- コンシューマーオフセットの一覧をコミットする。
- パーティションで検索して、コンシューマーが最初または最後のオフセットの位置、または指定のオフセットの位置からメッセージを受信できるようにする。
上記の方法で、JSON 応答と HTTP 応答コードのエラー処理を行います。メッセージは JSON またはバイナリー形式で送信できます。
クライアントは、ネイティブの Kafka プロトコルを使用する必要なくメッセージを生成して使用できます。
AMQ Streams のインストールと同様に、Red Hat Enterprise Linux にインストールするために Kafka Bridge ファイルをダウンロードできます。「Kafka Bridge アーカイブのダウンロード」 を参照してください。
KafkaBridge
リソースのホストおよびポートの設定に関する詳細は、「Kafka Bridge プロパティーの設定」 を参照してください。
13.1.1. 認証および暗号化
HTTP クライアントと Kafka Bridge 間の認証および暗号化はまだサポートされていません。そのため、クライアントから Kafka Bridge に送信されるリクエストは以下のようになります。
- 暗号化されず、HTTPS ではなく HTTP を使用する必要がある。
- 認証なしで送信される。
Kafka Bridge と Kafka クラスターとの間で、TLS または SASL ベースの認証 を設定できます。
プロパティーファイル を使用して、認証用に Kafka Bridge を設定します。
13.1.2. Kafka Bridge への要求
データ形式と HTTP ヘッダーを指定し、有効な要求が Kafka Bridge に送信されるようにします。
API 要求および応答本文は、常に JSON としてエンコードされます。
13.1.2.1. コンテンツタイプヘッダー
すべてのリクエストに対して、Content-Type
のヘッダーを提出する必要があります。唯一の例外は、POST
リクエストボディが空の場合で、Content-Type
ヘッダーを追加するとリクエストが失敗します。
コンシューマー操作 (/consumers
エンドポイント) とプロデューサー操作 (/topics
エンドポイント) では、異なる Content-Type
ヘッダーが必要です。
コンシューマー操作の Content-Type ヘッダー
埋め込まれたデータ形式にかかわらず、コンシューマー操作の POST
リクエストは、リクエストボディーにデータが含まれている場合に、以下の Content-Type
ヘッダーを提供する必要があります。
Content-Type: application/vnd.kafka.v2+json
プロデューサー操作の Content-Type ヘッダー
プロデューサー操作を行う場合に、POST
要求には、生成されるメッセージの 埋め込みデータ形式 を示す Content-Type
ヘッダーを指定する必要があります。これは json
または binary
のいずれかになります。
埋め込みデータ形式 | Content-Type ヘッダー |
---|---|
JSON |
|
バイナリー |
|
次のセクションで説明されているように、埋め込みデータ形式はコンシューマーごとに設定されます。
POST
要求の本文が空の場合は、Content-Type
を設定しないでください。空の本文を使用して、デフォルト値のコンシューマーを作成できます。
13.1.2.2. 埋め込みデータ形式
埋め込みデータ形式は、Kafka メッセージが Kafka Bridge によりプロデューサーからコンシューマーに HTTP で送信される際の形式です。サポートされる埋め込みデータ形式には、JSON またはバイナリーの 2 つがサポートされます。
/consumers/groupid
エンドポイントを使用してコンシューマーを作成する場合、POST
リクエスト本文で JSON またはバイナリーいずれかの埋め込みデータ形式を指定する必要があります。これは、以下の例のようにリクエストボディーの format
フィールドで指定します。
{
"name": "my-consumer",
"format": "binary", 1
...
}
- 1
- バイナリー埋め込みデータ形式。
コンシューマーの埋め込みデータ形式が指定されていない場合は、バイナリー形式が設定されます。
コンシューマーの作成時に指定する埋め込みデータ形式は、コンシューマーが消費する Kafka メッセージのデータ形式と一致する必要があります。
バイナリー埋め込みデータ形式を指定する場合は、以降のプロデューサー要求で、要求本文にバイナリーデータが Base64 でエンコードされた文字列として含まれる必要があります。たとえば、POST
リクエストを /topics/topicname
エンドポイントに対して作成してメッセージを送信する場合、value
は Base64 でエンコードする必要があります。
{ "records": [ { "key": "my-key", "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ=" }, ] }
プロデューサー要求には、埋め込みデータ形式に対応する Content-Type
ヘッダーも含まれる必要があります (例: Content-Type: application/vnd.kafka.binary.v2+json
)。
13.1.2.3. メッセージの形式
/topics
エンドポイントを使用してメッセージを送信する場合は、records
パラメーターの要求本文にメッセージペイロードを入力します。
records
パラメーターには、以下のオプションフィールドを含めることができます。
-
Message
key
-
Message
value
-
Destination
partition
-
Message
headers
/topicsへのPOST
要求の例
curl -X POST \
http://localhost:8080/topics/my-topic \
-H 'content-type: application/vnd.kafka.json.v2+json' \
-d '{
"records": [
{
"key": "my-key",
"value": "sales-lead-0001"
"partition": 2
"headers": [
{
"key": "key1",
"value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 1
}
]
},
]
}'
- 1
- バイナリー形式のヘッダー値。Base64 としてエンコードされます。
13.1.2.4. Accept ヘッダー
コンシューマーを作成したら、以降のすべての GET 要求には Accept
ヘッダーが以下のような形式で含まれる必要があります。
Accept: application/vnd.kafka.embedded-data-format.v2+json
embedded-data-format は json
または binary
です。
たとえば、サブスクライブされたコンシューマーのレコードを JSON 埋め込みデータ形式で取得する場合、この Accept ヘッダーが含まれるようにします。
Accept: application/vnd.kafka.json.v2+json
13.1.3. Kafka Bridge のロガーの設定
AMQ Streams Kafka ブリッジを使用すると、関連する OpenAPI 仕様で定義される操作ごとに異なるログレベルを設定できます。
操作にはそれぞれ、対応の API エンドポイントがあり、このエンドポイントを通して、ブリッジが HTTP クライアントから要求を受信します。各エンドポイントのログレベルを変更すると、受信および送信 HTTP 要求に関する詳細なログ情報を作成できます。
ロガーは log4j.properties
ファイルで定義されます。このファイルには healthy
および ready
エンドポイントの以下のデフォルト設定が含まれています。
log4j.logger.http.openapi.operation.healthy=WARN, out log4j.additivity.http.openapi.operation.healthy=false log4j.logger.http.openapi.operation.ready=WARN, out log4j.additivity.http.openapi.operation.ready=false
その他すべての操作のログレベルは、デフォルトで INFO
に設定されます。ロガーは以下のようにフォーマットされます。
log4j.logger.http.openapi.operation.<operation-id>
<operation-id>
は、特定の操作の識別子です。以下は、OpenAPI 仕様によって定義される操作の一覧です。
-
createConsumer
-
deleteConsumer
-
subscribe
-
unsubscribe
-
poll
-
assign
-
commit
-
send
-
sendToPartition
-
seekToBeginning
-
seekToEnd
-
seek
-
healthy
-
ready
-
openapi
13.1.4. Kafka Bridge API リソース
リクエストやレスポンスの例などを含む REST API エンドポイントおよび説明の完全リストは、「Kafka Bridge API reference」を参照してください。
13.1.5. Kafka Bridge アーカイブのダウンロード
AMQ Streams Kafka Bridge の zip 形式のディストリビューションは、Red Hat の Web サイトからダウンロードできます。
手順
- カスタマーポータル から、最新バージョンの Red Hat AMQ Streams Kafka Bridge アーカイブをダウンロードします。
13.1.6. Kafka Bridge プロパティーの設定
この手順では、AMQ Streams Kafka Bridge によって使用される Kafka および HTTP 接続プロパティーの設定方法を説明します。
Kafka 関連のプロパティーに適切な接頭辞を使用して、他の Kafka クライアントと同様に Kafka Bridge を設定します。
-
kafka.
は、サーバー接続やセキュリティーなど、プロデューサーとコンシューマーに適用される一般的な設定用です。 -
kafka.consumer.
は、コンシューマーにのみ渡されるコンシューマー固有の設定用です。 -
kafka.producer.
は、プロデューサーにのみ渡されるプロデューサー固有の設定用です。
HTTP プロパティーは、Kafka クラスターへの HTTP アクセスを有効にする他に、CPRS (Cross-Origin Resource Sharing) により Kafka Bridge のアクセス制御を有効化または定義する機能を提供します。CORS は、複数のオリジンから指定のリソースにブラウザーでアクセスできるようにする HTTP メカニズムです。CORS を設定するには、許可されるリソースオリジンのリストと、それらにアクセスする HTTP メソッドを定義します。リクエストの追加の HTTP ヘッダーには Kafka クラスターへのアクセスが許可されるオリジンが記述 されています。
手順
AMQ Streams Kafka Bridge のインストールアーカイブにある
application.properties
ファイルを編集します。プロパティーファイルを使用して、Kafka および HTTP 関連のプロパティーを指定し、分散トレースを有効にします。
Kafka コンシューマーおよびプロデューサーに固有のプロパティーなど、標準の Kafka 関連のプロパティーを設定します。
以下を使用します。
-
kafka.bootstrap.servers
: Kafkaクラスターへのホスト/ポート接続を定義する。 -
kafka.producer.acks
: HTTP クライアントに確認を提供する。 kafka.consumer.auto.offset.reset
: Kafka のオフセットのリセットを管理する方法を決定する。Kafka プロパティーの設定に関する詳細は、Apache Kafka の Web サイトを参照してください。
-
Kafka クラスターへの HTTP アクセスを有効にするために HTTP 関連のプロパティーを設定します。
以下に例を示します。
http.enabled=true http.host=0.0.0.0 http.port=8080 1 http.cors.enabled=true 2 http.cors.allowedOrigins=https://strimzi.io 3 http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 4
分散トレースを有効または無効にします。
bridge.tracing=jaeger
分散トレースを有効にするため、プロパティーからコードコメントを削除します。
13.1.7. Kafka Bridge のインストール
以下の手順に従って、AMQ Streams Kafka Bridge on Red Hat Enterprise Linux をインストールします。
前提条件
手順
- AMQ Streams KafkaBridge インストールアーカイブを任意のディレクトリーに解凍します (まだ解答していない場合)。
設定プロパティーをパラメーターとして使用して、Kafka Bridge スクリプトを実行します。
以下に例を示します。
./bin/kafka_bridge_run.sh --config-file=_path_/configfile.properties
インストールが成功したことをログで確認します。
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092