17.8. 用于追踪的制作者和消费者的工具


检测应用程序代码,以便在 Kafka 生成者和消费者中启用追踪。使用 decorator 模式或拦截器来检测您的 Java 生成者和消费者应用程序代码以进行追踪。然后,您可以在从主题生成或检索消息时记录 trace。

OpenTelemetry 和 OpenTracing 检测项目提供类支持生成者和消费者的工具。

decorator 检测
对于 decorator 检测,请为追踪创建修改后的制作者或消费者实例。对于 OpenTelemetry 和 OpenTracing,decorator 检测有所不同。
拦截器检测
对于拦截器检测,请将追踪功能添加到消费者或生成者配置中。OpenTelemetry 和 OpenTracing 的拦截器检测是相同的。

先决条件

  • 您已为 客户端 初始化了追踪

    您可以通过在项目中添加追踪 JAR 作为依赖项来启用生成者和消费者应用程序的检测。

流程

在每个制作者和消费者应用的应用程序代码中执行这些步骤。使用 decorator 模式或拦截器(拦截器)检测您的客户端应用程序代码。

  • 要使用 decorator 模式,请创建一个修改后的制作者或消费者实例来发送或接收消息。

    您传递了原始 KafkaProducerKafkaConsumer 类。

    OpenTelemetry 的 decorator 检测示例

    // Producer instance
    Producer < String, String > op = new KafkaProducer < > (
        configs,
        new StringSerializer(),
        new StringSerializer()
        );
        Producer < String, String > producer = tracing.wrap(op);
    KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
    producer.send(...);
    
    //consumer instance
    Consumer<String, String> oc = new KafkaConsumer<>(
        configs,
        new StringDeserializer(),
        new StringDeserializer()
        );
        Consumer<String, String> consumer = tracing.wrap(oc);
    consumer.subscribe(Collections.singleton("mytopic"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

    OpenTracing 的 decorator 检测示例

    //producer instance
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer);
    TracingKafkaProducer.send(...)
    
    //consumer instance
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer);
    tracingConsumer.subscribe(Collections.singletonList("mytopic"));
    ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

  • 要使用拦截器,请在生成者或消费者配置中设置拦截器类。

    您以通常的方式使用 KafkaProducerKafkaConsumer 类。TracingProducerInterceptorTracingConsumerInterceptor interceptor 类负责追踪功能。

    使用拦截器的制作者配置示例

    senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());
    
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(...);
    Copy to Clipboard Toggle word wrap

    使用拦截器的消费者配置示例

    consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingConsumerInterceptor.class.getName());
    
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("messages"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat