16.2.2. トレーシングのための Kafka プロデューサーおよびコンシューマーのインストルメント化
Decorator パターンまたは Interceptor を使用して、Java プロデューサーおよびコンシューマーアプリケーションコードをトレーシング用にインストルメント化します。
手順
各プロデューサーおよびコンシューマーアプリケーションのアプリケーションコードで以下を行います。
OpenTracing の Maven 依存関係を、プロデューサーまたはコンシューマーの
pom.xml
ファイルに追加します。<dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-kafka-client</artifactId> <version>0.1.15.redhat-00001</version> </dependency>
Decorator パターンまたは Interceptor のいずれかを使用して、クライアントアプリケーションコードをインストルメント化します。
Decorator パターンを使用する場合は以下を行います。
// Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Create an instance of the TracingKafkaProducer: TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer); // Send: tracingProducer.send(...); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Create an instance of the TracingKafkaConsumer: TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer); // Subscribe: tracingConsumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000); // Retrieve SpanContext from polled record (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
インターセプターを使用する場合は以下を使用します。
// Register the tracer with GlobalTracer: GlobalTracer.register(tracer); // Add the TracingProducerInterceptor to the sender properties: senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); // Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Send: producer.send(...); // Add the TracingConsumerInterceptor to the consumer properties: consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Subscribe: consumer.subscribe(Collections.singletonList("messages")); // Get messages: ConsumerRecords<Integer, String> records = consumer.poll(1000); // Retrieve the SpanContext from a polled message (consumer side): ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
Decorator パターンのカスタムスパン名
スパン は Jaeger の論理作業単位で、操作名、開始時間、および期間が含まれます。
Decorator パターンを使用してプロデューサーおよびコンシューマーの各アプリケーションをインストルメント化する場合、TracingKafkaProducer
および TracingKafkaConsumer
オブジェクトの作成時に BiFunction
オブジェクトを追加の引数として渡すと、カスタムスパン名を定義できます。OpenTracing の Apache Kafka Client Instrumentation ライブラリーには、複数の組み込みスパン名が含まれています。
例: カスタムスパン名を使用した Decorator パターンでのクライアントアプリケーションコードのインストルメント化
// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name: BiFunction<String, ProducerRecord, String> producerSpanNameProvider = (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME"; // Create an instance of the KafkaProducer: KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); // Create an instance of the TracingKafkaProducer TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer, producerSpanNameProvider); // Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name. // Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name: BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider = (operationName, consumerRecord) -> operationName.toUpperCase(); // Create an instance of the KafkaConsumer: KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); // Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction: TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer, consumerSpanNameProvider); // Spans created by the tracingConsumer will have the operation name as the span name, in upper-case. // "receive" -> "RECEIVE"
ビルトインスパン名
カスタムスパン名を定義するとき、ClientSpanNameProvider
クラスで以下の BiFunctions
を使用できます。spanNameProvider
の指定がない場合は、CONSUMER_OPERATION_NAME
および PRODUCER_OPERATION_NAME
が使用されます。
BiFunction | 説明 |
---|---|
|
|
|
|
|
メッセージの送信先または送信元となったトピックの名前を |
|
|
|
操作名およびトピック名を |
|
|