7.4. 使用 Kafka 消费者客户端中的模式
此流程描述了如何配置使用 Java 编写的 Kafka 消费者客户端,以使用 Apicurio Registry 中的模式。
前提条件
- 已安装 Apicurio Registry
- 模式在 Apicurio Registry 中注册
流程
使用 Apicurio Registry 的 URL 配置客户端。例如:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow String registryUrl = "https://registry.example.com/apis/registry/v2"; Properties props = new Properties(); props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);
String registryUrl = "https://registry.example.com/apis/registry/v2"; Properties props = new Properties(); props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);
使用 Apicurio Registry deserializer 配置客户端。例如:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow // Configure Kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME); props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Configure deserializer settings props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName()); props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
// Configure Kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME); props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Configure deserializer settings props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
1 props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
2