7.6. 使用 Kafka Streams 应用程序的 schema
这个步骤描述了如何配置在 Java 中编写的 Kafka Streams 客户端,以使用 Apicurio Registry 中的 Apache Avro 模式。
前提条件
- 已安装 Apicurio Registry
- 模式使用 Apicurio Registry 注册
流程
使用 Apicurio Registry URL 创建并配置 Java 客户端:
String registryUrl = "https://registry.example.com/apis/registry/v2"; RegistryService client = RegistryClient.cached(registryUrl);配置 serializer 和 deserializer:
Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>();1 Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <LogInput>();2 Serde<LogInput> logSerde = Serdes.serdeFrom( serializer, deserializer ); Map<String, Object> config = new HashMap<>(); config.put(SerdeConfig.REGISTRY_URL, registryUrl); config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true); logSerde.configure(config, false);3 - 1. Apicurio Registry 提供的 Avro serializer。
- 2.Apicurio Registry 提供的 Avro deserializer。
- 3.配置 Apicurio Registry URL 和 Avro 读取器以 Avro 格式进行反序列化。
创建 Kafka Streams 客户端:
KStream<String, LogInput> input = builder.stream( INPUT_TOPIC, Consumed.with(Serdes.String(), logSerde) );