15.9. 为追踪的工具 Kafka Streams 应用程序


在 Kafka Streams API 应用程序中启用追踪的检测应用程序代码。使用 decorator 模式或拦截器来检测您的 Kafka Streams API 应用程序进行追踪。然后,您可以在生成信息或从主题检索时记录 trace。

decorator 检测
对于 decorator 检测,请为追踪创建一个修改的 Kafka Streams 实例。OpenTracing 检测项目提供了一个 TracingKafkaClientSupplier 类,支持检测 Kafka Streams。您可以创建一个 TracingKafkaClientSupplier 供应商接口的嵌套实例,它为 Kafka Streams 提供追踪工具。对于 OpenTelemetry,进程相同,但您需要创建自定义 TracingKafkaClientSupplier 类来提供支持。
拦截器检测
对于拦截器检测,在 Kafka Streams producer 和消费者配置中添加追踪功能。

先决条件

  • 您已为 客户端初始化了追踪

    您可以通过在项目中添加追踪 JAR 作为依赖项来启用 Kafka Streams 应用程序中的检测。

  • 要使用 OpenTelemetry 检测 Kafka Streams,您需要编写自定义 TracingKafkaClientSupplier
  • 自定义 TracingKafkaClientSupplier 可以扩展 Kafka 的 DefaultKafkaClientSupplier,覆盖生成者和消费者创建方法,将实例嵌套与遥测相关的代码。

    自定义 TracingKafkaClientSupplier 示例

    private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getProducer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getConsumer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    
        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    }

流程

为每个 Kafka Streams API 应用程序执行这些步骤。

  • 要使用 decorator 模式,请创建一个 TracingKafkaClientSupplier 供应商接口实例,然后为 KafkaStreams 提供供应商接口。

    decorator 检测示例

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();

  • 要使用拦截器,在 Kafka Streams producer 和消费者配置中设置拦截器类。

    TracingProducerInterceptorTracingConsumerInterceptor interceptor 类负责追踪功能。

    使用拦截器的制作者和消费者配置示例

    props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部