8.3. さまざまなクライアントシリアライザー/デシリアライザータイプの設定方法


Kafka クライアントアプリケーションでスキーマを使用する場合は、ユースケースに応じて、使用する特定のスキーマタイプを選択する必要があります。Apicurio Registry は、Apache Avro、JSON スキーマ、および Google Protobuf 用の SerDe Java クラスを提供します。以下のセクションでは、各タイプを使用するように Kafka アプリケーションを設定する方法を説明します。

また、Kafka を使用してカスタムシリアライザーおよびデシリアライザークラスを実装し、Apicurio Registry REST Java クライアントを使用して Apicurio Registry 機能を活用することもできます。

シリアライザー/デシリアライザーの Kafka アプリケーション設定

Kafka アプリケーションで Apicurio Registry によって提供される SerDe クラスを使用するには、正しい設定プロパティーを設定する必要があります。以下の簡単な Avro の例は、Kafka プロデューサーアプリケーションでシリアライザーを設定する方法と、Kafka コンシューマーアプリケーションでデシリアライザーを設定する方法を示しています。

Kafka プロデューサーのシリアライザー設定の例

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Apicurio Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}

Kafka コンシューマーのデシリアライザー設定の例

// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Apicurio Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Apicurio Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}

関連情報

8.3.1. Apicurio Registry を使用して Avro SerDes を設定する

このトピックでは、Apache Avro の Kafka クライアントシリアライザーおよびデシリアライザー (SerDes) クラスを使用する方法について説明します。

Apicurio Registry は、Avro 用に次の Kafka クライアント SerDes クラスを提供します。

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

Avro シリアライザーの設定

以下のように Avro シリアライザークラスを設定することができます。

  • Apicurio Registry URL
  • アーティファクトリーゾルバーストラテジー
  • ID の場所
  • ID エンコーディング
  • Avro datum プロバイダー
  • Avro エンコーディング

ID の場所

シリアライザーは、スキーマの一意の ID を Kafka メッセージの一部として渡し、コンシューマーがデシリアライズに正しいスキーマを使用できるようにします。ID は、メッセージのペイロードまたはメッセージヘッダーに存在できます。デフォルトの場所はメッセージペイロードです。メッセージヘッダーで ID を送信するには、以下の設定プロパティーを設定します。

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")

プロパティー名は apicurio.registry.headers.enabled です。

ID エンコーディング

Kafka メッセージボディーに渡すときにスキーマ ID をエンコードする方法をカスタマイズできます。apicurio.registry.id-handler 設定プロパティーを、io.apicurio.registry.serde.IdHandler インターフェイスを実装するクラスに設定します。Apicurio Registry は、次の実装を提供します。

  • io.apicurio.registry.serde.DefaultIdHandler:ID を 8 バイト長として格納します
  • io.apicurio.registry.serde.Legacy4ByteIdHandler:ID を 4 バイト整数として格納します

Apicurio Registry はスキーマ ID を long として表しますが、従来の理由、または他のレジストリーまたは SerDe クラスとの互換性のため、ID の送信時に 4 バイトの使用が推奨される場合があります。

Avro datum プロバイダー

Avro は、データを読み書きするためのさまざまなデータライターとリーダーを提供します。Apicurio Registry は、次の 3 つの異なるタイプをサポートしています。

  • Generic
  • Specific
  • Reflect

Apicurio Registry AvroDatumProvider は、使用されるタイプの抽象概念であり、DefaultAvroDatumProvider がデフォルトで使用されます。

以下の設定オプションを設定できます。

  • apicurio.registry.avro-datum-provider: AvroDatumProvider 実装の完全修飾 Java クラス名を指定します (例えば、io.apicurio.registry.serde.avro.ReflectAvroDatumProvider)。
  • apicurio.registry.use-specific-avro-reader: DefaultAvroDatumProvider を使用するときに特定のタイプを使用するには、true に設定します

Avro エンコーディング

Avro を使用してデータをシリアライズする場合は、Avro バイナリーエンコーディング形式を使用して、データを可能な限り効率的な形式でエンコードすることができます。Avro は JSON としてデータのエンコードもサポートします。これにより、ロギングやデバッグなどの各メッセージのペイロードの検証が容易になります。

apicurio.registry.avro.encoding プロパティーを JSON または BINARY の値で設定することにより、Avro エンコーディングを設定できます。デフォルトは BINARY です。

Avro デシリアライザーの設定

Avro デシリアライザーの設定を、シリアライザーの設定と一致するように、Avro デシリアライザークラスを設定する必要があります。

  • Apicurio Registry URL
  • ID エンコーディング
  • Avro datum プロバイダー
  • Avro エンコーディング

これらの設定オプションは、シリアライザーセクションを参照してください。プロパティー名と値は同じです。

注記

デシリアライザーの設定時には、以下のオプションは必要ありません。

  • アーティファクトリーゾルバーストラテジー
  • ID の場所

デシリアライザークラスは、メッセージからこれらのオプションの値を判断できます。シリアライザーはメッセージの一部として ID を送信するため、ストラテジーは必要ありません。

ID の位置は、メッセージペイロードの先頭にあるマジックバイトを確認することで決定されます。そのバイトが見つかると、設定されたハンドラーを使用してメッセージペイロードから ID が読み取られます。マジックバイトが見つからない場合、ID はメッセージヘッダーから読み込まれます。

Avro SerDes とアーティファクトの参照

レコードがネスト化された Avro メッセージとスキーマを使用する場合に、ネストされたレコードごとに新しいアーティファクトが登録されます。たとえば、次の TradeKey スキーマには、ネストされた Exchange スキーマが含まれています。

ネストされた Exchange スキーマを持つ TradeKey スキーマ

{
  "namespace": "com.kubetrade.schema.trade",
  "type": "record",
  "name": "TradeKey",
  "fields": [
    {
      "name": "exchange",
      "type": "com.kubetrade.schema.common.Exchange"
    },
    {
      "name": "key",
      "type": "string"
    }
  ]
}

交換スキーマ

{
  "namespace": "com.kubetrade.schema.common",
  "type": "enum",
  "name": "Exchange",
  "symbols" : ["GEMINI"]
}

これらのスキーマを Avro SerDes で使用すると、Apicurio Registry に、TradeKey スキーマ用と、Exchange スキーマ用の 2 つのアーティファクトが作成されます。TradeKey スキーマを使用するメッセージがシリアライズまたはデシリアライズされるたびに、両方のスキーマが取得されるため、定義を異なるファイルに分割できます。

関連情報

8.3.2. Apicurio Registry を使用して JSON スキーマ SerDes を設定する

このトピックでは、JSON スキーマの Kafka クライアントシリアライザーおよびデシリアライザー (SerDes) クラスを使用する方法について説明します。

Apicurio Registry は、JSON スキーマ用に次の Kafka クライアント SerDes クラスを提供します。

  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

Apache Avro とは異なり、JSON スキーマはシリアライズテクノロジーではなく、検証テクノロジーです。その結果、JSON スキーマの設定オプションは大きく異なります。たとえば、データは常に JSON としてエンコードされるため、エンコーディングオプションはありません。

JSON スキーマシリアライザーの設定

JSON スキーマシリアライザークラスを以下のように設定できます。

  • Apicurio Registry URL
  • アーティファクトリーゾルバーストラテジー
  • スキーマ検証

唯一の標準以外の設定プロパティーは、デフォルトで有効になっている JSON スキーマバリデーションです。これを無効にするには、apicurio.registry.serde.validation-enabledfalse に設定します。以下に例を示します。

props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)

JSON スキーマデシリアライザーの設定

JSON スキーマデシリアライザークラスを以下のように設定できます。

  • Apicurio Registry URL
  • スキーマ検証
  • データをデシリアライズするためのクラス

スキーマをロードできるように、Apicurio Registry の場所を指定する必要があります。他の設定はオプションです。

注記

デシリアライザーの検証は、シリアライザーが Kafka メッセージでグローバル ID を渡す場合にのみ機能します。これは、シリアライザーで検証が有効になっている場合にのみ発生します。

JSON スキーマの SerDes とアーティファクトの参照

JSON スキーマ SerDes はメッセージペイロードからスキーマを検出できないため、事前にスキーマアーティファクトを登録する必要があり、これはアーティファクト参照にも適用されます。

スキーマの内容に応じて、$ref の値は URL に置き換えます。SerDes はこの URL を使用して参照のスキーマを解決しようとし、主要なスキーマに対するデータの検証、ネスト化されたスキーマに対するネスト化された値の検証など、検証は通常どおりに行われます。Apicurio Registry でアーティファクトを参照するためのサポートも実装されています。

たとえば、次の city.json スキーマは city.json スキーマを参照ししています。

city.json スキーマを参照する citizen.json スキーマ

{
 "$id": "https://example.com/citizen.schema.json",
 "$schema": "http://json-schema.org/draft-07/schema#",
 "title": "Citizen",
 "type": "object",
 "properties": {
   "firstName": {
     "type": "string",
     "description": "The citizen's first name."
   },
   "lastName": {
     "type": "string",
     "description": "The citizen's last name."
   },
   "age": {
     "description": "Age in years which must be equal to or greater than zero.",
     "type": "integer",
     "minimum": 0
   },
   "city": {
     "$ref": "city.json"
   }
 }
}

city.json スキーマ

{
 "$id": "https://example.com/city.schema.json",
 "$schema": "http://json-schema.org/draft-07/schema#",
 "title": "City",
 "type": "object",
 "properties": {
   "name": {
     "type": "string",
     "description": "The city's name."
   },
   "zipCode": {
     "type": "integer",
     "description": "The zip code.",
     "minimum": 0
   }
 }
}

この例では、citizen に city が含まれます。Apicurio Registry では、city アーティファクトへの参照を持つ市民アーティファクトが、city.json という名前を使用して作成されます。SerDes では、citizen スキーマをフェッチすると、citizen スキーマから参照されるため、city スキーマもフェッチされます。データをシリアライズ/デシリアライズする場合、ネストされたスキーマを解決するために参照名が使用され、citizen スキーマとネストされた city スキーマに対する検証が可能になります。

関連情報

8.3.3. Apicurio Registry を使用して Protobuf SerDes を設定する

このトピックでは、Google Protobuf の Kafka クライアントシリアライザーおよびデシリアライザー (SerDes) クラスを使用する方法について説明します。

Apicurio Registry は、Protobuf 用に次の Kafka クライアント SerDes クラスを提供します。

  • io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
  • io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer

Protobuf シリアライザーの設定

Protobuf シリアライザークラスを以下のように設定できます。

  • Apicurio Registry URL
  • アーティファクトリーゾルバーストラテジー
  • ID の場所
  • ID エンコーディング
  • スキーマ検証

これらの設定オプションの詳細は、以下のセクションを参照してください。

Protobuf デシリアライザーの設定

シリアライザーの以下の設定と一致するように、Protobuf デシリアライザークラスを設定する必要があります。

  • Apicurio Registry URL
  • ID エンコーディング

設定プロパティー名と値はシリアライザーの場合と同じです。

注記

デシリアライザーの設定時には、以下のオプションは必要ありません。

  • アーティファクトリーゾルバーストラテジー
  • ID の場所

デシリアライザークラスは、メッセージからこれらのオプションの値を判断できます。シリアライザーはメッセージの一部として ID を送信するため、ストラテジーは必要ありません。

ID の位置は、メッセージペイロードの先頭にあるマジックバイトを確認することで決定されます。そのバイトが見つかると、設定されたハンドラーを使用してメッセージペイロードから ID が読み取られます。マジックバイトが見つからない場合、ID はメッセージヘッダーから読み込まれます。

注記

Protobuf デシリアライザーは、正確な Protobuf Message 実装へデシリアライズしません。DynamicMessage インスタンスへデシリアライズします。設定しない場合は、適切な API はありません。

Protobuf SerDes とアーティファクト参照

import ステートメントを含む複雑な Protobuf メッセージが使用される場合に、インポートされた Protobuf メッセージは個別のアーティファクトとして Apicurio Registry に保存されます。次に、Apicurio Registry がメインスキーマを取得して Protobuf メッセージをチェックすると、完全なメッセージスキーマをチェックしてシリアル化できるように参照されているスキーマも取得されます。

たとえば、次の table_info.proto スキーマファイルには、インポートされた mode.proto スキーマファイルが含まれています。

インポートされた .mode.proto ファイルを含む table_info.proto ファイル

syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;

import "sample/mode.proto";

message TableInfo {

 int32 winIndex = 1;
 Mode mode = 2;
 int32 min = 3;
 int32 max = 4;
 string id = 5;
 string dataAdapter = 6;
 string schema = 7;
 string selector = 8;
 string subscription_id = 9;
}

mode.proto ファイル

syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;

enum Mode {

MODE_UNKNOWN = 0;
RAW = 1;
MERGE = 2;
DISTINCT = 3;
COMMAND = 4;
}

この例では、TableInfo 用と Mode 用の 2 つの Protobuf アーティファクトが Apicurio Registry に格納されます。ただし、ModeTableInfo の一部であるため、SerDes 内のメッセージをチェックするために TableInfo がフェッチされるたびに、ModeTableInfo で参照されるアーティファクトとして返されます。

関連情報

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.