搜索

22.3. 设置分布式追踪

download PDF

通过在自定义资源中指定追踪类型,在 Kafka 组件中启用分布式追踪。Kafka 客户端中的检测追踪程序用于对消息的端到端跟踪。

要设置分布式追踪,请遵循以下步骤:

22.3.1. 先决条件

在设置分布式追踪前,请确保 Jaeger 后端组件部署到 OpenShift 集群中。我们建议使用 Jaeger operator 在 OpenShift 集群上部署 Jaeger。

有关部署说明,请参阅 Jaeger 文档

注意

对于除 Apache Kafka 的流外的应用程序和系统设置追踪不在此内容范围内。

22.3.2. 在 MirrorMaker、Kafka Connect 和 Kafka Bridge 资源中启用追踪

MirrorMaker、MirrorMaker 2、Kafka Connect 和 Apache Kafka Bridge 的 Streams 支持分布式追踪。配置组件的自定义资源,以指定和启用 tracer 服务。

在资源中启用追踪会触发以下事件:

  • 拦截器类在组件的集成使用者和制作者中更新。
  • 对于 MirrorMaker、MirrorMaker 2 和 Kafka Connect,追踪代理会根据资源中定义的追踪配置初始化 tracer。
  • 对于 Kafka Bridge,基于资源中定义的追踪配置由 Kafka Bridge 本身初始化的 tracer。

您可以启用使用 OpenTelemetry 的追踪。

MirrorMaker 和 MirrorMaker 2 中的追踪

对于 MirrorMaker 和 MirrorMaker 2,信息从源集群追踪到目标集群。跟踪数据记录进入和离开 MirrorMaker 或 MirrorMaker 2 组件的消息。

Kafka Connect 中的追踪

对于 Kafka Connect,只有 Kafka Connect 生成和消耗的消息才会被 traced。要跟踪 Kafka Connect 和外部系统之间发送的消息,您必须在连接器中为这些系统配置追踪。

Kafka Bridge 中的追踪

对于 Kafka Bridge,Kafka Bridge 生成和消耗的消息会被跟踪。也跟踪来自客户端应用程序的 HTTP 请求,以通过 Kafka Bridge 发送和接收信息。要进行端到端追踪,您必须在 HTTP 客户端中配置追踪。

流程

为每个 KafkaMirrorMaker, KafkaMirrorMaker2, KafkaConnect, 和 KafkaBridge 资源执行这些步骤。

  1. spec.template 属性中,配置 tracer 服务。

    • 使用追踪环境变量作为模板配置属性。
    • 对于 OpenTelemetry,将 spec.tracing.type 属性设置为 opentelemetry

    使用 OpenTelemetry 的 Kafka Connect 的追踪配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
      #...

    使用 OpenTelemetry 的 MirrorMaker 的追踪配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker
    metadata:
      name: my-mirror-maker
    spec:
      #...
      template:
        mirrorMakerContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
    #...

    使用 OpenTelemetry 的 MirrorMaker 2 的追踪配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mm2-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
    #...

    使用 OpenTelemetry 的 Kafka Bridge 的追踪配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      #...
      template:
        bridgeContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
    #...

  2. 创建或更新资源:

    oc apply -f <resource_configuration_file>

22.3.3. 初始化 Kafka 客户端的追踪

为 OpenTelemetry 初始化 tracer,然后检测您的客户端应用程序以进行分布式追踪。您可以检测 Kafka producer 和消费者客户端,以及 Kafka Streams API 应用程序。

使用一组 追踪环境变量 配置和初始化 tracer。

流程

