15.8. 用于追踪的生产者和消费者


在 Kafka 生产者和消费者中启用追踪的检测应用程序代码。使用 decorator 模式或拦截器工具您的 Java producer 和消费者应用程序代码进行追踪。然后,您可以在从主题生成或检索信息时记录跟踪。

OpenTelemetry 和 OpenTracing 检测项目提供支持生产者和消费者工具的类。

decorator 工具
对于 decorator 检测,请创建修改的制作者或消费者实例以进行追踪。decorator 工具与 OpenTelemetry 和 OpenTracing 的不同。
拦截器工具
要进行拦截器检测,请将追踪功能添加到使用者或制作者配置中。拦截器工具与 OpenTelemetry 和 OpenTracing 相同。

先决条件

  • 您已为 客户端 初始化追踪

    您可以通过在项目中添加追踪 JAR 作为依赖项来启用制作者和使用者应用。

流程

在各个制作者和消费者应用的应用代码中执行这些步骤。使用 decorator 模式或拦截器(拦截器)检测您的客户端应用程序代码。

  • 要使用 decorator 模式,请创建修改后的制作者或消费者实例来发送或接收消息。

    您传递了原始 KafkaProducerKafkaConsumer 类。

    OpenTelemetry 的 decorator 检测示例

    // Producer instance
    Producer < String, String > op = new KafkaProducer < > (
        configs,
        new StringSerializer(),
        new StringSerializer()
        );
        Producer < String, String > producer = tracing.wrap(op);
    KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
    producer.send(...);
    
    //consumer instance
    Consumer<String, String> oc = new KafkaConsumer<>(
        configs,
        new StringDeserializer(),
        new StringDeserializer()
        );
        Consumer<String, String> consumer = tracing.wrap(oc);
    consumer.subscribe(Collections.singleton("mytopic"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

    OpenTracing 的 decorator 检测示例

    //producer instance
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer);
    TracingKafkaProducer.send(...)
    
    //consumer instance
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer);
    tracingConsumer.subscribe(Collections.singletonList("mytopic"));
    ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

  • 要使用拦截器,请在制作者或消费者配置中设置拦截器类。

    您以通常的方式使用 KafkaProducerKafkaConsumer 类。TracingProducerInterceptorTracingConsumerInterceptor interceptor 类负责追踪功能。

    使用拦截器的制作者配置示例

    senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());
    
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(...);
    Copy to Clipboard Toggle word wrap

    使用拦截器的使用者配置示例

    consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingConsumerInterceptor.class.getName());
    
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("messages"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat