13.2. 为 Kafka 客户端设置追踪
初始化 Jaeger tracer 以检测您的客户端应用程序以进行分布式追踪。
13.2.1. 为 Kafka 客户端初始化 Jaeger tracer 复制链接链接已复制到粘贴板!
使用一组 追踪环境变量 配置并初始化 Jaeger tracer。
流程
在每个客户端应用程序中:
将 Jaeger 的 Maven 依赖项添加到客户端应用程序的
pom.xml文件中:<dependency> <groupId>io.jaegertracing</groupId> <artifactId>jaeger-client</artifactId> <version>1.5.0.redhat-00001</version> </dependency>- 使用 追踪环境变量 定义 Jaeger tracer 的配置。
从在第 2 步中定义的环境变量创建 Jaeger tracer:
Tracer tracer = Configuration.fromEnv().getTracer();注意有关初始化 Jaeger tracer 的替代方法,请参阅 Java OpenTracing 库 文档。
将 Jaeger tracer 注册为全局 tracer:
GlobalTracer.register(tracer);
现在,会初始化 Jaeger tracer 供客户端应用程序使用。
13.2.2. 用于追踪的制作者和消费者的工具 复制链接链接已复制到粘贴板!
使用 Decorator 模式或 Interceptors 检测您的 Java 生成者和消费者应用程序代码以进行追踪。
流程
在每个制作者和消费者应用程序的应用程序代码中:
将 OpenTracing 的 Maven 依赖项添加到制作者或消费者的
pom.xml文件中。<dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-kafka-client</artifactId> <version>0.1.15.redhat-00004</version> </dependency>使用 Decorator 模式或 Interceptors 检测您的客户端应用程序代码。
使用 Decorator 模式:
// Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Create an instance of the TracingKafkaProducer: TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer); // Send: tracingProducer.send(...); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Create an instance of the TracingKafkaConsumer: TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer); // Subscribe: tracingConsumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000); // Retrieve SpanContext from polled record (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);使用 Interceptors:
// Register the tracer with GlobalTracer: GlobalTracer.register(tracer); // Add the TracingProducerInterceptor to the sender properties: senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); // Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Send: producer.send(...); // Add the TracingConsumerInterceptor to the consumer properties: consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Subscribe: consumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = consumer.poll(1000); // Retrieve the SpanContext from a polled message (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
Decorator 模式中的自定义范围名称
span 是 Jaeger 中的逻辑工作单元,带有操作名称、开始时间和持续时间。
要使用 Decorator 模式来检测制作者和消费者应用程序,请在创建 TracingKafkaProducer 和 TracingKafkaConsumer 对象时传递 BiFunction 对象来定义自定义范围名称。OpenTracing Apache Kafka 客户端调用库包括几个内置范围名称。
示例:使用自定义范围名称来在 Decorator 模式中检测客户端应用程序代码
// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name:
BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
(operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";
// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
tracer,
producerSpanNameProvider);
// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.
// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
(operationName, consumerRecord) -> operationName.toUpperCase();
// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:
TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
tracer,
consumerSpanNameProvider);
// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"
内置范围名称
在定义自定义 span 名称时,您可以在 ClientSpanNameProvider 类中使用以下 BiFunctions。如果没有指定 spanNameProvider,则使用 CONSUMER_OPERATION_NAME 和 PRODUCER_OPERATION_NAME。
| BiFunction | 描述 |
|---|---|
|
|
返回 |
|
|
返回 |
|
|
返回消息发送到或从中检索的主题名称,格式为 |
|
|
返回前缀和主题名称 |
|
|
返回操作名称和主题名称:" |
|
|
返回 |
13.2.3. 为追踪检测 Kafka Streams 应用程序 复制链接链接已复制到粘贴板!
使用供应商接口为分布式追踪检测 Kafka Streams 应用程序。这会在应用程序中启用 Interceptors。
流程
在每个 Kafka Streams 应用程序中:
将
opentracing-kafka-streams依赖项添加到 Kafka Streams 应用程序的pom.xml文件中。<dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-kafka-streams</artifactId> <version>0.1.15.redhat-00004</version> </dependency>创建
TracingKafkaClientSupplier供应商接口的实例:KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);为
KafkaStreams提供供应商接口:KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier); streams.start();