16.9. 为追踪检测 Kafka Streams 应用程序
检测应用程序代码,以便在 Kafka Streams API 应用程序中启用追踪。使用 decorator 模式或拦截器来检测您的 Kafka Streams API 应用程序以进行追踪。然后,您可以在从主题生成或检索消息时记录 trace。
- decorator 检测
-
对于 decorator 检测,请为追踪创建一个修改后的 Kafka Streams 实例。对于 OpenTelemetry,您需要创建一个自定义
TracingKafkaClientSupplier
类,以提供 Kafka Streams 的追踪工具。 - 拦截器检测
- 对于拦截器检测,在 Kafka Streams producer 和消费者配置中添加追踪功能。
先决条件
您已为 客户端 初始化了追踪。
您可以通过在项目中添加追踪 JAR 作为依赖项来启用 Kafka Streams 应用程序中的检测。
-
要使用 OpenTelemetry 检测 Kafka Streams,您需要编写自定义
TracingKafkaClientSupplier
。 自定义
TracingKafkaClientSupplier
可以扩展 Kafka 的DefaultKafkaClientSupplier
,覆盖生成者和消费者创建方法,将实例嵌套与遥测相关的代码。自定义
TracingKafkaClientSupplier
示例private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getProducer(config)); } @Override public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getConsumer(config)); } @Override public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { return this.getConsumer(config); } @Override public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) { return this.getConsumer(config); } }
流程
为每个 Kafka Streams API 应用程序执行这些步骤。
要使用 decorator 模式,创建一个
TracingKafkaClientSupplier
供应商接口的实例,然后为KafkaStreams
提供供应商接口。decorator 检测示例
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer); KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier); streams.start();
要使用拦截器,请在 Kafka Streams producer 和消费者配置中设置拦截器类。
TracingProducerInterceptor
和TracingConsumerInterceptor
interceptor 类负责追踪功能。使用拦截器的制作者和消费者配置示例
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());