Chapter 16. Setting up distributed tracing
Distributed tracing allows you to track the progress of transactions between applications in a distributed system. In a microservices architecture, tracing tracks the progress of transactions between services. Trace data is useful for monitoring application performance and investigating issues with target systems and end-user applications.
In Streams for Apache Kafka, tracing facilitates the end-to-end tracking of messages: from source systems to Kafka, and then from Kafka to target systems and applications. It complements the metrics that are available to view in JMX metrics, as well as the component loggers.
Support for tracing is built in to the following Kafka components:
- Kafka Connect
- MirrorMaker
- MirrorMaker 2
- Streams for Apache Kafka Bridge
Tracing is not supported for Kafka brokers.
You add tracing configuration to the properties file of the component.
To enable tracing, you set environment variables and add the library of the tracing system to the Kafka classpath. For Jaeger tracing, you can add tracing artifacts for OpenTelemetry with the Jaeger Exporter.
Streams for Apache Kafka no longer supports OpenTracing. If you were previously using OpenTracing with Jaeger, we encourage you to transition to using OpenTelemetry instead.
To enable tracing in Kafka producers, consumers, and Kafka Streams API applications, you instrument application code. When instrumented, clients generate trace data; for example, when producing messages or writing offsets to the log.
Setting up tracing for applications and systems beyond Streams for Apache Kafka is outside the scope of this content.
16.1. Outline of procedures
To set up tracing for Streams for Apache Kafka, follow these procedures in order:
Set up tracing for Kafka Connect, MirrorMaker 2, and MirrorMaker:
Set up tracing for clients:
Instrument clients with tracers:
For information on enabling tracing for the Kafka Bridge, see Using the Streams for Apache Kafka Bridge.
16.2. Tracing options
Use OpenTelemetry with the Jaeger tracing system.
OpenTelemetry provides an API specification that is independent from the tracing or monitoring system.
You use the APIs to instrument application code for tracing.
- Instrumented applications generate traces for individual requests across the distributed system.
- Traces are composed of spans that define specific units of work over time.
Jaeger is a tracing system for microservices-based distributed systems.
- The Jaeger user interface allows you to query, filter, and analyze trace data.
The Jaeger user interface showing a simple query
Additional resources
16.3. Environment variables for tracing
Use environment variables when you are enabling tracing for Kafka components or initializing a tracer for Kafka clients.
Tracing environment variables are subject to change. For the latest information, see the OpenTelemetry documentation.
The following tables describe the key environment variables for setting up a tracer.
Property | Required | Description |
---|---|---|
| Yes | The name of the Jaeger tracing service for OpenTelemetry. |
| Yes | The exporter used for tracing. |
| Yes |
The exporter used for tracing. Set to |
16.4. Enabling tracing for Kafka Connect
Enable distributed tracing for Kafka Connect using configuration properties. Only messages produced and consumed by Kafka Connect itself are traced. To trace messages sent between Kafka Connect and external systems, you must configure tracing in the connectors for those systems.
You can enable tracing that uses OpenTelemetry.
Procedure
-
Add the tracing artifacts to the
opt/kafka/libs
directory. Configure producer and consumer tracing in the relevant Kafka Connect configuration file.
-
If you are running Kafka Connect in standalone mode, edit the
/opt/kafka/config/connect-standalone.properties
file. -
If you are running Kafka Connect in distributed mode, edit the
/opt/kafka/config/connect-distributed.properties
file.
Add the following tracing interceptor properties to the configuration file:
Properties for OpenTelemetry
producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor
With tracing enabled, you initialize tracing when you run the Kafka Connect script.
-
If you are running Kafka Connect in standalone mode, edit the
- Save the configuration file.
- Set the environment variables for tracing.
Start Kafka Connect in standalone or distributed mode with the configuration file as a parameter (plus any connector properties):
Running Kafka Connect in standalone mode
su - kafka /opt/kafka/bin/connect-standalone.sh \ /opt/kafka/config/connect-standalone.properties \ connector1.properties \ [connector2.properties ...]
Running Kafka Connect in distributed mode
su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
The internal consumers and producers of Kafka Connect are now enabled for tracing.
16.5. Enabling tracing for MirrorMaker 2
Enable distributed tracing for MirrorMaker 2 by defining the Interceptor properties in the MirrorMaker 2 properties file. Messages are traced between Kafka clusters. The trace data records messages entering and leaving the MirrorMaker 2 component.
You can enable tracing that uses OpenTelemetry.
Procedure
-
Add the tracing artifacts to the
opt/kafka/libs
directory. Configure producer and consumer tracing in the
opt/kafka/config/connect-mirror-maker.properties
file.Add the following tracing interceptor properties to the configuration file:
Properties for OpenTelemetry
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor
ByteArrayConverter
prevents Kafka Connect from converting message headers (containing trace IDs) to base64 encoding. This ensures that messages are the same in both the source and the target clusters.With tracing enabled, you initialize tracing when you run the Kafka MirrorMaker 2 script.
- Save the configuration file.
- Set the environment variables for tracing.
Start MirrorMaker 2 with the producer and consumer configuration files as parameters:
su - kafka /opt/kafka/bin/connect-mirror-maker.sh \ /opt/kafka/config/connect-mirror-maker.properties
The internal consumers and producers of MirrorMaker 2 are now enabled for tracing.
16.6. Enabling tracing for MirrorMaker
Enable distributed tracing for MirrorMaker by passing the Interceptor properties as consumer and producer configuration parameters. Messages are traced from the source cluster to the target cluster. The trace data records messages entering and leaving the MirrorMaker component.
You can enable tracing that uses OpenTelemetry.
Procedure
-
Add the tracing artifacts to the
opt/kafka/libs
directory. Configure producer tracing in the
/opt/kafka/config/producer.properties
file.Add the following tracing interceptor property:
Producer property for OpenTelemetry
producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor
- Save the configuration file.
Configure consumer tracing in the
/opt/kafka/config/consumer.properties
file.Add the following tracing interceptor property:
Consumer property for OpenTelemetry
consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor
With tracing enabled, you initialize tracing when you run the Kafka MirrorMaker script.
- Save the configuration file.
- Set the environment variables for tracing.
Start MirrorMaker with the producer and consumer configuration files as parameters:
su - kafka /opt/kafka/bin/kafka-mirror-maker.sh \ --producer.config /opt/kafka/config/producer.properties \ --consumer.config /opt/kafka/config/consumer.properties \ --num.streams=2
The internal consumers and producers of MirrorMaker are now enabled for tracing.
16.7. Initializing tracing for Kafka clients
Initialize a tracer for OpenTelemetry, then instrument your client applications for distributed tracing. You can instrument Kafka producer and consumer clients, and Kafka Streams API applications.
Configure and initialize a tracer using a set of tracing environment variables.
Procedure
In each client application add the dependencies for the tracer:
Add the Maven dependencies to the
pom.xml
file for the client application:Dependencies for OpenTelemetry
<dependency> <groupId>io.opentelemetry.semconv</groupId> <artifactId>opentelemetry-semconv</artifactId> <version>1.21.0-alpha</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-otlp</artifactId> <version>1.34.1</version> <exclusions> <exclusion> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-sender-okhttp</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-sender-grpc-managed-channel</artifactId> <version>1.34.1</version> <scope>runtime</scope> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> <version>1.34.1</version> </dependency> <dependency> <groupId>io.opentelemetry.instrumentation</groupId> <artifactId>opentelemetry-kafka-clients-2.6</artifactId> <version>1.32.0-alpha</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk</artifactId> <version>1.34.1</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-sender-jdk</artifactId> <version>1.34.1-alpha</version> <scope>runtime</scope> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.61.0</version> </dependency>
- Define the configuration of the tracer using the tracing environment variables.
Create a tracer, which is initialized with the environment variables:
Creating a tracer for OpenTelemetry
OpenTelemetry ot = GlobalOpenTelemetry.get();
Register the tracer as a global tracer:
GlobalTracer.register(tracer);
Instrument your client:
16.8. Instrumenting producers and consumers for tracing
Instrument application code to enable tracing in Kafka producers and consumers. Use a decorator pattern or interceptors to instrument your Java producer and consumer application code for tracing. You can then record traces when messages are produced or retrieved from a topic.
OpenTelemetry instrumentation project provides classes that support instrumentation of producers and consumers.
- Decorator instrumentation
- For decorator instrumentation, create a modified producer or consumer instance for tracing.
- Interceptor instrumentation
- For interceptor instrumentation, add the tracing capability to the consumer or producer configuration.
Prerequisites
You have initialized tracing for the client.
You enable instrumentation in producer and consumer applications by adding the tracing JARs as dependencies to your project.
Procedure
Perform these steps in the application code of each producer and consumer application. Instrument your client application code using either a decorator pattern or interceptors.
To use a decorator pattern, create a modified producer or consumer instance to send or receive messages.
You pass the original
KafkaProducer
orKafkaConsumer
class.Example decorator instrumentation for OpenTelemetry
// Producer instance Producer < String, String > op = new KafkaProducer < > ( configs, new StringSerializer(), new StringSerializer() ); Producer < String, String > producer = tracing.wrap(op); KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get()); producer.send(...); //consumer instance Consumer<String, String> oc = new KafkaConsumer<>( configs, new StringDeserializer(), new StringDeserializer() ); Consumer<String, String> consumer = tracing.wrap(oc); consumer.subscribe(Collections.singleton("mytopic")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
To use interceptors, set the interceptor class in the producer or consumer configuration.
You use the
KafkaProducer
andKafkaConsumer
classes in the usual way. TheTracingProducerInterceptor
andTracingConsumerInterceptor
interceptor classes take care of the tracing capability.Example producer configuration using interceptors
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send(...);
Example consumer configuration using interceptors
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList("messages")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
16.9. Instrumenting Kafka Streams applications for tracing
Instrument application code to enable tracing in Kafka Streams API applications. Use a decorator pattern or interceptors to instrument your Kafka Streams API applications for tracing. You can then record traces when messages are produced or retrieved from a topic.
- Decorator instrumentation
-
For decorator instrumentation, create a modified Kafka Streams instance for tracing. For OpenTelemetry, you need to create a custom
TracingKafkaClientSupplier
class to provide tracing instrumentation for Kafka Streams. - Interceptor instrumentation
- For interceptor instrumentation, add the tracing capability to the Kafka Streams producer and consumer configuration.
Prerequisites
You have initialized tracing for the client.
You enable instrumentation in Kafka Streams applications by adding the tracing JARs as dependencies to your project.
-
To instrument Kafka Streams with OpenTelemetry, you’ll need to write a custom
TracingKafkaClientSupplier
. The custom
TracingKafkaClientSupplier
can extend Kafka’sDefaultKafkaClientSupplier
, overriding the producer and consumer creation methods to wrap the instances with the telemetry-related code.Example custom
TracingKafkaClientSupplier
private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getProducer(config)); } @Override public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getConsumer(config)); } @Override public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { return this.getConsumer(config); } @Override public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) { return this.getConsumer(config); } }
Procedure
Perform these steps for each Kafka Streams API application.
To use a decorator pattern, create an instance of the
TracingKafkaClientSupplier
supplier interface, then provide the supplier interface toKafkaStreams
.Example decorator instrumentation
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer); KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier); streams.start();
To use interceptors, set the interceptor class in the Kafka Streams producer and consumer configuration.
The
TracingProducerInterceptor
andTracingConsumerInterceptor
interceptor classes take care of the tracing capability.Example producer and consumer configuration using interceptors
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
16.10. Specifying tracing systems with OpenTelemetry
Instead of the default Jaeger system, you can specify other tracing systems that are supported by OpenTelemetry.
If you want to use another tracing system with OpenTelemetry, do the following:
- Add the library of the tracing system to the Kafka classpath.
Add the name of the tracing system as an additional exporter environment variable.
Additional environment variable when not using Jaeger
OTEL_SERVICE_NAME=my-tracing-service OTEL_TRACES_EXPORTER=zipkin 1 OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans 2
Additional resources
16.11. Specifying custom span names for OpenTelemetry
A tracing span is a logical unit of work in Jaeger, with an operation name, start time, and duration. Spans have built-in names, but you can specify custom span names in your Kafka client instrumentation where used.
Specifying custom span names is optional and only applies when using a decorator pattern in producer and consumer client instrumentation or Kafka Streams instrumentation.
Custom span names cannot be specified directly with OpenTelemetry. Instead, you retrieve span names by adding code to your client application to extract additional tags and attributes.
Example code to extract attributes
//Defines attribute extraction for a producer private static class ProducerAttribExtractor implements AttributesExtractor < ProducerRecord < ? , ? > , Void > { @Override public void onStart(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord) { set(attributes, AttributeKey.stringKey("prod_start"), "prod1"); } @Override public void onEnd(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) { set(attributes, AttributeKey.stringKey("prod_end"), "prod2"); } } //Defines attribute extraction for a consumer private static class ConsumerAttribExtractor implements AttributesExtractor < ConsumerRecord < ? , ? > , Void > { @Override public void onStart(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord) { set(attributes, AttributeKey.stringKey("con_start"), "con1"); } @Override public void onEnd(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) { set(attributes, AttributeKey.stringKey("con_end"), "con2"); } } //Extracts the attributes public static void main(String[] args) throws Exception { Map < String, Object > configs = new HashMap < > (Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); System.setProperty("otel.traces.exporter", "jaeger"); System.setProperty("otel.service.name", "myapp1"); KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get()) .addProducerAttributesExtractors(new ProducerAttribExtractor()) .addConsumerAttributesExtractors(new ConsumerAttribExtractor()) .build();