第7章 Java クライアントでシリアライザー/デシリアライザーを使用した Kafka メッセージの検証
Service Registry によって、Java で書かれた Kafka プロデューサーおよびコンシューマーアプリケーションのクライアントシリアライザー/デシリアライザー (SerDes) が提供されます。Kafka プロデューサーアプリケーションは、シリアライザーを使用して、特定のイベントスキーマに準拠するメッセージをエンコードします。Kafka コンシューマーアプリケーションはデシリアライザーを使用して、特定のスキーマ ID に基づいてメッセージが適切なスキーマを使用してシリアライズされたことを検証します。これにより、スキーマが一貫して使用されるようにし、実行時にデータエラーが発生しないようにします。
本章では、プロデューサーおよびコンシューマークライアントアプリケーションで Kafka クライアント SerDes を使用する方法について説明します。
前提条件
- 1章Service Registry の概要 を読む。
- Service Registry がインストールされている。
Kafka プロデューサーおよびコンシューマークライアントアプリケーションが作成済みである。
Kafka クライアントアプリケーションの詳細は OpenShift での AMQ Streams のデプロイとアップグレード を参照してください。
7.1. Kafka クライアントアプリケーションおよび Service Registry リンクのコピーリンクがクリップボードにコピーされました!
Service Registry は、クライアントアプリケーション設定からスキーマ管理を分離します。クライアントコードに URL を指定すると、Java クライアントアプリケーションが Service Registry からスキーマを使用できます。
Service Registry にスキーマを保存して、メッセージをシリアライズおよびデシリアライズできます。クライアントアプリケーションからスキーマが参照され、送受信されるメッセージとこれらのスキーマの互換性を維持するようにします。Kafka クライアントアプリケーションは実行時にスキーマを Service Registry からプッシュまたはプルできます。
スキーマは進化するので、Service Registry でルールを定義できます。たとえば、スキーマの変更が有効で、アプリケーションによって使用される以前のバージョンとの互換性を維持するようにします。Service Registry は、変更済みのスキーマと以前のスキーマバージョンを比較することで、互換性をチェックします。
Service Registry スキーマテクノロジー
Service Registry は、以下のようなスキーマテクノロジーのスキーマレジストリーサポートを提供します。
- Avro
- Protobuf
- JSON スキーマ
これらのスキーマテクノロジーは、Service Registry によって提供される Kafka クライアントのシリアライザー/デシリアライザー (SerDes) サービスを介してクライアントアプリケーションで使用できます。Service Registry によって提供される SerDes クラスの成熟度および使用法は異なる場合があります。以降のセクションでは、各スキーマタイプの詳細を提供します。
プロデューサースキーマの設定
プロデューサークライアントアプリケーションは、シリアライザーを使用して、特定のブローカートピックに送信するメッセージを正しいデータ形式にします。
プロデューサーが Service Registry を使用してシリアライズできるようにするには、以下を行います。
- スキーマを Service Registry に定義、登録します (存在しない場合)。
以下を使用して プロデューサークライアントコードの設定を行います。
- Service Registry の URL
- メッセージで使用する Service Registry シリアライザー
- Kafka メッセージを Service Registry のスキーマアーティファクトにマップするストラテジー
- Service Registry でのシリアライズに使用するスキーマを検索または登録するストラテジー
スキーマを登録したら、Kafka および Service Registry を開始するときに、スキーマにアクセスして、プロデューサーにより Kafka ブローカートピックに送信されるメッセージをフォーマットできます。または、設定に応じて、プロデューサーは最初の使用時にスキーマを自動的に登録できます。
スキーマがすでに存在する場合、Service Registry に定義される互換性ルールに基づいて、レジストリー REST API を使用して新バージョンのスキーマを作成できます。バージョンは、スキーマの進化にともなう互換性チェックに使用します。グループ ID、アーティファクト ID、およびバージョンは、スキーマを識別する一意のタプルを表します。
コンシューマースキーマの設定
コンシューマークライアントアプリケーションは、デシリアライザーを使用することで、そのアプリケーションが消費するメッセージを特定のブローカートピックから正しいデータ形式にします。
コンシューマーがデシリアライズに Service Registry を使用できるようにするには、以下を実行します。
- スキーマを Service Registry に定義、登録します (存在しない場合)。
以下を使用して コンシューマークライアントコードを設定します。
- Service Registry の URL
- メッセージで使用する Service Registry デシリアライザー
- デシリアライズの入力データストリーム
グローバル ID を使用したスキーマの取得
デフォルトでは、スキーマは、消費されるメッセージに指定されるグローバル ID を使用してデシリアライザーによって Service Registry から取得されます。スキーマグローバル ID は、プロデューサーアプリケーションの設定に応じて、メッセージヘッダーまたはメッセージペイロードに配置できます。
メッセージペイロードでグローバル ID を見つけるとき、データの形式は、コンシューマーへの信号として使用されるマジックバイトで始まり、通常通りグローバル ID とメッセージデータが続きます。以下に例を示します。
...
# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]
これで、Kafka および Service Registry を開始するとき、スキーマにアクセスして、Kafka ブローカートピックから受信するメッセージをフォーマットできます。
コンテンツ ID を使用したスキーマの取得
または、アーティファクトコンテンツの一意の ID であるコンテンツ ID に基づいて、Service Registry からスキーマを取得するように設定できます。グローバル ID はアーティファクトバージョンの一意の ID です。
コンテンツ ID はバージョンを一意に識別しませんが、バージョンコンテンツのみを一意に識別します。複数のバージョンがまったく同じコンテンツを共有している場合、グローバル ID は異なりますが、コンテンツ ID は同じです。Confluent Schema Registry はデフォルトでコンテンツ ID を使用します。