22.3. 设置分布式追踪
通过在自定义资源中指定追踪类型,在 Kafka 组件中启用分布式追踪。Kafka 客户端中的检测追踪程序用于对消息的端到端跟踪。
要设置分布式追踪,请遵循以下步骤:
- 为 MirrorMaker、Kafka Connect 和 Kafka Bridge 启用追踪
为客户端设置追踪:
使用 tracers 检测客户端:
22.3.1. 先决条件
在设置分布式追踪前,请确保 Jaeger 后端组件部署到 OpenShift 集群中。我们建议使用 Jaeger operator 在 OpenShift 集群上部署 Jaeger。
有关部署说明,请参阅 Jaeger 文档。
对于除 Apache Kafka 的流外的应用程序和系统设置追踪不在此内容范围内。
22.3.2. 在 MirrorMaker、Kafka Connect 和 Kafka Bridge 资源中启用追踪
MirrorMaker、MirrorMaker 2、Kafka Connect 和 Apache Kafka Bridge 的 Streams 支持分布式追踪。配置组件的自定义资源,以指定和启用 tracer 服务。
在资源中启用追踪会触发以下事件:
- 拦截器类在组件的集成使用者和制作者中更新。
- 对于 MirrorMaker、MirrorMaker 2 和 Kafka Connect,追踪代理会根据资源中定义的追踪配置初始化 tracer。
- 对于 Kafka Bridge,基于资源中定义的追踪配置由 Kafka Bridge 本身初始化的 tracer。
您可以启用使用 OpenTelemetry 的追踪。
MirrorMaker 和 MirrorMaker 2 中的追踪
对于 MirrorMaker 和 MirrorMaker 2,信息从源集群追踪到目标集群。跟踪数据记录进入和离开 MirrorMaker 或 MirrorMaker 2 组件的消息。
Kafka Connect 中的追踪
对于 Kafka Connect,只有 Kafka Connect 生成和消耗的消息才会被 traced。要跟踪 Kafka Connect 和外部系统之间发送的消息,您必须在连接器中为这些系统配置追踪。
Kafka Bridge 中的追踪
对于 Kafka Bridge,Kafka Bridge 生成和消耗的消息会被跟踪。也跟踪来自客户端应用程序的 HTTP 请求,以通过 Kafka Bridge 发送和接收信息。要进行端到端追踪,您必须在 HTTP 客户端中配置追踪。
流程
为每个 KafkaMirrorMaker
, KafkaMirrorMaker2
, KafkaConnect
, 和 KafkaBridge
资源执行这些步骤。
在
spec.template
属性中,配置 tracer 服务。- 使用追踪环境变量作为模板配置属性。
-
对于 OpenTelemetry,将
spec.tracing.type
属性设置为opentelemetry
。
使用 OpenTelemetry 的 Kafka Connect 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... template: connectContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_OTLP_ENDPOINT value: "http://otlp-host:4317" tracing: type: opentelemetry #...
使用 OpenTelemetry 的 MirrorMaker 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker metadata: name: my-mirror-maker spec: #... template: mirrorMakerContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_OTLP_ENDPOINT value: "http://otlp-host:4317" tracing: type: opentelemetry #...
使用 OpenTelemetry 的 MirrorMaker 2 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mm2-cluster spec: #... template: connectContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_OTLP_ENDPOINT value: "http://otlp-host:4317" tracing: type: opentelemetry #...
使用 OpenTelemetry 的 Kafka Bridge 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: #... template: bridgeContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_OTLP_ENDPOINT value: "http://otlp-host:4317" tracing: type: opentelemetry #...
创建或更新资源:
oc apply -f <resource_configuration_file>
22.3.3. 初始化 Kafka 客户端的追踪
为 OpenTelemetry 初始化 tracer,然后检测您的客户端应用程序以进行分布式追踪。您可以检测 Kafka producer 和消费者客户端,以及 Kafka Streams API 应用程序。
使用一组 追踪环境变量 配置和初始化 tracer。
流程
在每个客户端应用程序中添加 tracer 的依赖项:
将 Maven 依赖项添加到客户端应用程序的
pom.xml
文件中:OpenTelemetry 的依赖项
<dependency> <groupId>io.opentelemetry.semconv</groupId> <artifactId>opentelemetry-semconv</artifactId> <version>1.21.0-alpha</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-otlp</artifactId> <version>1.34.1</version> <exclusions> <exclusion> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-sender-okhttp</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-sender-grpc-managed-channel</artifactId> <version>1.34.1</version> <scope>runtime</scope> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> <version>1.34.1</version> </dependency> <dependency> <groupId>io.opentelemetry.instrumentation</groupId> <artifactId>opentelemetry-kafka-clients-2.6</artifactId> <version>1.32.0-alpha</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk</artifactId> <version>1.34.1</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-sender-jdk</artifactId> <version>1.34.1-alpha</version> <scope>runtime</scope> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.61.0</version> </dependency>
- 使用追踪环境变量定义 tracer 的配置。
创建一个 tracer,它使用环境变量初始化:
为 OpenTelemetry 创建 tracer
OpenTelemetry ot = GlobalOpenTelemetry.get();
将 tracer 注册为全局 tracer:
GlobalTracer.register(tracer);
检测您的客户端:
22.3.4. 用于追踪的制作者和消费者的工具
检测应用程序代码,以便在 Kafka 生成者和消费者中启用追踪。使用 decorator 模式或拦截器来检测您的 Java 生成者和消费者应用程序代码以进行追踪。然后,您可以在从主题生成或检索消息时记录 trace。
OpenTelemetry 检测项目提供类来支持生成者和消费者的工具。
- decorator 检测
- 对于 decorator 检测,请为追踪创建修改后的制作者或消费者实例。
- 拦截器检测
- 对于拦截器检测,请将追踪功能添加到消费者或生成者配置中。
先决条件
您已为 客户端 初始化了追踪。
您可以通过在项目中添加追踪 JAR 作为依赖项来启用生成者和消费者应用程序的检测。
流程
在每个制作者和消费者应用的应用程序代码中执行这些步骤。使用 decorator 模式或拦截器(拦截器)检测您的客户端应用程序代码。
要使用 decorator 模式,请创建一个修改后的制作者或消费者实例来发送或接收消息。
您传递了原始
KafkaProducer
或KafkaConsumer
类。OpenTelemetry 的 decorator 检测示例
// Producer instance Producer < String, String > op = new KafkaProducer < > ( configs, new StringSerializer(), new StringSerializer() ); Producer < String, String > producer = tracing.wrap(op); KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get()); producer.send(...); //consumer instance Consumer<String, String> oc = new KafkaConsumer<>( configs, new StringDeserializer(), new StringDeserializer() ); Consumer<String, String> consumer = tracing.wrap(oc); consumer.subscribe(Collections.singleton("mytopic")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
要使用拦截器,请在生成者或消费者配置中设置拦截器类。
您以通常的方式使用
KafkaProducer
和KafkaConsumer
类。TracingProducerInterceptor
和TracingConsumerInterceptor
interceptor 类负责追踪功能。使用拦截器的制作者配置示例
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send(...);
使用拦截器的消费者配置示例
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList("messages")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
22.3.5. 为追踪检测 Kafka Streams 应用程序
检测应用程序代码,以便在 Kafka Streams API 应用程序中启用追踪。使用 decorator 模式或拦截器来检测您的 Kafka Streams API 应用程序以进行追踪。然后,您可以在从主题生成或检索消息时记录 trace。
- decorator 检测
-
对于 decorator 检测,请为追踪创建一个修改后的 Kafka Streams 实例。对于 OpenTelemetry,您需要创建一个自定义
TracingKafkaClientSupplier
类,以提供 Kafka Streams 的追踪工具。 - 拦截器检测
- 对于拦截器检测,在 Kafka Streams producer 和消费者配置中添加追踪功能。
先决条件
您已为 客户端 初始化了追踪。
您可以通过在项目中添加追踪 JAR 作为依赖项来启用 Kafka Streams 应用程序中的检测。
-
要使用 OpenTelemetry 检测 Kafka Streams,您需要编写自定义
TracingKafkaClientSupplier
。 自定义
TracingKafkaClientSupplier
可以扩展 Kafka 的DefaultKafkaClientSupplier
,覆盖生成者和消费者创建方法,将实例嵌套与遥测相关的代码。自定义
TracingKafkaClientSupplier
示例private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getProducer(config)); } @Override public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getConsumer(config)); } @Override public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { return this.getConsumer(config); } @Override public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) { return this.getConsumer(config); } }
流程
为每个 Kafka Streams API 应用程序执行这些步骤。
要使用 decorator 模式,创建一个
TracingKafkaClientSupplier
供应商接口的实例,然后为KafkaStreams
提供供应商接口。decorator 检测示例
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer); KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier); streams.start();
要使用拦截器,请在 Kafka Streams producer 和消费者配置中设置拦截器类。
TracingProducerInterceptor
和TracingConsumerInterceptor
interceptor 类负责追踪功能。使用拦截器的制作者和消费者配置示例
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
22.3.6. 引入不同的 OpenTelemetry 追踪系统
您可以指定 OpenTelemetry 支持的其他追踪系统,而不是默认的 OTLP 系统。您可以通过在 Apache Kafka 的 Streams 提供的 Kafka 镜像中添加所需的工件。还必须设置任何所需的特定环境变量。然后,您可以使用 OTEL_TRACES_EXPORTER
环境变量启用新的追踪实施。
此流程演示了如何实施 Zipkin tracing。
流程
将追踪工件添加到 Apache Kafka 镜像的 Streams 的
/opt/kafka/libs/
目录中。您可以使用 红帽生态系统目录 上的 Kafka 容器镜像作为基础镜像来创建新的自定义镜像。
Zipkin 的 OpenTelemetry 工件
io.opentelemetry:opentelemetry-exporter-zipkin
为新的追踪实现设置追踪导出器和端点。
Zikpin tracer 配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mm2-cluster spec: #... template: connectContainer: env: - name: OTEL_SERVICE_NAME value: my-zipkin-service - name: OTEL_EXPORTER_ZIPKIN_ENDPOINT value: http://zipkin-exporter-host-name:9411/api/v2/spans 1 - name: OTEL_TRACES_EXPORTER value: zipkin 2 tracing: type: opentelemetry #...
22.3.7. 为 OpenTelemetry 指定自定义 span 名称
追踪 span 是 Jaeger 中的逻辑工作单元,包括操作名称、开始时间和持续时间。span 具有内置名称,但您可以在使用的 Kafka 客户端检测中指定自定义范围名称。
指定自定义范围名称是可选的,只有在生成者和消费者客户端检测或 Kafka Streams 检测中使用 decorator 模式时才适用。
无法通过 OpenTelemetry 直接指定自定义 span 名称。相反,您可以通过向客户端应用程序添加代码来提取额外的标签和属性来检索范围名称。
提取属性的代码示例
//Defines attribute extraction for a producer private static class ProducerAttribExtractor implements AttributesExtractor < ProducerRecord < ? , ? > , Void > { @Override public void onStart(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord) { set(attributes, AttributeKey.stringKey("prod_start"), "prod1"); } @Override public void onEnd(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) { set(attributes, AttributeKey.stringKey("prod_end"), "prod2"); } } //Defines attribute extraction for a consumer private static class ConsumerAttribExtractor implements AttributesExtractor < ConsumerRecord < ? , ? > , Void > { @Override public void onStart(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord) { set(attributes, AttributeKey.stringKey("con_start"), "con1"); } @Override public void onEnd(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) { set(attributes, AttributeKey.stringKey("con_end"), "con2"); } } //Extracts the attributes public static void main(String[] args) throws Exception { Map < String, Object > configs = new HashMap < > (Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); System.setProperty("otel.traces.exporter", "jaeger"); System.setProperty("otel.service.name", "myapp1"); KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get()) .addProducerAttributesExtractors(new ProducerAttribExtractor()) .addConsumerAttributesExtractors(new ConsumerAttribExtractor()) .build();