第13章 Kafka クライアントシリアライザー/デシリアライザーを使用したスキーマの検証
Service Registry は、プロデューサーおよびコンシューマーアプリケーションの Kafka クライアントシリアライザー/デシリアライザーを提供します。Kafka プロデューサーアプリケーションは、シリアライザーを使用して、特定のイベントスキーマに準拠するメッセージをエンコードします。Kafka コンシューマーアプリケーションはデシリアライザーを使用して、特定のスキーマ ID に基づいてメッセージが適切なスキーマを使用してシリアライズされたことを検証します。これにより、スキーマが一貫して使用されるようにし、実行時にデータエラーが発生しないようにします。
本章では、Kafka プロデューサーおよびコンシューマークライアントアプリケーションで、Apache Avro、JSON スキーマ、および Google Protobuf の Kafka クライアントシリアライザーとデシリアライザーを使用する方法を説明します。
- 「Kafka クライアントアプリケーションおよび Service Registry」
- 「スキーマを検索するストラテジー」
- 「Service Registry シリアライザー/デシリアライザー定数」
- 「異なるクライアントのシリアライザー/デシリアライザータイプの使用」
- 「Service Registry を使用した Avro SerDe の設定」
- 「Service Registry を使用した JSON スキーマ SerDe の設定」
- 「Service Registry を使用した Protobuf SerDe の設定」
- 「スキーマの Service Registry への登録」
- 「Kafka コンシューマークライアントからのスキーマの使用」
- 「Kafka プロデューサークライアントからのスキーマの使用」
- 「Kafka Streams アプリケーションからのスキーマの使用」
前提条件
- 読む必要があります。 1章Service Registry の概要
- Service Registry がインストールされている必要があります。
Kafka プロデューサーおよびコンシューマークライアントアプリケーションが作成済みである必要があります。
Kafka クライアントアプリケーションの詳細は、『 Using AMQ Streams on Openshift 』を参照してください。
13.1. Kafka クライアントアプリケーションおよび Service Registry
Service Registry を使用すると、クライアントアプリケーション設定からスキーマ管理が分離されます。クライアントコードに URL を指定して、アプリケーションがレジストリーからスキーマを使用できるようにします。
たとえば、スキーマを保存して、レジストリーにメッセージをシリアライズおよびデシリアライズすることができます。次に、レジストリーを使用するアプリケーションから参照され、送受信されるメッセージとこれらのスキーマの互換性を維持するようにします。Kafka クライアントアプリケーションは、実行時にスキーマを Service Registry からプッシュまたはプルできます。
スキーマは進化するので、Service Registry でルールを定義できます。たとえば、スキーマへの変更が有効で、アプリケーションによって使用される以前のバージョンとの互換性を維持するようにします。Service Registry は、変更済みのスキーマと以前のスキーマバージョンを比較することで、互換性をチェックします。
Service Registry は、以下のような複数のスキーマ技術のスキーマレジストリーサポートを提供します。
- Avro
- Protobuf
- JSON スキーマ
これらのスキーマ技術は、Service Registry によって提供される Kafka クライアントのシリアライザー/デシリアライザー(SerDe)サービスを介してクライアントアプリケーションで使用できます。Service Registry によって提供される SerDe クラスの成熟度および使用法は異なる場合があります。それぞれのタイプ固有のセクションを参照してください。
プロデューサースキーマの設定
プロデューサークライアントアプリケーションは、シリアライザーを使用して、特定のブローカートピックに送信するメッセージを正しいデータ形式にします。
プロデューサーが Service Registry を使用してシリアライズできるようにするには、以下を行います。
- スキーマを Service Registry に定義、登録します (任意)。
- Service Registry の URL
- メッセージで使用する Service Registry シリアライザー
- Kafka メッセージを Service Registry のアーティファクト ID にマップするストラテジー
- Service Registry でのシリアライズに使用するスキーマを検索または登録するストラテジー
スキーマを登録したら、Kafka および Service Registry を開始するときに、スキーマにアクセスして、プロデューサーにより Kafka ブローカートピックに送信されるメッセージをフォーマットできます。または(設定により)、プロデューサーは初回使用時にスキーマを自動的に登録できます。
スキーマがすでに存在する場合、Service Registry に定義される互換性ルールに基づいて REST API を使用して新バージョンのスキーマを作成できます。バージョンは、スキーマの進化にともなう互換性チェックに使用します。アーティファクト ID およびスキーマバージョンは、スキーマを識別する一意のタプルを表します。
コンシューマースキーマの設定
コンシューマークライアントアプリケーションは、デシリアライザーを使用することで、そのアプリケーションが消費するメッセージを特定のブローカートピックから正しいデータ形式にします。
コンシューマーがデシリアライズに Service Registry を使用できるようにするには、以下を実行します。
- スキーマを Service Registry に定義、登録します。
- Service Registry の URL
- メッセージで使用する Service Registry デシリアライザー
- デシリアライズの入力データストリーム
次に、消費されるメッセージに書き込まれたグローバル ID を使用して、デシリアライザーによってスキーマが取得されます。スキーマグローバル ID は、プロデューサーアプリケーションの設定に応じて、メッセージヘッダーまたはメッセージペイロード自体に置くことができます。
メッセージペイロードでグローバル ID を見つけると、データの形式は(コンシューマーへのシグナルとして)マジックバイトで始まり、その後にグローバル ID の後に、通常通りメッセージデータが続きます。
以下に例を示します。
# ... [MAGIC_BYTE] [GLOBAL_ID] [MESSAGE DATA]
これで、Kafka および Service Registry を起動すると、スキーマにアクセスして、Kafka ブローカートピックから受け取ったメッセージをフォーマットできるようになりました。