16.9. Kafka Streams アプリケーションのトレース用のインストルメント化
アプリケーションコードを計測して、Kafka Streams API アプリケーションでのトレースを有効にします。デコレーターパターンまたはインターセプターを使用して、トレース用に Kafka Streams API アプリケーションをインストルメント化します。続いて、メッセージが生成されたとき、またはトピックから取得されたときにトレースを記録できます。
- デコレーターのインストルメント化
-
デコレーターのインストルメント化は、トレース用に変更した Kafka Streams インスタンスを作成します。OpenTelemetry の場合、Kafka Streams のトレースのインストルメント化を提供する
TracingKafkaClientSupplier
カスタムクラスを作成する必要があります。 - インターセプターのインストルメント化
- インターセプターインストルメント化の場合は、トレース機能を Kafka Streams プロデューサーおよびコンシューマー設定に追加します。
前提条件
クライアントのトレースを初期化 している。
トレース JAR を依存関係としてプロジェクトに追加して、Kafka Streams アプリケーションでインストルメント化を有効にしている。
-
OpenTelemetry で Kafka Streams をインストルメント化するために、カスタムの
TracingKafkaClientSupplier
を記述している。 カスタム
TracingKafkaClientSupplier
が Kafka のDefaultKafkaClientSupplier
を拡張し、プロデューサーとコンシューマーの作成メソッドを上書きして、インスタンスを Telemetry 関連のコードでラップできるようにしている。カスタム
TracingKafkaClientSupplier
の例private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getProducer(config)); } @Override public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getConsumer(config)); } @Override public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { return this.getConsumer(config); } @Override public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) { return this.getConsumer(config); } }
手順
Kafka Streams API アプリケーションごとにこの手順を実行します。
デコレーターパターンを使用するには、
TracingKafkaClientSupplier
サプライヤーインターフェイスのインスタンスを作成し、そのサプライヤーインターフェイスをKafkaStreams
に提供します。デコレーターのインストルメント化の例
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer); KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier); streams.start();
インターセプターを使用するには、Kafka Streams プロデューサーおよびコンシューマー設定でインターセプタークラスを設定します。
TracingProducerInterceptor
およびTracingConsumerInterceptor
インターセプタークラスは、トレース機能を処理します。インターセプターを使用したプロデューサーとコンシューマーの設定例
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());