7.6. Kafka Streams アプリケーションからのスキーマの使用
この手順では、Apicurio Registry からの Apache Avro スキーマを使用するように Java で書かれた Kafka Streams クライアントを設定する方法について説明します。
前提条件
- Apicurio Registry がインストールされている
- スキーマは Apicurio Registry に登録されている
手順
Apicurio Registry URL を使用して Java クライアントを作成および設定します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow String registryUrl = "https://registry.example.com/apis/registry/v2"; RegistryService client = RegistryClient.cached(registryUrl);
String registryUrl = "https://registry.example.com/apis/registry/v2"; RegistryService client = RegistryClient.cached(registryUrl);
シリアライザーおよびデシリアライザーを設定します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>(); Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <LogInput>(); Serde<LogInput> logSerde = Serdes.serdeFrom( serializer, deserializer ); Map<String, Object> config = new HashMap<>(); config.put(SerdeConfig.REGISTRY_URL, registryUrl); config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true); logSerde.configure(config, false);
Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>();
1 Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <LogInput>();
2 Serde<LogInput> logSerde = Serdes.serdeFrom( serializer, deserializer ); Map<String, Object> config = new HashMap<>(); config.put(SerdeConfig.REGISTRY_URL, registryUrl); config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true); logSerde.configure(config, false);
3 Kafka Streams クライアントを作成します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow KStream<String, LogInput> input = builder.stream( INPUT_TOPIC, Consumed.with(Serdes.String(), logSerde) );
KStream<String, LogInput> input = builder.stream( INPUT_TOPIC, Consumed.with(Serdes.String(), logSerde) );