14.2.2. トレーシングのための Kafka Producer および Consumer のインストルメント化
Decorator パターンまたは Interceptor を使用して、Java Producer および Consumer アプリケーションコードを分散トレーシング用にインストルメント化します。インストルメント化すると、Kafka プロデューサーまたはコンシューマーのインターセプターが有効になります。
手順
各 Kafka プロデューサーおよびコンシューマーのアプリケーションコードで次の手順を実行します。
OpenTracing の Maven 依存関係を、プロデューサーまたはコンシューマーの
pom.xml
ファイルに追加します。<dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-kafka-client</artifactId> <version>0.1.12.redhat-00001</version> </dependency>
Decorator パターンまたは Interceptor のいずれかを使用して、クライアントアプリケーションコードをインストルメント化します。
デコレーターパターンを使用する場合は、以下の例を使用します。
// Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Create an instance of the TracingKafkaProducer: TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer); // Send: tracingProducer.send(...); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Create an instance of the TracingKafkaConsumer: TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer); // Subscribe: tracingConsumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000); // Retrieve SpanContext from polled record (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
インターセプターを使用する場合は、以下の例を使用します。
// Register the tracer with GlobalTracer: GlobalTracer.register(tracer); // Add the TracingProducerInterceptor to the sender properties: senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); // Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Send: producer.send(...); // Add the TracingConsumerInterceptor to the consumer properties: consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Subscribe: consumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = consumer.poll(1000); // Retrieve the SpanContext from a polled message (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
14.2.2.1. Decorator パターンのカスタムスパン名
スパン は Jaeger の論理作業単位で、操作名、開始時間、および期間が含まれます。
Decorator パターンを使用して Kafka Producer および Consumer の各アプリケーションをインストルメント化する場合、TracingKafkaProducer
および TracingKafkaConsumer
オブジェクトの作成時に BiFunction
オブジェクトを追加の引数として渡すと、カスタムスパン名を定義できます。OpenTracing の Apache Kafka Client Instrumentation ライブラリーには、以下のようなビルトインスパン名がいくつか含まれています。
例: カスタムスパン名を使用した Decorator パターンでのクライアントアプリケーションコードのインストルメント化
// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name: BiFunction<String, ProducerRecord, String> producerSpanNameProvider = (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME"; // Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Create an instance of the TracingKafkaProducer TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer, producerSpanNameProvider); // Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name. // Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name: BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider = (operationName, consumerRecord) -> operationName.toUpperCase(); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction: TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer, consumerSpanNameProvider); // Spans created by the tracingConsumer will have the operation name as the span name, in upper-case. // "receive" -> "RECEIVE"