13.4. 異なるクライアントのシリアライザー/デシリアライザータイプの使用


Kafka アプリケーションでスキーマ技術を使用する場合は、使用する特定のスキーマタイプを選択する必要があります。一般的なオプションは以下のとおりです。

  • Apache Avro
  • JSON スキーマ
  • Google Protobuf

選択するスキーマ技術は、ユースケースと設定に依存します。当然ながら、Kafka を使用してカスタムシリアライザーおよびデシリアライザークラスを実装することができるので、Service Registry REST Java クライアントを使用した Service Registry 機能の利用など、いつでも独自のクラスを作成することができます。

便宜上、Service Registry は Avro、JSON スキーマ、および Protobuf スキーマテクノロジーに追加設定なしで SerDe クラスを提供します。以下のセクションでは、各タイプを使用するように Kafka アプリケーションを設定する方法を説明します。

シリアライザー/デシリアライザーの Kafka アプリケーション設定

Kafka アプリケーションで Service Registry によって提供されるシリアライザーまたはデシリアライザークラスの 1 つを使用するには、正しい設定プロパティーを設定する必要があります。以下の例は、Kafka プロデューサーアプリケーションでシリアライザーを設定する方法と、Kafka コンシューマーアプリケーションでデシリアライザーを設定する方法を示しています。

Kafka プロデューサーのシリアライザー設定の例

public Producer<Object,Object> createKafkaProducer(String kafkaBootstrapServers, String topicName) {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + topicName);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use a Service Registry-provided Kafka serializer
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaSerializer.class.getName());

    // Configure Service Registry location
    props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);

    // Map the topic name (plus -key/value) to the artifactId in the registry
    props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM,
        io.apicurio.registry.utils.serde.strategy.TopicIdStrategy.class.getName());

    // Get an existing schema or auto-register if not found
    props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM,
        io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy.class.getName());

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}

Kafka コンシューマーのデシリアライザー設定の例

public Consumer<Object,Object> createKafkaConsumer(String kafkaBootstrapServers, String topicName) {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + topicName);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use a Service Registry-provided Kafka deserializer
    props.putIfAbsent(ProducerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaDeserializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaDeserializer.class.getName());

    // Configure Service Registry location
    props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);

    // No other configuration needed for deserializer because  globalId of the schema
    // the deserializer uses is sent as part of the message. The deserializer simply
    // extracts that globalId and uses it to look up the schema from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}

13.4.1. Service Registry を使用した Avro SerDe の設定

Service Registry は、Avro を可能な限り簡単に使用できるように、Apache Avro 用の Kafka クライアントシリアライザーおよびデシリアライザークラスを提供します。

  • io.apicurio.registry.utils.serde.AvroKafkaSerializer
  • io.apicurio.registry.utils.serde.AvroKafkaDeserializer

Avro シリアライザーの設定

Avro シリアライザークラスは以下の方法で設定できます。

  • URL としての Service Registry の場所
  • アーティファクト ID ストラテジー
  • グローバル ID ストラテジー
  • グローバル ID の場所
  • グローバル ID ハンドラー
  • Avro datum プロバイダー
  • Avro エンコーディング

グローバル ID の場所

シリアライザーは、スキーマの一意のグローバル ID を Kafka メッセージの一部として渡し、コンシューマーがデシリアライズに適切なスキーマを使用できるようにします。グローバル ID の場所はメッセージのペイロードまたはメッセージヘッダーになります。デフォルトの方法では、メッセージペイロードでグローバル ID を渡します。代わりに、メッセージヘッダーで送信される ID が必要な場合は、以下の設定プロパティーを設定できます。

props.putIfAbsent(AbstractKafkaSerDe.USE_HEADERS, "true")

プロパティー name は apicurio.registry.use.headers です。

グローバル ID ハンドラー

Kafka メッセージボディーに渡すときにグローバル ID をエンコードする方法を正確にカスタマイズできます。設定プロパティー apicurio.registry.id-handler を、io.apicurio.registry.utils.serde.strategy.IdHandler インターフェースを実装するクラスに設定します。Service Registry は、そのインターフェースの実装を 2 つ提供します。

  • io.apicurio.registry.utils.serde.strategy.DefaultIdHandler - ID を 8 バイト長として格納します。
  • io.apicurio.registry.utils.serde.strategy.Legacy4ByteIdHandler - ID を 4 バイト int として保存します。

Service Registry は、アーティファクトのグローバル ID を long として表しますが、従来の理由(または他のレジストリーとの互換性、または他のレジストリーとの互換性、または ID を送信する場合)では、4 バイトを使用したい場合があります。

Avro datum プロバイダー

Avro は、データを読み書きするためのさまざまなデータライターとリーダーを提供します。Service Registry は、3 つの異なるタイプをサポートします。

  • Generic
  • Specific
  • Reflect

Service Registry AvroDatumProvider は、実際に使用するタイプを抽象化したものです。デフォルトでは DefaultAvroDatumProvider が使用されます。

設定可能な設定オプションは 2 つあります。

  • Apicurio. registry.avro-datum-provider: AvroDatumProvider 実装の完全修飾 Java クラス名を指定します(例: io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider)。
  • Apicurio .registry.use-specific-avro-reader - true または false。DefaultAvroDatumProviderを使用する際に特定のタイプを使用します。

Avro エンコーディング

Apache Avro を使用してデータをシリアライザーする場合、Avro バイナリーエンコーディング形式を使用するのが一般的です。これにより、データは可能な限り効率的な形式でエンコードされます。ただし、Avro は JSON としてデータのエンコードもサポートします。JSON のエンコーディングは、各メッセージのペイロードを検査することが非常に容易であるため、多くの場合、ロギング、デバッグなどのユースケースに使用されます。Service Registry Avro シリアライザーは、エンコーディングをデフォルト(バイナリー)から JSON に変更するように設定することができます。

apicurio.avro.encoding プロパティーを設定して、使用する Avro エンコーディングを設定します。値は JSON または BINARY のいずれかである必要があります。

Avro デシリアライザーの設定

シリアライザーの設定と一致するように、Avro デシリアライザークラスを設定する必要があります。これにより、以下の方法で Avro デシリアライザークラスを設定することができます。

  • URL としての Service Registry の場所
  • グローバル ID ハンドラー
  • Avro datum プロバイダー
  • Avro エンコーディング

これらの設定オプションについては、シリアライザーセクションを参照してください。プロパティー名と値は同じです。

注記

デシリアライザーの設定時には、以下のオプションは必要ありません。

  • アーティファクト ID ストラテジー
  • グローバル ID ストラテジー
  • グローバル ID の場所

これらのオプションは必要ない理由は、デシリアライザークラスはこの情報をメッセージ自体から把握できることです。2 つのストラテジーの場合、シリアライザーはメッセージの一部としてスキーマのグローバル ID を送信するため、それらは必要ありません。

グローバル ID の場所は、メッセージペイロードの開始時にマジックバイトを確認するだけで、デシリアライザーによって決定されます。そのバイトが見つかった場合、グローバル ID は設定済みのハンドラーを使用してメッセージペイロードから読み込まれます。マジックバイトが見つからない場合、グローバル ID はメッセージヘッダーから読み取られます。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.