15.8. 用于追踪的生产者和消费者
在 Kafka 生产者和消费者中启用追踪的检测应用程序代码。使用 decorator 模式或拦截器工具您的 Java producer 和消费者应用程序代码进行追踪。然后,您可以在从主题生成或检索信息时记录跟踪。
OpenTelemetry 和 OpenTracing 检测项目提供支持生产者和消费者工具的类。
- decorator 工具
- 对于 decorator 检测,请创建修改的制作者或消费者实例以进行追踪。decorator 工具与 OpenTelemetry 和 OpenTracing 的不同。
- 拦截器工具
- 要进行拦截器检测,请将追踪功能添加到使用者或制作者配置中。拦截器工具与 OpenTelemetry 和 OpenTracing 相同。
先决条件
您已为 客户端 初始化追踪。
您可以通过在项目中添加追踪 JAR 作为依赖项来启用制作者和使用者应用。
流程
在各个制作者和消费者应用的应用代码中执行这些步骤。使用 decorator 模式或拦截器(拦截器)检测您的客户端应用程序代码。
要使用 decorator 模式,请创建修改后的制作者或消费者实例来发送或接收消息。
您传递了原始
KafkaProducer或KafkaConsumer类。OpenTelemetry 的 decorator 检测示例
// Producer instance Producer < String, String > op = new KafkaProducer < > ( configs, new StringSerializer(), new StringSerializer() ); Producer < String, String > producer = tracing.wrap(op); KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get()); producer.send(...); //consumer instance Consumer<String, String> oc = new KafkaConsumer<>( configs, new StringDeserializer(), new StringDeserializer() ); Consumer<String, String> consumer = tracing.wrap(oc); consumer.subscribe(Collections.singleton("mytopic")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);OpenTracing 的 decorator 检测示例
//producer instance KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer); TracingKafkaProducer.send(...) //consumer instance KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer); tracingConsumer.subscribe(Collections.singletonList("mytopic")); ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);要使用拦截器,请在制作者或消费者配置中设置拦截器类。
您以通常的方式使用
KafkaProducer和KafkaConsumer类。TracingProducerInterceptor和TracingConsumerInterceptorinterceptor 类负责追踪功能。使用拦截器的制作者配置示例
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send(...);使用拦截器的使用者配置示例
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList("messages")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);