7.6. 使用 Kafka Streams 应用程序中的 schema


此流程描述了如何配置使用 Java 编写的 Kafka Streams 客户端,以使用来自 Service Registry 的 Apache Avro 模式。

先决条件

  • 已安装 Service Registry
  • 模式使用 Service Registry 注册

流程

  1. 使用 Service Registry URL 创建并配置 Java 客户端:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    
    RegistryService client = RegistryClient.cached(registryUrl);
  2. 配置 serializer 和 deserializer:

    Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>(); 1
    
    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <LogInput>(); 2
    
    Serde<LogInput> logSerde = Serdes.serdeFrom(
        serializer,
        deserializer
    );
    
    Map<String, Object> config = new HashMap<>();
    config.put(SerdeConfig.REGISTRY_URL, registryUrl);
    config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true);
    logSerde.configure(config, false); 3
    1
    Service Registry 提供的 Avro serializer。
    2
    Service Registry 提供的 Avro deserializer。
    3
    配置 Service Registry URL 和 Avro reader,以 Avro 格式进行 deserialization。
  3. 创建 Kafka Streams 客户端:

    KStream<String, LogInput> input = builder.stream(
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.