在每个客户端应用程序中添加 tracer 的依赖项:

  1. 将 Maven 依赖项添加到客户端应用程序的 pom.xml 文件中:

    OpenTelemetry 的依赖项

    <dependency>
        <groupId>io.opentelemetry.semconv</groupId>
        <artifactId>opentelemetry-semconv</artifactId>
        <version>1.21.0-alpha</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
        <version>1.34.1</version>
        <exclusions>
            <exclusion>
                <groupId>io.opentelemetry</groupId>
                <artifactId>opentelemetry-exporter-sender-okhttp</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-sender-grpc-managed-channel</artifactId>
        <version>1.34.1</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
        <version>1.34.1</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry.instrumentation</groupId>
        <artifactId>opentelemetry-kafka-clients-2.6</artifactId>
        <version>1.32.0-alpha</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk</artifactId>
        <version>1.34.1</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-sender-jdk</artifactId>
        <version>1.34.1-alpha</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.61.0</version>
    </dependency>

  2. 使用追踪环境变量定义 tracer 的配置。
  3. 创建一个 tracer,它使用环境变量初始化:

    为 OpenTelemetry 创建 tracer

    OpenTelemetry ot = GlobalOpenTelemetry.get();

  4. 将 tracer 注册为全局 tracer:

    GlobalTracer.register(tracer);
  5. 检测您的客户端:

22.3.4. 用于追踪的制作者和消费者的工具

检测应用程序代码,以便在 Kafka 生成者和消费者中启用追踪。使用 decorator 模式或拦截器来检测您的 Java 生成者和消费者应用程序代码以进行追踪。然后,您可以在从主题生成或检索消息时记录 trace。

OpenTelemetry 检测项目提供类来支持生成者和消费者的工具。

decorator 检测
对于 decorator 检测,请为追踪创建修改后的制作者或消费者实例。
拦截器检测
对于拦截器检测,请将追踪功能添加到消费者或生成者配置中。

先决条件

  • 您已为 客户端 初始化了追踪

    您可以通过在项目中添加追踪 JAR 作为依赖项来启用生成者和消费者应用程序的检测。

流程

在每个制作者和消费者应用的应用程序代码中执行这些步骤。使用 decorator 模式或拦截器(拦截器)检测您的客户端应用程序代码。

  • 要使用 decorator 模式,请创建一个修改后的制作者或消费者实例来发送或接收消息。

    您传递了原始 KafkaProducerKafkaConsumer 类。

    OpenTelemetry 的 decorator 检测示例

    // Producer instance
    Producer < String, String > op = new KafkaProducer < > (
        configs,
        new StringSerializer(),
        new StringSerializer()
        );
        Producer < String, String > producer = tracing.wrap(op);
    KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
    producer.send(...);
    
    //consumer instance
    Consumer<String, String> oc = new KafkaConsumer<>(
        configs,
        new StringDeserializer(),
        new StringDeserializer()
        );
        Consumer<String, String> consumer = tracing.wrap(oc);
    consumer.subscribe(Collections.singleton("mytopic"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

  • 要使用拦截器,请在生成者或消费者配置中设置拦截器类。

    您以通常的方式使用 KafkaProducerKafkaConsumer 类。TracingProducerInterceptorTracingConsumerInterceptor interceptor 类负责追踪功能。

    使用拦截器的制作者配置示例

    senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());
    
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(...);

    使用拦截器的消费者配置示例

    consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingConsumerInterceptor.class.getName());
    
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("messages"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

22.3.5. 为追踪检测 Kafka Streams 应用程序

检测应用程序代码,以便在 Kafka Streams API 应用程序中启用追踪。使用 decorator 模式或拦截器来检测您的 Kafka Streams API 应用程序以进行追踪。然后,您可以在从主题生成或检索消息时记录 trace。

decorator 检测
对于 decorator 检测,请为追踪创建一个修改后的 Kafka Streams 实例。对于 OpenTelemetry,您需要创建一个自定义 TracingKafkaClientSupplier 类,以提供 Kafka Streams 的追踪工具。
拦截器检测
对于拦截器检测,在 Kafka Streams producer 和消费者配置中添加追踪功能。

先决条件

  • 您已为 客户端 初始化了追踪

    您可以通过在项目中添加追踪 JAR 作为依赖项来启用 Kafka Streams 应用程序中的检测。

  • 要使用 OpenTelemetry 检测 Kafka Streams,您需要编写自定义 TracingKafkaClientSupplier
  • 自定义 TracingKafkaClientSupplier 可以扩展 Kafka 的 DefaultKafkaClientSupplier,覆盖生成者和消费者创建方法,将实例嵌套与遥测相关的代码。

    自定义 TracingKafkaClientSupplier 示例

    private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getProducer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getConsumer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    
        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    }

