13.4. 異なるクライアントのシリアライザー/デシリアライザータイプの使用
Kafka アプリケーションでスキーマ技術を使用する場合は、使用する特定のスキーマタイプを選択する必要があります。一般的なオプションは以下のとおりです。
- Apache Avro
- JSON スキーマ
- Google Protobuf
選択するスキーマ技術は、ユースケースと設定に依存します。当然ながら、Kafka を使用してカスタムシリアライザーおよびデシリアライザークラスを実装することができるので、Service Registry REST Java クライアントを使用した Service Registry 機能の利用など、いつでも独自のクラスを作成することができます。
便宜上、Service Registry は Avro、JSON スキーマ、および Protobuf スキーマテクノロジーに追加設定なしで SerDe クラスを提供します。以下のセクションでは、各タイプを使用するように Kafka アプリケーションを設定する方法を説明します。
シリアライザー/デシリアライザーの Kafka アプリケーション設定
Kafka アプリケーションで Service Registry によって提供されるシリアライザーまたはデシリアライザークラスの 1 つを使用するには、正しい設定プロパティーを設定する必要があります。以下の例は、Kafka プロデューサーアプリケーションでシリアライザーを設定する方法と、Kafka コンシューマーアプリケーションでデシリアライザーを設定する方法を示しています。
Kafka プロデューサーのシリアライザー設定の例
public Producer<Object,Object> createKafkaProducer(String kafkaBootstrapServers, String topicName) { Properties props = new Properties(); // Configure standard Kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + topicName); props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all"); // Use a Service Registry-provided Kafka serializer props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.apicurio.registry.utils.serde.AvroKafkaSerializer.class.getName()); props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.apicurio.registry.utils.serde.AvroKafkaSerializer.class.getName()); // Configure Service Registry location props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL); // Map the topic name (plus -key/value) to the artifactId in the registry props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, io.apicurio.registry.utils.serde.strategy.TopicIdStrategy.class.getName()); // Get an existing schema or auto-register if not found props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy.class.getName()); // Create the Kafka producer Producer<Object, Object> producer = new KafkaProducer<>(props); return producer; }
Kafka コンシューマーのデシリアライザー設定の例
public Consumer<Object,Object> createKafkaConsumer(String kafkaBootstrapServers, String topicName) { Properties props = new Properties(); // Configure standard Kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + topicName); props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Use a Service Registry-provided Kafka deserializer props.putIfAbsent(ProducerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.apicurio.registry.utils.serde.AvroKafkaDeserializer.class.getName()); props.putIfAbsent(ProducerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.apicurio.registry.utils.serde.AvroKafkaDeserializer.class.getName()); // Configure Service Registry location props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL); // No other configuration needed for deserializer because globalId of the schema // the deserializer uses is sent as part of the message. The deserializer simply // extracts that globalId and uses it to look up the schema from the registry. // Create the Kafka consumer KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props); return consumer; }
13.4.1. Service Registry を使用した Avro SerDe の設定
Service Registry は、Avro を可能な限り簡単に使用できるように、Apache Avro 用の Kafka クライアントシリアライザーおよびデシリアライザークラスを提供します。
-
io.apicurio.registry.utils.serde.AvroKafkaSerializer
-
io.apicurio.registry.utils.serde.AvroKafkaDeserializer
Avro シリアライザーの設定
Avro シリアライザークラスは以下の方法で設定できます。
- URL としての Service Registry の場所
- アーティファクト ID ストラテジー
- グローバル ID ストラテジー
- グローバル ID の場所
- グローバル ID ハンドラー
- Avro datum プロバイダー
- Avro エンコーディング
グローバル ID の場所
シリアライザーは、スキーマの一意のグローバル ID を Kafka メッセージの一部として渡し、コンシューマーがデシリアライズに適切なスキーマを使用できるようにします。グローバル ID の場所はメッセージのペイロードまたはメッセージヘッダーになります。デフォルトの方法では、メッセージペイロードでグローバル ID を渡します。代わりに、メッセージヘッダーで送信される ID が必要な場合は、以下の設定プロパティーを設定できます。
props.putIfAbsent(AbstractKafkaSerDe.USE_HEADERS, "true")
プロパティー name は apicurio.registry.use.headers
です。
グローバル ID ハンドラー
Kafka メッセージボディーに渡すときにグローバル ID をエンコードする方法を正確にカスタマイズできます。設定プロパティー apicurio.registry.id-handler
を、io.apicurio.registry.utils.serde.strategy.IdHandler
インターフェースを実装するクラスに設定します。Service Registry は、そのインターフェースの実装を 2 つ提供します。
-
io.apicurio.registry.utils.serde.strategy.DefaultIdHandler
- ID を 8 バイト長として格納します。 -
io.apicurio.registry.utils.serde.strategy.Legacy4ByteIdHandler
- ID を 4 バイト int として保存します。
Service Registry は、アーティファクトのグローバル ID を long として表しますが、従来の理由(または他のレジストリーとの互換性、または他のレジストリーとの互換性、または ID を送信する場合)では、4 バイトを使用したい場合があります。
Avro datum プロバイダー
Avro は、データを読み書きするためのさまざまなデータライターとリーダーを提供します。Service Registry は、3 つの異なるタイプをサポートします。
- Generic
- Specific
- Reflect
Service Registry AvroDatumProvider
は、実際に使用するタイプを抽象化したものです。デフォルトでは DefaultAvroDatumProvider
が使用されます。
設定可能な設定オプションは 2 つあります。
-
Apicurio.
registry.avro-datum-provider
:AvroDatumProvider
実装の完全修飾 Java クラス名を指定します(例:io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
)。 -
Apicurio
.registry.use-specific-avro-reader -
true または false。DefaultAvroDatumProvider
を使用する際に特定のタイプを使用します。
Avro エンコーディング
Apache Avro を使用してデータをシリアライザーする場合、Avro バイナリーエンコーディング形式を使用するのが一般的です。これにより、データは可能な限り効率的な形式でエンコードされます。ただし、Avro は JSON としてデータのエンコードもサポートします。JSON のエンコーディングは、各メッセージのペイロードを検査することが非常に容易であるため、多くの場合、ロギング、デバッグなどのユースケースに使用されます。Service Registry Avro シリアライザーは、エンコーディングをデフォルト(バイナリー)から JSON に変更するように設定することができます。
apicurio.avro.encoding
プロパティーを設定して、使用する Avro エンコーディングを設定します。値は JSON
または BINARY
のいずれかである必要があります。
Avro デシリアライザーの設定
シリアライザーの設定と一致するように、Avro デシリアライザークラスを設定する必要があります。これにより、以下の方法で Avro デシリアライザークラスを設定することができます。
- URL としての Service Registry の場所
- グローバル ID ハンドラー
- Avro datum プロバイダー
- Avro エンコーディング
これらの設定オプションについては、シリアライザーセクションを参照してください。プロパティー名と値は同じです。
デシリアライザーの設定時には、以下のオプションは必要ありません。
- アーティファクト ID ストラテジー
- グローバル ID ストラテジー
- グローバル ID の場所
これらのオプションは必要ない理由は、デシリアライザークラスはこの情報をメッセージ自体から把握できることです。2 つのストラテジーの場合、シリアライザーはメッセージの一部としてスキーマのグローバル ID を送信するため、それらは必要ありません。
グローバル ID の場所は、メッセージペイロードの開始時にマジックバイトを確認するだけで、デシリアライザーによって決定されます。そのバイトが見つかった場合、グローバル ID は設定済みのハンドラーを使用してメッセージペイロードから読み込まれます。マジックバイトが見つからない場合、グローバル ID はメッセージヘッダーから読み取られます。