Chapter 15. Setting up distributed tracing
Distributed tracing allows you to track the progress of transactions between applications in a distributed system. In a microservices architecture, tracing tracks the progress of transactions between services. Trace data is useful for monitoring application performance and investigating issues with target systems and end-user applications.
In AMQ Streams, tracing facilitates the end-to-end tracking of messages: from source systems to Kafka, and then from Kafka to target systems and applications. It complements the metrics exposed through JMX, as well as the component loggers.
Support for tracing is built in to the following Kafka components:
- Kafka Connect
- MirrorMaker
- MirrorMaker 2
- AMQ Streams Kafka Bridge
Tracing is not supported for Kafka brokers.
You add tracing configuration to the properties file of the component.
To enable tracing, you set environment variables and add the library of the tracing system to the Kafka classpath. For Jaeger tracing, you can add tracing artifacts for the following systems:
- OpenTelemetry with the Jaeger Exporter
- OpenTracing with Jaeger
Support for OpenTracing is deprecated. The Jaeger clients are now retired and the OpenTracing project archived. As such, we cannot guarantee their support for future Kafka versions.
To enable tracing in Kafka producers, consumers, and Kafka Streams API applications, you instrument application code. When instrumented, clients generate trace data; for example, when producing messages or writing offsets to the log.
Setting up tracing for applications and systems beyond AMQ Streams is outside the scope of this content.
15.1. Outline of procedures
To set up tracing for AMQ Streams, follow these procedures in order:
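Set up tracing for MirrorMaker, MirrorMaker 2, and Kafka Connect:
- Enable tracing for Kafka Connect
- Enable tracing for MirrorMaker 2
- Enable tracing for MirrorMaker
Set up tracing for clients:
- Initialize tracing for Kafka clients
Instrument clients with tracers:
- Instrument producers and consumers for tracing
- Instrument Kafka Streams applications for tracing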
For information on enabling tracing for the Kafka Bridge, see Using the AMQ Streams Kafka Bridge.
15.2. Tracing options
Use OpenTelemetry or OpenTracing (deprecated) with the Jaeger tracing system.
OpenTelemetry and OpenTracing provide API specifications that are independent from the tracing or monitoring system.
You use the APIs to instrument application code for tracing.
- Instrumented applications generate traces for individual requests across the distributed system.
- Traces are composed of spans that define specific units of work over time.
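As a rough illustration of how spans make up a trace, the following sketch models a span as a plain Java class. The class and field names are hypothetical and are not part of the OpenTelemetry, OpenTracing, or Jaeger APIs; real tracers create and link spans for you.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not an OpenTelemetry, OpenTracing, or Jaeger class.
public class TraceSketch {

    // A span is one unit of work with a name, a start time, and a duration.
    public static final class Span {
        public final String traceId;       // shared by every span in the same trace
        public final String spanId;
        public final String parentSpanId;  // null for the root span
        public final String operationName;
        public final long startMillis;
        public final long durationMillis;

        public Span(String traceId, String spanId, String parentSpanId,
                    String operationName, long startMillis, long durationMillis) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentSpanId = parentSpanId;
            this.operationName = operationName;
            this.startMillis = startMillis;
            this.durationMillis = durationMillis;
        }
    }

    // Builds a two-span trace: a producer sends a message, then a consumer receives it.
    public static List<Span> exampleTrace() {
        List<Span> trace = new ArrayList<>();
        trace.add(new Span("trace-1", "span-1", null, "send to mytopic", 0, 5));
        trace.add(new Span("trace-1", "span-2", "span-1", "receive from mytopic", 7, 3));
        return trace;
    }

    public static void main(String[] args) {
        for (Span span : exampleTrace()) {
            System.out.println(span.operationName + " parent=" + span.parentSpanId);
        }
    }
}
```

The second span names the first as its parent, which is how a tracing system reconstructs the path of a single message across components.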
Jaeger is a tracing system for microservices-based distributed systems.
- Jaeger implements the tracing APIs and provides client libraries for instrumentation.
- The Jaeger user interface allows you to query, filter, and analyze trace data.
The Jaeger user interface showing a simple query
15.3. Environment variables for tracing
Use environment variables when you are enabling tracing for Kafka components or initializing a tracer for Kafka clients.
Tracing environment variables are subject to change. For the latest information, see the OpenTelemetry documentation and OpenTracing documentation.
The following tables describe the key environment variables for setting up a tracer.
Property | Required | Description |
---|---|---|
| Yes | The name of the Jaeger tracing service for OpenTelemetry. |
| Yes | The exporter used for tracing. |
| Yes | The exporter used for tracing. Set to |
Property | Required | Description |
---|---|---|
| Yes | The name of the Jaeger tracer service. |
| No | The hostname for communicating with the |
| No | The port used for communicating with the |
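A client can check that the required variables are present before it initializes a tracer. The sketch below is one way to do that. The class and method names are hypothetical, and the variable names `OTEL_SERVICE_NAME` and `OTEL_TRACES_EXPORTER` are taken from the OpenTelemetry example in the section on specifying tracing systems.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of any tracing library.
public class TracingEnvCheck {

    // Returns the names of required variables that are missing or blank in the given environment.
    public static List<String> missing(Map<String, String> env, List<String> required) {
        List<String> result = new ArrayList<>();
        for (String name : required) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("OTEL_SERVICE_NAME", "OTEL_TRACES_EXPORTER");
        List<String> absent = missing(System.getenv(), required);
        if (!absent.isEmpty()) {
            System.err.println("Missing tracing environment variables: " + absent);
        }
    }
}
```

Passing the environment in as a map keeps the check easy to test without changing the real process environment.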
15.4. Enabling tracing for Kafka Connect
Enable distributed tracing for Kafka Connect using configuration properties. Only messages produced and consumed by Kafka Connect itself are traced. To trace messages sent between Kafka Connect and external systems, you must configure tracing in the connectors for those systems.
You can enable tracing that uses OpenTelemetry or OpenTracing.
Procedure
- Add the tracing artifacts to the /opt/kafka/libs directory.
- Configure producer and consumer tracing in the relevant Kafka Connect configuration file.
  - If you are running Kafka Connect in standalone mode, edit the /opt/kafka/config/connect-standalone.properties file.
  - If you are running Kafka Connect in distributed mode, edit the /opt/kafka/config/connect-distributed.properties file.
  Add the following tracing interceptor properties to the configuration file:
  Properties for OpenTelemetry
    producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor
    consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor
  Properties for OpenTracing
    producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor
    consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
  With tracing enabled, you initialize tracing when you run the Kafka Connect script.
- Save the configuration file.
- Set the environment variables for tracing.
- Start Kafka Connect in standalone or distributed mode with the configuration file as a parameter (plus any connector properties):
  Running Kafka Connect in standalone mode
    su - kafka
    /opt/kafka/bin/connect-standalone.sh \
    /opt/kafka/config/connect-standalone.properties \
    connector1.properties \
    [connector2.properties ...]
  Running Kafka Connect in distributed mode
    su - kafka
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
The internal consumers and producers of Kafka Connect are now enabled for tracing.
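If you generate the Kafka Connect configuration programmatically, the interceptor properties can be added with `java.util.Properties`. The following is a minimal sketch, not a required step. The class and method names are hypothetical; the property keys and interceptor class names are the OpenTelemetry ones listed in the procedure, and the bootstrap address is an example value.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.util.Properties;

// Hypothetical helper that adds the OpenTelemetry interceptor properties to a Kafka Connect configuration.
public class ConnectTracingProperties {

    // Copies the base configuration and adds the two interceptor properties.
    public static Properties withOpenTelemetryInterceptors(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("producer.interceptor.classes",
            "io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor");
        props.setProperty("consumer.interceptor.classes",
            "io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor");
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // example value
        StringWriter out = new StringWriter();
        // Writes the merged configuration in properties-file format.
        withOpenTelemetryInterceptors(base).store(out, "Kafka Connect with tracing interceptors");
        System.out.print(out);
    }
}
```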
15.5. Enabling tracing for MirrorMaker 2
Enable distributed tracing for MirrorMaker 2 by defining the interceptor properties in the MirrorMaker 2 properties file. Messages are traced between Kafka clusters. The trace data records messages entering and leaving the MirrorMaker 2 component.
You can enable tracing that uses OpenTelemetry or OpenTracing.
Procedure
- Add the tracing artifacts to the /opt/kafka/libs directory.
- Configure producer and consumer tracing in the /opt/kafka/config/connect-mirror-maker.properties file.
  Add the following tracing interceptor properties to the configuration file:
  Properties for OpenTelemetry
    header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor
    consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor
  Properties for OpenTracing
    header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor
    consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
  ByteArrayConverter prevents Kafka Connect from converting message headers (containing trace IDs) to base64 encoding. This ensures that messages are the same in both the source and the target clusters.
  With tracing enabled, you initialize tracing when you run the Kafka MirrorMaker 2 script.
- Save the configuration file.
- Set the environment variables for tracing.
- Start MirrorMaker 2 with the properties file as a parameter:
    su - kafka
    /opt/kafka/bin/connect-mirror-maker.sh \
    /opt/kafka/config/connect-mirror-maker.properties
The internal consumers and producers of MirrorMaker 2 are now enabled for tracing.
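To see why the header converter setting matters, the following sketch compares a trace header passed through as raw bytes with the same header after base64 encoding. It uses only the JDK and does not call Kafka Connect. The class name is hypothetical, and the header value is an illustrative string in the W3C `traceparent` format.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical illustration; it does not use the Kafka Connect converter classes.
public class HeaderEncodingSketch {

    // Illustrative trace header value (W3C traceparent format).
    public static final String TRACEPARENT =
        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";

    // Passes the bytes through unchanged, as a byte-array converter would.
    public static byte[] passThrough(byte[] header) {
        return header.clone();
    }

    // Base64-encodes the bytes, which changes what a downstream tracer reads.
    public static byte[] base64(byte[] header) {
        return Base64.getEncoder().encode(header);
    }

    public static void main(String[] args) {
        byte[] original = TRACEPARENT.getBytes(StandardCharsets.UTF_8);
        System.out.println("unchanged after pass-through: "
            + Arrays.equals(original, passThrough(original)));
        System.out.println("unchanged after base64: "
            + Arrays.equals(original, base64(original)));
    }
}
```

A tracer in the target cluster can only continue the trace if it reads the same header bytes that the source tracer wrote, which is what the pass-through case preserves.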
15.6. Enabling tracing for MirrorMaker
Enable distributed tracing for MirrorMaker by passing the interceptor properties as consumer and producer configuration parameters. Messages are traced from the source cluster to the target cluster. The trace data records messages entering and leaving the MirrorMaker component.
You can enable tracing that uses OpenTelemetry or OpenTracing.
Procedure
- Add the tracing artifacts to the /opt/kafka/libs directory.
- Configure producer tracing in the /opt/kafka/config/producer.properties file.
  Add the following tracing interceptor property:
  Producer property for OpenTelemetry
    producer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor
  Producer property for OpenTracing
    producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor
- Save the configuration file.
- Configure consumer tracing in the /opt/kafka/config/consumer.properties file.
  Add the following tracing interceptor property:
  Consumer property for OpenTelemetry
    consumer.interceptor.classes=io.opentelemetry.instrumentation.kafkaclients.TracingConsumerInterceptor
  Consumer property for OpenTracing
    consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
  With tracing enabled, you initialize tracing when you run the Kafka MirrorMaker script.
- Save the configuration file.
- Set the environment variables for tracing.
- Start MirrorMaker with the producer and consumer configuration files as parameters:
    su - kafka
    /opt/kafka/bin/kafka-mirror-maker.sh \
    --producer.config /opt/kafka/config/producer.properties \
    --consumer.config /opt/kafka/config/consumer.properties \
    --num.streams=2
The internal consumers and producers of MirrorMaker are now enabled for tracing.
15.7. Initializing tracing for Kafka clients
Initialize a tracer, then instrument your client applications for distributed tracing. You can instrument Kafka producer and consumer clients, and Kafka Streams API applications. You can initialize a tracer for OpenTracing or OpenTelemetry.
Configure and initialize a tracer using a set of tracing environment variables.
Procedure
- In each client application, add the dependencies for the tracer.
  Add the Maven dependencies to the pom.xml file for the client application:
  Dependencies for OpenTelemetry
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
        <version>1.18.0.redhat-00001</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry.instrumentation</groupId>
        <artifactId>opentelemetry-kafka-clients-{OpenTelemetryKafkaClient}</artifactId>
        <version>1.18.0.redhat-00001</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
        <version>1.18.0.redhat-00001</version>
    </dependency>
  Dependencies for OpenTracing
    <dependency>
        <groupId>io.jaegertracing</groupId>
        <artifactId>jaeger-client</artifactId>
        <version>1.8.1.redhat-00002</version>
    </dependency>
    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.15.redhat-00006</version>
    </dependency>
- Define the configuration of the tracer using the tracing environment variables.
- Create a tracer, which is initialized with the environment variables:
  Creating a tracer for OpenTelemetry
    OpenTelemetry ot = GlobalOpenTelemetry.get();
  Creating a tracer for OpenTracing
    Tracer tracer = Configuration.fromEnv().getTracer();
- Register the tracer as a global tracer:
    GlobalTracer.register(tracer);
- Instrument your client:
  - Instrument producers and consumers for tracing
  - Instrument Kafka Streams applications for tracing
15.8. Instrumenting producers and consumers for tracing
Instrument application code to enable tracing in Kafka producers and consumers. Use a decorator pattern or interceptors to instrument your Java producer and consumer application code for tracing. You can then record traces when messages are produced or retrieved from a topic.
OpenTelemetry and OpenTracing instrumentation projects provide classes that support instrumentation of producers and consumers.
- Decorator instrumentation
- For decorator instrumentation, create a modified producer or consumer instance for tracing. Decorator instrumentation is different for OpenTelemetry and OpenTracing.
- Interceptor instrumentation
- For interceptor instrumentation, add the tracing capability to the consumer or producer configuration. Interceptor instrumentation is the same for OpenTelemetry and OpenTracing.
Prerequisites
You have initialized tracing for the client.
You enable instrumentation in producer and consumer applications by adding the tracing JARs as dependencies to your project.
Procedure
Perform these steps in the application code of each producer and consumer application. Instrument your client application code using either a decorator pattern or interceptors.
- To use a decorator pattern, create a modified producer or consumer instance to send or receive messages.
  You pass the original KafkaProducer or KafkaConsumer instance.
  Example decorator instrumentation for OpenTelemetry
    // Create the tracing wrapper before wrapping any clients
    KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());

    // Producer instance
    Producer<String, String> op = new KafkaProducer<>(
        configs,
        new StringSerializer(),
        new StringSerializer()
    );
    Producer<String, String> producer = tracing.wrap(op);
    producer.send(...);

    // Consumer instance
    Consumer<String, String> oc = new KafkaConsumer<>(
        configs,
        new StringDeserializer(),
        new StringDeserializer()
    );
    Consumer<String, String> consumer = tracing.wrap(oc);
    consumer.subscribe(Collections.singleton("mytopic"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    ConsumerRecord<String, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
  Example decorator instrumentation for OpenTracing
    // Producer instance
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer);
    tracingProducer.send(...);

    // Consumer instance
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer);
    tracingConsumer.subscribe(Collections.singletonList("mytopic"));
    ConsumerRecords<Integer, String> records = tracingConsumer.poll(Duration.ofMillis(1000));
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
- To use interceptors, set the interceptor class in the producer or consumer configuration.
  You use the KafkaProducer and KafkaConsumer classes in the usual way. The TracingProducerInterceptor and TracingConsumerInterceptor interceptor classes take care of the tracing capability.
  Example producer configuration using interceptors
    senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(...);
  Example consumer configuration using interceptors
    consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("messages"));
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
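The decorator approach can be pictured without any Kafka classes. In the sketch below, `MessageSender`, `PlainSender`, and `TracingSender` are hypothetical stand-ins for the producer interface, the original producer, and the wrapped instance that a tracing library returns. Real wrappers create spans and propagate trace context rather than collecting strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the decorator pattern; it does not use the Kafka or tracing APIs.
public class DecoratorSketch {

    // Stand-in for the producer interface.
    public interface MessageSender {
        void send(String topic, String value);
    }

    // Stand-in for the original, uninstrumented producer.
    public static class PlainSender implements MessageSender {
        public final List<String> sent = new ArrayList<>();

        @Override
        public void send(String topic, String value) {
            sent.add(topic + ":" + value);
        }
    }

    // Decorator: same interface, records a span-like entry around each call, then delegates.
    public static class TracingSender implements MessageSender {
        public final List<String> spans = new ArrayList<>();
        private final MessageSender delegate;

        public TracingSender(MessageSender delegate) {
            this.delegate = delegate;
        }

        @Override
        public void send(String topic, String value) {
            long start = System.nanoTime();
            delegate.send(topic, value);
            long micros = (System.nanoTime() - start) / 1_000;
            spans.add("send " + topic + " took " + micros + "us");
        }
    }

    public static void main(String[] args) {
        PlainSender original = new PlainSender();
        TracingSender traced = new TracingSender(original);
        traced.send("mytopic", "hello");
        System.out.println(original.sent);
        System.out.println(traced.spans);
    }
}
```

Because the decorator implements the same interface as the object it wraps, application code that sends messages does not change. With interceptors, by contrast, the client object stays the same and only its configuration changes.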
15.9. Instrumenting Kafka Streams applications for tracing
Instrument application code to enable tracing in Kafka Streams API applications. Use a decorator pattern or interceptors to instrument your Kafka Streams API applications for tracing. You can then record traces when messages are produced or retrieved from a topic.
- Decorator instrumentation
- For decorator instrumentation, create a modified Kafka Streams instance for tracing. The OpenTracing instrumentation project provides a TracingKafkaClientSupplier class that supports instrumentation of Kafka Streams. You create a wrapped instance of the TracingKafkaClientSupplier supplier interface, which provides tracing instrumentation for Kafka Streams. For OpenTelemetry, the process is the same but you need to create a custom TracingKafkaClientSupplier class to provide the support.
- Interceptor instrumentation
- For interceptor instrumentation, add the tracing capability to the Kafka Streams producer and consumer configuration.
Prerequisites
You have initialized tracing for the client.
You enable instrumentation in Kafka Streams applications by adding the tracing JARs as dependencies to your project.
To instrument Kafka Streams with OpenTelemetry, you need to write a custom TracingKafkaClientSupplier. The custom TracingKafkaClientSupplier can extend Kafka’s DefaultKafkaClientSupplier, overriding the producer and consumer creation methods to wrap the instances with the telemetry-related code.
Example custom TracingKafkaClientSupplier
    private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
        // Wraps the producer created by the default supplier
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getProducer(config));
        }

        // Wraps the consumer created by the default supplier
        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getConsumer(config));
        }

        // Restore and global consumers reuse the wrapped consumer
        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }

        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    }
Procedure
Perform these steps for each Kafka Streams API application.
- To use a decorator pattern, create an instance of the TracingKafkaClientSupplier supplier interface, then provide the supplier interface to KafkaStreams.
  Example decorator instrumentation
    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();
- To use interceptors, set the interceptor class in the Kafka Streams producer and consumer configuration.
  The TracingProducerInterceptor and TracingConsumerInterceptor interceptor classes take care of the tracing capability.
  Example producer and consumer configuration using interceptors
    props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
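The prefixed keys in the interceptor configuration resolve to plain strings. The following JDK-only sketch shows the resulting entries, assuming the standard Kafka constant values: `producer.` for `StreamsConfig.PRODUCER_PREFIX`, `consumer.` for `StreamsConfig.CONSUMER_PREFIX`, and `interceptor.classes` for `INTERCEPTOR_CLASSES_CONFIG`. The class and constants in the sketch are hypothetical stand-ins so that it runs without the Kafka libraries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; the constants mirror the assumed values of the Kafka configuration constants.
public class StreamsInterceptorKeys {

    static final String PRODUCER_PREFIX = "producer.";               // StreamsConfig.PRODUCER_PREFIX
    static final String CONSUMER_PREFIX = "consumer.";               // StreamsConfig.CONSUMER_PREFIX
    static final String INTERCEPTOR_CLASSES = "interceptor.classes"; // INTERCEPTOR_CLASSES_CONFIG

    // Builds the two configuration entries from fully qualified interceptor class names.
    public static Map<String, Object> tracingConfig(String producerInterceptor,
                                                    String consumerInterceptor) {
        Map<String, Object> props = new HashMap<>();
        props.put(PRODUCER_PREFIX + INTERCEPTOR_CLASSES, producerInterceptor);
        props.put(CONSUMER_PREFIX + INTERCEPTOR_CLASSES, consumerInterceptor);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = tracingConfig(
            "io.opentracing.contrib.kafka.TracingProducerInterceptor",
            "io.opentracing.contrib.kafka.TracingConsumerInterceptor");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting keys match the `producer.interceptor.classes` and `consumer.interceptor.classes` properties used for Kafka Connect and MirrorMaker 2.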
15.10. Specifying tracing systems with OpenTelemetry
Instead of the default Jaeger system, you can specify other tracing systems that are supported by OpenTelemetry.
If you want to use another tracing system with OpenTelemetry, do the following:
- Add the library of the tracing system to the Kafka classpath.
- Add the name of the tracing system as an additional exporter environment variable.
Additional environment variable when not using Jaeger
    OTEL_SERVICE_NAME=my-tracing-service
    OTEL_TRACES_EXPORTER=zipkin
    OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans
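The same OpenTelemetry settings can also be given as Java system properties, as the `otel.traces.exporter` and `otel.service.name` calls in the attribute extraction example in this chapter show. The following sketch illustrates the naming relationship for the variables used here (lowercase, with underscores replaced by dots). The helper class is hypothetical and is not part of the OpenTelemetry SDK.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not an OpenTelemetry class.
public class OtelPropertyNames {

    // Converts an environment variable name such as OTEL_SERVICE_NAME
    // to the matching system property name, otel.service.name.
    public static String toSystemProperty(String envName) {
        return envName.toLowerCase(Locale.ROOT).replace('_', '.');
    }

    public static void main(String[] args) {
        System.out.println(toSystemProperty("OTEL_SERVICE_NAME"));    // otel.service.name
        System.out.println(toSystemProperty("OTEL_TRACES_EXPORTER")); // otel.traces.exporter
    }
}
```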
15.11. Custom span names
A tracing span is a logical unit of work in Jaeger, with an operation name, start time, and duration. Spans have built-in names, but you can specify custom span names in your Kafka client instrumentation.
Specifying custom span names is optional and only applies when using a decorator pattern in producer and consumer client instrumentation or Kafka Streams instrumentation.
15.11.1. Specifying span names for OpenTelemetry
Custom span names cannot be specified directly with OpenTelemetry. Instead, you retrieve span names by adding code to your client application to extract additional tags and attributes.
Example code to extract attributes
    // Defines attribute extraction for a producer
    private static class ProducerAttribExtractor implements AttributesExtractor<ProducerRecord<?, ?>, Void> {
        @Override
        public void onStart(AttributesBuilder attributes, ProducerRecord<?, ?> producerRecord) {
            set(attributes, AttributeKey.stringKey("prod_start"), "prod1");
        }
        @Override
        public void onEnd(AttributesBuilder attributes, ProducerRecord<?, ?> producerRecord, @Nullable Void unused, @Nullable Throwable error) {
            set(attributes, AttributeKey.stringKey("prod_end"), "prod2");
        }
    }

    // Defines attribute extraction for a consumer
    private static class ConsumerAttribExtractor implements AttributesExtractor<ConsumerRecord<?, ?>, Void> {
        @Override
        public void onStart(AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord) {
            set(attributes, AttributeKey.stringKey("con_start"), "con1");
        }
        @Override
        public void onEnd(AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord, @Nullable Void unused, @Nullable Throwable error) {
            set(attributes, AttributeKey.stringKey("con_end"), "con2");
        }
    }

    // Extracts the attributes
    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>(Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
        System.setProperty("otel.traces.exporter", "jaeger");
        System.setProperty("otel.service.name", "myapp1");
        KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get())
            .addProducerAttributesExtractors(new ProducerAttribExtractor())
            .addConsumerAttributesExtractors(new ConsumerAttribExtractor())
            .build();
    }
15.11.2. Specifying span names for OpenTracing
To specify custom span names for OpenTracing, pass a BiFunction object as an additional argument when instrumenting producers and consumers.
For more information on built-in names and specifying custom span names to instrument client application code in a decorator pattern, see the OpenTracing Apache Kafka client instrumentation.
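A span name function of this kind might look like the following sketch. To keep it runnable with the JDK alone, `RecordStub` is a hypothetical stand-in for a Kafka record, and the operation name `send` is an example value. In a real application, the function takes the record type that the OpenTracing Kafka client expects, and you pass it to the tracing producer or consumer as described in that project's documentation.

```java
import java.util.function.BiFunction;

// Hypothetical illustration; RecordStub stands in for a Kafka record type.
public class SpanNameSketch {

    // Only the topic is modeled.
    public static class RecordStub {
        public final String topic;

        public RecordStub(String topic) {
            this.topic = topic;
        }
    }

    // Takes an operation name and the record, and returns the custom span name.
    public static final BiFunction<String, RecordStub, String> SPAN_NAME =
        (operationName, record) -> operationName + " " + record.topic;

    public static void main(String[] args) {
        System.out.println(SPAN_NAME.apply("send", new RecordStub("mytopic"))); // send mytopic
    }
}
```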