15.2. 为 Kafka 客户端设置追踪


初始化 Jaeger tracer,以检测您的客户端应用程序以进行分布式追踪。

15.2.1. 为 Kafka 客户端初始化 Jaeger tracer

使用一组 追踪环境变量 配置和初始化 Jaeger tracer。

流程

在每个客户端应用程序中:

  1. 将 Jaeger 的 Maven 依赖项添加到客户端应用程序的 pom.xml 文件中:

    <dependency>
        <groupId>io.jaegertracing</groupId>
        <artifactId>jaeger-client</artifactId>
        <version>1.5.0.redhat-00001</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 使用 追踪环境变量 定义 Jaeger tracer 的配置。
  3. 从您在第两个步骤中定义的环境变量创建 Jaeger tracer:

    Tracer tracer = Configuration.fromEnv().getTracer();
    Copy to Clipboard Toggle word wrap
    注意

    有关初始化 Jaeger tracer 的替代方法,请参阅 Java OpenTracing 库 文档。

  4. 将 Jaeger tracer 注册为全局 tracer:

    GlobalTracer.register(tracer);
    Copy to Clipboard Toggle word wrap

现在,为要使用的客户端应用程序初始化 Jaeger tracer。

15.2.2. 提取制作者和消费者以进行追踪

使用 Decorator 模式或 Interceptors 来检测您的 Java 制作者和消费者应用程序代码进行追踪。

流程

在每个制作者和消费者应用程序的应用程序代码中:

  1. 将 OpenTracing 的 Maven 依赖项添加到制作者或消费者的 pom.xml 文件。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.15.redhat-00006</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 使用 Decorator 模式或 Interceptors 检测您的客户端应用程序代码。

    • 使用 Decorator 模式:

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
              tracer);
      
      // Send:
      tracingProducer.send(...);
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
              tracer);
      
      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
      
      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
      Copy to Clipboard Toggle word wrap
    • 使用 Interceptors:

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);
      
      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingProducerInterceptor.class.getName());
      
      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Send:
      producer.send(...);
      
      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingConsumerInterceptor.class.getName());
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);
      
      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
      Copy to Clipboard Toggle word wrap
在 Decorator 模式中的自定义 span 名称

span 是 Jaeger 中的逻辑工作单元,包括操作名称、开始时间和持续时间。

要使用 Decorator 模式检测您的制作者和消费者应用程序,请在创建 TracingKafkaProducerTracingKafkaConsumer 对象时将 BiFunction 对象作为附加参数来定义自定义 span 名称。OpenTracing Apache Kafka Client Instrumentation 库包括几个内置范围名称。

示例:使用自定义 span 名称以 Decorator 模式检测客户端应用程序代码

// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
        tracer,
        producerSpanNameProvider);

// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:

TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);

// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"
Copy to Clipboard Toggle word wrap

内置范围名称

在定义自定义范围名称时,您可以在 ClientSpanNameProvider 类中使用以下 BiFunctions。如果没有指定 spanNameProvider,则使用 CONSUMER_OPERATION_NAMEPRODUCER_OPERATION_NAME

Expand
表 15.1. BiFunctions 定义自定义范围名称
BiFunction描述

CONSUMER_OPERATION_NAME, PRODUCER_OPERATION_NAME

返回 operationName 作为使用者和"end"的 span name: "receive"。

CONSUMER_PREFIXED_OPERATION_NAME (字符串前缀)、PRODUCER_PREFIXED_OPERATION_NAME (字符串前缀)

返回 前缀operationName 的字符串。

CONSUMER_TOPIC, PRODUCER_TOPIC

以格式 (record.topic ()) 返回消息发送到或检索的主题名称。

PREFIXED_CONSUMER_TOPIC (字符串前缀)、PREFIXED_PRODUCER_TOPIC (字符串前缀)

返回 前缀的 String connection,以及格式为 (record.topic ()) 的主题名称。

CONSUMER_OPERATION_NAME_TOPIC, PRODUCER_OPERATION_NAME_TOPIC

返回操作名称和主题名称:" operationName - record.topic ()"

CONSUMER_PREFIXED_OPERATION_NAME_TOPIC (字符串前缀)、PRODUCER_PREFIXED_OPERATION_NAME_TOPIC (字符串前缀)

返回 前缀 String linkion 和 "operationName - record.topic ()"

15.2.3. 用于追踪的 Kafka Streams 应用程序

使用供应商接口检测 Kafka Streams 应用程序以进行分布式追踪。这会在应用程序中启用 Interceptors。

流程

在每个 Kafka Streams 应用程序中:

  1. opentracing-kafka-streams 依赖项添加到 Kafka Streams 应用的 pom.xml 文件中。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-streams</artifactId>
        <version>0.1.15.redhat-00006</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. 创建 TracingKafkaClientSupplier 供应商接口实例:

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    Copy to Clipboard Toggle word wrap
  3. KafkaStreams 提供供应商接口:

    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat