Chapter 13. Distributed tracing
AMQ Streams on Red Hat Enterprise Linux supports distributed tracing with Jaeger for:
- Kafka Producers, Kafka Consumers, and Kafka Streams applications (referred to as Kafka clients)
- MirrorMaker and Kafka Connect
- Kafka Bridge
The steps to configure distributed tracing vary by client and component. However, three high-level tasks are involved:
- Enable a Jaeger tracer for the client or component.
- Enable the Interceptors for the client or component:
  - For Kafka clients, enabling the Interceptors involves instrumenting your application code for OpenTracing.
  - For MirrorMaker, Kafka Connect, and the Kafka Bridge, enabling the Interceptors involves setting configuration properties for each component.
- Set tracing environment variables before deploying the client or component.
This chapter provides an overview of distributed tracing, as well as instructions for configuring supported clients and components. Setting up distributed tracing for applications and systems beyond AMQ Streams is outside the scope of this chapter. To learn more about this subject, see the OpenTracing documentation and search for "inject and extract".
Distributed tracing is not supported for Kafka brokers.
Prerequisites
- The Jaeger backend components are deployed to the host operating system. For deployment instructions, see the Jaeger deployment documentation.
13.1. Overview of distributed tracing
Distributed tracing allows developers and system administrators to track the progress of transactions between applications (and services in a microservice architecture) in a distributed system. This information is useful for monitoring application performance and investigating issues with target systems and end-user applications.
In AMQ Streams, distributed tracing facilitates the end-to-end tracking of messages: from source systems to the Kafka cluster and then to target systems and applications.
As an aspect of system observability, distributed tracing complements the metrics that are available to view in Grafana dashboards and the available loggers for each component.
13.1.1. OpenTracing and Jaeger
The OpenTracing and Jaeger projects are used to implement distributed tracing in AMQ Streams.
OpenTracing
The OpenTracing specification defines APIs that developers can use to instrument applications for distributed tracing. When you instrument an application, you add instrumentation code in order to monitor the execution of individual transactions. When instrumented, applications generate traces when individual transactions occur. Traces are composed of spans, which define specific units of work.
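To make the trace and span vocabulary concrete, the following is a minimal sketch in plain Java. It is not the OpenTracing API: the `SpanSketch` class, its fields, and the `exampleTrace` helper are hypothetical names chosen for illustration. The only idea taken from the text is that a trace is composed of spans, each describing one unit of work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; real spans are created by an OpenTracing Tracer.
final class SpanSketch {
    final String traceId;        // shared by every span in the same trace
    final String operationName;  // the unit of work, for example "send" or "receive"
    final SpanSketch parent;     // null for the root span of a trace
    final long startMicros;
    final long durationMicros;

    SpanSketch(String traceId, String operationName, SpanSketch parent,
               long startMicros, long durationMicros) {
        this.traceId = traceId;
        this.operationName = operationName;
        this.parent = parent;
        this.startMicros = startMicros;
        this.durationMicros = durationMicros;
    }

    // Number of ancestors between this span and the root span of the trace.
    int depth() {
        int d = 0;
        for (SpanSketch s = parent; s != null; s = s.parent) {
            d++;
        }
        return d;
    }

    // Builds a two-span trace: a producer "send" followed by a consumer "receive".
    static List<SpanSketch> exampleTrace() {
        SpanSketch send = new SpanSketch("trace-1", "send", null, 0L, 1_500L);
        SpanSketch receive = new SpanSketch("trace-1", "receive", send, 2_000L, 800L);
        List<SpanSketch> trace = new ArrayList<>();
        trace.add(send);
        trace.add(receive);
        return trace;
    }
}
```

In this sketch both spans share one trace ID, and the `receive` span points back to the `send` span as its parent, which is what lets a tracing backend draw a message's path end to end.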
To simplify the instrumentation of Kafka clients, AMQ Streams includes the OpenTracing Apache Kafka Client Instrumentation library.
The OpenTracing project is merging with the OpenCensus project to form the OpenTelemetry project. OpenTelemetry will provide compatibility for applications that are instrumented using the OpenTracing APIs.
Jaeger
Jaeger, a tracing system, is an implementation of the OpenTracing APIs. Jaeger is used for monitoring and troubleshooting microservices-based distributed systems and provides client libraries for instrumenting applications.
Jaeger samples the total traces generated by an application, based on a set sampling strategy, and then visualizes them in a user interface. This allows you to visualize, query, filter, and analyze trace data.
[Figure: An example of a query in the Jaeger user interface]
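The sampling behavior described above can be sketched in a few lines of plain Java. This is an illustrative stand-in, not the Jaeger client's implementation: the `SamplerSketch` class and its method names are hypothetical. It shows a constant strategy, which keeps either all traces or none, and a probabilistic strategy, which keeps a fraction of them.

```java
import java.util.Random;

// Illustrative stand-in for a sampling decision; not the Jaeger client's code.
final class SamplerSketch {
    // Constant strategy: a parameter of 1 samples every trace, any other value samples none.
    static boolean constant(int param) {
        return param == 1;
    }

    // Probabilistic strategy: samples a trace with the given probability (0.0 to 1.0).
    static boolean probabilistic(double rate, Random random) {
        return random.nextDouble() < rate;
    }

    // Counts how many of n traces a probabilistic sampler would keep.
    static int countSampled(int n, double rate, Random random) {
        int kept = 0;
        for (int i = 0; i < n; i++) {
            if (probabilistic(rate, random)) {
                kept++;
            }
        }
        return kept;
    }
}
```

A low probabilistic rate keeps tracing overhead small in production, while a constant parameter of 1 is convenient when every message must be visible during testing.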
13.2. Configuring Kafka clients for tracing
This section describes how to configure Kafka clients (Kafka Producers, Kafka Consumers, and Kafka Streams applications) for distributed tracing.
13.2.1. Enabling a Jaeger tracer for Kafka clients
Configure and enable a Jaeger tracer using the tracing environment variables.
Procedure
Perform the following steps for each Kafka client (Kafka Producer, Kafka Consumer, and Kafka Streams application):
- Add Maven dependencies for Jaeger to the pom.xml file for the client:

      <dependency>
          <groupId>io.jaegertracing</groupId>
          <artifactId>jaeger-client</artifactId>
          <version>1.0.0</version>
      </dependency>

- Define the configuration of the Jaeger tracer using the tracing environment variables.
- Create the Jaeger tracer from the environment variables that you defined in the previous step:

      Tracer tracer = Configuration.fromEnv().getTracer();

  Note: For alternative ways to initialize a Jaeger tracer, see the Java OpenTracing library documentation.
- Register the Jaeger tracer as a global tracer:

      GlobalTracer.register(tracer);

A Jaeger tracer is now enabled for the Kafka client to use.
13.2.2. Instrumenting Kafka Producers and Consumers for tracing
Use a Decorator pattern or Interceptors to instrument your Kafka Producer and Consumer application code for distributed tracing. When instrumented, the Interceptors in the Kafka Producer or Consumer are enabled.
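The two instrumentation styles differ in where the tracing hook lives. The following plain-Java sketch uses hypothetical `Sender`, `PlainSender`, and `TracingSender` types rather than the Kafka or OpenTracing classes. A decorator wraps the client object from the outside, whereas an interceptor is a callback that the client itself invokes on each message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class InstrumentationStyles {
    // Hypothetical stand-in for a producer.
    interface Sender {
        void send(String message);
    }

    // Plain client: runs any configured interceptors before "sending" a message.
    static final class PlainSender implements Sender {
        final List<String> sent = new ArrayList<>();
        private final List<UnaryOperator<String>> interceptors;

        PlainSender(List<UnaryOperator<String>> interceptors) {
            this.interceptors = interceptors;
        }

        @Override
        public void send(String message) {
            String m = message;
            for (UnaryOperator<String> interceptor : interceptors) {
                m = interceptor.apply(m);
            }
            sent.add(m);
        }
    }

    // Decorator: wraps any Sender and records a span name around each call.
    static final class TracingSender implements Sender {
        private final Sender delegate;
        private final List<String> spans;

        TracingSender(Sender delegate, List<String> spans) {
            this.delegate = delegate;
            this.spans = spans;
        }

        @Override
        public void send(String message) {
            spans.add("send");
            delegate.send(message);
        }
    }
}
```

With the decorator, application code must call the wrapper instead of the original client. With an interceptor, the application keeps using the original client and only its configuration changes.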
Procedure
Perform the following steps in the application code of each Kafka Producer and Consumer:
- Add a Maven dependency for OpenTracing to the Producer or Consumer’s pom.xml file:

      <dependency>
          <groupId>io.opentracing.contrib</groupId>
          <artifactId>opentracing-kafka-client</artifactId>
          <version>0.1.4</version>
      </dependency>

- Instrument your client application code using either a Decorator pattern or Interceptors.

  If you prefer to use a Decorator pattern, use the following example:

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer);

      // Send:
      tracingProducer.send(...);

      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer);

      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));

      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);

      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

  If you prefer to use Interceptors, use the following example:

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);

      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

      // Send:
      producer.send(...);

      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());

      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));

      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);

      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

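Both examples end by reading a span context back out of the record headers. The following plain-Java sketch illustrates that inject-and-extract idea. It assumes Jaeger's default text propagation, in which the context travels in a header named `uber-trace-id` with the form `traceId:spanId:parentId:flags`. The `HeaderPropagationSketch` class is hypothetical, and headers are modelled as a `Map<String, String>` rather than as Kafka `Headers`; in a real application the instrumentation library performs these steps for you.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of trace-context propagation through message headers.
// Assumption: headers are modelled as a String map; real Kafka headers carry byte[] values.
final class HeaderPropagationSketch {
    // Header name used by Jaeger's default text propagation format.
    static final String HEADER = "uber-trace-id";

    // Producer side: write "traceId:spanId:parentId:flags" into the headers.
    static void inject(Map<String, String> headers, String traceId, String spanId,
                       String parentId, int flags) {
        headers.put(HEADER, traceId + ":" + spanId + ":" + parentId + ":" + flags);
    }

    // Consumer side: read the context back, or return null if it is absent or malformed.
    static String[] extract(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null) {
            return null;
        }
        String[] parts = value.split(":");
        return parts.length == 4 ? parts : null;
    }

    // Round trip: the consumer recovers the trace ID that the producer injected.
    static String roundTripTraceId(String traceId) {
        Map<String, String> headers = new HashMap<>();
        inject(headers, traceId, "1", "0", 1);
        return extract(headers)[0];
    }
}
```

Because the context rides along with the message itself, the consumer can continue the producer's trace even though the two applications never communicate directly.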
13.2.2.1. Custom span names in a Decorator pattern
A span is a logical unit of work in Jaeger, with an operation name, start time, and duration.
If you use a Decorator pattern to instrument your Kafka Producer and Consumer applications, you can define custom span names by passing a BiFunction object as an additional argument when creating the TracingKafkaProducer and TracingKafkaConsumer objects. The OpenTracing Apache Kafka Client Instrumentation library includes several built-in span names, which are described below.
Example: Using custom span names to instrument client application code in a Decorator pattern
    // Create a BiFunction for the KafkaProducer that operates on
    // (String operationName, ProducerRecord producerRecord) and returns a String to be used as the name:
    BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
        (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

    // Create an instance of the KafkaProducer:
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

    // Create an instance of the TracingKafkaProducer, passing in the producerSpanNameProvider BiFunction:
    TracingKafkaProducer<Integer, String> tracingProducer =
        new TracingKafkaProducer<>(producer, tracer, producerSpanNameProvider);

    // Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

    // Create a BiFunction for the KafkaConsumer that operates on
    // (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:
    BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
        (operationName, consumerRecord) -> operationName.toUpperCase();

    // Create an instance of the KafkaConsumer:
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

    // Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:
    TracingKafkaConsumer<Integer, String> tracingConsumer =
        new TracingKafkaConsumer<>(consumer, tracer, consumerSpanNameProvider);

    // Spans created by the tracingConsumer will have the operation name as the span name, in upper case.
    // "receive" -> "RECEIVE"
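A span name provider is an ordinary `java.util.function.BiFunction`, so its behavior can be tried out without a Kafka cluster. The sketch below is self-contained and uses a hypothetical `Rec` class as a stand-in for `ProducerRecord` and `ConsumerRecord`. The third provider, and its `" - "` separator, is an arbitrary illustration of a name that depends on the record.

```java
import java.util.Locale;
import java.util.function.BiFunction;

final class SpanNameSketch {
    // Hypothetical stand-in for a Kafka record; only the topic is modelled.
    static final class Rec {
        final String topic;

        Rec(String topic) {
            this.topic = topic;
        }
    }

    // Fixed name, like the producer provider in the example above.
    static final BiFunction<String, Rec, String> FIXED =
            (operationName, rec) -> "CUSTOM_PRODUCER_NAME";

    // Upper-cased operation name, like the consumer provider in the example above.
    static final BiFunction<String, Rec, String> UPPER =
            (operationName, rec) -> operationName.toUpperCase(Locale.ROOT);

    // A provider that also uses the record: joins the operation name and the topic.
    // The " - " separator is an arbitrary choice for this sketch.
    static final BiFunction<String, Rec, String> OP_AND_TOPIC =
            (operationName, rec) -> operationName + " - " + rec.topic;
}
```

For example, `SpanNameSketch.OP_AND_TOPIC.apply("send", new SpanNameSketch.Rec("messages"))` yields `send - messages`, which makes spans for different topics easy to tell apart when filtering traces.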
13.2.2.2. Built-in span names
When defining custom span names, you can use the following BiFunctions in the ClientSpanNameProvider class. If no spanNameProvider is specified, CONSUMER_OPERATION_NAME and PRODUCER_OPERATION_NAME are used.
BiFunction | Description |
---|---|
| Returns the |
| Returns a String concatenation of |
| Returns the name of the topic that the message was sent to or retrieved from in the format |
| Returns a String concatenation of |
| Returns the operation name and the topic name: |
| Returns a String concatenation of |
13.2.3. Instrumenting Kafka Streams applications for tracing
Instrument Kafka Streams applications for distributed tracing using a supplier interface. This enables the Interceptors in the application.
Procedure
Perform the following steps for each Kafka Streams application:
- Add the opentracing-kafka-streams dependency to the Kafka Streams application’s pom.xml file:

      <dependency>
          <groupId>io.opentracing.contrib</groupId>
          <artifactId>opentracing-kafka-streams</artifactId>
          <version>0.1.4</version>
      </dependency>

- Create an instance of the TracingKafkaClientSupplier supplier interface:

      KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);

- Provide the supplier interface to KafkaStreams:

      KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
      streams.start();

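The supplier approach works because Kafka Streams creates its own internal clients, so the application never gets a chance to wrap them by hand. The plain-Java sketch below illustrates the idea with hypothetical types (`SupplierSketch`, `Client`). It is not the `TracingKafkaClientSupplier` implementation: a tracing supplier hands out clients that are already wrapped.

```java
import java.util.List;
import java.util.function.Supplier;

final class SupplierSketch {
    // Hypothetical stand-in for an internal consumer created by a framework.
    interface Client {
        String poll();
    }

    // Returns a supplier whose clients record a "receive" span on every poll.
    static Supplier<Client> tracing(Supplier<Client> plain, List<String> spans) {
        return () -> {
            Client delegate = plain.get();   // let the plain supplier build the client
            return () -> {                   // hand back a wrapped client instead
                spans.add("receive");
                return delegate.poll();
            };
        };
    }
}
```

The framework only ever sees a supplier, so swapping in the tracing variant instruments every client it creates without further changes to the topology code.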
13.3. Configuring MirrorMaker and Kafka Connect for tracing
This section describes how to configure MirrorMaker and Kafka Connect for distributed tracing.
You must enable a Jaeger tracer for each component.
13.3.1. Enabling tracing for MirrorMaker
Enable distributed tracing for MirrorMaker by passing the Interceptor properties as consumer and producer configuration parameters.
Messages are traced from the source cluster to the target cluster; the trace data records messages entering and leaving the MirrorMaker component.
Procedure
- Configure and enable a Jaeger tracer.
- Edit the /opt/kafka/config/consumer.properties file. Add the following Interceptor property:

      consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

- Edit the /opt/kafka/config/producer.properties file. Add the following Interceptor property:

      producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor

- Start MirrorMaker with the consumer and producer configuration files as parameters:

      su - kafka
      /opt/kafka/bin/kafka-mirror-maker.sh --consumer.config /opt/kafka/config/consumer.properties --producer.config /opt/kafka/config/producer.properties --num.streams=2

13.3.2. Enabling tracing for Kafka Connect
Enable distributed tracing for Kafka Connect using configuration properties.
Only messages produced and consumed by Kafka Connect itself are traced. To trace messages sent between Kafka Connect and external systems, you must configure tracing in the connectors for those systems.
Procedure
- Configure and enable a Jaeger tracer.
- Edit the relevant Kafka Connect configuration file:
  - If you are running Kafka Connect in standalone mode, edit the /opt/kafka/config/connect-standalone.properties file.
  - If you are running Kafka Connect in distributed mode, edit the /opt/kafka/config/connect-distributed.properties file.
- Add the following properties to the configuration file:

      producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor
      consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

- Save the configuration file.
- Set tracing environment variables and then run Kafka Connect in standalone or distributed mode.
The Interceptors in Kafka Connect’s internal consumers and producers are now enabled.
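Kafka treats interceptor class settings as a comma-separated list, so a configuration file that already names other interceptors needs the tracing class appended rather than substituted. The sketch below shows one way to do that with `java.util.Properties`. The `InterceptorConfigSketch` class and its `addInterceptor` helper are hypothetical and are not part of Kafka or AMQ Streams.

```java
import java.util.Properties;

final class InterceptorConfigSketch {
    // Property key and class name as given in the procedure above.
    static final String PRODUCER_KEY = "producer.interceptor.classes";
    static final String TRACING_PRODUCER =
            "io.opentracing.contrib.kafka.TracingProducerInterceptor";

    // Adds className to the comma-separated list under key, unless it is already present.
    static void addInterceptor(Properties props, String key, String className) {
        String current = props.getProperty(key, "").trim();
        if (current.isEmpty()) {
            props.setProperty(key, className);
            return;
        }
        for (String existing : current.split("\\s*,\\s*")) {
            if (existing.equals(className)) {
                return; // already configured; leave the list unchanged
            }
        }
        props.setProperty(key, current + "," + className);
    }
}
```

Calling `addInterceptor(props, PRODUCER_KEY, TRACING_PRODUCER)` twice leaves a single entry, so the helper is safe to run from a repeatable setup script.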
13.4. Enabling tracing for the Kafka Bridge
Enable distributed tracing for the Kafka Bridge by editing the Kafka Bridge configuration file. You can then deploy a Kafka Bridge instance that is configured for distributed tracing to the host operating system.
Traces are generated for the following transactions:
- The Kafka Bridge sends messages to HTTP clients and consumes messages from HTTP clients.
- HTTP clients send HTTP requests to send and receive messages through the Kafka Bridge.
In order to have end-to-end tracing, you must also configure tracing in your HTTP clients.
Procedure
- Edit the config/application.properties file in the Kafka Bridge installation directory. Uncomment the following line:

      bridge.tracing=jaeger

- Save the configuration file.
- Run the bin/kafka_bridge_run.sh script using the configuration properties as a parameter:

      cd kafka-bridge-0.xy.x.redhat-0000x
      ./bin/kafka_bridge_run.sh --config-file=config/application.properties

The Interceptors in the Kafka Bridge’s internal consumers and producers are now enabled.
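For end-to-end tracing, the HTTP client has to send its trace context along with each request. The sketch below builds, but does not send, such a request with the JDK `java.net.http` API (Java 11 or later). Several details are assumptions made for illustration: the bridge address `localhost:8080`, the topic name `messages`, the `application/vnd.kafka.json.v2+json` content type, and the use of Jaeger's `uber-trace-id` header. In a real client, an instrumented HTTP library or the tracer's inject operation would normally set the header for you.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class BridgeRequestSketch {
    // Builds (but does not send) a produce request that carries a trace context header.
    // Assumptions: bridge at localhost:8080, topic "messages", Jaeger text propagation.
    static HttpRequest produceRequest(String traceContext) {
        String body = "{\"records\":[{\"value\":\"hello\"}]}";
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/topics/messages"))
                .header("content-type", "application/vnd.kafka.json.v2+json")
                .header("uber-trace-id", traceContext)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

A request built this way can be passed to `java.net.http.HttpClient.send`. Without the propagation header, the bridge would start a fresh trace and the client-side span would not be linked to it.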
13.5. Tracing environment variables
Use these environment variables when configuring a Jaeger tracer for Kafka clients and components.
The tracing environment variables are part of the Jaeger project and are subject to change. For the latest environment variables, see the Jaeger documentation.
Property | Required | Description |
---|---|---|
| Yes | The name of the Jaeger tracer service. |
| No | The hostname for communicating with the |
| No | The port used for communicating with the |
| No | The traces endpoint. Only define this variable if the client application will bypass the |
| No | The authentication token to send to the endpoint as a bearer token. |
| No | The username to send to the endpoint if using basic authentication. |
| No | The password to send to the endpoint if using basic authentication. |
| No | A comma-separated list of formats to use for propagating the trace context. Defaults to the standard Jaeger format. Valid values are |
| No | Indicates whether the reporter should also log the spans. |
| No | The reporter’s maximum queue size. |
| No | The reporter’s flush interval, in ms. Defines how frequently the Jaeger reporter flushes span batches. |
| No | The sampling strategy to use for client traces: Constant, Probabilistic, Rate Limiting, or Remote (the default type). To sample all traces, use the Constant sampling strategy with a parameter of 1. For more information, see the Jaeger documentation. |
| No | The sampler parameter (number). |
| No | The hostname and port to use if a Remote sampling strategy is selected. |
| No | A comma-separated list of tracer-level tags that are added to all reported spans. The value can also refer to an environment variable using the format |
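Before deploying a client, it can help to check the tracing environment for obvious mistakes. The sketch below is a hypothetical helper, not part of Jaeger or AMQ Streams. It assumes the variable names used by the Jaeger Java client (`JAEGER_SERVICE_NAME`, `JAEGER_SAMPLER_TYPE`, `JAEGER_SAMPLER_PARAM`) and the sampler type value `const`; confirm these against the Jaeger documentation for your client version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TracingEnvSketch {
    // Returns a list of problems found in the given environment; an empty list means none found.
    // Variable names are assumed from the Jaeger Java client.
    static List<String> validate(Map<String, String> env) {
        List<String> problems = new ArrayList<>();
        String service = env.get("JAEGER_SERVICE_NAME");
        if (service == null || service.trim().isEmpty()) {
            problems.add("JAEGER_SERVICE_NAME is required");
        }
        String param = env.get("JAEGER_SAMPLER_PARAM");
        if (param != null) {
            try {
                Double.parseDouble(param);
            } catch (NumberFormatException e) {
                problems.add("JAEGER_SAMPLER_PARAM must be a number");
            }
        }
        return problems;
    }

    // True when the settings sample every trace: constant strategy with a parameter of 1.
    static boolean samplesEverything(Map<String, String> env) {
        return "const".equals(env.get("JAEGER_SAMPLER_TYPE"))
                && "1".equals(env.get("JAEGER_SAMPLER_PARAM"));
    }
}
```

In an application, `TracingEnvSketch.validate(System.getenv())` could run at startup, before the tracer is created, so that a missing service name is reported early instead of surfacing as a tracer initialization error.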