流程

为每个 Kafka Streams API 应用程序执行这些步骤。

  • 要使用 decorator 模式,创建一个 TracingKafkaClientSupplier 供应商接口的实例,然后为 KafkaStreams 提供供应商接口。

    decorator 检测示例

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();

  • 要使用拦截器,请在 Kafka Streams producer 和消费者配置中设置拦截器类。

    TracingProducerInterceptorTracingConsumerInterceptor interceptor 类负责追踪功能。

    使用拦截器的制作者和消费者配置示例

    props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());

22.3.6. 引入不同的 OpenTelemetry 追踪系统

您可以指定 OpenTelemetry 支持的其他追踪系统,而不是默认的 OTLP 系统。您可以通过在 Apache Kafka 的 Streams 提供的 Kafka 镜像中添加所需的工件。还必须设置任何所需的特定环境变量。然后,您可以使用 OTEL_TRACES_EXPORTER 环境变量启用新的追踪实施。

此流程演示了如何实施 Zipkin tracing。

流程

  1. 将追踪工件添加到 Apache Kafka 镜像的 Streams 的 /opt/kafka/libs/ 目录中。

    您可以使用 红帽生态系统目录 上的 Kafka 容器镜像作为基础镜像来创建新的自定义镜像。

    Zipkin 的 OpenTelemetry 工件

    io.opentelemetry:opentelemetry-exporter-zipkin

  2. 为新的追踪实现设置追踪导出器和端点。

    Zikpin tracer 配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mm2-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-zipkin-service
            - name: OTEL_EXPORTER_ZIPKIN_ENDPOINT
              value: http://zipkin-exporter-host-name:9411/api/v2/spans 1
            - name: OTEL_TRACES_EXPORTER
              value: zipkin 2
      tracing:
        type: opentelemetry
    #...

    1
    指定要连接到的 Zipkin 端点。
    2
    Zipkin exporter。

22.3.7. 为 OpenTelemetry 指定自定义 span 名称

追踪 span 是 Jaeger 中的逻辑工作单元,包括操作名称、开始时间和持续时间。span 具有内置名称,但您可以在使用的 Kafka 客户端检测中指定自定义范围名称。

指定自定义范围名称是可选的,只有在生成者和消费者客户端检测Kafka Streams 检测中使用 decorator 模式时才适用。

无法通过 OpenTelemetry 直接指定自定义 span 名称。相反,您可以通过向客户端应用程序添加代码来提取额外的标签和属性来检索范围名称。

提取属性的代码示例

//Defines attribute extraction for a producer
private static class ProducerAttribExtractor implements AttributesExtractor < ProducerRecord < ? , ? > , Void > {
    @Override
    public void onStart(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord) {
        set(attributes, AttributeKey.stringKey("prod_start"), "prod1");
    }
    @Override
    public void onEnd(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) {
        set(attributes, AttributeKey.stringKey("prod_end"), "prod2");
    }
}
//Defines attribute extraction for a consumer
private static class ConsumerAttribExtractor implements AttributesExtractor < ConsumerRecord < ? , ? > , Void > {
    @Override
    public void onStart(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord) {
        set(attributes, AttributeKey.stringKey("con_start"), "con1");
    }
    @Override
    public void onEnd(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) {
        set(attributes, AttributeKey.stringKey("con_end"), "con2");
    }
}
//Extracts the attributes
public static void main(String[] args) throws Exception {
        Map < String, Object > configs = new HashMap < > (Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
        System.setProperty("otel.traces.exporter", "jaeger");
        System.setProperty("otel.service.name", "myapp1");
        KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get())
            .addProducerAttributesExtractors(new ProducerAttribExtractor())
            .addConsumerAttributesExtractors(new ConsumerAttribExtractor())
            .build();

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.