15.9. 为追踪提供 Kafka Streams 应用程序


在 Kafka Streams API 应用程序中启用追踪的检测应用程序代码。使用 decorator 模式或拦截器检测您的 Kafka Streams API 应用程序进行追踪。然后,您可以在从主题生成或检索信息时记录跟踪。

decorator 工具
对于 decorator 工具,为追踪创建修改后的 Kafka Streams 实例。OpenTracing 工具项目提供 TracingKafkaClientSupplier 类,它支持 Kafka 流的检测。您可以创建一个 TracingKafkaClientSupplier 供应商接口的嵌套实例,它为 Kafka Streams 提供追踪工具。对于 OpenTelemetry,此过程相同,但您需要创建一个自定义 TracingKafkaClientSupplier 类以提供支持。
拦截器工具
要进行拦截器检测,请将追踪功能添加到 Kafka Streams producer 和消费者配置中。

先决条件

  • 您已为 客户端 初始化追踪

    您可以通过在项目中添加追踪 JAR 作为依赖项来启用 Kafka Streams 应用程序中的检测。

  • 要使用 OpenTelemetry 检测 Kafka Streams,您需要编写自定义 TracingKafkaClientSupplier
  • 自定义 TracingKafkaClientSupplier 可以扩展 Kafka 的 DefaultKafkaClientSupplier,覆盖生成者和消费者创建方法,将实例嵌套与遥测相关的代码。

    自定义 TracingKafkaClientSupplier 示例

    private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getProducer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getConsumer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    
        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    }
    Copy to Clipboard Toggle word wrap

流程

为每个 Kafka Streams API 应用程序执行这些步骤。

  • 要使用 decorator 模式,请创建一个 TracingKafkaClientSupplier 供应商接口实例,然后向 KafkaStreams 提供供应商接口。

    decorator 工具示例

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();
    Copy to Clipboard Toggle word wrap

  • 要使用拦截器,请在 Kafka Streams producer 和消费者配置中设置拦截器类。

    TracingProducerInterceptorTracingConsumerInterceptor interceptor 类负责追踪功能。

    使用拦截器的制作者和使用者配置示例

    props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
    Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat