搜索

15.2.2. 强制生产者和消费者进行追踪

download PDF

使用 Decorator 模式或 Interceptors 来检测 Java 制作者和使用者应用代码进行追踪。

流程

在每个生产者和消费者应用程序的应用程序代码中:

  1. 在生产者或消费者的 pom.xml 文件中,为 OpenTracing 添加 Maven 依赖项。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.15.redhat-00001</version>
    </dependency>
  2. 使用 Decorator 模式或 Interceptors 检测您的客户端应用程序代码。

    • 使用 Decorator 模式:

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
              tracer);
      
      // Send:
      tracingProducer.send(...);
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
              tracer);
      
      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
      
      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    • 使用 Interceptors:

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);
      
      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingProducerInterceptor.class.getName());
      
      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Send:
      producer.send(...);
      
      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingConsumerInterceptor.class.getName());
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);
      
      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
Decorator 模式中的自定义范围名称

span 是 Jaeger 中的逻辑工作单元,具有操作名称、开始时间和持续时间。

要使用 Decorator 模式来检测您的制作者和消费者应用程序,在创建 TracingKafkaProducerTracingKafkaConsumer 对象时传递 BiFunction 对象作为额外参数来定义自定义 span 名称。OpenTracing Apache Kafka Client Instrumentation 库包含多个内置范围名称。

示例:使用自定义 span 名称在 Decorator 模式中检测客户端应用程序代码

// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
        tracer,
        producerSpanNameProvider);

// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:

TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);

// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"

内置范围名称

在定义自定义 span 名称时,您可以在 ClientSpanNameProvider 类中使用以下 BiFunctions。如果没有指定 spanNameProvider,则使用 CONSUMER_OPERATION_NAMEPRODUCER_OPERATION_NAME

表 15.1. 定义自定义范围名称的 BiFunctions
BiFunction描述

CONSUMER_OPERATION_NAME, PRODUCER_OPERATION_NAME

operationName 返回为 span name: "receive" for使用者,并为生产者返回"send"。

CONSUMER_PREFIXED_OPERATION_NAME(String prefix), PRODUCER_PREFIXED_OPERATION_NAME(String prefix)

返回 prefixoperationName 的字符串连接。

CONSUMER_TOPIC, PRODUCER_TOPIC

返回消息发送到或检索到的主题的名称,格式为 (record.topic())

PREFIXED_CONSUMER_TOPIC(String prefix), PREFIXED_PRODUCER_TOPIC(String prefix)

返回 prefix 的 String 连接和 (record.topic()) 格式的主题名称。

CONSUMER_OPERATION_NAME_TOPIC, PRODUCER_OPERATION_NAME_TOPIC

返回操作名称和主题名称: "operationName - record.topic()"

CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(String prefix), PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(String prefix)

返回 prefix"operationName - record.topic()" 的字符串连接。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.