第13章 Kafka Bridge
本章では、Red Hat Enterprise Linux での AMQ Streams Kafka Bridge の概要と、その REST API を使用して AMQ Streams と対話する方法を説明します。ローカル環境で Kafka Bridge を試すには、本章で後述する 「Kafka Bridge クイックスタート」 を参照してください。
関連情報
- リクエストおよび応答の例など、API ドキュメントを確認するには、「 Kafka Bridge API reference 」を参照してください。
- 分散トレーシング向けに Kafka Bridge を設定するには、「Kafka Bridge のトレースの有効化」 を参照してください。
13.1. Kafka Bridge の概要 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Bridge では、HTTP ベースのクライアントと Kafka クラスターとの対話を可能にする RESTful インターフェースが提供されます。また、クライアントアプリケーションによる Kafka プロトコルの変換は必要なく、Web API コネクションの利点が AMQ Streams に提供されます。
API にはconsumers と topicsの 2 つの主なリソースがあります。つまり、Kafka クラスターでコンシューマーおよびプロデューサーと対話するためにエンドポイント経由で公開され、アクセスが可能になります。リソースと関係があるのは Kafka ブリッジのみで、Kafka に直接接続されたコンシューマーやプロデューサーとは関係はありません。
HTTP リクエスト
Kafka Bridge は、以下の方法で Kafka クラスターへの HTTP リクエストをサポートします。
- トピックにメッセージを送信する。
- トピックからメッセージを取得する。
- トピックのパーティションリストを取得する。
- コンシューマーを作成および削除する。
- コンシューマーをトピックにサブスクライブし、このようなトピックからメッセージを受信できるようにする。
- コンシューマーがサブスクライブしているトピックの一覧を取得する。
- トピックからコンシューマーのサブスクライブを解除する。
- パーティションをコンシューマーに割り当てる。
- コンシューマーオフセットの一覧をコミットする。
- パーティションで検索して、コンシューマーが最初または最後のオフセットの位置、または指定のオフセットの位置からメッセージを受信できるようにする。
上記の方法で、JSON 応答と HTTP 応答コードのエラー処理を行います。メッセージは JSON またはバイナリー形式で送信できます。
クライアントは、ネイティブの Kafka プロトコルを使用する必要なくメッセージを生成して使用できます。
AMQ Streams のインストールと同様に、Red Hat Enterprise Linux にインストールする Kafka Bridge ファイルをダウンロードします。「Kafka Bridge アーカイブのダウンロード」 を参照してください。
KafkaBridge リソースのホストおよびポートの設定に関する詳細は、「Kafka Bridge プロパティーの設定」 を参照してください。
13.1.1. 認証および暗号化 リンクのコピーリンクがクリップボードにコピーされました!
HTTP クライアントと Kafka Bridge 間の認証および暗号化はまだサポートされていません。そのため、クライアントから Kafka Bridge に送信されるリクエストは以下のようになります。
- 暗号化されず、HTTPS ではなく HTTP を使用する必要がある。
- 認証なしで送信される。
Kafka Bridge と Kafka クラスター間の TLS または SASL ベースの認証 を設定できます。
13.1.2. Kafka Bridge へのリクエスト リンクのコピーリンクがクリップボードにコピーされました!
データ形式と HTTP ヘッダーを指定し、有効なリクエストが Kafka Bridge に送信されるようにします。
API リクエストおよびレスポンス本文は、常に JSON としてエンコードされます。
13.1.2.1. コンテンツタイプヘッダー リンクのコピーリンクがクリップボードにコピーされました!
すべてのリクエストに対して Content-Type ヘッダーを提出する必要があります。唯一の例外は、POST リクエストボディーが空であることです。Content-Type ヘッダーを追加すると、リクエストは失敗します。
コンシューマー操作(/consumers エンドポイント)およびプロデューサー操作(/topics エンドポイント)には異なる Content-Type ヘッダーが必要です。
コンシューマー操作の Content-Type ヘッダー
埋め込みデータ形式に関係なく、リクエストボディーにデータ が含まれる場合は、コンシューマー操作の POST リクエストに以下の Content-Type ヘッダーが含まれている必要があります。
Content-Type: application/vnd.kafka.v2+json
Content-Type: application/vnd.kafka.v2+json
プロデューサー操作の Content-Type ヘッダー
プロデューサー操作の実行時に、POST リクエストは、生成されたメッセージの 埋め込みデータ形式 を指定する Content-Type ヘッダーを提供する必要があります。これは、json または binary のいずれかになります。
| 埋め込みデータ形式 | Content-Type ヘッダー |
|---|---|
| JSON |
|
| バイナリー |
|
次のセクションで説明どおり、埋め込みデータ形式はコンシューマーごとに設定されます。
POST リクエストに空のボディーがある場合は、Content-Type を設定しないでください。空のボディーを使用して、デフォルト値のコンシューマーを作成できます。
13.1.2.2. 埋め込みデータ形式 リンクのコピーリンクがクリップボードにコピーされました!
埋め込みデータ形式は、Kafka メッセージが Kafka Bridge によりプロデューサーからコンシューマーに HTTP で送信される際の形式です。サポートされる埋め込みデータ形式には、JSON またはバイナリーの 2 つがサポートされます。
/consumers/groupid エンドポイントを使用してコンシューマーを作成する場合、POST リクエスト本文で JSON またはバイナリーいずれかの埋め込みデータ形式を指定する必要があります。これは、リクエストボディーの format フィールドで指定します。以下に例を示します。
{
"name": "my-consumer",
"format": "binary",
...
}
{
"name": "my-consumer",
"format": "binary",
...
}
- 1
- バイナリー埋め込みデータ形式。
コンシューマーの埋め込みデータ形式が指定されていない場合は、バイナリー形式が設定されます。
コンシューマーの作成時に指定する埋め込みデータ形式は、コンシューマーが消費する Kafka メッセージのデータ形式と一致する必要があります。
バイナリー埋め込みデータ形式を指定する場合は、以降のプロデューサーリクエストで、リクエスト本文にバイナリーデータが Base64 でエンコードされた文字列として含まれる必要があります。たとえば、POST リクエストを /topics/topicname エンドポイントに送信してメッセージを送信する場合は、value を Base64 でエンコードする必要があります。
プロデューサーリクエストは、埋め込みデータ形式に対応する Content-Type ヘッダーも提供する必要があります (例: Content-Type: application/vnd.kafka.binary.v2+json)。
13.1.2.3. メッセージの形式 リンクのコピーリンクがクリップボードにコピーされました!
/topics エンドポイントを使用してメッセージを送信する場合は、records パラメーターでリクエストボディーにメッセージペイロードを入力します。
records パラメーターには、以下のオプションフィールドを含めることができます。
-
メッセージの
key -
メッセージの
value -
宛先の
partition -
メッセージの
headers
トピックへの POST リクエストの例
- 1
- バイナリー形式のヘッダー値。Base64 としてエンコードされます。
13.1.2.4. Accept ヘッダー リンクのコピーリンクがクリップボードにコピーされました!
コンシューマーを作成したら、以降のすべての GET リクエストには Accept ヘッダーが以下のような形式で含まれる必要があります。
Accept: application/vnd.kafka.embedded-data-format.v2+json
Accept: application/vnd.kafka.embedded-data-format.v2+json
embedded-data-format は、json または binary のどちらかです。
たとえば、サブスクライブされたコンシューマーのレコードを JSON 埋め込みデータ形式で取得する場合、この Accept ヘッダーが含まれるようにします。
Accept: application/vnd.kafka.json.v2+json
Accept: application/vnd.kafka.json.v2+json
13.1.3. Kafka Bridge のロガーの設定 リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams の Kafka ブリッジを使用すると、関連する OpenAPI 仕様によって定義される操作ごとに異なるログレベルを設定できます。
それぞれの操作には、ブリッジが HTTP クライアントから要求を受信する対応の API エンドポイントがあります。各エンドポイントのログレベルを変更すると、送受信 HTTP リクエストに関する詳細なログ情報を生成することができます。
ロガーは log4j.properties ファイルで定義されます。このファイルには healthy および ready エンドポイントの以下のデフォルト設定が含まれています。
log4j.logger.http.openapi.operation.healthy=WARN, out log4j.additivity.http.openapi.operation.healthy=false log4j.logger.http.openapi.operation.ready=WARN, out log4j.additivity.http.openapi.operation.ready=false
log4j.logger.http.openapi.operation.healthy=WARN, out
log4j.additivity.http.openapi.operation.healthy=false
log4j.logger.http.openapi.operation.ready=WARN, out
log4j.additivity.http.openapi.operation.ready=false
その他すべての操作のログレベルは、デフォルトで INFO に設定されます。ロガーは以下のようにフォーマットされます。
log4j.logger.http.openapi.operation.<operation-id>
log4j.logger.http.openapi.operation.<operation-id>
ここで、<operation-id> は特定の操作の識別子になります。以下は、OpenAPI 仕様で定義されたオペレーションの一覧です。
-
createConsumer -
deleteConsumer -
subscribe -
unsubscribe -
poll -
assign -
commit -
send -
sendToPartition -
seekToBeginning -
seekToEnd -
seek -
healthy -
ready -
openapi
13.1.4. Kafka Bridge API リソース リンクのコピーリンクがクリップボードにコピーされました!
リクエストやレスポンスの例などを含む REST API エンドポイントおよび説明の完全リストは、「 Kafka Bridge API reference 」を参照してください。
13.1.5. Kafka Bridge アーカイブのダウンロード リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams Kafka Bridge の zip ディストリビューションは、Red Hat の Web サイトからダウンロードできます。
13.1.6. Kafka Bridge プロパティーの設定 リンクのコピーリンクがクリップボードにコピーされました!
この手順では、AMQ Streams Kafka Bridge によって使用される Kafka および HTTP コネクションプロパティーを設定する方法を説明します。
Kafka 関連のプロパティーに適切な接頭辞を使用して、Kafka Bridge を他の Kafka クライアントとして設定します。
-
kafka.サーバー接続やセキュリティーなどのプロデューサーおよびコンシューマーに適用される一般的な設定。 -
kafka.consumer.コンシューマーにのみ渡されたコンシューマー固有の設定の場合 -
kafka.producer.プロデューサー固有の設定の場合は、プロデューサーにのみ渡されます。
HTTP プロパティーは、Kafka クラスターへの HTTP アクセスを有効にする他に、CPRS (Cross-Origin Resource Sharing) により Kafka Bridge のアクセス制御を有効化または定義する機能を提供します。CORS は、複数のオリジンから指定のリソースにブラウザーでアクセスできるようにする HTTP メカニズムです。CORS を設定するには、許可されるリソースオリジンのリストと、それらにアクセスする HTTP メソッドを定義します。リクエストの追加の HTTP ヘッダーには Kafka クラスターへのアクセスが許可されるオリジンが記述 されています。
手順
AMQ Streams Kafka Bridge インストールアーカイブで提供される
application.propertiesファイルを編集します。プロパティーファイルを使用して Kafka および HTTP 関連のプロパティーを指定し、分散トレーシングを有効にします。
Kafka コンシューマーおよびプロデューサーに固有のプロパティーなど、標準の Kafka 関連のプロパティーを設定します。
以下を使用します。
-
kafka.bootstrap.serversKafka クラスターへのホスト/ポート接続を定義します。 -
kafka.producer.acksHTTP クライアントに承認を提供する kafka.consumer.auto.offset.resetKafka でオフセットのリセット方法を決定するため。Kafka プロパティーの設定に関する詳細は Apache Kafka の Web サイトを参照してください。
-
HTTP 関連のプロパティーを設定し、Kafka クラスターへの HTTP アクセスを有効にします。
以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 分散トレーシングを有効または無効にします。
bridge.tracing=jaeger
bridge.tracing=jaegerCopy to Clipboard Copied! Toggle word wrap Toggle overflow プロパティーからコードコメントを削除して、分散トレースを有効にします。
13.1.7. Kafka Bridge のインストール リンクのコピーリンクがクリップボードにコピーされました!
以下の手順に従って、AMQ Streams Kafka Bridge を Red Hat Enterprise Linux にインストールします。
前提条件
手順
- AMQ Streams Kafka Bridge インストールアーカイブを任意のディレクトリーに展開していない場合は、そのディレクトリーに展開してください。
設定プロパティーをパラメーターとして使用して、Kafka Bridge スクリプトを実行します。
以下に例を示します。
./bin/kafka_bridge_run.sh --config-file=_path_/configfile.properties
./bin/kafka_bridge_run.sh --config-file=_path_/configfile.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow インストールがログで成功したことを確認します。
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092
HTTP-Kafka Bridge started and listening on port 8080 HTTP-Kafka Bridge bootstrap servers localhost:9092Copy to Clipboard Copied! Toggle word wrap Toggle overflow