
Chapter 19. Setting up distributed tracing


Distributed tracing allows you to track the progress of transactions between applications in a distributed system. In a microservices architecture, tracing tracks the progress of transactions between services. Trace data is useful for monitoring application performance and investigating issues with target systems and end-user applications.

In Streams for Apache Kafka, tracing facilitates the end-to-end tracking of messages: from source systems to Kafka, and then from Kafka to target systems and applications. It complements the metrics that are available through JMX, as well as the component loggers.

Support for tracing is built in to the following Kafka components:

  • Kafka Connect
  • MirrorMaker
  • MirrorMaker 2
  • Streams for Apache Kafka Bridge

Tracing is not supported for Kafka brokers.

You add tracing configuration to the properties file of the component.

To enable tracing, you set environment variables and add the library of the tracing system to the Kafka classpath. For Jaeger tracing, you can add tracing artifacts for OpenTelemetry with the Jaeger Exporter.

Note

Streams for Apache Kafka no longer supports OpenTracing. If you were previously using OpenTracing with Jaeger, we encourage you to transition to using OpenTelemetry instead.

To enable tracing in Kafka producers, consumers, and Kafka Streams API applications, you instrument application code. When instrumented, clients generate trace data; for example, when producing messages or writing offsets to the log.

Note

Setting up tracing for applications and systems beyond Streams for Apache Kafka is outside the scope of this content.

19.1. Outline of procedures

To set up tracing for Streams for Apache Kafka, follow these procedures in order:

  1. Enable tracing for Kafka Connect
  2. Enable tracing for MirrorMaker 2
  3. Enable tracing for MirrorMaker
  4. Initialize tracing for Kafka clients
  5. Instrument clients with tracers:

    • Instrument producers and consumers for tracing
    • Instrument Kafka Streams applications for tracing

Note

For information on enabling tracing for the Kafka Bridge, see Using the Streams for Apache Kafka Bridge.

19.2. Tracing options

Distributed traces consist of spans, which represent individual units of work performed over a specific time period. When instrumented with tracers, applications generate traces that follow requests as they move through the system, making it easier to identify delays or issues.

OpenTelemetry, a telemetry framework, provides APIs for tracing that are independent of any specific backend tracing system. In Streams for Apache Kafka, the default protocol for transmitting traces between Kafka components and tracing systems is OpenTelemetry’s OTLP (OpenTelemetry Protocol), a vendor-neutral protocol.

While OTLP is the default, Streams for Apache Kafka also supports other tracing systems, such as Jaeger. Jaeger is a distributed tracing system designed for monitoring microservices, and its user interface allows you to query, filter, and analyze trace data in detail.

The Jaeger user interface showing a simple query

19.3. Environment variables for tracing

Use environment variables to enable tracing for Kafka components or to initialize a tracer for Kafka clients.

Tracing environment variables are subject to change. For the latest information, see the OpenTelemetry documentation.

The following table describes the key environment variables for setting up tracing with OpenTelemetry.

Table 19.1. OpenTelemetry environment variables

OTEL_SERVICE_NAME
Required: Yes
The name of the tracing service for OpenTelemetry, such as OTLP or Jaeger.

OTEL_EXPORTER_OTLP_ENDPOINT
Required: Yes (if using the OTLP exporter)
The OTLP endpoint for exporting trace data to the tracing system. For Jaeger tracing, specify OTEL_EXPORTER_JAEGER_ENDPOINT. For other tracing systems, specify the appropriate endpoint.

OTEL_TRACES_EXPORTER
Required: No (unless using a non-OTLP exporter)
The exporter used for tracing. The default is otlp, which does not need to be specified. For Jaeger tracing, set this variable to jaeger. For other tracing systems, specify the appropriate exporter.

OTEL_EXPORTER_OTLP_CERTIFICATE
Required: No (required if using TLS with OTLP)
The path to the file containing trusted certificates for TLS authentication. Required to secure communication between Kafka components and the OpenTelemetry endpoint when using TLS with the otlp exporter.
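
For example, a minimal configuration for the default OTLP exporter might look like the following. The service name and collector endpoint are illustrative assumptions (port 4317 is the conventional OTLP gRPC port):

Example environment variables for the OTLP exporter

OTEL_SERVICE_NAME=my-kafka-service
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317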

19.4. Enabling tracing for Kafka Connect

Enable distributed tracing for Kafka Connect using configuration properties. Only messages produced and consumed by Kafka Connect itself are traced. To trace messages sent between Kafka Connect and external systems, you must configure tracing in the connectors for those systems.

You can enable tracing that uses OpenTelemetry.

Procedure

  1. Add the tracing artifacts to the opt/kafka/libs directory.
  2. Configure producer and consumer tracing in the relevant Kafka Connect configuration file.

    • If you are running Kafka Connect in standalone mode, edit the ./config/connect-standalone.properties file.
    • If you are running Kafka Connect in distributed mode, edit the ./config/connect-distributed.properties file.

    Add the following tracing interceptor properties to the configuration file:

    Properties for OpenTelemetry

    producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor
    consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor

    With tracing enabled, you initialize tracing when you run the Kafka Connect script.

  3. Save the configuration file.
  4. Set the environment variables for tracing, as shown in the example at the end of this procedure.
  5. Start Kafka Connect in standalone or distributed mode with the configuration file as a parameter (plus any connector properties):

    Running Kafka Connect in standalone mode

    ./bin/connect-standalone.sh \
    ./config/connect-standalone.properties \
    connector1.properties \
    [connector2.properties ...]

    Running Kafka Connect in distributed mode

    ./bin/connect-distributed.sh ./config/connect-distributed.properties

    The internal consumers and producers of Kafka Connect are now enabled for tracing.
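
    For example, a minimal standalone startup with tracing might look like the following. The service name and OTLP endpoint values are illustrative assumptions:

    Example startup with tracing environment variables

    export OTEL_SERVICE_NAME=kafka-connect
    export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
    ./bin/connect-standalone.sh ./config/connect-standalone.properties connector1.properties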

19.5. Enabling tracing for MirrorMaker 2

Enable distributed tracing for MirrorMaker 2 by defining the Interceptor properties in the MirrorMaker 2 properties file. Messages are traced between Kafka clusters. The trace data records messages entering and leaving the MirrorMaker 2 component.

You can enable tracing that uses OpenTelemetry.

Procedure

  1. Add the tracing artifacts to the opt/kafka/libs directory.
  2. Configure producer and consumer tracing in the opt/kafka/config/connect-mirror-maker.properties file.

    Add the following tracing interceptor properties to the configuration file:

    Properties for OpenTelemetry

    header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor
    consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor

    ByteArrayConverter prevents Kafka Connect from converting message headers (containing trace IDs) to base64 encoding. This ensures that messages are the same in both the source and the target clusters.

    With tracing enabled, you initialize tracing when you run the Kafka MirrorMaker 2 script.

  3. Save the configuration file.
  4. Set the environment variables for tracing, as shown in the example at the end of this procedure.
  5. Start MirrorMaker 2 with the producer and consumer configuration files as parameters:

    ./bin/connect-mirror-maker.sh \
    ./config/connect-mirror-maker.properties

    The internal consumers and producers of MirrorMaker 2 are now enabled for tracing.
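
    For example, you might export the tracing environment variables in the same shell before starting MirrorMaker 2. This sketch uses the Jaeger exporter; the service name and endpoint are illustrative assumptions (port 14250 is the conventional Jaeger collector gRPC port):

    Example startup with tracing environment variables

    export OTEL_SERVICE_NAME=mirror-maker-2
    export OTEL_TRACES_EXPORTER=jaeger
    export OTEL_EXPORTER_JAEGER_ENDPOINT=http://localhost:14250
    ./bin/connect-mirror-maker.sh ./config/connect-mirror-maker.properties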

19.6. Enabling tracing for MirrorMaker

Enable distributed tracing for MirrorMaker by passing the Interceptor properties as consumer and producer configuration parameters. Messages are traced from the source cluster to the target cluster. The trace data records messages entering and leaving the MirrorMaker component.

You can enable tracing that uses OpenTelemetry.

Procedure

  1. Add the tracing artifacts to the opt/kafka/libs directory.
  2. Configure producer tracing in the ./config/producer.properties file.

    Add the following tracing interceptor property:

    Producer property for OpenTelemetry

    producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor

  3. Save the configuration file.
  4. Configure consumer tracing in the ./config/consumer.properties file.

    Add the following tracing interceptor property:

    Consumer property for OpenTelemetry

    consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor

    With tracing enabled, you initialize tracing when you run the Kafka MirrorMaker script.

  5. Save the configuration file.
  6. Set the environment variables for tracing.
  7. Start MirrorMaker with the producer and consumer configuration files as parameters:

    ./bin/kafka-mirror-maker.sh \
    --producer.config ./config/producer.properties \
    --consumer.config ./config/consumer.properties \
    --num.streams=2

    The internal consumers and producers of MirrorMaker are now enabled for tracing.

19.7. Initializing tracing for Kafka clients

Initialize a tracer for OpenTelemetry, then instrument your client applications for distributed tracing. You can instrument Kafka producer and consumer clients, and Kafka Streams API applications.

Configure and initialize a tracer using a set of tracing environment variables.

Procedure

In each client application add the dependencies for the tracer:

  1. Add the Maven dependencies to the pom.xml file for the client application:

    Dependencies for OpenTelemetry

    <dependency>
        <groupId>io.opentelemetry.semconv</groupId>
        <artifactId>opentelemetry-semconv</artifactId>
        <version>1.21.0.alpha-redhat-00001</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
        <version>1.34.1.redhat-00001</version>
        <exclusions>
            <exclusion>
                <groupId>io.opentelemetry</groupId>
                <artifactId>opentelemetry-exporter-sender-okhttp</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-sender-grpc-managed-channel</artifactId>
        <version>1.34.1.redhat-00001</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
        <version>1.34.1.redhat-00001</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry.instrumentation</groupId>
        <artifactId>opentelemetry-kafka-clients-2.6</artifactId>
        <version>1.32.0.redhat-00001</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk</artifactId>
        <version>1.34.1.redhat-00001</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-sender-jdk</artifactId>
        <version>1.34.1.alpha-redhat-00001</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.61.0.redhat-00004</version>
    </dependency>

  2. Define the configuration of the tracer using the tracing environment variables.
  3. Create a tracer, which is initialized with the environment variables:

    Creating a tracer for OpenTelemetry

    OpenTelemetry ot = GlobalOpenTelemetry.get();

  4. Instrument your client, as described in one of the following sections:

    • Instrumenting producers and consumers for tracing
    • Instrumenting Kafka Streams applications for tracing

    Because GlobalOpenTelemetry.get() returns the globally registered instance, you do not need to register the tracer separately. (The GlobalTracer.register(tracer) call belongs to the OpenTracing API, which is no longer supported.) A minimal initialization sketch follows this procedure.
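
The following sketch uses the SDK autoconfigure extension from the dependency list to build the tracer from the tracing environment variables. The application and instrumentation names are illustrative assumptions, and depending on the SDK version you might need to set otel.java.global-autoconfigure.enabled=true so that GlobalOpenTelemetry.get() triggers autoconfiguration from the environment variables.

Example tracer initialization for OpenTelemetry

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;

public class TracingInitExample {
    public static void main(String[] args) {
        // OTEL_SERVICE_NAME, OTEL_EXPORTER_OTLP_ENDPOINT, and related variables are read
        // from the environment when the global instance is first requested.
        OpenTelemetry otel = GlobalOpenTelemetry.get();

        // Obtain a Tracer for application-level spans; the instrumentation name is an assumption.
        Tracer tracer = otel.getTracer("my-kafka-client");

        // Pass the OpenTelemetry instance (or the tracer) to the Kafka client
        // instrumentation shown in the following sections.
        System.out.println("Initialized tracer: " + tracer);
    }
}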

19.8. Instrumenting producers and consumers for tracing

Instrument application code to enable tracing in Kafka producers and consumers. Use a decorator pattern or interceptors to instrument your Java producer and consumer application code for tracing. You can then record traces when messages are produced or retrieved from a topic.

The OpenTelemetry instrumentation project provides classes that support instrumentation of producers and consumers.

Decorator instrumentation
For decorator instrumentation, create a modified producer or consumer instance for tracing.
Interceptor instrumentation
For interceptor instrumentation, add the tracing capability to the consumer or producer configuration.

Prerequisites

  • You have initialized tracing for the client.

    You enable instrumentation in producer and consumer applications by adding the tracing JARs as dependencies to your project.

Procedure

Perform these steps in the application code of each producer and consumer application. Instrument your client application code using either a decorator pattern or interceptors.

  • To use a decorator pattern, create a modified producer or consumer instance to send or receive messages.

    You pass the original KafkaProducer or KafkaConsumer instance to the tracing wrapper.

    Example decorator instrumentation for OpenTelemetry

    // Producer instance
    Producer<String, String> op = new KafkaProducer<>(
        configs,
        new StringSerializer(),
        new StringSerializer()
    );
    KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
    Producer<String, String> producer = tracing.wrap(op);
    producer.send(...);

    // Consumer instance
    Consumer<String, String> oc = new KafkaConsumer<>(
        configs,
        new StringDeserializer(),
        new StringDeserializer()
    );
    Consumer<String, String> consumer = tracing.wrap(oc);
    consumer.subscribe(Collections.singleton("mytopic"));
    ConsumerRecords<String, String> records = consumer.poll(1000);
    ConsumerRecord<String, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

  • To use interceptors, set the interceptor class in the producer or consumer configuration.

    You use the KafkaProducer and KafkaConsumer classes in the usual way. The TracingProducerInterceptor and TracingConsumerInterceptor interceptor classes take care of the tracing capability.

    Example producer configuration using interceptors

    senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());
    
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(...);

    Example consumer configuration using interceptors

    consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingConsumerInterceptor.class.getName());
    
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("messages"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

19.9. Instrumenting Kafka Streams applications for tracing

Instrument application code to enable tracing in Kafka Streams API applications. Use a decorator pattern or interceptors to instrument your Kafka Streams API applications for tracing. You can then record traces when messages are produced or retrieved from a topic.

Decorator instrumentation
For decorator instrumentation, create a modified Kafka Streams instance for tracing. For OpenTelemetry, you need to create a custom TracingKafkaClientSupplier class to provide tracing instrumentation for Kafka Streams.
Interceptor instrumentation
For interceptor instrumentation, add the tracing capability to the Kafka Streams producer and consumer configuration.

Prerequisites

  • You have initialized tracing for the client.

    You enable instrumentation in Kafka Streams applications by adding the tracing JARs as dependencies to your project.

  • To instrument Kafka Streams with OpenTelemetry, you’ll need to write a custom TracingKafkaClientSupplier.
  • The custom TracingKafkaClientSupplier can extend Kafka’s DefaultKafkaClientSupplier, overriding the producer and consumer creation methods to wrap the instances with the telemetry-related code.

    Example custom TracingKafkaClientSupplier

    private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getProducer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getConsumer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    
        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    }

Procedure

Perform these steps for each Kafka Streams API application.

  • To use a decorator pattern, create an instance of the custom TracingKafkaClientSupplier, then provide the supplier when you create the KafkaStreams instance.

    Example decorator instrumentation

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier();
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();

  • To use interceptors, set the interceptor class in the Kafka Streams producer and consumer configuration.

    The TracingProducerInterceptor and TracingConsumerInterceptor interceptor classes take care of the tracing capability.

    Example producer and consumer configuration using interceptors

    props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());

19.10. Specifying tracing systems with OpenTelemetry

Instead of the default OTLP exporter, you can specify other tracing systems that are supported by OpenTelemetry.

If you want to use another tracing system with OpenTelemetry, do the following:

  1. Add the library of the tracing system to the Kafka classpath.
  2. Add the name of the tracing system as an additional exporter environment variable.

    Additional environment variables when not using the default OTLP exporter

    OTEL_SERVICE_NAME=my-tracing-service
    OTEL_TRACES_EXPORTER=zipkin 1
    OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans 2

    1 The name of the tracing system. In this example, Zipkin is specified.
    2 The endpoint of the specific selected exporter that listens for spans. In this example, a Zipkin endpoint is specified.

19.11. Specifying custom span names for OpenTelemetry

A tracing span is a logical unit of work in Jaeger, with an operation name, start time, and duration. Spans have built-in names, but you can specify custom span names in your Kafka client instrumentation where used.

Specifying custom span names is optional and only applies when using a decorator pattern in producer and consumer client instrumentation or Kafka Streams instrumentation.

Custom span names cannot be specified directly with OpenTelemetry. Instead, you retrieve span names by adding code to your client application to extract additional tags and attributes.

Example code to extract attributes

//Defines attribute extraction for a producer
private static class ProducerAttribExtractor implements AttributesExtractor<ProducerRecord<?, ?>, Void> {
    @Override
    public void onStart(AttributesBuilder attributes, ProducerRecord<?, ?> producerRecord) {
        set(attributes, AttributeKey.stringKey("prod_start"), "prod1");
    }
    @Override
    public void onEnd(AttributesBuilder attributes, ProducerRecord<?, ?> producerRecord, @Nullable Void unused, @Nullable Throwable error) {
        set(attributes, AttributeKey.stringKey("prod_end"), "prod2");
    }
}
//Defines attribute extraction for a consumer
private static class ConsumerAttribExtractor implements AttributesExtractor<ConsumerRecord<?, ?>, Void> {
    @Override
    public void onStart(AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord) {
        set(attributes, AttributeKey.stringKey("con_start"), "con1");
    }
    @Override
    public void onEnd(AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord, @Nullable Void unused, @Nullable Throwable error) {
        set(attributes, AttributeKey.stringKey("con_end"), "con2");
    }
}
//Extracts the attributes
public static void main(String[] args) throws Exception {
    Map<String, Object> configs = new HashMap<>(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
    System.setProperty("otel.traces.exporter", "jaeger");
    System.setProperty("otel.service.name", "myapp1");
    KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get())
        .addProducerAttributesExtractors(new ProducerAttribExtractor())
        .addConsumerAttributesExtractors(new ConsumerAttribExtractor())
        .build();
    // ... create producer and consumer instances and wrap them with 'tracing' as shown in the decorator examples ...
}
