15.2. 为 Kafka 客户端设置追踪


初始化 Jaeger tracer 以检测您的客户端应用程序以进行分布式追踪。

15.2.1. 为 Kafka 客户端初始化 Jaeger tracer

使用一组 追踪环境变量 配置并初始化 Jaeger tracer。

流程

在每个客户端应用程序中:

  1. 将 Jaeger 的 Maven 依赖项添加到客户端应用程序的 pom.xml 文件中:

    <dependency>
        <groupId>io.jaegertracing</groupId>
        <artifactId>jaeger-client</artifactId>
        <version>1.5.0.redhat-00001</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 使用 追踪环境变量 定义 Jaeger tracer 的配置。
  3. 从在第 2 步中定义的环境变量创建 Jaeger tracer:

    Tracer tracer = Configuration.fromEnv().getTracer();
    Copy to Clipboard Toggle word wrap
    注意

    有关初始化 Jaeger tracer 的替代方法,请参阅 Java OpenTracing 库 文档。

  4. 将 Jaeger tracer 注册为全局 tracer:

    GlobalTracer.register(tracer);
    Copy to Clipboard Toggle word wrap

现在,会初始化 Jaeger tracer 供客户端应用程序使用。

15.2.2. 用于追踪的制作者和消费者的工具

使用 Decorator 模式或 Interceptors 检测您的 Java 生成者和消费者应用程序代码以进行追踪。

流程

在每个制作者和消费者应用程序的应用程序代码中:

  1. 将 OpenTracing 的 Maven 依赖项添加到制作者或消费者的 pom.xml 文件中。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.15.redhat-00002</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 使用 Decorator 模式或 Interceptors 检测您的客户端应用程序代码。

    • 使用 Decorator 模式:

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
              tracer);
      
      // Send:
      tracingProducer.send(...);
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
              tracer);
      
      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
      
      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
      Copy to Clipboard Toggle word wrap
    • 使用 Interceptors:

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);
      
      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingProducerInterceptor.class.getName());
      
      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Send:
      producer.send(...);
      
      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingConsumerInterceptor.class.getName());
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);
      
      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
      Copy to Clipboard Toggle word wrap
Decorator 模式中的自定义范围名称

span 是 Jaeger 中的逻辑工作单元,带有操作名称、开始时间和持续时间。

要使用 Decorator 模式来检测制作者和消费者应用程序,请在创建 TracingKafkaProducerTracingKafkaConsumer 对象时传递 BiFunction 对象来定义自定义范围名称。OpenTracing Apache Kafka 客户端调用库包括几个内置范围名称。

示例:使用自定义范围名称来在 Decorator 模式中检测客户端应用程序代码

// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
        tracer,
        producerSpanNameProvider);

// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:

TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);

// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"
Copy to Clipboard Toggle word wrap

内置范围名称

在定义自定义 span 名称时,您可以在 ClientSpanNameProvider 类中使用以下 BiFunctions。如果没有指定 spanNameProvider,则使用 CONSUMER_OPERATION_NAMEPRODUCER_OPERATION_NAME

Expand
表 15.1. 用于定义自定义范围名称的 BiFunctions
BiFunction描述

CONSUMER_OPERATION_NAME, PRODUCER_OPERATION_NAME

返回 operationName 作为范围名称: "receive" (消费者)和 "send" for producers。

CONSUMER_PREFIXED_OPERATION_NAME (String prefix), PRODUCER_PREFIXED_OPERATION_NAME (String prefix)

返回 前缀和 operationName 的字符串串联。

CONSUMER_TOPIC, PRODUCER_TOPIC

返回消息发送到或从中检索的主题名称,格式为 (record.topic ())

PREFIXED_CONSUMER_TOPIC (String prefix), PREFIXED_PRODUCER_TOPIC (String prefix)

返回前缀和主题名称 (record.topic () )的字符串串联。

CONSUMER_OPERATION_NAME_TOPIC, PRODUCER_OPERATION_NAME_TOPIC

返回操作名称和主题名称:" operationName - record.topic ()"

CONSUMER_PREFIXED_OPERATION_NAME_TOPIC (String prefix), PRODUCER_PREFIXED_OPERATION_NAME_TOPIC (String prefix)

返回 前缀和 "operationName - record.topic ()" 的字符串串联。

15.2.3. 为追踪检测 Kafka Streams 应用程序

使用供应商接口为分布式追踪检测 Kafka Streams 应用程序。这会在应用程序中启用 Interceptors。

流程

在每个 Kafka Streams 应用程序中:

  1. opentracing-kafka-streams 依赖项添加到 Kafka Streams 应用程序的 pom.xml 文件中。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-streams</artifactId>
        <version>0.1.15.redhat-00002</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 创建 TracingKafkaClientSupplier 供应商接口的实例:

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    Copy to Clipboard Toggle word wrap
  3. KafkaStreams 提供供应商接口:

    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat