13.8. Kafka Streams アプリケーションからのスキーマの使用
この手順では、Service Registry からのスキーマを使用するように Java で書かれた Kafka Streams クライアントを設定する方法を説明します。
前提条件
- Service Registry がインストールされている必要があります。
- スキーマが Service Registry に登録されている必要があります。
手順
Service Registry で REST クライアントを作成および設定します。以下に例を示します。
String registryUrl = "https://registry.example.com/api"; RegistryService client = RegistryClient.cached(registryUrl);
シリアライザー、デシリアライザーの設定、および Kafka Streams クライアントを作成します。以下に例を示します。
Serializer<LogInput> serializer = new AvroKafkaSerializer<>( 1 client, new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true) ); Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <> ( 2 client, new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true) ); Serde<LogInput> logSerde = Serdes.serdeFrom( 3 serializer, deserializer ); KStream<String, LogInput> input = builder.stream( 4 INPUT_TOPIC, Consumed.with(Serdes.String(), logSerde) );