13.8. Kafka Streams アプリケーションからのスキーマの使用
この手順では、Service Registry からのスキーマを使用するように Java で書かれた Kafka Streams クライアントを設定する方法を説明します。
前提条件
- Service Registry がインストールされている必要があります。
- スキーマが Service Registry に登録されている必要があります。
手順
Service Registry で REST クライアントを作成および設定します。以下に例を示します。
String registryUrl = "https://registry.example.com/api"; RegistryService client = RegistryClient.cached(registryUrl);シリアライザー、デシリアライザーの設定、および Kafka Streams クライアントを作成します。以下に例を示します。
Serializer<LogInput> serializer = new AvroKafkaSerializer<>(1 client, new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true) ); Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <> (2 client, new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true) ); Serde<LogInput> logSerde = Serdes.serdeFrom(3 serializer, deserializer ); KStream<String, LogInput> input = builder.stream(4 INPUT_TOPIC, Consumed.with(Serdes.String(), logSerde) );