第9章 Java クライアントでシリアライザー/デシリアライザーを使用した Kafka メッセージの検証
Apicurio Registry によって、Java で書かれた Kafka プロデューサーおよびコンシューマーアプリケーションのクライアントシリアライザー/デシリアライザー (SerDes) が提供されます。Kafka プロデューサーアプリケーションは、シリアライザーを使用して、特定のイベントスキーマに準拠するメッセージをエンコードします。Kafka コンシューマーアプリケーションはデシリアライザーを使用して、特定のスキーマ ID に基づいてメッセージが適切なスキーマを使用してシリアライズされたことを検証します。これにより、スキーマが一貫して使用されるようにし、実行時にデータエラーが発生しないようにします。
この章では、プロデューサーおよびコンシューマークライアントアプリケーションで Kafka クライアント SerDes を使用する方法を説明します。
前提条件
- 1章Apicurio Registry の概要 を読んでいる。
- Apicurio Registry をインストールしている。
- Kafka プロデューサーおよびコンシューマークライアントアプリケーションを作成している。
9.1. Kafka クライアントアプリケーションと Apicurio Registry リンクのコピーリンクがクリップボードにコピーされました!
Apicurio Registry は、クライアントアプリケーション設定からスキーマ管理を分離します。クライアントコードに URL を指定すると、Java クライアントアプリケーションが Apicurio Registry からスキーマを使用できます。
スキーマを Apicurio Registry に保存して、クライアントアプリケーションから参照されるメッセージをシリアライズおよびデシリアライズして、送受信するメッセージがそれらのスキーマと互換性があることを確認できます。Kafka クライアントアプリケーションは、実行時に Apicurio Registry からスキーマをプッシュまたはプルできます。
スキーマは進化するので、Apicurio Registry でルールを定義できます。たとえば、スキーマの変更が有効で、アプリケーションによって使用される以前のバージョンとの互換性を維持するようにします。Apicurio Registry は、変更済みのスキーマと以前のスキーマバージョンを比較することで、互換性をチェックします。
Apicurio Registry スキーマテクノロジー
Apicurio Registry は、以下のようなスキーマテクノロジーのスキーマレジストリーサポートを提供します。
- Avro
- Protobuf
- JSON Schema
これらのスキーマテクノロジーは、Apicurio Registry によって提供される Kafka クライアントのシリアライザー/デシリアライザー (SerDes) サービスを介してクライアントアプリケーションで使用できます。Apicurio Registry が提供する SerDes クラスの成熟度と使用法はさまざまです。以降のセクションでは、各スキーマタイプの詳細を提供します。
プロデューサースキーマの設定
プロデューサークライアントアプリケーションは、シリアライザーを使用して、特定のブローカートピックに送信するメッセージを正しいデータ形式にします。
プロデューサーがシリアル化に Apicurio Registry を使用できるようにするには:
- スキーマを定義し、Apicurio Registry に登録します (まだ存在しない場合)。
以下を使用して プロデューサークライアントコードの設定を行います。
- Apicurio Registry の URL
- メッセージで使用する Apicurio Registry シリアライザー
- Kafka メッセージを Apicurio Registry のスキーマアーティファクトにマッピングするストラテジー
- Apicurio Registry でシリアル化に使用されるスキーマを検索または登録するためのストラテジー
スキーマを登録したら、Kafka および Apicurio Registry を開始するときに、スキーマにアクセスして、プロデューサーにより Kafka ブローカートピックに送信されるメッセージをフォーマットできます。または、設定に応じて、プロデューサーは最初の使用時にスキーマを自動的に登録できます。
スキーマがすでに存在する場合、Apicurio Registry に定義される互換性ルールに基づいて、レジストリー REST API を使用して新バージョンのスキーマを作成できます。バージョンは、スキーマの進化にともなう互換性チェックに使用します。グループ ID、アーティファクト ID、およびバージョンは、スキーマを識別する一意のタプルを表します。
コンシューマースキーマの設定
コンシューマークライアントアプリケーションは、デシリアライザーを使用することで、そのアプリケーションが消費するメッセージを特定のブローカートピックから正しいデータ形式にします。
コンシューマーが逆シリアル化に Apicurio Registry を使用できるようにするには:
- スキーマを定義し、Apicurio Registry に登録します (まだ存在しない場合)。
以下を使用して コンシューマークライアントコードを設定します。
- Apicurio Registry の URL
- メッセージで使用する Apicurio Registry デシリアライザー
- デシリアライズの入力データストリーム
コンテンツ ID を使用したスキーマの取得
デフォルトでは、スキーマは、消費されるメッセージで指定されるコンテンツ ID (アーティファクトバージョンの コンテンツ に固有の ID ですが、バージョン自体に固有ではありません) を使用して、デシリアライザーによって Apicurio Registry から取得されます。スキーマコンテンツ ID は、プロデューサーアプリケーションの設定に応じて、メッセージヘッダーまたはメッセージペイロードに配置できます。デフォルトでは、コンテンツ ID はメッセージ本文に配置されます。
メッセージペイロード内でコンテンツ ID を見つける場合、データの形式は、コンシューマーへの信号として使用されるマジックバイトで始まり、その後にコンテンツ ID、そして通常のメッセージデータが続きます。以下に例を示します。
...
# ...
[MAGIC_BYTE]
[CONTENT_ID]
[MESSAGE DATA]
これで、Kafka および Apicurio Registry を開始するとき、スキーマにアクセスして、Kafka ブローカートピックから受信するメッセージをフォーマットできます。
グローバル ID を使用したスキーマの取得
または、アーティファクトバージョンの一意の ID であるグローバル ID に基づいて、Apicurio Registry からスキーマを取得するように設定することもできます。contentID の代わりにグローバル ID を使用する場合も、同じオプションが利用できます。グローバル ID は、メッセージヘッダーまたはメッセージ本文 (デフォルト) で送信できます。
メッセージペイロードでグローバル ID を見つけるとき、データの形式は、コンシューマーへの信号として使用されるマジックバイトで始まり、通常通りグローバル ID とメッセージデータが続きます。以下に例を示します。
...
# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]