第7章 Java クライアントでシリアライザー/デシリアライザーを使用した Kafka メッセージの検証
Apicurio Registry によって、Java で書かれた Kafka プロデューサーおよびコンシューマーアプリケーションのクライアントシリアライザー/デシリアライザー (SerDes) が提供されます。Kafka プロデューサーアプリケーションは、シリアライザーを使用して、特定のイベントスキーマに準拠するメッセージをエンコードします。Kafka コンシューマーアプリケーションはデシリアライザーを使用して、特定のスキーマ ID に基づいてメッセージが適切なスキーマを使用してシリアライズされたことを検証します。これにより、スキーマが一貫して使用されるようにし、実行時にデータエラーが発生しないようにします。
本章では、プロデューサーおよびコンシューマークライアントアプリケーションで Kafka クライアント SerDes を使用する方法について説明します。
前提条件
- 1章Apicurio Registry の概要 を読んでいる。
- Apicurio Registry をインストールしている。
Kafka プロデューサーおよびコンシューマークライアントアプリケーションを作成している。
Kafka クライアントアプリケーションの詳細は、OpenShift での AMQ Streams のデプロイと管理 を参照してください。
7.1. Kafka クライアントアプリケーションと Apicurio Registry リンクのコピーリンクがクリップボードにコピーされました!
Apicurio Registry は、クライアントアプリケーション設定からスキーマ管理を分離します。クライアントコードに URL を指定すると、Java クライアントアプリケーションが Apicurio Registry からスキーマを使用できます。
スキーマを Apicurio Registry に保存して、クライアントアプリケーションから参照されるメッセージをシリアライズおよびデシリアライズして、送受信するメッセージがそれらのスキーマと互換性があることを確認できます。Kafka クライアントアプリケーションは、実行時に Apicurio Registry からスキーマをプッシュまたはプルできます。
スキーマは進化するので、Apicurio Registry でルールを定義できます。たとえば、スキーマの変更が有効で、アプリケーションによって使用される以前のバージョンとの互換性を維持するようにします。Apicurio Registry は、変更済みのスキーマと以前のスキーマバージョンを比較することで、互換性をチェックします。
Apicurio Registry スキーマテクノロジー
Apicurio Registry は、以下のようなスキーマテクノロジーのスキーマレジストリーサポートを提供します。
- Avro
- Protobuf
- JSON スキーマ
これらのスキーマテクノロジーは、Apicurio Registry によって提供される Kafka クライアントのシリアライザー/デシリアライザー (SerDes) サービスを介してクライアントアプリケーションで使用できます。Apicurio Registry が提供する SerDes クラスの成熟度と使用法はさまざまです。以降のセクションでは、各スキーマタイプの詳細を提供します。
プロデューサースキーマの設定
プロデューサークライアントアプリケーションは、シリアライザーを使用して、特定のブローカートピックに送信するメッセージを正しいデータ形式にします。
プロデューサーがシリアル化に Apicurio Registry を使用できるようにするには:
- スキーマを定義し、Apicurio Registry に登録します (まだ存在しない場合)。
以下を使用して プロデューサークライアントコードの設定を行います。
- Apicurio Registry の URL
- メッセージで使用する Apicurio Registry シリアライザー
- Kafka メッセージを Apicurio Registry のスキーマアーティファクトにマッピングするストラテジー
- Apicurio Registry でシリアル化に使用されるスキーマを検索または登録するためのストラテジー
スキーマを登録したら、Kafka および Apicurio Registry を開始するときに、スキーマにアクセスして、プロデューサーにより Kafka ブローカートピックに送信されるメッセージをフォーマットできます。または、設定に応じて、プロデューサーは最初の使用時にスキーマを自動的に登録できます。
スキーマがすでに存在する場合、Apicurio Registry に定義される互換性ルールに基づいて、レジストリー REST API を使用して新バージョンのスキーマを作成できます。バージョンは、スキーマの進化にともなう互換性チェックに使用します。グループ ID、アーティファクト ID、およびバージョンは、スキーマを識別する一意のタプルを表します。
コンシューマースキーマの設定
コンシューマークライアントアプリケーションは、デシリアライザーを使用することで、そのアプリケーションが消費するメッセージを特定のブローカートピックから正しいデータ形式にします。
コンシューマーが逆シリアル化に Apicurio Registry を使用できるようにするには:
- スキーマを定義し、Apicurio Registry に登録します (まだ存在しない場合)。
以下を使用して コンシューマークライアントコードを設定します。
- Apicurio Registry の URL
- メッセージで使用する Apicurio Registry デシリアライザー
- デシリアライズの入力データストリーム
グローバル ID を使用したスキーマの取得
デフォルトでは、スキーマは、消費されるメッセージに指定されるグローバル ID を使用してデシリアライザーによって Apicurio Registry から取得されます。スキーマグローバル ID は、プロデューサーアプリケーションの設定に応じて、メッセージヘッダーまたはメッセージペイロードに配置できます。
メッセージペイロードでグローバル ID を見つけるとき、データの形式は、コンシューマーへの信号として使用されるマジックバイトで始まり、通常通りグローバル ID とメッセージデータが続きます。以下に例を示します。
# ... [MAGIC_BYTE] [GLOBAL_ID] [MESSAGE DATA]
# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]
これで、Kafka および Apicurio Registry を開始するとき、スキーマにアクセスして、Kafka ブローカートピックから受信するメッセージをフォーマットできます。
コンテンツ ID を使用したスキーマの取得
または、アーティファクトコンテンツの一意の ID であるコンテンツ ID に基づいて、Apicurio Registry からスキーマを取得するように設定できます。グローバル ID はアーティファクトバージョンの一意の ID です。
コンテンツ ID はバージョンを一意に識別しませんが、バージョンコンテンツのみを一意に識別します。複数のバージョンがまったく同じコンテンツを共有している場合、グローバル ID は異なりますが、コンテンツ ID は同じです。Confluent Schema Registry はデフォルトでコンテンツ ID を使用します。