16.9. Kafka Streams アプリケーションのトレース用のインストルメント化


アプリケーションコードを計測して、Kafka Streams API アプリケーションでのトレースを有効にします。デコレーターパターンまたはインターセプターを使用して、トレース用に Kafka Streams API アプリケーションをインストルメント化します。続いて、メッセージが生成されたとき、またはトピックから取得されたときにトレースを記録できます。

デコレーターのインストルメント化
デコレーターのインストルメント化は、トレース用に変更した Kafka Streams インスタンスを作成します。OpenTelemetry の場合、Kafka Streams のトレースのインストルメント化を提供する TracingKafkaClientSupplier カスタムクラスを作成する必要があります。
インターセプターのインストルメント化
インターセプターインストルメント化の場合は、トレース機能を Kafka Streams プロデューサーおよびコンシューマー設定に追加します。

前提条件

  • クライアントのトレースを初期化 している。

    トレース JAR を依存関係としてプロジェクトに追加して、Kafka Streams アプリケーションでインストルメント化を有効にしている。

  • OpenTelemetry で Kafka Streams をインストルメント化するために、カスタムの TracingKafkaClientSupplier を記述している。
  • カスタム TracingKafkaClientSupplier が Kafka の DefaultKafkaClientSupplier を拡張し、プロデューサーとコンシューマーの作成メソッドを上書きして、インスタンスを Telemetry 関連のコードでラップできるようにしている。

    カスタム TracingKafkaClientSupplier の例

    private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getProducer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getConsumer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    
        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    }

手順

Kafka Streams API アプリケーションごとにこの手順を実行します。

  • デコレーターパターンを使用するには、TracingKafkaClientSupplier サプライヤーインターフェイスのインスタンスを作成し、そのサプライヤーインターフェイスを KafkaStreams に提供します。

    デコレーターのインストルメント化の例

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();

  • インターセプターを使用するには、Kafka Streams プロデューサーおよびコンシューマー設定でインターセプタークラスを設定します。

    TracingProducerInterceptor および TracingConsumerInterceptor インターセプタークラスは、トレース機能を処理します。

    インターセプターを使用したプロデューサーとコンシューマーの設定例

    props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.