10.3. 使用 tracers 强制 Kafka 客户端
检测 Kafka 制作者和消费者客户端,以及 Kafka Streams API 应用程序以实现分布式追踪。
10.3.1. 强制生产者和消费者进行追踪
使用 Decorator 模式或 Interceptors 来检测 Java 制作者和使用者应用代码进行追踪。
流程
在每个生产者和消费者应用程序的应用程序代码中:
将 OpenTracing 的 Maven 依赖项添加到制作者或消费者的
pom.xml
文件中。<dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-kafka-client</artifactId> <version>0.1.15.redhat-00001</version> </dependency>
使用 Decorator 模式或 Interceptors 检测您的客户端应用程序代码。
使用 Decorator 模式:
// Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Create an instance of the TracingKafkaProducer: TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer); // Send: tracingProducer.send(...); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Create an instance of the TracingKafkaConsumer: TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer); // Subscribe: tracingConsumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000); // Retrieve SpanContext from polled record (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
使用 Interceptors:
// Register the tracer with GlobalTracer: GlobalTracer.register(tracer); // Add the TracingProducerInterceptor to the sender properties: senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); // Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Send: producer.send(...); // Add the TracingConsumerInterceptor to the consumer properties: consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Subscribe: consumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = consumer.poll(1000); // Retrieve the SpanContext from a polled message (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
10.3.1.1. Decorator 模式中的自定义范围名称
span 是 Jaeger 中的逻辑工作单元,具有操作名称、开始时间和持续时间。
要使用 Decorator 模式来检测您的制作者和消费者应用程序,请在创建 TracingKafkaProducer 和
对象时传递 TracingKafka
ConsumerBiFunction
对象作为额外参数来定义自定义 span 名称。OpenTracing Apache Kafka Client Instrumentation 库包含多个内置范围名称。
示例:使用自定义 span 名称在 Decorator 模式中检测客户端应用程序代码
// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name: BiFunction<String, ProducerRecord, String> producerSpanNameProvider = (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME"; // Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Create an instance of the TracingKafkaProducer TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer, producerSpanNameProvider); // Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name. // Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name: BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider = (operationName, consumerRecord) -> operationName.toUpperCase(); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction: TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer, consumerSpanNameProvider); // Spans created by the tracingConsumer will have the operation name as the span name, in upper-case. // "receive" -> "RECEIVE"
10.3.1.2. 内置范围名称
在定义自定义 span 名称时,您可以在 ClientSpanNameProvider
类中使用以下 BiFunctions
。如果没有指定 spanNameProvider
,则使用 CONSUMER_OPERATION_NAME
和 PRODUCER_OPERATION_NAME
。
BiFunction | 描述 |
---|---|
|
将 |
|
返回 |
|
返回消息发送到或检索到的主题的名称,格式为 |
|
返回 |
|
返回操作名称和主题名称 |
|
返回 |